From 47fae953930235e11ede77140346e0d0cd24ef0b Mon Sep 17 00:00:00 2001
From: Dayenne Souza
Date: Mon, 23 Feb 2026 19:17:21 -0300
Subject: [PATCH] write stats per workflow (#2244)

add stats write
---
 .../next-release/patch-20260223204942781936.json |  4 ++++
 .../graphrag/graphrag/index/run/run_pipeline.py  | 15 +++++++++++----
 2 files changed, 15 insertions(+), 4 deletions(-)
 create mode 100644 .semversioner/next-release/patch-20260223204942781936.json

diff --git a/.semversioner/next-release/patch-20260223204942781936.json b/.semversioner/next-release/patch-20260223204942781936.json
new file mode 100644
index 000000000..68c72e6af
--- /dev/null
+++ b/.semversioner/next-release/patch-20260223204942781936.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "write stats.json per workflow"
+}
diff --git a/packages/graphrag/graphrag/index/run/run_pipeline.py b/packages/graphrag/graphrag/index/run/run_pipeline.py
index 55a4b0017..6e2c6a05a 100644
--- a/packages/graphrag/graphrag/index/run/run_pipeline.py
+++ b/packages/graphrag/graphrag/index/run/run_pipeline.py
@@ -123,7 +123,8 @@
     last_workflow = ""

     try:
-        await _dump_json(context)
+        await _dump_stats_json(context)
+        await _dump_context_json(context)
         logger.info("Executing pipeline...")

         for name, workflow_function in pipeline.run():
@@ -138,13 +139,15 @@
                 workflow=name, result=result.result, state=context.state, error=None
             )
             context.stats.workflows[name] = profiler.metrics
+            await _dump_stats_json(context)
             if result.stop:
                 logger.info("Halting pipeline at workflow request")
                 break

         context.stats.total_runtime = time.time() - start_time
         logger.info("Indexing pipeline complete.")
-        await _dump_json(context)
+        await _dump_stats_json(context)
+        await _dump_context_json(context)

     except Exception as e:
         logger.exception("error running workflow %s", last_workflow)
@@ -153,11 +156,15 @@
         )


-async def _dump_json(context: PipelineRunContext) -> None:
-    """Dump the stats and context state to the storage."""
+async def _dump_stats_json(context: PipelineRunContext) -> None:
+    """Dump stats state to storage."""
     await context.output_storage.set(
         "stats.json", json.dumps(asdict(context.stats), indent=4, ensure_ascii=False)
     )
+
+
+async def _dump_context_json(context: PipelineRunContext) -> None:
+    """Dump context state to storage."""
     # Dump context state, excluding additional_context
     temp_context = context.state.pop(
         "additional_context", None