From 70a7cf09b12173f9e874dc025546d7bc8515945c Mon Sep 17 00:00:00 2001 From: seyeong Date: Sat, 28 Feb 2026 16:09:09 +0900 Subject: [PATCH 1/3] feat(catalog): add load_lineage_documents() to DataHubCatalogLoader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit build_table_metadata()를 활용하여 DataHub의 upstream/downstream 테이블 lineage와 컬럼 단위 lineage를 TextDocument로 변환한다. EnrichedNL2SQL의 documents 파라미터에 전달하면 VectorRetriever가 lineage 컨텍스트를 SQL 생성 시 참조할 수 있다. - _urn_to_lineage_document(): URN별 변환 로직 분리 - _format_lineage(): 자연어 텍스트 포맷 정적 메서드로 분리 - 사이클 안전성은 하위 레이어(min_degree_lineage, degree 필터)에 위임 - lineage 없는 테이블은 자동 제외 --- src/lang2sql/integrations/catalog/datahub_.py | 101 +++++++++++++++++- 1 file changed, 100 insertions(+), 1 deletion(-) diff --git a/src/lang2sql/integrations/catalog/datahub_.py b/src/lang2sql/integrations/catalog/datahub_.py index cb3154f..5c09216 100644 --- a/src/lang2sql/integrations/catalog/datahub_.py +++ b/src/lang2sql/integrations/catalog/datahub_.py @@ -1,6 +1,6 @@ from __future__ import annotations -from ...core.catalog import CatalogEntry +from ...core.catalog import CatalogEntry, TextDocument from ...core.exceptions import IntegrationMissingError try: @@ -59,3 +59,102 @@ def load(self, urns: list[str] | None = None) -> list[CatalogEntry]: CatalogEntry(name=name, description=description, columns=columns) ) return entries + + def load_lineage_documents( + self, + urns: list[str] | None = None, + max_degree: int = 2, + ) -> list[TextDocument]: + """DataHub lineage 정보를 TextDocument 목록으로 변환한다. + + 내부적으로 build_table_metadata()를 사용하며, 사이클 안전성은 + 하위 레이어에서 보장된다: + - get_table_lineage(): GraphQL degree 필터로 depth 상한 적용 + - min_degree_lineage(): 테이블별 최소 degree만 유지 (사이클 경로 dedup) + - build_table_metadata(): 자기 자신(table == current_table) 제외 + + Args: + urns: 조회할 URN 목록. None이면 전체 URN을 조회한다. + max_degree: 포함할 최대 lineage depth. 기본값 2. + + Returns: + TextDocument 목록. lineage 없는 테이블은 제외된다. + + Usage:: + + loader = DataHubCatalogLoader(gms_server="http://localhost:8080") + pipeline = EnrichedNL2SQL( + catalog=loader.load(), + documents=loader.load_lineage_documents(), + llm=..., db=..., embedding=..., + ) + """ + if urns is None: + urns = list(self._fetcher.get_urns()) + + return [ + doc + for urn in urns + if (doc := self._urn_to_lineage_document(urn, max_degree)) is not None + ] + + def _urn_to_lineage_document( + self, urn: str, max_degree: int + ) -> TextDocument | None: + """단일 URN의 lineage를 TextDocument로 변환. lineage 없으면 None 반환.""" + try: + # build_table_metadata가 upstream/downstream/column lineage를 + # 파싱 및 dedup까지 처리해준다. + meta = self._fetcher.build_table_metadata(urn, max_degree=max_degree) + except Exception: + return None + + table_name = meta.get("table_name") or "" + lineage = meta.get("lineage", {}) + upstream = lineage.get("upstream", []) + downstream = lineage.get("downstream", []) + upstream_columns = lineage.get("upstream_columns", []) + + if not upstream and not downstream and not upstream_columns: + return None + + return TextDocument( + id=f"lineage__{table_name}", + title=f"{table_name} 리니지", + content=self._format_lineage(table_name, upstream, downstream, upstream_columns), + source="datahub", + metadata={"urn": urn, "table_name": table_name}, + ) + + @staticmethod + def _format_lineage( + table_name: str, + upstream: list[dict], + downstream: list[dict], + upstream_columns: list[dict], + ) -> str: + """lineage 데이터를 자연어 텍스트로 포맷한다.""" + lines: list[str] = [f"테이블: {table_name}", ""] + + if upstream: + lines += ["[Upstream — 이 테이블의 원천 데이터]"] + lines += [f" - {t['table']} (depth: {t['degree']})" for t in upstream] + lines.append("") + + if downstream: + lines += ["[Downstream — 이 테이블을 참조하는 테이블]"] + lines += [f" - {t['table']} (depth: {t['degree']})" for t in downstream] + lines.append("") + + if upstream_columns: + lines += ["[컬럼 단위 Upstream Lineage]"] + for dataset in upstream_columns: + lines.append(f" {dataset.get('upstream_dataset', '')}:") + lines += [ + f" {col['upstream_column']} → {col['downstream_column']}" + f" (신뢰도: {col.get('confidence', 1.0):.2f})" + for col in dataset.get("columns", []) + ] + lines.append("") + + return "\n".join(lines).strip() From 0aca3b4addc1c9a8acda57eef097260a21d605cc Mon Sep 17 00:00:00 2001 From: seyeong Date: Sat, 28 Feb 2026 16:11:53 +0900 Subject: [PATCH 2/3] style: apply pre-commit formatting --- src/lang2sql/integrations/catalog/datahub_.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/lang2sql/integrations/catalog/datahub_.py b/src/lang2sql/integrations/catalog/datahub_.py index 5c09216..417ef1b 100644 --- a/src/lang2sql/integrations/catalog/datahub_.py +++ b/src/lang2sql/integrations/catalog/datahub_.py @@ -121,7 +121,9 @@ def _urn_to_lineage_document( return TextDocument( id=f"lineage__{table_name}", title=f"{table_name} 리니지", - content=self._format_lineage(table_name, upstream, downstream, upstream_columns), + content=self._format_lineage( + table_name, upstream, downstream, upstream_columns + ), source="datahub", metadata={"urn": urn, "table_name": table_name}, ) From 60f00d0b664952d9250a07ab6b74e5d4d7bf457d Mon Sep 17 00:00:00 2001 From: seyeong Date: Sat, 28 Feb 2026 16:18:02 +0900 Subject: [PATCH 3/3] feat(ports): add CatalogLoaderPort and make DataHubCatalogLoader inherit it MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit core/ports.py에 CatalogLoaderPort Protocol을 추가하고 DataHubCatalogLoader가 이를 명시적으로 상속하도록 수정한다. 다른 Integration(LLMPort, EmbeddingPort, DBPort 등)과 동일한 패턴을 적용한다. --- src/lang2sql/core/ports.py | 8 +++++++- src/lang2sql/integrations/catalog/datahub_.py | 3 ++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/lang2sql/core/ports.py b/src/lang2sql/core/ports.py index ff6bc6f..b04bc61 100644 --- a/src/lang2sql/core/ports.py +++ b/src/lang2sql/core/ports.py @@ -2,7 +2,7 @@ from typing import Any, Protocol, runtime_checkable -from .catalog import TextDocument +from .catalog import CatalogEntry, TextDocument class LLMPort(Protocol): @@ -57,3 +57,9 @@ class DocumentLoaderPort(Protocol): """Converts a file path or directory to list[TextDocument].""" def load(self, path: str) -> list[TextDocument]: ... + + +class CatalogLoaderPort(Protocol): + """Abstracts catalog loading from external sources (DataHub, file, database, etc.).""" + + def load(self) -> list[CatalogEntry]: ... diff --git a/src/lang2sql/integrations/catalog/datahub_.py b/src/lang2sql/integrations/catalog/datahub_.py index 417ef1b..7ac2e51 100644 --- a/src/lang2sql/integrations/catalog/datahub_.py +++ b/src/lang2sql/integrations/catalog/datahub_.py @@ -2,6 +2,7 @@ from ...core.catalog import CatalogEntry, TextDocument from ...core.exceptions import IntegrationMissingError +from ...core.ports import CatalogLoaderPort try: import datahub as _datahub # type: ignore[import] @@ -9,7 +10,7 @@ _datahub = None # type: ignore[assignment] -class DataHubCatalogLoader: +class DataHubCatalogLoader(CatalogLoaderPort): """DataHub URN → list[CatalogEntry] 변환. DataHub GMS 서버에서 테이블 메타데이터를 조회하여