diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7483814..0610191 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -40,6 +40,8 @@ jobs: pip install flake8 flake8-bugbear # Install project deps (skip heavy ML libs with stub extras) pip install -r backend/requirements.txt --quiet || true + # Install document-processing dependencies (force reinstall to fix cached stale files) + pip install --force-reinstall pymupdf4llm google-genai - name: Flake8 lint (errors only, no style noise) run: | @@ -56,6 +58,7 @@ jobs: DATABASE_URL: sqlite:///./ci_test.db DEBUG: "false" HF_TOKEN: ci-dummy-token + GOOGLE_API_KEY: ci-dummy-key UPLOAD_DIR: /tmp/uploads CHROMA_PERSIST_DIR: /tmp/chroma run: | @@ -70,6 +73,7 @@ jobs: DATABASE_URL: sqlite:///./ci_test.db DEBUG: "false" HF_TOKEN: ci-dummy-token + GOOGLE_API_KEY: ci-dummy-key UPLOAD_DIR: /tmp/uploads CHROMA_PERSIST_DIR: /tmp/chroma run: | diff --git a/backend/app/cache.py b/backend/app/cache.py index d58d2c5..8d0a31d 100644 --- a/backend/app/cache.py +++ b/backend/app/cache.py @@ -13,6 +13,7 @@ import json import logging import os +import threading from typing import Optional logger = logging.getLogger(__name__) @@ -74,26 +75,30 @@ def _get_redis(): _lru_store: dict = {} _lru_order: list = [] +_lru_lock = threading.Lock() def _lru_get(key: str) -> Optional[str]: - return _lru_store.get(key) + with _lru_lock: + return _lru_store.get(key) def _lru_set(key: str, value: str) -> None: - if key in _lru_store: - _lru_order.remove(key) - elif len(_lru_store) >= LRU_MAX_SIZE: - oldest = _lru_order.pop(0) - del _lru_store[oldest] - _lru_store[key] = value - _lru_order.append(key) + with _lru_lock: + if key in _lru_store: + _lru_order.remove(key) + elif len(_lru_store) >= LRU_MAX_SIZE: + oldest = _lru_order.pop(0) + del _lru_store[oldest] + _lru_store[key] = value + _lru_order.append(key) def _lru_delete(key: str) -> None: - if key in _lru_store: - del _lru_store[key] - _lru_order.remove(key) + with _lru_lock: + if key in _lru_store: + del _lru_store[key] + _lru_order.remove(key) # --------------------------------------------------------------------------- diff --git a/backend/requirements.txt b/backend/requirements.txt index f46463b..8c3db07 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -29,6 +29,7 @@ httpx # Document Processing PyMuPDF +pymupdf4llm pdfplumber python-docx unstructured[pdf] @@ -54,6 +55,7 @@ spacy>=3.7 neo4j>=5.0 # LLM Inference +google-genai huggingface-hub # Production diff --git a/backend/tests/test_celery_ingestion.py b/backend/tests/test_celery_ingestion.py index 2e359e6..bb4d300 100644 --- a/backend/tests/test_celery_ingestion.py +++ b/backend/tests/test_celery_ingestion.py @@ -5,10 +5,11 @@ from app.models import Document from app.tasks import process_document + def test_process_document_ingestion_pipeline(db_session): """ - Test that the Celery task updates document status from pending to ready - by executing the ingestion engine inside the active test database session. + Test that the Celery task updates document status from pending to completed + by executing the layout-aware parser pipeline inside the active test database session. """ # 1. SETUP: Create a mock document that starts as 'pending' @@ -17,7 +18,7 @@ def test_process_document_ingestion_pipeline(db_session): filename="sample.pdf", original_name="sample.pdf", status="pending", - user_id="user-456" + user_id="user-456", ) db_session.add(test_doc) db_session.commit() @@ -27,20 +28,16 @@ def test_process_document_ingestion_pipeline(db_session): mock_session_factory.return_value.__enter__.return_value = db_session mock_session_factory.return_value = db_session - # Patch the factory globally, and patch ingest_document right where app.tasks calls it + # Patch the factory globally, and mock AdvancedPDFParser so no real PDF is parsed with patch("app.database.SessionLocal", mock_session_factory, create=True), \ patch("app.services.document_ingestion.SessionLocal", mock_session_factory, create=True), \ - patch("app.tasks.ingest_document") as mock_ingest: - - # Simulate what the underlying service does upon a successful processing run - def simulate_successful_ingestion(*args, **kwargs): - doc = db_session.query(Document).filter_by(id="test-doc-123").first() - if doc: - doc.status = "ready" - db_session.commit() - return {"status": "success"} - - mock_ingest.side_effect = simulate_successful_ingestion + patch("app.tasks.AdvancedPDFParser") as mock_parser_cls: + + mock_parser = MagicMock() + mock_parser_cls.return_value = mock_parser + mock_parser.ingest_document.return_value = [ + {"text": "mock chunk 1", "page_number": 1, "type": "text_layout"}, + ] task_result = process_document.apply( kwargs={ @@ -53,8 +50,8 @@ def simulate_successful_ingestion(*args, **kwargs): # 3. ASSERT: Verify the task metrics and status changes inside the session context assert task_result.status == "SUCCESS" - + # Query the database to verify the state update updated_doc = db_session.query(Document).filter_by(id="test-doc-123").first() assert updated_doc is not None - assert updated_doc.status == "ready" \ No newline at end of file + assert updated_doc.status == "completed" \ No newline at end of file