fix: unify connector ingestion pipelines and consolidate service layer #1695

Open
lucaseduoli wants to merge 4 commits into main from fix/refactor_connectors

Conversation

@lucaseduoli
Collaborator

@lucaseduoli lucaseduoli commented May 27, 2026

Summary

This PR refactors the OpenRAG connector system so that all cloud connectors (S3, IBM COS, OneDrive, SharePoint, Google Drive) share the standard local file ingestion path. The consolidation eliminates duplicate services and routers, fixes sync-related bugs, and ensures that all file uploads go through the same two-phase Docling Serve + Langflow pipeline (or the standard document chunking/embedding pipeline).


Detailed Changes

1. Service Layer Consolidation & Cleanup

  • Consolidated Connector Services: Consolidated all connector logic inside ConnectorService and deleted the redundant src/connectors/langflow_connector_service.py.
  • Removed Router Layer: Deleted the duplicate src/api/connector_router.py router file.
  • Updated Container Wiring: Refactored src/app/container.py to wire ConnectorService directly to connector_service, injecting the required langflow_file_service and flows_service dependencies.

2. Ingestion & Task Processors Unification

  • Metadata Support: Extended upload_and_ingest_file and run_ingestion_flow in src/services/langflow_file_service.py to pass through optional metadata (document_id, source_url, allowed_users, allowed_groups) directly to the Langflow ingestion flow.
  • Unified Task Processor: Consolidated LangflowConnectorFileProcessor into a single ConnectorFileProcessor in src/models/processors.py that handles both Langflow and standard processing pathways based on configuration.
  • Pre-ingestion Cleanup: Added pre-ingestion chunk deletion to the Langflow pathway to clear stale chunks (e.g. from modified or renamed files) before running the ingestion.
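
The routing and cleanup order described in this section can be pictured with a minimal sketch. It is not the code in src/models/processors.py: the class `SketchConnectorProcessor`, the `FakeBackend` stub, and the method names `delete_chunks`, `ingest_langflow`, and `ingest_standard` are hypothetical. Only the idea of a single processor choosing a pathway from configuration, and deleting stale chunks before a Langflow ingest, comes from the description.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeBackend:
    """Hypothetical stand-in that records calls instead of doing real work."""
    calls: list = field(default_factory=list)

    async def delete_chunks(self, document_id: str) -> None:
        self.calls.append(("delete_chunks", document_id))

    async def ingest_langflow(self, document_id: str) -> None:
        self.calls.append(("ingest_langflow", document_id))

    async def ingest_standard(self, document_id: str) -> None:
        self.calls.append(("ingest_standard", document_id))


class SketchConnectorProcessor:
    """Hypothetical single processor that picks a pathway from a flag."""

    def __init__(self, backend: FakeBackend, disable_ingest_with_langflow: bool):
        self.backend = backend
        self.disable_ingest_with_langflow = disable_ingest_with_langflow

    async def process_item(self, document_id: str) -> str:
        if self.disable_ingest_with_langflow:
            # Standard pathway; any cleanup it does on its own is omitted here.
            await self.backend.ingest_standard(document_id)
            return "standard"
        # Langflow pathway: clear stale chunks first, then ingest.
        await self.backend.delete_chunks(document_id)
        await self.backend.ingest_langflow(document_id)
        return "langflow"


def run_sketch(disable_ingest_with_langflow: bool) -> list:
    """Process one document and return the recorded backend calls."""
    backend = FakeBackend()
    processor = SketchConnectorProcessor(backend, disable_ingest_with_langflow)
    asyncio.run(processor.process_item("doc-1"))
    return backend.calls
```

Keeping the deletion inside the Langflow branch, immediately before the ingest call, means a renamed or modified file cannot leave chunks from its previous version behind.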

3. Bug Fixes

  • Duplicate Document Check: Fixed check_document_exists to run a term query on the document_id field in OpenSearch instead of calling exists(id=file_hash), as chunk documents are indexed under primary keys like {file_hash}_{i}.
  • S3 / IBM COS Selective Sync: Fixed a bug where ConnectorService.sync_specific_files blindly called connector.list_files() for folder expansion. Connectors without cfg attributes (like S3/COS) now bypass list expansion, preventing full bucket scans and preserving selected file IDs.
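
The duplicate-check fix can be illustrated with a small sketch. It assumes only what the description states: chunks are stored under IDs like {file_hash}_{i} and carry a document_id field (assumed here to be an exact-match field). The helper `document_exists_sketch`, the `FakeOpenSearch` stub, and the index name "documents" are hypothetical placeholders, not names from the repository.

```python
import asyncio


class FakeOpenSearch:
    """Hypothetical in-memory stub with the two calls the sketch needs."""

    def __init__(self, docs: dict):
        self.docs = docs  # maps _id -> source dict

    async def exists(self, index: str, id: str) -> bool:
        return id in self.docs

    async def search(self, index: str, body: dict) -> dict:
        wanted = body["query"]["term"]["document_id"]
        hits = [
            {"_id": _id, "_source": src}
            for _id, src in self.docs.items()
            if src.get("document_id") == wanted
        ]
        return {"hits": {"hits": hits[: body.get("size", 10)]}}


async def document_exists_sketch(client, index: str, file_hash: str) -> bool:
    # Match any chunk whose document_id equals the hash; one hit is enough.
    body = {"query": {"term": {"document_id": file_hash}}, "size": 1}
    response = await client.search(index=index, body=body)
    return len(response["hits"]["hits"]) > 0


def demo() -> tuple:
    """Compare the old ID lookup with the term query on the same data."""
    client = FakeOpenSearch({
        "abc123_0": {"document_id": "abc123"},
        "abc123_1": {"document_id": "abc123"},
    })
    by_id = asyncio.run(client.exists(index="documents", id="abc123"))
    by_term = asyncio.run(document_exists_sketch(client, "documents", "abc123"))
    return by_id, by_term
```

In this sketch the lookup by ID misses because no chunk is stored under the bare hash, while the term query finds the chunks that share it.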

Verification and Test Suite Compatibility

Updated the following test suites to match the unified service interface and ran them successfully:

  • Connector Langflow Ingestion: tests/unit/connectors/test_langflow_connector_txt_to_md.py (Passed)
  • Filename Deduplication: tests/unit/test_connector_processor_filename_dedupe.py (Passed)
  • Stale Chunks Clearing: tests/unit/test_processors_clear_stale_chunks.py (Passed)
  • Orphan Reconciliation: tests/unit/api/test_reconcile_orphans_for_connector_type.py (Passed)

Summary by CodeRabbit

Release Notes

  • Refactor

    • Streamlined connector ingestion architecture for improved maintainability and flexible workflow handling.
  • Tests

    • Updated connector ingestion tests to validate refined deduplication and ingestion integration logic.


@github-actions github-actions Bot added backend 🔷 Issues related to backend services (OpenSearch, Langflow, APIs) tests labels May 27, 2026
@coderabbitai
Contributor

coderabbitai Bot commented May 27, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 2f131515-ad44-48d3-8a2b-ae6d1c0f243b

📥 Commits

Reviewing files that changed from the base of the PR and between 51ae1ad and dee200c.

📒 Files selected for processing (3)
  • src/models/processors.py
  • tests/unit/test_connector_processor_filename_dedupe.py
  • tests/unit/test_processors_clear_stale_chunks.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/unit/test_processors_clear_stale_chunks.py
  • tests/unit/test_connector_processor_filename_dedupe.py

Walkthrough

This PR consolidates the dual connector service pattern into a single unified ConnectorService that conditionally routes ingestion through Langflow or standard processing based on a feature flag, removing ConnectorRouter and LangflowConnectorService classes while updating processors, services, and tests to reflect the new architecture.

Changes

Connector Service Unification

Layer / File(s) Summary
Remove dual-service router and update container initialization
src/app/container.py
Eliminates ConnectorRouter from imports and refactors initialize_services() to construct a single ConnectorService wired directly with langflow_file_service instead of composing separate Langflow and OpenRAG connector services behind a router.
Extend ConnectorService with conditional Langflow routing
src/connectors/service.py
Adds flows_service and langflow_service as constructor dependencies, extends process_connector_document to accept ingest_settings parameter, and adds feature-flag gated routing to either langflow_service.upload_and_ingest_file or the standard temp-file + TaskProcessor flow.
Processor-level hash-based deduplication and conditional ingestion routing
src/models/processors.py
Replaces OpenSearch exists with search in check_document_exists, adds early hash-based duplicate detection in ConnectorFileProcessor.process_item with task completion short-circuit, and routes ingestion through either langflow_service.upload_and_ingest_file or TaskProcessor.process_document_standard based on DISABLE_INGEST_WITH_LANGFLOW flag, updating result metadata in both branches.
Extend LangflowFileService with ingestion metadata parameters
src/services/langflow_file_service.py
Adds optional document_id, source_url, allowed_users, allowed_groups parameters to upload_and_ingest_file and propagates them through to run_ingestion_flow for Langflow global-var handling.
Update SharePoint integration tests for unified ConnectorService
tests/unit/connectors/test_langflow_connector_txt_to_md.py
Refactors test helper _make_service() to construct ConnectorService instead of LangflowConnectorService, mocks langflow_service.upload_and_ingest_file, and updates .txt and PDF ingestion assertions to inspect file_tuple from upload_and_ingest_file kwargs.
Extend connector processor deduplication test coverage for standard and Langflow paths
tests/unit/test_connector_processor_filename_dedupe.py
Refactors deduplication tests to distinguish filename vs hash collision detection via OpenSearch query inspection, adds monkeypatch controls for DISABLE_INGEST_WITH_LANGFLOW feature flag, converts Langflow test wiring from LangflowConnectorFileProcessor to ConnectorFileProcessor with injected langflow_service dependencies, and updates assertions to validate conditional ingestion, chunk deletion, and "hash unchanged" short-circuit behavior.
Constrain stale chunk enumeration to scroll-based searches
tests/unit/test_processors_clear_stale_chunks.py
Modifies mocked OpenSearch search to return predefined stale chunk IDs only when scroll parameter is present, otherwise returns empty hits to align with scroll-based enumeration logic.
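
The mocking approach in the last row can be sketched with `unittest.mock.AsyncMock`. This is an illustrative reconstruction, not the test file's code: the helper name `make_search_mock`, the chunk IDs, and the response shape are assumptions.

```python
import asyncio
from unittest.mock import AsyncMock


def make_search_mock(stale_ids: list) -> AsyncMock:
    """Build a search mock that returns hits only for scroll-based calls."""

    async def fake_search(*args, **kwargs):
        if "scroll" in kwargs:
            hits = [{"_id": chunk_id} for chunk_id in stale_ids]
            return {"_scroll_id": "scroll-1", "hits": {"hits": hits}}
        # Non-scroll searches (for example existence checks) see no hits.
        return {"hits": {"hits": []}}

    return AsyncMock(side_effect=fake_search)


def demo() -> tuple:
    """Call the mock once without and once with the scroll parameter."""
    search = make_search_mock(["chunk-a-0", "chunk-a-1"])
    plain = asyncio.run(search(index="documents", body={}))
    scrolled = asyncio.run(search(index="documents", body={}, scroll="1m"))
    return len(plain["hits"]["hits"]), [h["_id"] for h in scrolled["hits"]["hits"]]
```

Branching on the `scroll` keyword keeps a single mock usable for both the duplicate check and the stale-chunk enumeration without the two interfering.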

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • langflow-ai/openrag#1677: The main PR's connector-file processing changes in src/models/processors.py (and associated 404/dedupe unit tests) align with the retrieved PR's fix to delete already-indexed OpenSearch chunks on connector file 404 ("deleted_at_source") behavior.
  • langflow-ai/openrag#1651: The main PR's ingestion paths in ConnectorService/ConnectorFileProcessor (and the .txt.md unit test) align with the retrieved PR's centralized langflow_safe_filename_and_mimetype workaround by updating how Langflow-safe filenames/mimetypes are produced and asserted.
  • langflow-ai/openrag#1663: Both PRs modify the connector file ingestion/deduplication logic in src/models/processors.py—the main PR refactors ConnectorFileProcessor.process_item around Langflow routing and hash-based "unchanged" short-circuiting, while the retrieved PR adds replace_duplicates and changes the processors' duplicate checks/overwrite behavior—so they overlap directly in the same duplicate-handling code paths.

Suggested labels

refactor, bug

Suggested reviewers

  • mfortman11
  • edwinjosechittilappilly
  • ricofurtado
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 60.00%, which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Title check | ✅ Passed | The title accurately describes the main objective: unifying connector ingestion pipelines and consolidating the service layer by removing duplicate services and routers.
Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request.

@github-actions github-actions Bot added the bug 🔴 Something isn't working. label May 27, 2026
@github-actions github-actions Bot added bug 🔴 Something isn't working. and removed bug 🔴 Something isn't working. labels May 27, 2026
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
tests/unit/api/test_reconcile_orphans_for_connector_type.py (1)

320-323: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

test_delete_failure_does_not_raise is currently testing search failure, not delete failure.

Line 331 injects an exception on search, so this misses coverage for exceptions during the per-document delete loop.

Align setup with test intent
-    opensearch_client = AsyncMock()
-    opensearch_client.search.side_effect = RuntimeError("opensearch unavailable")
+    opensearch_client = AsyncMock()
+    opensearch_client.search = AsyncMock(
+        return_value={"hits": {"hits": [{"_id": "chunk-b-1"}]}, "_scroll_id": None}
+    )
+    opensearch_client.delete = AsyncMock(side_effect=RuntimeError("opensearch unavailable"))

Also applies to: 331-343

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/api/test_reconcile_orphans_for_connector_type.py` around lines 320
- 323, The test test_delete_failure_does_not_raise currently injects an
exception on the search call but intends to simulate a failure during
per-document deletion; update the test to raise the exception from the mock that
implements the per-document delete used by reconcile_orphans_for_connector_type
(replace the side_effect on the mocked search with a side_effect on the delete
method actually invoked in the deletion loop — e.g., the mocked client's
delete/delete_one/delete_object/bulk_delete method used by the helper), so the
delete loop raises and the helper's swallowing behavior is exercised.
src/services/langflow_file_service.py (1)

663-690: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Forward embeddingModel into selected_embedding_model.

merge_ui_ingest_settings_into_tweaks() explicitly drops embeddingModel and expects it to reach Langflow through run_ingestion_flow(..., selected_embedding_model=...), but upload_and_ingest_file() never passes it. Any Langflow ingest with a non-default embedding model will silently use the workspace default instead.

Suggested fix
         final_tweaks = LangflowFileService.merge_ui_ingest_settings_into_tweaks(tweaks, settings)
+        selected_embedding_model = None
+        if isinstance(settings, dict):
+            model = settings.get("embeddingModel")
+            if isinstance(model, str) and model.strip():
+                selected_embedding_model = model.strip()
         if settings:
             logger.debug(
                 "[LF] Applying ingestion settings",
                 extra={"settings": settings, "tweaks": final_tweaks},
             )
@@
                 owner_email=owner_email,
                 connector_type=connector_type,
                 docling_task_id=task_id,
+                selected_embedding_model=selected_embedding_model,
                 document_id=document_id,
                 source_url=source_url,
                 allowed_users=allowed_users,
                 allowed_groups=allowed_groups,
             )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/services/langflow_file_service.py` around lines 663 - 690,
upload_and_ingest_file is calling merge_ui_ingest_settings_into_tweaks which
drops embeddingModel but never forwards it into run_ingestion_flow, so
non-default embedding models fall back to workspace defaults; fix by extracting
embeddingModel from settings (or final_tweaks if appropriate) and pass it as
selected_embedding_model to run_ingestion_flow (keep references to
merge_ui_ingest_settings_into_tweaks, upload_and_ingest_file,
run_ingestion_flow, embeddingModel, selected_embedding_model) so Langflow
receives the chosen embedding model.
tests/unit/connectors/test_langflow_connector_txt_to_md.py (1)

62-110: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Pin the Langflow feature flag in these regressions.

Both tests currently rely on the default value of config.settings.DISABLE_INGEST_WITH_LANGFLOW. If that default changes, process_connector_document() will take the standard path instead, so this file no longer validates the .txt -> .md Langflow contract it is supposed to guard.

🔧 Proposed fix
 @pytest.mark.asyncio
-async def test_sharepoint_txt_is_uploaded_to_langflow_as_md():
+async def test_sharepoint_txt_is_uploaded_to_langflow_as_md(monkeypatch):
     """A connector document with mimetype text/plain must reach Langflow's
     upload_and_ingest_file with a .md filename and text/markdown mimetype."""
+    monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", False)
     service, langflow_service = _make_service()
     document = _make_document(filename="notes.txt", mimetype="text/plain")
@@
 @pytest.mark.asyncio
-async def test_sharepoint_pdf_passes_through_untouched():
+async def test_sharepoint_pdf_passes_through_untouched(monkeypatch):
     """The rename rule must NOT touch non-.txt files."""
+    monkeypatch.setattr("config.settings.DISABLE_INGEST_WITH_LANGFLOW", False)
     service, langflow_service = _make_service()
     document = _make_document(
         filename="report.pdf", mimetype="application/pdf", content=b"%PDF-1.4..."
     )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/connectors/test_langflow_connector_txt_to_md.py` around lines 62 -
110, The tests rely on the global feature flag
config.settings.DISABLE_INGEST_WITH_LANGFLOW and must pin it so Langflow
ingestion runs; update both test_sharepoint_txt_is_uploaded_to_langflow_as_md
and test_sharepoint_pdf_passes_through_untouched to explicitly set
config.settings.DISABLE_INGEST_WITH_LANGFLOW = False (or use the test runner's
monkeypatch to set that attribute) before calling
service.process_connector_document so the code path that calls
process_connector_document -> upload_and_ingest_file is exercised; ensure the
change is applied inside each test (or in the shared _make_service/test setup)
and cleaned up if using global mutation.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/connectors/service.py`:
- Around line 450-458: The current expansion only consumes the first page from
connector.list_files() causing incomplete folder expansion; change the code to
loop calling connector.list_files() repeatedly using the returned nextPageToken
(e.g., result.get("nextPageToken")) until no token remains, concatenating each
page's "files" into expanded_files (and appending into expanded_files_info) and
rebuilding expanded_file_ids from the accumulated list so all pages' children
are included; use the same pagination pattern as sync_connector_files() and
reference connector.list_files(), expanded_files, expanded_files_info, and
expanded_file_ids when applying the fix.

In `@src/models/processors.py`:
- Around line 617-624: The fast-path uses hash_id(tmp_path) as the dedupe key
but check_document_exists and the later indexing use document.id, causing
mismatches and re-ingestion; change the fast-path to compute the same persisted
document id used when indexing (use the same function/logic that produces
document.id instead of hash_id(tmp_path)), pass that id into
check_document_exists(document_id, opensearch_client), and set file_task.result
id to the same document.id so the dedupe key is consistent (affects the block
around file_hash, hash_id, check_document_exists and the later indexing that
sets document.id).
- Around line 666-673: The empty-list initializers for allowed_users and
allowed_groups need explicit typing so mypy can infer element types; change the
declarations to annotated lists (e.g., allowed_users: List[<element_type>] = []
and allowed_groups: List[<element_type>] = []) using the same element type as
document.acl.allowed_users / document.acl.allowed_groups (import List from
typing or use list[...]) and keep the existing try/except and assignments
(allowed_users = document.acl.allowed_users or []; allowed_groups =
document.acl.allowed_groups or []) so mypy is satisfied.

In `@tests/unit/api/test_reconcile_orphans_for_connector_type.py`:
- Around line 110-112: Tests currently assert that
opensearch_client.delete_by_query was not awaited, but the code now uses
opensearch_client.search + opensearch_client.delete; update the assertions to
reflect the new API by asserting that opensearch_client.search was called (or
called_with the expected query) and that opensearch_client.delete was not
awaited (or awaited the expected number of times) where no deletions should
occur. Replace occurrences of delete_by_query.assert_not_awaited() with
assertions on opensearch_client.search (e.g., assert_called_once_with/contains
expected params) and opensearch_client.delete.assert_not_awaited() (or
appropriate call count) in the tests covering
reconcile_orphans_for_connector_type and the other listed blocks.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: bc9b27bb-9344-4ba8-b2ed-8decc73e0b0b

📥 Commits

Reviewing files that changed from the base of the PR and between bc42f48 and 21aead1.

📒 Files selected for processing (10)
  • src/api/connector_router.py
  • src/app/container.py
  • src/connectors/langflow_connector_service.py
  • src/connectors/service.py
  • src/models/processors.py
  • src/services/langflow_file_service.py
  • tests/unit/api/test_reconcile_orphans_for_connector_type.py
  • tests/unit/connectors/test_langflow_connector_txt_to_md.py
  • tests/unit/test_connector_processor_filename_dedupe.py
  • tests/unit/test_processors_clear_stale_chunks.py
💤 Files with no reviewable changes (2)
  • src/connectors/langflow_connector_service.py
  • src/api/connector_router.py

Comment thread src/connectors/service.py
Comment on lines +450 to +458
# Get the expanded list of file IDs (folders will be expanded to their contents)
# This uses the connector's list_files() which calls _iter_selected_items()
result = await connector.list_files()
expanded_files = result.get("files", [])
expanded_file_ids = [f["id"] for f in expanded_files]

# Save the expanded files info so we can set correct names in the task UI
for f in expanded_files:
    expanded_files_info.append(f)
Contributor

@coderabbitai coderabbitai Bot May 27, 2026

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Expand selected folders across all list_files() pages.

This only consumes the first list_files() page, so a selected folder with more than one page of children will sync a partial file set. sync_connector_files() already handles nextPageToken; this expansion path needs the same loop.

Suggested fix
-                result = await connector.list_files()
-                expanded_files = result.get("files", [])
-                expanded_file_ids = [f["id"] for f in expanded_files]
-
-                # Save the expanded files info so we can set correct names in the task UI
-                for f in expanded_files:
-                    expanded_files_info.append(f)
+                expanded_file_ids = []
+                page_token = None
+                while True:
+                    result = await connector.list_files(page_token)
+                    expanded_files = result.get("files", [])
+                    expanded_file_ids.extend(
+                        f["id"] for f in expanded_files if f.get("id")
+                    )
+                    expanded_files_info.extend(expanded_files)
+
+                    page_token = result.get("nextPageToken")
+                    if not page_token:
+                        break

As per coding guidelines, "External data connectors (Google Drive, OneDrive, S3, IBM COS). New connectors must subclass connectors/base.py and register through connectors/service.py. Verify credentials are never logged, errors surface cleanly, and pagination is handled correctly."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/connectors/service.py` around lines 450 - 458, The current expansion only
consumes the first page from connector.list_files() causing incomplete folder
expansion; change the code to loop calling connector.list_files() repeatedly
using the returned nextPageToken (e.g., result.get("nextPageToken")) until no
token remains, concatenating each page's "files" into expanded_files (and
appending into expanded_files_info) and rebuilding expanded_file_ids from the
accumulated list so all pages' children are included; use the same pagination
pattern as sync_connector_files() and reference connector.list_files(),
expanded_files, expanded_files_info, and expanded_file_ids when applying the
fix.

Collaborator

@ricofurtado ricofurtado May 28, 2026

What is the page size? This is a completely silent issue that is really difficult to notice in big buckets/folders.
I also wonder how a huge number of files in a folder will affect performance.

Contributor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find list_files implementations in connectors
rg -n "def list_files\|async def list_files\|page_size\|max_files\|nextPageToken\|page_token" --type py src/connectors/ -C2

Length of output: 4819



@ricofurtado Great question — both points are worth unpacking.

Page size today (before the pagination fix)

sync_specific_files calls connector.list_files() with no arguments at all. Looking at base.py:

async def list_files(self, page_token: Optional[str] = None, max_files: Optional[int] = None) -> Dict[str, Any]:

max_files=None is left to each connector implementation to interpret, so the effective page size is silently connector-specific (Google Drive typically caps at 100, OneDrive at 200, S3 implementation-dependent). There is no explicit cap here, and when the first page fills up, the rest are silently dropped — that's the core bug.

Compare with sync_connector_files, which is explicit:

page_size = min(max_files or 100, 1000) if max_files else 100
file_list = await connector.list_files(page_token, max_files=page_size)
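
To make the explicit cap concrete, the quoted expression can be wrapped in a helper and evaluated for a few inputs. The function name `page_size_for` is hypothetical; the expression itself is copied from the line above.

```python
from typing import Optional


def page_size_for(max_files: Optional[int]) -> int:
    """Evaluate the page-size expression quoted from sync_connector_files."""
    return min(max_files or 100, 1000) if max_files else 100


# A missing or zero limit falls back to 100; larger limits are capped at 1000.
sizes = {limit: page_size_for(limit) for limit in (None, 0, 25, 5000)}
```

Here `sizes` maps None and 0 to 100, 25 to 25, and 5000 to 1000. The inner `or 100` never takes effect, because that branch only runs when `max_files` is truthy.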

Performance concern with the pagination fix

Your concern is valid. Even with a proper pagination loop, the fix I suggested still collects all expanded file IDs and their metadata in memory before the task is created:

expanded_file_ids = []          # grows unboundedly
expanded_files_info = []        # same

For a folder with tens of thousands of files this means:

  • Many sequential API round-trips to exhaust all pages
  • The entire file list held in memory before processing begins
  • No back-pressure or streaming into the task queue

A more robust approach for huge folders would be to stream pages directly into task batches (similar to how sync_connector_files caps via max_files), or at minimum add a max_files guard to the expansion call so it mirrors the behaviour in sync_connector_files. This is a legitimate follow-up concern beyond the immediate pagination correctness fix.
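
One way to picture the streaming idea is an async generator that yields one page at a time and stops at a cap. This is a sketch under assumptions, not a proposal for the actual fix: `iter_file_pages`, `collect_batches`, and `FakeConnector` are hypothetical names, and a real caller would hand each page to the task queue instead of collecting IDs into a list.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


class FakeConnector:
    """Hypothetical connector that serves pre-built pages keyed by token."""

    def __init__(self, pages: dict):
        self.pages = pages

    async def list_files(self, page_token: Optional[str] = None,
                         max_files: Optional[int] = None) -> dict:
        return self.pages[page_token]


async def iter_file_pages(connector: Any, max_files: int) -> AsyncIterator[list]:
    """Yield one page of file dicts at a time, stopping at max_files."""
    seen = 0
    page_token = None
    while seen < max_files:
        result = await connector.list_files(page_token)
        # Trim the page so the total never exceeds the cap.
        files = result.get("files", [])[: max_files - seen]
        if files:
            seen += len(files)
            yield files
        page_token = result.get("nextPageToken")
        if not page_token:
            break


async def collect_batches(connector: Any, max_files: int) -> list:
    """Gather the file IDs of each yielded page, for demonstration only."""
    return [[f["id"] for f in page] async for page in iter_file_pages(connector, max_files)]


def demo(max_files: int) -> list:
    connector = FakeConnector({
        None: {"files": [{"id": "a"}, {"id": "b"}], "nextPageToken": "t1"},
        "t1": {"files": [{"id": "c"}, {"id": "d"}], "nextPageToken": "t2"},
        "t2": {"files": [{"id": "e"}]},
    })
    return asyncio.run(collect_batches(connector, max_files))
```

With a cap of 3 the generator stops after the second page and never requests the third, which bounds both memory use and the number of API round-trips.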

Would you like me to open a GitHub issue to track this performance/scalability gap for large folder expansion in sync_specific_files?

🐇 🤔💭


🧬 Code Graph Analysis Results

src/connectors/service.py

Sync pagination + page size calculation

  • Lines 259-380 (relevant excerpt around page_size and the paginated loop):
# Collect files to process (limited by max_files)
files_to_process: list[dict[str, Any]] = []
page_token = None

# Calculate page size to minimize API calls
page_size = min(max_files or 100, 1000) if max_files else 100

while True:
    # List files from connector with limit
    logger.debug("Calling list_files", page_size=page_size, page_token=page_token)
    file_list = await connector.list_files(page_token, max_files=page_size)
    logger.debug("Got files from connector", file_count=len(file_list.get("files", [])))
    files = file_list["files"]

    if not files:
        break

    for file_info in files:
        if max_files and len(files_to_process) >= max_files:
            break
        # Filter by filename if filter is provided
        if filename_filter is not None:
            file_name = file_info.get("name", "")
            if file_name not in filename_filter:
                logger.debug(
                    "Skipping file not in filter",
                    filename=file_name,
                )
                continue
        files_to_process.append(file_info)

    # Stop if we have enough files or no more pages
    if (max_files and len(files_to_process) >= max_files) or not file_list.get(
        "nextPageToken"
    ):
        break

    page_token = file_list.get("nextPageToken")

src/connectors/base.py

Connector interface for paginated listing

  • Lines 99-101:
async def list_files(self, page_token: Optional[str] = None, max_files: Optional[int] = None) -> Dict[str, Any]:
    """List all files. Returns files and next_page_token if any."""
    pass

src/services/task_service.py

Task creation uses the collected file count

  • Lines 218-288 (relevant parts):
async def create_custom_task(
        self,
        user_id: str,
        items: list,
        processor,
        original_filenames: dict | None = None,
        existing_task_id: str = None,
    ) -> str:
    """Create a new task with custom processor for any type of items"""
    import os

    # Store anonymous tasks under a stable key so they can be retrieved later
    store_user_id = user_id or AnonymousUser().user_id
    task_id = existing_task_id or str(uuid.uuid4())

    # Create file tasks with original filenames if provided
    normalized_originals = (
        {str(k): v for k, v in original_filenames.items()} if original_filenames else {}
    )
    file_tasks = {
        str(item): FileTask(
            file_path=str(item),
            filename=normalized_originals.get(str(item), os.path.basename(str(item))),
        )
        for item in items
    }

    if (
        existing_task_id
        and store_user_id in self.task_store
        and existing_task_id in self.task_store[store_user_id]
    ):
        upload_task = self.task_store[store_user_id][existing_task_id]
        upload_task.file_tasks.update(file_tasks)
        upload_task.total_files += len(items)
        upload_task.status = TaskStatus.RUNNING
    else:
        upload_task = UploadTask(
            task_id=task_id,
            total_files=len(items),
            file_tasks=file_tasks,
        )
        upload_task.processor = processor
        if store_user_id not in self.task_store:
            self.task_store[store_user_id] = {}
        self.task_store[store_user_id][task_id] = upload_task

    # Start background processing
    background_task = asyncio.create_task(
        self.background_custom_processor(store_user_id, task_id, items, processor)
    )
    self.background_tasks.add(background_task)
    background_task.add_done_callback(self.background_tasks.discard)

    # Store reference to background task for cancellation if newly created
    if not existing_task_id:
        upload_task.background_task = background_task

    # Send telemetry event for task creation with metadata
    asyncio.create_task(
        TelemetryClient.send_event(
            Category.TASK_OPERATIONS,
            MessageId.ORB_TASK_CREATED,
            metadata={
                "total_files": len(items),
                "processor_type": processor.__class__.__name__,
            },
        )
    )

    return task_id

Comment thread src/models/processors.py
Comment thread src/models/processors.py Outdated
Comment thread tests/unit/api/test_reconcile_orphans_for_connector_type.py Outdated
@github-actions github-actions Bot added bug 🔴 Something isn't working. and removed bug 🔴 Something isn't working. labels May 28, 2026
@coderabbitai
Contributor

coderabbitai Bot commented May 28, 2026

Actionable comments posted: 0

# Conflicts:
#	src/models/processors.py
#	tests/unit/api/test_reconcile_orphans_for_connector_type.py
@github-actions github-actions Bot added bug 🔴 Something isn't working. and removed bug 🔴 Something isn't working. labels May 28, 2026
@coderabbitai
Contributor

coderabbitai Bot commented May 28, 2026

Actionable comments posted: 0

@ricofurtado
Collaborator

LGTM

Labels

backend 🔷 Issues related to backend services (OpenSearch, Langflow, APIs) bug 🔴 Something isn't working. tests

2 participants