Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.5)
# Set extension name here
set(TARGET_NAME duckherder)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD 14)

set(CMAKE_CXX_FLAGS"${CMAKE_CXX_FLAGS} -Wunused-variable -Wunused-function -Wunused-but-set-variable"
)
Expand All @@ -28,8 +28,11 @@ if(${BUILD_UNITTESTS})
include_directories(duckdb/third_party/catch)
add_executable(test_distributed_flight unit/test_distributed_flight.cpp)
target_link_libraries(
test_distributed_flight ${EXTENSION_NAME} Arrow::arrow_static
test_distributed_flight ${EXTENSION_NAME} dummy_static_extension_loader Arrow::arrow_static
ArrowFlight::arrow_flight_static protobuf::libprotobuf gRPC::grpc++)
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
target_link_libraries(test_distributed_flight -Wl,--allow-multiple-definition)
endif()
endif()

# Generate protobuf code.
Expand Down Expand Up @@ -102,6 +105,11 @@ set(EXTENSION_SOURCES
build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES})
build_loadable_extension(${TARGET_NAME} " " ${EXTENSION_SOURCES})

# Allow duplicate TableCatalogEntry::Name (see extension_config.cmake). Loadable extension gets it via DUCKDB_EXTRA_LINK_FLAGS.
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
target_link_libraries(${LOADABLE_EXTENSION_NAME} -Wl,--allow-multiple-definition)
endif()

