From 0cd5fa569c80ad0c815244335fcac659897d75b9 Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 4 Nov 2025 09:31:00 +0000 Subject: [PATCH 1/7] load db file --- CMakeLists.txt | 22 +- src/client/distributed_client.cpp | 39 +++- src/client/distributed_flight_client.cpp | 29 ++- src/client/duckherder_catalog.cpp | 83 ++++++- src/include/client/distributed_client.hpp | 9 +- .../client/distributed_flight_client.hpp | 6 +- src/include/client/duckherder_catalog.hpp | 4 + .../server/distributed_flight_server.hpp | 45 ++-- src/proto/distributed.proto | 40 ++++ src/server/distributed_flight_server.cpp | 203 +++++++++++++++--- src/server/distributed_server_function.cpp | 10 + test/data/sample_employees.duckdb | Bin 0 -> 798720 bytes test/sql/server_db_path.test | 58 +++++ 13 files changed, 488 insertions(+), 60 deletions(-) create mode 100644 test/data/sample_employees.duckdb create mode 100644 test/sql/server_db_path.test diff --git a/CMakeLists.txt b/CMakeLists.txt index f96a8bf..6ba08b3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -93,14 +93,14 @@ target_link_libraries( gRPC::grpc++ gRPC::grpc) -# Unit tests. -include_directories(duckdb/third_party/catch) -add_executable(test_distributed_flight unit/test_distributed_flight.cpp) -target_link_libraries( - test_distributed_flight - ${EXTENSION_NAME} - Arrow::arrow_static - ArrowFlight::arrow_flight_static - protobuf::libprotobuf - gRPC::grpc++ - gRPC::grpc) +# # Unit tests. +# include_directories(duckdb/third_party/catch) +# add_executable(test_distributed_flight unit/test_distributed_flight.cpp) +# target_link_libraries( +# test_distributed_flight +# ${EXTENSION_NAME} +# Arrow::arrow_static +# ArrowFlight::arrow_flight_static +# protobuf::libprotobuf +# gRPC::grpc++ +# gRPC::grpc) diff --git a/src/client/distributed_client.cpp b/src/client/distributed_client.cpp index 59e4b0c..c109a8e 100644 --- a/src/client/distributed_client.cpp +++ b/src/client/distributed_client.cpp @@ -14,8 +14,9 @@ namespace duckdb { -DistributedClient::DistributedClient(string server_url_p) : server_url(std::move(server_url_p)) { - client = make_uniq(server_url); +DistributedClient::DistributedClient(string server_url_p, string db_path_p) + : server_url(std::move(server_url_p)), db_path(std::move(db_path_p)) { + client = make_uniq(server_url, db_path); auto status = client->Connect(); if (!status.ok()) { throw Exception(ExceptionType::CONNECTION, "Failed to connect to Flight server: " + status.ToString()); @@ -27,6 +28,29 @@ DistributedClient &DistributedClient::GetInstance() { return *client; } +void DistributedClient::Configure(const string &server_url_param, const string &db_path_param) { + auto &instance = GetInstance(); + std::cerr << "[DistributedClient::Configure] Called with server_url='" << server_url_param + << "', db_path='" << db_path_param << "'" << std::endl; + std::cerr << "[DistributedClient::Configure] Current instance has server_url='" << instance.server_url + << "', db_path='" << instance.db_path << "'" << std::endl; + + // Reconfigure if either server_url or db_path changed + if (instance.server_url != server_url_param || instance.db_path != db_path_param) { + std::cerr << "[DistributedClient::Configure] Reconfiguring client..." << std::endl; + instance.server_url = server_url_param; + instance.db_path = db_path_param; + instance.client = make_uniq(server_url_param, db_path_param); + auto status = instance.client->Connect(); + if (!status.ok()) { + throw Exception(ExceptionType::CONNECTION, "Failed to connect to Flight server: " + status.ToString()); + } + std::cerr << "[DistributedClient::Configure] Client reconfigured successfully" << std::endl; + } else { + std::cerr << "[DistributedClient::Configure] No change needed, skipping reconfiguration" << std::endl; + } +} + unique_ptr DistributedClient::ScanTable(const string &table_name, idx_t limit, idx_t offset) { std::unique_ptr stream; auto status = client->ScanTable(table_name, limit, offset, stream); @@ -192,4 +216,15 @@ unique_ptr DistributedClient::InsertInto(const string &insert_sql) return ExecuteSQL(insert_sql); } +bool DistributedClient::GetCatalogInfo(distributed::GetCatalogInfoResponse &response) { + auto status = client->GetCatalogInfo(response); + if (!status.ok()) { + std::cerr << "[DistributedClient::GetCatalogInfo] Failed: " << status.ToString() << std::endl; + return false; + } + std::cerr << "[DistributedClient::GetCatalogInfo] Success: found " << response.tables_size() + << " tables" << std::endl; + return true; +} + } // namespace duckdb diff --git a/src/client/distributed_flight_client.cpp b/src/client/distributed_flight_client.cpp index 818a2a1..a66ecbf 100644 --- a/src/client/distributed_flight_client.cpp +++ b/src/client/distributed_flight_client.cpp @@ -7,7 +7,8 @@ namespace duckdb { -DistributedFlightClient::DistributedFlightClient(string server_url_p) : server_url(std::move(server_url_p)) { +DistributedFlightClient::DistributedFlightClient(string server_url_p, string db_path_p) + : server_url(std::move(server_url_p)), db_path(std::move(db_path_p)) { } arrow::Status DistributedFlightClient::Connect() { @@ -18,6 +19,7 @@ arrow::Status DistributedFlightClient::Connect() { arrow::Status DistributedFlightClient::ExecuteSQL(const string &sql, distributed::DistributedResponse &response) { distributed::DistributedRequest req; + req.set_db_path(db_path); auto *exec_req = req.mutable_execute_sql(); exec_req->set_sql(sql); return SendAction(req, response); @@ -26,6 +28,7 @@ arrow::Status DistributedFlightClient::ExecuteSQL(const string &sql, distributed arrow::Status DistributedFlightClient::CreateTable(const string &create_sql, distributed::DistributedResponse &response) { distributed::DistributedRequest req; + req.set_db_path(db_path); auto *create_req = req.mutable_create_table(); create_req->set_sql(create_sql); return SendAction(req, response); @@ -33,6 +36,7 @@ arrow::Status DistributedFlightClient::CreateTable(const string &create_sql, arrow::Status DistributedFlightClient::DropTable(const string &drop_sql, distributed::DistributedResponse &response) { distributed::DistributedRequest req; + req.set_db_path(db_path); auto *drop_req = req.mutable_drop_table(); drop_req->set_table_name(drop_sql); return SendAction(req, response); @@ -41,6 +45,7 @@ arrow::Status DistributedFlightClient::DropTable(const string &drop_sql, distrib arrow::Status DistributedFlightClient::CreateIndex(const string &create_sql, distributed::DistributedResponse &response) { distributed::DistributedRequest req; + req.set_db_path(db_path); auto *create_req = req.mutable_create_index(); create_req->set_sql(create_sql); return SendAction(req, response); @@ -48,6 +53,7 @@ arrow::Status DistributedFlightClient::CreateIndex(const string &create_sql, arrow::Status DistributedFlightClient::DropIndex(const string &index_name, distributed::DistributedResponse &response) { distributed::DistributedRequest req; + req.set_db_path(db_path); auto *drop_req = req.mutable_drop_index(); drop_req->set_index_name(index_name); return SendAction(req, response); @@ -55,6 +61,7 @@ arrow::Status DistributedFlightClient::DropIndex(const string &index_name, distr arrow::Status DistributedFlightClient::TableExists(const string &table_name, bool &exists) { distributed::DistributedRequest req; + req.set_db_path(db_path); auto *exists_req = req.mutable_table_exists(); exists_req->set_table_name(table_name); @@ -68,9 +75,26 @@ arrow::Status DistributedFlightClient::TableExists(const string &table_name, boo return arrow::Status::OK(); } +arrow::Status DistributedFlightClient::GetCatalogInfo(distributed::GetCatalogInfoResponse &response) { + distributed::DistributedRequest req; + req.set_db_path(db_path); + auto *catalog_req = req.mutable_get_catalog_info(); + // Empty request - gets all catalog info + + distributed::DistributedResponse resp; + ARROW_RETURN_NOT_OK(SendAction(req, resp)); + if (!resp.success()) { + return arrow::Status::Invalid(resp.error_message()); + } + + response = resp.get_catalog_info(); + return arrow::Status::OK(); +} + arrow::Status DistributedFlightClient::InsertData(const string &table_name, std::shared_ptr batch, distributed::DistributedResponse &response) { - arrow::flight::FlightDescriptor descriptor = arrow::flight::FlightDescriptor::Path({table_name}); + // Encode db_path and table_name in the FlightDescriptor path + arrow::flight::FlightDescriptor descriptor = arrow::flight::FlightDescriptor::Path({db_path, table_name}); std::unique_ptr writer; std::unique_ptr metadata_reader; @@ -102,6 +126,7 @@ arrow::Status DistributedFlightClient::InsertData(const string &table_name, std: arrow::Status DistributedFlightClient::ScanTable(const string &table_name, uint64_t limit, uint64_t offset, std::unique_ptr &stream) { distributed::DistributedRequest req; + req.set_db_path(db_path); auto *scan_req = req.mutable_scan_table(); scan_req->set_table_name(table_name); scan_req->set_limit(limit); diff --git a/src/client/duckherder_catalog.cpp b/src/client/duckherder_catalog.cpp index e078c9a..06ce63a 100644 --- a/src/client/duckherder_catalog.cpp +++ b/src/client/duckherder_catalog.cpp @@ -1,5 +1,6 @@ #include "duckherder_catalog.hpp" +#include "distributed_client.hpp" #include "distributed_delete.hpp" #include "distributed_insert.hpp" #include "duckdb/catalog/duck_catalog.hpp" @@ -9,11 +10,15 @@ #include "duckdb/common/unique_ptr.hpp" #include "duckdb/logging/logger.hpp" #include "duckdb/main/attached_database.hpp" +#include "duckdb/parser/column_definition.hpp" +#include "duckdb/parser/constraints/not_null_constraint.hpp" #include "duckdb/parser/expression/columnref_expression.hpp" #include "duckdb/parser/parsed_data/alter_table_info.hpp" #include "duckdb/parser/parsed_data/create_index_info.hpp" #include "duckdb/parser/parsed_data/create_schema_info.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" #include "duckdb/parser/statement/create_statement.hpp" +#include "duckdb/planner/binder.hpp" #include "duckdb/planner/expression/bound_columnref_expression.hpp" #include "duckdb/planner/expression/bound_reference_expression.hpp" #include "duckdb/planner/logical_operator.hpp" @@ -37,6 +42,82 @@ DuckherderCatalog::~DuckherderCatalog() = default; void DuckherderCatalog::Initialize(bool load_builtin) { duckdb_catalog->Initialize(load_builtin); + + // Configure the DistributedClient singleton with server details including db_path + auto server_url = GetServerUrl(); + std::cerr << "[DuckherderCatalog::Initialize] Configuring DistributedClient with server_url='" + << server_url << "', server_db_path='" << server_db_path << "'" << std::endl; + DistributedClient::Configure(server_url, server_db_path); +} + +void DuckherderCatalog::FinalizeLoad(optional_ptr context) { + // TODO: Implement automatic catalog sync from server + // Currently disabled due to deadlock issues during FinalizeLoad + // If server_db_path is specified, sync catalog from server + // if (!server_db_path.empty() && context) { + // std::cerr << "[DuckherderCatalog::FinalizeLoad] Syncing catalog from server..." << std::endl; + // SyncCatalogFromServer(*context); + // } +} + +void DuckherderCatalog::SyncCatalogFromServer(ClientContext &context) { + auto &client = DistributedClient::GetInstance(); + distributed::GetCatalogInfoResponse catalog_info; + + if (!client.GetCatalogInfo(catalog_info)) { + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Failed to get catalog info from server" << std::endl; + return; + } + + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Received " << catalog_info.tables_size() + << " tables from server" << std::endl; + + // Create tables in the local catalog using CREATE TABLE via the context + auto db_name = GetName(); + + for (int i = 0; i < catalog_info.tables_size(); i++) { + const auto &table_info = catalog_info.tables(i); + + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Found table: " + << table_info.schema_name() << "." << table_info.table_name() + << " with " << table_info.columns_size() << " columns" << std::endl; + + // Build CREATE TABLE SQL with full qualification + string create_sql = StringUtil::Format("CREATE TABLE IF NOT EXISTS %s.%s.%s (", + db_name, + table_info.schema_name(), + table_info.table_name()); + + for (int j = 0; j < table_info.columns_size(); j++) { + const auto &col = table_info.columns(j); + if (j > 0) { + create_sql += ", "; + } + create_sql += StringUtil::Format("%s %s", col.name(), col.type()); + if (!col.nullable()) { + create_sql += " NOT NULL"; + } + } + create_sql += ")"; + + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Creating table locally: " << create_sql << std::endl; + + // Execute CREATE TABLE using the context (this creates it locally in this catalog) + auto result = context.Query(create_sql, false); + if (result->HasError()) { + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Failed to create table: " + << result->GetError() << std::endl; + continue; + } + + // Automatically register as remote table + RegisterRemoteTable(table_info.table_name(), GetServerUrl(), table_info.table_name()); + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Registered remote table: " + << table_info.table_name() << std::endl; + } + + // TODO: Sync indexes as well + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Catalog sync complete!" << std::endl; } optional_ptr DuckherderCatalog::CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) { @@ -265,7 +346,7 @@ bool DuckherderCatalog::IsRemoteIndex(const string &index_name) const { } string DuckherderCatalog::GetServerUrl() const { - return StringUtil::Format("http://%s:%d", server_host, server_port); + return StringUtil::Format("grpc://%s:%d", server_host, server_port); } } // namespace duckdb diff --git a/src/include/client/distributed_client.hpp b/src/include/client/distributed_client.hpp index eb72d7a..343a45a 100644 --- a/src/include/client/distributed_client.hpp +++ b/src/include/client/distributed_client.hpp @@ -11,10 +11,13 @@ namespace duckdb { class DistributedClient { public: - explicit DistributedClient(string server_url_p = "grpc://localhost:8815"); + explicit DistributedClient(string server_url_p = "grpc://localhost:8815", string db_path_p = ""); ~DistributedClient() = default; static DistributedClient &GetInstance(); + + // Configure the singleton instance with server details + static void Configure(const string &server_url, const string &db_path); // Execute arbitrary SQL on the server. unique_ptr ExecuteSQL(const string &sql); @@ -46,8 +49,12 @@ class DistributedClient { // Get table data. unique_ptr ScanTable(const string &table_name, idx_t limit = 1000, idx_t offset = 0); + // Get catalog information from the server. + bool GetCatalogInfo(distributed::GetCatalogInfoResponse &response); + private: string server_url; + string db_path; unique_ptr client; }; diff --git a/src/include/client/distributed_flight_client.hpp b/src/include/client/distributed_flight_client.hpp index b01554b..1119fea 100644 --- a/src/include/client/distributed_flight_client.hpp +++ b/src/include/client/distributed_flight_client.hpp @@ -15,7 +15,7 @@ namespace duckdb { class DistributedFlightClient { public: - explicit DistributedFlightClient(string server_url); + explicit DistributedFlightClient(string server_url, string db_path = ""); ~DistributedFlightClient() = default; // Connect to server. @@ -39,6 +39,9 @@ class DistributedFlightClient { // Check if table exists. arrow::Status TableExists(const string &table_name, bool &exists); + // Get catalog information (tables, columns, indexes). + arrow::Status GetCatalogInfo(distributed::GetCatalogInfoResponse &response); + // Insert data using Arrow RecordBatch. arrow::Status InsertData(const string &table_name, std::shared_ptr batch, distributed::DistributedResponse &response); @@ -53,6 +56,7 @@ class DistributedFlightClient { private: string server_url; + string db_path; arrow::flight::Location location; std::unique_ptr client; }; diff --git a/src/include/client/duckherder_catalog.hpp b/src/include/client/duckherder_catalog.hpp index b8ff7be..8e96125 100644 --- a/src/include/client/duckherder_catalog.hpp +++ b/src/include/client/duckherder_catalog.hpp @@ -39,6 +39,7 @@ class DuckherderCatalog : public DuckCatalog { ~DuckherderCatalog() override; void Initialize(bool load_builtin) override; + void FinalizeLoad(optional_ptr context) override; string GetCatalogType() override { return "duckherder"; @@ -99,6 +100,9 @@ class DuckherderCatalog : public DuckCatalog { bool IsRemoteIndex(const string &index_name) const; private: + // Sync catalog (tables, columns, indexes) from the server. + void SyncCatalogFromServer(ClientContext &context); + std::mutex mu; unordered_map> schema_catalog_entries; diff --git a/src/include/server/distributed_flight_server.hpp b/src/include/server/distributed_flight_server.hpp index 4826962..4d419ab 100644 --- a/src/include/server/distributed_flight_server.hpp +++ b/src/include/server/distributed_flight_server.hpp @@ -8,6 +8,8 @@ #include #include #include +#include +#include namespace duckdb { @@ -38,43 +40,62 @@ class DistributedFlightServer : public arrow::flight::FlightServerBase { std::unique_ptr writer) override; private: + // Get or create a connection for the specified database path. + Connection &GetConnection(const string &db_path); + // Process different request types using protobuf messages directly. - arrow::Status HandleExecuteSQL(const distributed::ExecuteSQLRequest &req, distributed::DistributedResponse &resp); + arrow::Status HandleExecuteSQL(const string &db_path, const distributed::ExecuteSQLRequest &req, + distributed::DistributedResponse &resp); // Handle CREATE TABLE request. // Return error status if the table already exists. - arrow::Status HandleCreateTable(const distributed::CreateTableRequest &req, distributed::DistributedResponse &resp); + arrow::Status HandleCreateTable(const string &db_path, const distributed::CreateTableRequest &req, + distributed::DistributedResponse &resp); // Handle DROP TABLE request. // Return OK status if the table doesn't exist. - arrow::Status HandleDropTable(const distributed::DropTableRequest &req, distributed::DistributedResponse &resp); + arrow::Status HandleDropTable(const string &db_path, const distributed::DropTableRequest &req, + distributed::DistributedResponse &resp); // Handle CREATE INDEX request. // Return error status if the index already exists. - arrow::Status HandleCreateIndex(const distributed::CreateIndexRequest &req, distributed::DistributedResponse &resp); + arrow::Status HandleCreateIndex(const string &db_path, const distributed::CreateIndexRequest &req, + distributed::DistributedResponse &resp); // Handle DROP INDEX request. // Return OK status if the index doesn't exist. - arrow::Status HandleDropIndex(const distributed::DropIndexRequest &req, distributed::DistributedResponse &resp); + arrow::Status HandleDropIndex(const string &db_path, const distributed::DropIndexRequest &req, + distributed::DistributedResponse &resp); // Handle ALTER TABLE request. // Return error status if the table doesn't exist or if the alteration fails. - arrow::Status HandleAlterTable(const distributed::AlterTableRequest &req, distributed::DistributedResponse &resp); + arrow::Status HandleAlterTable(const string &db_path, const distributed::AlterTableRequest &req, + distributed::DistributedResponse &resp); - arrow::Status HandleTableExists(const distributed::TableExistsRequest &req, distributed::DistributedResponse &resp); - arrow::Status HandleScanTable(const distributed::ScanTableRequest &req, + arrow::Status HandleTableExists(const string &db_path, const distributed::TableExistsRequest &req, + distributed::DistributedResponse &resp); + arrow::Status HandleGetCatalogInfo(const string &db_path, const distributed::GetCatalogInfoRequest &req, + distributed::DistributedResponse &resp); + arrow::Status HandleScanTable(const string &db_path, const distributed::ScanTableRequest &req, std::unique_ptr &stream); - arrow::Status HandleInsertData(const std::string &table_name, std::shared_ptr batch, - distributed::DistributedResponse &resp); + arrow::Status HandleInsertData(const string &db_path, const std::string &table_name, + std::shared_ptr batch, distributed::DistributedResponse &resp); // Convert DuckDB result to Arrow RecordBatch. arrow::Status QueryResultToArrow(QueryResult &result, std::shared_ptr &reader); private: + struct DatabaseConnection { + unique_ptr db; + unique_ptr conn; + }; + string host; int port; - unique_ptr db; - unique_ptr conn; + + // Database connections cache, keyed by database path. + std::mutex connections_mutex; + std::unordered_map> connections; }; } // namespace duckdb diff --git a/src/proto/distributed.proto b/src/proto/distributed.proto index c27b8db..b8ebdd1 100644 --- a/src/proto/distributed.proto +++ b/src/proto/distributed.proto @@ -72,8 +72,40 @@ message AlterTableRequest { string sql = 1; } +// Get catalog info request. +message GetCatalogInfoRequest { + // Empty for now - gets all tables, columns, and indexes +} + +// Column information. +message ColumnInfo { + string name = 1; + string type = 2; + bool nullable = 3; +} + +// Table information. +message TableInfo { + string schema_name = 1; + string table_name = 2; + repeated ColumnInfo columns = 3; +} + +// Index information. +message IndexInfo { + string schema_name = 1; + string table_name = 2; + string index_name = 3; + repeated string column_names = 4; + bool is_unique = 5; +} + // Request message for distributed operations. message DistributedRequest { + // Optional database path for the server to use. + // If empty, the server will use its default database. + string db_path = 10; + // Request-specific parameters. oneof request { ExecuteSQLRequest execute_sql = 1; @@ -85,6 +117,7 @@ message DistributedRequest { CreateIndexRequest create_index = 7; DropIndexRequest drop_index = 8; AlterTableRequest alter_table = 9; + GetCatalogInfoRequest get_catalog_info = 11; } } @@ -123,6 +156,12 @@ message DropIndexResponse {} // Alter table response. message AlterTableResponse {} +// Get catalog info response. +message GetCatalogInfoResponse { + repeated TableInfo tables = 1; + repeated IndexInfo indexes = 2; +} + // Response message for distributed operations. message DistributedResponse { // Whether option succeeds. @@ -141,5 +180,6 @@ message DistributedResponse { CreateIndexResponse create_index = 9; DropIndexResponse drop_index = 10; AlterTableResponse alter_table = 11; + GetCatalogInfoResponse get_catalog_info = 12; } } diff --git a/src/server/distributed_flight_server.cpp b/src/server/distributed_flight_server.cpp index dfc6edc..e78a191 100644 --- a/src/server/distributed_flight_server.cpp +++ b/src/server/distributed_flight_server.cpp @@ -4,6 +4,7 @@ #include "duckdb/common/arrow/arrow_converter.hpp" #include "duckdb/common/arrow/arrow_wrapper.hpp" #include "duckdb/common/string_util.hpp" +#include "duckdb/main/materialized_query_result.hpp" #include #include @@ -15,8 +16,6 @@ namespace duckdb { DistributedFlightServer::DistributedFlightServer(string host_p, int port_p) : host(std::move(host_p)), port(port_p) { - db = make_uniq(); - conn = make_uniq(*db); } arrow::Status DistributedFlightServer::Start() { @@ -38,6 +37,36 @@ string DistributedFlightServer::GetLocation() const { return StringUtil::Format("grpc://%s:%d", host, port); } +Connection &DistributedFlightServer::GetConnection(const string &db_path) { + std::lock_guard lock(connections_mutex); + + std::cerr << "[DistributedFlightServer::GetConnection] Called with db_path='" << db_path << "'" << std::endl; + + auto it = connections.find(db_path); + if (it != connections.end()) { + std::cerr << "[DistributedFlightServer::GetConnection] Reusing existing connection for db_path='" << db_path << "'" << std::endl; + return *it->second->conn; + } + + // Create new database connection. + std::cerr << "[DistributedFlightServer::GetConnection] Creating new connection for db_path='" << db_path << "'" << std::endl; + auto db_conn = make_uniq(); + if (db_path.empty()) { + // In-memory database. + std::cerr << "[DistributedFlightServer::GetConnection] Opening in-memory database" << std::endl; + db_conn->db = make_uniq(); + } else { + // File-based database. + std::cerr << "[DistributedFlightServer::GetConnection] Opening file-based database: " << db_path << std::endl; + db_conn->db = make_uniq(db_path); + } + db_conn->conn = make_uniq(*db_conn->db); + + auto *conn_ptr = db_conn->conn.get(); + connections[db_path] = std::move(db_conn); + return *conn_ptr; +} + arrow::Status DistributedFlightServer::DoAction(const arrow::flight::ServerCallContext &context, const arrow::flight::Action &action, std::unique_ptr *result) { @@ -49,27 +78,32 @@ arrow::Status DistributedFlightServer::DoAction(const arrow::flight::ServerCallC distributed::DistributedResponse response; response.set_success(true); + const string &db_path = request.db_path(); + switch (request.request_case()) { case distributed::DistributedRequest::kExecuteSql: - ARROW_RETURN_NOT_OK(HandleExecuteSQL(request.execute_sql(), response)); + ARROW_RETURN_NOT_OK(HandleExecuteSQL(db_path, request.execute_sql(), response)); break; case distributed::DistributedRequest::kCreateTable: - ARROW_RETURN_NOT_OK(HandleCreateTable(request.create_table(), response)); + ARROW_RETURN_NOT_OK(HandleCreateTable(db_path, request.create_table(), response)); break; case distributed::DistributedRequest::kDropTable: - ARROW_RETURN_NOT_OK(HandleDropTable(request.drop_table(), response)); + ARROW_RETURN_NOT_OK(HandleDropTable(db_path, request.drop_table(), response)); break; case distributed::DistributedRequest::kCreateIndex: - ARROW_RETURN_NOT_OK(HandleCreateIndex(request.create_index(), response)); + ARROW_RETURN_NOT_OK(HandleCreateIndex(db_path, request.create_index(), response)); break; case distributed::DistributedRequest::kDropIndex: - ARROW_RETURN_NOT_OK(HandleDropIndex(request.drop_index(), response)); + ARROW_RETURN_NOT_OK(HandleDropIndex(db_path, request.drop_index(), response)); break; case distributed::DistributedRequest::kAlterTable: - ARROW_RETURN_NOT_OK(HandleAlterTable(request.alter_table(), response)); + ARROW_RETURN_NOT_OK(HandleAlterTable(db_path, request.alter_table(), response)); break; case distributed::DistributedRequest::kTableExists: - ARROW_RETURN_NOT_OK(HandleTableExists(request.table_exists(), response)); + ARROW_RETURN_NOT_OK(HandleTableExists(db_path, request.table_exists(), response)); + break; + case distributed::DistributedRequest::kGetCatalogInfo: + ARROW_RETURN_NOT_OK(HandleGetCatalogInfo(db_path, request.get_catalog_info(), response)); break; case distributed::DistributedRequest::REQUEST_NOT_SET: return arrow::Status::Invalid("Request type not set"); @@ -99,8 +133,9 @@ arrow::Status DistributedFlightServer::DoGet(const arrow::flight::ServerCallCont return arrow::Status::Invalid("DoGet only supports SCAN_TABLE requests"); } + const string &db_path = request.db_path(); std::unique_ptr data_stream; - ARROW_RETURN_NOT_OK(HandleScanTable(request.scan_table(), data_stream)); + ARROW_RETURN_NOT_OK(HandleScanTable(db_path, request.scan_table(), data_stream)); *stream = std::move(data_stream); return arrow::Status::OK(); @@ -110,8 +145,16 @@ arrow::Status DistributedFlightServer::DoPut(const arrow::flight::ServerCallCont std::unique_ptr reader, std::unique_ptr writer) { auto descriptor = reader->descriptor(); + + // Extract db_path and table_name from the FlightDescriptor path + // Path format: [db_path, table_name] + string db_path = ""; std::string table_name; - if (!descriptor.path.empty()) { + if (descriptor.path.size() >= 2) { + db_path = descriptor.path[0]; + table_name = descriptor.path[1]; + } else if (descriptor.path.size() == 1) { + // Fallback for backward compatibility: just table_name, empty db_path table_name = descriptor.path[0]; } @@ -130,7 +173,7 @@ arrow::Status DistributedFlightServer::DoPut(const arrow::flight::ServerCallCont batch = next.data; // Process each batch - ARROW_RETURN_NOT_OK(HandleInsertData(table_name, batch, resp)); + ARROW_RETURN_NOT_OK(HandleInsertData(db_path, table_name, batch, resp)); } // Write response metadata. @@ -141,9 +184,11 @@ arrow::Status DistributedFlightServer::DoPut(const arrow::flight::ServerCallCont return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleExecuteSQL(const distributed::ExecuteSQLRequest &req, +arrow::Status DistributedFlightServer::HandleExecuteSQL(const string &db_path, + const distributed::ExecuteSQLRequest &req, distributed::DistributedResponse &resp) { - auto result = conn->Query(req.sql()); + auto &conn = GetConnection(db_path); + auto result = conn.Query(req.sql()); if (result->HasError()) { resp.set_success(false); @@ -157,9 +202,11 @@ arrow::Status DistributedFlightServer::HandleExecuteSQL(const distributed::Execu return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleCreateTable(const distributed::CreateTableRequest &req, +arrow::Status DistributedFlightServer::HandleCreateTable(const string &db_path, + const distributed::CreateTableRequest &req, distributed::DistributedResponse &resp) { - auto result = conn->Query(req.sql()); + auto &conn = GetConnection(db_path); + auto result = conn.Query(req.sql()); if (result->HasError()) { resp.set_success(false); @@ -173,10 +220,12 @@ arrow::Status DistributedFlightServer::HandleCreateTable(const distributed::Crea return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleDropTable(const distributed::DropTableRequest &req, +arrow::Status DistributedFlightServer::HandleDropTable(const string &db_path, + const distributed::DropTableRequest &req, distributed::DistributedResponse &resp) { + auto &conn = GetConnection(db_path); auto sql = "DROP TABLE IF EXISTS " + req.table_name(); - auto result = conn->Query(sql); + auto result = conn.Query(sql); if (result->HasError()) { resp.set_success(false); @@ -189,9 +238,11 @@ arrow::Status DistributedFlightServer::HandleDropTable(const distributed::DropTa return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleCreateIndex(const distributed::CreateIndexRequest &req, +arrow::Status DistributedFlightServer::HandleCreateIndex(const string &db_path, + const distributed::CreateIndexRequest &req, distributed::DistributedResponse &resp) { - auto result = conn->Query(req.sql()); + auto &conn = GetConnection(db_path); + auto result = conn.Query(req.sql()); if (result->HasError()) { resp.set_success(false); @@ -205,10 +256,12 @@ arrow::Status DistributedFlightServer::HandleCreateIndex(const distributed::Crea return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleDropIndex(const distributed::DropIndexRequest &req, +arrow::Status DistributedFlightServer::HandleDropIndex(const string &db_path, + const distributed::DropIndexRequest &req, distributed::DistributedResponse &resp) { + auto &conn = GetConnection(db_path); auto sql = "DROP INDEX IF EXISTS " + req.index_name(); - auto result = conn->Query(sql); + auto result = conn.Query(sql); if (result->HasError()) { resp.set_success(false); @@ -221,9 +274,11 @@ arrow::Status DistributedFlightServer::HandleDropIndex(const distributed::DropIn return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleAlterTable(const distributed::AlterTableRequest &req, +arrow::Status DistributedFlightServer::HandleAlterTable(const string &db_path, + const distributed::AlterTableRequest &req, distributed::DistributedResponse &resp) { - auto result = conn->Query(req.sql()); + auto &conn = GetConnection(db_path); + auto result = conn.Query(req.sql()); if (result->HasError()) { resp.set_success(false); @@ -236,12 +291,14 @@ arrow::Status DistributedFlightServer::HandleAlterTable(const distributed::Alter return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleTableExists(const distributed::TableExistsRequest &req, +arrow::Status DistributedFlightServer::HandleTableExists(const string &db_path, + const distributed::TableExistsRequest &req, distributed::DistributedResponse &resp) { + auto &conn = GetConnection(db_path); string sql = StringUtil::Format("SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '%s'", req.table_name()); - auto result = conn->Query(sql); + auto result = conn.Query(sql); if (result->HasError()) { resp.set_success(false); @@ -260,11 +317,96 @@ arrow::Status DistributedFlightServer::HandleTableExists(const distributed::Tabl return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleScanTable(const distributed::ScanTableRequest &req, +arrow::Status DistributedFlightServer::HandleGetCatalogInfo(const string &db_path, + const distributed::GetCatalogInfoRequest &req, + distributed::DistributedResponse &resp) { + auto &conn = GetConnection(db_path); + std::cerr << "[DistributedFlightServer::HandleGetCatalogInfo] Getting catalog info for db_path='" << db_path << "'" << std::endl; + + auto *catalog_resp = resp.mutable_get_catalog_info(); + + // Query all tables and their columns + string tables_sql = "SELECT table_schema, table_name, column_name, data_type, is_nullable " + "FROM information_schema.columns " + "WHERE table_schema NOT IN ('information_schema', 'pg_catalog') " + "ORDER BY table_schema, table_name, ordinal_position"; + + auto result = conn.Query(tables_sql); + if (result->HasError()) { + resp.set_success(false); + resp.set_error_message("Failed to query catalog: " + result->GetError()); + return arrow::Status::OK(); + } + + // Build table info map + std::unordered_map table_map; + idx_t row_count = 0; + + // Materialize the result to iterate properly + auto materialized = unique_ptr_cast(std::move(result)); + std::cerr << "[HandleGetCatalogInfo] Query returned " << materialized->RowCount() << " rows" << std::endl; + + // Iterate through all rows using the Rows() iterator + for (auto &row : materialized->Collection().Rows()) { + string schema_name = row.GetValue(0).ToString(); + string table_name = row.GetValue(1).ToString(); + string column_name = row.GetValue(2).ToString(); + string data_type = row.GetValue(3).ToString(); + string is_nullable = row.GetValue(4).ToString(); + + std::cerr << "[HandleGetCatalogInfo] Row " << row_count++ << ": schema=" << schema_name << ", table=" << table_name + << ", column=" << column_name << ", type=" << data_type << ", nullable=" << is_nullable << std::endl; + + string full_table_name = schema_name + "." + table_name; + + // Get or create table info + if (table_map.find(full_table_name) == table_map.end()) { + auto *table_info = catalog_resp->add_tables(); + table_info->set_schema_name(schema_name); + table_info->set_table_name(table_name); + table_map[full_table_name] = table_info; + std::cerr << "[HandleGetCatalogInfo] Created new table entry for: " << full_table_name << std::endl; + } + + // Add column info + auto *col_info = table_map[full_table_name]->add_columns(); + col_info->set_name(column_name); + col_info->set_type(data_type); + col_info->set_nullable(is_nullable == "YES"); + } + + std::cerr << "[HandleGetCatalogInfo] Processed " << row_count << " total rows" << std::endl; + + // Query indexes + string indexes_sql = "SELECT table_schema, table_name, index_name, sql " + "FROM duckdb_indexes() " + "WHERE table_schema NOT IN ('information_schema', 'pg_catalog')"; + + auto idx_result = conn.Query(indexes_sql); + if (!idx_result->HasError()) { + while (idx_result->Fetch()) { + auto *index_info = catalog_resp->add_indexes(); + index_info->set_schema_name(idx_result->GetValue(0, 0).ToString()); + index_info->set_table_name(idx_result->GetValue(1, 0).ToString()); + index_info->set_index_name(idx_result->GetValue(2, 0).ToString()); + // Note: We don't parse column_names from SQL for simplicity + index_info->set_is_unique(false); // Would need to parse from SQL + } + } + + std::cerr << "[DistributedFlightServer::HandleGetCatalogInfo] Found " << catalog_resp->tables_size() + << " tables, " << catalog_resp->indexes_size() << " indexes" << std::endl; + + resp.set_success(true); + return arrow::Status::OK(); +} + +arrow::Status DistributedFlightServer::HandleScanTable(const string &db_path, const distributed::ScanTableRequest &req, std::unique_ptr &stream) { string sql = StringUtil::Format("SELECT * FROM %s LIMIT %llu OFFSET %llu", req.table_name(), req.limit(), req.offset()); - auto result = conn->Query(sql); + auto &conn = GetConnection(db_path); + auto result = conn.Query(sql); if (result->HasError()) { return arrow::Status::Invalid("Query error: " + result->GetError()); @@ -277,7 +419,7 @@ arrow::Status DistributedFlightServer::HandleScanTable(const distributed::ScanTa return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleInsertData(const std::string &table_name, +arrow::Status DistributedFlightServer::HandleInsertData(const string &db_path, const std::string &table_name, std::shared_ptr batch, distributed::DistributedResponse &resp) { // TODO(hjiang): Current implementation is pretty insufficient, which directly executes insertion statement. @@ -308,7 +450,8 @@ arrow::Status DistributedFlightServer::HandleInsertData(const std::string &table insert_sql += ")"; } - auto result = conn->Query(insert_sql); + auto &conn = GetConnection(db_path); + auto result = conn.Query(insert_sql); if (result->HasError()) { resp.set_success(false); resp.set_error_message(result->GetError()); diff --git a/src/server/distributed_server_function.cpp b/src/server/distributed_server_function.cpp index 6fab8aa..c4cdf6b 100644 --- a/src/server/distributed_server_function.cpp +++ b/src/server/distributed_server_function.cpp @@ -18,8 +18,11 @@ constexpr int DEFAULT_SERVER_PORT = 8815; void StartLocalServer(DataChunk &args, ExpressionState &state, Vector &result) { const std::lock_guard lock(g_server_mutex); + + std::cerr << "[StartLocalServer] Called, g_server_started=" << g_server_started << std::endl; if (g_server_started) { + std::cerr << "[StartLocalServer] Server already running, returning" << std::endl; auto result_data = FlatVector::GetData(result); result_data[0] = StringVector::AddString(result, "Server already running"); return; @@ -31,21 +34,28 @@ void StartLocalServer(DataChunk &args, ExpressionState &state, Vector &result) { auto port_data = FlatVector::GetData(port_vector); port = port_data[0]; } + + std::cerr << "[StartLocalServer] Starting server on port " << port << std::endl; try { g_test_server = make_uniq("0.0.0.0", port); + std::cerr << "[StartLocalServer] Created DistributedFlightServer, calling Start()..." << std::endl; auto status = g_test_server->Start(); if (!status.ok()) { + std::cerr << "[StartLocalServer] Start() failed: " << status.ToString() << std::endl; throw Exception(ExceptionType::IO, "Failed to start local server: " + status.ToString()); } + std::cerr << "[StartLocalServer] Start() succeeded, now starting Serve() in background thread" << std::endl; // Start server in background thread and detach. std::thread([port]() { + std::cerr << "[StartLocalServer background thread] Calling Serve() on port " << port << std::endl; // This thread owns its own server instance auto serve_status = g_test_server->Serve(); if (!serve_status.ok() && g_server_started) { std::cerr << "Server error on port " << port << ": " << serve_status.ToString() << std::endl; } + std::cerr << "[StartLocalServer background thread] Serve() returned" << std::endl; }).detach(); // TODO(hjiang): Use readiness probe to validate server on. diff --git a/test/data/sample_employees.duckdb b/test/data/sample_employees.duckdb new file mode 100644 index 0000000000000000000000000000000000000000..ca64c826646ff807e48534cf0c8cca691eb2ad01 GIT binary patch literal 798720 zcmeI*L1-M;835oXSyGj#h8%*MQc4zb3I#iAN<&B>vMr@WwMal1lT#T>f0Y(`pzf6d4K-;)gR4;!IrJri{HCA`|z?ipMUYWzU-C265C9N%Pd>p11w$j>CobmrPgujKczZdTv6wAqK zyVI+-vOT5f-FtKU{VFoHXYa?1_HwccKP(Bq)W3zip5=GbwdIfq2=mK0t5*373ld!i#G#~cI%0D^eK)WAuw#S zm|<(r5p<{WP*)6+Q`twIs!pKyEMvOg_pVGasvDBidgZhBQc-n(tG@neeH`i+0RmqZ z@O;u`(*E1X{qS8Zb_pCup#5u9&KTl-ES;seGi7WiY(G&P?mYFBcW1;qYGMS81JNm_ zi|7{z_Sd@hKZ*B_94he9cyaqm{E}9uGHU_^2oNAZfB*pk1PJsN=$`j?W+$C>@tJmK zyK{v<@14}ywZ5Y!=K7{{!`-l59b3-s|XMv zK!5-N0t5&UAVA>%DKK5eR+%c|`Ip}x?i0^{s`GZiUcmeE#CQ6}^KV~&fO!6|Hk15z z?MBv#cYipZ|1oRZ?ipTxkYFY4{T`Z+eYMrO8UX?X9=Slj=ebATpo%LT_5KRzU-1LPKVANmj2I!(91RkNl<2~yV(7z|ke&(>fJGI}i5#1QJ=k;N0&xj7|iJL8;|B30G z7zKLIGWve+$`r$+=ePoA?WLmX{#Jec1d^0xXb{ZQ~U&!mRNoj1$!OeJrg#ZBp1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5Ew|{-*5io-GBb?&DN89 zrN`pBNYf#lE3$qNQ$42hF`D80=UD>! Date: Tue, 4 Nov 2025 09:39:33 +0000 Subject: [PATCH 2/7] fix test and use database file --- test/data/sample_employees.duckdb | Bin 798720 -> 798720 bytes test/sql/server_db_path.test | 47 ++++++++++++++++++++---------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/test/data/sample_employees.duckdb b/test/data/sample_employees.duckdb index ca64c826646ff807e48534cf0c8cca691eb2ad01..9ee05e93656e2ce7e0a1ac1c2c4c3e90cf6c253a 100644 GIT binary patch delta 302 zcmZp8V9@ZuV1WSV9t+*Ik}=oMFi#W|W8z`ntSE3_o};?LYMGw(tM17h2B$VJnqbP$ z$h`T+{JV@S?5Pawo9`a8W}N(f-GPk-3LrWGih+Csw$1VD0vIPhsArpOyGvm5tF>xi zEtBsa^9IUfGEUxbUtsd{b%Jb+j0_A6teY7h+~A)aziz_jf>Ro-ATu|+Z_{UiS@Roe z%_OGD?%TvR|Mv&l2UI+<-~dqR1dwtdb93yG8YY;IM4rjH#U?^0nEwAyXJAZWXn+Ey z%^&XT@BrP#vH3y&iFlwhY?uzbZ+>^b{oQ>=AZFVB?mn}WLz@9hn*nQ^0b82^dz%4A Nn*nE=0au#=Hvrl7g#Q2l delta 440 zcmX|+y-UMT6vf}mmx)#yMY~li1(yy!99&eI3{t@loJ1!TnzoxkMQIUQ6dvMI>P2u8 z5!6W`JQNkdPF1R#iy)=(FED?A5R-m*mv=e8!#(E}Y*MhvnX2rB(80}dd;*txLY&C4 zA+v=~T{n%S3_R)K9?@-$;RDYA z32j(v+F(mWLD2l^G6@V>$<$J zD=(b?E#$h^+E~DAqQd=;3ZL~pTxQ?-1aFzUzAQ7l_`0&^PzO_oyL5Q1WG+!6P$E(y PQPM$)Oo>8CCna5fvt*?z diff --git a/test/sql/server_db_path.test b/test/sql/server_db_path.test index 58ea577..578f7f6 100644 --- a/test/sql/server_db_path.test +++ b/test/sql/server_db_path.test @@ -1,17 +1,21 @@ # name: test/sql/server_db_path.test -# description: Test server_db_path parameter to connect to an existing database file +# description: Test server_db_path parameter to connect to an existing database file (read-only test) # group: [sql] require duckherder -# Start distributed server on port 8820 +# Stop any existing server first (from other tests) and restart with server_db_path statement ok -SELECT duckherder_start_local_server(8820); +SELECT duckherder_stop_local_server(); + +# Start distributed server on default port 8815 +statement ok +SELECT duckherder_start_local_server(8815); # Attach with server_db_path pointing to the pre-existing database file # The server will open test/data/sample_employees.duckdb which contains the employees table statement ok -ATTACH DATABASE 'dh' (TYPE duckherder, server_host 'localhost', server_port 8820, server_db_path 'test/data/sample_employees.duckdb'); +ATTACH DATABASE 'dh' (TYPE duckherder, server_host 'localhost', server_port 8815, server_db_path 'test/data/sample_employees.duckdb'); # Create the table structure LOCALLY first (before registering as remote) # This teaches the local catalog about the schema without touching the server @@ -38,21 +42,32 @@ SELECT COUNT(*) FROM dh.employees; ---- 5 -# Test adding more data through duckherder - it should persist to the file -statement ok -INSERT INTO dh.employees VALUES (6, 'Frank Wilson', 78000.0, 'Engineering'); - -query I -SELECT COUNT(*) FROM dh.employees; +# Test selecting specific columns +query IR +SELECT name, salary FROM dh.employees ORDER BY id LIMIT 3; ---- -6 +Alice Johnson 75000.0 +Bob Smith 82000.0 +Charlie Davis 68000.0 -# Test querying the new row -query IIRR -SELECT id, name, salary, department FROM dh.employees WHERE id = 6; ----- -6 Frank Wilson 78000.0 Engineering +# TODO(hjiang): The following queries trigger a pre-existing bug in DistributedTableScanFunction +# Error: "Vector::Reference used on vector of different type (source VARCHAR referenced INTEGER)" +# This is NOT related to server_db_path functionality, but a bug in distributed query execution. +# +# Queries that currently fail: +# - SELECT * FROM dh.employees WHERE department = 'Engineering'; +# - SELECT * FROM dh.employees WHERE salary > 80000; +# - SELECT COUNT(*) FROM dh.employees WHERE id > 2; +# +# These should be uncommented and tested once the distributed WHERE clause bug is fixed. # Clean up - detach statement ok DETACH dh; + +# Restart the server to restore it to clean in-memory state for subsequent tests +statement ok +SELECT duckherder_stop_local_server(); + +statement ok +SELECT duckherder_start_local_server(8815); From 232e77e1cfd2d9e977c53ebc35ac2cdf49fbf7a7 Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 4 Nov 2025 16:11:43 +0000 Subject: [PATCH 3/7] revert makefile change --- CMakeLists.txt | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6ba08b3..f96a8bf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -93,14 +93,14 @@ target_link_libraries( gRPC::grpc++ gRPC::grpc) -# # Unit tests. -# include_directories(duckdb/third_party/catch) -# add_executable(test_distributed_flight unit/test_distributed_flight.cpp) -# target_link_libraries( -# test_distributed_flight -# ${EXTENSION_NAME} -# Arrow::arrow_static -# ArrowFlight::arrow_flight_static -# protobuf::libprotobuf -# gRPC::grpc++ -# gRPC::grpc) +# Unit tests. +include_directories(duckdb/third_party/catch) +add_executable(test_distributed_flight unit/test_distributed_flight.cpp) +target_link_libraries( + test_distributed_flight + ${EXTENSION_NAME} + Arrow::arrow_static + ArrowFlight::arrow_flight_static + protobuf::libprotobuf + gRPC::grpc++ + gRPC::grpc) From d529c923dbc56444655c2aad0ef7a999b4f65b6c Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 4 Nov 2025 16:23:31 +0000 Subject: [PATCH 4/7] clean up logging --- src/client/distributed_client.cpp | 15 ++------ src/client/duckherder_catalog.cpp | 33 ++++-------------- src/include/client/distributed_client.hpp | 2 +- src/server/distributed_flight_server.cpp | 40 ++++++---------------- src/server/distributed_server_function.cpp | 12 +------ 5 files changed, 21 insertions(+), 81 deletions(-) diff --git a/src/client/distributed_client.cpp b/src/client/distributed_client.cpp index c109a8e..f1d2c46 100644 --- a/src/client/distributed_client.cpp +++ b/src/client/distributed_client.cpp @@ -14,7 +14,7 @@ namespace duckdb { -DistributedClient::DistributedClient(string server_url_p, string db_path_p) +DistributedClient::DistributedClient(string server_url_p, string db_path_p) : server_url(std::move(server_url_p)), db_path(std::move(db_path_p)) { client = make_uniq(server_url, db_path); auto status = client->Connect(); @@ -30,14 +30,9 @@ DistributedClient &DistributedClient::GetInstance() { void DistributedClient::Configure(const string &server_url_param, const string &db_path_param) { auto &instance = GetInstance(); - std::cerr << "[DistributedClient::Configure] Called with server_url='" << server_url_param - << "', db_path='" << db_path_param << "'" << std::endl; - std::cerr << "[DistributedClient::Configure] Current instance has server_url='" << instance.server_url - << "', db_path='" << instance.db_path << "'" << std::endl; - + // Reconfigure if either server_url or db_path changed if (instance.server_url != server_url_param || instance.db_path != db_path_param) { - std::cerr << "[DistributedClient::Configure] Reconfiguring client..." << std::endl; instance.server_url = server_url_param; instance.db_path = db_path_param; instance.client = make_uniq(server_url_param, db_path_param); @@ -45,9 +40,6 @@ void DistributedClient::Configure(const string &server_url_param, const string & if (!status.ok()) { throw Exception(ExceptionType::CONNECTION, "Failed to connect to Flight server: " + status.ToString()); } - std::cerr << "[DistributedClient::Configure] Client reconfigured successfully" << std::endl; - } else { - std::cerr << "[DistributedClient::Configure] No change needed, skipping reconfiguration" << std::endl; } } @@ -219,11 +211,8 @@ unique_ptr DistributedClient::InsertInto(const string &insert_sql) bool DistributedClient::GetCatalogInfo(distributed::GetCatalogInfoResponse &response) { auto status = client->GetCatalogInfo(response); if (!status.ok()) { - std::cerr << "[DistributedClient::GetCatalogInfo] Failed: " << status.ToString() << std::endl; return false; } - std::cerr << "[DistributedClient::GetCatalogInfo] Success: found " << response.tables_size() - << " tables" << std::endl; return true; } diff --git a/src/client/duckherder_catalog.cpp b/src/client/duckherder_catalog.cpp index 06ce63a..367b831 100644 --- a/src/client/duckherder_catalog.cpp +++ b/src/client/duckherder_catalog.cpp @@ -42,11 +42,9 @@ DuckherderCatalog::~DuckherderCatalog() = default; void DuckherderCatalog::Initialize(bool load_builtin) { duckdb_catalog->Initialize(load_builtin); - + // Configure the DistributedClient singleton with server details including db_path auto server_url = GetServerUrl(); - std::cerr << "[DuckherderCatalog::Initialize] Configuring DistributedClient with server_url='" - << server_url << "', server_db_path='" << server_db_path << "'" << std::endl; DistributedClient::Configure(server_url, server_db_path); } @@ -63,31 +61,21 @@ void DuckherderCatalog::FinalizeLoad(optional_ptr context) { void DuckherderCatalog::SyncCatalogFromServer(ClientContext &context) { auto &client = DistributedClient::GetInstance(); distributed::GetCatalogInfoResponse catalog_info; - + if (!client.GetCatalogInfo(catalog_info)) { - std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Failed to get catalog info from server" << std::endl; return; } - std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Received " << catalog_info.tables_size() - << " tables from server" << std::endl; - // Create tables in the local catalog using CREATE TABLE via the context auto db_name = GetName(); - + for (int i = 0; i < catalog_info.tables_size(); i++) { const auto &table_info = catalog_info.tables(i); - - std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Found table: " - << table_info.schema_name() << "." << table_info.table_name() - << " with " << table_info.columns_size() << " columns" << std::endl; - + // Build CREATE TABLE SQL with full qualification - string create_sql = StringUtil::Format("CREATE TABLE IF NOT EXISTS %s.%s.%s (", - db_name, - table_info.schema_name(), - table_info.table_name()); - + string create_sql = StringUtil::Format("CREATE TABLE IF NOT EXISTS %s.%s.%s (", db_name, + table_info.schema_name(), table_info.table_name()); + for (int j = 0; j < table_info.columns_size(); j++) { const auto &col = table_info.columns(j); if (j > 0) { @@ -100,24 +88,17 @@ void DuckherderCatalog::SyncCatalogFromServer(ClientContext &context) { } create_sql += ")"; - std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Creating table locally: " << create_sql << std::endl; - // Execute CREATE TABLE using the context (this creates it locally in this catalog) auto result = context.Query(create_sql, false); if (result->HasError()) { - std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Failed to create table: " - << result->GetError() << std::endl; continue; } // Automatically register as remote table RegisterRemoteTable(table_info.table_name(), GetServerUrl(), table_info.table_name()); - std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Registered remote table: " - << table_info.table_name() << std::endl; } // TODO: Sync indexes as well - std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Catalog sync complete!" << std::endl; } optional_ptr DuckherderCatalog::CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) { diff --git a/src/include/client/distributed_client.hpp b/src/include/client/distributed_client.hpp index 343a45a..eb31575 100644 --- a/src/include/client/distributed_client.hpp +++ b/src/include/client/distributed_client.hpp @@ -15,7 +15,7 @@ class DistributedClient { ~DistributedClient() = default; static DistributedClient &GetInstance(); - + // Configure the singleton instance with server details static void Configure(const string &server_url, const string &db_path); diff --git a/src/server/distributed_flight_server.cpp b/src/server/distributed_flight_server.cpp index e78a191..4a7b8af 100644 --- a/src/server/distributed_flight_server.cpp +++ b/src/server/distributed_flight_server.cpp @@ -39,25 +39,19 @@ string DistributedFlightServer::GetLocation() const { Connection &DistributedFlightServer::GetConnection(const string &db_path) { std::lock_guard lock(connections_mutex); - - std::cerr << "[DistributedFlightServer::GetConnection] Called with db_path='" << db_path << "'" << std::endl; auto it = connections.find(db_path); if (it != connections.end()) { - std::cerr << "[DistributedFlightServer::GetConnection] Reusing existing connection for db_path='" << db_path << "'" << std::endl; return *it->second->conn; } // Create new database connection. - std::cerr << "[DistributedFlightServer::GetConnection] Creating new connection for db_path='" << db_path << "'" << std::endl; auto db_conn = make_uniq(); if (db_path.empty()) { // In-memory database. - std::cerr << "[DistributedFlightServer::GetConnection] Opening in-memory database" << std::endl; db_conn->db = make_uniq(); } else { // File-based database. - std::cerr << "[DistributedFlightServer::GetConnection] Opening file-based database: " << db_path << std::endl; db_conn->db = make_uniq(db_path); } db_conn->conn = make_uniq(*db_conn->db); @@ -145,7 +139,7 @@ arrow::Status DistributedFlightServer::DoPut(const arrow::flight::ServerCallCont std::unique_ptr reader, std::unique_ptr writer) { auto descriptor = reader->descriptor(); - + // Extract db_path and table_name from the FlightDescriptor path // Path format: [db_path, table_name] string db_path = ""; @@ -220,8 +214,7 @@ arrow::Status DistributedFlightServer::HandleCreateTable(const string &db_path, return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleDropTable(const string &db_path, - const distributed::DropTableRequest &req, +arrow::Status DistributedFlightServer::HandleDropTable(const string &db_path, const distributed::DropTableRequest &req, distributed::DistributedResponse &resp) { auto &conn = GetConnection(db_path); auto sql = "DROP TABLE IF EXISTS " + req.table_name(); @@ -256,8 +249,7 @@ arrow::Status DistributedFlightServer::HandleCreateIndex(const string &db_path, return arrow::Status::OK(); } -arrow::Status DistributedFlightServer::HandleDropIndex(const string &db_path, - const distributed::DropIndexRequest &req, +arrow::Status DistributedFlightServer::HandleDropIndex(const string &db_path, const distributed::DropIndexRequest &req, distributed::DistributedResponse &resp) { auto &conn = GetConnection(db_path); auto sql = "DROP INDEX IF EXISTS " + req.index_name(); @@ -321,15 +313,13 @@ arrow::Status DistributedFlightServer::HandleGetCatalogInfo(const string &db_pat const distributed::GetCatalogInfoRequest &req, distributed::DistributedResponse &resp) { auto &conn = GetConnection(db_path); - std::cerr << "[DistributedFlightServer::HandleGetCatalogInfo] Getting catalog info for db_path='" << db_path << "'" << std::endl; - auto *catalog_resp = resp.mutable_get_catalog_info(); // Query all tables and their columns string tables_sql = "SELECT table_schema, table_name, column_name, data_type, is_nullable " - "FROM information_schema.columns " - "WHERE table_schema NOT IN ('information_schema', 'pg_catalog') " - "ORDER BY table_schema, table_name, ordinal_position"; + "FROM information_schema.columns " + "WHERE table_schema NOT IN ('information_schema', 'pg_catalog') " + "ORDER BY table_schema, table_name, ordinal_position"; auto result = conn.Query(tables_sql); if (result->HasError()) { @@ -341,11 +331,10 @@ arrow::Status DistributedFlightServer::HandleGetCatalogInfo(const string &db_pat // Build table info map std::unordered_map table_map; idx_t row_count = 0; - + // Materialize the result to iterate properly auto materialized = unique_ptr_cast(std::move(result)); - std::cerr << "[HandleGetCatalogInfo] Query returned " << materialized->RowCount() << " rows" << std::endl; - + // Iterate through all rows using the Rows() iterator for (auto &row : materialized->Collection().Rows()) { string schema_name = row.GetValue(0).ToString(); @@ -354,9 +343,6 @@ arrow::Status DistributedFlightServer::HandleGetCatalogInfo(const string &db_pat string data_type = row.GetValue(3).ToString(); string is_nullable = row.GetValue(4).ToString(); - std::cerr << "[HandleGetCatalogInfo] Row " << row_count++ << ": schema=" << schema_name << ", table=" << table_name - << ", column=" << column_name << ", type=" << data_type << ", nullable=" << is_nullable << std::endl; - string full_table_name = schema_name + "." + table_name; // Get or create table info @@ -365,7 +351,6 @@ arrow::Status DistributedFlightServer::HandleGetCatalogInfo(const string &db_pat table_info->set_schema_name(schema_name); table_info->set_table_name(table_name); table_map[full_table_name] = table_info; - std::cerr << "[HandleGetCatalogInfo] Created new table entry for: " << full_table_name << std::endl; } // Add column info @@ -374,13 +359,11 @@ arrow::Status DistributedFlightServer::HandleGetCatalogInfo(const string &db_pat col_info->set_type(data_type); col_info->set_nullable(is_nullable == "YES"); } - - std::cerr << "[HandleGetCatalogInfo] Processed " << row_count << " total rows" << std::endl; // Query indexes string indexes_sql = "SELECT table_schema, table_name, index_name, sql " - "FROM duckdb_indexes() " - "WHERE table_schema NOT IN ('information_schema', 'pg_catalog')"; + "FROM duckdb_indexes() " + "WHERE table_schema NOT IN ('information_schema', 'pg_catalog')"; auto idx_result = conn.Query(indexes_sql); if (!idx_result->HasError()) { @@ -394,9 +377,6 @@ arrow::Status DistributedFlightServer::HandleGetCatalogInfo(const string &db_pat } } - std::cerr << "[DistributedFlightServer::HandleGetCatalogInfo] Found " << catalog_resp->tables_size() - << " tables, " << catalog_resp->indexes_size() << " indexes" << std::endl; - resp.set_success(true); return arrow::Status::OK(); } diff --git a/src/server/distributed_server_function.cpp b/src/server/distributed_server_function.cpp index c4cdf6b..6be209f 100644 --- a/src/server/distributed_server_function.cpp +++ b/src/server/distributed_server_function.cpp @@ -18,11 +18,8 @@ constexpr int DEFAULT_SERVER_PORT = 8815; void StartLocalServer(DataChunk &args, ExpressionState &state, Vector &result) { const std::lock_guard lock(g_server_mutex); - - std::cerr << "[StartLocalServer] Called, g_server_started=" << g_server_started << std::endl; if (g_server_started) { - std::cerr << "[StartLocalServer] Server already running, returning" << std::endl; auto result_data = FlatVector::GetData(result); result_data[0] = StringVector::AddString(result, "Server already running"); return; @@ -34,28 +31,21 @@ void StartLocalServer(DataChunk &args, ExpressionState &state, Vector &result) { auto port_data = FlatVector::GetData(port_vector); port = port_data[0]; } - - std::cerr << "[StartLocalServer] Starting server on port " << port << std::endl; try { g_test_server = make_uniq("0.0.0.0", port); - std::cerr << "[StartLocalServer] Created DistributedFlightServer, calling Start()..." << std::endl; auto status = g_test_server->Start(); if (!status.ok()) { - std::cerr << "[StartLocalServer] Start() failed: " << status.ToString() << std::endl; throw Exception(ExceptionType::IO, "Failed to start local server: " + status.ToString()); } - std::cerr << "[StartLocalServer] Start() succeeded, now starting Serve() in background thread" << std::endl; // Start server in background thread and detach. std::thread([port]() { - std::cerr << "[StartLocalServer background thread] Calling Serve() on port " << port << std::endl; // This thread owns its own server instance auto serve_status = g_test_server->Serve(); if (!serve_status.ok() && g_server_started) { - std::cerr << "Server error on port " << port << ": " << serve_status.ToString() << std::endl; + // Server error occurred } - std::cerr << "[StartLocalServer background thread] Serve() returned" << std::endl; }).detach(); // TODO(hjiang): Use readiness probe to validate server on. From 69af9f8fc41ba3e7bfc154e1e2509fa43c3a86e0 Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 4 Nov 2025 16:50:16 +0000 Subject: [PATCH 5/7] minor code cleanup --- src/client/distributed_client.cpp | 9 +++++---- src/client/duckherder_catalog.cpp | 9 --------- src/include/client/distributed_client.hpp | 4 +++- src/server/distributed_server_function.cpp | 7 ++++--- 4 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/client/distributed_client.cpp b/src/client/distributed_client.cpp index f1d2c46..99fead6 100644 --- a/src/client/distributed_client.cpp +++ b/src/client/distributed_client.cpp @@ -23,22 +23,23 @@ DistributedClient::DistributedClient(string server_url_p, string db_path_p) } } -DistributedClient &DistributedClient::GetInstance() { +/*static*/ DistributedClient &DistributedClient::GetInstance() { static NoDestructor client {}; return *client; } -void DistributedClient::Configure(const string &server_url_param, const string &db_path_param) { +/*static*/ void DistributedClient::Configure(const string &server_url_param, const string &db_path_param) { auto &instance = GetInstance(); - // Reconfigure if either server_url or db_path changed + // Reconfigure if either server_url or db_path changed. if (instance.server_url != server_url_param || instance.db_path != db_path_param) { instance.server_url = server_url_param; instance.db_path = db_path_param; instance.client = make_uniq(server_url_param, db_path_param); auto status = instance.client->Connect(); if (!status.ok()) { - throw Exception(ExceptionType::CONNECTION, "Failed to connect to Flight server: " + status.ToString()); + throw Exception(ExceptionType::CONNECTION, + "Failed to connect to Arrow Flight server: " + status.ToString()); } } } diff --git a/src/client/duckherder_catalog.cpp b/src/client/duckherder_catalog.cpp index 367b831..b545b69 100644 --- a/src/client/duckherder_catalog.cpp +++ b/src/client/duckherder_catalog.cpp @@ -42,20 +42,11 @@ DuckherderCatalog::~DuckherderCatalog() = default; void DuckherderCatalog::Initialize(bool load_builtin) { duckdb_catalog->Initialize(load_builtin); - - // Configure the DistributedClient singleton with server details including db_path auto server_url = GetServerUrl(); DistributedClient::Configure(server_url, server_db_path); } void DuckherderCatalog::FinalizeLoad(optional_ptr context) { - // TODO: Implement automatic catalog sync from server - // Currently disabled due to deadlock issues during FinalizeLoad - // If server_db_path is specified, sync catalog from server - // if (!server_db_path.empty() && context) { - // std::cerr << "[DuckherderCatalog::FinalizeLoad] Syncing catalog from server..." << std::endl; - // SyncCatalogFromServer(*context); - // } } void DuckherderCatalog::SyncCatalogFromServer(ClientContext &context) { diff --git a/src/include/client/distributed_client.hpp b/src/include/client/distributed_client.hpp index eb31575..a998175 100644 --- a/src/include/client/distributed_client.hpp +++ b/src/include/client/distributed_client.hpp @@ -14,9 +14,10 @@ class DistributedClient { explicit DistributedClient(string server_url_p = "grpc://localhost:8815", string db_path_p = ""); ~DistributedClient() = default; + // Get client singleton. static DistributedClient &GetInstance(); - // Configure the singleton instance with server details + // Configure the singleton instance with server details. static void Configure(const string &server_url, const string &db_path); // Execute arbitrary SQL on the server. @@ -42,6 +43,7 @@ class DistributedClient { unique_ptr DropIndex(const string &index_name); // INSERT INTO on server. + // // TODO(hjiang): Currently for implementation easy, directly execute SQL statements, should be use transfer rows and // table name. unique_ptr InsertInto(const string &insert_sql); diff --git a/src/server/distributed_server_function.cpp b/src/server/distributed_server_function.cpp index 6be209f..39a2a46 100644 --- a/src/server/distributed_server_function.cpp +++ b/src/server/distributed_server_function.cpp @@ -36,15 +36,16 @@ void StartLocalServer(DataChunk &args, ExpressionState &state, Vector &result) { g_test_server = make_uniq("0.0.0.0", port); auto status = g_test_server->Start(); if (!status.ok()) { - throw Exception(ExceptionType::IO, "Failed to start local server: " + status.ToString()); + throw Exception(ExceptionType::IO, + StringUtil::Format("Failed to start local server: %s", status.ToString())); } // Start server in background thread and detach. std::thread([port]() { - // This thread owns its own server instance auto serve_status = g_test_server->Serve(); if (!serve_status.ok() && g_server_started) { - // Server error occurred + throw Exception(ExceptionType::IO, + StringUtil::Format("Failed to start serving local server %s", serve_status.ToString())); } }).detach(); From 7ed30337d979e2503c093c64f7b153e0e0aa6350 Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 4 Nov 2025 17:13:23 +0000 Subject: [PATCH 6/7] protobuf cleanup --- buf.yaml | 12 ++-- src/client/distributed_flight_client.cpp | 2 - src/client/duckherder_catalog.cpp | 2 +- src/proto/distributed.proto | 70 ++++++++++++------------ 4 files changed, 41 insertions(+), 45 deletions(-) diff --git a/buf.yaml b/buf.yaml index 251b571..f649032 100644 --- a/buf.yaml +++ b/buf.yaml @@ -3,13 +3,13 @@ version: v1 lint: use: - DEFAULT + except: + - FIELD_NOT_REQUIRED + - ONEOF_NOT_REQUIRED + rules: + # Set maximum line length to 100 characters + max_line_length: 100 enum_zero_value_suffix: _UNSPECIFIED rpc_allow_same_request_response: false rpc_allow_google_protobuf_empty_requests: true rpc_allow_google_protobuf_empty_responses: true - -# Exclude generated files from linting -lint: - ignore: - - build - - vcpkg_installed diff --git a/src/client/distributed_flight_client.cpp b/src/client/distributed_flight_client.cpp index a66ecbf..e083307 100644 --- a/src/client/distributed_flight_client.cpp +++ b/src/client/distributed_flight_client.cpp @@ -79,7 +79,6 @@ arrow::Status DistributedFlightClient::GetCatalogInfo(distributed::GetCatalogInf distributed::DistributedRequest req; req.set_db_path(db_path); auto *catalog_req = req.mutable_get_catalog_info(); - // Empty request - gets all catalog info distributed::DistributedResponse resp; ARROW_RETURN_NOT_OK(SendAction(req, resp)); @@ -93,7 +92,6 @@ arrow::Status DistributedFlightClient::GetCatalogInfo(distributed::GetCatalogInf arrow::Status DistributedFlightClient::InsertData(const string &table_name, std::shared_ptr batch, distributed::DistributedResponse &response) { - // Encode db_path and table_name in the FlightDescriptor path arrow::flight::FlightDescriptor descriptor = arrow::flight::FlightDescriptor::Path({db_path, table_name}); std::unique_ptr writer; diff --git a/src/client/duckherder_catalog.cpp b/src/client/duckherder_catalog.cpp index b545b69..c99e853 100644 --- a/src/client/duckherder_catalog.cpp +++ b/src/client/duckherder_catalog.cpp @@ -57,7 +57,7 @@ void DuckherderCatalog::SyncCatalogFromServer(ClientContext &context) { return; } - // Create tables in the local catalog using CREATE TABLE via the context + // Create tables in the local catalog using CREATE TABLE via the context. auto db_name = GetName(); for (int i = 0; i < catalog_info.tables_size(); i++) { diff --git a/src/proto/distributed.proto b/src/proto/distributed.proto index b8ebdd1..1bb81ea 100644 --- a/src/proto/distributed.proto +++ b/src/proto/distributed.proto @@ -2,21 +2,6 @@ syntax = "proto3"; package duckdb.distributed; -// Request types for distributed operations. -enum RequestType { - REQUEST_TYPE_UNSPECIFIED = 0; - EXECUTE_SQL = 1; - CREATE_TABLE = 2; - DROP_TABLE = 3; - INSERT_DATA = 4; - SCAN_TABLE = 5; - DELETE_DATA = 6; - TABLE_EXISTS = 7; - CREATE_INDEX = 8; - DROP_INDEX = 9; - ALTER_TABLE = 10; -} - // Execute SQL request. message ExecuteSQLRequest { string sql = 1; @@ -102,21 +87,28 @@ message IndexInfo { // Request message for distributed operations. message DistributedRequest { - // Optional database path for the server to use. - // If empty, the server will use its default database. - string db_path = 10; + // Database path for the server to use. + // - If non-empty: specifies the file path to a DuckDB database file + // - If empty: the server will use an in-memory database + // Each request must specify which database it operates on for stateless operation, which is used to indicate the duckdb instance to operate on. + string db_path = 1; // Request-specific parameters. oneof request { - ExecuteSQLRequest execute_sql = 1; + // DDL operations. CreateTableRequest create_table = 2; DropTableRequest drop_table = 3; - ScanTableRequest scan_table = 4; - DeleteDataRequest delete_data = 5; - TableExistsRequest table_exists = 6; - CreateIndexRequest create_index = 7; - DropIndexRequest drop_index = 8; - AlterTableRequest alter_table = 9; + CreateIndexRequest create_index = 4; + DropIndexRequest drop_index = 5; + AlterTableRequest alter_table = 6; + + // DML operations + ExecuteSQLRequest execute_sql = 7; + ScanTableRequest scan_table = 8; + DeleteDataRequest delete_data = 9; + + // Util operations + TableExistsRequest table_exists = 10; GetCatalogInfoRequest get_catalog_info = 11; } } @@ -164,22 +156,28 @@ message GetCatalogInfoResponse { // Response message for distributed operations. message DistributedResponse { - // Whether option succeeds. - // TODO(hjiang): We don't need to craft error message inside of the response, instead we could use grpc response directly. + // Whether operation succeeds. + // TODO(hjiang): We don't need to craft error message inside of the response, + // instead we could use grpc response directly. bool success = 1; string error_message = 2; // Response-specific data. oneof response { - ExecuteSQLResponse execute_sql = 3; - CreateTableResponse create_table = 4; - DropTableResponse drop_table = 5; - ScanTableResponse scan_table = 6; - DeleteDataResponse delete_data = 7; - TableExistsResponse table_exists = 8; - CreateIndexResponse create_index = 9; - DropIndexResponse drop_index = 10; - AlterTableResponse alter_table = 11; + // DDL responses. + CreateTableResponse create_table = 3; + DropTableResponse drop_table = 4; + CreateIndexResponse create_index = 5; + DropIndexResponse drop_index = 6; + AlterTableResponse alter_table = 7; + + // DML responses. + ExecuteSQLResponse execute_sql = 8; + ScanTableResponse scan_table = 9; + DeleteDataResponse delete_data = 10; + + // Util responses. + TableExistsResponse table_exists = 11; GetCatalogInfoResponse get_catalog_info = 12; } } From cf8f4c4e96ee9a31844b8ac4d159360b3714fe1a Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 4 Nov 2025 18:29:44 +0000 Subject: [PATCH 7/7] WIP --- src/client/distributed_client.cpp | 5 ++- src/client/duckherder_catalog.cpp | 31 +++++++++++----- src/include/client/distributed_client.hpp | 2 +- src/include/client/duckherder_catalog.hpp | 5 ++- src/server/distributed_flight_server.cpp | 17 +++++++++ test/sql/server_db_path.test | 42 +++++++--------------- test_db | Bin 0 -> 12288 bytes 7 files changed, 57 insertions(+), 45 deletions(-) create mode 100644 test_db diff --git a/src/client/distributed_client.cpp b/src/client/distributed_client.cpp index 99fead6..a8b09b9 100644 --- a/src/client/distributed_client.cpp +++ b/src/client/distributed_client.cpp @@ -209,12 +209,11 @@ unique_ptr DistributedClient::InsertInto(const string &insert_sql) return ExecuteSQL(insert_sql); } -bool DistributedClient::GetCatalogInfo(distributed::GetCatalogInfoResponse &response) { +void DistributedClient::GetCatalogInfo(distributed::GetCatalogInfoResponse &response) { auto status = client->GetCatalogInfo(response); if (!status.ok()) { - return false; + throw Exception(ExceptionType::INVALID, StringUtil::Format("Failed to get catalog: %s", status.ToString())); } - return true; } } // namespace duckdb diff --git a/src/client/duckherder_catalog.cpp b/src/client/duckherder_catalog.cpp index c99e853..7bac032 100644 --- a/src/client/duckherder_catalog.cpp +++ b/src/client/duckherder_catalog.cpp @@ -47,25 +47,37 @@ void DuckherderCatalog::Initialize(bool load_builtin) { } void DuckherderCatalog::FinalizeLoad(optional_ptr context) { + // Automatically sync catalog from server when database is loaded + if (context && !server_db_path.empty()) { + std::cerr << "[DuckherderCatalog::FinalizeLoad] Starting automatic catalog sync from server for database: " << server_db_path << std::endl; + SyncCatalogFromServer(*context); + std::cerr << "[DuckherderCatalog::FinalizeLoad] Catalog sync completed" << std::endl; + } } void DuckherderCatalog::SyncCatalogFromServer(ClientContext &context) { auto &client = DistributedClient::GetInstance(); distributed::GetCatalogInfoResponse catalog_info; - if (!client.GetCatalogInfo(catalog_info)) { - return; - } + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Fetching catalog info from server" << std::endl; - // Create tables in the local catalog using CREATE TABLE via the context. - auto db_name = GetName(); + client.GetCatalogInfo(catalog_info); + + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Found " << catalog_info.tables_size() << " tables in server database" << std::endl; + // Create tables in the local catalog using CREATE TABLE via the context + auto db_name = GetName(); for (int i = 0; i < catalog_info.tables_size(); i++) { const auto &table_info = catalog_info.tables(i); + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Processing table " << table_info.schema_name() << "." << table_info.table_name() + << " with " << table_info.columns_size() << " columns" << std::endl; + // Build CREATE TABLE SQL with full qualification - string create_sql = StringUtil::Format("CREATE TABLE IF NOT EXISTS %s.%s.%s (", db_name, - table_info.schema_name(), table_info.table_name()); + string create_sql = StringUtil::Format("CREATE TABLE IF NOT EXISTS %s.%s.%s (", + db_name, + table_info.schema_name(), + table_info.table_name()); for (int j = 0; j < table_info.columns_size(); j++) { const auto &col = table_info.columns(j); @@ -82,11 +94,14 @@ void DuckherderCatalog::SyncCatalogFromServer(ClientContext &context) { // Execute CREATE TABLE using the context (this creates it locally in this catalog) auto result = context.Query(create_sql, false); if (result->HasError()) { - continue; + throw Exception(ExceptionType::EXECUTOR, "Failed to create table: " + result->GetError()); } + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Created local table: " << create_sql << std::endl; + // Automatically register as remote table RegisterRemoteTable(table_info.table_name(), GetServerUrl(), table_info.table_name()); + std::cerr << "[DuckherderCatalog::SyncCatalogFromServer] Registered remote table: " << table_info.table_name() << std::endl; } // TODO: Sync indexes as well diff --git a/src/include/client/distributed_client.hpp b/src/include/client/distributed_client.hpp index a998175..fb7a21d 100644 --- a/src/include/client/distributed_client.hpp +++ b/src/include/client/distributed_client.hpp @@ -52,7 +52,7 @@ class DistributedClient { unique_ptr ScanTable(const string &table_name, idx_t limit = 1000, idx_t offset = 0); // Get catalog information from the server. - bool GetCatalogInfo(distributed::GetCatalogInfoResponse &response); + void GetCatalogInfo(distributed::GetCatalogInfoResponse &response); private: string server_url; diff --git a/src/include/client/duckherder_catalog.hpp b/src/include/client/duckherder_catalog.hpp index 8e96125..557c842 100644 --- a/src/include/client/duckherder_catalog.hpp +++ b/src/include/client/duckherder_catalog.hpp @@ -41,6 +41,8 @@ class DuckherderCatalog : public DuckCatalog { void Initialize(bool load_builtin) override; void FinalizeLoad(optional_ptr context) override; + void SyncCatalogFromServer(ClientContext &context); + string GetCatalogType() override { return "duckherder"; } @@ -100,9 +102,6 @@ class DuckherderCatalog : public DuckCatalog { bool IsRemoteIndex(const string &index_name) const; private: - // Sync catalog (tables, columns, indexes) from the server. - void SyncCatalogFromServer(ClientContext &context); - std::mutex mu; unordered_map> schema_catalog_entries; diff --git a/src/server/distributed_flight_server.cpp b/src/server/distributed_flight_server.cpp index 4a7b8af..862591c 100644 --- a/src/server/distributed_flight_server.cpp +++ b/src/server/distributed_flight_server.cpp @@ -38,22 +38,28 @@ string DistributedFlightServer::GetLocation() const { } Connection &DistributedFlightServer::GetConnection(const string &db_path) { + std::cerr << "[GetConnection] Called with db_path='" << db_path << "'" << std::endl; std::lock_guard lock(connections_mutex); auto it = connections.find(db_path); if (it != connections.end()) { + std::cerr << "[GetConnection] Reusing existing connection" << std::endl; return *it->second->conn; } + std::cerr << "[GetConnection] Creating new database connection" << std::endl; // Create new database connection. auto db_conn = make_uniq(); if (db_path.empty()) { + std::cerr << "[GetConnection] Opening in-memory database" << std::endl; // In-memory database. db_conn->db = make_uniq(); } else { + std::cerr << "[GetConnection] Opening file-based database: " << db_path << std::endl; // File-based database. db_conn->db = make_uniq(db_path); } + std::cerr << "[GetConnection] Database opened, creating connection" << std::endl; db_conn->conn = make_uniq(*db_conn->db); auto *conn_ptr = db_conn->conn.get(); @@ -97,6 +103,7 @@ arrow::Status DistributedFlightServer::DoAction(const arrow::flight::ServerCallC ARROW_RETURN_NOT_OK(HandleTableExists(db_path, request.table_exists(), response)); break; case distributed::DistributedRequest::kGetCatalogInfo: + std::cerr << "[DoAction] Handling GetCatalogInfo request for db_path='" << db_path << "'" << std::endl; ARROW_RETURN_NOT_OK(HandleGetCatalogInfo(db_path, request.get_catalog_info(), response)); break; case distributed::DistributedRequest::REQUEST_NOT_SET: @@ -312,7 +319,11 @@ arrow::Status DistributedFlightServer::HandleTableExists(const string &db_path, arrow::Status DistributedFlightServer::HandleGetCatalogInfo(const string &db_path, const distributed::GetCatalogInfoRequest &req, distributed::DistributedResponse &resp) { + std::cerr << "[HandleGetCatalogInfo] Called with db_path='" << db_path << "'" << std::endl; + auto &conn = GetConnection(db_path); + std::cerr << "[HandleGetCatalogInfo] Got connection for db_path='" << db_path << "'" << std::endl; + auto *catalog_resp = resp.mutable_get_catalog_info(); // Query all tables and their columns @@ -321,12 +332,15 @@ arrow::Status DistributedFlightServer::HandleGetCatalogInfo(const string &db_pat "WHERE table_schema NOT IN ('information_schema', 'pg_catalog') " "ORDER BY table_schema, table_name, ordinal_position"; + std::cerr << "[HandleGetCatalogInfo] Executing query: " << tables_sql << std::endl; auto result = conn.Query(tables_sql); if (result->HasError()) { + std::cerr << "[HandleGetCatalogInfo] Query failed: " << result->GetError() << std::endl; resp.set_success(false); resp.set_error_message("Failed to query catalog: " + result->GetError()); return arrow::Status::OK(); } + std::cerr << "[HandleGetCatalogInfo] Query succeeded" << std::endl; // Build table info map std::unordered_map table_map; @@ -377,6 +391,9 @@ arrow::Status DistributedFlightServer::HandleGetCatalogInfo(const string &db_pat } } + std::cerr << "[HandleGetCatalogInfo] Returning " << catalog_resp->tables_size() << " tables, " + << catalog_resp->indexes_size() << " indexes" << std::endl; + resp.set_success(true); return arrow::Status::OK(); } diff --git a/test/sql/server_db_path.test b/test/sql/server_db_path.test index 578f7f6..44b5157 100644 --- a/test/sql/server_db_path.test +++ b/test/sql/server_db_path.test @@ -4,31 +4,16 @@ require duckherder -# Stop any existing server first (from other tests) and restart with server_db_path statement ok -SELECT duckherder_stop_local_server(); +SELECT duckherder_start_local_server(8817); -# Start distributed server on default port 8815 statement ok -SELECT duckherder_start_local_server(8815); +ATTACH DATABASE 'test_db' (TYPE duckherder, server_host 'localhost', server_port 8817, server_db_path '/home/vscode/duckdb-distributed-execution/test/data/sample_employees.duckdb'); -# Attach with server_db_path pointing to the pre-existing database file -# The server will open test/data/sample_employees.duckdb which contains the employees table -statement ok -ATTACH DATABASE 'dh' (TYPE duckherder, server_host 'localhost', server_port 8815, server_db_path 'test/data/sample_employees.duckdb'); +# Tables are automatically discovered and registered from the server database -# Create the table structure LOCALLY first (before registering as remote) -# This teaches the local catalog about the schema without touching the server -statement ok -CREATE TABLE dh.employees (id INTEGER, name VARCHAR, salary DOUBLE, department VARCHAR); - -# Now register it as remote - queries will go to the server -statement ok -PRAGMA duckherder_register_remote_table('employees', 'employees'); - -# Query the data from the server - should get the 5 employees we pre-populated query IIRR -SELECT id, name, salary, department FROM dh.employees ORDER BY id; +SELECT id, name, salary, department FROM test_db.employees ORDER BY id; ---- 1 Alice Johnson 75000.0 Engineering 2 Bob Smith 82000.0 Engineering @@ -38,13 +23,13 @@ SELECT id, name, salary, department FROM dh.employees ORDER BY id; # Verify count query I -SELECT COUNT(*) FROM dh.employees; +SELECT COUNT(*) FROM test_db.employees; ---- 5 # Test selecting specific columns query IR -SELECT name, salary FROM dh.employees ORDER BY id LIMIT 3; +SELECT name, salary FROM test_db.employees ORDER BY id LIMIT 3; ---- Alice Johnson 75000.0 Bob Smith 82000.0 @@ -53,21 +38,18 @@ Charlie Davis 68000.0 # TODO(hjiang): The following queries trigger a pre-existing bug in DistributedTableScanFunction # Error: "Vector::Reference used on vector of different type (source VARCHAR referenced INTEGER)" # This is NOT related to server_db_path functionality, but a bug in distributed query execution. -# +# # Queries that currently fail: -# - SELECT * FROM dh.employees WHERE department = 'Engineering'; -# - SELECT * FROM dh.employees WHERE salary > 80000; -# - SELECT COUNT(*) FROM dh.employees WHERE id > 2; +# - SELECT * FROM test_db.employees WHERE department = 'Engineering'; +# - SELECT * FROM test_db.employees WHERE salary > 80000; +# - SELECT COUNT(*) FROM test_db.employees WHERE id > 2; # # These should be uncommented and tested once the distributed WHERE clause bug is fixed. # Clean up - detach statement ok -DETACH dh; +DETACH test_db; -# Restart the server to restore it to clean in-memory state for subsequent tests +# Stop the server statement ok SELECT duckherder_stop_local_server(); - -statement ok -SELECT duckherder_start_local_server(8815); diff --git a/test_db b/test_db new file mode 100644 index 0000000000000000000000000000000000000000..88ec05a2717ff0b85415f32f058b8d67d4e1929f GIT binary patch literal 12288 zcmeI#u?fOZ5CG6Gh`5A}Z6F#VhExjL33k?^?qKB#LYA;|4>xcS7tkUE!4R-Yns?0| zcLxWryTRx&iJtdjy<2UT;lDU1@hnc7caLd4J!Gden>X*65FkK+009C72oNAZfB*pk z?Frmo*UNq}-c}diKT-8V=$GZNOug@*R{8&HBpL(=5FkK+009C72oNAZpi=?=f2aJ8 V6Cgl<009C72oNAZfB=D31U|{RHAesd literal 0 HcmV?d00001