Conversation
WalkthroughCentralizes I/O dispatch into a new IoRouter, refactors StandaloneCluster to delegate load/save operations to the router, moves Cloud job/spec models into a dedicated module, updates multiple import paths and logging, and adjusts examples/docs/tests to reflect the reorganized IO surface. Changes
Sequence Diagram(s)sequenceDiagram
participant SC as StandaloneCluster
participant Hub as IoRouter
participant BE as Backend
participant CDB as CrateDB
SC->>Hub: load_table(source, target, transformation)
activate Hub
alt DynamoDB / MongoDB / Influx / Kinesis / Ingestr
Hub->>BE: invoke specific copy/relay with (source, target, transformation)
else DeltaLake / Iceberg
Hub->>BE: from_deltalake/from_iceberg -> write to target
end
BE->>CDB: load data into target table
deactivate Hub
SC-->>SC: return self
sequenceDiagram
participant SC as StandaloneCluster
participant Hub as IoRouter
participant BE as Backend
SC->>Hub: save_table(source, target, transformation)
activate Hub
alt DeltaLake
Hub->>BE: to_deltalake(source, target)
BE-->>Hub: success / error
else Iceberg
Hub->>BE: to_iceberg(source, target)
BE-->>Hub: success / error
else Unsupported
Hub-->>SC: raise OperationFailed
end
deactivate Hub
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
cd8939d to
2e8cda0
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (2)
cratedb_toolkit/io/cratedb/cloud/model.py (1)
71-77: Consider using immutable types for class-level constants.The mutable list defaults work correctly since these are treated as constants, but using tuples or
ClassVarwould be more explicit and silence the Ruff RUF012 warning.♻️ Optional: Use tuples for immutability
+import typing as t + class CloudIoSpecs: """ Define capabilities of CrateDB Cloud Import. """ - allowed_compressions = ["gzip", None] - allowed_formats = ["csv", "json", "parquet"] + allowed_compressions: t.ClassVar[tuple] = ("gzip", None) + allowed_formats: t.ClassVar[tuple] = ("csv", "json", "parquet")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cratedb_toolkit/io/cratedb/cloud/model.py` around lines 71 - 77, Class-level constants allowed_compressions and allowed_formats in CloudIoSpecs are currently mutable lists; change them to immutable types to avoid mutable default warnings and express intent. Replace the list literals for CloudIoSpecs.allowed_compressions and CloudIoSpecs.allowed_formats with tuples (or annotate them as typing.ClassVar[Tuple[str, ...]]), keeping the same values ("gzip", None) and ("csv","json","parquet") respectively so callers see the same values but the class-level attributes are immutable.cratedb_toolkit/io/router.py (1)
121-157: Consider returning a success indicator fromsave_table.For consistency with
load_table(which returns backend results),save_tablecould returnTrueon success instead of implicitly returningNone. This would make the API more symmetric.♻️ Optional: Return True on success
if target_url_obj.scheme.startswith("deltalake") or target_url_obj.scheme.endswith("deltalake"): from cratedb_toolkit.io.deltalake import to_deltalake if not to_deltalake(source_url, target.url): raise OperationFailed("Data export failed or incomplete") + return True elif target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"): from cratedb_toolkit.io.iceberg import to_iceberg if not to_iceberg(source_url, target.url): raise OperationFailed("Data export failed or incomplete") + return True else: raise OperationFailed(f"Exporting resource not implemented yet: {target_url_obj}")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cratedb_toolkit/io/router.py` around lines 121 - 157, The save_table function currently returns None on success which is inconsistent with load_table; modify save_table to return a boolean success indicator (True) when an export completes successfully. Specifically, after the calls to to_deltalake(source_url, target.url) and to_iceberg(source_url, target.url) in save_table, return True when those calls succeed (i.e., when they do not trigger the OperationFailed path); leave the existing OperationFailed raises as-is for failure cases. Ensure the return True is added in both branches that call to_deltalake and to_iceberg so callers get a consistent True/exception result.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cratedb_toolkit/cluster/core.py`:
- Around line 548-553: The validation incorrectly uses truthiness on
TableAddress (broken __bool__), so replace the "if not target:" guard in the
block that calls self._router.load_table with an explicit check (e.g., "if
target is None or target.table is None") to require a non-empty target, and
update the ValueError message to something clear like "Target table must be
specified" (leave the subsequent load using self.address as-is or adjust if the
API contract differs); reference symbols: target (variable),
TableAddress.__bool__ (broken), self.address, and self._router.load_table.
In `@cratedb_toolkit/model.py`:
- Around line 209-211: The __bool__ implementation on the TableAddress model is
inverted; change the __bool__ method (def __bool__(self): ...) to return True
when a table is present and False when not (e.g., return bool(self.table) or
return self.table is not None) so callers like the conditional in cluster.core
(if not target: ...) behave correctly for TableAddress instances.
---
Nitpick comments:
In `@cratedb_toolkit/io/cratedb/cloud/model.py`:
- Around line 71-77: Class-level constants allowed_compressions and
allowed_formats in CloudIoSpecs are currently mutable lists; change them to
immutable types to avoid mutable default warnings and express intent. Replace
the list literals for CloudIoSpecs.allowed_compressions and
CloudIoSpecs.allowed_formats with tuples (or annotate them as
typing.ClassVar[Tuple[str, ...]]), keeping the same values ("gzip", None) and
("csv","json","parquet") respectively so callers see the same values but the
class-level attributes are immutable.
In `@cratedb_toolkit/io/router.py`:
- Around line 121-157: The save_table function currently returns None on success
which is inconsistent with load_table; modify save_table to return a boolean
success indicator (True) when an export completes successfully. Specifically,
after the calls to to_deltalake(source_url, target.url) and
to_iceberg(source_url, target.url) in save_table, return True when those calls
succeed (i.e., when they do not trigger the OperationFailed path); leave the
existing OperationFailed raises as-is for failure cases. Ensure the return True
is added in both branches that call to_deltalake and to_iceberg so callers get a
consistent True/exception result.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 286cf72b-e9a3-45e5-bff9-ba5bd955402c
📒 Files selected for processing (21)
cratedb_toolkit/cluster/core.pycratedb_toolkit/cluster/guide.pycratedb_toolkit/io/awslambda/__init__.pycratedb_toolkit/io/awslambda/kinesis.pycratedb_toolkit/io/cratedb/__init__.pycratedb_toolkit/io/cratedb/bulk.pycratedb_toolkit/io/cratedb/cloud/__init__.pycratedb_toolkit/io/cratedb/cloud/model.pycratedb_toolkit/io/deltalake/__init__.pycratedb_toolkit/io/dynamodb/copy.pycratedb_toolkit/io/iceberg/__init__.pycratedb_toolkit/io/influxdb/__init__.pycratedb_toolkit/io/mongodb/copy.pycratedb_toolkit/io/router.pycratedb_toolkit/io/sql.pycratedb_toolkit/model.pydoc/io/dynamodb/cdc-lambda.mdexamples/cdc/aws/dynamodb_kinesis_lambda_oci_cratedb.pytests/cluster/test_guide.pytests/io/test_processor.pytests/io/test_sql.py
💤 Files with no reviewable changes (1)
- cratedb_toolkit/io/sql.py
🚧 Files skipped from review as they are similar to previous changes (6)
- doc/io/dynamodb/cdc-lambda.md
- cratedb_toolkit/cluster/guide.py
- examples/cdc/aws/dynamodb_kinesis_lambda_oci_cratedb.py
- cratedb_toolkit/io/cratedb/bulk.py
- tests/cluster/test_guide.py
- tests/io/test_sql.py
| if not target: | ||
| raise ValueError("Target table address will not be respected, just the URL") | ||
|
|
||
| self._load_table_result = self._router.load_table( | ||
| source=source, target=self.address, transformation=transformation | ||
| ) |
There was a problem hiding this comment.
The validation logic is broken due to inverted TableAddress.__bool__.
Combined with the __bool__ issue in cratedb_toolkit/model.py, this check behaves incorrectly:
TableAddress(table="demo")→bool()=False→not target=True→ raises ValueError (wrong!)TableAddress(table=None)→bool()=True→not target=False→ passes (wrong!)
Additionally, the error message is confusing - it says the target "will not be respected" but then raises when target is falsy, which is the opposite situation.
If the intent is to require a non-empty target but then use self.address for the actual operation, consider clarifying the API contract or using explicit attribute checks.
🐛 Proposed fix assuming the intent is to require a valid target
- if not target:
- raise ValueError("Target table address will not be respected, just the URL")
+ if target is None or not target.table:
+ raise ValueError("Target table address is required")Or if the intent is to warn when target is provided but ignored:
- if not target:
- raise ValueError("Target table address will not be respected, just the URL")
+ if target is not None and target.table:
+ logger.warning("Target table address provided but will be ignored; using URL-derived destination")🧰 Tools
🪛 Ruff (0.15.2)
[warning] 549-549: Avoid specifying long messages outside the exception class
(TRY003)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cratedb_toolkit/cluster/core.py` around lines 548 - 553, The validation
incorrectly uses truthiness on TableAddress (broken __bool__), so replace the
"if not target:" guard in the block that calls self._router.load_table with an
explicit check (e.g., "if target is None or target.table is None") to require a
non-empty target, and update the ValueError message to something clear like
"Target table must be specified" (leave the subsequent load using self.address
as-is or adjust if the API contract differs); reference symbols: target
(variable), TableAddress.__bool__ (broken), self.address, and
self._router.load_table.
| def __bool__(self): | ||
| return not self.table | ||
|
|
There was a problem hiding this comment.
Critical: __bool__ semantics are inverted.
The implementation return not self.table causes objects with a valid table name to evaluate to False, and objects without a table to evaluate to True. This is counterintuitive and breaks callers that use if not target: checks.
In cratedb_toolkit/cluster/core.py at line 548:
if not target:
raise ValueError(...)With this __bool__:
TableAddress(table="demo")→bool()returnsFalse→not targetisTrue→ raises unexpectedlyTableAddress(table=None)→bool()returnsTrue→not targetisFalse→ passes incorrectly
🐛 Proposed fix
def __bool__(self):
- return not self.table
+ return bool(self.table)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cratedb_toolkit/model.py` around lines 209 - 211, The __bool__ implementation
on the TableAddress model is inverted; change the __bool__ method (def
__bool__(self): ...) to return True when a table is present and False when not
(e.g., return bool(self.table) or return self.table is not None) so callers like
the conditional in cluster.core (if not target: ...) behave correctly for
TableAddress instances.
About
Just maintenance: Refactoring and spring cleaning in the
ctk.iopackage, nothing wild.