Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/app/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

from api.connector_router import ConnectorRouter
from config.settings import (
DOCLING_DEPLOYMENT_MODE,
DOCLING_METERING_LOG_PATH,
ENABLE_BACKEND_DOCLING_POLLING,
ENABLE_DOCLING_METERING,
INGESTION_TIMEOUT,
JWT_SIGNING_KEY,
SESSION_SECRET,
Expand All @@ -20,6 +23,7 @@
from services.api_key_service import APIKeyService
from services.auth_service import AuthService
from services.chat_service import ChatService
from services.docling_metering_service import DoclingMeteringService
from services.docling_polling_service import DoclingPollingService
from services.document_service import DocumentService
from services.flows_service import FlowsService
Expand Down Expand Up @@ -72,6 +76,18 @@ async def initialize_services():
search_service = SearchService(session_manager, models_service)
register_search_service(search_service)

# Docling usage metering — optional JSONL sink for billing/auditing.
# Gated by ENABLE_DOCLING_METERING; when disabled callers receive None
# and skip all metering without any branching in hot-path code.
docling_metering_service = (
DoclingMeteringService(
log_path=DOCLING_METERING_LOG_PATH,
deployment_mode=DOCLING_DEPLOYMENT_MODE,
)
if ENABLE_DOCLING_METERING
else None
)

# Backend-side Docling polling coordinator. Constructed once as a
# singleton (it is stateless) and gated by ENABLE_BACKEND_DOCLING_POLLING
# so operators can roll back to the legacy single-call ingestion path
Expand All @@ -97,6 +113,7 @@ async def initialize_services():
langflow_file_service = LangflowFileService(
flows_service=flows_service,
docling_service=clients.docling_service,
metering_service=docling_metering_service,
)
langflow_mcp_service = LangflowMCPService()

Expand Down Expand Up @@ -213,6 +230,7 @@ def _lazy_session_factory():
"langflow_mcp_service": langflow_mcp_service,
"docling_service": clients.docling_service,
"docling_polling_service": docling_polling_service,
"docling_metering_service": docling_metering_service,
"rbac_service": rbac_service,
"workspace_config_service": workspace_config_service,
}
20 changes: 20 additions & 0 deletions src/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,26 @@
DOCLING_POLL_BACKOFF_FACTOR = get_env_float("DOCLING_POLL_BACKOFF_FACTOR", 1.5)
DOCLING_POLL_TRANSIENT_RETRIES = get_env_int("DOCLING_POLL_TRANSIENT_RETRIES", 5)

# Docling usage metering configuration.
# When enabled, a JSONL record is appended for every file submitted to Docling,
# capturing timing, outcome, size, and owner for billing/auditing purposes.
ENABLE_DOCLING_METERING = os.getenv("ENABLE_DOCLING_METERING", "true").lower() in (
"true",
"1",
"yes",
)

from config.paths import get_data_file as _get_data_file

DOCLING_METERING_LOG_PATH = os.getenv(
"DOCLING_METERING_LOG_PATH", _get_data_file("docling_tasks_logs.jsonl")
)
Comment on lines +211 to +215
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Move the get_data_file import to the module import section.

Line 211 introduces a module-level import after executable statements, which triggers Ruff E402 and fails CI.

