From ba9169a2328598bf80220554bd86e0aa77050f55 Mon Sep 17 00:00:00 2001 From: Dayenne Souza Date: Mon, 23 Feb 2026 13:57:52 -0300 Subject: [PATCH 1/2] streaming finalize_graph (#2240) * add streaming * add patch * fix spelling * add improvements and test --- .../patch-20260220143557050413.json | 4 + .../index/operations/finalize_entities.py | 76 +-- .../operations/finalize_relationships.py | 83 ++-- .../index/workflows/finalize_graph.py | 127 +++-- tests/unit/indexing/test_finalize_graph.py | 444 ++++++++++++++++++ 5 files changed, 639 insertions(+), 95 deletions(-) create mode 100644 .semversioner/next-release/patch-20260220143557050413.json create mode 100644 tests/unit/indexing/test_finalize_graph.py diff --git a/.semversioner/next-release/patch-20260220143557050413.json b/.semversioner/next-release/patch-20260220143557050413.json new file mode 100644 index 000000000..c92ef0ebc --- /dev/null +++ b/.semversioner/next-release/patch-20260220143557050413.json @@ -0,0 +1,4 @@ +{ + "type": "patch", + "description": "finalize_graph streaming" +} diff --git a/packages/graphrag/graphrag/index/operations/finalize_entities.py b/packages/graphrag/graphrag/index/operations/finalize_entities.py index 71d6acc53..a0121a5b7 100644 --- a/packages/graphrag/graphrag/index/operations/finalize_entities.py +++ b/packages/graphrag/graphrag/index/operations/finalize_entities.py @@ -1,34 +1,56 @@ -# Copyright (c) 2024 Microsoft Corporation. +# Copyright (C) 2026 Microsoft # Licensed under the MIT License -"""All the steps to transform final entities.""" +"""Stream-finalize entity rows into an output Table.""" +from typing import Any from uuid import uuid4 -import pandas as pd +from graphrag_storage.tables.table import Table from graphrag.data_model.schemas import ENTITIES_FINAL_COLUMNS -from graphrag.graphs.compute_degree import compute_degree - - -def finalize_entities( - entities: pd.DataFrame, - relationships: pd.DataFrame, -) -> pd.DataFrame: - """All the steps to transform final entities.""" - degrees = compute_degree(relationships) - final_entities = entities.merge(degrees, on="title", how="left").drop_duplicates( - subset="title" - ) - final_entities = final_entities.loc[entities["title"].notna()].reset_index() - # disconnected nodes and those with no community even at level 0 can be missing degree - final_entities["degree"] = final_entities["degree"].fillna(0).astype(int) - final_entities.reset_index(inplace=True) - final_entities["human_readable_id"] = final_entities.index - final_entities["id"] = final_entities["human_readable_id"].apply( - lambda _x: str(uuid4()) - ) - return final_entities.loc[ - :, - ENTITIES_FINAL_COLUMNS, - ] + + +async def finalize_entities( + entities_table: Table, + degree_map: dict[str, int], +) -> list[dict[str, Any]]: + """Read entity rows, enrich with degree, and write back. + + Streams through the entities table, deduplicates by title, + assigns degree from the pre-computed degree map, and writes + each finalized row back to the same table (safe when using + truncate=True, which reads from the original and writes to + a temp file). + + Args + ---- + entities_table: Table + Opened table for both reading input and writing output. + degree_map: dict[str, int] + Pre-computed mapping of entity title to node degree. + + Returns + ------- + list[dict[str, Any]] + Sample of up to 5 entity rows for logging. + """ + sample_rows: list[dict[str, Any]] = [] + seen_titles: set[str] = set() + human_readable_id = 0 + + async for row in entities_table: + title = row.get("title") + if not title or title in seen_titles: + continue + seen_titles.add(title) + row["degree"] = degree_map.get(title, 0) + row["human_readable_id"] = human_readable_id + row["id"] = str(uuid4()) + human_readable_id += 1 + out = {col: row.get(col) for col in ENTITIES_FINAL_COLUMNS} + await entities_table.write(out) + if len(sample_rows) < 5: + sample_rows.append(out) + + return sample_rows diff --git a/packages/graphrag/graphrag/index/operations/finalize_relationships.py b/packages/graphrag/graphrag/index/operations/finalize_relationships.py index d4d14d097..c7f5333d2 100644 --- a/packages/graphrag/graphrag/index/operations/finalize_relationships.py +++ b/packages/graphrag/graphrag/index/operations/finalize_relationships.py @@ -1,42 +1,55 @@ -# Copyright (c) 2024 Microsoft Corporation. +# Copyright (C) 2026 Microsoft # Licensed under the MIT License -"""All the steps to transform final relationships.""" +"""Stream-finalize relationship rows into an output Table.""" +from typing import Any from uuid import uuid4 -import pandas as pd +from graphrag_storage.tables.table import Table from graphrag.data_model.schemas import RELATIONSHIPS_FINAL_COLUMNS -from graphrag.graphs.compute_degree import compute_degree -from graphrag.index.operations.compute_edge_combined_degree import ( - compute_edge_combined_degree, -) - - -def finalize_relationships( - relationships: pd.DataFrame, -) -> pd.DataFrame: - """All the steps to transform final relationships.""" - degrees = compute_degree(relationships) - - final_relationships = relationships.drop_duplicates(subset=["source", "target"]) - final_relationships["combined_degree"] = compute_edge_combined_degree( - final_relationships, - degrees, - node_name_column="title", - node_degree_column="degree", - edge_source_column="source", - edge_target_column="target", - ) - - final_relationships.reset_index(inplace=True) - final_relationships["human_readable_id"] = final_relationships.index - final_relationships["id"] = final_relationships["human_readable_id"].apply( - lambda _x: str(uuid4()) - ) - - return final_relationships.loc[ - :, - RELATIONSHIPS_FINAL_COLUMNS, - ] + + +async def finalize_relationships( + relationships_table: Table, + degree_map: dict[str, int], +) -> list[dict[str, Any]]: + """Deduplicate relationships, enrich with combined degree, and write. + + Streams through the relationships table, deduplicates by + (source, target) pair, computes combined_degree as the sum of + source and target node degrees, and writes each finalized row + back to the table. + + Args + ---- + relationships_table: Table + Opened table for reading and writing relationship rows. + degree_map: dict[str, int] + Pre-computed mapping of entity title to node degree. + + Returns + ------- + list[dict[str, Any]] + Sample of up to 5 relationship rows for logging. + """ + sample_rows: list[dict[str, Any]] = [] + seen: set[tuple[str, str]] = set() + human_readable_id = 0 + + async for row in relationships_table: + key = (row.get("source", ""), row.get("target", "")) + if key in seen: + continue + seen.add(key) + row["combined_degree"] = degree_map.get(key[0], 0) + degree_map.get(key[1], 0) + row["human_readable_id"] = human_readable_id + row["id"] = str(uuid4()) + human_readable_id += 1 + final = {col: row.get(col) for col in RELATIONSHIPS_FINAL_COLUMNS} + await relationships_table.write(final) + if len(sample_rows) < 5: + sample_rows.append(final) + + return sample_rows diff --git a/packages/graphrag/graphrag/index/workflows/finalize_graph.py b/packages/graphrag/graphrag/index/workflows/finalize_graph.py index 395299eaa..4be6171f3 100644 --- a/packages/graphrag/graphrag/index/workflows/finalize_graph.py +++ b/packages/graphrag/graphrag/index/workflows/finalize_graph.py @@ -1,16 +1,23 @@ -# Copyright (c) 2024 Microsoft Corporation. +# Copyright (C) 2026 Microsoft # Licensed under the MIT License """A module containing run_workflow method definition.""" import logging +from collections import Counter +from typing import Any -import pandas as pd +from graphrag_storage.tables.table import Table from graphrag.config.models.graph_rag_config import GraphRagConfig -from graphrag.data_model.data_reader import DataReader +from graphrag.data_model.row_transformers import ( + transform_entity_row, + transform_relationship_row, +) from graphrag.index.operations.finalize_entities import finalize_entities -from graphrag.index.operations.finalize_relationships import finalize_relationships +from graphrag.index.operations.finalize_relationships import ( + finalize_relationships, +) from graphrag.index.operations.snapshot_graphml import snapshot_graphml from graphrag.index.typing.context import PipelineRunContext from graphrag.index.typing.workflow import WorkflowFunctionOutput @@ -24,41 +31,95 @@ async def run_workflow( ) -> WorkflowFunctionOutput: """All the steps to create the base entity graph.""" logger.info("Workflow started: finalize_graph") - reader = DataReader(context.output_table_provider) - entities = await reader.entities() - relationships = await reader.relationships() - final_entities, final_relationships = finalize_graph( - entities, - relationships, - ) - - await context.output_table_provider.write_dataframe("entities", final_entities) - await context.output_table_provider.write_dataframe( - "relationships", final_relationships - ) + async with ( + context.output_table_provider.open( + "entities", + transformer=transform_entity_row, + ) as entities_table, + context.output_table_provider.open( + "relationships", + transformer=transform_relationship_row, + ) as relationships_table, + ): + result = await finalize_graph( + entities_table, + relationships_table, + ) if config.snapshots.graphml: + rels = await context.output_table_provider.read_dataframe("relationships") await snapshot_graphml( - final_relationships, + rels, name="graph", storage=context.output_storage, ) logger.info("Workflow completed: finalize_graph") - return WorkflowFunctionOutput( - result={ - "entities": entities, - "relationships": relationships, - } - ) - - -def finalize_graph( - entities: pd.DataFrame, - relationships: pd.DataFrame, -) -> tuple[pd.DataFrame, pd.DataFrame]: - """All the steps to finalize the entity and relationship formats.""" - final_entities = finalize_entities(entities, relationships) - final_relationships = finalize_relationships(relationships) - return (final_entities, final_relationships) + return WorkflowFunctionOutput(result=result) + + +async def finalize_graph( + entities_table: Table, + relationships_table: Table, +) -> dict[str, list[dict[str, Any]]]: + """Compute degrees and finalize entities and relationships. + + Streams relationship rows to build a degree map without + materializing a DataFrame, then delegates to the individual + finalize operations for streaming row-by-row enrichment and + writing. + + Args + ---- + entities_table: Table + Opened table for reading and writing entity rows. + relationships_table: Table + Opened table for reading relationships into a DataFrame + and writing finalized relationship rows. + + Returns + ------- + dict[str, list[dict[str, Any]]] + Sample rows keyed by ``"entities"`` and + ``"relationships"``, up to 5 each. + """ + degree_map = await _build_degree_map(relationships_table) + + entity_samples = await finalize_entities(entities_table, degree_map) + relationship_samples = await finalize_relationships(relationships_table, degree_map) + + return { + "entities": entity_samples, + "relationships": relationship_samples, + } + + +async def _build_degree_map( + relationships_table: Table, +) -> dict[str, int]: + """Stream relationship rows to compute node degrees. + + Normalizes each edge to an undirected pair and deduplicates + on the fly, matching the behavior of ``compute_degree`` but + without materializing a DataFrame. + + Args + ---- + relationships_table: Table + Opened table to stream relationship rows from. + + Returns + ------- + dict[str, int] + Mapping of entity title to its node degree. + """ + seen: set[tuple[str, str]] = set() + degree: Counter[str] = Counter() + async for row in relationships_table: + lo, hi = sorted((row["source"], row["target"])) + if (lo, hi) not in seen: + seen.add((lo, hi)) + degree[lo] += 1 + degree[hi] += 1 + return dict(degree) diff --git a/tests/unit/indexing/test_finalize_graph.py b/tests/unit/indexing/test_finalize_graph.py new file mode 100644 index 000000000..20daa4966 --- /dev/null +++ b/tests/unit/indexing/test_finalize_graph.py @@ -0,0 +1,444 @@ +# Copyright (C) 2026 Microsoft +# Licensed under the MIT License + +"""Tests for the finalize_graph streaming functions. + +Covers _build_degree_map, finalize_entities, finalize_relationships, +and the orchestrating finalize_graph function. +""" + +from typing import Any + +import pytest +from graphrag.data_model.schemas import ( + ENTITIES_FINAL_COLUMNS, + RELATIONSHIPS_FINAL_COLUMNS, +) +from graphrag.index.operations.finalize_entities import finalize_entities +from graphrag.index.operations.finalize_relationships import ( + finalize_relationships, +) +from graphrag.index.workflows.finalize_graph import ( + _build_degree_map, + finalize_graph, +) +from graphrag_storage.tables.table import Table + + +class FakeTable(Table): + """In-memory table that supports async iteration and write collection. + + Rows passed to write() are collected in ``written`` for assertions. + Each call to ``__aiter__`` resets the read cursor so the table can + be iterated multiple times, matching the real CSVTable behavior + under truncate mode. + """ + + def __init__(self, rows: list[dict[str, Any]] | None = None) -> None: + self._rows = list(rows or []) + self._index = 0 + self.written: list[dict[str, Any]] = [] + + def __aiter__(self): + """Return an async iterator over the seed rows.""" + self._index = 0 + return self + + async def __anext__(self) -> dict[str, Any]: + """Yield the next row or stop.""" + if self._index >= len(self._rows): + raise StopAsyncIteration + row = dict(self._rows[self._index]) + self._index += 1 + return row + + async def length(self) -> int: + """Return number of seed rows.""" + return len(self._rows) + + async def has(self, row_id: str) -> bool: + """Check if a row with the given ID exists in seed rows.""" + return any(r.get("id") == row_id for r in self._rows) + + async def write(self, row: dict[str, Any]) -> None: + """Collect written rows for test assertions.""" + self.written.append(row) + + async def close(self) -> None: + """No-op.""" + + +def _make_entity_row( + title: str, + entity_type: str = "ENTITY", + description: str = "", + frequency: int = 1, +) -> dict[str, Any]: + """Build a minimal entity row matching pre-finalization shape.""" + return { + "title": title, + "type": entity_type, + "description": description, + "frequency": frequency, + "text_unit_ids": ["tu1"], + } + + +def _make_relationship_row( + source: str, + target: str, + weight: float = 1.0, + description: str = "", +) -> dict[str, Any]: + """Build a minimal relationship row matching pre-finalization shape.""" + return { + "source": source, + "target": target, + "weight": weight, + "description": description, + "text_unit_ids": ["tu1"], + } + + +class TestBuildDegreeMap: + """Tests for the streaming _build_degree_map helper.""" + + async def test_simple_triangle(self): + """Three nodes forming a triangle should each have degree 2.""" + table = FakeTable([ + _make_relationship_row("A", "B"), + _make_relationship_row("B", "C"), + _make_relationship_row("A", "C"), + ]) + result = await _build_degree_map(table) + assert result == {"A": 2, "B": 2, "C": 2} + + async def test_star_topology(self): + """Hub connected to four leaves should have degree 4; leaves degree 1.""" + table = FakeTable([ + _make_relationship_row("hub", "a"), + _make_relationship_row("hub", "b"), + _make_relationship_row("hub", "c"), + _make_relationship_row("hub", "d"), + ]) + result = await _build_degree_map(table) + assert result["hub"] == 4 + for leaf in ("a", "b", "c", "d"): + assert result[leaf] == 1 + + async def test_duplicate_edges_deduplicated(self): + """Duplicate (A, B) edges should be counted only once.""" + table = FakeTable([ + _make_relationship_row("A", "B"), + _make_relationship_row("A", "B"), + ]) + result = await _build_degree_map(table) + assert result == {"A": 1, "B": 1} + + async def test_reversed_duplicate_edges_deduplicated(self): + """(A, B) and (B, A) are the same undirected edge.""" + table = FakeTable([ + _make_relationship_row("A", "B"), + _make_relationship_row("B", "A"), + ]) + result = await _build_degree_map(table) + assert result == {"A": 1, "B": 1} + + async def test_empty_table(self): + """Empty relationship table should produce empty degree map.""" + table = FakeTable([]) + result = await _build_degree_map(table) + assert result == {} + + async def test_single_edge(self): + """One edge yields degree 1 for both endpoints.""" + table = FakeTable([_make_relationship_row("X", "Y")]) + result = await _build_degree_map(table) + assert result == {"X": 1, "Y": 1} + + async def test_disconnected_components(self): + """Two separate components computed correctly.""" + table = FakeTable([ + _make_relationship_row("A", "B"), + _make_relationship_row("C", "D"), + ]) + result = await _build_degree_map(table) + assert result == {"A": 1, "B": 1, "C": 1, "D": 1} + + +class TestFinalizeEntities: + """Tests for stream-finalize entity rows.""" + + async def test_enriches_with_degree(self): + """Entities should receive degree from the degree map.""" + table = FakeTable([ + _make_entity_row("A"), + _make_entity_row("B"), + ]) + degree_map = {"A": 3, "B": 1} + await finalize_entities(table, degree_map) + + assert len(table.written) == 2 + assert table.written[0]["degree"] == 3 + assert table.written[1]["degree"] == 1 + + async def test_missing_degree_defaults_to_zero(self): + """Entity not in degree map should get degree 0.""" + table = FakeTable([_make_entity_row("UNKNOWN")]) + degree_map = {"A": 5} + await finalize_entities(table, degree_map) + + assert len(table.written) == 1 + assert table.written[0]["degree"] == 0 + + async def test_deduplicates_by_title(self): + """Duplicate titles should be skipped.""" + table = FakeTable([ + _make_entity_row("A"), + _make_entity_row("A"), + _make_entity_row("B"), + ]) + degree_map = {"A": 1, "B": 2} + await finalize_entities(table, degree_map) + + assert len(table.written) == 2 + titles = [r["title"] for r in table.written] + assert titles == ["A", "B"] + + async def test_skips_empty_title(self): + """Rows with empty or missing title should be skipped.""" + table = FakeTable([ + _make_entity_row(""), + _make_entity_row("A"), + ]) + degree_map = {"A": 1} + await finalize_entities(table, degree_map) + + assert len(table.written) == 1 + assert table.written[0]["title"] == "A" + + async def test_assigns_sequential_human_readable_ids(self): + """human_readable_id should be 0-based sequential.""" + table = FakeTable([ + _make_entity_row("A"), + _make_entity_row("B"), + _make_entity_row("C"), + ]) + degree_map = {"A": 1, "B": 1, "C": 1} + await finalize_entities(table, degree_map) + + ids = [r["human_readable_id"] for r in table.written] + assert ids == [0, 1, 2] + + async def test_assigns_unique_ids(self): + """Each entity should get a unique UUID id.""" + table = FakeTable([ + _make_entity_row("A"), + _make_entity_row("B"), + ]) + degree_map = {"A": 1, "B": 1} + await finalize_entities(table, degree_map) + + row_ids = [r["id"] for r in table.written] + assert len(set(row_ids)) == 2 + + async def test_output_columns_match_schema(self): + """Written rows should contain exactly the ENTITIES_FINAL_COLUMNS.""" + table = FakeTable([_make_entity_row("A")]) + degree_map = {"A": 1} + await finalize_entities(table, degree_map) + + assert set(table.written[0].keys()) == set(ENTITIES_FINAL_COLUMNS) + + async def test_returns_sample_rows_up_to_five(self): + """Should return at most 5 sample rows.""" + rows = [_make_entity_row(f"E{i}") for i in range(8)] + table = FakeTable(rows) + degree_map = {f"E{i}": 1 for i in range(8)} + samples = await finalize_entities(table, degree_map) + + assert len(samples) == 5 + assert len(table.written) == 8 + + async def test_empty_table(self): + """Empty input should produce no output.""" + table = FakeTable([]) + degree_map = {} + samples = await finalize_entities(table, degree_map) + + assert samples == [] + assert table.written == [] + + +class TestFinalizeRelationships: + """Tests for stream-finalize relationship rows.""" + + async def test_enriches_with_combined_degree(self): + """combined_degree should be sum of source and target degrees.""" + table = FakeTable([_make_relationship_row("A", "B")]) + degree_map = {"A": 3, "B": 2} + await finalize_relationships(table, degree_map) + + assert len(table.written) == 1 + assert table.written[0]["combined_degree"] == 5 + + async def test_missing_degree_defaults_to_zero(self): + """Nodes not in degree map contribute 0 to combined_degree.""" + table = FakeTable([_make_relationship_row("X", "Y")]) + degree_map = {"X": 4} + await finalize_relationships(table, degree_map) + + assert table.written[0]["combined_degree"] == 4 + + async def test_deduplicates_by_source_target(self): + """Duplicate (source, target) pairs should be skipped.""" + table = FakeTable([ + _make_relationship_row("A", "B"), + _make_relationship_row("A", "B"), + _make_relationship_row("B", "C"), + ]) + degree_map = {"A": 1, "B": 2, "C": 1} + await finalize_relationships(table, degree_map) + + assert len(table.written) == 2 + pairs = [(r["source"], r["target"]) for r in table.written] + assert pairs == [("A", "B"), ("B", "C")] + + async def test_reversed_pair_not_deduplicated(self): + """(A,B) and (B,A) are treated as distinct directed edges.""" + table = FakeTable([ + _make_relationship_row("A", "B"), + _make_relationship_row("B", "A"), + ]) + degree_map = {"A": 1, "B": 1} + await finalize_relationships(table, degree_map) + + assert len(table.written) == 2 + + async def test_assigns_sequential_human_readable_ids(self): + """human_readable_id should be 0-based sequential.""" + table = FakeTable([ + _make_relationship_row("A", "B"), + _make_relationship_row("B", "C"), + ]) + degree_map = {"A": 1, "B": 2, "C": 1} + await finalize_relationships(table, degree_map) + + ids = [r["human_readable_id"] for r in table.written] + assert ids == [0, 1] + + async def test_assigns_unique_ids(self): + """Each relationship should get a unique UUID id.""" + table = FakeTable([ + _make_relationship_row("A", "B"), + _make_relationship_row("B", "C"), + ]) + degree_map = {"A": 1, "B": 2, "C": 1} + await finalize_relationships(table, degree_map) + + row_ids = [r["id"] for r in table.written] + assert len(set(row_ids)) == 2 + + async def test_output_columns_match_schema(self): + """Written rows should contain exactly RELATIONSHIPS_FINAL_COLUMNS.""" + table = FakeTable([_make_relationship_row("A", "B")]) + degree_map = {"A": 1, "B": 1} + await finalize_relationships(table, degree_map) + + assert set(table.written[0].keys()) == set(RELATIONSHIPS_FINAL_COLUMNS) + + async def test_returns_sample_rows_up_to_five(self): + """Should return at most 5 sample rows.""" + rows = [_make_relationship_row(f"S{i}", f"T{i}") for i in range(8)] + table = FakeTable(rows) + degree_map = {f"S{i}": 1 for i in range(8)} | {f"T{i}": 1 for i in range(8)} + samples = await finalize_relationships(table, degree_map) + + assert len(samples) == 5 + assert len(table.written) == 8 + + async def test_empty_table(self): + """Empty input should produce no output.""" + table = FakeTable([]) + degree_map = {} + samples = await finalize_relationships(table, degree_map) + + assert samples == [] + assert table.written == [] + + +class TestFinalizeGraph: + """Tests for the orchestrating finalize_graph function.""" + + async def test_produces_entities_and_relationships_keys(self): + """Result dict should have 'entities' and 'relationships' keys.""" + entities_table = FakeTable([_make_entity_row("A")]) + relationships_table = FakeTable([_make_relationship_row("A", "B")]) + result = await finalize_graph(entities_table, relationships_table) + + assert "entities" in result + assert "relationships" in result + + async def test_degree_flows_through_to_entities(self): + """Entity degree should reflect computed edge degrees.""" + entities_table = FakeTable([ + _make_entity_row("A"), + _make_entity_row("B"), + _make_entity_row("C"), + ]) + relationships_table = FakeTable([ + _make_relationship_row("A", "B"), + _make_relationship_row("A", "C"), + ]) + await finalize_graph(entities_table, relationships_table) + + degree_by_title = {r["title"]: r["degree"] for r in entities_table.written} + assert degree_by_title["A"] == 2 + assert degree_by_title["B"] == 1 + assert degree_by_title["C"] == 1 + + async def test_combined_degree_flows_through_to_relationships(self): + """Relationship combined_degree should be sum of endpoint degrees.""" + entities_table = FakeTable([ + _make_entity_row("A"), + _make_entity_row("B"), + ]) + relationships_table = FakeTable([ + _make_relationship_row("A", "B"), + ]) + await finalize_graph(entities_table, relationships_table) + + assert len(relationships_table.written) == 1 + assert relationships_table.written[0]["combined_degree"] == 2 + + async def test_empty_graph(self): + """Empty tables should produce empty results.""" + entities_table = FakeTable([]) + relationships_table = FakeTable([]) + result = await finalize_graph(entities_table, relationships_table) + + assert result == {"entities": [], "relationships": []} + + @pytest.mark.parametrize( + ("entity_count", "relationship_count"), + [ + (3, 2), + (10, 15), + ], + ids=["small", "medium"], + ) + async def test_all_rows_written(self, entity_count: int, relationship_count: int): + """All unique entities and relationships should be written.""" + entity_rows = [_make_entity_row(f"E{i}") for i in range(entity_count)] + relationship_rows = [ + _make_relationship_row(f"E{i}", f"E{(i + 1) % entity_count}") + for i in range(relationship_count) + ] + entities_table = FakeTable(entity_rows) + relationships_table = FakeTable(relationship_rows) + + await finalize_graph(entities_table, relationships_table) + + assert len(entities_table.written) == entity_count + unique_rel_pairs = {(r["source"], r["target"]) for r in relationship_rows} + assert len(relationships_table.written) == len(unique_rel_pairs) From 1cedb796b3e26f432743e9884b62f6c23c8a37b0 Mon Sep 17 00:00:00 2001 From: Dayenne Souza Date: Mon, 23 Feb 2026 14:16:39 -0300 Subject: [PATCH 2/2] streaming create_final_documents (#2243) * streaming create_final_documents * add semversioner --- .../patch-20260223133523034773.json | 4 + .../index/workflows/create_final_documents.py | 97 ++++++++++--------- 2 files changed, 55 insertions(+), 46 deletions(-) create mode 100644 .semversioner/next-release/patch-20260223133523034773.json diff --git a/.semversioner/next-release/patch-20260223133523034773.json b/.semversioner/next-release/patch-20260223133523034773.json new file mode 100644 index 000000000..67ae560b1 --- /dev/null +++ b/.semversioner/next-release/patch-20260223133523034773.json @@ -0,0 +1,4 @@ +{ + "type": "patch", + "description": "create_final_documents streaming" +} diff --git a/packages/graphrag/graphrag/index/workflows/create_final_documents.py b/packages/graphrag/graphrag/index/workflows/create_final_documents.py index ccbd96782..7b3f65f99 100644 --- a/packages/graphrag/graphrag/index/workflows/create_final_documents.py +++ b/packages/graphrag/graphrag/index/workflows/create_final_documents.py @@ -1,14 +1,17 @@ -# Copyright (c) 2024 Microsoft Corporation. +# Copyright (C) 2026 Microsoft # Licensed under the MIT License -"""A module containing run_workflow method definition.""" +"""Workflow to create final documents with text unit mappings.""" import logging +from typing import Any -import pandas as pd +from graphrag_storage.tables.table import Table from graphrag.config.models.graph_rag_config import GraphRagConfig -from graphrag.data_model.data_reader import DataReader +from graphrag.data_model.row_transformers import ( + transform_document_row, +) from graphrag.data_model.schemas import DOCUMENTS_FINAL_COLUMNS from graphrag.index.typing.context import PipelineRunContext from graphrag.index.typing.workflow import WorkflowFunctionOutput @@ -20,49 +23,51 @@ async def run_workflow( _config: GraphRagConfig, context: PipelineRunContext, ) -> WorkflowFunctionOutput: - """All the steps to transform final documents.""" + """Transform final documents via streaming Table reads/writes.""" logger.info("Workflow started: create_final_documents") - reader = DataReader(context.output_table_provider) - documents = await reader.documents() - text_units = await reader.text_units() - output = create_final_documents(documents, text_units) - - await context.output_table_provider.write_dataframe("documents", output) + async with ( + context.output_table_provider.open( + "text_units", + ) as text_units_table, + context.output_table_provider.open( + "documents", + transformer=transform_document_row, + ) as documents_table, + context.output_table_provider.open( + "documents", + ) as output_table, + ): + sample = await create_final_documents( + text_units_table, + documents_table, + output_table, + ) logger.info("Workflow completed: create_final_documents") - return WorkflowFunctionOutput(result=output) - - -def create_final_documents( - documents: pd.DataFrame, text_units: pd.DataFrame -) -> pd.DataFrame: - """All the steps to transform final documents.""" - renamed = text_units.loc[:, ["id", "document_id", "text"]].rename( - columns={ - "document_id": "chunk_doc_id", - "id": "chunk_id", - "text": "chunk_text", - } - ) - - joined = renamed.merge( - documents, - left_on="chunk_doc_id", - right_on="id", - how="inner", - copy=False, - ) - - docs_with_text_units = joined.groupby("id", sort=False).agg( - text_unit_ids=("chunk_id", list) - ) - - rejoined = docs_with_text_units.merge( - documents, - on="id", - how="right", - copy=False, - ).reset_index(drop=True) - - return rejoined.loc[:, DOCUMENTS_FINAL_COLUMNS] + return WorkflowFunctionOutput(result=sample) + + +async def create_final_documents( + text_units_table: Table, + documents_table: Table, + output_table: Table, +) -> list[dict[str, Any]]: + """Build text-unit mapping, then stream-enrich documents.""" + mapping: dict[str, list[str]] = {} + async for row in text_units_table: + document_id = row.get("document_id", "") + if document_id: + mapping.setdefault(document_id, []).append( + row["id"], + ) + + sample_rows: list[dict[str, Any]] = [] + async for row in documents_table: + row["text_unit_ids"] = mapping.get(row["id"], []) + out = {c: row.get(c) for c in DOCUMENTS_FINAL_COLUMNS} + await output_table.write(out) + if len(sample_rows) < 5: + sample_rows.append(out) + + return sample_rows