# Note: gRPC and Protobuf are transitive dependencies of ArrowFlight.
target_link_libraries(
${EXTENSION_NAME} Arrow::arrow_static ArrowFlight::arrow_flight_static
Expand All @@ -117,14 +125,20 @@ install(
LIBRARY DESTINATION "${INSTALL_LIB_DIR}"
ARCHIVE DESTINATION "${INSTALL_LIB_DIR}")

# Add distributed server executable.
# Add distributed server executable. Link all static extensions so LoadAllExtensions resolves.
add_executable(distributed_server src/server/driver/distributed_server_main.cpp)
target_link_libraries(
distributed_server ${EXTENSION_NAME} Arrow::arrow_static
distributed_server ${EXTENSION_NAME} dummy_static_extension_loader Arrow::arrow_static
ArrowFlight::arrow_flight_static protobuf::libprotobuf gRPC::grpc++)
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
target_link_libraries(distributed_server -Wl,--allow-multiple-definition)
endif()

# Add distributed worker executable.
add_executable(distributed_worker src/server/worker/distributed_worker_main.cpp)
target_link_libraries(
distributed_worker ${EXTENSION_NAME} Arrow::arrow_static
distributed_worker ${EXTENSION_NAME} dummy_static_extension_loader Arrow::arrow_static
ArrowFlight::arrow_flight_static protobuf::libprotobuf gRPC::grpc++)
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
target_link_libraries(distributed_worker -Wl,--allow-multiple-definition)
endif()
16 changes: 16 additions & 0 deletions extension_config.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# This file is included by DuckDB's build system. It specifies which extension to load

# Allow duplicate TableCatalogEntry::Name when linking extension with libduckdb_static (C++17 inline constexpr).
# No leading/trailing space (CMP0004). Set both per-target (DUCKDB_EXTRA_LINK_FLAGS) and global so libduckdb.so gets it.
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
set(DUCKDB_EXTRA_LINK_FLAGS "-Wl,--allow-multiple-definition" CACHE STRING "" FORCE)
if(CMAKE_EXE_LINKER_FLAGS)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--allow-multiple-definition" CACHE STRING "" FORCE)
else()
set(CMAKE_EXE_LINKER_FLAGS "-Wl,--allow-multiple-definition" CACHE STRING "" FORCE)
endif()
if(CMAKE_SHARED_LINKER_FLAGS)
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--allow-multiple-definition" CACHE STRING "" FORCE)
else()
set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--allow-multiple-definition" CACHE STRING "" FORCE)
endif()
endif()

# Extension from this repo
duckdb_extension_load(duckherder
SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
Expand Down
4 changes: 2 additions & 2 deletions src/client/distributed_alter_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ unique_ptr<GlobalSourceState> PhysicalRemoteAlterTableOperator::GetGlobalSourceS
return make_uniq<RemoteAlterTableGlobalState>();
}

SourceResultType PhysicalRemoteAlterTableOperator::GetData(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const {
SourceResultType PhysicalRemoteAlterTableOperator::GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const {
auto &gstate = input.global_state.Cast<RemoteAlterTableGlobalState>();
auto &db_instance = DatabaseInstance::GetDatabase(context.client);

Expand Down
4 changes: 2 additions & 2 deletions src/client/distributed_create_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ unique_ptr<GlobalSourceState> PhysicalRemoteCreateIndexOperator::GetGlobalSource
return make_uniq<RemoteCreateIndexGlobalState>();
}

SourceResultType PhysicalRemoteCreateIndexOperator::GetData(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const {
SourceResultType PhysicalRemoteCreateIndexOperator::GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const {
auto &gstate = input.global_state.Cast<RemoteCreateIndexGlobalState>();
auto &db_instance = DatabaseInstance::GetDatabase(context.client);

Expand Down
4 changes: 2 additions & 2 deletions src/client/distributed_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ SinkFinalizeType PhysicalDistributedDelete::Finalize(Pipeline &pipeline, Event &
return SinkFinalizeType::READY;
}

SourceResultType PhysicalDistributedDelete::GetData(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const {
SourceResultType PhysicalDistributedDelete::GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const {
// TODO(hjiang): Implement return chunk.
chunk.SetCardinality(0);
return SourceResultType::FINISHED;
Expand Down
4 changes: 2 additions & 2 deletions src/client/distributed_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ SinkFinalizeType PhysicalDistributedInsert::Finalize(Pipeline &pipeline, Event &
return SinkFinalizeType::READY;
}

SourceResultType PhysicalDistributedInsert::GetData(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const {
SourceResultType PhysicalDistributedInsert::GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const {
chunk.SetCardinality(0);
return SourceResultType::FINISHED;
}
Expand Down
3 changes: 2 additions & 1 deletion src/duckherder_extension.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#define DUCKDB_EXTENSION_MAIN

#include "duckdb.hpp"
#include "duckdb/storage/storage_extension.hpp"
#include "duckherder_extension.hpp"
#include "duckherder_extension_instance_state.hpp"
#include "duckherder_pragmas.hpp"
Expand Down Expand Up @@ -35,7 +36,7 @@ void ClearQueryRecorderStats(const DataChunk &args, ExpressionState &state, Vect
void LoadInternal(ExtensionLoader &loader) {
auto &db = loader.GetDatabaseInstance();
auto &config = DBConfig::GetConfig(db);
config.storage_extensions["duckherder"] = make_uniq<DuckherderStorageExtension>();
StorageExtension::Register(config, "duckherder", make_shared_ptr<DuckherderStorageExtension>());

// Set extension state.
SetInstanceState(db, make_shared_ptr<DuckherderInstanceState>());
Expand Down
3 changes: 2 additions & 1 deletion src/include/client/distributed_alter_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class PhysicalRemoteAlterTableOperator : public PhysicalOperator {
public:
// Source interface.
unique_ptr<GlobalSourceState> GetGlobalSourceState(ClientContext &context) const override;
SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override;
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const override;

bool IsSource() const override {
return true;
Expand Down
3 changes: 2 additions & 1 deletion src/include/client/distributed_create_index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class PhysicalRemoteCreateIndexOperator : public PhysicalOperator {
public:
// Source interface.
unique_ptr<GlobalSourceState> GetGlobalSourceState(ClientContext &context) const override;
SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override;
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const override;

bool IsSource() const override {
return true;
Expand Down
3 changes: 2 additions & 1 deletion src/include/client/distributed_delete.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class PhysicalDistributedDelete : public PhysicalOperator {
}

// Source interface.
SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override;
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const override;

bool IsSource() const override {
return true;
Expand Down
3 changes: 2 additions & 1 deletion src/include/client/distributed_insert.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class PhysicalDistributedInsert : public PhysicalOperator {
}

// Source interface.
SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override;
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const override;

bool IsSource() const override {
return true;
Expand Down
4 changes: 4 additions & 0 deletions src/include/duckherder_extension_instance_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ struct DuckherderInstanceState : public ObjectCacheEntry {
return OBJECT_TYPE;
}

optional_idx GetEstimatedCacheMemory() const override {
return optional_idx {};
}

private:
mutable std::mutex mu;
shared_ptr<BaseQueryRecorder> query_recorder;
Expand Down
2 changes: 1 addition & 1 deletion src/server/driver/distributed_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ DistributedExecutionResult DistributedExecutor::ExecuteDistributed(const string
std::unique_ptr<arrow::flight::FlightStreamReader> stream;
auto status = worker->client->ExecutePartition(req, stream);
if (!status.ok()) {
DUCKDB_LOG_WARN(db_instance,
DUCKDB_LOG_WARNING(db_instance,
StringUtil::Format("Worker %s failed executing task %llu: %s", worker->worker_id,
static_cast<long long unsigned>(task.task_id), status.ToString()));
continue;
Expand Down
3 changes: 2 additions & 1 deletion src/server/driver/distributed_flight_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "duckdb/common/string_util.hpp"
#include "duckdb/logging/logger.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/storage/storage_extension.hpp"
#include "query_common.hpp"
#include "server/driver/duckling_storage.hpp"

Expand Down Expand Up @@ -74,7 +75,7 @@ void DistributedFlightServer::Initialize() {

// Register the Duckling storage extension.
DBConfig config;
config.storage_extensions["duckling"] = make_uniq<DucklingStorageExtension>();
StorageExtension::Register(config, "duckling", make_shared_ptr<DucklingStorageExtension>());

db = make_uniq<DuckDB>(nullptr, &config);
conn = make_uniq<Connection>(*db);
Expand Down
3 changes: 3 additions & 0 deletions test/unittest/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ endif()
target_link_libraries(
unittest_duckherder Arrow::arrow_static ArrowFlight::arrow_flight_static
protobuf::libprotobuf gRPC::grpc++)
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
target_link_libraries(unittest_duckherder -Wl,--allow-multiple-definition)
endif()