diff --git a/docs/en/antalya/partition_export.md b/docs/en/antalya/partition_export.md index 975915859482..b4cd5f937008 100644 --- a/docs/en/antalya/partition_export.md +++ b/docs/en/antalya/partition_export.md @@ -158,41 +158,49 @@ Query id: 9efc271a-a501-44d1-834f-bc4d20156164 Row 1: ────── -source_database: default -source_table: replicated_source -destination_database: default -destination_table: replicated_destination -create_time: 2025-11-21 18:21:51 -partition_id: 2022 -transaction_id: 7397746091717128192 -source_replica: r1 -parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0'] -parts_count: 3 -parts_to_do: 0 -status: COMPLETED -exception_replica: -last_exception: -exception_part: -exception_count: 0 +source_database: default +source_table: replicated_source +destination_database: default +destination_table: s3_destination +create_time: 2025-11-21 18:21:51 +partition_id: 2022 +transaction_id: 7397746091717128192 +query_id: 3fa3c8d3-7d6b-4f8b-9aa2-2c1f1ad0a111 +source_replica: r1 +parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0'] +parts_count: 3 +parts_to_do: 0 +status: COMPLETED +last_exception_per_replica: [] +exception_count: 0 +destination_file_paths: {'2022_0_0_0':['s3://bucket/db/t/year=2022/2022_0_0_0_.parquet'],'2022_1_1_0':['s3://bucket/db/t/year=2022/2022_1_1_0_.parquet'],'2022_2_2_0':['s3://bucket/db/t/year=2022/2022_2_2_0_.parquet']} +committed_metadata_file: +committed_manifest_list: +committed_manifest_file: +committed_marker_file: s3://bucket/db/t/commit_2022_7397746091717128192 Row 2: ────── -source_database: default -source_table: replicated_source -destination_database: default -destination_table: replicated_destination -create_time: 2025-11-21 18:20:35 -partition_id: 2021 -transaction_id: 7397745772618674176 -source_replica: r1 -parts: ['2021_0_0_0'] -parts_count: 1 -parts_to_do: 0 -status: COMPLETED -exception_replica: -last_exception: -exception_part: -exception_count: 0 +source_database: default +source_table: replicated_source 
+destination_database: default +destination_table: iceberg_destination +create_time: 2025-11-21 18:20:35 +partition_id: 2021 +transaction_id: 7397745772618674176 +query_id: 1c8e0fd0-6a3a-4d6e-9bd6-bdf64adfe118 +source_replica: r2 +parts: ['2021_0_0_0'] +parts_count: 1 +parts_to_do: 0 +status: COMPLETED +last_exception_per_replica: [('r1','Code: 999. Coordination::Exception: Session expired','2021_0_0_0','2025-11-21 18:20:42',1)] +exception_count: 1 +destination_file_paths: {'2021_0_0_0':['s3://lake/db/t/data/year=2021/2021_0_0_0_.parquet']} +committed_metadata_file: s3://lake/db/t/metadata/v3.metadata.json +committed_manifest_list: s3://lake/db/t/metadata/snap-7397745772618674176-1-.avro +committed_manifest_file: s3://lake/db/t/metadata/-m0.avro +committed_marker_file: 2 rows in set. Elapsed: 0.019 sec. @@ -205,6 +213,33 @@ Status values include: - `FAILED` - Export failed - `KILLED` - Export was cancelled +### Exception columns + +- `last_exception_per_replica` is an `Array(Tuple(replica String, message String, part String, time DateTime, count UInt64))`. Each tuple is the most recent exception observed by a single replica plus a best-effort within-replica `count`. Replicas that have never reported an exception are omitted. +- `exception_count` is the sum of every `count` in `last_exception_per_replica`. Each replica owns its own counter, so cross-replica updates do not race; the sum is exact w.r.t. the snapshot returned. Within a single replica concurrent failing writers may under-count by one. + +### Per-part destination file paths + +- `destination_file_paths` is a `Map(String, Array(String))` keyed by source part name. Each value is the list of file paths written to the destination object storage when that part was exported (a single part can produce multiple files depending on `max_bytes` / `max_rows`). 
The map is rebuilt from ZooKeeper on every poll, so it grows as parts complete during `PENDING` and becomes the full picture once the task reaches `COMPLETED`. + +### Commit info columns + +These columns surface paths produced by the destination storage during commit, so it is possible to inspect what was written without consulting the destination directly: + +- `committed_metadata_file` — for Iceberg destinations: path of the new `vN.metadata.json` written by the commit. Empty for non-Iceberg destinations and before the commit lands. If the commit was already finished by a previous run (detected via the transaction id stored in the snapshot summary), this column carries a human-readable sentinel string instead of a path because the original committer's paths are not recoverable from inside the impl. +- `committed_manifest_list` — for Iceberg destinations: path of the manifest list file (`snap-*.avro`) referenced by the new snapshot. Empty under the same conditions as `committed_metadata_file`. +- `committed_manifest_file` — for Iceberg destinations: path of the manifest file referenced by `committed_manifest_list`. Empty under the same conditions as `committed_metadata_file`. +- `committed_marker_file` — for plain object storage destinations: path of the per-transaction commit marker file written by the destination. Empty for Iceberg destinations and for tasks that have not committed yet. 
+ +To pick the latest exception across replicas: + +```sql +SELECT + arrayReverseSort(x -> x.time, last_exception_per_replica)[1] AS latest_exception +FROM system.replicated_partition_exports +WHERE source_table = 'rmt_table' AND destination_table = 's3_table'; +``` + ## Related Features - [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated) diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index acfabc28ca61..dea68c833fa1 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -152,6 +152,82 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry } }; +/// Per-task "commit info" record persisted at /commit_info. +/// +/// Written exactly once, atomically with the status -> COMPLETED transition +/// (see ExportPartitionUtils::commit). Captures the metadata-layer file paths +/// produced by the destination storage during commit so they can be surfaced in +/// system.replicated_partition_exports for debugging. +/// +/// All Iceberg fields are empty for non-Iceberg destinations. They may also be +/// empty for an Iceberg destination if the committing replica crashed between +/// writing the object-storage files and writing this znode; in that case the +/// task still transitions to COMPLETED via the recovery path but commit_info +/// remains absent. This is best-effort observability and acceptable. +struct ExportReplicatedMergeTreePartitionCommitInfoEntry +{ + /// Iceberg: path (in destination object storage) of the new vN.metadata.json + /// written by the commit. + String iceberg_metadata_file; + + /// Iceberg: path of the snap---.avro manifest list + /// referenced by the new snapshot. + String iceberg_manifest_list; + + /// Iceberg: path of the manifest entry file (*.avro) referenced by the + /// manifest list. 
+ String iceberg_manifest_file; + + /// Plain object storage: path of the commit marker file written by + /// StorageObjectStorage::commitExportPartitionTransaction. Empty for Iceberg. + String commit_marker_file; + + bool empty() const + { + return iceberg_metadata_file.empty() + && iceberg_manifest_list.empty() + && iceberg_manifest_file.empty() + && commit_marker_file.empty(); + } + + std::string toJsonString() const + { + Poco::JSON::Object json; + json.set("iceberg_metadata_file", iceberg_metadata_file); + json.set("iceberg_manifest_list", iceberg_manifest_list); + json.set("iceberg_manifest_file", iceberg_manifest_file); + json.set("commit_marker_file", commit_marker_file); + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + oss.exceptions(std::ios::failbit); + Poco::JSON::Stringifier::stringify(json, oss); + return oss.str(); + } + + static ExportReplicatedMergeTreePartitionCommitInfoEntry fromJsonString(const std::string & json_string) + { + ExportReplicatedMergeTreePartitionCommitInfoEntry entry; + if (json_string.empty()) + return entry; + + Poco::JSON::Parser parser; + auto json = parser.parse(json_string).extract(); + + if (json->has("iceberg_metadata_file")) + entry.iceberg_metadata_file = json->getValue("iceberg_metadata_file"); + if (json->has("iceberg_manifest_list")) + entry.iceberg_manifest_list = json->getValue("iceberg_manifest_list"); + + if (json->has("iceberg_manifest_file")) + entry.iceberg_manifest_file = json->getValue("iceberg_manifest_file"); + + if (json->has("commit_marker_file")) + entry.commit_marker_file = json->getValue("commit_marker_file"); + + return entry; + } +}; + struct ExportReplicatedMergeTreePartitionManifest { String transaction_id; diff --git a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h index 8af873e0b89c..bf7b3bdfa6a4 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h +++ 
b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include "Core/QualifiedTableName.h" @@ -40,6 +41,20 @@ struct ExportReplicatedMergeTreePartitionTaskEntry /// An empty map means no replica has recorded an exception yet for this task. mutable std::map last_exception_per_replica; + /// In-memory mirror of /processed/ leaves in ZK, keyed by + /// part name. Each value is the list of destination file paths produced by the + /// per-part export (typically Parquet object-storage keys). Refreshed on every + /// poll() cycle and on status-change handler invocations; served verbatim to + /// system.replicated_partition_exports without any extra ZK read at query time. + /// An empty map means no part has finished exporting yet for this task. + mutable std::map> destination_file_paths_per_part; + + /// In-memory mirror of the /commit_info znode (written atomically + /// with the COMPLETED status transition; see ExportPartitionUtils::commit). + /// nullopt until commit_info is observed in ZK. Empty fields inside the struct + /// for non-Iceberg destinations. + mutable std::optional commit_info; + std::string getCompositeKey() const { const auto qualified_table_name = QualifiedTableName {manifest.destination_database, manifest.destination_table}; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 0a07b892cef8..35732dd6e277 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -511,7 +511,30 @@ It is currently only implemented in StorageObjectStorage. std::vector partition_values; }; - virtual void commitExportPartitionTransaction( + /// Paths produced by the destination storage during commit. Surfaced via + /// system.replicated_partition_exports for debugging. Empty for commits + /// that short-circuit on idempotency. + struct ExportPartitionCommitInfo + { + /// Iceberg destinations only. 
+ String iceberg_metadata_file; + String iceberg_manifest_list; + String iceberg_manifest_file; + + /// Plain object storage destinations only: path of the commit marker file + /// written/observed by StorageObjectStorage::commitExportPartitionTransaction. + String commit_marker_file; + + bool empty() const + { + return iceberg_metadata_file.empty() + && iceberg_manifest_list.empty() + && iceberg_manifest_file.empty() + && commit_marker_file.empty(); + } + }; + + virtual ExportPartitionCommitInfo commitExportPartitionTransaction( const String & /* transaction_id */, const String & /* partition_id */, const Strings & /* exported_paths */, diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index b45f3667be87..4e9a654c5c05 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -127,6 +127,107 @@ namespace return out; } + /// Fetch all per-part processed leaves under /processed and build a + /// fresh map keyed by part_name. Returns the destination file paths recorded for + /// each finished part. + /// + /// Same lenient semantics as readLastExceptionPerReplica: an empty result means + /// "nothing actionable" (transient ZK error, no children yet, or all leaves + /// concurrently removed) and callers MUST skip the assignment to preserve the + /// in-memory mirror across glitches. Safe because processed/ leaves are + /// written once on per-part success and never rewritten — the entire entry path + /// is wiped recursively at task cleanup, handled separately. 
+ std::map> readDestinationFilePathsPerPart( + const zkutil::ZooKeeperPtr & zk, + const std::filesystem::path & entry_path, + const std::string & log_key, + const LoggerPtr & log) + { + std::map> out; + + const auto container_path = entry_path / "processed"; + + Strings children; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); + if (Coordination::Error::ZOK != zk->tryGetChildren(container_path, children)) + { + LOG_INFO(log, "ExportPartition Manifest Updating Task: failed to list processed leaves for {}, leaving in-memory copy untouched", log_key); + return out; + } + + if (children.empty()) + return out; + + std::vector paths; + paths.reserve(children.size()); + for (const auto & child : children) + paths.emplace_back(container_path / child); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet, paths.size()); + auto responses = zk->tryGet(paths); + responses.waitForResponses(); + + for (size_t i = 0; i < paths.size(); ++i) + { + Coordination::GetResponse response; + try + { + response = responses[i]; + } + catch (...) + { + LOG_WARNING(log, "ExportPartition Manifest Updating Task: ZK error fetching processed leaf {} for {}, skipping", children[i], log_key); + continue; + } + + if (response.error != Coordination::Error::ZOK) + continue; + + try + { + auto entry = ExportReplicatedMergeTreePartitionProcessedPartEntry::fromJsonString(response.data); + out.emplace(std::move(entry.part_name), std::move(entry.paths_in_destination)); + } + catch (...) + { + LOG_WARNING(log, "ExportPartition Manifest Updating Task: malformed processed JSON for {} (leaf {}), ignoring", log_key, children[i]); + } + } + + return out; + } + + /// Read the optional /commit_info znode and return the parsed entry. 
+ /// Returns nullopt when the znode is absent (task has not committed yet, peer + /// crashed before writing it, or transient ZK error). Callers should treat + /// nullopt as "leave the in-memory copy untouched". + std::optional readCommitInfo( + const zkutil::ZooKeeperPtr & zk, + const std::filesystem::path & entry_path, + const std::string & log_key, + const LoggerPtr & log) + { + const auto commit_info_path = entry_path / "commit_info"; + + std::string data; + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); + if (!zk->tryGet(commit_info_path, data)) + return std::nullopt; + + try + { + return ExportReplicatedMergeTreePartitionCommitInfoEntry::fromJsonString(data); + } + catch (...) + { + LOG_WARNING(log, "ExportPartition Manifest Updating Task: malformed commit_info JSON for {}, ignoring", log_key); + return std::nullopt; + } + } + /* Remove expired entries and fix non-committed exports that have already exported all parts. 
@@ -305,6 +406,8 @@ std::vector ExportPartitionManifestUpdatingTask:: ExportReplicatedMergeTreePartitionManifest manifest; ExportReplicatedMergeTreePartitionTaskEntry::Status status; std::map last_exception_per_replica; + std::map> destination_file_paths_per_part; + std::optional commit_info; }; std::vector snapshots; @@ -314,7 +417,12 @@ std::vector ExportPartitionManifestUpdatingTask:: snapshots.reserve(storage.export_merge_tree_partition_task_entries_by_key.size()); for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key) - snapshots.push_back(EntrySnapshot{entry.manifest, entry.status, entry.last_exception_per_replica}); + snapshots.emplace_back( + entry.manifest, + entry.status, + std::move(entry.last_exception_per_replica), + std::move(entry.destination_file_paths_per_part), + std::move(entry.commit_info)); } std::vector infos; @@ -345,6 +453,16 @@ std::vector ExportPartitionManifestUpdatingTask:: } info.exception_count = total_exception_count; + info.destination_file_paths_per_part = std::move(snapshot.destination_file_paths_per_part); + + if (snapshot.commit_info) + { + info.committed_metadata_file = snapshot.commit_info->iceberg_metadata_file; + info.committed_manifest_list = snapshot.commit_info->iceberg_manifest_list; + info.committed_manifest_file = snapshot.commit_info->iceberg_manifest_file; + info.committed_marker_file = snapshot.commit_info->commit_marker_file; + } + infos.emplace_back(std::move(info)); } @@ -412,15 +530,6 @@ void ExportPartitionManifestUpdatingTask::poll() const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); - /// Read last_exception leaves (no watch). Surfacing exceptions in the system table relies - /// on this read being part of every poll cycle: per-part failures during PENDING do not - /// trigger a status watch, so the only refresh path while the task is still in-flight is - /// the periodic poll. 
An empty result collapses every "nothing actionable" case - /// (transient ZK error, no children, all leaves ZNONODE/malformed) into a no-op so the - /// in-memory copy stays intact. - auto last_exception_per_replica = readLastExceptionPerReplica( - zk, fs::path(entry_path), key, storage.log.load()); - const auto local_entry = entries_by_key.find(key); /// If the zk entry has been replaced with export_merge_tree_partition_force_export, checking only for the export key is not enough @@ -428,14 +537,42 @@ void ExportPartitionManifestUpdatingTask::poll() bool has_local_entry_and_is_up_to_date = local_entry != entries_by_key.end() && local_entry->manifest.transaction_id == metadata.transaction_id; - /// If the entry is up to date and we don't have the cleanup lock, refresh the in-memory - /// last_exception (surfaced by system.replicated_partition_exports) and early exit. - /// Direct mutation of the `mutable` field is safe under export_merge_tree_partition_mutex, - /// which is held throughout poll(). + /// Fast path: the entry is already mirrored, up to date, and we don't hold the cleanup + /// lock. The visible status of an existing entry is owned by handleStatusChanges (driven + /// by the per-entry status watch armed below when the entry was first added), so here we + /// only refresh the auxiliary leaves surfaced by system.replicated_partition_exports. + /// last_exception and destination paths are refreshed unconditionally so the table + /// converges even for transitions that do not flip the status leaf, e.g. transient + /// per-part exceptions accumulated during PENDING; commit_info is only fetched once the + /// entry has already been mirrored as COMPLETED (see below). No status read and no watch + /// is armed on this hot path. Direct mutation of the `mutable` fields is safe under + /// export_merge_tree_partition_mutex, which is held throughout poll(). 
if (!cleanup_lock && has_local_entry_and_is_up_to_date) { - if (!last_exception_per_replica.empty()) + if (auto last_exception_per_replica = readLastExceptionPerReplica( + zk, fs::path(entry_path), key, storage.log.load()); + !last_exception_per_replica.empty()) local_entry->last_exception_per_replica = std::move(last_exception_per_replica); + + if (auto destination_file_paths_per_part = readDestinationFilePathsPerPart( + zk, fs::path(entry_path), key, storage.log.load()); + !destination_file_paths_per_part.empty()) + local_entry->destination_file_paths_per_part = std::move(destination_file_paths_per_part); + + /// commit_info exists in ZK only once the export has COMPLETED (it is written in the + /// same atomic multi that flips the status). Gate the read on the already-mirrored + /// status so we never read a guaranteed-absent znode while still in flight, and only + /// when we have not captured it yet. This also repairs the rare case where + /// handleStatusChanges flipped the status to COMPLETED but its own commit_info read + /// hit a transient error and left the field empty. + if (local_entry->status == ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED + && !local_entry->commit_info) + { + if (auto commit_info = readCommitInfo( + zk, fs::path(entry_path), key, storage.log.load())) + local_entry->commit_info = std::move(commit_info); + } + continue; } @@ -468,6 +605,29 @@ void ExportPartitionManifestUpdatingTask::poll() continue; } + /// Read the auxiliary leaves AFTER the status leaf. The commit / failure paths write the + /// status leaf together with the corresponding aux leaves (commit_info, last_exception) + /// in a single atomic multi, so reading status first guarantees that a terminal status + /// observed here comes with its aux data already visible. 
Reading them in the other order + /// opens a race where a freshly observed entry is mirrored with a terminal status but + /// empty aux data, which (for an up-to-date entry on the fast path that no longer arms a + /// watch) would never get corrected. An empty result collapses every "nothing actionable" + /// case (transient ZK error, no children, all leaves ZNONODE/malformed) into a no-op so + /// the in-memory copy stays intact. + auto last_exception_per_replica = readLastExceptionPerReplica( + zk, fs::path(entry_path), key, storage.log.load()); + auto destination_file_paths_per_part = readDestinationFilePathsPerPart( + zk, fs::path(entry_path), key, storage.log.load()); + /// commit_info exists in ZK only once status == COMPLETED (written atomically with it). + /// Because we already read the status leaf above, ZooKeeper's per-session sequential + /// consistency guarantees a COMPLETED status read here is accompanied by a visible + /// commit_info. Gating the read avoids ever mirroring a PENDING entry together with + /// commit paths and skips a guaranteed-absent read while in flight. + std::optional commit_info; + if (*status == ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED) + commit_info = readCommitInfo( + zk, fs::path(entry_path), key, storage.log.load()); + /// if we have the cleanup lock, try to cleanup /// if we successfully cleaned it up, early exit if (cleanup_lock) @@ -491,15 +651,19 @@ void ExportPartitionManifestUpdatingTask::poll() if (has_local_entry_and_is_up_to_date) { - /// Same refresh as the early-exit branch above; we also reach this point when - /// holding the cleanup lock (cleanup did not consume the entry). + /// Same refresh as the fast path above; we only reach this point for an up-to-date + /// entry when holding the cleanup lock (cleanup did not consume the entry). 
if (!last_exception_per_replica.empty()) local_entry->last_exception_per_replica = std::move(last_exception_per_replica); + if (!destination_file_paths_per_part.empty()) + local_entry->destination_file_paths_per_part = std::move(destination_file_paths_per_part); + if (commit_info) + local_entry->commit_info = std::move(commit_info); LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key); continue; } - addTask(metadata, *status, std::move(last_exception_per_replica), key, entries_by_key); + addTask(metadata, *status, std::move(last_exception_per_replica), std::move(destination_file_paths_per_part), std::move(commit_info), key, entries_by_key); } /// Remove entries that were deleted by someone else @@ -567,6 +731,8 @@ void ExportPartitionManifestUpdatingTask::addTask( const ExportReplicatedMergeTreePartitionManifest & metadata, ExportReplicatedMergeTreePartitionTaskEntry::Status status, std::map last_exception_per_replica, + std::map> destination_file_paths_per_part, + std::optional commit_info, const std::string & key, auto & entries_by_key ) @@ -589,7 +755,13 @@ void ExportPartitionManifestUpdatingTask::addTask( } /// Insert or update entry. The multi_index container automatically maintains both indexes. 
- ExportReplicatedMergeTreePartitionTaskEntry entry {metadata, status, std::move(part_references), std::move(last_exception_per_replica)}; + ExportReplicatedMergeTreePartitionTaskEntry entry { + metadata, + status, + std::move(part_references), + std::move(last_exception_per_replica), + std::move(destination_file_paths_per_part), + std::move(commit_info)}; auto it = entries_by_key.find(key); if (it != entries_by_key.end()) entries_by_key.replace(it, entry); @@ -701,6 +873,28 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() it->last_exception_per_replica = std::move(fetched); } + /// Refresh per-part destination paths and commit_info on the status flip too, + /// so the system table observes the COMPLETED state and the committed file + /// paths in the same poll cycle. + if (auto fetched = readDestinationFilePathsPerPart( + zk, fs::path(storage.zookeeper_path) / "exports" / key, key, storage.log.load()); + !fetched.empty()) + { + it->destination_file_paths_per_part = std::move(fetched); + } + + /// commit_info is written atomically with the COMPLETED status, so only read it on that + /// transition. The status leaf was read above, so this read observes the matching + /// commit_info; FAILED / KILLED never produce a commit_info znode. 
+ if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED) + { + if (auto fetched_commit_info = readCommitInfo( + zk, fs::path(storage.zookeeper_path) / "exports" / key, key, storage.log.load())) + { + it->commit_info = std::move(fetched_commit_info); + } + } + /// If status changed to KILLED, cancel local export operations if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED) { diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h index 32487f2dc68c..71e16c1848b0 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h @@ -34,6 +34,8 @@ class ExportPartitionManifestUpdatingTask const ExportReplicatedMergeTreePartitionManifest & metadata, ExportReplicatedMergeTreePartitionTaskEntry::Status status, std::map last_exception_per_replica, + std::map> destination_file_paths_per_part, + std::optional commit_info, const std::string & key, auto & entries_by_key ); diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 4a447c8de3ab..0cd4b086d871 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -208,7 +208,8 @@ namespace ExportPartitionUtils getPartitionValuesForIcebergCommit(source_storage, manifest.partition_id); } - destination_storage->commitExportPartitionTransaction(manifest.transaction_id, manifest.partition_id, exported_paths, iceberg_args, context); + const auto destination_commit_info = destination_storage->commitExportPartitionTransaction( + manifest.transaction_id, manifest.partition_id, exported_paths, iceberg_args, context); /// Failpoint to simulate a crash after the Iceberg commit succeeds but before /// ZooKeeper is updated to COMPLETED. Used by idempotency integration tests. 
@@ -221,16 +222,45 @@ namespace ExportPartitionUtils }); LOG_INFO(log, "ExportPartition: Committed export, mark as completed"); + + const std::string status_path = fs::path(entry_path) / "status"; + const std::string completed_name = String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(); + + Coordination::Requests ops; + ops.emplace_back(zkutil::makeSetRequest(status_path, completed_name, -1)); + + if (!destination_commit_info.empty()) + { + ExportReplicatedMergeTreePartitionCommitInfoEntry commit_info_entry { + destination_commit_info.iceberg_metadata_file, + destination_commit_info.iceberg_manifest_list, + destination_commit_info.iceberg_manifest_file, + destination_commit_info.commit_marker_file}; + + const std::string commit_info_path = fs::path(entry_path) / "commit_info"; + ops.emplace_back(zkutil::makeCreateRequest(commit_info_path, commit_info_entry.toJsonString(), zkutil::CreateMode::Persistent)); + } + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperSet); - if (Coordination::Error::ZOK == zk->trySet(fs::path(entry_path) / "status", String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED)).data(), -1)) + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti); + + Coordination::Responses responses; + const auto rc = zk->tryMulti(ops, responses); + + if (rc == Coordination::Error::ZOK) { - LOG_INFO(log, "ExportPartition: Marked export as completed"); + LOG_INFO(log, "ExportPartition: Marked export as completed{}", + destination_commit_info.empty() ? 
"" : " and persisted commit_info"); + return; } - else + + if (rc == Coordination::Error::ZNODEEXISTS) { - throw Exception(ErrorCodes::NETWORK_ERROR, "ExportPartition: Failed to mark export as completed, will not try to fix it"); + LOG_INFO(log, "ExportPartition: commit_info already present (peer wrote it first); task already COMPLETED"); + return; } + + throw Exception(ErrorCodes::NETWORK_ERROR, "ExportPartition: Failed to mark export as completed (rc={}), will not try to fix it", rc); } bool handleCommitFailure( diff --git a/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp b/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp index c4669a9bf55c..b3c750129988 100644 --- a/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp +++ b/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp @@ -43,9 +43,9 @@ TEST_F(ExportPartitionOrderingTest, IterationOrderMatchesCreateTime) manifest3.transaction_id = "tx3"; manifest3.create_time = base_time; // Oldest - ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; - ExportReplicatedMergeTreePartitionTaskEntry entry2{manifest2, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; - ExportReplicatedMergeTreePartitionTaskEntry entry3{manifest3, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry2{manifest2, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry3{manifest3, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}, {}, {}}; // Insert in reverse order by_key.insert(entry1); diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h 
b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 85c91ba5de19..20194fabe92d 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -220,7 +221,7 @@ class IDataLakeMetadata : boost::noncopyable throwNotImplemented("import"); } - virtual void commitExportPartitionTransaction( + virtual IStorage::ExportPartitionCommitInfo commitExportPartitionTransaction( std::shared_ptr /* catalog */, const StorageID & /* table_id */, const String & /* transaction_id */, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 95daff72ea11..c4f42a01e062 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -1530,7 +1530,7 @@ Poco::JSON::Object::Ptr lookupSchema(const Poco::JSON::Object::Ptr & meta, Int64 } -bool IcebergMetadata::commitImportPartitionTransactionImpl( +IStorage::ExportPartitionCommitInfo IcebergMetadata::commitImportPartitionTransactionImpl( FileNamesGenerator & filename_generator, Poco::JSON::Object::Ptr & metadata, Poco::JSON::Object::Ptr & partition_spec, @@ -1556,7 +1556,13 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( LOG_INFO(log, "Export transaction {} already committed, skipping re-commit", transaction_id); - return true; + /// Surface a sentinel so the caller treats this as a successful attempt (non-empty + /// commit info), persists a commit_info znode, and makes the situation visible in + /// system.replicated_partition_exports.committed_metadata_file. We do not know the + /// original committer's paths from here. 
+ IStorage::ExportPartitionCommitInfo already_committed_info; + already_committed_info.iceberg_metadata_file = ""; + return already_committed_info; } CompressionMethod metadata_compression_method = persistent_components.metadata_compression_method; @@ -1753,7 +1759,7 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( { LOG_DEBUG(log, "Failed to write metadata {}, retrying", metadata_info.path); cleanup(true); - return false; + return {}; } LOG_DEBUG(log, "Metadata file {} written", metadata_info.path); @@ -1766,7 +1772,7 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot)) { cleanup(true); - return false; + return {}; } /// Catalog has accepted the commit - the new snapshot is now live and references @@ -1809,11 +1815,16 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( /// post-publish work (e.g. metadata-cache invalidation). Running cleanup() /// here would delete manifest files referenced by the published snapshot /// and corrupt it. Log and swallow - any transient state (stale cache) - /// is self-healing on subsequent reads. + /// is self-healing on subsequent reads. Surface the published paths anyway + /// so the partition export task can persist them in ZooKeeper. 
tryLogCurrentException(log, "Post-publish work failed after Iceberg snapshot was committed; " "skipping manifest cleanup to preserve published snapshot"); - return true; + IStorage::ExportPartitionCommitInfo published_info; + published_info.iceberg_metadata_file = resolver.resolve(metadata_info.path); + published_info.iceberg_manifest_list = storage_manifest_list_name; + published_info.iceberg_manifest_file = storage_manifest_entry_path; + return published_info; } LOG_ERROR(log, "Failed to commit import partition transaction: {}", getCurrentExceptionMessage(false)); @@ -1821,10 +1832,18 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( throw; } - return true; + /// Record the storage paths of the files we just published so the partition + /// export task can persist them in ZooKeeper for observability. Only set here + /// (not on the retry / "already committed" paths) so the struct reflects + /// exactly what this attempt produced. + IStorage::ExportPartitionCommitInfo published_info; + published_info.iceberg_metadata_file = resolver.resolve(metadata_info.path); + published_info.iceberg_manifest_list = storage_manifest_list_name; + published_info.iceberg_manifest_file = storage_manifest_entry_path; + return published_info; } -void IcebergMetadata::commitExportPartitionTransaction( +IStorage::ExportPartitionCommitInfo IcebergMetadata::commitExportPartitionTransaction( std::shared_ptr catalog, const StorageID & table_id, const String & transaction_id, @@ -1867,7 +1886,9 @@ void IcebergMetadata::commitExportPartitionTransaction( LOG_INFO(log, "Export transaction {} already committed, skipping re-commit", transaction_id); - return; + IStorage::ExportPartitionCommitInfo already_committed_info; + already_committed_info.iceberg_metadata_file = ""; + return already_committed_info; } /// Fail fast if the table schema or partition spec changed between export-start and commit. 
@@ -1926,7 +1947,7 @@ void IcebergMetadata::commitExportPartitionTransaction( size_t attempt = 0; while (attempt < MAX_TRANSACTION_RETRIES) { - if (commitImportPartitionTransactionImpl( + auto commit_info = commitImportPartitionTransactionImpl( filename_generator, metadata, partition_spec, @@ -1944,10 +1965,9 @@ void IcebergMetadata::commitExportPartitionTransaction( total_chunks_size, catalog, table_id, - context)) - { - return; - } + context); + if (!commit_info.empty()) + return commit_info; ++attempt; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 2e28df93e0e6..8ce000b4c8fc 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -135,7 +135,7 @@ class IcebergMetadata : public IDataLakeMetadata /// data_file_paths contains the metadata-path for each exported data file (as recorded in /// ZooKeeper). For every path a co-located sidecar Avro file (same path, ".avro" extension) /// must exist in the object storage; it supplies record_count and file_size_in_bytes. - void commitExportPartitionTransaction( + IStorage::ExportPartitionCommitInfo commitExportPartitionTransaction( std::shared_ptr catalog, const StorageID & table_id, const String & transaction_id, @@ -210,7 +210,12 @@ class IcebergMetadata : public IDataLakeMetadata std::optional getPartitionKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const; KeyDescription getSortingKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const; - bool commitImportPartitionTransactionImpl( + /// Non-empty return value means the attempt succeeded (covers both the normal + /// publish path and the `isExportPartitionTransactionAlreadyCommitted` short-circuit). + /// An empty `ExportPartitionCommitInfo` means the caller must retry. 
The + /// short-circuit branch fills `iceberg_metadata_file` with a sentinel note since + /// the original committer's paths are not trivially recoverable from inside this call. + IStorage::ExportPartitionCommitInfo commitImportPartitionTransactionImpl( FileNamesGenerator & filename_generator, Poco::JSON::Object::Ptr & metadata, Poco::JSON::Object::Ptr & partition_spec, diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index c08db678c46c..980b2a636d1b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -668,7 +668,7 @@ SinkToStoragePtr StorageObjectStorage::import( local_context); } -void StorageObjectStorage::commitExportPartitionTransaction( +IStorage::ExportPartitionCommitInfo StorageObjectStorage::commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, @@ -689,7 +689,7 @@ void StorageObjectStorage::commitExportPartitionTransaction( const auto partition_spec_id = iceberg_metadata->getValue(Iceberg::f_default_spec_id); configuration->lazyInitializeIfNeeded(object_storage, local_context); - configuration->getExternalMetadata()->commitExportPartitionTransaction( + return configuration->getExternalMetadata()->commitExportPartitionTransaction( catalog, storage_id, transaction_id, @@ -699,16 +699,20 @@ void StorageObjectStorage::commitExportPartitionTransaction( std::make_shared(getInMemoryMetadataPtr()->getSampleBlock()), exported_paths, local_context); - return; } const String commit_object = configuration->getRawPath().path + "/commit_" + partition_id + "_" + transaction_id; + ExportPartitionCommitInfo result; + result.commit_marker_file = commit_object; + /// if file already exists, nothing to be done if (object_storage->exists(StoredObject(commit_object))) { LOG_DEBUG(getLogger("StorageObjectStorage"), "Commit file already exists, nothing to be done: 
{}", commit_object); - return; + /// Still surface the path: observability does not require we wrote it, + /// only that it is the committed marker for this transaction. + return result; } auto out = object_storage->writeObject(StoredObject(commit_object), WriteMode::Rewrite, /* attributes= */ {}, DBMS_DEFAULT_BUFFER_SIZE, local_context->getWriteSettings()); @@ -718,6 +722,7 @@ void StorageObjectStorage::commitExportPartitionTransaction( out->write("\n", 1); } out->finalize(); + return result; } void StorageObjectStorage::truncate( diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 8c04b71d9487..6ace629b7085 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -89,7 +89,7 @@ class StorageObjectStorage : public IStorage const std::optional & /* format_settings_ */, ContextPtr /* context */) override; - void commitExportPartitionTransaction( + ExportPartitionCommitInfo commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index e5131d06ae2e..6211237d220e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -1111,7 +1111,7 @@ SinkToStoragePtr StorageObjectStorageCluster::import( context); } -void StorageObjectStorageCluster::commitExportPartitionTransaction( +IStorage::ExportPartitionCommitInfo StorageObjectStorageCluster::commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, @@ -1120,16 +1120,15 @@ void StorageObjectStorageCluster::commitExportPartitionTransaction( { if (pure_storage) { - pure_storage->commitExportPartitionTransaction( + return 
pure_storage->commitExportPartitionTransaction( transaction_id, partition_id, exported_paths, iceberg_commit_export_partition_arguments, local_context ); - return; } - IStorageCluster::commitExportPartitionTransaction( + return IStorageCluster::commitExportPartitionTransaction( transaction_id, partition_id, exported_paths, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 52c7d5951855..f5fa4e7434bd 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -44,7 +44,7 @@ class StorageObjectStorageCluster : public IStorageCluster const std::optional & format_settings_, ContextPtr context) override; - void commitExportPartitionTransaction( + ExportPartitionCommitInfo commitExportPartitionTransaction( const String & transaction_id, const String & partition_id, const Strings & exported_paths, diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp index 9e8faab689d6..25fd130d0198 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +48,16 @@ ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescriptio "Per-replica last exception entries. Each tuple records the most recent exception observed by that replica plus a best-effort within-replica count. Empty array if no replica has reported an exception for this task."}, {"exception_count", std::make_shared(), "Sum of per-replica exception counts. Each replica owns its own count, so the sum is exact w.r.t. 
the in-memory snapshot; within-replica updates remain best-effort and may under-count by one under concurrent failures."}, + {"destination_file_paths", std::make_shared(std::make_shared(), std::make_shared(std::make_shared())), + "Per-part destination file paths written to the destination object storage. Keyed by part name; values are the file paths produced by exporting that part. Mirrored from ZooKeeper on every poll while PENDING; partial during in-flight tasks."}, + {"committed_metadata_file", std::make_shared(), + "For Iceberg destinations: path of the new metadata JSON file written at commit time. Empty for non-Iceberg destinations and for tasks that have not committed yet. May also be empty if the committing replica crashed between writing the object-storage files and persisting commit_info. If the export was already committed by a previous run (detected via the transaction id stored in the snapshot summary), this column holds a human-readable note instead of a path since the original committer's paths are not trivially recoverable."}, + {"committed_manifest_list", std::make_shared(), + "For Iceberg destinations: path of the manifest list file (snap-*.avro) referenced by the new snapshot. Empty under the same conditions as committed_metadata_file."}, + {"committed_manifest_file", std::make_shared(), + "For Iceberg destinations: path of the manifest file referenced by committed_manifest_list. Empty under the same conditions as committed_metadata_file."}, + {"committed_marker_file", std::make_shared(), + "For plain object storage destinations: path of the per-transaction commit marker file written by the destination. 
Empty for Iceberg destinations and for tasks that have not committed yet."}, }; } @@ -152,6 +163,25 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu per_replica.push_back(Tuple{ex.replica, ex.message, ex.part, ex.time, ex.count}); res_columns[i++]->insert(per_replica); res_columns[i++]->insert(info.exception_count); + + Map destination_paths_map; + destination_paths_map.reserve(info.destination_file_paths_per_part.size()); + for (const auto & [part_name, paths] : info.destination_file_paths_per_part) + { + Array paths_array; + paths_array.reserve(paths.size()); + for (const auto & path : paths) + paths_array.push_back(path); + destination_paths_map.emplace_back(Tuple{part_name, std::move(paths_array)}); + } + res_columns[i++]->insert(std::move(destination_paths_map)); + + res_columns[i++]->insert(info.committed_metadata_file); + res_columns[i++]->insert(info.committed_manifest_list); + + res_columns[i++]->insert(info.committed_manifest_file); + + res_columns[i++]->insert(info.committed_marker_file); } } } diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.h b/src/Storages/System/StorageSystemReplicatedPartitionExports.h index a8666374a7f0..244fee61ae4a 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.h +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.h @@ -29,6 +29,19 @@ struct ReplicatedPartitionExportInfo /// single replica the count is best-effort (concurrent failing writers may under- /// count by one), matching the documented column semantics. size_t exception_count = 0; + /// Per-part destination file paths, keyed by part name. Mirrors the per-part + /// `processed/{part_name}/paths_in_destination` data from ZooKeeper. + /// Empty until parts complete; partial during PENDING. + std::map> destination_file_paths_per_part; + /// Iceberg commit-time paths surfaced from the export task's commit_info znode. + /// All empty for non-Iceberg destinations or before commit lands.
+ String committed_metadata_file; + String committed_manifest_list; + String committed_manifest_file; + /// Plain object storage commit marker file surfaced from the export + /// task's commit_info znode. Empty for Iceberg destinations or before + /// commit lands. + String committed_marker_file; }; class StorageSystemReplicatedPartitionExports final : public IStorageSystemOneBlock diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py index 8b49589a3005..fddee4e1850a 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py @@ -830,8 +830,9 @@ def test_post_publish_exception_preserves_snapshot(cluster): post-publish region (after both the metadata file is written and `published = true` is set). With the fix in place: - the commit stays durable (snapshot is readable, manifests are intact); - - the export is marked COMPLETED because the idempotency check on retry - detects that the transaction is already committed and returns success; + - the export is marked COMPLETED because the outer `catch (...)` sees + `published == true` and returns the populated commit info with the real + paths produced by this attempt (no retry needed); - all exported rows are visible through the Iceberg table. """ node = cluster.instances["replica1"] @@ -862,6 +863,28 @@ def test_post_publish_exception_preserves_snapshot(cluster): f"Unexpected data after post-publish exception recovery:\n{result}" ) + # After a post-publish exception the catch handler with published==true returns + # the populated commit info (real metadata / manifest list / manifest file paths). + # ExportPartitionUtils::commit persists it to the commit_info znode, so the system + # table should show a real metadata path here, not the already-committed sentinel.
+ committed_metadata_file = node.query( + f""" + SELECT committed_metadata_file FROM system.replicated_partition_exports + WHERE source_table = '{mt_table}' + AND destination_table = '{iceberg_table}' + AND partition_id = '2020' + """ + ).strip() + assert committed_metadata_file, ( + "committed_metadata_file should be populated after a successful post-publish-catch return" + ) + assert not committed_metadata_file.startswith("<"), ( + f"committed_metadata_file should be a real metadata path, got the already-committed sentinel: {committed_metadata_file!r}" + ) + assert committed_metadata_file.endswith(".metadata.json"), ( + f"Expected a *.metadata.json path in committed_metadata_file, got: {committed_metadata_file!r}" + ) + def test_export_task_timeout_kills_stuck_pending_task(cluster): """ diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index 3161e3b67100..74f03033976d 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -693,6 +693,29 @@ def test_export_partition_file_already_exists_policy(cluster): # wait for the exports to finish wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED") + # plain object storage destinations surface the commit marker file path via + # system.replicated_partition_exports.committed_marker_file + committed_marker_file = node.query( + f""" + SELECT committed_marker_file FROM system.replicated_partition_exports + WHERE source_table = '{mt_table}' + AND destination_table = '{s3_table}' + AND partition_id = '2020' + """ + ).strip() + # `committed_marker_file` is the absolute key in the bucket (same convention as + # `destination_file_paths`); it may carry the s3_conn URL's in-bucket prefix on + # top of the table's `filename` argument, so use a "contains" check that does + # 
not depend on knowing that prefix. + assert f"{s3_table}/commit_2020_" in committed_marker_file, \ + f"Expected committed_marker_file under {s3_table}/, got: {committed_marker_file!r}" + # Path relative to the `s3_conn` URL, derived from the absolute key without + # assuming a particular URL prefix. + marker_relative_path = committed_marker_file[committed_marker_file.index(f"{s3_table}/"):] + assert node.query( + f"SELECT count() FROM s3(s3_conn, filename='{marker_relative_path}', format=LineAsString)" + ) == '1\n', f"Commit marker file does not exist at {committed_marker_file!r}" + # try to export the partition node.query( f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_force_export=1" diff --git a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py index 5466fe543275..9e83d160d2f9 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py @@ -712,6 +712,19 @@ def test_idempotency_after_commit_crash(export_cluster): count = int(node.query(f"SELECT count() FROM {iceberg}").strip()) assert count == 3, f"Expected 3 rows (no duplicates), got {count}" + # The already-committed early-exit in commitExportPartitionTransaction surfaces + # a sentinel note in committed_metadata_file (the original committer's paths + # are not recoverable from inside the call). The sentinel makes the situation + # visible in system.replicated_partition_exports rather than leaving the + # commit_info columns empty. 
+ committed_metadata_file = node.query( + f"SELECT committed_metadata_file FROM system.replicated_partition_exports " + f"WHERE source_table = '{source}' AND partition_id = '{pid}'" + ).strip() + assert committed_metadata_file == "", ( + f"Expected already-committed sentinel after idempotent retry, got: {committed_metadata_file!r}" + ) + def test_commit_attempts_budget_transitions_to_failed(export_cluster): """ diff --git a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py index 8a216860d029..4b1aa1e7d22c 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py @@ -367,6 +367,14 @@ def test_catalog_idempotent_retry(catalog_export_cluster): f"got {len(history)}" ) + committed_metadata_file = node.query( + f"SELECT committed_metadata_file FROM system.replicated_partition_exports " + f"WHERE source_table = '{source}' AND partition_id = '{pid}'" + ).strip() + assert committed_metadata_file == "", ( + f"Expected already-committed sentinel after idempotent retry, got: {committed_metadata_file!r}" + ) + # --------------------------------------------------------------------------- # Replicated catalog tests