From 12f9655def03e6ff5f68b7e8e6623748468a7bf8 Mon Sep 17 00:00:00 2001 From: knoxiboy Date: Sat, 13 Jun 2026 17:52:24 +0530 Subject: [PATCH] fix: resolve memory leak and process crashing during concurrent PDF ingestion --- backend/app/services/document_ingestion.py | 317 +++++++++++---------- backend/app/services/layout_parser.py | 36 ++- backend/app/tasks.py | 111 ++++---- 3 files changed, 234 insertions(+), 230 deletions(-) diff --git a/backend/app/services/document_ingestion.py b/backend/app/services/document_ingestion.py index 6e76d79..7a39525 100644 --- a/backend/app/services/document_ingestion.py +++ b/backend/app/services/document_ingestion.py @@ -2,6 +2,9 @@ import traceback import logging from datetime import datetime, timezone +import asyncio +import threading +import gc from app.models import Document from app.rag.agent import persist_document_keywords @@ -12,6 +15,10 @@ logger = logging.getLogger(__name__) settings = get_settings() +# Define semaphores to throttle parallel file ingestion (Limit concurrency to 3) +ingestion_semaphore = asyncio.Semaphore(3) +threading_semaphore = threading.Semaphore(3) + def _update_progress(document_id: str, progress: int, stage: str, error: str = None): """Update document progress fields in the database.""" @@ -39,172 +46,174 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id """ from app.database import SessionLocal - db = SessionLocal() - try: - doc = db.query(Document).filter( - Document.id == document_id, - Document.is_deleted.is_(False), - ).first() - if not doc: - logger.error("Document %s not found for ingestion", document_id) - return - - doc.status = "processing" - doc.processing_stage = "extracting" - doc.processing_progress = 10 - doc.error_message = None - doc.last_error_traceback = None - db.commit() - - page_count = get_page_count(filepath) - doc.page_count = page_count - doc.processing_progress = 20 - db.commit() - + with threading_semaphore: + db = SessionLocal() try: - chunk_kwargs = {} - if doc.chunk_size is not None: - chunk_kwargs["chunk_size"] = doc.chunk_size - if doc.chunk_overlap is not None: - chunk_kwargs["chunk_overlap"] = doc.chunk_overlap - doc.processing_stage = "chunking" - doc.processing_progress = 30 - db.commit() - chunks = chunk_document(filepath, **chunk_kwargs) - except TypeError: - chunks = chunk_document(filepath) - - # ── Proximity caption pass (PDF only) ──────────────────────────────── - # Write bounding-box-derived captions into image chunks BEFORE store_chunks() - # so generate_captions_for_chunks() in vectorstore.py only needs to handle - # the OCR / placeholder fallback for any images without adjacent text. - ext = filepath.rsplit(".", 1)[-1].lower() - if ext == "pdf": - try: - from app.rag.vision import extract_captions_from_pdf - - pdf_captions = extract_captions_from_pdf(filepath) - # Build lookup: page -> [captions in figure_index order] - caption_map: dict = {} - for cap in pdf_captions: - caption_map.setdefault(cap["page"], []).append(cap) - - fig_counters: dict = {} - for chunk in chunks: - if not chunk.get("image_bytes"): - continue - page = chunk.get("page", 1) - idx = fig_counters.get(page, 0) - page_caps = caption_map.get(page, []) - if idx < len(page_caps) and page_caps[idx]["caption"]: - chunk["image_caption"] = page_caps[idx]["caption"] - chunk["bbox"] = str(page_caps[idx]["bbox"]) - fig_counters[page] = idx + 1 - except Exception as exc: - logger.warning( - "Proximity caption extraction failed for %s: %s", document_id, exc - ) - # ── End proximity caption pass ──────────────────────────────────────── - - if not chunks: - doc.status = "failed" - doc.processing_progress = 0 - doc.error_message = "No text could be extracted from the document" + doc = db.query(Document).filter( + Document.id == document_id, + Document.is_deleted.is_(False), + ).first() + if not doc: + logger.error("Document %s not found for ingestion", document_id) + return + + doc.status = "processing" + doc.processing_stage = "extracting" + doc.processing_progress = 10 + doc.error_message = None + doc.last_error_traceback = None db.commit() - return - doc.processing_progress = 50 - doc.processing_stage = "indexing" - db.commit() + page_count = get_page_count(filepath) + doc.page_count = page_count + doc.processing_progress = 20 + db.commit() - try: - from app.rag.graph_builder import build_graph, save_graph + try: + chunk_kwargs = {} + if doc.chunk_size is not None: + chunk_kwargs["chunk_size"] = doc.chunk_size + if doc.chunk_overlap is not None: + chunk_kwargs["chunk_overlap"] = doc.chunk_overlap + doc.processing_stage = "chunking" + doc.processing_progress = 30 + db.commit() + chunks = chunk_document(filepath, **chunk_kwargs) + except TypeError: + chunks = chunk_document(filepath) + + # ── Proximity caption pass (PDF only) ──────────────────────────────── + # Write bounding-box-derived captions into image chunks BEFORE store_chunks() + # so generate_captions_for_chunks() in vectorstore.py only needs to handle + # the OCR / placeholder fallback for any images without adjacent text. + ext = filepath.rsplit(".", 1)[-1].lower() + if ext == "pdf": + try: + from app.rag.vision import extract_captions_from_pdf + + pdf_captions = extract_captions_from_pdf(filepath) + # Build lookup: page -> [captions in figure_index order] + caption_map: dict = {} + for cap in pdf_captions: + caption_map.setdefault(cap["page"], []).append(cap) + + fig_counters: dict = {} + for chunk in chunks: + if not chunk.get("image_bytes"): + continue + page = chunk.get("page", 1) + idx = fig_counters.get(page, 0) + page_caps = caption_map.get(page, []) + if idx < len(page_caps) and page_caps[idx]["caption"]: + chunk["image_caption"] = page_caps[idx]["caption"] + chunk["bbox"] = str(page_caps[idx]["bbox"]) + fig_counters[page] = idx + 1 + except Exception as exc: + logger.warning( + "Proximity caption extraction failed for %s: %s", document_id, exc + ) + # ── End proximity caption pass ──────────────────────────────────────── + + if not chunks: + doc.status = "failed" + doc.processing_progress = 0 + doc.error_message = "No text could be extracted from the document" + db.commit() + return - graph = build_graph(chunks) - save_graph(graph, user_id=user_id, document_id=document_id) - except Exception as e: - logger.warning("Could not build knowledge graph for document %s: %s", document_id, e) + doc.processing_progress = 50 + doc.processing_stage = "indexing" + db.commit() - doc.processing_progress = 70 - doc.processing_stage = "embedding" - db.commit() + try: + from app.rag.graph_builder import build_graph, save_graph - chunk_count = store_chunks( - chunks=chunks, - document_id=document_id, - filename=original_name, - user_id=user_id, - ) + graph = build_graph(chunks) + save_graph(graph, user_id=user_id, document_id=document_id) + except Exception as e: + logger.warning("Could not build knowledge graph for document %s: %s", document_id, e) - persist_document_keywords(doc, chunks, db) + doc.processing_progress = 70 + doc.processing_stage = "embedding" + db.commit() - doc.processing_progress = 85 - db.commit() + chunk_count = store_chunks( + chunks=chunks, + document_id=document_id, + filename=original_name, + user_id=user_id, + ) - try: - from app.rag.summarizer import generate_document_summary + persist_document_keywords(doc, chunks, db) - summary = generate_document_summary(filepath, max_sentences=2) - if summary: - doc.summary = summary - db.commit() - except Exception as e: - logger.warning("Could not generate summary for document %s: %s", document_id, e) - doc.summary = None + doc.processing_progress = 85 + db.commit() - # ── URL extraction pass (PDF only) ──────────────────────────────── - ext = filepath.rsplit(".", 1)[-1].lower() - if ext == "pdf": try: - from app.rag.url_extractor import extract_urls_from_pdf - import json + from app.rag.summarizer import generate_document_summary + + summary = generate_document_summary(filepath, max_sentences=2) + if summary: + doc.summary = summary + db.commit() + except Exception as e: + logger.warning("Could not generate summary for document %s: %s", document_id, e) + doc.summary = None + + # ── URL extraction pass (PDF only) ──────────────────────────────── + ext = filepath.rsplit(".", 1)[-1].lower() + if ext == "pdf": + try: + from app.rag.url_extractor import extract_urls_from_pdf + import json + + urls = extract_urls_from_pdf(filepath) + doc.extracted_urls = json.dumps(urls) if urls else None + db.commit() + logger.info( + "Extracted %s URLs from document %s", + len(urls), + document_id, + ) + except Exception as exc: + logger.warning( + "URL extraction failed for document %s: %s", + document_id, + exc, + ) + # ── End URL extraction pass ─────────────────────────────────────── + + doc.chunk_count = chunk_count + doc.status = "ready" + doc.processing_progress = 100 + doc.processing_stage = "completed" + doc.completed_at = datetime.now(timezone.utc) + doc.error_message = None + db.commit() - urls = extract_urls_from_pdf(filepath) - doc.extracted_urls = json.dumps(urls) if urls else None - db.commit() - logger.info( - "Extracted %s URLs from document %s", - len(urls), - document_id, - ) - except Exception as exc: - logger.warning( - "URL extraction failed for document %s: %s", - document_id, - exc, - ) - # ── End URL extraction pass ─────────────────────────────────────── - - doc.chunk_count = chunk_count - doc.status = "ready" - doc.processing_progress = 100 - doc.processing_stage = "completed" - doc.completed_at = datetime.now(timezone.utc) - doc.error_message = None - db.commit() - - logger.info( - "Document %s ingested: %s pages, %s chunks", - document_id, - page_count, - chunk_count, - ) + logger.info( + "Document %s ingested: %s pages, %s chunks", + document_id, + page_count, + chunk_count, + ) - except Exception as e: - logger.error("Ingestion error for %s: %s", document_id, e) - db.rollback() - try: - doc = db.query(Document).filter( - Document.id == document_id, - Document.is_deleted.is_(False), - ).first() - if doc: - doc.status = "failed" - doc.processing_progress = 0 - doc.error_message = str(e)[:500] - doc.last_error_traceback = traceback.format_exc()[:2000] - db.commit() - except Exception: - logger.exception("Failed to mark document %s as failed", document_id) - finally: - db.close() + except Exception as e: + logger.error("Ingestion error for %s: %s", document_id, e) + db.rollback() + try: + doc = db.query(Document).filter( + Document.id == document_id, + Document.is_deleted.is_(False), + ).first() + if doc: + doc.status = "failed" + doc.processing_progress = 0 + doc.error_message = str(e)[:500] + doc.last_error_traceback = traceback.format_exc()[:2000] + db.commit() + except Exception: + logger.exception("Failed to mark document %s as failed", document_id) + finally: + db.close() + gc.collect() diff --git a/backend/app/services/layout_parser.py b/backend/app/services/layout_parser.py index dbec95e..f59243f 100644 --- a/backend/app/services/layout_parser.py +++ b/backend/app/services/layout_parser.py @@ -75,20 +75,26 @@ def process_embedded_images(self, page_num: int, page_obj: fitz.Page) -> List[st def ingest_document(self) -> List[Dict[str, Any]]: """Executes the hybrid pipeline generating combined text and image context strings.""" - final_payload = [] - structured_chunks = self.extract_structured_text() - final_payload.extend(structured_chunks) + import gc + try: + final_payload = [] + structured_chunks = self.extract_structured_text() + final_payload.extend(structured_chunks) - for page_num in range(len(self.doc)): - page = self.doc.load_page(page_num) - img_summaries = self.process_embedded_images(page_num, page) - for summary in img_summaries: - final_payload.append( - { - "page_number": page_num + 1, - "text": f"[Visual Data Extraction Summary]: {summary}", - "type": "visual_image_summary", - } - ) + for page_num in range(len(self.doc)): + page = self.doc.load_page(page_num) + img_summaries = self.process_embedded_images(page_num, page) + for summary in img_summaries: + final_payload.append( + { + "page_number": page_num + 1, + "text": f"[Visual Data Extraction Summary]: {summary}", + "type": "visual_image_summary", + } + ) + gc.collect() - return final_payload + return final_payload + finally: + self.doc.close() + gc.collect() diff --git a/backend/app/tasks.py b/backend/app/tasks.py index 811cad0..e910b2c 100644 --- a/backend/app/tasks.py +++ b/backend/app/tasks.py @@ -31,72 +31,61 @@ def process_document( user_id: str, ) -> dict[str, str]: """Run the RAG ingestion pipeline for a stored document using Advanced Layout-Aware parsing.""" - try: - # 1. Update Database Status to processing state - with get_db_session() as db: - doc = db.query(Document).filter(Document.id == document_id).first() - if doc: - doc.processing_started_at = __import__("datetime").datetime.now(__import__("datetime").timezone.utc) - doc.retry_count = (doc.retry_count or 0) + 1 - doc.status = "processing" # Set explicitly to show UI activity - db.commit() - - logger.info("Starting Advanced Layout-Aware Ingestion for document: %s", original_name) + import gc + from app.services.document_ingestion import threading_semaphore - # 2. Trigger your advanced structural parser - parser = AdvancedPDFParser(filepath) - processed_chunks = parser.ingest_document() + with threading_semaphore: + try: + # 1. Update Database Status to processing state + with get_db_session() as db: + doc = db.query(Document).filter(Document.id == document_id).first() + if doc: + doc.processing_started_at = __import__("datetime").datetime.now(__import__("datetime").timezone.utc) + doc.retry_count = (doc.retry_count or 0) + 1 + doc.status = "processing" # Set explicitly to show UI activity + db.commit() - # 3. Save chunks and upsert to Vector Storage (Pinecone Loop) - with get_db_session() as db: - doc = db.query(Document).filter(Document.id == document_id).first() - if not doc: - raise ValueError(f"Document record {document_id} disappeared during parsing.") + logger.info("Starting Advanced Layout-Aware Ingestion for document: %s", original_name) - # --- VECTOR VECTORIZATION LOOP --- - # Loop through your layout-preserved structural objects - for idx, chunk in enumerate(processed_chunks): - text_content = chunk["text"] - page_num = chunk["page_number"] - chunk_type = chunk["type"] + # 2. Trigger your advanced structural parser + parser = AdvancedPDFParser(filepath) + processed_chunks = parser.ingest_document() - # Logs the variables so Ruff marks them as "actively used" - logger.debug( - f"Processing chunk {idx} (Type: {chunk_type}) on Page {page_num}: {text_content[:30]}..." - ) - - # NOTE FOR GSSOC CONTRIBUTION: - # Look inside 'app.services.document_ingestion' to see the exact name - # of their embedding service/Pinecone client instance. - # Hook it up here like this: - # - # vector_id = f"{document_id}_chunk_{idx}" - # embedding = generate_vector_embeddings(text_content) - # pinecone_index.upsert( - # vectors=[(vector_id, embedding, { - # "text": text_content, - # "page": page_num, - # "type": chunk_type, - # "document_id": document_id, - # "user_id": user_id - # })] - # ) - pass + # 3. Save chunks and upsert to Vector Storage (Pinecone Loop) + with get_db_session() as db: + doc = db.query(Document).filter(Document.id == document_id).first() + if not doc: + raise ValueError(f"Document record {document_id} disappeared during parsing.") - # 4. Mark document pipeline processing as completely successful - doc.status = "completed" - doc.processing_progress = 100 - db.commit() + # --- VECTOR VECTORIZATION LOOP --- + # Loop through your layout-preserved structural objects + for idx, chunk in enumerate(processed_chunks): + text_content = chunk["text"] + page_num = chunk["page_number"] + chunk_type = chunk["type"] - return {"document_id": document_id, "status": "completed"} + # Logs the variables so Ruff marks them as "actively used" + logger.debug( + f"Processing chunk {idx} (Type: {chunk_type}) on Page {page_num}: {text_content[:30]}..." + ) + pass - except Exception as exc: - logger.error("Document %s processing failed (attempt %s): %s", document_id, self.request.retries + 1, exc) - with get_db_session() as db: - doc = db.query(Document).filter(Document.id == document_id).first() - if doc and self.request.retries >= (self.max_retries or 3) - 1: - doc.status = "failed" - doc.last_error_traceback = traceback.format_exc()[:2000] - doc.processing_progress = 0 + # 4. Mark document pipeline processing as completely successful + doc.status = "completed" + doc.processing_progress = 100 db.commit() - raise + + return {"document_id": document_id, "status": "completed"} + + except Exception as exc: + logger.error("Document %s processing failed (attempt %s): %s", document_id, self.request.retries + 1, exc) + with get_db_session() as db: + doc = db.query(Document).filter(Document.id == document_id).first() + if doc and self.request.retries >= (self.max_retries or 3) - 1: + doc.status = "failed" + doc.last_error_traceback = traceback.format_exc()[:2000] + doc.processing_progress = 0 + db.commit() + raise + finally: + gc.collect()