RAG 파이프라인의 Graph DB 적재/검색을 담당하는 마이크로서비스입니다.
문서를 단락(Paragraph) 단위로 받아 LLM으로 Entity/Relationship을 추출한 뒤 Neo4j에 저장하고, 그래프 탐색 기반의 하이브리드 검색을 제공합니다.
[HTTP 요청] [RabbitMQ 메시지]
│ │
▼ ▼
FastAPI App GraphGenConsumer
│ │
└──────────┬───────────┘
▼
graph_service
(LLM 추출 → Neo4j 저장)
│
┌──────┴──────┐
▼ ▼
Neo4j Redis (RQ 작업 큐)
처리 경로 2가지
| 경로 | 진입점 | 특징 |
|---|---|---|
| HTTP (비동기) | POST /api/v3/ingest_async |
RQ 워커에 작업 위임, job_id 즉시 반환 |
| RabbitMQ | rag.graphgen.request 큐 |
GraphGenConsumer가 직접 처리 |
graph_manager.py # FastAPI 앱, lifespan, GraphGenConsumer
services/
dependencies.py # Neo4j driver, Redis, RQ Queue 싱글턴
graph_service.py # 비즈니스 로직 (그룹핑, LLM 추출, Neo4j 저장)
routes/
ingest.py # POST /api/v3/ingest_async
# GET /api/v3/task_status/{job_id}
search.py # POST /api/v3/search/graph
models/
__init__.py # Pydantic 모델 (Paragraph, IngestRequest, QueryRequest 등)
config/
__init__.py # 환경변수 기반 설정 (pydantic-settings)
utils/
llm.py # LLM API 호출
documents_formatter.py # MongoDB 조회 + XML 포맷 변환
wrapper/
rabbitmq_wrapper.py # RabbitMQ 클라이언트 기반 클래스
rabbitmq_wrapper_for_rag.py # RAG 파이프라인 전용 Producer/Consumer 정의
redis_wrapper.py # Redis 비동기 클라이언트
logger_wrapper.py # 로거 설정
| 서비스 | 용도 | 기본 주소 |
|---|---|---|
| Neo4j | 그래프 DB (Entity/Relationship 저장) | bolt://127.0.0.1:7687 |
| Redis | RQ 작업 큐 / Paragraph 임시 저장 | 127.0.0.1:6379 |
| MongoDB | 원본 문서 조회 (검색 응답 포맷용) | 127.0.0.1:27017 |
| RabbitMQ | 메시지 큐 (GraphGen 요청 수신) | 127.0.0.1:5672 |
| LLM API | Entity/Relationship 추출 | http://127.0.0.1:51271/v1 |
(Collection)-[:HAS_DOCUMENT]->(Document)-[:HAS_PART]->(Evidence)
│
[:MENTIONS]│
▼
(Entity)-[:RELATED]->(Entity)
| 노드 | 주요 속성 |
|---|---|
Collection |
collection_id |
Document |
doc_id, collection_id, file_name |
Evidence |
paragraph_id, quote, collection_id |
Entity |
id (정규화된 label), label, type |
| 관계 | 속성 |
|---|---|
RELATED |
relation (동사구, 예: "운영하다") |
.env 파일로 설정합니다. 환경변수는 __ 구분자로 중첩 설정을 지원합니다.
# Neo4j
NEO4J__URI=bolt://127.0.0.1:7687
NEO4J__USER=neo4j
NEO4J__PASSWORD=password
# Redis
REDIS__HOST=127.0.0.1
REDIS__PORT=6379
# MongoDB
DB__HOST=127.0.0.1
DB__PORT=27017
# RabbitMQ
MQ__HOST=127.0.0.1
MQ__PORT=5672
MQ__USERNAME=guest
MQ__PASSWORD=guest
# LLM
LLM__BASE_URL=http://127.0.0.1:51271/v1
LLM__MODEL=gpt-oss-120b
LLM__MAX_COMPLETION_TOKENS=30000
# 앱
APP__PORT=28201전체 설정 항목은 config/__init__.py를 참조하세요.
run_all.cmdFastAPI 서버와 RQ 워커를 별도 창으로 동시에 시작합니다.
# FastAPI 서버
uvicorn graph_manager:app --host 0.0.0.0 --port 28201 --reload
# RQ 워커 (별도 터미널)
python -m rq.cli worker graphq --url redis://127.0.0.1:6379/0docker compose up -d문서 단락들을 Graph DB에 비동기로 적재합니다.
Request Body
{
"doc_id": "문서 ID",
"collection_id": "컬렉션 ID",
"group_size": 3,
"stride": 1,
"batch_size": 5,
"paragraphs": [
{
"paragraph_id": "p-001",
"context": "단락 본문",
"size": 42,
"collection_id": "컬렉션 ID",
"page_number": 1,
"metadatas": {
"doc_id": "문서 ID",
"doc_type": "image",
"file_name": "파일명.pdf",
"current_page": 1
}
}
]
}Response
{ "job_id": "abc123", "status": "queued" }적재 작업의 진행 상태를 조회합니다.
Response
{
"job_id": "abc123",
"status": "finished",
"stage": "done",
"result": { "status": "done", "doc_id": "...", "success_groups": 10, "failed_groups": 0 }
}stage 값: grouping → llm_extraction → neo4j_upsert → done / failed
Qdrant에서 검색된 Paragraph ID 목록을 기반으로 그래프를 탐색해 문서 컨텍스트를 반환합니다.
Request Body
{
"question": "질문 텍스트",
"collection_id": "컬렉션 ID",
"top_k": 5,
"paragraphs": [
{ "paragraph_id": "p-001", "score": 0.92 }
]
}Response: XML 형식의 <documents> 문자열
{
"job_id": "unique-job-id",
"filename": "파일명.pdf",
"doc_id": "문서 ID",
"collection_id": "컬렉션 ID",
"redis_para_key": "rag.graph.{doc_id}.para",
"batch_size": 5
}Redis의 redis_para_key에 저장된 Paragraph 목록을 읽어 처리합니다.
| 큐 | 시점 |
|---|---|
rag.graphgen.started |
작업 시작 시 |
rag.graphgen.completed |
작업 완료 시 |
rag.graphgen.failed |
작업 실패 시 |