💡 Proposed fix
-from config.paths import get_flows_path
+from config.paths import get_data_file, get_flows_path
...
-from config.paths import get_data_file as _get_data_file
-
 DOCLING_METERING_LOG_PATH = os.getenv(
-    "DOCLING_METERING_LOG_PATH", _get_data_file("docling_tasks_logs.jsonl")
+    "DOCLING_METERING_LOG_PATH", get_data_file("docling_tasks_logs.jsonl")
 )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/config/settings.py` around lines 211 - 215, The import of get_data_file
is currently done inline as "from config.paths import get_data_file as
_get_data_file" after executable code which triggers Ruff E402; move that import
into the top module import section with the other imports so it is a normal
module-level import, then update the usage that sets DOCLING_METERING_LOG_PATH
to call _get_data_file (or rename to get_data_file) as before; ensure the symbol
referenced is config.paths.get_data_file (or _get_data_file) so the assignment
to DOCLING_METERING_LOG_PATH remains unchanged and CI no longer fails.

# Deployment mode of the Docling Serve instance — "direct" (synchronous) or
# "rq" (Redis Queue, asynchronous). Stored in each meter record so that
# elapsed_seconds can be interpreted correctly: in RQ mode it includes
# queue wait time in addition to conversion time.
DOCLING_DEPLOYMENT_MODE = os.getenv("DOCLING_DEPLOYMENT_MODE", "direct").lower()


def is_no_auth_mode():
"""Check if we're running in no-auth mode (OAuth credentials missing)"""
Expand All @@ -210,7 +230,7 @@

# Webhook configuration - must be set to enable webhooks
WEBHOOK_BASE_URL = os.getenv("WEBHOOK_BASE_URL") # No default - must be explicitly configured

Check failure on line 233 in src/config/settings.py

View workflow job for this annotation

GitHub Actions / Ruff and mypy on changed files

ruff (E402)

src/config/settings.py:233:1: E402 Module level import not at top of file
# OAuth callback broker URL -- when set, Google (and other providers) redirect
# here instead of directly to the frontend. The broker then forwards to the
# actual frontend origin that is carried in the OAuth state parameter.
Expand Down
4 changes: 4 additions & 0 deletions src/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ def get_docling_polling_service(services: dict = Depends(get_services)):
return services["docling_polling_service"]


def get_docling_metering_service(services: dict = Depends(get_services)):
return services.get("docling_metering_service")


def get_rbac_service(services: dict = Depends(get_services)):
return services["rbac_service"]

Expand Down
113 changes: 113 additions & 0 deletions src/services/docling_metering_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Docling usage metering — append-only JSONL record per file submission.

Each record captures the full lifecycle of one Docling conversion attempt:
submission timestamp, terminal outcome, wall-clock elapsed time, file
metadata, and the owner's user id. Records are written atomically (one
JSON line each) under an asyncio lock so concurrent ingestion tasks never
interleave partial writes.

Deployment mode awareness
--------------------------
When Docling is deployed with Redis Queue (``deployment_mode="rq"``), the
task sits in a queue before a worker picks it up. The ``elapsed_seconds``
field therefore includes queue wait time in addition to GPU/CPU conversion
time. The ``deployment_mode`` field in each record lets downstream
billing logic account for this distinction.
"""

import asyncio
import dataclasses
import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

Comment on lines +23 to +25
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
ruff check src/services/docling_metering_service.py

Repository: langflow-ai/openrag

Length of output: 2871


Resolve the Ruff UP045/UP017 failures in this module.

This file currently fails lint on Optional[...] and timezone.utc usage, blocking the pipeline. The fixes are mechanical and low-risk.

Update the imports and type annotations:

  • Change from datetime import datetime, timezone to from datetime import UTC, datetime
  • Remove from typing import Optional (use X | None syntax instead)
  • Replace all Optional[str] with str | None (lines 39, 44, 72, 77)
  • Replace timezone.utc with UTC (line 50)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/services/docling_metering_service.py` around lines 23 - 25, Update the
imports and annotations to fix Ruff UP045/UP017: replace "from datetime import
datetime, timezone" with "from datetime import UTC, datetime", remove the "from
typing import Optional" import, change all occurrences of "Optional[str]" to the
union form "str | None" (in the functions/variables around the symbols that
currently reference Optional at lines where variables/params are declared,
including the annotations near the functions/variables referenced by names in
this module), and replace any use of "timezone.utc" with "UTC" (notably where
datetime.now(...) is called). Ensure you update the four specific Optional[str]
occurrences and the single timezone.utc usage to the new forms.

import aiofiles

from utils.logging_config import get_logger

logger = get_logger(__name__)


@dataclass
class DoclingMeterRecord:
task_id: str
filename: str
size_bytes: int
mimetype: str
owner_user_id: Optional[str]

Check failure on line 39 in src/services/docling_metering_service.py

View workflow job for this annotation

GitHub Actions / Ruff and mypy on changed files

ruff (UP045)

