Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 163 additions & 154 deletions backend/app/services/document_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import traceback
import logging
from datetime import datetime, timezone
import asyncio
import threading
import gc

from app.models import Document
from app.rag.agent import persist_document_keywords
Expand All @@ -12,6 +15,10 @@
logger = logging.getLogger(__name__)
settings = get_settings()

# Throttle parallel file ingestion: each semaphore admits at most 3 holders at once.
# `threading_semaphore` is the blocking, thread-level limiter (acquired below via
# `with threading_semaphore:`); `ingestion_semaphore` is an asyncio semaphore with
# the same limit.
# NOTE(review): `ingestion_semaphore` is not referenced in the visible part of this
# module — confirm that async callers actually acquire it.
# NOTE(review): on Python < 3.10 asyncio primitives bind to the event loop that is
# current at construction time, so creating one at import time can attach it to
# the wrong loop — TODO confirm the runtime version.
ingestion_semaphore = asyncio.Semaphore(3)
threading_semaphore = threading.Semaphore(3)


def _update_progress(document_id: str, progress: int, stage: str, error: str = None):
"""Update document progress fields in the database."""
Expand Down Expand Up @@ -39,172 +46,174 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id
"""
from app.database import SessionLocal

db = SessionLocal()
try:
doc = db.query(Document).filter(
Document.id == document_id,
Document.is_deleted.is_(False),
).first()
if not doc:
logger.error("Document %s not found for ingestion", document_id)
return

doc.status = "processing"
doc.processing_stage = "extracting"
doc.processing_progress = 10
doc.error_message = None
doc.last_error_traceback = None
db.commit()

page_count = get_page_count(filepath)
doc.page_count = page_count
doc.processing_progress = 20
db.commit()

with threading_semaphore:
db = SessionLocal()
try:
chunk_kwargs = {}
if doc.chunk_size is not None:
chunk_kwargs["chunk_size"] = doc.chunk_size
if doc.chunk_overlap is not None:
chunk_kwargs["chunk_overlap"] = doc.chunk_overlap
doc.processing_stage = "chunking"
doc.processing_progress = 30
db.commit()
chunks = chunk_document(filepath, **chunk_kwargs)
except TypeError:
chunks = chunk_document(filepath)

# ── Proximity caption pass (PDF only) ────────────────────────────────
# Write bounding-box-derived captions into image chunks BEFORE store_chunks()
# so generate_captions_for_chunks() in vectorstore.py only needs to handle
# the OCR / placeholder fallback for any images without adjacent text.
ext = filepath.rsplit(".", 1)[-1].lower()
if ext == "pdf":
try:
from app.rag.vision import extract_captions_from_pdf

pdf_captions = extract_captions_from_pdf(filepath)
# Build lookup: page -> [captions in figure_index order]
caption_map: dict = {}
for cap in pdf_captions:
caption_map.setdefault(cap["page"], []).append(cap)

fig_counters: dict = {}
for chunk in chunks:
if not chunk.get("image_bytes"):
continue
page = chunk.get("page", 1)
idx = fig_counters.get(page, 0)
page_caps = caption_map.get(page, [])
if idx < len(page_caps) and page_caps[idx]["caption"]:
chunk["image_caption"] = page_caps[idx]["caption"]
chunk["bbox"] = str(page_caps[idx]["bbox"])
fig_counters[page] = idx + 1
except Exception as exc:
logger.warning(
"Proximity caption extraction failed for %s: %s", document_id, exc
)
# ── End proximity caption pass ────────────────────────────────────────

if not chunks:
doc.status = "failed"
doc.processing_progress = 0
doc.error_message = "No text could be extracted from the document"
doc = db.query(Document).filter(
Document.id == document_id,
Document.is_deleted.is_(False),
).first()
if not doc:
logger.error("Document %s not found for ingestion", document_id)
return

doc.status = "processing"
doc.processing_stage = "extracting"
doc.processing_progress = 10
doc.error_message = None
doc.last_error_traceback = None
db.commit()
return

doc.processing_progress = 50
doc.processing_stage = "indexing"
db.commit()
page_count = get_page_count(filepath)
doc.page_count = page_count
doc.processing_progress = 20
db.commit()

try:
from app.rag.graph_builder import build_graph, save_graph
try:
chunk_kwargs = {}
if doc.chunk_size is not None:
chunk_kwargs["chunk_size"] = doc.chunk_size
if doc.chunk_overlap is not None:
chunk_kwargs["chunk_overlap"] = doc.chunk_overlap
doc.processing_stage = "chunking"
doc.processing_progress = 30
db.commit()
chunks = chunk_document(filepath, **chunk_kwargs)
except TypeError:
chunks = chunk_document(filepath)

# ── Proximity caption pass (PDF only) ────────────────────────────────
# Write bounding-box-derived captions into image chunks BEFORE store_chunks()
# so generate_captions_for_chunks() in vectorstore.py only needs to handle
# the OCR / placeholder fallback for any images without adjacent text.
ext = filepath.rsplit(".", 1)[-1].lower()
if ext == "pdf":
try:
from app.rag.vision import extract_captions_from_pdf

pdf_captions = extract_captions_from_pdf(filepath)
# Build lookup: page -> [captions in figure_index order]
caption_map: dict = {}
for cap in pdf_captions:
caption_map.setdefault(cap["page"], []).append(cap)

fig_counters: dict = {}
for chunk in chunks:
if not chunk.get("image_bytes"):
continue
page = chunk.get("page", 1)
idx = fig_counters.get(page, 0)
page_caps = caption_map.get(page, [])
if idx < len(page_caps) and page_caps[idx]["caption"]:
chunk["image_caption"] = page_caps[idx]["caption"]
chunk["bbox"] = str(page_caps[idx]["bbox"])
fig_counters[page] = idx + 1
except Exception as exc:
logger.warning(
"Proximity caption extraction failed for %s: %s", document_id, exc
)
# ── End proximity caption pass ────────────────────────────────────────

if not chunks:
doc.status = "failed"
doc.processing_progress = 0
doc.error_message = "No text could be extracted from the document"
db.commit()
return

graph = build_graph(chunks)
save_graph(graph, user_id=user_id, document_id=document_id)
except Exception as e:
logger.warning("Could not build knowledge graph for document %s: %s", document_id, e)
doc.processing_progress = 50
doc.processing_stage = "indexing"
db.commit()

doc.processing_progress = 70
doc.processing_stage = "embedding"
db.commit()
try:
from app.rag.graph_builder import build_graph, save_graph

chunk_count = store_chunks(
chunks=chunks,
document_id=document_id,
filename=original_name,
user_id=user_id,
)
graph = build_graph(chunks)
save_graph(graph, user_id=user_id, document_id=document_id)
except Exception as e:
logger.warning("Could not build knowledge graph for document %s: %s", document_id, e)

persist_document_keywords(doc, chunks, db)
doc.processing_progress = 70
doc.processing_stage = "embedding"
db.commit()

doc.processing_progress = 85
db.commit()
chunk_count = store_chunks(
chunks=chunks,
document_id=document_id,
filename=original_name,
user_id=user_id,
)

try:
from app.rag.summarizer import generate_document_summary
persist_document_keywords(doc, chunks, db)

summary = generate_document_summary(filepath, max_sentences=2)
if summary:
doc.summary = summary
db.commit()
except Exception as e:
logger.warning("Could not generate summary for document %s: %s", document_id, e)
doc.summary = None
doc.processing_progress = 85
db.commit()

# ── URL extraction pass (PDF only) ────────────────────────────────
ext = filepath.rsplit(".", 1)[-1].lower()
if ext == "pdf":
try:
from app.rag.url_extractor import extract_urls_from_pdf
import json
from app.rag.summarizer import generate_document_summary

summary = generate_document_summary(filepath, max_sentences=2)
if summary:
doc.summary = summary
db.commit()
except Exception as e:
logger.warning("Could not generate summary for document %s: %s", document_id, e)
doc.summary = None

# ── URL extraction pass (PDF only) ────────────────────────────────
ext = filepath.rsplit(".", 1)[-1].lower()
if ext == "pdf":
try:
from app.rag.url_extractor import extract_urls_from_pdf
import json

urls = extract_urls_from_pdf(filepath)
doc.extracted_urls = json.dumps(urls) if urls else None
db.commit()
logger.info(
"Extracted %s URLs from document %s",
len(urls),
document_id,
)
except Exception as exc:
logger.warning(
"URL extraction failed for document %s: %s",
document_id,
exc,
)
# ── End URL extraction pass ───────────────────────────────────────

doc.chunk_count = chunk_count
doc.status = "ready"
doc.processing_progress = 100
doc.processing_stage = "completed"
doc.completed_at = datetime.now(timezone.utc)
doc.error_message = None
db.commit()

urls = extract_urls_from_pdf(filepath)
doc.extracted_urls = json.dumps(urls) if urls else None
db.commit()
logger.info(
"Extracted %s URLs from document %s",
len(urls),
document_id,
)
except Exception as exc:
logger.warning(
"URL extraction failed for document %s: %s",
document_id,
exc,
)
# ── End URL extraction pass ───────────────────────────────────────

doc.chunk_count = chunk_count
doc.status = "ready"
doc.processing_progress = 100
doc.processing_stage = "completed"
doc.completed_at = datetime.now(timezone.utc)
doc.error_message = None
db.commit()

logger.info(
"Document %s ingested: %s pages, %s chunks",
document_id,
page_count,
chunk_count,
)
logger.info(
"Document %s ingested: %s pages, %s chunks",
document_id,
page_count,
chunk_count,
)

except Exception as e:
logger.error("Ingestion error for %s: %s", document_id, e)
db.rollback()
try:
doc = db.query(Document).filter(
Document.id == document_id,
Document.is_deleted.is_(False),
).first()
if doc:
doc.status = "failed"
doc.processing_progress = 0
doc.error_message = str(e)[:500]
doc.last_error_traceback = traceback.format_exc()[:2000]
db.commit()
except Exception:
logger.exception("Failed to mark document %s as failed", document_id)
finally:
db.close()
except Exception as e:
logger.error("Ingestion error for %s: %s", document_id, e)
db.rollback()
try:
doc = db.query(Document).filter(
Document.id == document_id,
Document.is_deleted.is_(False),
).first()
if doc:
doc.status = "failed"
doc.processing_progress = 0
doc.error_message = str(e)[:500]
doc.last_error_traceback = traceback.format_exc()[:2000]
db.commit()
except Exception:
logger.exception("Failed to mark document %s as failed", document_id)
finally:
db.close()
gc.collect()
36 changes: 21 additions & 15 deletions backend/app/services/layout_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,26 @@ def process_embedded_images(self, page_num: int, page_obj: fitz.Page) -> List[st

def ingest_document(self) -> List[Dict[str, Any]]:
    """Run the hybrid pipeline and return text chunks plus image summaries.

    The structured text chunks from ``self.extract_structured_text()`` come
    first, followed by one ``visual_image_summary`` entry per summary that
    ``self.process_embedded_images()`` yields for each page, in page order.

    Returns:
        A list of chunk dictionaries. Image entries carry the 1-based
        ``page_number``, the summary text prefixed with
        ``[Visual Data Extraction Summary]:`` and the type
        ``visual_image_summary``.

    Raises:
        Whatever ``extract_structured_text``, ``load_page`` or
        ``process_embedded_images`` raise; nothing is swallowed here.

    Note:
        ``self.doc`` is closed when this method exits, on success and on
        failure alike, so the instance cannot be used for a second run.
    """
    # Local import: this module's top-level import block is not visible here.
    import gc

    try:
        final_payload = list(self.extract_structured_text())

        for page_num in range(len(self.doc)):
            page = self.doc.load_page(page_num)
            final_payload.extend(
                {
                    "page_number": page_num + 1,  # pages are reported 1-based
                    "text": f"[Visual Data Extraction Summary]: {summary}",
                    "type": "visual_image_summary",
                }
                for summary in self.process_embedded_images(page_num, page)
            )
            # Collect after every page; presumably to cap peak memory while
            # walking image-heavy documents — confirm the cost is acceptable.
            gc.collect()

        return final_payload
    finally:
        # Always release the underlying document handle, even on error.
        self.doc.close()
        gc.collect()
Loading
Loading