
Implement Kubernetes job limiter #5150

Open

javanlacerda wants to merge 1 commit into master from javan.k8s-service-job-limiter

Conversation

@javanlacerda (Collaborator) commented Jan 29, 2026

This PR enhances the reliability of remote task scheduling by introducing queue size limits and handling uncreated tasks via lease cancellation. It refactors RemoteTaskGate, GcpBatchService, and KubernetesService to return tasks that fail scheduling, ensuring they are redelivered by Pub/Sub.

If a task cannot be scheduled on the runtime, it is returned to the utask-main scheduler queue.
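
A minimal sketch of that contract, with hypothetical names (the real classes are RemoteTaskGate, GcpBatchService, and KubernetesService; the limit check and helpers below are illustrative only):

class JobLimitedScheduler:
  """Illustrative scheduler that returns the tasks it could not create."""

  def __init__(self, job_limit):
    self._job_limit = job_limit
    self._running_jobs = 0

  def schedule(self, tasks):
    """Creates jobs until the limit is hit; returns the unscheduled remainder."""
    unscheduled = []
    for task in tasks:
      if self._running_jobs >= self._job_limit:
        unscheduled.append(task)
        continue
      self._create_job(task)
      self._running_jobs += 1
    # Callers leave these tasks unacked so Pub/Sub redelivers them later.
    return unscheduled

  def _create_job(self, task):
    """Placeholder for the real Kubernetes / GCP Batch job creation."""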

Also, the UTask execute function raises an exception if the utask-main schedule queue size is above a threshold. This exception keeps the preprocess task from being acked.
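
A rough sketch of that back-pressure check; QueueLimitError and get_utask_main_queue_size mirror names visible in the diff below, while UTASK_MAIN_QUEUE_SIZE_LIMIT and run_preprocess are placeholders:

UTASK_MAIN_QUEUE_SIZE_LIMIT = 10000  # Assumed threshold, matching the 10k observed in dev.


class QueueLimitError(Exception):
  """Error thrown when the queue limit is reached."""

  def __init__(self, size):
    super().__init__(f'Queue limit reached {size}.')


def execute(task_argument, job_type, uworker_env):
  """Skips preprocess while the utask-main queue is above the limit.

  Raising here means the preprocess Pub/Sub message is never acked, so it is
  redelivered once the utask-main queue drains below the threshold.
  """
  size = get_utask_main_queue_size()  # Helper added by this PR.
  if size > UTASK_MAIN_QUEUE_SIZE_LIMIT:
    raise QueueLimitError(size)
  run_preprocess(task_argument, job_type, uworker_env)  # Placeholder for the real preprocess call.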

Finally, the follow-up PR #5153 updates the schedule-fuzz implementation to avoid scheduling more tasks when the preprocess queue size is also above a threshold.

All of the logic described above was tested in the dev environment; the evidence follows:

When the Kubernetes job limiter was reached, the utask-main queue started to grow because of the unacked tasks. It leveled off close to 10k messages, since the UTask execute function only runs preprocess if the utask-main queue is under 10k.
[screenshot: utask-main queue size over time]

As the tworkers stopped running preprocess, the preprocess queue started to grow. Note that the preprocess queue size also sticks close to 10k. This happens because schedule-fuzz creates tasks based only on the preprocess queue size, aiming to keep 10k messages there; that is being addressed in PR #5153.
[screenshot: preprocess queue size over time]

@javanlacerda force-pushed the javan.k8s-service-job-limiter branch from 3a2bdaa to b7360d0 on January 29, 2026 23:02
@javanlacerda marked this pull request as ready for review on January 30, 2026 18:02
@javanlacerda force-pushed the javan.k8s-service-job-limiter branch from a4a1b26 to 279404c on January 30, 2026 18:26
@javanlacerda changed the title from "Javan.k8s service job limiter" to "Implement Kubernetes job limiter" on Jan 30, 2026
@javanlacerda force-pushed the javan.k8s-service-job-limiter branch from 1e6083b to da47d7b on January 31, 2026 23:47
@decoNR (Contributor) left a comment

Just a clarification: is there a chance the preprocess queue could become flooded?

@javanlacerda (Collaborator, Author) replied:

Just a clarification: is there a chance the preprocess queue could become flooded?

Great question. There actually is a chance, but I'm addressing that in #5153: schedule-fuzz will only push messages there if the queue is not flooded.

@decoNR (Contributor) commented Feb 2, 2026

Great question. There actually is a chance, but I'm addressing that in #5153: schedule-fuzz will only push messages there if the queue is not flooded.

This approach seems nice! Thanks. Maybe we should deploy them together.

@javanlacerda force-pushed the javan.k8s-service-job-limiter branch 7 times, most recently from 4ddbb9f to b35cef0 on February 4, 2026 16:52
@javanlacerda force-pushed the javan.k8s-service-job-limiter branch from b35cef0 to a333de1 on February 4, 2026 20:37
Signed-off-by: Javan Lacerda <javanlacerda@google.com>

update tests

Signed-off-by: Javan Lacerda <javanlacerda@google.com>

fix e2e tests

Signed-off-by: Javan Lacerda <javanlacerda@google.com>

lint

Signed-off-by: Javan Lacerda <javanlacerda@google.com>

prepare unscheduled_tasks

Signed-off-by: Javan Lacerda <javanlacerda@google.com>

fix

Signed-off-by: Javan Lacerda <javanlacerda@google.com>

fix

Signed-off-by: Javan Lacerda <javanlacerda@google.com>

add testcase based tasks to prepare tasks

Signed-off-by: Javan Lacerda <javanlacerda@google.com>

check Utask

Signed-off-by: Javan Lacerda <javanlacerda@google.com>

move to not ack not scheduled tasks instead of pushing to pre

Signed-off-by: Javan Lacerda <javanlacerda@google.com>

fixes

Signed-off-by: Javan Lacerda <javanlacerda@google.com>
@javanlacerda force-pushed the javan.k8s-service-job-limiter branch from a333de1 to b90da76 on February 4, 2026 20:44
"""Error thrown when the queue limit is reached."""

def __init__(self, size):
super().__init__(f'Queue limit reached {size}.')
@ViniciustCosta (Collaborator) commented Feb 4, 2026
Nit: Maybe it would be nice to also have the queue name here: (self, queue_name, size) -> f'Queue {queue_name} limit reached {size}'.
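
Something along those lines (illustrative only):

class QueueLimitError(Exception):
  """Error thrown when a queue's limit is reached."""

  def __init__(self, queue_name, size):
    super().__init__(f'Queue {queue_name} limit reached: {size}.')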


def get_utask_main_queue_size():
  """Returns the size of the utask main queue."""
  queue_name = UTASK_MAIN_QUEUE
A collaborator commented:

It would be nice to have a method that returns the queue_name with the optional base_os_version, to centralize this.
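
For example (the name and signature here are assumptions, building on the UTASK_MAIN_QUEUE constant from the snippet above):

def get_utask_queue_name(base_os_version=None):
  """Centralizes construction of the utask-main queue name."""
  if base_os_version:
    return f'{UTASK_MAIN_QUEUE}-{base_os_version}'
  return UTASK_MAIN_QUEUE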

  now = time.time()
  interval = monitoring_v3.TimeInterval(
      end_time={'seconds': int(now)},
      start_time={'seconds': int(now - 5 * 60)},
A collaborator commented:

Maybe it would be better to have a global var defining the interval, e.g. _QUEUE_LIMIT_INTERVAL?
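
For reference, a sketch of how such a backlog read against Cloud Monitoring could look, with the interval pulled into a module-level constant as suggested; the metric filter, helper name, and project handling are assumptions rather than the exact code in this PR:

import time

from google.cloud import monitoring_v3

_QUEUE_LIMIT_INTERVAL_SECONDS = 5 * 60  # Lookback window for the queue-size query.


def get_subscription_backlog(project_id, subscription_id):
  """Returns the latest undelivered-message count for a Pub/Sub subscription."""
  client = monitoring_v3.MetricServiceClient()
  now = time.time()
  interval = monitoring_v3.TimeInterval(
      end_time={'seconds': int(now)},
      start_time={'seconds': int(now - _QUEUE_LIMIT_INTERVAL_SECONDS)})
  series = client.list_time_series(
      request={
          'name': f'projects/{project_id}',
          'filter': ('metric.type="pubsub.googleapis.com/subscription/'
                     'num_undelivered_messages" AND '
                     f'resource.labels.subscription_id="{subscription_id}"'),
          'interval': interval,
          'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
      })
  for time_series in series:
    if time_series.points:
      # Points are returned newest first; take the latest sample.
      return time_series.points[0].value.int64_value
  return 0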

  for i, task in enumerate(remaining_tasks):
    adapter_id = list(frequencies.keys())[i % len(frequencies)]
    tasks_by_adapter[adapter_id].append(task)
  if sum(frequencies.values()) >= 0.999:
A collaborator commented:

Not sure I understand this logic: if the frequencies don't add up to 100%, are we simply not scheduling all tasks?

      self.execute_locally(task_argument, job_type, uworker_env)
      return

    utask_main_queue_size = tasks.get_utask_main_queue_size()
A collaborator commented:

Not sure if this is in the correct place, shouldn't this be on utask main? Here the task would be doing preprocess.

The same collaborator followed up:

Correcting myself here; I think I got it: the idea is to not even execute preprocess if the utask-main queue is full, right?

@ViniciustCosta (Collaborator) left a comment

LGTM! This seems like a great improvement, thanks!
