diff --git a/pyproject.toml b/pyproject.toml index cf63a07..55e3ca8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,19 +1,19 @@ [project] name = "shepherd" -version = "0.8.6" +version = "0.8.7" description = "A shared platform for Translator ARAs" authors = [{ name = "Max Wang", email = "max@covar.com" }] readme = "README.md" -requires-python = ">=3.10" +requires-python = ">=3.12" dependencies = [ "fastapi~=0.110", "httpx~=0.28", - "opentelemetry-api", - "opentelemetry-sdk==1.16.0", - "opentelemetry-exporter-otlp", - "opentelemetry-instrumentation-fastapi==0.37b0", - "opentelemetry-instrumentation-httpx==0.37b0", - "opentelemetry-instrumentation-redis", + "opentelemetry-api==1.42.1", + "opentelemetry-sdk==1.42.1", + "opentelemetry-exporter-otlp==1.42.1", + "opentelemetry-instrumentation-fastapi==0.63b1", + "opentelemetry-instrumentation-httpx==0.63b1", + "opentelemetry-instrumentation-redis==0.63b1", "orjson~=3.10", "psycopg[binary]~=3.1", "psycopg_pool~=3.2", diff --git a/shepherd_server/Dockerfile b/shepherd_server/Dockerfile index 5991791..5f82721 100644 --- a/shepherd_server/Dockerfile +++ b/shepherd_server/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/shepherd_server/base_routes.py b/shepherd_server/base_routes.py index 610778e..242887a 100644 --- a/shepherd_server/base_routes.py +++ b/shepherd_server/base_routes.py @@ -270,7 +270,8 @@ async def callback( logger.debug(f"Saved callback {callback_id} to redis") # adds otel trace to carrier for next worker parent_ctx = extract(json.loads(original_query[1])) - with tracer.start_as_current_span(f"callback.{callback_id}", context=parent_ctx): + with tracer.start_as_current_span("callback", context=parent_ctx) as span: + span.set_attribute("callback_id", callback_id) span_carrier = {} inject(span_carrier) # add new task to merge callback response into original message diff --git a/shepherd_utils/otel.py b/shepherd_utils/otel.py index 978e1c3..474a090 100644 --- a/shepherd_utils/otel.py +++ b/shepherd_utils/otel.py @@ -3,20 +3,35 @@ from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.sdk.trace.export import BatchSpanProcessor from .config import settings +# enforce only one tracer provider with this instance +_TRACER_PROVIDER: TracerProvider | None = None + def setup_tracer(service_name: str): - provider = TracerProvider(resource=Resource.create({SERVICE_NAME: service_name})) - trace.set_tracer_provider(provider) - span_processor = BatchSpanProcessor(ConsoleSpanExporter()) - span_processor = BatchSpanProcessor( - OTLPSpanExporter( - endpoint=f"{settings.jaeger_host}:{settings.jaeger_port}", + """Configure the global tracer provider once and return a tracer. + + When ``otel_enabled`` is false, the provider is left uninitialized: the + default proxy provider yields non-recording spans, so instrumented code + runs unchanged with effectively no-op spans and nothing is exported. + """ + global _TRACER_PROVIDER + if not settings.otel_enabled: + return trace.get_tracer(__name__) + if _TRACER_PROVIDER is None: + _TRACER_PROVIDER = TracerProvider( + resource=Resource.create({SERVICE_NAME: service_name}) + ) + _TRACER_PROVIDER.add_span_processor( + BatchSpanProcessor( + OTLPSpanExporter( + endpoint=f"{settings.jaeger_host}:{settings.jaeger_port}", + ) + ) ) - ) - provider.add_span_processor(span_processor) - HTTPXClientInstrumentor().instrument() + trace.set_tracer_provider(_TRACER_PROVIDER) + HTTPXClientInstrumentor().instrument() return trace.get_tracer(__name__) diff --git a/shepherd_utils/shared.py b/shepherd_utils/shared.py index 55738d0..0fd3a6f 100644 --- a/shepherd_utils/shared.py +++ b/shepherd_utils/shared.py @@ -6,8 +6,10 @@ import time from typing import AsyncGenerator, Dict, List, Tuple +from opentelemetry import trace from opentelemetry.context.context import Context from opentelemetry.propagate import extract +from opentelemetry.trace import Status, StatusCode from .broker import add_task, broker_client, get_task, mark_task_as_complete from .config import settings @@ -229,6 +231,55 @@ async def handle_task_failure( ) +# Proxy tracer: resolves to whatever provider the worker process set up via +# setup_tracer(STREAM) at the time a span is created, so the outer task span +# inherits the worker's service.name without shared.py needing to know it. +_tracer = trace.get_tracer(__name__) + + +async def run_task_lifecycle( + stream: str, + group: str, + task: Tuple[str, dict], + parent_ctx: Context, + logger: logging.Logger, + limiter: asyncio.Semaphore, + worker_fn, +) -> None: + """Span-wrapped task lifecycle shared by the standard workers. + + Activates the per-task span as current so auto-instrumented (httpx) and + manual child spans nest under it, records exceptions + ERROR status on the + span, runs ``worker_fn(task, logger)`` then ``wrap_up_task`` on success or + ``handle_task_failure`` on an unhandled error, and always releases the + limiter. + + ``worker_fn`` is an async callable ``(task, logger) -> None`` holding the + per-worker logic (or a closure for workers that dispatch to a process pool). + """ + start = time.time() + with _tracer.start_as_current_span(stream, context=parent_ctx) as span: + try: + await worker_fn(task, logger) + # Always wrap up the task to ACK it in the broker + try: + await wrap_up_task(stream, group, task, logger) + except Exception as e: + logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") + except asyncio.CancelledError: + logger.warning(f"Task {task[0]} was cancelled") + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + logger.error( + f"Task {task[0]} failed with unhandled error: {e}", exc_info=True + ) + await handle_task_failure(stream, group, task, logger) + finally: + limiter.release() + logger.info(f"Finished task {task[0]} in {time.time() - start}") + + def recursive_get_edge_support_graphs( edge: str, edges: set, diff --git a/tests/unit/test_bte_lookup_branches.py b/tests/unit/test_bte_lookup_branches.py index 332c6a2..5233eca 100644 --- a/tests/unit/test_bte_lookup_branches.py +++ b/tests/unit/test_bte_lookup_branches.py @@ -19,6 +19,7 @@ import httpx import pytest +from shepherd_utils import shared from workers.bte_lookup import worker as btel from workers.bte_lookup.worker import ( AsyncResponse, @@ -486,7 +487,9 @@ def release(self): @pytest.mark.asyncio async def test_bte_lookup_process_task_happy_path(redis_mock, mocker): mocker.patch.object(btel, "bte_lookup", new_callable=mocker.AsyncMock) - mock_wrap = mocker.patch.object(btel, "wrap_up_task", new_callable=mocker.AsyncMock) + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) limiter = _Limiter() await process_task(_make_task(), None, logger, limiter) assert mock_wrap.called @@ -504,7 +507,7 @@ async def test_bte_lookup_process_task_routes_failure_to_handle_task_failure( side_effect=RuntimeError("kaboom"), ) mock_failure = mocker.patch.object( - btel, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() await process_task(_make_task(), None, logger, limiter) @@ -521,7 +524,7 @@ async def test_bte_lookup_process_task_swallows_cancellation(redis_mock, mocker) side_effect=asyncio.CancelledError, ) mock_failure = mocker.patch.object( - btel, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() await process_task(_make_task(), None, logger, limiter) @@ -534,7 +537,7 @@ async def test_bte_lookup_process_task_swallows_wrap_up_failure(redis_mock, mock """A wrap_up_task failure should be logged but not escape.""" mocker.patch.object(btel, "bte_lookup", new_callable=mocker.AsyncMock) mocker.patch.object( - btel, + shared, "wrap_up_task", new_callable=mocker.AsyncMock, side_effect=RuntimeError("redis dropped"), diff --git a/tests/unit/test_process_task_wrappers.py b/tests/unit/test_process_task_wrappers.py index db36e61..5b34939 100644 --- a/tests/unit/test_process_task_wrappers.py +++ b/tests/unit/test_process_task_wrappers.py @@ -25,6 +25,8 @@ import pytest +from shepherd_utils import shared + logger = logging.getLogger(__name__) @@ -62,7 +64,9 @@ async def test_filter_kgraph_orphans_process_task_happy_path(redis_mock, mocker) from workers.filter_kgraph_orphans import worker as fko mocker.patch.object(fko, "do_filter_kgraph_orphans", new_callable=mocker.AsyncMock) - mock_wrap = mocker.patch.object(fko, "wrap_up_task", new_callable=mocker.AsyncMock) + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) limiter = _Limiter() await fko.process_task(_make_task("filter_kgraph_orphans"), None, logger, limiter) @@ -83,7 +87,7 @@ async def test_filter_kgraph_orphans_process_task_failure_routes_to_failure_hand side_effect=RuntimeError("kaboom"), ) mock_failure = mocker.patch.object( - fko, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -106,7 +110,7 @@ async def test_filter_kgraph_orphans_process_task_cancellation_does_not_route_fa side_effect=asyncio.CancelledError, ) mock_failure = mocker.patch.object( - fko, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -123,7 +127,9 @@ async def test_filter_results_top_n_process_task_happy_path(redis_mock, mocker): from workers.filter_results_top_n import worker as frt mocker.patch.object(frt, "filter_results_top_n", new_callable=mocker.AsyncMock) - mock_wrap = mocker.patch.object(frt, "wrap_up_task", new_callable=mocker.AsyncMock) + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) limiter = _Limiter() await frt.process_task(_make_task("filter_results_top_n"), None, logger, limiter) @@ -141,7 +147,7 @@ async def test_filter_results_top_n_process_task_failure(redis_mock, mocker): side_effect=RuntimeError("oops"), ) mock_failure = mocker.patch.object( - frt, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -157,7 +163,9 @@ async def test_filter_analyses_top_n_process_task_happy_path(redis_mock, mocker) from workers.filter_analyses_top_n import worker as fan mocker.patch.object(fan, "filter_analyses_top_n", new_callable=mocker.AsyncMock) - mock_wrap = mocker.patch.object(fan, "wrap_up_task", new_callable=mocker.AsyncMock) + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) limiter = _Limiter() await fan.process_task(_make_task("filter_analyses_top_n"), None, logger, limiter) @@ -175,7 +183,7 @@ async def test_filter_analyses_top_n_process_task_failure(redis_mock, mocker): side_effect=RuntimeError("nope"), ) mock_failure = mocker.patch.object( - fan, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -191,7 +199,9 @@ async def test_sort_results_score_process_task_happy_path(redis_mock, mocker): from workers.sort_results_score import worker as srs mocker.patch.object(srs, "sort_results_score", new_callable=mocker.AsyncMock) - mock_wrap = mocker.patch.object(srs, "wrap_up_task", new_callable=mocker.AsyncMock) + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) limiter = _Limiter() await srs.process_task(_make_task("sort_results_score"), None, logger, limiter) @@ -209,7 +219,7 @@ async def test_sort_results_score_process_task_failure(redis_mock, mocker): side_effect=RuntimeError("nope"), ) mock_failure = mocker.patch.object( - srs, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -225,7 +235,9 @@ async def test_example_ara_process_task_happy_path(redis_mock, mocker): from workers.example_ara import worker as eara mocker.patch.object(eara, "example_ara", new_callable=mocker.AsyncMock) - mock_wrap = mocker.patch.object(eara, "wrap_up_task", new_callable=mocker.AsyncMock) + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) limiter = _Limiter() await eara.process_task(_make_task("example"), None, logger, limiter) @@ -243,7 +255,7 @@ async def test_example_ara_process_task_failure(redis_mock, mocker): side_effect=RuntimeError("nope"), ) mock_failure = mocker.patch.object( - eara, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -260,7 +272,7 @@ async def test_example_score_process_task_happy_path(redis_mock, mocker): mocker.patch.object(escore, "example_score", new_callable=mocker.AsyncMock) mock_wrap = mocker.patch.object( - escore, "wrap_up_task", new_callable=mocker.AsyncMock + shared, "wrap_up_task", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -279,7 +291,7 @@ async def test_example_score_process_task_failure(redis_mock, mocker): side_effect=RuntimeError("nope"), ) mock_failure = mocker.patch.object( - escore, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -296,7 +308,7 @@ async def test_example_lookup_process_task_happy_path(redis_mock, mocker): mocker.patch.object(elookup, "example_lookup", new_callable=mocker.AsyncMock) mock_wrap = mocker.patch.object( - elookup, "wrap_up_task", new_callable=mocker.AsyncMock + shared, "wrap_up_task", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -315,7 +327,7 @@ async def test_example_lookup_process_task_failure(redis_mock, mocker): side_effect=RuntimeError("nope"), ) mock_failure = mocker.patch.object( - elookup, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -331,7 +343,9 @@ async def test_aragorn_process_task_happy_path(redis_mock, mocker): from workers.aragorn import worker as ar mocker.patch.object(ar, "aragorn", new_callable=mocker.AsyncMock) - mock_wrap = mocker.patch.object(ar, "wrap_up_task", new_callable=mocker.AsyncMock) + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) limiter = _Limiter() await ar.process_task(_make_task("aragorn"), None, logger, limiter) @@ -349,7 +363,7 @@ async def test_aragorn_process_task_failure(redis_mock, mocker): side_effect=RuntimeError("nope"), ) mock_failure = mocker.patch.object( - ar, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -365,7 +379,9 @@ async def test_aragorn_pathfinder_process_task_happy_path(redis_mock, mocker): from workers.aragorn_pathfinder import worker as apf mocker.patch.object(apf, "shadowfax", new_callable=mocker.AsyncMock) - mock_wrap = mocker.patch.object(apf, "wrap_up_task", new_callable=mocker.AsyncMock) + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) limiter = _Limiter() await apf.process_task(_make_task("aragorn.pathfinder"), None, logger, limiter) @@ -383,7 +399,7 @@ async def test_aragorn_pathfinder_process_task_failure(redis_mock, mocker): side_effect=RuntimeError("nope"), ) mock_failure = mocker.patch.object( - apf, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -399,7 +415,9 @@ async def test_bte_process_task_happy_path(redis_mock, mocker): from workers.bte import worker as bte mocker.patch.object(bte, "bte", new_callable=mocker.AsyncMock) - mock_wrap = mocker.patch.object(bte, "wrap_up_task", new_callable=mocker.AsyncMock) + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) limiter = _Limiter() await bte.process_task(_make_task("bte"), None, logger, limiter) @@ -417,7 +435,7 @@ async def test_bte_process_task_failure(redis_mock, mocker): side_effect=RuntimeError("nope"), ) mock_failure = mocker.patch.object( - bte, "handle_task_failure", new_callable=mocker.AsyncMock + shared, "handle_task_failure", new_callable=mocker.AsyncMock ) limiter = _Limiter() @@ -436,7 +454,7 @@ async def test_process_task_swallows_wrap_up_failures(redis_mock, mocker): mocker.patch.object(fko, "do_filter_kgraph_orphans", new_callable=mocker.AsyncMock) mocker.patch.object( - fko, + shared, "wrap_up_task", new_callable=mocker.AsyncMock, side_effect=RuntimeError("redis dropped"), diff --git a/tests/unit/test_shared_utils.py b/tests/unit/test_shared_utils.py index f29c21f..9f8766d 100644 --- a/tests/unit/test_shared_utils.py +++ b/tests/unit/test_shared_utils.py @@ -11,6 +11,7 @@ import pytest +from shepherd_utils import shared from shepherd_utils.broker import get_task from shepherd_utils.shared import ( combine_unique_dicts, @@ -21,6 +22,7 @@ merge_kgraph, recursive_get_auxgraph_edges, recursive_get_edge_support_graphs, + run_task_lifecycle, validate_message, wrap_up_task, ) @@ -560,3 +562,96 @@ async def test_handle_task_failure_routes_to_finish_query_with_error_status(redi next_task = await get_task("finish_query", "consumer", "test", logger) assert next_task is not None assert next_task[1]["status"] == "ERROR" + + +# --- run_task_lifecycle --------------------------------------------------- + + +class _Limiter: + """Records whether ``release()`` was called.""" + + def __init__(self): + self.released = False + + def release(self): + self.released = True + + +def _lifecycle_task(): + return [ + "msg-id", + { + "query_id": "qid", + "response_id": "rid", + "workflow": json.dumps([{"id": "some_stream"}]), + "log_level": "20", + "otel": "{}", + "metadata": "{}", + }, + ] + + +@pytest.mark.asyncio +async def test_run_task_lifecycle_success_wraps_up_and_releases(mocker): + worker_fn = mocker.AsyncMock() + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) + mock_fail = mocker.patch.object( + shared, "handle_task_failure", new_callable=mocker.AsyncMock + ) + limiter = _Limiter() + + await run_task_lifecycle( + "some_stream", "consumer", _lifecycle_task(), None, logger, limiter, worker_fn + ) + + worker_fn.assert_awaited_once() + assert mock_wrap.called + assert not mock_fail.called + assert limiter.released + + +@pytest.mark.asyncio +async def test_run_task_lifecycle_failure_records_error_and_routes_to_handler(mocker): + worker_fn = mocker.AsyncMock(side_effect=RuntimeError("kaboom")) + mock_wrap = mocker.patch.object( + shared, "wrap_up_task", new_callable=mocker.AsyncMock + ) + mock_fail = mocker.patch.object( + shared, "handle_task_failure", new_callable=mocker.AsyncMock + ) + # Capture the span so we can assert the error status was recorded (#5). + span = mocker.MagicMock() + cm = mocker.MagicMock() + cm.__enter__.return_value = span + mocker.patch.object(shared._tracer, "start_as_current_span", return_value=cm) + limiter = _Limiter() + + await run_task_lifecycle( + "some_stream", "consumer", _lifecycle_task(), None, logger, limiter, worker_fn + ) + + assert not mock_wrap.called + assert mock_fail.called + span.record_exception.assert_called_once() + span.set_status.assert_called_once() + assert limiter.released + + +@pytest.mark.asyncio +async def test_run_task_lifecycle_cancellation_does_not_route_failure(mocker): + import asyncio + + worker_fn = mocker.AsyncMock(side_effect=asyncio.CancelledError) + mock_fail = mocker.patch.object( + shared, "handle_task_failure", new_callable=mocker.AsyncMock + ) + limiter = _Limiter() + + await run_task_lifecycle( + "some_stream", "consumer", _lifecycle_task(), None, logger, limiter, worker_fn + ) + + assert not mock_fail.called + assert limiter.released diff --git a/tox.ini b/tox.ini index c413ce8..de3225d 100644 --- a/tox.ini +++ b/tox.ini @@ -4,8 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py310,coverage -skip_missing_interpreters = true +envlist = py312,coverage skipsdist = true @@ -21,7 +20,7 @@ commands = python -m pytest -v --junit-xml=junit-report.xml [testenv:coverage] -basepython=python3.10 +basepython=python3.12 deps = {[testenv]deps} pytest-cov diff --git a/workers/aragorn/Dockerfile b/workers/aragorn/Dockerfile index c5b0512..6eb2747 100644 --- a/workers/aragorn/Dockerfile +++ b/workers/aragorn/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/aragorn/worker.py b/workers/aragorn/worker.py index 8f98cce..4bdde0d 100644 --- a/workers/aragorn/worker.py +++ b/workers/aragorn/worker.py @@ -3,7 +3,6 @@ import asyncio import json import logging -import time import uuid from shepherd_utils.db import get_message @@ -11,8 +10,7 @@ from shepherd_utils.shared import ( examine_query, get_tasks, - handle_task_failure, - wrap_up_task, + run_task_lifecycle, ) # Queue name @@ -66,23 +64,7 @@ async def aragorn(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await aragorn(task, logger) - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled.") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Task took {time.time() - start}") + await run_task_lifecycle(STREAM, GROUP, task, parent_ctx, logger, limiter, aragorn) async def poll_for_tasks(): diff --git a/workers/aragorn_lookup/Dockerfile b/workers/aragorn_lookup/Dockerfile index 0d68e33..673b110 100644 --- a/workers/aragorn_lookup/Dockerfile +++ b/workers/aragorn_lookup/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/aragorn_lookup/worker.py b/workers/aragorn_lookup/worker.py index ed233ba..e529717 100644 --- a/workers/aragorn_lookup/worker.py +++ b/workers/aragorn_lookup/worker.py @@ -24,7 +24,7 @@ save_message, ) from shepherd_utils.otel import setup_tracer -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle # Queue name STREAM = "aragorn.lookup" @@ -92,7 +92,8 @@ async def run_async_lookup( ) -> AsyncResponse: """Return an async lookup response with callback id.""" callback_id = str(uuid.uuid4())[:8] - with tracer.start_as_current_span(f"aragorn.lookup.{callback_id}") as span: + with tracer.start_as_current_span("aragorn.lookup") as span: + span.set_attribute("callback_id", callback_id) lookup_carrier = {} inject(lookup_carrier) # Put callback UID and query ID in postgres @@ -158,7 +159,8 @@ async def aragorn_lookup(task, logger: logging.Logger): # json.dump(message, f, indent=2) logger.debug(f"""Sending lookup query to {settings.kg_retrieval_url}.""") - with tracer.start_as_current_span(f"aragorn.lookup.{callback_id}"): + with tracer.start_as_current_span("aragorn.lookup") as span: + span.set_attribute("callback_id", callback_id) async with httpx.AsyncClient(timeout=100) as client: await client.post( settings.kg_retrieval_url, @@ -351,25 +353,9 @@ def expand_aragorn_query(input_message, logger: logging.Logger): async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - with tracer.start_as_current_span(STREAM, context=parent_ctx): - try: - await aragorn_lookup(task, logger) - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error( - f"Task {task[0]} failed with unhandled error: {e}", exc_info=True - ) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + await run_task_lifecycle( + STREAM, GROUP, task, parent_ctx, logger, limiter, aragorn_lookup + ) async def poll_for_tasks(): diff --git a/workers/aragorn_omnicorp/Dockerfile b/workers/aragorn_omnicorp/Dockerfile index d190e06..1a18cc1 100644 --- a/workers/aragorn_omnicorp/Dockerfile +++ b/workers/aragorn_omnicorp/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/aragorn_omnicorp/worker.py b/workers/aragorn_omnicorp/worker.py index 49532ac..a6df055 100644 --- a/workers/aragorn_omnicorp/worker.py +++ b/workers/aragorn_omnicorp/worker.py @@ -16,7 +16,6 @@ import json import logging import os -import time import uuid from collections import defaultdict from concurrent.futures import ProcessPoolExecutor @@ -30,7 +29,7 @@ from shepherd_utils.config import settings from shepherd_utils.db import get_message, save_message from shepherd_utils.otel import setup_tracer -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle # Queue name STREAM = "aragorn.omnicorp" @@ -468,9 +467,8 @@ async def process_task( crunching -- previously the overlay ran inline on the loop and blocked everything else until it finished. """ - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: + + async def _run(task, logger): # given a task, get the message from the db (async I/O on the loop) response_id = task[1]["response_id"] message = await get_message(response_id, logger) @@ -487,20 +485,8 @@ async def process_task( await save_message(response_id, response, logger) else: logger.error(f"Failed to get {response_id} for omnicorp overlay.") - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + + await run_task_lifecycle(STREAM, GROUP, task, parent_ctx, logger, limiter, _run) async def poll_for_tasks(): diff --git a/workers/aragorn_pathfinder/Dockerfile b/workers/aragorn_pathfinder/Dockerfile index 0e05e40..1349e9c 100644 --- a/workers/aragorn_pathfinder/Dockerfile +++ b/workers/aragorn_pathfinder/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/aragorn_pathfinder/worker.py b/workers/aragorn_pathfinder/worker.py index d91d9c1..362054a 100644 --- a/workers/aragorn_pathfinder/worker.py +++ b/workers/aragorn_pathfinder/worker.py @@ -19,8 +19,7 @@ from shepherd_utils.otel import setup_tracer from shepherd_utils.shared import ( get_tasks, - handle_task_failure, - wrap_up_task, + run_task_lifecycle, ) # Queue name @@ -54,7 +53,8 @@ async def shadowfax(task, logger: logging.Logger) -> str: logger.debug( f"""Sending pathfinder rehydration to {settings.kg_rehydrate_url}.""" ) - with tracer.start_as_current_span(f"aragorn.pathfinder.{query_id}"): + with tracer.start_as_current_span("aragorn.pathfinder.rehydrate") as span: + span.set_attribute("query_id", query_id) async with httpx.AsyncClient(timeout=210) as client: # send a sync rehydrate query that "should" be very quick rehydrated_response = await client.post( @@ -214,7 +214,8 @@ async def shadowfax(task, logger: logging.Logger) -> str: f"{settings.callback_host}/aragorn/callback/{callback_id}" ) logger.debug(f"""Sending pathfinder query to {settings.kg_retrieval_url}.""") - with tracer.start_as_current_span(f"aragorn.pathfinder.{callback_id}"): + with tracer.start_as_current_span("aragorn.pathfinder") as span: + span.set_attribute("callback_id", callback_id) async with httpx.AsyncClient(timeout=100) as client: retriever_async_response = await client.post( settings.kg_retrieval_url, @@ -260,25 +261,12 @@ async def shadowfax(task, logger: logging.Logger) -> str: async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: + + async def _run(task, logger): metadata = await shadowfax(task, logger) task[1]["metadata"] = metadata - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + + await run_task_lifecycle(STREAM, GROUP, task, parent_ctx, logger, limiter, _run) async def poll_for_tasks(): diff --git a/workers/aragorn_score/Dockerfile b/workers/aragorn_score/Dockerfile index 06f9a9b..8fcc677 100644 --- a/workers/aragorn_score/Dockerfile +++ b/workers/aragorn_score/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/aragorn_score/worker.py b/workers/aragorn_score/worker.py index db28de0..8f87277 100644 --- a/workers/aragorn_score/worker.py +++ b/workers/aragorn_score/worker.py @@ -6,7 +6,6 @@ import logging import math import os -import time import uuid from collections import defaultdict from concurrent.futures import ProcessPoolExecutor @@ -16,7 +15,7 @@ from shepherd_utils.db import get_message, save_message from shepherd_utils.otel import setup_tracer -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle # Queue name STREAM = "aragorn.score" @@ -1217,6 +1216,34 @@ def aragorn_score(in_message, logger: logging.Logger): return in_message +async def process_task(task, parent_ctx, logger, limiter, loop, executor): + """Process a given task and ACK in redis. + + Scoring is CPU-bound, so it is dispatched to a process pool while the + span, wrap-up, and error handling are shared with every worker. + """ + + async def _run(task, logger): + # given a task, get the message from the db + response_id = task[1]["response_id"] + message = await get_message(response_id, logger) + if message is not None: + scored_message = await loop.run_in_executor( + executor, + aragorn_score, + message, + logger, + ) + if scored_message is None: + logger.error("Failed to score message. Returning unscored.") + scored_message = message + await save_message(response_id, scored_message, logger) + else: + logger.error(f"Failed to get {response_id} for scoring.") + + await run_task_lifecycle(STREAM, GROUP, task, parent_ctx, logger, limiter, _run) + + async def poll_for_tasks(): """On initialization, poll indefinitely for available tasks.""" loop = asyncio.get_running_loop() @@ -1229,42 +1256,7 @@ async def poll_for_tasks(): async for task, parent_ctx, logger, limiter in get_tasks( STREAM, GROUP, CONSUMER, TASK_LIMIT ): - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - # given a task, get the message from the db - response_id = task[1]["response_id"] - message = await get_message(response_id, logger) - if message is not None: - scored_message = await loop.run_in_executor( - executor, - aragorn_score, - message, - logger, - ) - if scored_message is None: - logger.error("Failed to score message. Returning unscored.") - scored_message = message - await save_message(response_id, scored_message, logger) - else: - logger.error(f"Failed to get {response_id} for scoring.") - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error( - f"Task {task[0]} failed with unhandled error: {e}", - exc_info=True, - ) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - logger.info(f"Finished task {task[0]} in {time.time() - start}") - span.end() - limiter.release() + await process_task(task, parent_ctx, logger, limiter, loop, executor) except asyncio.CancelledError: logging.info("Poll loop cancelled, shutting down.") except Exception as e: diff --git a/workers/arax/Dockerfile b/workers/arax/Dockerfile index bfd5a6d..8a62743 100644 --- a/workers/arax/Dockerfile +++ b/workers/arax/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/arax/worker.py b/workers/arax/worker.py index 39d110c..3b30a01 100644 --- a/workers/arax/worker.py +++ b/workers/arax/worker.py @@ -3,7 +3,6 @@ import asyncio import json import logging -import time import uuid import httpx @@ -12,7 +11,7 @@ from shepherd_utils.config import settings from shepherd_utils.db import get_message, save_message from shepherd_utils.otel import setup_tracer -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle # Queue name STREAM = "arax" @@ -53,24 +52,7 @@ async def arax(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await arax(task, logger) - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + await run_task_lifecycle(STREAM, GROUP, task, parent_ctx, logger, limiter, arax) async def poll_for_tasks(): diff --git a/workers/arax_rank/Dockerfile b/workers/arax_rank/Dockerfile index 80ee6f2..56ef15e 100644 --- a/workers/arax_rank/Dockerfile +++ b/workers/arax_rank/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/arax_rank/worker.py b/workers/arax_rank/worker.py index 783e7a4..e3a1edb 100644 --- a/workers/arax_rank/worker.py +++ b/workers/arax_rank/worker.py @@ -14,13 +14,12 @@ import json import logging import os -import time import uuid from concurrent.futures import ProcessPoolExecutor from shepherd_utils.db import get_message, save_message from shepherd_utils.otel import setup_tracer -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle from ranker import arax_rank @@ -79,6 +78,34 @@ def rank_message(in_message: dict, logger: logging.Logger) -> dict: return in_message +async def process_task(task, parent_ctx, logger, limiter, loop, executor): + """Process a given task and ACK in redis. + + Ranking is CPU-bound, so it is dispatched to a process pool while the + span, wrap-up, and error handling are shared with every worker. + """ + + async def _run(task, logger): + response_id = task[1]["response_id"] + message = await get_message(response_id, logger) + if message is not None: + # Run ranking in process pool for CPU-intensive operations + ranked_message = await loop.run_in_executor( + executor, + rank_message, + message, + logger, + ) + if ranked_message is None: + logger.error("Ranking returned None. Returning original message.") + ranked_message = message + await save_message(response_id, ranked_message, logger) + else: + logger.error(f"Failed to get {response_id} for ranking.") + + await run_task_lifecycle(STREAM, GROUP, task, parent_ctx, logger, limiter, _run) + + async def poll_for_tasks() -> None: """ Main loop to poll for and process ranking tasks. @@ -97,50 +124,7 @@ async def poll_for_tasks() -> None: async for task, parent_ctx, logger, limiter in get_tasks( STREAM, GROUP, CONSUMER, TASK_LIMIT ): - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - - try: - # Get task details - response_id = task[1]["response_id"] - - # Get message from Redis - message = await get_message(response_id, logger) - - if message is not None: - # Run ranking in process pool for CPU-intensive operations - ranked_message = await loop.run_in_executor( - executor, - rank_message, - message, - logger, - ) - if ranked_message is None: - logger.error( - "Ranking returned None. Returning original message." - ) - ranked_message = message - await save_message(response_id, ranked_message, logger) - else: - logger.error(f"Failed to get {response_id} for ranking.") - - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error( - f"Task {task[0]} failed with unhandled error: {e}", - exc_info=True, - ) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - logger.info(f"Finished task {task[0]} in {time.time() - start}") - span.end() - limiter.release() + await process_task(task, parent_ctx, logger, limiter, loop, executor) except asyncio.CancelledError: logging.info("Poll loop cancelled, shutting down.") except Exception as e: diff --git a/workers/bte/Dockerfile b/workers/bte/Dockerfile index 70e29f9..90fbd99 100644 --- a/workers/bte/Dockerfile +++ b/workers/bte/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/bte/worker.py b/workers/bte/worker.py index 51afb1c..c882263 100644 --- a/workers/bte/worker.py +++ b/workers/bte/worker.py @@ -3,7 +3,6 @@ import asyncio import json import logging -import time import uuid from shepherd_utils.db import get_message @@ -11,8 +10,7 @@ from shepherd_utils.shared import ( examine_query, get_tasks, - handle_task_failure, - wrap_up_task, + run_task_lifecycle, ) # Queue name @@ -59,24 +57,7 @@ async def bte(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await bte(task, logger) - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + await run_task_lifecycle(STREAM, GROUP, task, parent_ctx, logger, limiter, bte) async def poll_for_tasks(): diff --git a/workers/bte_lookup/Dockerfile b/workers/bte_lookup/Dockerfile index f5f1a30..9827939 100644 --- a/workers/bte_lookup/Dockerfile +++ b/workers/bte_lookup/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/bte_lookup/worker.py b/workers/bte_lookup/worker.py index f18e870..21f7391 100644 --- a/workers/bte_lookup/worker.py +++ b/workers/bte_lookup/worker.py @@ -24,7 +24,7 @@ save_message, ) from shepherd_utils.otel import setup_tracer -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle # Queue name STREAM = "bte.lookup" @@ -92,7 +92,8 @@ async def run_async_lookup( ) -> AsyncResponse: """Return an async lookup response with callback id.""" callback_id = str(uuid.uuid4())[:8] - with tracer.start_as_current_span(f"bte.lookup.{callback_id}") as span: + with tracer.start_as_current_span("bte.lookup") as span: + span.set_attribute("callback_id", callback_id) lookup_carrier = {} inject(lookup_carrier) # Put callback UID and query ID in postgres @@ -150,7 +151,8 @@ async def bte_lookup(task, logger: logging.Logger): await add_callback_id(query_id, callback_id, otel, logger) message["callback"] = f"{settings.callback_host}/bte/callback/{callback_id}" - with tracer.start_as_current_span(f"bte.lookup.{callback_id}"): + with tracer.start_as_current_span("bte.lookup") as span: + span.set_attribute("callback_id", callback_id) async with httpx.AsyncClient(timeout=100) as client: await client.post( settings.kg_retrieval_url, @@ -427,25 +429,9 @@ def expand_bte_query(query_dict: dict[str, Any], logger: logging.Logger) -> list async def process_task(task, parent_ctx, logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - with tracer.start_as_current_span(STREAM, context=parent_ctx): - try: - await bte_lookup(task, logger) - try: - # Always wrap up the task to ACK it in the broker - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled.") - except Exception as e: - logger.error( - f"Task {task[0]} failed with unhandled error: {e}", exc_info=True - ) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - limiter.release() - logger.info(f"Task took {time.time() - start}") + await run_task_lifecycle( + STREAM, GROUP, task, parent_ctx, logger, limiter, bte_lookup + ) async def poll_for_tasks(): diff --git a/workers/example_ara/Dockerfile b/workers/example_ara/Dockerfile index 3c6e201..33e9d6c 100644 --- a/workers/example_ara/Dockerfile +++ b/workers/example_ara/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/example_ara/worker.py b/workers/example_ara/worker.py index 5c6fd83..12f0449 100644 --- a/workers/example_ara/worker.py +++ b/workers/example_ara/worker.py @@ -4,10 +4,9 @@ import json import logging -import time import uuid from shepherd_utils.db import get_message -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle from shepherd_utils.otel import setup_tracer # Queue name @@ -37,24 +36,9 @@ async def example_ara(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await example_ara(task, logger) - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + await run_task_lifecycle( + STREAM, GROUP, task, parent_ctx, logger, limiter, example_ara + ) async def poll_for_tasks(): diff --git a/workers/example_lookup/Dockerfile b/workers/example_lookup/Dockerfile index 532c1b9..d1c4191 100644 --- a/workers/example_lookup/Dockerfile +++ b/workers/example_lookup/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/example_lookup/worker.py b/workers/example_lookup/worker.py index 15d3dd0..d95b458 100644 --- a/workers/example_lookup/worker.py +++ b/workers/example_lookup/worker.py @@ -18,7 +18,7 @@ save_message, ) from shepherd_utils.otel import setup_tracer -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle # Queue name STREAM = "example.lookup" @@ -121,24 +121,9 @@ async def example_lookup(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await example_lookup(task, logger) - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + await run_task_lifecycle( + STREAM, GROUP, task, parent_ctx, logger, limiter, example_lookup + ) async def poll_for_tasks(): diff --git a/workers/example_score/Dockerfile b/workers/example_score/Dockerfile index 821a047..31ccffb 100644 --- a/workers/example_score/Dockerfile +++ b/workers/example_score/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/example_score/worker.py b/workers/example_score/worker.py index 08ba9fb..e589b0a 100644 --- a/workers/example_score/worker.py +++ b/workers/example_score/worker.py @@ -4,10 +4,9 @@ import json import logging import random -import time import uuid from shepherd_utils.db import get_message, save_message -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle from shepherd_utils.otel import setup_tracer # Queue name @@ -34,24 +33,9 @@ async def example_score(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await example_score(task, logger) - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + await run_task_lifecycle( + STREAM, GROUP, task, parent_ctx, logger, limiter, example_score + ) async def poll_for_tasks(): diff --git a/workers/filter_analyses_top_n/Dockerfile b/workers/filter_analyses_top_n/Dockerfile index ccf3f20..ab7948e 100644 --- a/workers/filter_analyses_top_n/Dockerfile +++ b/workers/filter_analyses_top_n/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/filter_analyses_top_n/worker.py b/workers/filter_analyses_top_n/worker.py index 29b8b50..ad2e267 100644 --- a/workers/filter_analyses_top_n/worker.py +++ b/workers/filter_analyses_top_n/worker.py @@ -3,10 +3,9 @@ import asyncio import json import logging -import time import uuid from shepherd_utils.db import get_message, save_message, get_query_state -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle from shepherd_utils.otel import setup_tracer # Queue name @@ -40,24 +39,9 @@ async def filter_analyses_top_n(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await filter_analyses_top_n(task, logger) - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + await run_task_lifecycle( + STREAM, GROUP, task, parent_ctx, logger, limiter, filter_analyses_top_n + ) async def poll_for_tasks(): diff --git a/workers/filter_kgraph_orphans/Dockerfile b/workers/filter_kgraph_orphans/Dockerfile index a28bd54..2a3ab05 100644 --- a/workers/filter_kgraph_orphans/Dockerfile +++ b/workers/filter_kgraph_orphans/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/filter_kgraph_orphans/worker.py b/workers/filter_kgraph_orphans/worker.py index ead4723..b44f629 100644 --- a/workers/filter_kgraph_orphans/worker.py +++ b/workers/filter_kgraph_orphans/worker.py @@ -3,7 +3,6 @@ import asyncio import json import logging -import time import uuid from shepherd_utils.db import get_message, save_message @@ -11,8 +10,7 @@ from shepherd_utils.shared import ( filter_kgraph_orphans, get_tasks, - handle_task_failure, - wrap_up_task, + run_task_lifecycle, ) # Queue name @@ -39,24 +37,9 @@ async def do_filter_kgraph_orphans(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await do_filter_kgraph_orphans(task, logger) - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + await run_task_lifecycle( + STREAM, GROUP, task, parent_ctx, logger, limiter, do_filter_kgraph_orphans + ) async def poll_for_tasks(): diff --git a/workers/filter_results_top_n/Dockerfile b/workers/filter_results_top_n/Dockerfile index 15c931a..f81df9a 100644 --- a/workers/filter_results_top_n/Dockerfile +++ b/workers/filter_results_top_n/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/filter_results_top_n/worker.py b/workers/filter_results_top_n/worker.py index b37df9e..06dde62 100644 --- a/workers/filter_results_top_n/worker.py +++ b/workers/filter_results_top_n/worker.py @@ -3,10 +3,9 @@ import asyncio import json import logging -import time import uuid from shepherd_utils.db import get_message, save_message, get_query_state -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle from shepherd_utils.otel import setup_tracer # Queue name @@ -38,24 +37,9 @@ async def filter_results_top_n(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await filter_results_top_n(task, logger) - # Always wrap up the task to ACK it in the broker - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + await run_task_lifecycle( + STREAM, GROUP, task, parent_ctx, logger, limiter, filter_results_top_n + ) async def poll_for_tasks(): diff --git a/workers/finish_query/Dockerfile b/workers/finish_query/Dockerfile index ccae01f..54c908c 100644 --- a/workers/finish_query/Dockerfile +++ b/workers/finish_query/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/finish_query/worker.py b/workers/finish_query/worker.py index 2f697dc..2e2599d 100644 --- a/workers/finish_query/worker.py +++ b/workers/finish_query/worker.py @@ -5,9 +5,11 @@ import logging import time import uuid - import orjson +from opentelemetry.propagate import inject +from opentelemetry.trace import Status, StatusCode + from shepherd_utils.broker import mark_task_as_complete from shepherd_utils.db import ( cleanup_callbacks, @@ -55,13 +57,19 @@ async def finish_query(task, logger: logging.Logger): message = orjson.loads(message_bytes) message["logs"] = logs payload = orjson.dumps(message) + headers = {"Content-Type": "application/json"} + # Propagate the otel trace context through the callback. + # Matches the inject() carrier pattern used by the + # lookup workers; the active span comes from process_task's + # start_as_current_span. + inject(headers) for attempt in range(CALLBACK_RETRIES): try: async with httpx.AsyncClient(timeout=120) as client: response = await client.post( callback_url, content=payload, - headers={"Content-Type": "application/json"}, + headers=headers, ) response.raise_for_status() logger.info(f"Sent response back to {callback_url}") @@ -85,22 +93,25 @@ async def finish_query(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger: logging.Logger, limiter): """Process a given task and ACK in redis.""" start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await finish_query(task, logger) - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - finally: - # Always wrap up the task to ACK it in the broker + with tracer.start_as_current_span(STREAM, context=parent_ctx) as span: try: - await mark_task_as_complete(STREAM, GROUP, task[0], logger) + await finish_query(task, logger) + except asyncio.CancelledError: + logger.warning(f"Task {task[0]} was cancelled") except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - span.end() - limiter.release() - logger.info(f"Finished task {task[0]} in {time.time() - start}") + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + logger.error( + f"Task {task[0]} failed with unhandled error: {e}", exc_info=True + ) + finally: + # Always wrap up the task to ACK it in the broker + try: + await mark_task_as_complete(STREAM, GROUP, task[0], logger) + except Exception as e: + logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") + limiter.release() + logger.info(f"Finished task {task[0]} in {time.time() - start}") async def poll_for_tasks(): diff --git a/workers/merge_message/Dockerfile b/workers/merge_message/Dockerfile index b7a356c..d5a1461 100644 --- a/workers/merge_message/Dockerfile +++ b/workers/merge_message/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/merge_message/worker.py b/workers/merge_message/worker.py index 1c51d7f..af75230 100644 --- a/workers/merge_message/worker.py +++ b/workers/merge_message/worker.py @@ -710,8 +710,9 @@ async def poll_for_tasks(): callback_id = task[1]["callback_id"] target = task[1]["target"] with tracer.start_as_current_span( - f"{STREAM}.{callback_id}", context=parent_ctx - ): + STREAM, context=parent_ctx + ) as span: + span.set_attribute("callback_id", callback_id) got_lock = await acquire_lock(response_id, CONSUMER, logger) if got_lock: logger.info(f"[{callback_id}] Obtained lock.") diff --git a/workers/monitor/Dockerfile b/workers/monitor/Dockerfile index 24087fa..1cde633 100644 --- a/workers/monitor/Dockerfile +++ b/workers/monitor/Dockerfile @@ -1,4 +1,4 @@ -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/score_paths/Dockerfile b/workers/score_paths/Dockerfile index 5c2dbe7..ffa657d 100644 --- a/workers/score_paths/Dockerfile +++ b/workers/score_paths/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/score_paths/worker.py b/workers/score_paths/worker.py index 61974f8..aa841d6 100644 --- a/workers/score_paths/worker.py +++ b/workers/score_paths/worker.py @@ -17,7 +17,7 @@ from shepherd_utils.config import settings from shepherd_utils.db import get_message_sync, save_message_sync from shepherd_utils.otel import setup_tracer -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle STREAM = "score_paths" GROUP = "consumer" @@ -261,24 +261,11 @@ def score_paths(task, logger): async def process_task(task, parent_ctx, logger, limiter): - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: + async def _run(task, logger): loop = asyncio.get_event_loop() await loop.run_in_executor(executor, partial(score_paths, task, logger)) - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task cancelled: {task[0]}") - except Exception as e: - logger.error(f"Task {task[0]} failed: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Task took {time.time() - start} seconds") + + await run_task_lifecycle(STREAM, GROUP, task, parent_ctx, logger, limiter, _run) async def poll_for_tasks(): diff --git a/workers/sipr/Dockerfile b/workers/sipr/Dockerfile index 6887c45..a6c1e65 100644 --- a/workers/sipr/Dockerfile +++ b/workers/sipr/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/sipr/worker.py b/workers/sipr/worker.py index afe8a61..50cf198 100644 --- a/workers/sipr/worker.py +++ b/workers/sipr/worker.py @@ -3,7 +3,6 @@ import asyncio import json import logging -import time import uuid from copy import deepcopy @@ -16,7 +15,7 @@ save_message, ) from shepherd_utils.otel import setup_tracer -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle # Queue name STREAM = "sipr" @@ -338,23 +337,7 @@ async def sipr(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await sipr(task, logger) - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled.") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Task took {time.time() - start}") + await run_task_lifecycle(STREAM, GROUP, task, parent_ctx, logger, limiter, sipr) async def poll_for_tasks(): diff --git a/workers/sort_results_score/Dockerfile b/workers/sort_results_score/Dockerfile index de521c6..0c2877f 100644 --- a/workers/sort_results_score/Dockerfile +++ b/workers/sort_results_score/Dockerfile @@ -1,5 +1,5 @@ # Use RENCI python base image -FROM ghcr.io/translatorsri/renci-python-image:3.11.5 +FROM ghcr.io/translatorsri/renci-python-image:3.12.13 # Add image info LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd diff --git a/workers/sort_results_score/worker.py b/workers/sort_results_score/worker.py index ee4ed50..8d72ffd 100644 --- a/workers/sort_results_score/worker.py +++ b/workers/sort_results_score/worker.py @@ -3,10 +3,9 @@ import asyncio import json import logging -import time import uuid from shepherd_utils.db import get_message, save_message, get_query_state -from shepherd_utils.shared import get_tasks, handle_task_failure, wrap_up_task +from shepherd_utils.shared import get_tasks, run_task_lifecycle from shepherd_utils.otel import setup_tracer # Queue name @@ -52,23 +51,9 @@ async def sort_results_score(task, logger: logging.Logger): async def process_task(task, parent_ctx, logger, limiter): """Process a given task and ACK in redis.""" - start = time.time() - span = tracer.start_span(STREAM, context=parent_ctx) - try: - await sort_results_score(task, logger) - try: - await wrap_up_task(STREAM, GROUP, task, logger) - except Exception as e: - logger.error(f"Task {task[0]}: Failed to wrap up task: {e}") - except asyncio.CancelledError: - logger.warning(f"Task {task[0]} was cancelled.") - except Exception as e: - logger.error(f"Task {task[0]} failed with unhandled error: {e}", exc_info=True) - await handle_task_failure(STREAM, GROUP, task, logger) - finally: - span.end() - limiter.release() - logger.info(f"Task took {time.time() - start}") + await run_task_lifecycle( + STREAM, GROUP, task, parent_ctx, logger, limiter, sort_results_score + ) async def poll_for_tasks():