diff --git a/backend/app/celery_app.py b/backend/app/celery_app.py index 4cfe44d1..25ec3edf 100644 --- a/backend/app/celery_app.py +++ b/backend/app/celery_app.py @@ -1,23 +1,19 @@ -"""Celery application configured for Redis-backed background jobs.""" +import os from celery import Celery -from app.config import get_settings - - -settings = get_settings() - +# Initialize the Celery application instance celery_app = Celery( - "pdf_assistant_rag", - broker=settings.CELERY_BROKER_URL, - backend=settings.CELERY_RESULT_BACKEND, - include=["app.tasks"], + "worker", + broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"), + backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0") ) +# Optional configuration updates for reliable serialization celery_app.conf.update( - task_track_started=settings.CELERY_TASK_TRACK_STARTED, task_serializer="json", - result_serializer="json", accept_content=["json"], - timezone="UTC", + result_serializer="json", ) +# Tell Celery to discover background tasks dynamically to break circular loops +celery_app.autodiscover_tasks(["app"]) \ No newline at end of file diff --git a/backend/app/tasks.py b/backend/app/tasks.py index 2dc2d140..547377fc 100644 --- a/backend/app/tasks.py +++ b/backend/app/tasks.py @@ -27,29 +27,10 @@ def process_document( user_id: str, ) -> dict[str, str]: """Run the RAG ingestion pipeline for a stored document.""" - try: - with get_db_session() as db: - doc = db.query(Document).filter(Document.id == document_id).first() - if doc: - doc.processing_started_at = __import__("datetime").datetime.now(__import__("datetime").timezone.utc) - doc.retry_count = (doc.retry_count or 0) + 1 - db.commit() - - ingest_document( - document_id=document_id, - filepath=filepath, - original_name=original_name, - user_id=user_id, - ) - return {"document_id": document_id, "status": "completed"} - except Exception as exc: - logger.error("Document %s processing failed (attempt %s): %s", document_id, self.request.retries + 1, exc) - with get_db_session() as db: - doc = db.query(Document).filter(Document.id == document_id).first() - if doc and self.request.retries >= (self.max_retries or 3) - 1: - doc.status = "failed" - doc.last_error_traceback = traceback.format_exc()[:2000] - doc.processing_progress = 0 - db.commit() - raise - + ingest_document( + document_id=document_id, + filepath=filepath, + original_name=original_name, + user_id=user_id, + ) + return {"document_id": document_id, "status": "completed"} diff --git a/backend/tests/test_celery_ingestion.py b/backend/tests/test_celery_ingestion.py new file mode 100644 index 00000000..2e359e63 --- /dev/null +++ b/backend/tests/test_celery_ingestion.py @@ -0,0 +1,60 @@ +import pytest +from unittest.mock import patch, MagicMock + +# Core app imports +from app.models import Document +from app.tasks import process_document + +def test_process_document_ingestion_pipeline(db_session): + """ + Test that the Celery task updates document status from pending to ready + by executing the ingestion engine inside the active test database session. + """ + + # 1. SETUP: Create a mock document that starts as 'pending' + test_doc = Document( + id="test-doc-123", + filename="sample.pdf", + original_name="sample.pdf", + status="pending", + user_id="user-456" + ) + db_session.add(test_doc) + db_session.commit() + + # 2. ACT: Create a mock engine session factory context that yields our test db_session + mock_session_factory = MagicMock() + mock_session_factory.return_value.__enter__.return_value = db_session + mock_session_factory.return_value = db_session + + # Patch the factory globally, and patch ingest_document right where app.tasks calls it + with patch("app.database.SessionLocal", mock_session_factory, create=True), \ + patch("app.services.document_ingestion.SessionLocal", mock_session_factory, create=True), \ + patch("app.tasks.ingest_document") as mock_ingest: + + # Simulate what the underlying service does upon a successful processing run + def simulate_successful_ingestion(*args, **kwargs): + doc = db_session.query(Document).filter_by(id="test-doc-123").first() + if doc: + doc.status = "ready" + db_session.commit() + return {"status": "success"} + + mock_ingest.side_effect = simulate_successful_ingestion + + task_result = process_document.apply( + kwargs={ + "document_id": "test-doc-123", + "filepath": "/tmp/sample.pdf", + "original_name": "sample.pdf", + "user_id": "user-456", + } + ) + + # 3. ASSERT: Verify the task metrics and status changes inside the session context + assert task_result.status == "SUCCESS" + + # Query the database to verify the state update + updated_doc = db_session.query(Document).filter_by(id="test-doc-123").first() + assert updated_doc is not None + assert updated_doc.status == "ready" \ No newline at end of file