Skip to content

Chore: Refactor ctk.io package#679

Draft
amotl wants to merge 9 commits intomainfrom
io-refactor
Draft

Chore: Refactor ctk.io package#679
amotl wants to merge 9 commits intomainfrom
io-refactor

Conversation

@amotl
Copy link
Member

@amotl amotl commented Mar 5, 2026

About

Just maintenance: Refactoring and spring cleaning in the ctk.io package, nothing wild.

@amotl amotl changed the title Io refactor Chore: Refactor ctk.io package Mar 5, 2026
@coderabbitai
Copy link

coderabbitai bot commented Mar 5, 2026

Walkthrough

Centralizes I/O dispatch into a new IoRouter, refactors StandaloneCluster to delegate load/save operations to the router, moves Cloud job/spec models into a dedicated module, updates multiple import paths and logging, and adjusts examples/docs/tests to reflect the reorganized IO surface.

Changes

Cohort / File(s) Summary
Io router & cluster delegation
cratedb_toolkit/io/router.py, cratedb_toolkit/cluster/core.py
Adds IoRouter implementing load/save dispatch across multiple backends; refactors StandaloneCluster to use a private _router and delegate load_table/save_table calls to it; removes prior per-source IO branching from cluster core.
Cloud IO model refactor
cratedb_toolkit/io/cratedb/cloud/__init__.py, cratedb_toolkit/io/cratedb/cloud/model.py
Extracts and re-exports CloudJob and CloudIoSpecs into cloud.model, adds CloudJob dataclass with status/message helpers and table-name normalization, and adjusts package exports.
Backend bulk + logging
cratedb_toolkit/io/cratedb/bulk.py, cratedb_toolkit/io/router.py
Introduces module docstring and local logging.getLogger(__name__) in bulk processor; router centralizes backend invocation (dynamodb, mongodb, influx, kinesis, deltalake, iceberg, ingestr) and error handling for exports.
Import path updates (code & tests)
cratedb_toolkit/cluster/guide.py, cratedb_toolkit/io/dynamodb/copy.py, cratedb_toolkit/io/mongodb/copy.py, cratedb_toolkit/io/sql.py, tests/...
Adjusts imports to reflect reorganization: CloudJob path, BulkProcessor path, run_sql import location, and test/module import updates for awslambda/kinesis handler.
Model truthiness change
cratedb_toolkit/model.py
Adds TableAddress.__bool__ which changes truthiness semantics (now True when table is falsy).
Docs & examples
doc/io/dynamodb/cdc-lambda.md, examples/cdc/aws/dynamodb_kinesis_lambda_oci_cratedb.py
Updates documentation and example entrypoints to CDC-focused paths and switches Lambda entrypoint handler to awslambda/kinesis.py.

Sequence Diagram(s)

sequenceDiagram
    participant SC as StandaloneCluster
    participant Hub as IoRouter
    participant BE as Backend
    participant CDB as CrateDB
    SC->>Hub: load_table(source, target, transformation)
    activate Hub
    alt DynamoDB / MongoDB / Influx / Kinesis / Ingestr
        Hub->>BE: invoke specific copy/relay with (source, target, transformation)
    else DeltaLake / Iceberg
        Hub->>BE: from_deltalake/from_iceberg -> write to target
    end
    BE->>CDB: load data into target table
    deactivate Hub
    SC-->>SC: return self
Loading
sequenceDiagram
    participant SC as StandaloneCluster
    participant Hub as IoRouter
    participant BE as Backend
    SC->>Hub: save_table(source, target, transformation)
    activate Hub
    alt DeltaLake
        Hub->>BE: to_deltalake(source, target)
        BE-->>Hub: success / error
    else Iceberg
        Hub->>BE: to_iceberg(source, target)
        BE-->>Hub: success / error
    else Unsupported
        Hub-->>SC: raise OperationFailed
    end
    deactivate Hub
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested reviewers

  • seut
  • surister
  • hammerhead

Poem

🐰
I hopped through routers, codes and streams,
Gathered backends, mended seams;
One hub now hums where many hopped,
Tables flow and errors stopped.
Hooray — the toolkit's tidy dreams!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 59.09% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main change: refactoring of the ctk.io package with reorganization of imports, creation of IoRouter, and consolidation of cloud IO components.
Description check ✅ Passed The description is related to the changeset, characterizing it as maintenance and refactoring/spring cleaning in the ctk.io package, which aligns with the actual changes.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch io-refactor

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai[bot]

This comment was marked as resolved.

coderabbitai[bot]

This comment was marked as resolved.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
cratedb_toolkit/io/cratedb/cloud/model.py (1)

71-77: Consider using immutable types for class-level constants.