src/services/docling_metering_service.py:39:20: UP045 Use `X | None` for type annotations help: Convert to `X | None`
submitted_at: str # ISO-8601 UTC timestamp
terminal_at: str # ISO-8601 UTC timestamp
elapsed_seconds: float # wall-clock from submission to terminal
outcome: str # "success" | "failed" | "expired" | "timeout" | "submit_failed"
failure_detail: Optional[str]

Check failure on line 44 in src/services/docling_metering_service.py

View workflow job for this annotation

GitHub Actions / Ruff and mypy on changed files

ruff (UP045)

src/services/docling_metering_service.py:44:21: UP045 Use `X | None` for type annotations help: Convert to `X | None`
poll_count: int # status-check calls made; 0 for legacy (Langflow-polling) path
deployment_mode: str # "direct" | "rq"


def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()

Check failure on line 50 in src/services/docling_metering_service.py

View workflow job for this annotation

GitHub Actions / Ruff and mypy on changed files

ruff (UP017)

src/services/docling_metering_service.py:50:25: UP017 Use `datetime.UTC` alias help: Convert to `datetime.UTC` alias


class DoclingMeteringService:
"""Writes one JSONL record per Docling file submission to a log file."""

def __init__(self, log_path: str, deployment_mode: str = "direct"):
self._log_path = log_path
self._deployment_mode = deployment_mode
self._lock = asyncio.Lock()

@property
def deployment_mode(self) -> str:
return self._deployment_mode

def build_record(
self,
*,
task_id: str,
filename: str,
size_bytes: int,
mimetype: str,
owner_user_id: Optional[str],

Check failure on line 72 in src/services/docling_metering_service.py

View workflow job for this annotation

GitHub Actions / Ruff and mypy on changed files

ruff (UP045)

src/services/docling_metering_service.py:72:24: UP045 Use `X | None` for type annotations help: Convert to `X | None`
submitted_at: str,
terminal_at: str,
elapsed_seconds: float,
outcome: str,
failure_detail: Optional[str] = None,

Check failure on line 77 in src/services/docling_metering_service.py

View workflow job for this annotation

GitHub Actions / Ruff and mypy on changed files

ruff (UP045)

src/services/docling_metering_service.py:77:25: UP045 Use `X | None` for type annotations help: Convert to `X | None`
poll_count: int = 0,
) -> DoclingMeterRecord:
return DoclingMeterRecord(
task_id=task_id,
filename=filename,
size_bytes=size_bytes,
mimetype=mimetype,
owner_user_id=owner_user_id,
submitted_at=submitted_at,
terminal_at=terminal_at,
elapsed_seconds=round(elapsed_seconds, 3),
outcome=outcome,
failure_detail=failure_detail,
poll_count=poll_count,
deployment_mode=self._deployment_mode,
)

async def record(self, meter_record: DoclingMeterRecord) -> None:
"""Append *meter_record* as a single JSON line to the metering log.

Errors are swallowed with a warning so a metering failure never
interrupts the ingestion path.
"""
line = json.dumps(dataclasses.asdict(meter_record), default=str) + "\n"
async with self._lock:
try:
log_dir = os.path.dirname(os.path.abspath(self._log_path))
os.makedirs(log_dir, exist_ok=True)
async with aiofiles.open(self._log_path, "a", encoding="utf-8") as fh:
await fh.write(line)
except Exception as exc:
logger.warning(
"Failed to write Docling meter record",
error=str(exc),
task_id=meter_record.task_id,
)
32 changes: 31 additions & 1 deletion src/services/docling_polling_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
logger = get_logger(__name__)


class PollOutcome(str, Enum):

Check failure on line 25 in src/services/docling_polling_service.py

View workflow job for this annotation

GitHub Actions / Ruff and mypy on changed files

ruff (UP042)

src/services/docling_polling_service.py:25:7: UP042 Class PollOutcome inherits from both `str` and `enum.Enum` help: Inherit from `enum.StrEnum`
SUCCESS = "success"
FAILED = "failed"
EXPIRED = "expired"
Expand All @@ -32,9 +32,10 @@
@dataclass
class DoclingPollResult:
outcome: PollOutcome
detail: Optional[str] = None

Check failure on line 35 in src/services/docling_polling_service.py

View workflow job for this annotation

GitHub Actions / Ruff and mypy on changed files

ruff (UP045)

src/services/docling_polling_service.py:35:13: UP045 Use `X | None` for type annotations help: Convert to `X | None`
last_snapshot: Optional[DoclingStatusSnapshot] = None

Check failure on line 36 in src/services/docling_polling_service.py

View workflow job for this annotation

GitHub Actions / Ruff and mypy on changed files

ruff (UP045)

src/services/docling_polling_service.py:36:20: UP045 Use `X | None` for type annotations help: Convert to `X | None`
elapsed_seconds: float = 0.0
poll_count: int = 0


class DoclingPollingService:
Expand All @@ -54,6 +55,11 @@
) -> DoclingPollResult:
"""Loop on Docling status until terminal or until max_seconds elapses.

A SUCCESS status is treated as ready only after the result endpoint
returns a payload with usable ``document.json_content``. This prevents
handing Langflow a task that Docling accepted but failed to convert
into a consumable document.

Transient errors (network, 5xx, NOT_FOUND seen briefly before the task
is registered server-side) are absorbed up to ``transient_retry_budget``
before being surfaced as failures. The interval grows by
Expand All @@ -69,27 +75,48 @@
deadline = start + max_seconds
interval = poll_interval
consecutive_not_found = 0
last_snapshot: Optional[DoclingStatusSnapshot] = None

Check failure on line 78 in src/services/docling_polling_service.py

View workflow job for this annotation

GitHub Actions / Ruff and mypy on changed files

ruff (UP045)

src/services/docling_polling_service.py:78:24: UP045 Use `X | None` for type annotations help: Convert to `X | None`
poll_count = 0

logger.debug("Starting Docling polling", task_id=task_id)

while True:
logger.debug("Docling polling", task_id=task_id)
snapshot = await self.docling_service.check_task_status(task_id)
last_snapshot = snapshot
poll_count += 1
logger.debug("Snapshot received", task_id=task_id, snapshot=last_snapshot)
elapsed = time.monotonic() - start

if snapshot.state == DoclingTaskState.SUCCESS:
try:
await self.docling_service.fetch_task_result(task_id)
except Exception as e:
detail = f"Docling result unavailable after SUCCESS status: {str(e)}"
logger.warning(
"Docling task reached SUCCESS but result fetch failed",
task_id=task_id,
detail=detail,
elapsed_seconds=round(elapsed, 2),
)
return DoclingPollResult(
outcome=PollOutcome.FAILED,
detail=detail,
last_snapshot=snapshot,
elapsed_seconds=elapsed,
poll_count=poll_count,
)

logger.debug(
"Docling task reached SUCCESS",
"Docling task reached SUCCESS and result is available",
task_id=task_id,
elapsed_seconds=round(elapsed, 2),
)
return DoclingPollResult(
outcome=PollOutcome.SUCCESS,
last_snapshot=snapshot,
elapsed_seconds=elapsed,
poll_count=poll_count,
)

if snapshot.state == DoclingTaskState.FAILED:
Expand All @@ -104,6 +131,7 @@
detail=snapshot.detail or "Docling reported failure",
last_snapshot=snapshot,
elapsed_seconds=elapsed,
poll_count=poll_count,
)

if snapshot.state == DoclingTaskState.NOT_FOUND:
Expand All @@ -122,6 +150,7 @@
detail="Docling task not found (expired or unknown task_id)",
last_snapshot=snapshot,
elapsed_seconds=elapsed,
poll_count=poll_count,
)
else:
consecutive_not_found = 0
Expand All @@ -139,6 +168,7 @@
detail=f"Docling polling timed out after {max_seconds}s",
last_snapshot=snapshot,
elapsed_seconds=time.monotonic() - start,
poll_count=poll_count,
)
await asyncio.sleep(min(interval, remaining))
interval = min(interval * backoff_factor, max_interval)
2 changes: 1 addition & 1 deletion src/services/docling_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from enum import StrEnum
from pathlib import Path
from typing import Any
from typing import Any, Dict, Optional

import httpx
from pydantic import BaseModel
Expand Down
Loading
Loading