From af741f9e62f7c7344ad05d9e1685c6ad865102f5 Mon Sep 17 00:00:00 2001 From: Rico Furtado Date: Mon, 18 May 2026 13:15:51 -0400 Subject: [PATCH 1/3] feat: implement Docling usage metering service and integrate with LangflowFileService --- src/app/container.py | 18 ++ src/config/settings.py | 20 ++ src/dependencies.py | 4 + src/services/docling_metering_service.py | 113 +++++++++ src/services/docling_polling_service.py | 32 ++- src/services/docling_service.py | 2 +- src/services/langflow_file_service.py | 84 ++++++- tests/unit/test_docling_metering_service.py | 248 ++++++++++++++++++++ tests/unit/test_docling_polling_service.py | 20 ++ 9 files changed, 535 insertions(+), 6 deletions(-) create mode 100644 src/services/docling_metering_service.py create mode 100644 tests/unit/test_docling_metering_service.py diff --git a/src/app/container.py b/src/app/container.py index 1f20d4f4a..32ecee8b0 100644 --- a/src/app/container.py +++ b/src/app/container.py @@ -5,7 +5,10 @@ from api.connector_router import ConnectorRouter from config.settings import ( + DOCLING_DEPLOYMENT_MODE, + DOCLING_METERING_LOG_PATH, ENABLE_BACKEND_DOCLING_POLLING, + ENABLE_DOCLING_METERING, INGESTION_TIMEOUT, JWT_SIGNING_KEY, SESSION_SECRET, @@ -20,6 +23,7 @@ from services.api_key_service import APIKeyService from services.auth_service import AuthService from services.chat_service import ChatService +from services.docling_metering_service import DoclingMeteringService from services.docling_polling_service import DoclingPollingService from services.document_service import DocumentService from services.flows_service import FlowsService @@ -72,6 +76,18 @@ async def initialize_services(): search_service = SearchService(session_manager, models_service) register_search_service(search_service) + # Docling usage metering — optional JSONL sink for billing/auditing. + # Gated by ENABLE_DOCLING_METERING; when disabled callers receive None + # and skip all metering without any branching in hot-path code. + docling_metering_service = ( + DoclingMeteringService( + log_path=DOCLING_METERING_LOG_PATH, + deployment_mode=DOCLING_DEPLOYMENT_MODE, + ) + if ENABLE_DOCLING_METERING + else None + ) + # Backend-side Docling polling coordinator. Constructed once as a # singleton (it is stateless) and gated by ENABLE_BACKEND_DOCLING_POLLING # so operators can roll back to the legacy single-call ingestion path @@ -97,6 +113,7 @@ async def initialize_services(): langflow_file_service = LangflowFileService( flows_service=flows_service, docling_service=clients.docling_service, + metering_service=docling_metering_service, ) langflow_mcp_service = LangflowMCPService() @@ -213,6 +230,7 @@ def _lazy_session_factory(): "langflow_mcp_service": langflow_mcp_service, "docling_service": clients.docling_service, "docling_polling_service": docling_polling_service, + "docling_metering_service": docling_metering_service, "rbac_service": rbac_service, "workspace_config_service": workspace_config_service, } diff --git a/src/config/settings.py b/src/config/settings.py index 7c158b4ec..2b92ff6ae 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -199,6 +199,26 @@ DOCLING_POLL_BACKOFF_FACTOR = get_env_float("DOCLING_POLL_BACKOFF_FACTOR", 1.5) DOCLING_POLL_TRANSIENT_RETRIES = get_env_int("DOCLING_POLL_TRANSIENT_RETRIES", 5) +# Docling usage metering configuration. +# When enabled, a JSONL record is appended for every file submitted to Docling, +# capturing timing, outcome, size, and owner for billing/auditing purposes. +ENABLE_DOCLING_METERING = os.getenv("ENABLE_DOCLING_METERING", "true").lower() in ( + "true", + "1", + "yes", +) + +from config.paths import get_data_file as _get_data_file + +DOCLING_METERING_LOG_PATH = os.getenv( + "DOCLING_METERING_LOG_PATH", _get_data_file("docling_tasks_logs.jsonl") +) +# Deployment mode of the Docling Serve instance — "direct" (synchronous) or +# "rq" (Redis Queue, asynchronous). Stored in each meter record so that +# elapsed_seconds can be interpreted correctly: in RQ mode it includes +# queue wait time in addition to conversion time. +DOCLING_DEPLOYMENT_MODE = os.getenv("DOCLING_DEPLOYMENT_MODE", "direct").lower() + def is_no_auth_mode(): """Check if we're running in no-auth mode (OAuth credentials missing)""" diff --git a/src/dependencies.py b/src/dependencies.py index 5a12c0f09..afd58f9b4 100644 --- a/src/dependencies.py +++ b/src/dependencies.py @@ -137,6 +137,10 @@ def get_docling_polling_service(services: dict = Depends(get_services)): return services["docling_polling_service"] +def get_docling_metering_service(services: dict = Depends(get_services)): + return services.get("docling_metering_service") + + def get_rbac_service(services: dict = Depends(get_services)): return services["rbac_service"] diff --git a/src/services/docling_metering_service.py b/src/services/docling_metering_service.py new file mode 100644 index 000000000..d80f3b9dc --- /dev/null +++ b/src/services/docling_metering_service.py @@ -0,0 +1,113 @@ +"""Docling usage metering — append-only JSONL record per file submission. + +Each record captures the full lifecycle of one Docling conversion attempt: +submission timestamp, terminal outcome, wall-clock elapsed time, file +metadata, and the owner's user id. Records are written atomically (one +JSON line each) under an asyncio lock so concurrent ingestion tasks never +interleave partial writes. + +Deployment mode awareness +-------------------------- +When Docling is deployed with Redis Queue (``deployment_mode="rq"``), the +task sits in a queue before a worker picks it up. The ``elapsed_seconds`` +field therefore includes queue wait time in addition to GPU/CPU conversion +time. The ``deployment_mode`` field in each record lets downstream +billing logic account for this distinction. +""" + +import asyncio +import dataclasses +import json +import os +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Optional + +import aiofiles + +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +@dataclass +class DoclingMeterRecord: + task_id: str + filename: str + size_bytes: int + mimetype: str + owner_user_id: Optional[str] + submitted_at: str # ISO-8601 UTC timestamp + terminal_at: str # ISO-8601 UTC timestamp + elapsed_seconds: float # wall-clock from submission to terminal + outcome: str # "success" | "failed" | "expired" | "timeout" | "submit_failed" + failure_detail: Optional[str] + poll_count: int # status-check calls made; 0 for legacy (Langflow-polling) path + deployment_mode: str # "direct" | "rq" + + +def _utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +class DoclingMeteringService: + """Writes one JSONL record per Docling file submission to a log file.""" + + def __init__(self, log_path: str, deployment_mode: str = "direct"): + self._log_path = log_path + self._deployment_mode = deployment_mode + self._lock = asyncio.Lock() + + @property + def deployment_mode(self) -> str: + return self._deployment_mode + + def build_record( + self, + *, + task_id: str, + filename: str, + size_bytes: int, + mimetype: str, + owner_user_id: Optional[str], + submitted_at: str, + terminal_at: str, + elapsed_seconds: float, + outcome: str, + failure_detail: Optional[str] = None, + poll_count: int = 0, + ) -> DoclingMeterRecord: + return DoclingMeterRecord( + task_id=task_id, + filename=filename, + size_bytes=size_bytes, + mimetype=mimetype, + owner_user_id=owner_user_id, + submitted_at=submitted_at, + terminal_at=terminal_at, + elapsed_seconds=round(elapsed_seconds, 3), + outcome=outcome, + failure_detail=failure_detail, + poll_count=poll_count, + deployment_mode=self._deployment_mode, + ) + + async def record(self, meter_record: DoclingMeterRecord) -> None: + """Append *meter_record* as a single JSON line to the metering log. + + Errors are swallowed with a warning so a metering failure never + interrupts the ingestion path. + """ + line = json.dumps(dataclasses.asdict(meter_record), default=str) + "\n" + async with self._lock: + try: + log_dir = os.path.dirname(os.path.abspath(self._log_path)) + os.makedirs(log_dir, exist_ok=True) + async with aiofiles.open(self._log_path, "a", encoding="utf-8") as fh: + await fh.write(line) + except Exception as exc: + logger.warning( + "Failed to write Docling meter record", + error=str(exc), + task_id=meter_record.task_id, + ) diff --git a/src/services/docling_polling_service.py b/src/services/docling_polling_service.py index a8cc83a07..7ec4adb75 100644 --- a/src/services/docling_polling_service.py +++ b/src/services/docling_polling_service.py @@ -35,6 +35,7 @@ class DoclingPollResult: detail: Optional[str] = None last_snapshot: Optional[DoclingStatusSnapshot] = None elapsed_seconds: float = 0.0 + poll_count: int = 0 class DoclingPollingService: @@ -54,6 +55,11 @@ async def poll_until_ready( ) -> DoclingPollResult: """Loop on Docling status until terminal or until max_seconds elapses. + A SUCCESS status is treated as ready only after the result endpoint + returns a payload with usable ``document.json_content``. This prevents + handing Langflow a task that Docling accepted but failed to convert + into a consumable document. + Transient errors (network, 5xx, NOT_FOUND seen briefly before the task is registered server-side) are absorbed up to ``transient_retry_budget`` before being surfaced as failures. The interval grows by @@ -70,6 +76,7 @@ async def poll_until_ready( interval = poll_interval consecutive_not_found = 0 last_snapshot: Optional[DoclingStatusSnapshot] = None + poll_count = 0 logger.debug("Starting Docling polling", task_id=task_id) @@ -77,12 +84,31 @@ async def poll_until_ready( logger.debug("Docling polling", task_id=task_id) snapshot = await self.docling_service.check_task_status(task_id) last_snapshot = snapshot + poll_count += 1 logger.debug("Snapshot received", task_id=task_id, snapshot=last_snapshot) elapsed = time.monotonic() - start if snapshot.state == DoclingTaskState.SUCCESS: + try: + await self.docling_service.fetch_task_result(task_id) + except Exception as e: + detail = f"Docling result unavailable after SUCCESS status: {str(e)}" + logger.warning( + "Docling task reached SUCCESS but result fetch failed", + task_id=task_id, + detail=detail, + elapsed_seconds=round(elapsed, 2), + ) + return DoclingPollResult( + outcome=PollOutcome.FAILED, + detail=detail, + last_snapshot=snapshot, + elapsed_seconds=elapsed, + poll_count=poll_count, + ) + logger.debug( - "Docling task reached SUCCESS", + "Docling task reached SUCCESS and result is available", task_id=task_id, elapsed_seconds=round(elapsed, 2), ) @@ -90,6 +116,7 @@ async def poll_until_ready( outcome=PollOutcome.SUCCESS, last_snapshot=snapshot, elapsed_seconds=elapsed, + poll_count=poll_count, ) if snapshot.state == DoclingTaskState.FAILED: @@ -104,6 +131,7 @@ async def poll_until_ready( detail=snapshot.detail or "Docling reported failure", last_snapshot=snapshot, elapsed_seconds=elapsed, + poll_count=poll_count, ) if snapshot.state == DoclingTaskState.NOT_FOUND: @@ -122,6 +150,7 @@ async def poll_until_ready( detail="Docling task not found (expired or unknown task_id)", last_snapshot=snapshot, elapsed_seconds=elapsed, + poll_count=poll_count, ) else: consecutive_not_found = 0 @@ -139,6 +168,7 @@ async def poll_until_ready( detail=f"Docling polling timed out after {max_seconds}s", last_snapshot=snapshot, elapsed_seconds=time.monotonic() - start, + poll_count=poll_count, ) await asyncio.sleep(min(interval, remaining)) interval = min(interval * backoff_factor, max_interval) diff --git a/src/services/docling_service.py b/src/services/docling_service.py index 7c1b2db7e..aee1c1a1b 100644 --- a/src/services/docling_service.py +++ b/src/services/docling_service.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from enum import StrEnum from pathlib import Path -from typing import Any +from typing import Any, Dict, Optional import httpx from pydantic import BaseModel diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index c2ca4ae01..6c8df53a8 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -1,8 +1,9 @@ import asyncio import json import time +from datetime import datetime, timezone from pathlib import Path -from typing import Any +from typing import Any, Dict, Optional import httpx @@ -17,10 +18,11 @@ class LangflowFileService: - def __init__(self, flows_service=None, docling_service=None): + def __init__(self, flows_service=None, docling_service=None, metering_service=None): self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID self.flows_service = flows_service self.docling_service = docling_service + self.metering_service = metering_service self.flow_id_url_ingest = LANGFLOW_URL_INGEST_FLOW_ID _TRANSIENT_STATUS_CODES = {408, 429, 500, 502, 503, 504} @@ -590,20 +592,24 @@ async def upload_and_ingest_file( logger.debug("[LF] Starting two-phase Docling+Langflow ingest") - filename, content, _ = file_tuple + filename, content, mimetype = file_tuple + size_bytes = len(content) # ── Phase 1: submit to Docling ────────────────────────────────── if file_task is not None: file_task.phase = IngestionPhase.DOCLING file_task.docling_status = DoclingPhaseStatus.PENDING - task_id = await self.submit_to_docling(filename, content, owner=owner, jwt_token=jwt_token) + task_id = await self.submit_to_docling(filename, content, jwt_token=jwt_token, owner=owner) + submitted_at = datetime.now(timezone.utc).isoformat() + _submit_wall = time.monotonic() if file_task is not None: file_task.docling_task_id = task_id file_task.docling_status = DoclingPhaseStatus.PROCESSING # ── Phase 1b: backend-side polling (optional) ─────────────────── + poll_count = 0 if docling_polling_service is not None: from config.settings import ( DOCLING_POLL_BACKOFF_FACTOR, @@ -622,6 +628,7 @@ async def upload_and_ingest_file( backoff_factor=DOCLING_POLL_BACKOFF_FACTOR, transient_retry_budget=DOCLING_POLL_TRANSIENT_RETRIES, ) + poll_count = poll_result.poll_count if poll_result.outcome != PollOutcome.SUCCESS: if file_task is not None: @@ -639,6 +646,18 @@ async def upload_and_ingest_file( "elapsed_seconds": round(poll_result.elapsed_seconds, 2), }, ) + await self._record_meter( + task_id=task_id, + filename=filename, + size_bytes=size_bytes, + mimetype=mimetype, + owner=owner, + submitted_at=submitted_at, + elapsed_seconds=time.monotonic() - _submit_wall, + outcome=poll_result.outcome.value, + failure_detail=poll_result.detail, + poll_count=poll_count, + ) raise Exception( f"Docling conversion did not complete ({poll_result.outcome.value}): " f"{poll_result.detail or 'no detail provided'}" @@ -687,6 +706,18 @@ async def upload_and_ingest_file( "[LF] Ingestion failed during combined operation", extra={"error": str(e), "filename": filename}, ) + await self._record_meter( + task_id=task_id, + filename=filename, + size_bytes=size_bytes, + mimetype=mimetype, + owner=owner, + submitted_at=submitted_at, + elapsed_seconds=time.monotonic() - _submit_wall, + outcome="langflow_failed", + failure_detail=str(e), + poll_count=poll_count, + ) # Docling Serve has no cancel endpoint; let any orphan task expire. raise @@ -699,9 +730,54 @@ async def upload_and_ingest_file( # fields coherent. Idempotent for the polling path. file_task.docling_status = DoclingPhaseStatus.SUCCESS + await self._record_meter( + task_id=task_id, + filename=filename, + size_bytes=size_bytes, + mimetype=mimetype, + owner=owner, + submitted_at=submitted_at, + elapsed_seconds=time.monotonic() - _submit_wall, + outcome="success", + poll_count=poll_count, + ) + return { "status": "success", "docling_task_id": task_id, "ingestion": ingest_result, "message": f"File '{filename}' processed via Docling and ingested successfully", } + + async def _record_meter( + self, + *, + task_id: str, + filename: str, + size_bytes: int, + mimetype: str, + owner: Optional[str], + submitted_at: str, + elapsed_seconds: float, + outcome: str, + failure_detail: Optional[str] = None, + poll_count: int = 0, + ) -> None: + """Fire-and-forget metering record. No-ops when metering is disabled.""" + if self.metering_service is None: + return + terminal_at = datetime.now(timezone.utc).isoformat() + record = self.metering_service.build_record( + task_id=task_id, + filename=filename, + size_bytes=size_bytes, + mimetype=mimetype, + owner_user_id=owner, + submitted_at=submitted_at, + terminal_at=terminal_at, + elapsed_seconds=elapsed_seconds, + outcome=outcome, + failure_detail=failure_detail, + poll_count=poll_count, + ) + await self.metering_service.record(record) diff --git a/tests/unit/test_docling_metering_service.py b/tests/unit/test_docling_metering_service.py new file mode 100644 index 000000000..26d416e67 --- /dev/null +++ b/tests/unit/test_docling_metering_service.py @@ -0,0 +1,248 @@ +"""Unit tests for DoclingMeteringService and its integration with LangflowFileService. + +Verifies: + - JSONL records are written with the correct fields + - Metering fires on success, polling failure, and Langflow failure + - Metering is silently skipped when metering_service is None + - poll_count is forwarded correctly from DoclingPollResult + - File I/O errors are swallowed (metering never disrupts ingestion) +""" + +import json +import os +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from services.docling_metering_service import DoclingMeteringService, DoclingMeterRecord +from services.docling_polling_service import DoclingPollResult, PollOutcome +from services.langflow_file_service import LangflowFileService +from models.tasks import FileTask + + +# ── DoclingMeteringService unit tests ──────────────────────────────────────── + + +@pytest.fixture +def metering_service(tmp_path): + log_file = str(tmp_path / "meter.jsonl") + return DoclingMeteringService(log_path=log_file, deployment_mode="direct") + + +def _sample_record(**overrides) -> DoclingMeterRecord: + defaults = dict( + task_id="t-001", + filename="doc.pdf", + size_bytes=1024, + mimetype="application/pdf", + owner_user_id="user-42", + submitted_at="2026-05-14T12:00:00+00:00", + terminal_at="2026-05-14T12:00:10+00:00", + elapsed_seconds=10.0, + outcome="success", + failure_detail=None, + poll_count=3, + deployment_mode="direct", + ) + defaults.update(overrides) + return DoclingMeterRecord(**defaults) + + +@pytest.mark.asyncio +async def test_record_writes_jsonl_line(metering_service, tmp_path): + rec = _sample_record() + await metering_service.record(rec) + + log_path = tmp_path / "meter.jsonl" + lines = log_path.read_text().strip().splitlines() + assert len(lines) == 1 + data = json.loads(lines[0]) + assert data["task_id"] == "t-001" + assert data["outcome"] == "success" + assert data["poll_count"] == 3 + assert data["deployment_mode"] == "direct" + assert data["size_bytes"] == 1024 + + +@pytest.mark.asyncio +async def test_record_appends_multiple_lines(metering_service, tmp_path): + await metering_service.record(_sample_record(task_id="t-001")) + await metering_service.record(_sample_record(task_id="t-002")) + await metering_service.record(_sample_record(task_id="t-003")) + + log_path = tmp_path / "meter.jsonl" + lines = log_path.read_text().strip().splitlines() + assert len(lines) == 3 + ids = [json.loads(l)["task_id"] for l in lines] + assert ids == ["t-001", "t-002", "t-003"] + + +@pytest.mark.asyncio +async def test_record_creates_parent_directory(tmp_path): + nested = str(tmp_path / "a" / "b" / "meter.jsonl") + svc = DoclingMeteringService(log_path=nested, deployment_mode="rq") + await svc.record(_sample_record(deployment_mode="rq")) + assert os.path.isfile(nested) + + +@pytest.mark.asyncio +async def test_record_swallows_io_errors(metering_service): + """A file I/O failure must not propagate — metering is fire-and-forget.""" + with patch("aiofiles.open", side_effect=OSError("disk full")): + # Should not raise. + await metering_service.record(_sample_record()) + + +def test_build_record_rounds_elapsed(metering_service): + rec = metering_service.build_record( + task_id="t-x", + filename="f.pdf", + size_bytes=512, + mimetype="application/pdf", + owner_user_id=None, + submitted_at="2026-05-14T00:00:00+00:00", + terminal_at="2026-05-14T00:00:07+00:00", + elapsed_seconds=7.123456789, + outcome="failed", + failure_detail="OCR crash", + poll_count=2, + ) + assert rec.elapsed_seconds == 7.123 + assert rec.deployment_mode == "direct" + + +# ── Integration tests: LangflowFileService metering wire-up ───────────────── + + +@pytest.fixture +def file_tuple(): + return ("report.pdf", b"PDF" * 100, "application/pdf") + + +@pytest.fixture +def file_task(): + return FileTask(file_path="/tmp/report.pdf", filename="report.pdf") + + +@pytest.fixture +def mock_docling_service(): + svc = AsyncMock() + svc.upload_to_docling_direct_async.return_value = "task-xyz" + return svc + + +@pytest.fixture +def mock_metering_service(): + svc = MagicMock(spec=DoclingMeteringService) + svc.build_record.return_value = _sample_record() + svc.record = AsyncMock() + return svc + + +@pytest.fixture +def langflow_service(mock_docling_service, mock_metering_service): + svc = LangflowFileService( + docling_service=mock_docling_service, + metering_service=mock_metering_service, + ) + svc.run_ingestion_flow = AsyncMock(return_value={"status": "ok"}) + return svc + + +@pytest.mark.asyncio +async def test_metering_called_on_success( + langflow_service, mock_metering_service, mock_polling_service, file_tuple, file_task +): + mock_polling_service.poll_until_ready.return_value = DoclingPollResult( + outcome=PollOutcome.SUCCESS, elapsed_seconds=5.0, poll_count=2 + ) + await langflow_service.upload_and_ingest_file( + file_tuple=file_tuple, + docling_polling_service=mock_polling_service, + file_task=file_task, + ) + assert mock_metering_service.record.call_count == 1 + build_kwargs = mock_metering_service.build_record.call_args.kwargs + assert build_kwargs["outcome"] == "success" + assert build_kwargs["poll_count"] == 2 + assert build_kwargs["size_bytes"] == len(b"PDF" * 100) + assert build_kwargs["mimetype"] == "application/pdf" + + +@pytest.mark.asyncio +async def test_metering_called_on_poll_failure( + langflow_service, mock_metering_service, mock_polling_service, file_tuple, file_task +): + mock_polling_service.poll_until_ready.return_value = DoclingPollResult( + outcome=PollOutcome.FAILED, detail="OCR crash", poll_count=5 + ) + with pytest.raises(Exception): + await langflow_service.upload_and_ingest_file( + file_tuple=file_tuple, + docling_polling_service=mock_polling_service, + file_task=file_task, + ) + assert mock_metering_service.record.call_count == 1 + build_kwargs = mock_metering_service.build_record.call_args.kwargs + assert build_kwargs["outcome"] == "failed" + assert build_kwargs["failure_detail"] == "OCR crash" + assert build_kwargs["poll_count"] == 5 + + +@pytest.mark.asyncio +async def test_metering_called_on_langflow_failure( + langflow_service, mock_metering_service, mock_polling_service, file_tuple, file_task +): + mock_polling_service.poll_until_ready.return_value = DoclingPollResult( + outcome=PollOutcome.SUCCESS, elapsed_seconds=3.0, poll_count=1 + ) + langflow_service.run_ingestion_flow = AsyncMock(side_effect=RuntimeError("flow crashed")) + + with pytest.raises(RuntimeError): + await langflow_service.upload_and_ingest_file( + file_tuple=file_tuple, + docling_polling_service=mock_polling_service, + file_task=file_task, + ) + build_kwargs = mock_metering_service.build_record.call_args.kwargs + assert build_kwargs["outcome"] == "langflow_failed" + assert "flow crashed" in build_kwargs["failure_detail"] + + +@pytest.mark.asyncio +async def test_metering_skipped_when_service_is_none( + mock_docling_service, mock_polling_service, file_tuple, file_task +): + """When no metering_service is injected, the ingestion still completes normally.""" + svc = LangflowFileService(docling_service=mock_docling_service, metering_service=None) + svc.run_ingestion_flow = AsyncMock(return_value={"status": "ok"}) + mock_polling_service.poll_until_ready.return_value = DoclingPollResult( + outcome=PollOutcome.SUCCESS, elapsed_seconds=1.0, poll_count=1 + ) + result = await svc.upload_and_ingest_file( + file_tuple=file_tuple, + docling_polling_service=mock_polling_service, + file_task=file_task, + ) + assert result["status"] == "success" + + +@pytest.mark.asyncio +async def test_poll_count_zero_for_legacy_path( + langflow_service, mock_metering_service, file_tuple, file_task +): + """Legacy path (no polling service) must record poll_count=0.""" + result = await langflow_service.upload_and_ingest_file( + file_tuple=file_tuple, + docling_polling_service=None, + file_task=file_task, + ) + assert result["status"] == "success" + build_kwargs = mock_metering_service.build_record.call_args.kwargs + assert build_kwargs["poll_count"] == 0 + assert build_kwargs["outcome"] == "success" + + +# shared fixture reused from conftest scope +@pytest.fixture +def mock_polling_service(): + return AsyncMock() diff --git a/tests/unit/test_docling_polling_service.py b/tests/unit/test_docling_polling_service.py index 803e01bb8..c2aaee44f 100644 --- a/tests/unit/test_docling_polling_service.py +++ b/tests/unit/test_docling_polling_service.py @@ -42,6 +42,7 @@ def no_sleep(): @pytest.mark.asyncio async def test_returns_success_immediately_when_already_done(polling_service, mock_docling_service): mock_docling_service.check_task_status.return_value = _snap(DoclingTaskState.SUCCESS) + mock_docling_service.fetch_task_result.return_value = {"body": "ok"} result = await polling_service.poll_until_ready( task_id="t1", poll_interval=1.0, max_seconds=10.0 @@ -49,6 +50,7 @@ async def test_returns_success_immediately_when_already_done(polling_service, mo assert result.outcome == PollOutcome.SUCCESS assert mock_docling_service.check_task_status.call_count == 1 + mock_docling_service.fetch_task_result.assert_awaited_once_with("t1") @pytest.mark.asyncio @@ -61,6 +63,7 @@ async def test_loops_through_processing_then_success( _snap(DoclingTaskState.PROCESSING), _snap(DoclingTaskState.SUCCESS), ] + mock_docling_service.fetch_task_result.return_value = {"body": "ok"} result = await polling_service.poll_until_ready( task_id="t1", poll_interval=1.0, max_seconds=60.0 @@ -69,6 +72,23 @@ async def test_loops_through_processing_then_success( assert result.outcome == PollOutcome.SUCCESS assert mock_docling_service.check_task_status.call_count == 4 assert no_sleep.call_count == 3 + mock_docling_service.fetch_task_result.assert_awaited_once_with("t1") + + +@pytest.mark.asyncio +async def test_success_status_requires_fetchable_result(polling_service, mock_docling_service): + mock_docling_service.check_task_status.return_value = _snap(DoclingTaskState.SUCCESS) + mock_docling_service.fetch_task_result.side_effect = RuntimeError( + "missing document.json_content" + ) + + result = await polling_service.poll_until_ready( + task_id="t1", poll_interval=1.0, max_seconds=10.0 + ) + + assert result.outcome == PollOutcome.FAILED + assert "missing document.json_content" in (result.detail or "") + mock_docling_service.fetch_task_result.assert_awaited_once_with("t1") @pytest.mark.asyncio From 08ef0dc58b8aa45fedeb595571a134a25ee13c98 Mon Sep 17 00:00:00 2001 From: Rico Furtado Date: Mon, 18 May 2026 14:05:48 -0400 Subject: [PATCH 2/3] fix: reorder parameters in submit_to_docling method and remove unused fixture in tests --- src/services/langflow_file_service.py | 2 +- tests/unit/test_docling_metering_service.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 6c8df53a8..598e71a9c 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -600,7 +600,7 @@ async def upload_and_ingest_file( file_task.phase = IngestionPhase.DOCLING file_task.docling_status = DoclingPhaseStatus.PENDING - task_id = await self.submit_to_docling(filename, content, jwt_token=jwt_token, owner=owner) + task_id = await self.submit_to_docling(filename, content, owner=owner, jwt_token=jwt_token) submitted_at = datetime.now(timezone.utc).isoformat() _submit_wall = time.monotonic() diff --git a/tests/unit/test_docling_metering_service.py b/tests/unit/test_docling_metering_service.py index 26d416e67..24394b05e 100644 --- a/tests/unit/test_docling_metering_service.py +++ b/tests/unit/test_docling_metering_service.py @@ -242,7 +242,6 @@ async def test_poll_count_zero_for_legacy_path( assert build_kwargs["outcome"] == "success" -# shared fixture reused from conftest scope @pytest.fixture def mock_polling_service(): return AsyncMock() From 17da3769f01598d8bb8d7a068013cfd6046cd11a Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 18 May 2026 18:13:07 +0000 Subject: [PATCH 3/3] style: ruff format (auto) --- src/services/docling_metering_service.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/services/docling_metering_service.py b/src/services/docling_metering_service.py index d80f3b9dc..c204c5c2a 100644 --- a/src/services/docling_metering_service.py +++ b/src/services/docling_metering_service.py @@ -37,13 +37,13 @@ class DoclingMeterRecord: size_bytes: int mimetype: str owner_user_id: Optional[str] - submitted_at: str # ISO-8601 UTC timestamp - terminal_at: str # ISO-8601 UTC timestamp + submitted_at: str # ISO-8601 UTC timestamp + terminal_at: str # ISO-8601 UTC timestamp elapsed_seconds: float # wall-clock from submission to terminal - outcome: str # "success" | "failed" | "expired" | "timeout" | "submit_failed" + outcome: str # "success" | "failed" | "expired" | "timeout" | "submit_failed" failure_detail: Optional[str] - poll_count: int # status-check calls made; 0 for legacy (Langflow-polling) path - deployment_mode: str # "direct" | "rq" + poll_count: int # status-check calls made; 0 for legacy (Langflow-polling) path + deployment_mode: str # "direct" | "rq" def _utc_now_iso() -> str: