From 31a604a37d383765c5a8c8f028d6128fd93f03ce Mon Sep 17 00:00:00 2001 From: uhbrar Date: Wed, 10 Jun 2026 07:58:08 -0400 Subject: [PATCH 1/9] dont pass workflows to retriever --- workers/aragorn_pathfinder/worker.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/workers/aragorn_pathfinder/worker.py b/workers/aragorn_pathfinder/worker.py index 37fbd54..15d9b08 100644 --- a/workers/aragorn_pathfinder/worker.py +++ b/workers/aragorn_pathfinder/worker.py @@ -77,6 +77,10 @@ async def shadowfax(task, logger: logging.Logger) -> str: qgraph = message["message"]["query_graph"] pinned_node_keys = [] pinned_node_ids = [] + retriever_query = { + "message": message["message"], + "parameters": parameters + } for node_key, node in qgraph["nodes"].items(): pinned_node_keys.append(node_key) if node.get("ids", None) is not None: @@ -106,7 +110,7 @@ async def shadowfax(task, logger: logging.Logger) -> str: # Create 3-hop query - message["message"]["query_graph"] = { + retriever_query["message"]["query_graph"] = { "nodes": { pinned_node_keys[0]: {"ids": [pinned_node_ids[0]]}, "intermediate_0": { @@ -219,7 +223,7 @@ async def shadowfax(task, logger: logging.Logger) -> str: async with httpx.AsyncClient(timeout=100) as client: await client.post( settings.kg_retrieval_url, - json=message, + json=retriever_query, ) # this worker might have a timeout set for if the lookups don't finish within a certain From 0b5939e25f3a6f898eaff18a5a3315c6892f0789 Mon Sep 17 00:00:00 2001 From: uhbrar Date: Wed, 10 Jun 2026 07:58:23 -0400 Subject: [PATCH 2/9] add callback --- workers/aragorn_pathfinder/worker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/workers/aragorn_pathfinder/worker.py b/workers/aragorn_pathfinder/worker.py index 15d9b08..3ffd188 100644 --- a/workers/aragorn_pathfinder/worker.py +++ b/workers/aragorn_pathfinder/worker.py @@ -216,8 +216,7 @@ async def shadowfax(task, logger: logging.Logger) -> str: # Put callback UID and query ID in postgres await add_callback_id(query_id, callback_id, otel, logger) - message["callback"] = f"{settings.callback_host}/aragorn/callback/{callback_id}" - + retriever_query["callback"] = f"{settings.callback_host}/aragorn/callback/{callback_id}" logger.debug(f"""Sending pathfinder query to {settings.kg_retrieval_url}.""") with tracer.start_as_current_span(f"aragorn.pathfinder.{callback_id}"): async with httpx.AsyncClient(timeout=100) as client: From c47149478e75de6eb13470400f172cbe8f41da1e Mon Sep 17 00:00:00 2001 From: uhbrar Date: Wed, 10 Jun 2026 07:58:42 -0400 Subject: [PATCH 3/9] log errors from retriever --- workers/aragorn_pathfinder/worker.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/workers/aragorn_pathfinder/worker.py b/workers/aragorn_pathfinder/worker.py index 3ffd188..312bd65 100644 --- a/workers/aragorn_pathfinder/worker.py +++ b/workers/aragorn_pathfinder/worker.py @@ -220,10 +220,16 @@ async def shadowfax(task, logger: logging.Logger) -> str: logger.debug(f"""Sending pathfinder query to {settings.kg_retrieval_url}.""") with tracer.start_as_current_span(f"aragorn.pathfinder.{callback_id}"): async with httpx.AsyncClient(timeout=100) as client: - await client.post( + retriever_async_response = await client.post( settings.kg_retrieval_url, json=retriever_query, ) + try: + retriever_async_response.raise_for_status() + except Exception as e: + logger.error(f"Error contact retriever: {e}") + logger.debug(f"Error details: {retriever_async_response.json()}") + # this worker might have a timeout set for if the lookups don't finish within a certain # amount of time From 614615b30897301fb0ee333fd9215131bf5bdcd6 Mon Sep 17 00:00:00 2001 From: uhbrar Date: Wed, 10 Jun 2026 07:59:16 -0400 Subject: [PATCH 4/9] remove constraints from query --- workers/aragorn_pathfinder/worker.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/workers/aragorn_pathfinder/worker.py b/workers/aragorn_pathfinder/worker.py index 312bd65..3e036e1 100644 --- a/workers/aragorn_pathfinder/worker.py +++ b/workers/aragorn_pathfinder/worker.py @@ -89,7 +89,6 @@ async def shadowfax(task, logger: logging.Logger) -> str: if len(set(pinned_node_ids)) != 2: raise Exception("Pathfinder queries require two pinned nodes.") - intermediate_categories = [] path_key = next(iter(qgraph["paths"].keys())) qpath = qgraph["paths"][path_key] if qpath.get("constraints", None) is not None: @@ -101,12 +100,10 @@ async def shadowfax(task, logger: logging.Logger) -> str: intermediate_categories = ( constraints[0].get("intermediate_categories", None) or [] ) - if len(intermediate_categories) > 1: - raise Exception( - "Pathfinder queries do not support multiple intermediate categories" - ) - else: - intermediate_categories = ["biolink:NamedThing"] + if len(intermediate_categories) > 1: + raise Exception( + "Pathfinder queries do not support multiple intermediate categories" + ) # Create 3-hop query @@ -114,10 +111,10 @@ async def shadowfax(task, logger: logging.Logger) -> str: "nodes": { pinned_node_keys[0]: {"ids": [pinned_node_ids[0]]}, "intermediate_0": { - "categories": intermediate_categories, + "categories": ["biolink:NamedThing"], }, "intermediate_1": { - "categories": intermediate_categories, + "categories": ["biolink:NamedThing"], }, pinned_node_keys[1]: {"ids": [pinned_node_ids[1]]}, }, From 4662157e8bd57adfb6578ece7767e81339d17103 Mon Sep 17 00:00:00 2001 From: uhbrar Date: Wed, 10 Jun 2026 08:02:27 -0400 Subject: [PATCH 5/9] handle intermedicate category constraints --- workers/merge_message/worker.py | 38 +++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/workers/merge_message/worker.py b/workers/merge_message/worker.py index a973840..62843ad 100644 --- a/workers/merge_message/worker.py +++ b/workers/merge_message/worker.py @@ -534,6 +534,17 @@ def merge_messages( object_node_id = og_path.get("object") if subject_node_id is None or object_node_id is None: raise KeyError("Missing either subject or object from path.") + + intermediate_category = None + constraints = og_path.get("constraints") or [] + if len(constraints) > 0: + intermediate_categories = constraints[0].get("intermediate_categories") or [] + if len(intermediate_categories) > 0: + intermediate_category = intermediate_categories[0] + + kg_nodes = result["message"]["knowledge_graph"].get("nodes", {}) + kg_edges = result["message"]["knowledge_graph"].get("edges", {}) + aux_counter = 0 score = 0 analyses = [] @@ -544,11 +555,34 @@ def merge_messages( for qg_edge_key, bindings in edge_bindings.items(): for binding in bindings: path_edge_ids.add(binding["id"]) - score = new_result.get("score") - + score = new_result.get("score") if not path_edge_ids: continue + if ( + intermediate_category is not None + and intermediate_category != "biolink:NamedThing" + ): + nb = new_result.get("node_bindings", {}) + pinned_ids = set() + for pinned in (subject_node_id, object_node_id): + for binding in nb.get(pinned, []) or []: + pinned_ids.add(binding["id"]) + intermediate_node_ids = set() + for edge_id in path_edge_ids: + edge = kg_edges.get(edge_id) + if edge is None: + continue + for node_id in (edge.get("subject"), edge.get("object")): + if node_id and node_id not in pinned_ids: + intermediate_node_ids.add(node_id) + if not any( + intermediate_category + in (kg_nodes.get(nid, {}).get("categories") or []) + for nid in intermediate_node_ids + ): + continue + aux_id = f"a_{aux_counter}" aux_counter += 1 From c7411f5590927cd0d30c0530432a56345f427775 Mon Sep 17 00:00:00 2001 From: uhbrar Date: Wed, 10 Jun 2026 08:41:02 -0400 Subject: [PATCH 6/9] black --- workers/aragorn_pathfinder/worker.py | 5 +++-- workers/merge_message/worker.py | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/workers/aragorn_pathfinder/worker.py b/workers/aragorn_pathfinder/worker.py index 3e036e1..edb3627 100644 --- a/workers/aragorn_pathfinder/worker.py +++ b/workers/aragorn_pathfinder/worker.py @@ -213,7 +213,9 @@ async def shadowfax(task, logger: logging.Logger) -> str: # Put callback UID and query ID in postgres await add_callback_id(query_id, callback_id, otel, logger) - retriever_query["callback"] = f"{settings.callback_host}/aragorn/callback/{callback_id}" + retriever_query["callback"] = ( + f"{settings.callback_host}/aragorn/callback/{callback_id}" + ) logger.debug(f"""Sending pathfinder query to {settings.kg_retrieval_url}.""") with tracer.start_as_current_span(f"aragorn.pathfinder.{callback_id}"): async with httpx.AsyncClient(timeout=100) as client: @@ -227,7 +229,6 @@ async def shadowfax(task, logger: logging.Logger) -> str: logger.error(f"Error contact retriever: {e}") logger.debug(f"Error details: {retriever_async_response.json()}") - # this worker might have a timeout set for if the lookups don't finish within a certain # amount of time MAX_QUERY_TIME = message["parameters"]["timeout"] diff --git a/workers/merge_message/worker.py b/workers/merge_message/worker.py index 62843ad..1c51d7f 100644 --- a/workers/merge_message/worker.py +++ b/workers/merge_message/worker.py @@ -538,7 +538,9 @@ def merge_messages( intermediate_category = None constraints = og_path.get("constraints") or [] if len(constraints) > 0: - intermediate_categories = constraints[0].get("intermediate_categories") or [] + intermediate_categories = ( + constraints[0].get("intermediate_categories") or [] + ) if len(intermediate_categories) > 0: intermediate_category = intermediate_categories[0] From 36da5c8a55fe2056cb86c5ea056d8b9a7281b512 Mon Sep 17 00:00:00 2001 From: uhbrar Date: Wed, 10 Jun 2026 08:45:06 -0400 Subject: [PATCH 7/9] black again --- workers/aragorn_pathfinder/worker.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/workers/aragorn_pathfinder/worker.py b/workers/aragorn_pathfinder/worker.py index edb3627..444e2a8 100644 --- a/workers/aragorn_pathfinder/worker.py +++ b/workers/aragorn_pathfinder/worker.py @@ -77,10 +77,7 @@ async def shadowfax(task, logger: logging.Logger) -> str: qgraph = message["message"]["query_graph"] pinned_node_keys = [] pinned_node_ids = [] - retriever_query = { - "message": message["message"], - "parameters": parameters - } + retriever_query = {"message": message["message"], "parameters": parameters} for node_key, node in qgraph["nodes"].items(): pinned_node_keys.append(node_key) if node.get("ids", None) is not None: From 898cdbe15af3dc5ae7d9ca3694c49f47496e878d Mon Sep 17 00:00:00 2001 From: uhbrar Date: Wed, 10 Jun 2026 09:23:07 -0400 Subject: [PATCH 8/9] remove bad constraint test --- tests/unit/test_aragorn_pathfinder.py | 42 --------------------------- 1 file changed, 42 deletions(-) diff --git a/tests/unit/test_aragorn_pathfinder.py b/tests/unit/test_aragorn_pathfinder.py index a97747e..17e5382 100644 --- a/tests/unit/test_aragorn_pathfinder.py +++ b/tests/unit/test_aragorn_pathfinder.py @@ -128,48 +128,6 @@ async def test_shadowfax_rejects_multiple_intermediate_categories(redis_mock, mo await shadowfax(_make_task(), logger) -@pytest.mark.asyncio -async def test_shadowfax_uses_intermediate_category_from_constraint(redis_mock, mocker): - """When a constraint provides an intermediate category, the threehop's - intermediates carry that category instead of biolink:NamedThing.""" - msg = _pathfinder_message( - constraints=[{"intermediate_categories": ["biolink:Gene"]}] - ) - mocker.patch( - "workers.aragorn_pathfinder.worker.get_message", - new_callable=mocker.AsyncMock, - return_value=msg, - ) - mocker.patch( - "workers.aragorn_pathfinder.worker.add_callback_id", - new_callable=mocker.AsyncMock, - ) - mocker.patch( - "workers.aragorn_pathfinder.worker.get_running_callbacks", - new_callable=mocker.AsyncMock, - return_value=[], - ) - - mock_response = mocker.Mock() - mock_response.status_code = 200 - mock_httpx = mocker.patch( - "httpx.AsyncClient.post", - new_callable=mocker.AsyncMock, - return_value=mock_response, - ) - - await shadowfax(_make_task(), logger) - - mock_httpx.assert_awaited_once() - - args, kwargs = mock_httpx.call_args - - threehop = kwargs["json"] - nodes = threehop["message"]["query_graph"]["nodes"] - assert nodes["intermediate_0"]["categories"] == ["biolink:Gene"] - assert nodes["intermediate_1"]["categories"] == ["biolink:Gene"] - - @pytest.mark.asyncio async def test_shadowfax_propagates_gandalf_parameters(redis_mock, mocker): """Custom gandalf_parameters in the input should ride along into the From ec896e83b830178f784c2e02fab3a98fb70e65e5 Mon Sep 17 00:00:00 2001 From: uhbrar Date: Wed, 10 Jun 2026 09:33:12 -0400 Subject: [PATCH 9/9] typo --- workers/aragorn_pathfinder/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/aragorn_pathfinder/worker.py b/workers/aragorn_pathfinder/worker.py index 444e2a8..fc82122 100644 --- a/workers/aragorn_pathfinder/worker.py +++ b/workers/aragorn_pathfinder/worker.py @@ -223,7 +223,7 @@ async def shadowfax(task, logger: logging.Logger) -> str: try: retriever_async_response.raise_for_status() except Exception as e: - logger.error(f"Error contact retriever: {e}") + logger.error(f"Error contacting retriever: {e}") logger.debug(f"Error details: {retriever_async_response.json()}") # this worker might have a timeout set for if the lookups don't finish within a certain