Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20260302184111878545.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "extract_graph_nlp streaming"
}
Original file line number Diff line number Diff line change
@@ -1,141 +1,143 @@
# Copyright (C) 2026 Microsoft
# Licensed under the MIT License

"""Graph extraction using NLP."""

import logging
from collections import defaultdict
from itertools import combinations

import numpy as np
import pandas as pd
from graphrag_cache import Cache
from graphrag_storage.tables.table import Table

from graphrag.config.enums import AsyncType
from graphrag.graphs.edge_weights import calculate_pmi_edge_weights
from graphrag.index.operations.build_noun_graph.np_extractors.base import (
BaseNounPhraseExtractor,
)
from graphrag.index.utils.derive_from_rows import derive_from_rows
from graphrag.index.utils.hashing import gen_sha512_hash

logger = logging.getLogger(__name__)


async def build_noun_graph(
    text_unit_table: Table,
    text_analyzer: BaseNounPhraseExtractor,
    normalize_edge_weights: bool,
    num_threads: int,
    async_mode: AsyncType,
    cache: Cache,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Build a noun graph from text units.

    Streams rows from ``text_unit_table``, extracts noun phrases per text
    unit, then derives node and co-occurrence edge dataframes.

    Returns:
        (nodes_df, edges_df) where nodes_df has schema
        [title, frequency, text_unit_ids] and edges_df has schema
        [source, target, weight, text_unit_ids].
    """
    title_to_ids = await _extract_nodes(
        text_unit_table,
        text_analyzer,
        num_threads=num_threads,
        async_mode=async_mode,
        cache=cache,
    )

    # Materialize the node table: frequency is the number of text units
    # a noun phrase appears in.
    nodes_df = pd.DataFrame(
        [
            {
                "title": title,
                "frequency": len(ids),
                "text_unit_ids": ids,
            }
            for title, ids in title_to_ids.items()
        ],
        columns=["title", "frequency", "text_unit_ids"],
    )

    edges_df = _extract_edges(
        title_to_ids,
        nodes_df=nodes_df,
        normalize_edge_weights=normalize_edge_weights,
    )
    return (nodes_df, edges_df)


async def _extract_nodes(
    text_unit_table: Table,
    text_analyzer: BaseNounPhraseExtractor,
    num_threads: int,
    async_mode: AsyncType,
    cache: Cache,
) -> dict[str, list[str]]:
    """Extract noun-phrase nodes from text units.

    NLP extraction is CPU-bound (spaCy/TextBlob), so threading
    provides no benefit under the GIL. We process rows
    sequentially, relying on the cache to skip repeated work.

    ``num_threads`` and ``async_mode`` are accepted for interface
    compatibility but are not used by this sequential implementation.

    Returns a mapping of noun-phrase title to text-unit ids.
    """
    extraction_cache = cache.child("extract_noun_phrases")
    total = await text_unit_table.length()
    title_to_ids: dict[str, list[str]] = defaultdict(list)
    completed = 0

    async for row in text_unit_table:
        text_unit_id = row["id"]
        text = row["text"]

        # Key the cache on both the text and the analyzer configuration,
        # so that switching analyzers invalidates stale entries.
        attrs = {"text": text, "analyzer": str(text_analyzer)}
        key = gen_sha512_hash(attrs, attrs.keys())
        result = await extraction_cache.get(key)
        if not result:
            result = text_analyzer.extract(text)
            await extraction_cache.set(key, result)

        for phrase in result:
            title_to_ids[phrase].append(text_unit_id)

        completed += 1
        # Log progress every 100 rows and once at the end.
        if completed % 100 == 0 or completed == total:
            logger.info(
                "extract noun phrases progress: %d/%d",
                completed,
                total,
            )

    # Return a plain dict so callers don't observe defaultdict's
    # insert-on-missing-key behavior.
    return dict(title_to_ids)


def _extract_edges(
title_to_ids: dict[str, list[str]],
nodes_df: pd.DataFrame,
normalize_edge_weights: bool = True,
) -> pd.DataFrame:
"""
Extract edges from nodes.
"""Build co-occurrence edges between noun phrases.

Nodes appear in the same text unit are connected.
Input: nodes_df with schema [id, title, frequency, text_unit_ids]
Returns: edges_df with schema [source, target, weight, text_unit_ids]
Nodes that appear in the same text unit are connected.
Returns edges with schema [source, target, weight, text_unit_ids].
"""
if nodes_df.empty:
return pd.DataFrame(columns=["source", "target", "weight", "text_unit_ids"])

text_units_df = nodes_df.explode("text_unit_ids")
text_units_df = text_units_df.rename(columns={"text_unit_ids": "text_unit_id"})
text_units_df = (
text_units_df
.groupby("text_unit_id")
.agg({"title": lambda x: list(x) if len(x) > 1 else np.nan})
.reset_index()
)
text_units_df = text_units_df.dropna()
titles = text_units_df["title"].tolist()
all_edges: list[list[tuple[str, str]]] = [list(combinations(t, 2)) for t in titles]

text_units_df = text_units_df.assign(edges=all_edges) # type: ignore
edge_df = text_units_df.explode("edges")[["edges", "text_unit_id"]]

edge_df[["source", "target"]] = edge_df.loc[:, "edges"].to_list()
edge_df["min_source"] = edge_df[["source", "target"]].min(axis=1)
edge_df["max_target"] = edge_df[["source", "target"]].max(axis=1)
edge_df = edge_df.drop(columns=["source", "target"]).rename(
columns={"min_source": "source", "max_target": "target"} # type: ignore
if not title_to_ids:
return pd.DataFrame(
columns=["source", "target", "weight", "text_unit_ids"],
)

text_unit_to_titles: dict[str, list[str]] = defaultdict(list)
for title, tu_ids in title_to_ids.items():
for tu_id in tu_ids:
text_unit_to_titles[tu_id].append(title)

edge_map: dict[tuple[str, str], list[str]] = defaultdict(list)
for tu_id, titles in text_unit_to_titles.items():
if len(titles) < 2:
continue
for pair in combinations(sorted(set(titles)), 2):
edge_map[pair].append(tu_id)

records = [
{
"source": src,
"target": tgt,
"weight": len(tu_ids),
"text_unit_ids": tu_ids,
}
for (src, tgt), tu_ids in edge_map.items()
]
edges_df = pd.DataFrame(
records,
columns=["source", "target", "weight", "text_unit_ids"],
)

edge_df = edge_df[(edge_df.source.notna()) & (edge_df.target.notna())]
edge_df = edge_df.drop(columns=["edges"])
# group by source and target, count the number of text units
grouped_edge_df = (
edge_df.groupby(["source", "target"]).agg({"text_unit_id": list}).reset_index()
)
grouped_edge_df = grouped_edge_df.rename(columns={"text_unit_id": "text_unit_ids"})
grouped_edge_df["weight"] = grouped_edge_df["text_unit_ids"].apply(len)
grouped_edge_df = grouped_edge_df.loc[
:, ["source", "target", "weight", "text_unit_ids"]
]
if normalize_edge_weights:
# use PMI weight instead of raw weight
grouped_edge_df = calculate_pmi_edge_weights(nodes_df, grouped_edge_df)
if normalize_edge_weights and not edges_df.empty:
edges_df = calculate_pmi_edge_weights(nodes_df, edges_df)

return grouped_edge_df
return edges_df
Loading
Loading