diff --git a/docs-next/content/docs/guides/core/execution-model.mdx b/docs-next/content/docs/guides/core/execution-model.mdx new file mode 100644 index 0000000..7ee5953 --- /dev/null +++ b/docs-next/content/docs/guides/core/execution-model.mdx @@ -0,0 +1,135 @@ +--- +title: Execution Models +description: "Thread pool, prefork, native async — pick the right one for your workload." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +Choose how tasks execute: OS threads (default), child processes (prefork), or +native async. + +## Decision tree + +|CPU-bound| B[Prefork Pool] + A -->|I/O-bound sync| C[Thread Pool] + A -->|I/O-bound async| D[Native Async] + A -->|Mixed| B`} +/> + +## Comparison + +| Mode | Concurrency | GIL | Memory per worker | Startup cost | Best for | +|------|------------|-----|-------------------|--------------|----------| +| **Thread Pool** | `workers` OS threads | Shared | ~1 MB | None | I/O-bound sync tasks | +| **Prefork** | `workers` child processes | Independent | ~30 MB | One app import per child | CPU-bound tasks, mixed workloads | +| **Native Async** | `async_concurrency` coroutines | Shared (event loop) | Negligible per coroutine | None | I/O-bound async tasks | + +## Thread pool (default) + +The default. Runs sync task functions on Rust `std::thread` threads. Each +worker acquires the Python GIL only during task execution — the scheduler and +dispatch logic never touch it. + +```python +# Default — thread pool with auto-detected worker count +queue.run_worker() + +# Explicit worker count +queue.run_worker(workers=8) +``` + +```bash +taskito worker --app myapp:queue --workers 8 +``` + +Because threads share a single GIL, CPU-bound tasks block each other. For +Python code that spends most of its time in C extensions (numpy, pandas) that +release the GIL, threads still work well. + +## Prefork pool + +Spawns separate child processes. Each process has its own Python interpreter +and GIL, so CPU-bound tasks run in true parallel. + +```python +queue.run_worker(pool="prefork", app="myapp:queue") +``` + +```bash +taskito worker --app myapp:queue --pool prefork +``` + +The `app` parameter tells each child process where to import your `Queue` +instance. It must be a module-level name (`"module:attribute"` format) — +tasks defined inside functions or closures cannot be imported by child +processes. + +For more details, see the [Prefork Pool guide](/docs/guides/advanced-execution). + +## Native async + +`async def` task functions run on a dedicated Python event loop thread. No +`asyncio.run()` wrapping, no thread-per-task overhead. + +```python +@queue.task() +async def fetch_prices(symbol: str) -> dict: + async with httpx.AsyncClient() as client: + r = await client.get(f"https://api.example.com/prices/{symbol}") + return r.json() +``` + +Control how many coroutines run at once: + +```python +queue = Queue( + db_path="myapp.db", + async_concurrency=200, # default: 100 +) +``` + +For more details, see the [Native Async Tasks guide](/docs/guides/advanced-execution). + +## Mixing sync and async + +A single queue handles both sync and async tasks. No configuration needed — +the worker inspects each task at registration time and routes it to the +correct pool. + +```python +@queue.task() +def resize_image(path: str) -> str: + # Sync — runs on thread pool + ... + +@queue.task() +async def send_notification(user_id: str) -> None: + # Async — runs on event loop + ... +``` + +Both are enqueued, retried, rate-limited, and monitored identically. + +## workers vs async_concurrency + +These two parameters are independent: + +```python +queue = Queue( + workers=4, # OS threads (or child processes) for sync tasks + async_concurrency=200, # concurrent coroutines for async tasks +) +``` + +`workers=4` means 4 sync tasks can execute at the same time. +`async_concurrency=200` means 200 async tasks can be in-flight concurrently +on the event loop. A queue with both set runs up to `4 + 200` tasks +simultaneously. + + + For mostly-async workloads, keep `workers` small (2–4) and raise + `async_concurrency`. For mostly-sync I/O workloads, raise `workers`. For + CPU-bound workloads, switch to prefork. + diff --git a/docs-next/content/docs/guides/core/index.mdx b/docs-next/content/docs/guides/core/index.mdx index e939918..7c11488 100644 --- a/docs-next/content/docs/guides/core/index.mdx +++ b/docs-next/content/docs/guides/core/index.mdx @@ -1,10 +1,15 @@ --- title: Core -description: "Core taskito features: tasks, queues, results." +description: "The building blocks of every taskito application." --- -import { Callout } from 'fumadocs-ui/components/callout'; +The building blocks of every taskito application. - - Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text. - +| Guide | Description | +|---|---| +| [Tasks](/docs/guides/core/tasks) | Define tasks with `@queue.task()`, configure retries, timeouts, and options | +| [Workers](/docs/guides/core/workers) | Start workers, control concurrency, graceful shutdown | +| [Execution Models](/docs/guides/core/execution-model) | How tasks move from enqueue to completion | +| [Queues & Priority](/docs/guides/core/queues) | Named queues, priority levels, and routing | +| [Scheduling](/docs/guides/core/scheduling) | Periodic tasks with cron expressions | +| [Workflows](/docs/guides/core/workflows) | Chains, groups, and chords for multi-step pipelines | diff --git a/docs-next/content/docs/guides/core/meta.json b/docs-next/content/docs/guides/core/meta.json index eab7e55..56368fd 100644 --- a/docs-next/content/docs/guides/core/meta.json +++ b/docs-next/content/docs/guides/core/meta.json @@ -1,4 +1,12 @@ { "title": "Core", - "pages": ["index"] + "pages": [ + "index", + "tasks", + "queues", + "workers", + "execution-model", + "scheduling", + "workflows" + ] } diff --git a/docs-next/content/docs/guides/core/queues.mdx b/docs-next/content/docs/guides/core/queues.mdx new file mode 100644 index 0000000..d12a05e --- /dev/null +++ b/docs-next/content/docs/guides/core/queues.mdx @@ -0,0 +1,145 @@ +--- +title: Queues & Priority +description: "Named queues, integer priority, queue-level rate limits and concurrency caps." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +## Named queues + +Route tasks to different queues for isolation and dedicated processing: + +```python +@queue.task(queue="emails") +def send_email(to, subject, body): + ... + +@queue.task(queue="reports") +def generate_report(report_id): + ... + +@queue.task() # Goes to "default" queue +def process_data(data): + ... +``` + +### Worker queue subscription + +Workers can listen to specific queues: + +```bash +# Process only email tasks +taskito worker --app myapp:queue --queues emails + +# Process multiple queues +taskito worker --app myapp:queue --queues emails,reports + +# Process all registered queues (default) +taskito worker --app myapp:queue +``` + +Or programmatically: + +```python +queue.run_worker(queues=["emails", "reports"]) +``` + + + Separate I/O-bound tasks (API calls, emails) from CPU-bound tasks (data + processing, report generation) into different queues. Run them on different + worker processes for optimal resource usage. + + +## Priority + +Higher priority jobs are dequeued first within the same queue. Priority is an +integer — higher values mean more urgent. + +### Default priority + +Set at task registration: + +```python +@queue.task(priority=10) +def urgent_task(data): + ... + +@queue.task(priority=0) # Default +def normal_task(data): + ... +``` + +### Override at enqueue time + +```python +# This specific job is extra urgent +urgent_task.apply_async(args=(data,), priority=100) +``` + +### How it works + +Jobs are dequeued using a compound index: `(queue, status, priority DESC, scheduled_at ASC)`. This means: + +1. Higher priority jobs go first +2. Among equal priority, older jobs (earlier `scheduled_at`) go first +3. Each queue is processed independently + +```python +# These three jobs are in the same queue +low = task.apply_async(args=(1,), priority=1) +mid = task.apply_async(args=(2,), priority=5) +high = task.apply_async(args=(3,), priority=10) + +# Processing order: high (10), mid (5), low (1) +``` + +## Queue-level limits + +Apply a rate limit or concurrency cap to an entire queue, independently of +per-task settings. These limits are checked in the scheduler before any +per-task limits. + +### Rate limiting a queue + +```python +queue.set_queue_rate_limit("default", "100/m") # Max 100 jobs per minute +queue.set_queue_rate_limit("emails", "20/s") # Max 20 emails per second +``` + +The format is the same as `rate_limit` on `@queue.task()`: `"N/s"`, `"N/m"`, +or `"N/h"`. + +### Capping concurrency per queue + +```python +queue.set_queue_concurrency("default", 10) # Max 10 jobs running at once +queue.set_queue_concurrency("reports", 2) # Heavy tasks: max 2 at a time +``` + +`set_queue_concurrency` limits how many jobs from that queue run +simultaneously across all workers. + + + Queue-level limits apply to all tasks in the queue regardless of their + individual settings. Per-task `rate_limit` and `max_concurrent` are checked + afterwards and may impose stricter caps. Set queue limits to protect shared + downstream resources (APIs, databases) and per-task limits to manage + individual task capacity. + + +Both methods can be called at any point before or after `run_worker()` starts. + +## Default queue settings + +Configure defaults at the Queue level: + +```python +queue = Queue( + db_path="myapp.db", + default_priority=0, # Default priority for all tasks + default_retry=3, # Default max retries + default_timeout=300, # Default timeout in seconds +) +``` + +Individual `@queue.task()` decorators override these defaults. diff --git a/docs-next/content/docs/guides/core/scheduling.mdx b/docs-next/content/docs/guides/core/scheduling.mdx new file mode 100644 index 0000000..05f1365 --- /dev/null +++ b/docs-next/content/docs/guides/core/scheduling.mdx @@ -0,0 +1,156 @@ +--- +title: Scheduling +description: "Delayed tasks via apply_async(delay=) and periodic tasks with 6-field cron expressions." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +taskito supports both **delayed tasks** (run once in the future) and +**periodic tasks** (run on a cron schedule). + +## Delayed tasks + +Schedule a task to run after a delay: + +```python +# Run 1 hour from now +send_email.apply_async( + args=("user@example.com", "Reminder", "Don't forget!"), + delay=3600, # seconds +) + +# Run 30 seconds from now +cleanup.apply_async(args=(), delay=30) +``` + +The job is created immediately with `status=pending` but won't be picked up by +a worker until the `scheduled_at` timestamp is reached. + +## Periodic tasks + +Register recurring tasks with cron expressions: + +```python +@queue.periodic(cron="0 */5 * * * *") +def health_check(): + """Run every 5 minutes.""" + ping_services() + +@queue.periodic(cron="0 0 0 * * *") +def daily_cleanup(): + """Run at midnight every day.""" + queue.purge_completed(older_than=86400) + +@queue.periodic(cron="0 0 9 * * 1", args=("weekly",)) +def weekly_report(report_type): + """Run every Monday at 9:00 AM.""" + generate_report(report_type) +``` + +### Cron expression format + +taskito uses **6-field cron expressions** (with seconds): + +``` +┌─────────── second (0-59) +│ ┌───────── minute (0-59) +│ │ ┌─────── hour (0-23) +│ │ │ ┌───── day of month (1-31) +│ │ │ │ ┌─── month (1-12) +│ │ │ │ │ ┌─ day of week (0-6, Sun=0) +│ │ │ │ │ │ +* * * * * * +``` + +| Expression | Schedule | +|---|---| +| `*/30 * * * * *` | Every 30 seconds | +| `0 */5 * * * *` | Every 5 minutes | +| `0 0 * * * *` | Every hour | +| `0 30 * * * *` | Every hour at :30 | +| `0 0 */2 * * *` | Every 2 hours | +| `0 0 0 * * *` | Every day at midnight | +| `0 0 9 * * *` | Every day at 9:00 AM | +| `0 0 9 * * 1-5` | Weekdays at 9:00 AM | +| `0 30 9 * * 1-5` | Weekdays at 9:30 AM | +| `0 0 0 1 * *` | First day of every month at midnight | +| `0 0 0 * * 0` | Every Sunday at midnight | +| `0 0 0 1 1 *` | January 1st at midnight (yearly) | + +### Decorator options + +```python +@queue.periodic( + cron="0 0 * * * *", # Required: cron expression + name="hourly-cleanup", # Optional: explicit name + args=(3600,), # Optional: positional args + kwargs={"force": True}, # Optional: keyword args + queue="maintenance", # Optional: target queue + timezone="America/New_York", # Optional: IANA timezone (default: UTC) +) +def cleanup(older_than, force=False): + ... +``` + +### Timezone support + +By default, cron expressions are evaluated in UTC. Pass any IANA timezone name +to schedule in a specific timezone: + +```python +@queue.periodic(cron="0 0 9 * * 1-5", timezone="America/New_York") +def morning_report(): + """Run weekdays at 9:00 AM Eastern.""" + generate_report() + +@queue.periodic(cron="0 0 18 * * *", timezone="Europe/London") +def end_of_day_summary(): + """Run at 6:00 PM London time.""" + send_summary() +``` + +Timezone handling uses `chrono-tz` under the hood. Daylight saving time +transitions are handled automatically. The `timezone` parameter defaults to +UTC when omitted. + +### How periodic tasks work + +1. Periodic tasks are registered with the Rust scheduler when the worker starts +2. The scheduler checks for due tasks every ~3 seconds +3. When a task is due, a new job is enqueued automatically +4. The task's `next_run` is computed using the cron expression +5. Periodic task state is persisted in the `periodic_tasks` SQLite table + + + Periodic tasks are only active while a worker is running. If no worker is + running, tasks accumulate and the **next due** job is enqueued when a worker + starts. + + +## Edge cases + +### Task takes longer than the interval + +If a periodic task's execution time exceeds its cron interval, the next run +is **skipped**, not stacked. Periodic tasks use `unique_key` deduplication +internally — if the previous run is still pending or running, the new +enqueue is silently dropped. + +### Multiple workers running periodic tasks + +Safe by design. Each worker's scheduler checks for due periodic tasks +independently, but they all use the same `unique_key` for deduplication. Only +one instance of each periodic task runs at a time, regardless of how many +workers are active. + +### Timezone handling + +```python +@queue.periodic(cron="0 9 * * *", timezone="America/New_York") +def morning_report(): + ... +``` + +Without `timezone`, cron expressions are evaluated in **UTC**. Specify a +timezone string (any valid IANA timezone) to schedule in local time. Daylight +saving transitions are handled automatically via `chrono-tz`. diff --git a/docs-next/content/docs/guides/core/tasks.mdx b/docs-next/content/docs/guides/core/tasks.mdx new file mode 100644 index 0000000..692eac9 --- /dev/null +++ b/docs-next/content/docs/guides/core/tasks.mdx @@ -0,0 +1,241 @@ +--- +title: Tasks +description: "Define tasks with @queue.task() — decorator options, naming, enqueue, batching." +--- + +Tasks are Python functions registered with a queue via the `@queue.task()` decorator. + +## Defining a task + +```python +from taskito import Queue + +queue = Queue(db_path="myapp.db") + +@queue.task() +def process_data(data: dict) -> str: + # Your logic here + return "done" +``` + +## Decorator options + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `name` | `str \| None` | Auto-generated | Explicit task name. Defaults to `module.qualname`. | +| `max_retries` | `int` | `3` | Max retry attempts before moving to DLQ. | +| `retry_backoff` | `float` | `1.0` | Base delay in seconds for exponential backoff. | +| `retry_delays` | `list[float] \| None` | `None` | Per-attempt delays in seconds, overrides backoff. e.g. `[1, 5, 30]`. | +| `max_retry_delay` | `int \| None` | `None` | Cap on backoff delay in seconds (default 300 s). | +| `timeout` | `int` | `300` | Max execution time in seconds (hard timeout). | +| `soft_timeout` | `float \| None` | `None` | Cooperative time limit; checked via `current_job.check_timeout()`. | +| `priority` | `int` | `0` | Default priority (higher = more urgent). | +| `rate_limit` | `str \| None` | `None` | Rate limit string, e.g. `"100/m"`. | +| `queue` | `str` | `"default"` | Named queue to submit to. | +| `circuit_breaker` | `dict \| None` | `None` | Circuit breaker config: `{"threshold": 5, "window": 60, "cooldown": 120}`. | +| `middleware` | `list[TaskMiddleware] \| None` | `None` | Per-task middleware, applied in addition to queue-level middleware. | +| `expires` | `float \| None` | `None` | Seconds until the job expires if not started. | +| `inject` | `list[str] \| None` | `None` | Worker resource names to inject as keyword arguments. See [Resource System](/docs/guides/resources). | +| `serializer` | `Serializer \| None` | `None` | Per-task serializer override. Falls back to the queue-level serializer. | +| `max_concurrent` | `int \| None` | `None` | Max concurrent running instances of this task. `None` means no limit. | + +```python +@queue.task( + name="emails.send", + max_retries=5, + retry_backoff=2.0, + max_retry_delay=60, # cap backoff at 60 s + timeout=60, + priority=10, + rate_limit="100/m", + queue="emails", + max_concurrent=10, +) +def send_email(to: str, subject: str, body: str): + ... +``` + +### Custom retry delays + +Use `retry_delays` to specify exact wait times between each retry attempt instead of exponential backoff: + +```python +@queue.task(retry_delays=[1, 5, 30]) # 1s after 1st fail, 5s after 2nd, 30s after 3rd +def flaky_api_call(): + ... +``` + +### Soft timeouts + +A soft timeout raises `SoftTimeoutError` only when the task cooperatively checks: + +```python +from taskito import current_job + +@queue.task(timeout=300, soft_timeout=60) +def long_running(items): + for item in items: + current_job.check_timeout() # raises SoftTimeoutError if soft_timeout exceeded + process(item) +``` + +### Circuit breakers + +Automatically open a circuit after repeated failures and refuse new executions during the cooldown period: + +```python +@queue.task(circuit_breaker={"threshold": 5, "window": 60, "cooldown": 120}) +def call_external_api(): + ... +``` + +- `threshold`: number of failures to trip the breaker +- `window`: rolling time window in seconds +- `cooldown`: seconds the breaker stays open before allowing a retry + +### Per-task middleware + +Apply middleware to a specific task only: + +```python +from taskito.contrib.sentry import SentryMiddleware + +@queue.task(middleware=[SentryMiddleware()]) +def important_task(): + ... +``` + +### Job expiration + +Skip jobs that weren't started within the deadline: + +```python +@queue.task(expires=300) # skip if not started within 5 minutes +def time_sensitive(): + ... +``` + +### Max retry delay + +Cap the exponential backoff so waits don't grow unbounded: + +```python +@queue.task(retry_backoff=2.0, max_retries=10, max_retry_delay=120) +def flaky_service(): + ... +# Delays: 2, 4, 8, 16, 32, 64, 120, 120, 120 s (capped at 2 min) +``` + +### Per-task concurrency limit + +Prevent a single task type from consuming all workers: + +```python +@queue.task(max_concurrent=3) +def expensive_render(): + ... +# At most 3 instances of expensive_render run simultaneously across all workers. +``` + +### Per-task serializer + +Override the queue-level serializer for a specific task: + +```python +from taskito.serializers import JSONSerializer + +@queue.task(serializer=JSONSerializer()) +def api_event(payload: dict) -> dict: + ... +``` + +The per-task serializer is used for the full round-trip: arguments are serialized with it at enqueue time and deserialized with it on the worker before the task function is called. Both the sync worker and the native async worker honour the per-task serializer, falling back to the queue-level serializer for tasks that have none registered. + +Useful when a task needs a different format (e.g., human-readable JSON for audit tasks) or when the payload is not picklable. + +## Task naming + +By default, tasks are named using `module.qualname`: + +```python +# In myapp/tasks.py +@queue.task() +def process(): # Named: myapp.tasks.process + ... +``` + +You can override with an explicit name: + +```python +@queue.task(name="my-custom-name") +def process(): # Named: my-custom-name + ... +``` + +## Enqueuing jobs + +### `.delay()` — quick submit + +Submit with default options: + +```python +job = send_email.delay("user@example.com", "Hello", "World") +``` + +### `.apply_async()` — full control + +Override any option at enqueue time: + +```python +job = send_email.apply_async( + args=("user@example.com", "Hello", "World"), + priority=100, # Override priority + delay=3600, # Run 1 hour from now + queue="urgent-emails", # Override queue + max_retries=10, # Override retries + timeout=120, # Override timeout + unique_key="welcome-user@example.com", # Deduplicate + metadata='{"source": "signup"}', # Attach JSON metadata +) +``` + +### Direct call + +Calling a task directly runs it synchronously, bypassing the queue: + +```python +result = send_email("user@example.com", "Hello", "World") # Runs immediately +``` + +## Batch enqueue + +Enqueue many jobs in a single SQLite transaction: + +```python +# Via task.map() +jobs = send_email.map([ + ("alice@example.com", "Hi", "Body"), + ("bob@example.com", "Hi", "Body"), + ("carol@example.com", "Hi", "Body"), +]) + +# Via queue.enqueue_many() +jobs = queue.enqueue_many( + task_name=send_email.name, + args_list=[("alice@example.com",), ("bob@example.com",)], + kwargs_list=[{"subject": "Hi", "body": "Body"}] * 2, +) +``` + +## Metadata + +Attach arbitrary JSON metadata to jobs: + +```python +job = process.apply_async( + args=(data,), + metadata='{"user_id": 42, "source": "api"}', +) +``` + +Metadata is stored with the job and visible in dead letter queue entries. diff --git a/docs-next/content/docs/guides/core/workers.mdx b/docs-next/content/docs/guides/core/workers.mdx new file mode 100644 index 0000000..7f38bc6 --- /dev/null +++ b/docs-next/content/docs/guides/core/workers.mdx @@ -0,0 +1,305 @@ +--- +title: Workers +description: "OS-thread, prefork, and async worker pools — startup, count, specialization, shutdown." +--- + +import { Callout } from "fumadocs-ui/components/callout"; +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +Workers process queued jobs. taskito runs workers as OS threads within a +single process, managed by a Rust scheduler. + +## Starting a worker + + + + +```bash +taskito worker --app myapp.tasks:queue +``` + +| Flag | Description | +|---|---| +| `--app` | Python path to your Queue instance (`module:attribute`) | +| `--queues` | Comma-separated queue names (default: all registered) | + + + + +```python +# Blocks the current thread +queue.run_worker() + +# With specific queues +queue.run_worker(queues=["emails", "reports"]) +``` + + + + +```python +import threading + +t = threading.Thread(target=queue.run_worker, daemon=True) +t.start() + +# Your application continues... +``` + + + + +```python +import asyncio + +async def main(): + # Runs worker in a thread pool, non-blocking + await queue.arun_worker() + +asyncio.run(main()) +``` + + + + +## Worker count + +By default, taskito auto-detects the number of CPU cores: + +```python +queue = Queue(db_path="myapp.db", workers=0) # Auto-detect (default) +queue = Queue(db_path="myapp.db", workers=8) # Explicit count +``` + +## Prefork pool + +The default worker pool uses OS threads, which share a single Python GIL. For +CPU-bound tasks, use the prefork pool — it spawns separate child processes, +each with its own GIL: + +```python +queue.run_worker(pool="prefork", app="myapp:queue", workers=4) +``` + +```bash +taskito worker --app myapp:queue --pool prefork +``` + +Each child is a full Python interpreter that imports your app, builds the task +registry, and executes tasks independently. + +### When to use prefork + +| Workload | Pool | Why | +|----------|------|-----| +| I/O-bound (HTTP, DB) | `thread` (default) | Threads release the GIL during I/O | +| CPU-bound (data processing) | `prefork` | Each process has its own GIL | +| Mixed | `prefork` | CPU tasks benefit; I/O tasks work fine too | + +### How it works + +|"Job JSON"| P["PreforkPool"] + P -->|stdin| C1["Child 1
(own GIL)"] + P -->|stdin| C2["Child 2
(own GIL)"] + P -->|stdin| CN["Child N
(own GIL)"] + + C1 -->|stdout| R1["Reader 1"] + C2 -->|stdout| R2["Reader 2"] + CN -->|stdout| RN["Reader N"] + + R1 -->|JobResult| RCH["Result Channel"] + R2 -->|JobResult| RCH + RN -->|JobResult| RCH + + RCH --> ML["Result Handler"]`} +/> + +Jobs are serialized as JSON Lines over stdin pipes. Each child reads a job, +executes the task wrapper (with middleware, resources, proxies), and writes +the result as JSON to stdout. The parent's reader threads parse results and +feed them to the scheduler. + +### Configuration + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `pool` | `str` | `"thread"` | Worker pool type: `"thread"` or `"prefork"` | +| `app` | `str` | — | Import path to Queue (required for prefork) | +| `workers` | `int` | CPU count | Number of child processes | + + + The `app` parameter must be an importable path like `"myapp.tasks:queue"`. + Each child process imports this path to build its task registry. Tasks + defined inside functions or closures cannot be imported by children. + + +## Worker specialization + +Tag workers to route jobs to specific machines or capabilities: + +```python +# Start a worker that only processes jobs tagged for GPU or heavy workloads +queue.run_worker(tags=["gpu", "heavy"]) +``` + +Jobs submitted to a queue with `tags` are only picked up by workers that have +all the required tags. Workers without tags process untagged jobs. + +```bash +# CLI equivalent +taskito worker --app myapp:queue --tags gpu,heavy +``` + + + Workers are **OS threads**, not processes. Each worker acquires the Python + GIL only during task execution, so the scheduler and dispatch logic run + without GIL contention. + + +## Graceful shutdown + +taskito supports graceful shutdown via `Ctrl+C`: + +1. **First `Ctrl+C`**: Stops accepting new jobs, waits for in-flight tasks to + complete (up to `drain_timeout` seconds) +2. **Second `Ctrl+C`**: Force-kills immediately + +Configure the drain timeout when constructing the queue: + +```python +queue = Queue(db_path="myapp.db", drain_timeout=60) # wait up to 60 seconds +``` + +The default `drain_timeout` is 30 seconds. + +``` +$ taskito worker --app myapp:queue +[taskito] Starting worker... +[taskito] Registered tasks: 3 +[taskito] Queues: default, emails +^C +[taskito] Shutting down gracefully (waiting for in-flight jobs)... +[taskito] Worker stopped. +``` + +### Programmatic shutdown + +```python +# From another thread or signal handler +queue._inner.request_shutdown() +``` + +## Worker discovery + +Inspect live workers across all machines: + +```python +for w in queue.workers(): + print(f"{w['worker_id']} on {w['hostname']} (pid {w['pid']}, {w['status']})") +``` + +Each worker entry includes: + +| Field | Type | Description | +|-------|------|-------------| +| `worker_id` | `str` | Unique ID (UUIDv7) | +| `hostname` | `str` | OS hostname | +| `pid` | `int` | Process ID | +| `status` | `str` | `"active"`, `"draining"`, or deleted on exit | +| `pool_type` | `str` | `"thread"`, `"prefork"`, or `"native-async"` | +| `started_at` | `int` | Registration timestamp (ms) | +| `queues` | `str` | Comma-separated queue names | +| `threads` | `int` | Worker thread/process count | +| `last_heartbeat` | `int` | Last heartbeat timestamp (ms) | + +### Status lifecycle + + active: register + active --> draining: shutdown signal + draining --> [*]: clean exit + active --> [*]: crash (reaped after 30s)`} +/> + +### Lifecycle events + +Subscribe to worker lifecycle changes: + +```python +from taskito import EventType + +@queue.on_event(EventType.WORKER_ONLINE) +def on_online(event_type, payload): + print(f"Worker {payload['worker_id']} joined") + +@queue.on_event(EventType.WORKER_OFFLINE) +def on_offline(event_type, payload): + print(f"Worker {payload['worker_id']} went away") + +@queue.on_event(EventType.WORKER_UNHEALTHY) +def on_unhealthy(event_type, payload): + print(f"Worker {payload['worker_id']} unhealthy: {payload['resources']}") +``` + +| Event | Fires when | Payload | +|-------|-----------|---------| +| `WORKER_ONLINE` | Worker registered in storage | `worker_id`, `queues`, `pool` | +| `WORKER_OFFLINE` | Dead worker reaped (no heartbeat for 30s) | `worker_id` | +| `WORKER_UNHEALTHY` | Resource health transitions to unhealthy | `worker_id`, `resources` | + +## Async tasks + +`async def` task functions are dispatched natively — they run on a dedicated +event loop thread, not wrapped in `asyncio.run()` on a worker thread. + +```python +@queue.task() +async def fetch_data(url: str) -> dict: + import httpx + async with httpx.AsyncClient() as client: + r = await client.get(url) + return r.json() +``` + +Control the max number of async tasks running concurrently: + +```python +queue = Queue( + db_path="myapp.db", + workers=4, # OS threads for sync tasks + async_concurrency=200, # concurrent async tasks (default: 100) +) +``` + +See [Native Async Tasks](/docs/guides/advanced-execution) for the full guide. + +## How workers work + +(Tokio async)"] -->|"sync job"| CH["Bounded Channel"] + S -->|"async job"| AP["Native Async Pool"] + + CH --> W1["Worker 1"] + CH --> W2["Worker 2"] + CH --> WN["Worker N"] + + AP --> EL["Event Loop Thread"] + + W1 -->|Result| RCH["Result Channel"] + W2 -->|Result| RCH + WN -->|Result| RCH + EL -->|Result| RCH + + RCH --> ML["Result Handler"] + ML -->|"complete / retry / DLQ"| DB[("SQLite")]`} +/> + +1. The **scheduler** runs in a dedicated Tokio async thread, polling SQLite for ready jobs every 50ms +2. Sync jobs are sent to the **worker thread pool** via a bounded channel; each worker acquires the GIL and runs the Python function +3. Async jobs are dispatched to the **native async pool** and scheduled on a dedicated Python event loop +4. Results from both pools flow back through a **result channel** to the main loop +5. The main loop updates job status in SQLite (complete, retry, or DLQ) diff --git a/docs-next/content/docs/guides/core/workflows.mdx b/docs-next/content/docs/guides/core/workflows.mdx new file mode 100644 index 0000000..107ed47 --- /dev/null +++ b/docs-next/content/docs/guides/core/workflows.mdx @@ -0,0 +1,13 @@ +--- +title: Workflows +description: "DAG workflows for multi-step pipelines and Canvas primitives for simple chains." +--- + +taskito provides two workflow models. See the dedicated +[Workflows section](/docs/guides/workflows) for full documentation. + +- **[DAG Workflows](/docs/guides/workflows)** — Multi-step pipelines as + directed acyclic graphs with fan-out, conditions, gates, sub-workflows, + incremental caching, and visualization. +- **[Canvas Primitives](/docs/api-reference/canvas)** — Lightweight chain, + group, and chord composition for simple pipelines.