Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions backend/app/celery_app.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
"""Celery application configured for Redis-backed background jobs."""
import os
from celery import Celery

from app.config import get_settings


settings = get_settings()

# Initialize the Celery application instance
celery_app = Celery(
"pdf_assistant_rag",
broker=settings.CELERY_BROKER_URL,
backend=settings.CELERY_RESULT_BACKEND,
include=["app.tasks"],
"worker",
broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
)

# Optional configuration updates for reliable serialization
celery_app.conf.update(
task_track_started=settings.CELERY_TASK_TRACK_STARTED,
task_serializer="json",
result_serializer="json",
accept_content=["json"],
timezone="UTC",
result_serializer="json",
)

# Tell Celery to discover background tasks dynamically to break circular loops
celery_app.autodiscover_tasks(["app"])
33 changes: 7 additions & 26 deletions backend/app/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,10 @@ def process_document(
user_id: str,
) -> dict[str, str]:
"""Run the RAG ingestion pipeline for a stored document."""
try:
with get_db_session() as db:
doc = db.query(Document).filter(Document.id == document_id).first()
if doc:
doc.processing_started_at = __import__("datetime").datetime.now(__import__("datetime").timezone.utc)
doc.retry_count = (doc.retry_count or 0) + 1
db.commit()

ingest_document(
document_id=document_id,
filepath=filepath,
original_name=original_name,
user_id=user_id,
)
return {"document_id": document_id, "status": "completed"}
except Exception as exc:
logger.error("Document %s processing failed (attempt %s): %s", document_id, self.request.retries + 1, exc)
with get_db_session() as db:
doc = db.query(Document).filter(Document.id == document_id).first()
if doc and self.request.retries >= (self.max_retries or 3) - 1:
doc.status = "failed"
doc.last_error_traceback = traceback.format_exc()[:2000]
doc.processing_progress = 0
db.commit()
raise

ingest_document(
document_id=document_id,
filepath=filepath,
original_name=original_name,
user_id=user_id,
)
return {"document_id": document_id, "status": "completed"}
60 changes: 60 additions & 0 deletions backend/tests/test_celery_ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pytest
from unittest.mock import patch, MagicMock

# Core app imports
from app.models import Document
from app.tasks import process_document

def test_process_document_ingestion_pipeline(db_session):
"""
Test that the Celery task updates document status from pending to ready
by executing the ingestion engine inside the active test database session.
"""

# 1. SETUP: Create a mock document that starts as 'pending'
test_doc = Document(
id="test-doc-123",
filename="sample.pdf",
original_name="sample.pdf",
status="pending",
user_id="user-456"
)
db_session.add(test_doc)
db_session.commit()

# 2. ACT: Create a mock engine session factory context that yields our test db_session
mock_session_factory = MagicMock()
mock_session_factory.return_value.__enter__.return_value = db_session
mock_session_factory.return_value = db_session

# Patch the factory globally, and patch ingest_document right where app.tasks calls it
with patch("app.database.SessionLocal", mock_session_factory, create=True), \
patch("app.services.document_ingestion.SessionLocal", mock_session_factory, create=True), \
patch("app.tasks.ingest_document") as mock_ingest:

# Simulate what the underlying service does upon a successful processing run
def simulate_successful_ingestion(*args, **kwargs):
doc = db_session.query(Document).filter_by(id="test-doc-123").first()
if doc:
doc.status = "ready"
db_session.commit()
return {"status": "success"}

mock_ingest.side_effect = simulate_successful_ingestion

task_result = process_document.apply(
kwargs={
"document_id": "test-doc-123",
"filepath": "/tmp/sample.pdf",
"original_name": "sample.pdf",
"user_id": "user-456",
}
)

# 3. ASSERT: Verify the task metrics and status changes inside the session context
assert task_result.status == "SUCCESS"

# Query the database to verify the state update
updated_doc = db_session.query(Document).filter_by(id="test-doc-123").first()
assert updated_doc is not None
assert updated_doc.status == "ready"
Loading