Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ jobs:
pip install flake8 flake8-bugbear
# Install project deps (skip heavy ML libs with stub extras)
pip install -r backend/requirements.txt --quiet || true
# Install document-processing dependencies (force reinstall to fix cached stale files)
pip install --force-reinstall pymupdf4llm google-genai

- name: Flake8 lint (errors only, no style noise)
run: |
Expand All @@ -56,6 +58,7 @@ jobs:
DATABASE_URL: sqlite:///./ci_test.db
DEBUG: "false"
HF_TOKEN: ci-dummy-token
GOOGLE_API_KEY: ci-dummy-key
UPLOAD_DIR: /tmp/uploads
CHROMA_PERSIST_DIR: /tmp/chroma
run: |
Expand All @@ -70,6 +73,7 @@ jobs:
DATABASE_URL: sqlite:///./ci_test.db
DEBUG: "false"
HF_TOKEN: ci-dummy-token
GOOGLE_API_KEY: ci-dummy-key
UPLOAD_DIR: /tmp/uploads
CHROMA_PERSIST_DIR: /tmp/chroma
run: |
Expand Down
27 changes: 16 additions & 11 deletions backend/app/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import json
import logging
import os
import threading
from typing import Optional

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -74,26 +75,30 @@ def _get_redis():

_lru_store: dict = {}
_lru_order: list = []
_lru_lock = threading.Lock()


def _lru_get(key: str) -> Optional[str]:
return _lru_store.get(key)
with _lru_lock:
return _lru_store.get(key)


def _lru_set(key: str, value: str) -> None:
if key in _lru_store:
_lru_order.remove(key)
elif len(_lru_store) >= LRU_MAX_SIZE:
oldest = _lru_order.pop(0)
del _lru_store[oldest]
_lru_store[key] = value
_lru_order.append(key)
with _lru_lock:
if key in _lru_store:
_lru_order.remove(key)
elif len(_lru_store) >= LRU_MAX_SIZE:
oldest = _lru_order.pop(0)
del _lru_store[oldest]
_lru_store[key] = value
_lru_order.append(key)


def _lru_delete(key: str) -> None:
if key in _lru_store:
del _lru_store[key]
_lru_order.remove(key)
with _lru_lock:
if key in _lru_store:
del _lru_store[key]
_lru_order.remove(key)


# ---------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ httpx

# Document Processing
PyMuPDF
pymupdf4llm
pdfplumber
python-docx
unstructured[pdf]
Expand All @@ -54,6 +55,7 @@ spacy>=3.7
neo4j>=5.0

# LLM Inference
google-genai
huggingface-hub

# Production
Expand Down
31 changes: 14 additions & 17 deletions backend/tests/test_celery_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from app.models import Document
from app.tasks import process_document


def test_process_document_ingestion_pipeline(db_session):
"""
Test that the Celery task updates document status from pending to ready
by executing the ingestion engine inside the active test database session.
Test that the Celery task updates document status from pending to completed
by executing the layout-aware parser pipeline inside the active test database session.
"""

# 1. SETUP: Create a mock document that starts as 'pending'
Expand All @@ -17,7 +18,7 @@ def test_process_document_ingestion_pipeline(db_session):
filename="sample.pdf",
original_name="sample.pdf",
status="pending",
user_id="user-456"
user_id="user-456",
)
db_session.add(test_doc)
db_session.commit()
Expand All @@ -27,20 +28,16 @@ def test_process_document_ingestion_pipeline(db_session):
mock_session_factory.return_value.__enter__.return_value = db_session
mock_session_factory.return_value = db_session

# Patch the factory globally, and patch ingest_document right where app.tasks calls it
# Patch the factory globally, and mock AdvancedPDFParser so no real PDF is parsed
with patch("app.database.SessionLocal", mock_session_factory, create=True), \
patch("app.services.document_ingestion.SessionLocal", mock_session_factory, create=True), \
patch("app.tasks.ingest_document") as mock_ingest:

# Simulate what the underlying service does upon a successful processing run
def simulate_successful_ingestion(*args, **kwargs):
doc = db_session.query(Document).filter_by(id="test-doc-123").first()
if doc:
doc.status = "ready"
db_session.commit()
return {"status": "success"}

mock_ingest.side_effect = simulate_successful_ingestion
patch("app.tasks.AdvancedPDFParser") as mock_parser_cls:

mock_parser = MagicMock()
mock_parser_cls.return_value = mock_parser
mock_parser.ingest_document.return_value = [
{"text": "mock chunk 1", "page_number": 1, "type": "text_layout"},
]

task_result = process_document.apply(
kwargs={
Expand All @@ -53,8 +50,8 @@ def simulate_successful_ingestion(*args, **kwargs):

# 3. ASSERT: Verify the task metrics and status changes inside the session context
assert task_result.status == "SUCCESS"

# Query the database to verify the state update
updated_doc = db_session.query(Document).filter_by(id="test-doc-123").first()
assert updated_doc is not None
assert updated_doc.status == "ready"
assert updated_doc.status == "completed"
Loading