The mutable list defaults work correctly since these are treated as constants, but using tuples or ClassVar would be more explicit and silence the Ruff RUF012 warning.

♻️ Optional: Use tuples for immutability
+import typing as t
+
 class CloudIoSpecs:
     """
     Define capabilities of CrateDB Cloud Import.
     """

-    allowed_compressions = ["gzip", None]
-    allowed_formats = ["csv", "json", "parquet"]
+    allowed_compressions: t.ClassVar[tuple] = ("gzip", None)
+    allowed_formats: t.ClassVar[tuple] = ("csv", "json", "parquet")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cratedb_toolkit/io/cratedb/cloud/model.py` around lines 71 - 77, Class-level
constants allowed_compressions and allowed_formats in CloudIoSpecs are currently
mutable lists; change them to immutable types to avoid mutable default warnings
and express intent. Replace the list literals for
CloudIoSpecs.allowed_compressions and CloudIoSpecs.allowed_formats with tuples
(or annotate them as typing.ClassVar[Tuple[str, ...]]), keeping the same values
("gzip", None) and ("csv","json","parquet") respectively so callers see the same
values but the class-level attributes are immutable.
cratedb_toolkit/io/router.py (1)

121-157: Consider returning a success indicator from save_table.

For consistency with load_table (which returns backend results), save_table could return True on success instead of implicitly returning None. This would make the API more symmetric.

♻️ Optional: Return True on success
         if target_url_obj.scheme.startswith("deltalake") or target_url_obj.scheme.endswith("deltalake"):
             from cratedb_toolkit.io.deltalake import to_deltalake

             if not to_deltalake(source_url, target.url):
                 raise OperationFailed("Data export failed or incomplete")
+            return True

         elif target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
             from cratedb_toolkit.io.iceberg import to_iceberg

             if not to_iceberg(source_url, target.url):
                 raise OperationFailed("Data export failed or incomplete")
+            return True

         else:
             raise OperationFailed(f"Exporting resource not implemented yet: {target_url_obj}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cratedb_toolkit/io/router.py` around lines 121 - 157, The save_table function
