feat: docling document metering via docling polling to generate logs#1620
feat: docling document metering via docling polling to generate logs#1620ricofurtado wants to merge 3 commits into
Conversation
… fixture in tests
WalkthroughThis PR introduces a complete JSONL-based metering system for Docling file submissions: configurable settings and service wiring enable optional recording; a new ChangesMetering and Result Validation
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (2)
tests/unit/test_docling_metering_service.py (1)
151-243: ⚡ Quick winAdd a regression test for Docling submission failure metering.
Current tests validate post-submission outcomes, but not the
submit_to_doclingexception path. Once fixed in service code, a dedicated test should assertoutcome="submit_failed"is recorded.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/test_docling_metering_service.py` around lines 151 - 243, Add a regression test that injects a failing submit_to_docling path and asserts metering records outcome="submit_failed"; specifically, in the tests for LangflowFileService.upload_and_ingest_file, patch or set langflow_service.submit_to_docling (or svc.submit_to_docling) to raise an exception, call upload_and_ingest_file with mock_polling_service (or None as needed), catch the exception if the service propagates it, then inspect mock_metering_service.build_record.call_args.kwargs to assert build_kwargs["outcome"] == "submit_failed" and include any expected failure_detail; mirror naming and setup used in existing tests (e.g., mock_metering_service, file_tuple, file_task) so the new test follows the test_metering_* pattern.tests/unit/test_docling_polling_service.py (1)
43-53: ⚡ Quick winAssert
poll_countin these updated tests to lock the metering contract.These tests now cover the fetch-on-success behavior, but they still don’t verify
result.poll_count, which is part of the new polling/metering contract. Please assert expected values (e.g., immediate terminal paths should be1, processing sequence should match number of status checks).Suggested assertions
async def test_returns_success_immediately_when_already_done(polling_service, mock_docling_service): @@ assert result.outcome == PollOutcome.SUCCESS + assert result.poll_count == 1 assert mock_docling_service.check_task_status.call_count == 1 mock_docling_service.fetch_task_result.assert_awaited_once_with("t1") @@ async def test_loops_through_processing_then_success( @@ assert result.outcome == PollOutcome.SUCCESS + assert result.poll_count == 4 assert mock_docling_service.check_task_status.call_count == 4 @@ async def test_success_status_requires_fetchable_result(polling_service, mock_docling_service): @@ assert result.outcome == PollOutcome.FAILED + assert result.poll_count == 1 assert "missing document.json_content" in (result.detail or "")Also applies to: 57-75, 78-91
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/test_docling_polling_service.py` around lines 43 - 53, The tests must assert the new polling/metering field result.poll_count to lock the contract: update test_returns_success_immediately_when_already_done (and the other tests at lines 57-75 and 78-91) to include an assertion that result.poll_count == 1 for the immediate-success path; for tests modeling repeated status checks assert result.poll_count equals the number of check_task_status calls to match polling_service.poll_until_ready behavior and PollOutcome constants (e.g., PollOutcome.SUCCESS). Use the existing mocks (mock_docling_service.check_task_status) and the returned result from polling_service.poll_until_ready to derive and assert the expected poll_count.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/config/settings.py`:
- Around line 211-215: The import of get_data_file is currently done inline as
"from config.paths import get_data_file as _get_data_file" after executable code
which triggers Ruff E402; move that import into the top module import section
with the other imports so it is a normal module-level import, then update the
usage that sets DOCLING_METERING_LOG_PATH to call _get_data_file (or rename to
get_data_file) as before; ensure the symbol referenced is
config.paths.get_data_file (or _get_data_file) so the assignment to
DOCLING_METERING_LOG_PATH remains unchanged and CI no longer fails.
In `@src/services/docling_metering_service.py`:
- Around line 23-25: Update the imports and annotations to fix Ruff UP045/UP017:
replace "from datetime import datetime, timezone" with "from datetime import
UTC, datetime", remove the "from typing import Optional" import, change all
occurrences of "Optional[str]" to the union form "str | None" (in the
functions/variables around the symbols that currently reference Optional at
lines where variables/params are declared, including the annotations near the
functions/variables referenced by names in this module), and replace any use of
"timezone.utc" with "UTC" (notably where datetime.now(...) is called). Ensure
you update the four specific Optional[str] occurrences and the single
timezone.utc usage to the new forms.
In `@src/services/langflow_file_service.py`:
- Around line 649-660: The metering call self._record_meter(...) is awaited and
blocks ingestion; change it to fire-and-forget by scheduling it as a background
task (e.g., asyncio.create_task or the service's event loop) instead of awaiting
it wherever used (including the occurrences around poll_result handling and the
other noted blocks), and ensure the task is protected against exceptions (either
by adding try/except inside _record_meter or attaching a done-callback to log
exceptions) so failures won't be unhandled or crash the process.
- Around line 603-606: Wrap the call to submit_to_docling(...) in a try/except
so that if submit_to_docling(filename, content, owner=owner,
jwt_token=jwt_token) raises, you still emit a metering event indicating a failed
submission (include filename/owner/timestamp/error) via the service's existing
metering API (e.g., self.meter.record or self.record_meter_event) and then
re-raise the exception; keep successful path behavior (setting submitted_at and
_submit_wall) unchanged when no error occurs.
---
Nitpick comments:
In `@tests/unit/test_docling_metering_service.py`:
- Around line 151-243: Add a regression test that injects a failing
submit_to_docling path and asserts metering records outcome="submit_failed";
specifically, in the tests for LangflowFileService.upload_and_ingest_file, patch
or set langflow_service.submit_to_docling (or svc.submit_to_docling) to raise an
exception, call upload_and_ingest_file with mock_polling_service (or None as
needed), catch the exception if the service propagates it, then inspect
mock_metering_service.build_record.call_args.kwargs to assert
build_kwargs["outcome"] == "submit_failed" and include any expected
failure_detail; mirror naming and setup used in existing tests (e.g.,
mock_metering_service, file_tuple, file_task) so the new test follows the
test_metering_* pattern.
In `@tests/unit/test_docling_polling_service.py`:
- Around line 43-53: The tests must assert the new polling/metering field
result.poll_count to lock the contract: update
test_returns_success_immediately_when_already_done (and the other tests at lines
57-75 and 78-91) to include an assertion that result.poll_count == 1 for the
immediate-success path; for tests modeling repeated status checks assert
result.poll_count equals the number of check_task_status calls to match
polling_service.poll_until_ready behavior and PollOutcome constants (e.g.,
PollOutcome.SUCCESS). Use the existing mocks
(mock_docling_service.check_task_status) and the returned result from
polling_service.poll_until_ready to derive and assert the expected poll_count.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d4d6299f-5815-4978-b378-94af391ddef5
📒 Files selected for processing (9)
src/app/container.pysrc/config/settings.pysrc/dependencies.pysrc/services/docling_metering_service.pysrc/services/docling_polling_service.pysrc/services/docling_service.pysrc/services/langflow_file_service.pytests/unit/test_docling_metering_service.pytests/unit/test_docling_polling_service.py
| from config.paths import get_data_file as _get_data_file | ||
|
|
||
| DOCLING_METERING_LOG_PATH = os.getenv( | ||
| "DOCLING_METERING_LOG_PATH", _get_data_file("docling_tasks_logs.jsonl") | ||
| ) |
There was a problem hiding this comment.
Move the get_data_file import to the module import section.
Line 211 introduces a module-level import after executable statements, which triggers Ruff E402 and fails CI.
💡 Proposed fix
-from config.paths import get_flows_path
+from config.paths import get_data_file, get_flows_path
...
-from config.paths import get_data_file as _get_data_file
-
DOCLING_METERING_LOG_PATH = os.getenv(
- "DOCLING_METERING_LOG_PATH", _get_data_file("docling_tasks_logs.jsonl")
+ "DOCLING_METERING_LOG_PATH", get_data_file("docling_tasks_logs.jsonl")
)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/config/settings.py` around lines 211 - 215, The import of get_data_file
is currently done inline as "from config.paths import get_data_file as
_get_data_file" after executable code which triggers Ruff E402; move that import
into the top module import section with the other imports so it is a normal
module-level import, then update the usage that sets DOCLING_METERING_LOG_PATH
to call _get_data_file (or rename to get_data_file) as before; ensure the symbol
referenced is config.paths.get_data_file (or _get_data_file) so the assignment
to DOCLING_METERING_LOG_PATH remains unchanged and CI no longer fails.
| from datetime import datetime, timezone | ||
| from typing import Optional | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
ruff check src/services/docling_metering_service.pyRepository: langflow-ai/openrag
Length of output: 2871
Resolve the Ruff UP045/UP017 failures in this module.
This file currently fails lint on Optional[...] and timezone.utc usage, blocking the pipeline. The fixes are mechanical and low-risk.
Update the imports and type annotations:
- Change
from datetime import datetime, timezonetofrom datetime import UTC, datetime - Remove
from typing import Optional(useX | Nonesyntax instead) - Replace all
Optional[str]withstr | None(lines 39, 44, 72, 77) - Replace
timezone.utcwithUTC(line 50)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/services/docling_metering_service.py` around lines 23 - 25, Update the
imports and annotations to fix Ruff UP045/UP017: replace "from datetime import
datetime, timezone" with "from datetime import UTC, datetime", remove the "from
typing import Optional" import, change all occurrences of "Optional[str]" to the
union form "str | None" (in the functions/variables around the symbols that
currently reference Optional at lines where variables/params are declared,
including the annotations near the functions/variables referenced by names in
this module), and replace any use of "timezone.utc" with "UTC" (notably where
datetime.now(...) is called). Ensure you update the four specific Optional[str]
occurrences and the single timezone.utc usage to the new forms.
| task_id = await self.submit_to_docling(filename, content, owner=owner, jwt_token=jwt_token) | ||
| submitted_at = datetime.now(timezone.utc).isoformat() | ||
| _submit_wall = time.monotonic() | ||
|
|
There was a problem hiding this comment.
Record a metering event when Docling submission itself fails.
If submit_to_docling(...) raises, no meter record is emitted, so failed conversion attempts are currently missing from billing/audit logs.
💡 Proposed fix
- task_id = await self.submit_to_docling(filename, content, owner=owner, jwt_token=jwt_token)
- submitted_at = datetime.now(timezone.utc).isoformat()
- _submit_wall = time.monotonic()
+ submitted_at = datetime.now(timezone.utc).isoformat()
+ _submit_wall = time.monotonic()
+ try:
+ task_id = await self.submit_to_docling(
+ filename, content, owner=owner, jwt_token=jwt_token
+ )
+ except Exception as e:
+ await self._record_meter(
+ task_id=f"submit_failed:{filename}:{int(time.time()*1000)}",
+ filename=filename,
+ size_bytes=size_bytes,
+ mimetype=mimetype,
+ owner=owner,
+ submitted_at=submitted_at,
+ elapsed_seconds=time.monotonic() - _submit_wall,
+ outcome="submit_failed",
+ failure_detail=str(e),
+ poll_count=0,
+ )
+ raise🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/services/langflow_file_service.py` around lines 603 - 606, Wrap the call
to submit_to_docling(...) in a try/except so that if submit_to_docling(filename,
content, owner=owner, jwt_token=jwt_token) raises, you still emit a metering
event indicating a failed submission (include filename/owner/timestamp/error)
via the service's existing metering API (e.g., self.meter.record or
self.record_meter_event) and then re-raise the exception; keep successful path
behavior (setting submitted_at and _submit_wall) unchanged when no error occurs.
| await self._record_meter( | ||
| task_id=task_id, | ||
| filename=filename, | ||
| size_bytes=size_bytes, | ||
| mimetype=mimetype, | ||
| owner=owner, | ||
| submitted_at=submitted_at, | ||
| elapsed_seconds=time.monotonic() - _submit_wall, | ||
| outcome=poll_result.outcome.value, | ||
| failure_detail=poll_result.detail, | ||
| poll_count=poll_count, | ||
| ) |
There was a problem hiding this comment.
The metering path is currently blocking ingestion despite “fire-and-forget” intent.
await self._record_meter(...) on all terminal paths adds write latency to user-facing ingestion flow.
💡 Proposed fix
+ def _record_meter_background(self, **kwargs) -> None:
+ if self.metering_service is None:
+ return
+ task = asyncio.create_task(self._record_meter(**kwargs))
+ task.add_done_callback(lambda t: t.exception()) # force retrieval; _record_meter handles/logs
- await self._record_meter(
+ self._record_meter_background(
task_id=task_id,
filename=filename,
size_bytes=size_bytes,
mimetype=mimetype,
owner=owner,
submitted_at=submitted_at,
elapsed_seconds=time.monotonic() - _submit_wall,
outcome=poll_result.outcome.value,
failure_detail=poll_result.detail,
poll_count=poll_count,
)
...
- await self._record_meter(
+ self._record_meter_background(
task_id=task_id,
filename=filename,
size_bytes=size_bytes,
mimetype=mimetype,
owner=owner,
submitted_at=submitted_at,
elapsed_seconds=time.monotonic() - _submit_wall,
outcome="langflow_failed",
failure_detail=str(e),
poll_count=poll_count,
)
...
- await self._record_meter(
+ self._record_meter_background(
task_id=task_id,
filename=filename,
size_bytes=size_bytes,
mimetype=mimetype,
owner=owner,
submitted_at=submitted_at,
elapsed_seconds=time.monotonic() - _submit_wall,
outcome="success",
poll_count=poll_count,
)Also applies to: 709-720, 733-743, 766-767
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/services/langflow_file_service.py` around lines 649 - 660, The metering
call self._record_meter(...) is awaited and blocks ingestion; change it to
fire-and-forget by scheduling it as a background task (e.g., asyncio.create_task
or the service's event loop) instead of awaiting it wherever used (including the
occurrences around poll_result handling and the other noted blocks), and ensure
the task is protected against exceptions (either by adding try/except inside
_record_meter or attaching a done-callback to log exceptions) so failures won't
be unhandled or crash the process.
This pull request introduces Docling usage metering to support billing and auditing by logging detailed records for every file submitted to Docling. Metering is optional and controlled by configuration, with minimal impact when disabled. The implementation ensures robust, atomic logging and integrates seamlessly with the existing ingestion flow. Additionally, the polling logic is enhanced to provide more accurate status tracking.
Key changes include:
Docling Usage Metering (Billing/Auditing):
DoclingMeteringService(src/services/docling_metering_service.py) that writes a JSONL record for each file conversion attempt, capturing metadata such as timing, outcome, file details, user, and deployment mode. Records are written atomically with an asyncio lock to avoid concurrency issues.DoclingMeteringServiceinto the service container and dependency injection system, making it available throughout the application when enabled. [1] [2] [3] [4] [5] [6]settings.pyto control metering enablement, log path, and deployment mode.Ingestion Flow Integration:
LangflowFileServiceto record metering events at all major ingestion outcomes (success, failure, or Langflow error), using a new_record_meterhelper method. Metering is fire-and-forget and does not block or fail the main ingestion path. [1] [2] [3] [4] [5]Polling and Status Tracking Improvements:
DoclingPollingServiceto track and return the number of poll attempts (poll_count) and ensure that a Docling task is only considered successful if the result is actually available and fetchable. [1] [2] [3] [4] [5] [6]poll_countfor accurate metering.Other Minor Changes:
These changes provide a solid foundation for future billing and auditing features while maintaining performance and reliability.
Summary by CodeRabbit
New Features
Tests