From bb9afeb898ad2cc886093b05f1f11c3740bfe819 Mon Sep 17 00:00:00 2001 From: Dayenne Souza Date: Tue, 3 Mar 2026 18:59:21 -0300 Subject: [PATCH] nlp streaming (#2264) * fix cooccurences * unused async fixes --- .../patch-20260302184111878545.json | 4 + .../build_noun_graph/build_noun_graph.py | 180 +++++++++--------- .../index/workflows/extract_graph_nlp.py | 123 ++++++++---- 3 files changed, 179 insertions(+), 128 deletions(-) create mode 100644 .semversioner/next-release/patch-20260302184111878545.json diff --git a/.semversioner/next-release/patch-20260302184111878545.json b/.semversioner/next-release/patch-20260302184111878545.json new file mode 100644 index 000000000..c990dd624 --- /dev/null +++ b/.semversioner/next-release/patch-20260302184111878545.json @@ -0,0 +1,4 @@ +{ + "type": "patch", + "description": "extract_graph_nlp streaming" +} diff --git a/packages/graphrag/graphrag/index/operations/build_noun_graph/build_noun_graph.py b/packages/graphrag/graphrag/index/operations/build_noun_graph/build_noun_graph.py index 3a9f3e8f3..ece890a57 100644 --- a/packages/graphrag/graphrag/index/operations/build_noun_graph/build_noun_graph.py +++ b/packages/graphrag/graphrag/index/operations/build_noun_graph/build_noun_graph.py @@ -1,141 +1,143 @@ -# Copyright (c) 2024 Microsoft Corporation. +# Copyright (C) 2026 Microsoft # Licensed under the MIT License """Graph extraction using NLP.""" +import logging +from collections import defaultdict from itertools import combinations -import numpy as np import pandas as pd from graphrag_cache import Cache +from graphrag_storage.tables.table import Table -from graphrag.config.enums import AsyncType from graphrag.graphs.edge_weights import calculate_pmi_edge_weights from graphrag.index.operations.build_noun_graph.np_extractors.base import ( BaseNounPhraseExtractor, ) -from graphrag.index.utils.derive_from_rows import derive_from_rows from graphrag.index.utils.hashing import gen_sha512_hash +logger = logging.getLogger(__name__) + async def build_noun_graph( - text_unit_df: pd.DataFrame, + text_unit_table: Table, text_analyzer: BaseNounPhraseExtractor, normalize_edge_weights: bool, - num_threads: int, - async_mode: AsyncType, cache: Cache, ) -> tuple[pd.DataFrame, pd.DataFrame]: """Build a noun graph from text units.""" - text_units = text_unit_df.loc[:, ["id", "text"]] - nodes_df = await _extract_nodes( - text_units, + title_to_ids = await _extract_nodes( + text_unit_table, text_analyzer, - num_threads=num_threads, - async_mode=async_mode, cache=cache, ) - edges_df = _extract_edges(nodes_df, normalize_edge_weights=normalize_edge_weights) + + nodes_df = pd.DataFrame( + [ + { + "title": title, + "frequency": len(ids), + "text_unit_ids": ids, + } + for title, ids in title_to_ids.items() + ], + columns=["title", "frequency", "text_unit_ids"], + ) + + edges_df = _extract_edges( + title_to_ids, + nodes_df=nodes_df, + normalize_edge_weights=normalize_edge_weights, + ) return (nodes_df, edges_df) async def _extract_nodes( - text_unit_df: pd.DataFrame, + text_unit_table: Table, text_analyzer: BaseNounPhraseExtractor, - num_threads: int, - async_mode: AsyncType, cache: Cache, -) -> pd.DataFrame: - """ - Extract initial nodes and edges from text units. +) -> dict[str, list[str]]: + """Extract noun-phrase nodes from text units. - Input: text unit df with schema [id, text, document_id] - Returns a dataframe with schema [id, title, frequency, text_unit_ids]. + NLP extraction is CPU-bound (spaCy/TextBlob), so threading + provides no benefit under the GIL. We process rows + sequentially, relying on the cache to skip repeated work. + + Returns a mapping of noun-phrase title to text-unit ids. """ - cache = cache.child("extract_noun_phrases") + extraction_cache = cache.child("extract_noun_phrases") + total = await text_unit_table.length() + title_to_ids: dict[str, list[str]] = defaultdict(list) + completed = 0 - async def extract(row): + async for row in text_unit_table: + text_unit_id = row["id"] text = row["text"] + attrs = {"text": text, "analyzer": str(text_analyzer)} key = gen_sha512_hash(attrs, attrs.keys()) - result = await cache.get(key) + result = await extraction_cache.get(key) if not result: result = text_analyzer.extract(text) - await cache.set(key, result) - return result - - text_unit_df["noun_phrases"] = await derive_from_rows( # type: ignore - text_unit_df, - extract, - num_threads=num_threads, - async_type=async_mode, - progress_msg="extract noun phrases progress: ", - ) + await extraction_cache.set(key, result) - noun_node_df = text_unit_df.explode("noun_phrases") - noun_node_df = noun_node_df.rename( - columns={"noun_phrases": "title", "id": "text_unit_id"} - ) + for phrase in result: + title_to_ids[phrase].append(text_unit_id) - # group by title and count the number of text units - grouped_node_df = ( - noun_node_df.groupby("title").agg({"text_unit_id": list}).reset_index() - ) - grouped_node_df = grouped_node_df.rename(columns={"text_unit_id": "text_unit_ids"}) - grouped_node_df["frequency"] = grouped_node_df["text_unit_ids"].apply(len) - grouped_node_df = grouped_node_df[["title", "frequency", "text_unit_ids"]] - return grouped_node_df.loc[:, ["title", "frequency", "text_unit_ids"]] + completed += 1 + if completed % 100 == 0 or completed == total: + logger.info( + "extract noun phrases progress: %d/%d", + completed, + total, + ) + + return dict(title_to_ids) def _extract_edges( + title_to_ids: dict[str, list[str]], nodes_df: pd.DataFrame, normalize_edge_weights: bool = True, ) -> pd.DataFrame: - """ - Extract edges from nodes. + """Build co-occurrence edges between noun phrases. - Nodes appear in the same text unit are connected. - Input: nodes_df with schema [id, title, frequency, text_unit_ids] - Returns: edges_df with schema [source, target, weight, text_unit_ids] + Nodes that appear in the same text unit are connected. + Returns edges with schema [source, target, weight, text_unit_ids]. """ - if nodes_df.empty: - return pd.DataFrame(columns=["source", "target", "weight", "text_unit_ids"]) - - text_units_df = nodes_df.explode("text_unit_ids") - text_units_df = text_units_df.rename(columns={"text_unit_ids": "text_unit_id"}) - text_units_df = ( - text_units_df - .groupby("text_unit_id") - .agg({"title": lambda x: list(x) if len(x) > 1 else np.nan}) - .reset_index() - ) - text_units_df = text_units_df.dropna() - titles = text_units_df["title"].tolist() - all_edges: list[list[tuple[str, str]]] = [list(combinations(t, 2)) for t in titles] - - text_units_df = text_units_df.assign(edges=all_edges) # type: ignore - edge_df = text_units_df.explode("edges")[["edges", "text_unit_id"]] - - edge_df[["source", "target"]] = edge_df.loc[:, "edges"].to_list() - edge_df["min_source"] = edge_df[["source", "target"]].min(axis=1) - edge_df["max_target"] = edge_df[["source", "target"]].max(axis=1) - edge_df = edge_df.drop(columns=["source", "target"]).rename( - columns={"min_source": "source", "max_target": "target"} # type: ignore + if not title_to_ids: + return pd.DataFrame( + columns=["source", "target", "weight", "text_unit_ids"], + ) + + text_unit_to_titles: dict[str, list[str]] = defaultdict(list) + for title, tu_ids in title_to_ids.items(): + for tu_id in tu_ids: + text_unit_to_titles[tu_id].append(title) + + edge_map: dict[tuple[str, str], list[str]] = defaultdict(list) + for tu_id, titles in text_unit_to_titles.items(): + if len(titles) < 2: + continue + for pair in combinations(sorted(set(titles)), 2): + edge_map[pair].append(tu_id) + + records = [ + { + "source": src, + "target": tgt, + "weight": len(tu_ids), + "text_unit_ids": tu_ids, + } + for (src, tgt), tu_ids in edge_map.items() + ] + edges_df = pd.DataFrame( + records, + columns=["source", "target", "weight", "text_unit_ids"], ) - edge_df = edge_df[(edge_df.source.notna()) & (edge_df.target.notna())] - edge_df = edge_df.drop(columns=["edges"]) - # group by source and target, count the number of text units - grouped_edge_df = ( - edge_df.groupby(["source", "target"]).agg({"text_unit_id": list}).reset_index() - ) - grouped_edge_df = grouped_edge_df.rename(columns={"text_unit_id": "text_unit_ids"}) - grouped_edge_df["weight"] = grouped_edge_df["text_unit_ids"].apply(len) - grouped_edge_df = grouped_edge_df.loc[ - :, ["source", "target", "weight", "text_unit_ids"] - ] - if normalize_edge_weights: - # use PMI weight instead of raw weight - grouped_edge_df = calculate_pmi_edge_weights(nodes_df, grouped_edge_df) + if normalize_edge_weights and not edges_df.empty: + edges_df = calculate_pmi_edge_weights(nodes_df, edges_df) - return grouped_edge_df + return edges_df diff --git a/packages/graphrag/graphrag/index/workflows/extract_graph_nlp.py b/packages/graphrag/graphrag/index/workflows/extract_graph_nlp.py index 3bd51f902..d4cae458b 100644 --- a/packages/graphrag/graphrag/index/workflows/extract_graph_nlp.py +++ b/packages/graphrag/graphrag/index/workflows/extract_graph_nlp.py @@ -1,17 +1,19 @@ -# Copyright (c) 2024 Microsoft Corporation. +# Copyright (C) 2026 Microsoft # Licensed under the MIT License -"""A module containing run_workflow method definition.""" +"""NLP-based graph extraction workflow.""" import logging +from typing import Any import pandas as pd from graphrag_cache import Cache +from graphrag_storage.tables.table import Table -from graphrag.config.enums import AsyncType from graphrag.config.models.graph_rag_config import GraphRagConfig -from graphrag.data_model.data_reader import DataReader -from graphrag.index.operations.build_noun_graph.build_noun_graph import build_noun_graph +from graphrag.index.operations.build_noun_graph.build_noun_graph import ( + build_noun_graph, +) from graphrag.index.operations.build_noun_graph.np_extractors.base import ( BaseNounPhraseExtractor, ) @@ -28,51 +30,47 @@ async def run_workflow( config: GraphRagConfig, context: PipelineRunContext, ) -> WorkflowFunctionOutput: - """All the steps to create the base entity graph.""" + """Run the NLP graph-extraction pipeline.""" logger.info("Workflow started: extract_graph_nlp") - reader = DataReader(context.output_table_provider) - text_units = await reader.text_units() text_analyzer_config = config.extract_graph_nlp.text_analyzer text_analyzer = create_noun_phrase_extractor(text_analyzer_config) - entities, relationships = await extract_graph_nlp( - text_units, - context.cache, - text_analyzer=text_analyzer, - normalize_edge_weights=config.extract_graph_nlp.normalize_edge_weights, - num_threads=config.extract_graph_nlp.concurrent_requests, - async_type=config.extract_graph_nlp.async_mode, - ) - - await context.output_table_provider.write_dataframe("entities", entities) - await context.output_table_provider.write_dataframe("relationships", relationships) + async with ( + context.output_table_provider.open( + "text_units", truncate=False + ) as text_units_table, + context.output_table_provider.open("entities") as entities_table, + context.output_table_provider.open( + "relationships", + ) as relationships_table, + ): + result = await extract_graph_nlp( + text_units_table, + context.cache, + entities_table=entities_table, + relationships_table=relationships_table, + text_analyzer=text_analyzer, + normalize_edge_weights=(config.extract_graph_nlp.normalize_edge_weights), + ) logger.info("Workflow completed: extract_graph_nlp") - - return WorkflowFunctionOutput( - result={ - "entities": entities, - "relationships": relationships, - } - ) + return WorkflowFunctionOutput(result=result) async def extract_graph_nlp( - text_units: pd.DataFrame, + text_units_table: Table, cache: Cache, + entities_table: Table, + relationships_table: Table, text_analyzer: BaseNounPhraseExtractor, normalize_edge_weights: bool, - num_threads: int, - async_type: AsyncType, -) -> tuple[pd.DataFrame, pd.DataFrame]: - """All the steps to create the base entity graph.""" +) -> dict[str, list[dict[str, Any]]]: + """Extract noun-phrase graph and stream results to output tables.""" extracted_nodes, extracted_edges = await build_noun_graph( - text_units, + text_units_table, text_analyzer=text_analyzer, normalize_edge_weights=normalize_edge_weights, - num_threads=num_threads, - async_mode=async_type, cache=cache, ) @@ -89,9 +87,56 @@ async def extract_graph_nlp( ) logger.error(error_msg) - # add in any other columns required by downstream workflows - extracted_nodes["type"] = "NOUN PHRASE" - extracted_nodes["description"] = "" - extracted_edges["description"] = "" + entity_samples = await _write_entities( + extracted_nodes, + entities_table, + ) + relationship_samples = await _write_relationships( + extracted_edges, + relationships_table, + ) - return (extracted_nodes, extracted_edges) + return { + "entities": entity_samples, + "relationships": relationship_samples, + } + + +async def _write_entities( + nodes_df: pd.DataFrame, + table: Table, +) -> list[dict[str, Any]]: + """Stream entity rows into the output table.""" + samples: list[dict[str, Any]] = [] + for row_tuple in nodes_df.itertuples(index=False): + row = { + "title": row_tuple.title, + "frequency": row_tuple.frequency, + "text_unit_ids": row_tuple.text_unit_ids, + "type": "NOUN PHRASE", + "description": "", + } + await table.write(row) + if len(samples) < 5: + samples.append(row) + return samples + + +async def _write_relationships( + edges_df: pd.DataFrame, + table: Table, +) -> list[dict[str, Any]]: + """Stream relationship rows into the output table.""" + samples: list[dict[str, Any]] = [] + for row_tuple in edges_df.itertuples(index=False): + row = { + "source": row_tuple.source, + "target": row_tuple.target, + "weight": row_tuple.weight, + "text_unit_ids": row_tuple.text_unit_ids, + "description": "", + } + await table.write(row) + if len(samples) < 5: + samples.append(row) + return samples