fix: unify connector ingestion pipelines and consolidate service layer#1695
fix: unify connector ingestion pipelines and consolidate service layer#1695lucaseduoli wants to merge 4 commits into
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (2)
WalkthroughThis PR consolidates the dual connector service pattern into a single unified ChangesConnector Service Unification
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
tests/unit/api/test_reconcile_orphans_for_connector_type.py (1)
320-323:⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
test_delete_failure_does_not_raiseis currently testing search failure, not delete failure.Line 331 injects an exception on
search, so this misses coverage for exceptions during the per-documentdeleteloop.Align setup with test intent
- opensearch_client = AsyncMock() - opensearch_client.search.side_effect = RuntimeError("opensearch unavailable") + opensearch_client = AsyncMock() + opensearch_client.search = AsyncMock( + return_value={"hits": {"hits": [{"_id": "chunk-b-1"}]}, "_scroll_id": None} + ) + opensearch_client.delete = AsyncMock(side_effect=RuntimeError("opensearch unavailable"))Also applies to: 331-343
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/api/test_reconcile_orphans_for_connector_type.py` around lines 320 - 323, The test test_delete_failure_does_not_raise currently injects an exception on the search call but intends to simulate a failure during per-document deletion; update the test to raise the exception from the mock that implements the per-document delete used by reconcile_orphans_for_connector_type (replace the side_effect on the mocked search with a side_effect on the delete method actually invoked in the deletion loop — e.g., the mocked client's delete/delete_one/delete_object/bulk_delete method used by the helper), so the delete loop raises and the helper's swallowing behavior is exercised.src/services/langflow_file_service.py (1)
663-690:⚠️ Potential issue | 🟠 Major | ⚡ Quick winForward
embeddingModelintoselected_embedding_model.
merge_ui_ingest_settings_into_tweaks()explicitly dropsembeddingModeland expects it to reach Langflow throughrun_ingestion_flow(..., selected_embedding_model=...), butupload_and_ingest_file()never passes it. Any Langflow ingest with a non-default embedding model will silently use the workspace default instead.Suggested fix
final_tweaks = LangflowFileService.merge_ui_ingest_settings_into_tweaks(tweaks, settings) + selected_embedding_model = None + if isinstance(settings, dict): + model = settings.get("embeddingModel") + if isinstance(model, str) and model.strip(): + selected_embedding_model = model.strip() if settings: logger.debug( "[LF] Applying ingestion settings", extra={"settings": settings, "tweaks": final_tweaks}, ) @@ owner_email=owner_email, connector_type=connector_type, docling_task_id=task_id, + selected_embedding_model=selected_embedding_model, document_id=document_id, source_url=source_url, allowed_users=allowed_users, allowed_groups=allowed_groups, )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/services/langflow_file_service.py` around lines 663 - 690, upload_and_ingest_file is calling merge_ui_ingest_settings_into_tweaks which drops embeddingModel but never forwards it into run_ingestion_flow, so non-default embedding models fall back to workspace defaults; fix by extracting embeddingModel from settings (or final_tweaks if appropriate) and pass it as selected_embedding_model to run_ingestion_flow (keep references to merge_ui_ingest_settings_into_tweaks, upload_and_ingest_file, run_ingestion_flow, embeddingModel, selected_embedding_model) so Langflow receives the chosen embedding model.tests/unit/connectors/test_langflow_connector_txt_to_md.py (1)
62-110:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winPin the Langflow feature flag in these regressions.
Both tests currently rely on the default value of
config.settings.DISABLE_INGEST_WITH_LANGFLOW. If that default changes,process_connector_document()will take the standard path instead, so this file no longer validates the.txt -> .mdLangflow contract it is supposed to guard.🔧 Proposed fix
`@pytest.mark.asyncio` -async def test_sharepoint_txt_is_uploaded_to_langflow_as_md(): +async def test_sharepoint_txt_is_uploaded_to_langflow_as_md(monkeypatch): + monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", False) """A connector document with mimetype text/plain must reach Langflow's upload_and_ingest_file with a .md filename and text/markdown mimetype.""" service, langflow_service = _make_service() document = _make_document(filename="notes.txt", mimetype="text/plain") @@ `@pytest.mark.asyncio` -async def test_sharepoint_pdf_passes_through_untouched(): +async def test_sharepoint_pdf_passes_through_untouched(monkeypatch): + monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", False) """The rename rule must NOT touch non-.txt files.""" service, langflow_service = _make_service() document = _make_document( filename="report.pdf", mimetype="application/pdf", content=b"%PDF-1.4..." )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/connectors/test_langflow_connector_txt_to_md.py` around lines 62 - 110, The tests rely on the global feature flag config.settings.DISABLE_INGEST_WITH_LANGFLOW and must pin it so Langflow ingestion runs; update both test_sharepoint_txt_is_uploaded_to_langflow_as_md and test_sharepoint_pdf_passes_through_untouched to explicitly set config.settings.DISABLE_INGEST_WITH_LANGFLOW = False (or use the test runner's monkeypatch to set that attribute) before calling service.process_connector_document so the code path that calls process_connector_document -> upload_and_ingest_file is exercised; ensure the change is applied inside each test (or in the shared _make_service/test setup) and cleaned up if using global mutation.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/connectors/service.py`:
- Around line 450-458: The current expansion only consumes the first page from
connector.list_files() causing incomplete folder expansion; change the code to
loop calling connector.list_files() repeatedly using the returned nextPageToken
(e.g., result.get("nextPageToken")) until no token remains, concatenating each
page's "files" into expanded_files (and appending into expanded_files_info) and
rebuilding expanded_file_ids from the accumulated list so all pages' children
are included; use the same pagination pattern as sync_connector_files() and
reference connector.list_files(), expanded_files, expanded_files_info, and
expanded_file_ids when applying the fix.
In `@src/models/processors.py`:
- Around line 617-624: The fast-path uses hash_id(tmp_path) as the dedupe key
but check_document_exists and the later indexing use document.id, causing
mismatches and re-ingestion; change the fast-path to compute the same persisted
document id used when indexing (use the same function/logic that produces
document.id instead of hash_id(tmp_path)), pass that id into
check_document_exists(document_id, opensearch_client), and set file_task.result
id to the same document.id so the dedupe key is consistent (affects the block
around file_hash, hash_id, check_document_exists and the later indexing that
sets document.id).
- Around line 666-673: The empty-list initializers for allowed_users and
allowed_groups need explicit typing so mypy can infer element types; change the
declarations to annotated lists (e.g., allowed_users: List[<element_type>] = []
and allowed_groups: List[<element_type>] = []) using the same element type as
document.acl.allowed_users / document.acl.allowed_groups (import List from
typing or use list[...]) and keep the existing try/except and assignments
(allowed_users = document.acl.allowed_users or []; allowed_groups =
document.acl.allowed_groups or []) so mypy is satisfied.
In `@tests/unit/api/test_reconcile_orphans_for_connector_type.py`:
- Around line 110-112: Tests currently assert that
opensearch_client.delete_by_query was not awaited, but the code now uses
opensearch_client.search + opensearch_client.delete; update the assertions to
reflect the new API by asserting that opensearch_client.search was called (or
called_with the expected query) and that opensearch_client.delete was not
awaited (or awaited the expected number of times) where no deletions should
occur. Replace occurrences of delete_by_query.assert_not_awaited() with
assertions on opensearch_client.search (e.g., assert_called_once_with/contains
expected params) and opensearch_client.delete.assert_not_awaited() (or
appropriate call count) in the tests covering
reconcile_orphans_for_connector_type and the other listed blocks.
---
Outside diff comments:
In `@src/services/langflow_file_service.py`:
- Around line 663-690: upload_and_ingest_file is calling
merge_ui_ingest_settings_into_tweaks which drops embeddingModel but never
forwards it into run_ingestion_flow, so non-default embedding models fall back
to workspace defaults; fix by extracting embeddingModel from settings (or
final_tweaks if appropriate) and pass it as selected_embedding_model to
run_ingestion_flow (keep references to merge_ui_ingest_settings_into_tweaks,
upload_and_ingest_file, run_ingestion_flow, embeddingModel,
selected_embedding_model) so Langflow receives the chosen embedding model.
In `@tests/unit/api/test_reconcile_orphans_for_connector_type.py`:
- Around line 320-323: The test test_delete_failure_does_not_raise currently
injects an exception on the search call but intends to simulate a failure during
per-document deletion; update the test to raise the exception from the mock that
implements the per-document delete used by reconcile_orphans_for_connector_type
(replace the side_effect on the mocked search with a side_effect on the delete
method actually invoked in the deletion loop — e.g., the mocked client's
delete/delete_one/delete_object/bulk_delete method used by the helper), so the
delete loop raises and the helper's swallowing behavior is exercised.
In `@tests/unit/connectors/test_langflow_connector_txt_to_md.py`:
- Around line 62-110: The tests rely on the global feature flag
config.settings.DISABLE_INGEST_WITH_LANGFLOW and must pin it so Langflow
ingestion runs; update both test_sharepoint_txt_is_uploaded_to_langflow_as_md
and test_sharepoint_pdf_passes_through_untouched to explicitly set
config.settings.DISABLE_INGEST_WITH_LANGFLOW = False (or use the test runner's
monkeypatch to set that attribute) before calling
service.process_connector_document so the code path that calls
process_connector_document -> upload_and_ingest_file is exercised; ensure the
change is applied inside each test (or in the shared _make_service/test setup)
and cleaned up if using global mutation.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: bc9b27bb-9344-4ba8-b2ed-8decc73e0b0b
📒 Files selected for processing (10)
src/api/connector_router.pysrc/app/container.pysrc/connectors/langflow_connector_service.pysrc/connectors/service.pysrc/models/processors.pysrc/services/langflow_file_service.pytests/unit/api/test_reconcile_orphans_for_connector_type.pytests/unit/connectors/test_langflow_connector_txt_to_md.pytests/unit/test_connector_processor_filename_dedupe.pytests/unit/test_processors_clear_stale_chunks.py
💤 Files with no reviewable changes (2)
- src/connectors/langflow_connector_service.py
- src/api/connector_router.py
| # Get the expanded list of file IDs (folders will be expanded to their contents) | ||
| # This uses the connector's list_files() which calls _iter_selected_items() | ||
| result = await connector.list_files() | ||
| expanded_files = result.get("files", []) | ||
| expanded_file_ids = [f["id"] for f in expanded_files] | ||
|
|
||
| # Save the expanded files info so we can set correct names in the task UI | ||
| for f in expanded_files: | ||
| expanded_files_info.append(f) | ||
| # Save the expanded files info so we can set correct names in the task UI | ||
| for f in expanded_files: | ||
| expanded_files_info.append(f) |
There was a problem hiding this comment.
Expand selected folders across all list_files() pages.
This only consumes the first list_files() page, so a selected folder with more than one page of children will sync a partial file set. sync_connector_files() already handles nextPageToken; this expansion path needs the same loop.
Suggested fix
- result = await connector.list_files()
- expanded_files = result.get("files", [])
- expanded_file_ids = [f["id"] for f in expanded_files]
-
- # Save the expanded files info so we can set correct names in the task UI
- for f in expanded_files:
- expanded_files_info.append(f)
+ expanded_file_ids = []
+ page_token = None
+ while True:
+ result = await connector.list_files(page_token)
+ expanded_files = result.get("files", [])
+ expanded_file_ids.extend(
+ f["id"] for f in expanded_files if f.get("id")
+ )
+ expanded_files_info.extend(expanded_files)
+
+ page_token = result.get("nextPageToken")
+ if not page_token:
+ breakAs per coding guidelines, "External data connectors (Google Drive, OneDrive, S3, IBM COS). New connectors must subclass connectors/base.py and register through connectors/service.py. Verify credentials are never logged, errors surface cleanly, and pagination is handled correctly."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/service.py` around lines 450 - 458, The current expansion only
consumes the first page from connector.list_files() causing incomplete folder
expansion; change the code to loop calling connector.list_files() repeatedly
using the returned nextPageToken (e.g., result.get("nextPageToken")) until no
token remains, concatenating each page's "files" into expanded_files (and
appending into expanded_files_info) and rebuilding expanded_file_ids from the
accumulated list so all pages' children are included; use the same pagination
pattern as sync_connector_files() and reference connector.list_files(),
expanded_files, expanded_files_info, and expanded_file_ids when applying the
fix.
There was a problem hiding this comment.
What is the page size? That is a totally silent issue, really difficult to notice in big buckets/folders
I'm wondering, when we have a huge amount of files in a folder, how it will affect the performance
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find list_files implementations in connectors
rg -n "def list_files\|async def list_files\|page_size\|max_files\|nextPageToken\|page_token" --type py src/connectors/ -C2Length of output: 4819
@ricofurtado Great question — both points are worth unpacking.
Page size today (before the pagination fix)
sync_specific_files calls connector.list_files() with no arguments at all. Looking at base.py:
async def list_files(self, page_token: Optional[str] = None, max_files: Optional[int] = None) -> Dict[str, Any]:max_files=None is left to each connector implementation to interpret, so the effective page size is silently connector-specific (Google Drive typically caps at 100, OneDrive at 200, S3 implementation-dependent). There is no explicit cap here, and when the first page fills up, the rest are silently dropped — that's the core bug.
Compare with sync_connector_files, which is explicit:
page_size = min(max_files or 100, 1000) if max_files else 100
file_list = await connector.list_files(page_token, max_files=page_size)Performance concern with the pagination fix
Your concern is valid. Even with a proper pagination loop, the fix I suggested still collects all expanded file IDs and their metadata in memory before the task is created:
expanded_file_ids = [] # grows unboundedly
expanded_files_info = [] # sameFor a folder with tens of thousands of files this means:
- Many sequential API round-trips to exhaust all pages
- The entire file list held in memory before processing begins
- No back-pressure or streaming into the task queue
A more robust approach for huge folders would be to stream pages directly into task batches (similar to how sync_connector_files caps via max_files), or at minimum add a max_files guard to the expansion call so it mirrors the behaviour in sync_connector_files. This is a legitimate follow-up concern beyond the immediate pagination correctness fix.
Would you like me to open a GitHub issue to track this performance/scalability gap for large folder expansion in sync_specific_files?
🐇 🤔💭
🧬 Code Graph Analysis Results
src/connectors/service.py
Sync pagination + page size calculation
- Lines 259-380 (relevant excerpt around
page_sizeand the paginated loop):
# Collect files to process (limited by max_files)
files_to_process: list[dict[str, Any]] = []
page_token = None
# Calculate page size to minimize API calls
page_size = min(max_files or 100, 1000) if max_files else 100
while True:
# List files from connector with limit
logger.debug("Calling list_files", page_size=page_size, page_token=page_token)
file_list = await connector.list_files(page_token, max_files=page_size)
logger.debug("Got files from connector", file_count=len(file_list.get("files", [])))
files = file_list["files"]
if not files:
break
for file_info in files:
if max_files and len(files_to_process) >= max_files:
break
# Filter by filename if filter is provided
if filename_filter is not None:
file_name = file_info.get("name", "")
if file_name not in filename_filter:
logger.debug(
"Skipping file not in filter",
filename=file_name,
)
continue
files_to_process.append(file_info)
# Stop if we have enough files or no more pages
if (max_files and len(files_to_process) >= max_files) or not file_list.get(
"nextPageToken"
):
break
page_token = file_list.get("nextPageToken")src/connectors/base.py
Connector interface for paginated listing
- Lines 99-101:
async def list_files(self, page_token: Optional[str] = None, max_files: Optional[int] = None) -> Dict[str, Any]:
"""List all files. Returns files and next_page_token if any."""
passsrc/services/task_service.py
Task creation uses the collected file count
- Lines 218-288 (relevant parts):
async def create_custom_task(
self,
user_id: str,
items: list,
processor,
original_filenames: dict | None = None,
existing_task_id: str = None,
) -> str:
"""Create a new task with custom processor for any type of items"""
import os
# Store anonymous tasks under a stable key so they can be retrieved later
store_user_id = user_id or AnonymousUser().user_id
task_id = existing_task_id or str(uuid.uuid4())
# Create file tasks with original filenames if provided
normalized_originals = (
{str(k): v for k, v in original_filenames.items()} if original_filenames else {}
)
file_tasks = {
str(item): FileTask(
file_path=str(item),
filename=normalized_originals.get(str(item), os.path.basename(str(item))),
)
for item in items
}
if (
existing_task_id
and store_user_id in self.task_store
and existing_task_id in self.task_store[store_user_id]
):
upload_task = self.task_store[store_user_id][existing_task_id]
upload_task.file_tasks.update(file_tasks)
upload_task.total_files += len(items)
upload_task.status = TaskStatus.RUNNING
else:
upload_task = UploadTask(
task_id=task_id,
total_files=len(items),
file_tasks=file_tasks,
)
upload_task.processor = processor
if store_user_id not in self.task_store:
self.task_store[store_user_id] = {}
self.task_store[store_user_id][task_id] = upload_task
# Start background processing
background_task = asyncio.create_task(
self.background_custom_processor(store_user_id, task_id, items, processor)
)
self.background_tasks.add(background_task)
background_task.add_done_callback(self.background_tasks.discard)
# Store reference to background task for cancellation if newly created
if not existing_task_id:
upload_task.background_task = background_task
# Send telemetry event for task creation with metadata
asyncio.create_task(
TelemetryClient.send_event(
Category.TASK_OPERATIONS,
MessageId.ORB_TASK_CREATED,
metadata={
"total_files": len(items),
"processor_type": processor.__class__.__name__,
},
)
)
return task_id|
Actionable comments posted: 0 |
# Conflicts: # src/models/processors.py # tests/unit/api/test_reconcile_orphans_for_connector_type.py
|
Actionable comments posted: 0 |
|
LGTM |
Summary
This PR refactors the OpenRAG connector system to unify the ingestion pipeline for all cloud connectors (S3, IBM COS, OneDrive, SharePoint, Google Drive) with the standard local file ingestion path. This consolidation eliminates duplicate services and routers, fixes sync-related bugs, and ensures all file uploads route through the identical two-phase Docling Serve + Langflow (or standard document chunking/embeddings) pipelines.
Detailed Changes
1. Service Layer Consolidation & Cleanup
ConnectorServiceand deleted the redundantsrc/connectors/langflow_connector_service.py.src/api/connector_router.pyrouter file.src/app/container.pyto wireConnectorServicedirectly toconnector_service, injecting the requiredlangflow_file_serviceandflows_servicedependencies.2. Ingestion & Task Processors Unification
upload_and_ingest_fileandrun_ingestion_flowinsrc/services/langflow_file_service.pyto pass through optional metadata (document_id,source_url,allowed_users,allowed_groups) directly to the Langflow ingestion flow.LangflowConnectorFileProcessorinto a singleConnectorFileProcessorinsrc/models/processors.pythat handles both Langflow and standard processing pathways based on configuration.3. Bug Fixes
check_document_existsto run a term query on thedocument_idfield in OpenSearch instead of callingexists(id=file_hash), as chunk documents are indexed under primary keys like{file_hash}_{i}.ConnectorService.sync_specific_filesblindly calledconnector.list_files()for folder expansion. Connectors withoutcfgattributes (like S3/COS) now bypass list expansion, preventing full bucket scans and preserving selected file IDs.Verification and Test Suite Compatibility
Updated and successfully ran the following test suites to match the unified service interface:
tests/unit/connectors/test_langflow_connector_txt_to_md.py(Passed)tests/unit/test_connector_processor_filename_dedupe.py(Passed)tests/unit/test_processors_clear_stale_chunks.py(Passed)tests/unit/api/test_reconcile_orphans_for_connector_type.py(Passed)Summary by CodeRabbit
Release Notes
Refactor
Tests