currently returns None on success which is inconsistent with load_table; modify
save_table to return a boolean success indicator (True) when an export completes
successfully. Specifically, after the calls to to_deltalake(source_url,
target.url) and to_iceberg(source_url, target.url) in save_table, return True
when those calls succeed (i.e., when they do not trigger the OperationFailed
path); leave the existing OperationFailed raises as-is for failure cases. Ensure
the return True is added in both branches that call to_deltalake and to_iceberg
so callers get a consistent True/exception result.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cratedb_toolkit/cluster/core.py`:
- Around line 548-553: The validation incorrectly uses truthiness on
TableAddress (broken __bool__), so replace the "if not target:" guard in the
block that calls self._router.load_table with an explicit check (e.g., "if
target is None or target.table is None") to require a non-empty target, and
update the ValueError message to something clear like "Target table must be
specified" (leave the subsequent load using self.address as-is or adjust if the
API contract differs); reference symbols: target (variable),
TableAddress.__bool__ (broken), self.address, and self._router.load_table.

In `@cratedb_toolkit/model.py`:
- Around line 209-211: The __bool__ implementation on the TableAddress model is
inverted; change the __bool__ method (def __bool__(self): ...) to return True
when a table is present and False when not (e.g., return bool(self.table) or
return self.table is not None) so callers like the conditional in cluster.core
(if not target: ...) behave correctly for TableAddress instances.

---

Nitpick comments:
In `@cratedb_toolkit/io/cratedb/cloud/model.py`:
- Around line 71-77: Class-level constants allowed_compressions and
allowed_formats in CloudIoSpecs are currently mutable lists; change them to
immutable types to avoid mutable default warnings and express intent. Replace
the list literals for CloudIoSpecs.allowed_compressions and
CloudIoSpecs.allowed_formats with tuples (or annotate them as
typing.ClassVar[Tuple[str, ...]]), keeping the same values ("gzip", None) and
("csv","json","parquet") respectively so callers see the same values but the
class-level attributes are immutable.

In `@cratedb_toolkit/io/router.py`:
- Around line 121-157: The save_table function currently returns None on success
which is inconsistent with load_table; modify save_table to return a boolean
success indicator (True) when an export completes successfully. Specifically,
after the calls to to_deltalake(source_url, target.url) and
to_iceberg(source_url, target.url) in save_table, return True when those calls
succeed (i.e., when they do not trigger the OperationFailed path); leave the
existing OperationFailed raises as-is for failure cases. Ensure the return True
is added in both branches that call to_deltalake and to_iceberg so callers get a
consistent True/exception result.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 286cf72b-e9a3-45e5-bff9-ba5bd955402c

📥 Commits

Reviewing files that changed from the base of the PR and between 19fbda7 and d4a1985.

📒 Files selected for processing (21)
  • cratedb_toolkit/cluster/core.py
  • cratedb_toolkit/cluster/guide.py
  • cratedb_toolkit/io/awslambda/__init__.py
  • cratedb_toolkit/io/awslambda/kinesis.py
  • cratedb_toolkit/io/cratedb/__init__.py
  • cratedb_toolkit/io/cratedb/bulk.py
  • cratedb_toolkit/io/cratedb/cloud/__init__.py
  • cratedb_toolkit/io/cratedb/cloud/model.py
  • cratedb_toolkit/io/deltalake/__init__.py
  • cratedb_toolkit/io/dynamodb/copy.py
  • cratedb_toolkit/io/iceberg/__init__.py
  • cratedb_toolkit/io/influxdb/__init__.py
  • cratedb_toolkit/io/mongodb/copy.py
  • cratedb_toolkit/io/router.py
  • cratedb_toolkit/io/sql.py
  • cratedb_toolkit/model.py
  • doc/io/dynamodb/cdc-lambda.md
  • examples/cdc/aws/dynamodb_kinesis_lambda_oci_cratedb.py
  • tests/cluster/test_guide.py
  • tests/io/test_processor.py
  • tests/io/test_sql.py
💤 Files with no reviewable changes (1)
  • cratedb_toolkit/io/sql.py
🚧 Files skipped from review as they are similar to previous changes (6)
  • doc/io/dynamodb/cdc-lambda.md
  • cratedb_toolkit/cluster/guide.py
  • examples/cdc/aws/dynamodb_kinesis_lambda_oci_cratedb.py
  • cratedb_toolkit/io/cratedb/bulk.py
  • tests/cluster/test_guide.py
  • tests/io/test_sql.py

Comment on lines +548 to +553
if not target:
raise ValueError("Target table address will not be respected, just the URL")

self._load_table_result = self._router.load_table(
source=source, target=self.address, transformation=transformation
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

The validation logic is broken due to inverted TableAddress.__bool__.

Combined with the __bool__ issue in cratedb_toolkit/model.py, this check behaves incorrectly:

  • TableAddress(table="demo")bool() = Falsenot target = True → raises ValueError (wrong!)
  • TableAddress(table=None)bool() = Truenot target = False → passes (wrong!)

Additionally, the error message is confusing - it says the target "will not be respected" but then raises when target is falsy, which is the opposite situation.

If the intent is to require a non-empty target but then use self.address for the actual operation, consider clarifying the API contract or using explicit attribute checks.

🐛 Proposed fix assuming the intent is to require a valid target
-        if not target:
-            raise ValueError("Target table address will not be respected, just the URL")
+        if target is None or not target.table:
+            raise ValueError("Target table address is required")

Or if the intent is to warn when target is provided but ignored:

-        if not target:
-            raise ValueError("Target table address will not be respected, just the URL")
+        if target is not None and target.table:
+            logger.warning("Target table address provided but will be ignored; using URL-derived destination")
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 549-549: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cratedb_toolkit/cluster/core.py` around lines 548 - 553, The validation
incorrectly uses truthiness on TableAddress (broken __bool__), so replace the
"if not target:" guard in the block that calls self._router.load_table with an
explicit check (e.g., "if target is None or target.table is None") to require a
non-empty target, and update the ValueError message to something clear like
"Target table must be specified" (leave the subsequent load using self.address
as-is or adjust if the API contract differs); reference symbols: target
(variable), TableAddress.__bool__ (broken), self.address, and
self._router.load_table.

Comment on lines +209 to +211
def __bool__(self):
return not self.table

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: __bool__ semantics are inverted.

The implementation return not self.table causes objects with a valid table name to evaluate to False, and objects without a table to evaluate to True. This is counterintuitive and breaks callers that use if not target: checks.

In cratedb_toolkit/cluster/core.py at line 548:

if not target:
    raise ValueError(...)

With this __bool__:

  • TableAddress(table="demo")bool() returns Falsenot target is True → raises unexpectedly
  • TableAddress(table=None)bool() returns Truenot target is False → passes incorrectly
🐛 Proposed fix
 def __bool__(self):
-    return not self.table
+    return bool(self.table)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cratedb_toolkit/model.py` around lines 209 - 211, The __bool__ implementation
on the TableAddress model is inverted; change the __bool__ method (def
__bool__(self): ...) to return True when a table is present and False when not
(e.g., return bool(self.table) or return self.table is not None) so callers like
the conditional in cluster.core (if not target: ...) behave correctly for
TableAddress instances.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant