Conversation
Force-pushed from 3a2bdaa to b7360d0
Force-pushed from a4a1b26 to 279404c
Force-pushed from 1e6083b to da47d7b
decoNR left a comment:
Just a clarification: is there a chance the preprocess queue could become flooded?
Great question. There is a chance actually, but I'm addressing that in this PR: schedule-fuzz will push messages there only if the queue is not flooded.
This approach seems nice! Thanks. Maybe we should deploy them together.
Force-pushed from 4ddbb9f to b35cef0
Force-pushed from b35cef0 to a333de1
Commits (each Signed-off-by: Javan Lacerda <javanlacerda@google.com>):
- update tests
- fix e2e tests
- lint
- prepare unscheduled_tasks
- fix
- fix
- add testcase based tasks to prepare tasks
- check Utask
- move to not ack not scheduled tasks instead of pushing to pre
- fixes
Force-pushed from a333de1 to b90da76
| """Error thrown when the queue limit is reached.""" | ||
|
|
||
| def __init__(self, size): | ||
| super().__init__(f'Queue limit reached {size}.') |
Nit: Maybe it would be nice to have the queue name here also: `(self, queue_name, size)` -> `f'Queue {queue_name} limit reached {size}.'`
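A minimal sketch of what that could look like, based on the snippet above (the class name and the fact that it subclasses `Exception` are assumptions; only the `queue_name` parameter is new):

```python
class QueueLimitError(Exception):
  """Error thrown when the queue limit is reached."""

  def __init__(self, queue_name, size):
    # Naming the queue makes it clear which backlog hit its limit.
    super().__init__(f'Queue {queue_name} limit reached {size}.')
```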
def get_utask_main_queue_size():
  """Returns the size of the utask main queue."""
  queue_name = UTASK_MAIN_QUEUE
Would be nice to have a method to get the queue_name with the optional base_os_version, to centralize this.
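Something like the following could centralize it (the helper name and the suffix format are assumptions; `UTASK_MAIN_QUEUE` comes from the snippet above):

```python
def get_utask_main_queue_name(base_os_version=None):
  """Returns the utask-main queue name, optionally scoped to a base OS version."""
  if base_os_version:
    # Assumed suffix format; adjust to whatever naming the queues actually use.
    return f'{UTASK_MAIN_QUEUE}-{base_os_version}'
  return UTASK_MAIN_QUEUE
```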
  now = time.time()
  interval = monitoring_v3.TimeInterval(
      end_time={'seconds': int(now)},
      start_time={'seconds': int(now - 5 * 60)},
Maybe better to have a global var defining the interval _QUEUE_LIMIT_INTERVAL?
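For example, a hedged sketch of how the constant could fit into the queue-size query (the `_QUEUE_LIMIT_INTERVAL` name comes from this comment; the `num_undelivered_messages` metric and filter fields are standard Cloud Monitoring identifiers for Pub/Sub, while the project/subscription wiring here is assumed, not taken from the PR):

```python
import time

from google.cloud import monitoring_v3

# Look-back window used when querying the queue-size metric.
_QUEUE_LIMIT_INTERVAL = 5 * 60  # seconds


def _get_subscription_backlog(project_id, subscription_id):
  """Returns the latest num_undelivered_messages value for a subscription."""
  client = monitoring_v3.MetricServiceClient()
  now = time.time()
  interval = monitoring_v3.TimeInterval(
      end_time={'seconds': int(now)},
      start_time={'seconds': int(now - _QUEUE_LIMIT_INTERVAL)})
  results = client.list_time_series(
      request={
          'name': f'projects/{project_id}',
          'filter': (
              'metric.type = '
              '"pubsub.googleapis.com/subscription/num_undelivered_messages" '
              f'AND resource.labels.subscription_id = "{subscription_id}"'),
          'interval': interval,
          'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
      })
  for series in results:
    # Points are returned newest-first; the first point is the latest sample.
    return series.points[0].value.int64_value
  return 0
```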
  for i, task in enumerate(remaining_tasks):
    adapter_id = list(frequencies.keys())[i % len(frequencies)]
    tasks_by_adapter[adapter_id].append(task)
  if sum(frequencies.values()) >= 0.999:
Not sure I understand this logic: if the frequencies don't add up to 100%, are we simply not scheduling all tasks?
    self.execute_locally(task_argument, job_type, uworker_env)
    return

  utask_main_queue_size = tasks.get_utask_main_queue_size()
Not sure if this is in the correct place, shouldn't this be on utask main? Here the task would be doing preprocess.
Correcting myself here, I think I got it: the idea is to not even execute preprocess if the utask-main queue is full, right?
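Based on the snippets above, the gating presumably looks roughly like this (the threshold constant and the `QueueLimitError` signature are assumptions, not the PR's exact code):

```python
# Check the utask-main backlog before doing any preprocess work.
utask_main_queue_size = tasks.get_utask_main_queue_size()
if utask_main_queue_size > MAX_UTASK_MAIN_QUEUE_SIZE:  # threshold name assumed
  # Raising leaves the preprocess message un-acked, so Pub/Sub redelivers it
  # once the utask-main queue drains below the threshold.
  raise QueueLimitError(UTASK_MAIN_QUEUE, utask_main_queue_size)
# Otherwise, continue with preprocess as usual.
```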
ViniciustCosta left a comment:
LGTM! This seems like a great improvement! Thanks.
This PR enhances the reliability of remote task scheduling by introducing queue size limits and handling uncreated tasks via lease cancellation. It refactors RemoteTaskGate, GcpBatchService, and KubernetesService to return tasks that fail scheduling, ensuring they are redelivered by Pub/Sub.
If a task cannot be scheduled on the runtime, it is returned to the utask-main scheduler queue.
Also, the UTask execute function raises an exception if the utask-main schedule queue size is above a threshold. This exception prevents the preprocess task from being acked.
Finally, the follow-up PR #5153 updates the schedule-fuzz implementation to avoid scheduling more tasks if the preprocess queue size is also above a threshold.
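As a rough illustration of that follow-up idea only (the names and the 10k target are assumptions drawn from the dev-environment evidence below, not from PR #5153 itself):

```python
# Target backlog for the preprocess queue; schedule-fuzz stops creating
# new fuzz tasks once the backlog reaches this level.
PREPROCESS_QUEUE_TARGET = 10_000


def available_fuzz_task_slots(preprocess_backlog):
  """How many new fuzz tasks schedule-fuzz may still enqueue."""
  return max(0, PREPROCESS_QUEUE_TARGET - preprocess_backlog)
```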
All of the logic described above was tested in the dev environment. The evidence follows:
When the Kubernetes job limit was reached, the utask-main queue started to grow due to the un-acked tasks. It leveled off close to 10k messages, because the UTask execute function only runs preprocess if the utask-main queue is under 10k.

As the tworkers stopped running preprocess, the preprocess queue started to grow. Note that the preprocess queue size also sticks close to 10k. This happens because schedule-fuzz only creates tasks based on the preprocess queue size, aiming to keep about 10k messages there. This is addressed in PR #5153.
