Language: English | 한국어
Modern Messaging Infrastructure with Advanced Patterns
Complete pub/sub, request/reply, event streaming, and message pipeline support
Documentation | Quick Start | Examples | Architecture | Contributing
A modern messaging system built on C++20 with comprehensive support for enterprise messaging patterns. The system provides a complete messaging infrastructure with pluggable backends, advanced routing capabilities, and robust reliability features.
Core Messaging | Advanced Patterns | Backend Support | Production Quality
The messaging system is fully implemented:
- Message Types - Structured messages with metadata, priority, TTL
- Message Builder - Fluent API for message construction
- Message Queue - Thread-safe FIFO and priority queues
- Topic Router - Pattern-based routing with wildcards
- Message Bus - Async/sync pub/sub coordination
- Message Broker - Advanced routing and filtering
- Pub/Sub Pattern - Publisher and Subscriber classes
- Request/Reply Pattern - Request client and server
- Event Streaming - Event sourcing with replay capability
- Message Pipeline - Stage-based message processing
- Backend Interface - Pluggable execution backends
- Standalone Backend - Self-contained thread pool
- Integration Backend - External thread pool integration
- DI Container - Service registration and resolution
- Error Codes - Centralized error handling (-700 to -799)
- Unit Tests - 100+ tests across all components
- Integration Tests - End-to-end messaging scenarios
- Pattern Tests - Comprehensive pattern validation
- Performance Benchmarks - Throughput and latency testing
- API Reference - Complete API documentation
- Migration Guide - Upgrade instructions
- Patterns API - Pattern usage guide
- Design Patterns - Architecture documentation
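The "Message Queue" entry above mentions thread-safe FIFO and priority queues. As a rough illustration of that idea (not the library's actual implementation), the sketch below combines `std::priority_queue` with a mutex and a condition variable. The names `demo_message` and `demo_priority_queue` are hypothetical and exist only for this example; the assumption that a larger `priority` value is dispatched first is also ours.

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message type for illustration; not the library's `message` class.
struct demo_message {
    std::string topic;
    int priority = 0;            // assumption: larger value is dispatched first
    std::uint64_t sequence = 0;  // assigned on push; keeps FIFO order within a priority
};

// Hypothetical thread-safe priority queue built only on the standard library.
class demo_priority_queue {
public:
    void push(demo_message msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            msg.sequence = next_sequence_++;
            heap_.push(std::move(msg));
        }
        ready_.notify_one();
    }

    // Non-blocking pop: returns std::nullopt when the queue is empty.
    std::optional<demo_message> try_pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (heap_.empty()) return std::nullopt;
        demo_message top = heap_.top();
        heap_.pop();
        return top;
    }

    // Blocking pop: waits until a message is available.
    demo_message pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !heap_.empty(); });
        demo_message top = heap_.top();
        heap_.pop();
        return top;
    }

private:
    // Orders the heap so the highest priority is on top; ties go to the older message.
    struct lower_priority {
        bool operator()(const demo_message& a, const demo_message& b) const {
            if (a.priority != b.priority) return a.priority < b.priority;
            return a.sequence > b.sequence;
        }
    };

    std::mutex mutex_;
    std::condition_variable ready_;
    std::priority_queue<demo_message, std::vector<demo_message>, lower_priority> heap_;
    std::uint64_t next_sequence_ = 0;
};
```

The sequence counter is the design choice worth noting: without it, messages of equal priority could be delivered in arbitrary order, which would break the FIFO expectation within a priority level.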
┌─────────────────────────────────────────────────────────────┐
│                     Messaging Patterns                      │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│   │ Pub/Sub  │  │ Req/Rep  │  │ Pipeline │  │ Stream   │    │
│   └──────────┘  └──────────┘  └──────────┘  └──────────┘    │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                         Message Bus                         │
│        ┌──────────────────┐     ┌──────────────────┐        │
│        │ Message Broker   │     │ Topic Router     │        │
│        └──────────────────┘     └──────────────────┘        │
└─────────────────────────────────────────────────────────────┘
                               │
                 ┌─────────────┴─────────────┐
                 ▼                           ▼
┌───────────────────────────┐     ┌───────────────────────────┐
│       Message Queue       │     │   Subscription Manager    │
│  ┌─────────────────────┐  │     │  ┌─────────────────────┐  │
│  │ Priority Queue      │  │     │  │ Subscriber Registry │  │
│  │ Dead Letter Queue   │  │     │  │ Filter Chain        │  │
│  └─────────────────────┘  │     │  └─────────────────────┘  │
└───────────────────────────┘     └───────────────────────────┘
                               │
                 ┌─────────────┴─────────────┐
                 ▼                           ▼
┌───────────────────────────┐     ┌───────────────────────────┐
│     Integration Layer     │     │     Backend Selection     │
│   ┌──────────────────┐    │     │   ┌──────────────────┐    │
│   │ Logger Adapter   │    │     │   │ Standalone       │    │
│   │ Monitor Adapter  │    │     │   │ Integration      │    │
│   │ Thread Adapter   │    │     │   │ Auto-detect      │    │
│   └──────────────────┘    │     │   └──────────────────┘    │
└───────────────────────────┘     └───────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                 Foundation (common_system)                  │
│        Result<T>, error_info, event_bus, interfaces         │
└─────────────────────────────────────────────────────────────┘
Publisher → Message Bus → Topic Router → Subscribers
    ↓            ↓             ↓              ↓
 Message     Priority       Pattern       Callbacks
 Builder       Queue       Matching        (async)
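The "Pattern Matching" step in the flow above can be pictured with a small standalone sketch. It assumes dot-separated topics where `*` matches exactly one segment and `#` matches zero or more segments; only the `events.user.*` form appears in this README, so the `#` wildcard and both function names (`split_topic`, `topic_matches`) are assumptions for illustration rather than the router's documented behavior.

```cpp
#include <cstddef>
#include <string_view>
#include <vector>

// Split a dot-separated topic into its segments.
std::vector<std::string_view> split_topic(std::string_view topic) {
    std::vector<std::string_view> segments;
    std::size_t start = 0;
    while (true) {
        const std::size_t dot = topic.find('.', start);
        if (dot == std::string_view::npos) {
            segments.push_back(topic.substr(start));
            return segments;
        }
        segments.push_back(topic.substr(start, dot - start));
        start = dot + 1;
    }
}

// Recursive matcher over segment indices.
// Assumed semantics: '*' = exactly one segment, '#' = zero or more segments.
bool match_from(const std::vector<std::string_view>& pattern, std::size_t pi,
                const std::vector<std::string_view>& topic, std::size_t ti) {
    if (pi == pattern.size()) return ti == topic.size();
    if (pattern[pi] == "#") {
        // Try letting '#' absorb 0..N of the remaining topic segments.
        for (std::size_t next = ti; next <= topic.size(); ++next) {
            if (match_from(pattern, pi + 1, topic, next)) return true;
        }
        return false;
    }
    if (ti == topic.size()) return false;
    if (pattern[pi] == "*" || pattern[pi] == topic[ti]) {
        return match_from(pattern, pi + 1, topic, ti + 1);
    }
    return false;
}

// Returns true when `topic` is covered by the subscription `pattern`.
bool topic_matches(std::string_view pattern, std::string_view topic) {
    return match_from(split_topic(pattern), 0, split_topic(topic), 0);
}
```

Under these assumptions, `topic_matches("events.user.*", "events.user.login")` is true, while the same pattern does not match `events.user` or `events.user.login.failed`.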
- C++20 compiler (GCC 11+, Clang 14+, MSVC 2022+, Apple Clang 14+)
- CMake 3.20+
- vcpkg (for dependency management)
git clone https://github.com/kcenon/messaging_system.git
cd messaging_system
# Recommended: Use local sibling directories (for unified_system development)
# Requires all system projects in the same parent directory:
# Sources/common_system/
# Sources/thread_system/
# Sources/messaging_system/
# ...
cmake -B build -DMESSAGING_USE_LOCAL_SYSTEMS=ON
cmake --build build -j
# Alternative: Build with vcpkg
cmake -B build -DCMAKE_TOOLCHAIN_FILE=/path/to/vcpkg/scripts/buildsystems/vcpkg.cmake
cmake --build build -j

For C++20 module support (requires CMake 3.28+ and a compatible compiler):
# Build with C++20 modules enabled
cmake -B build -G Ninja -DCMAKE_BUILD_TYPE=Release \
-DMESSAGING_BUILD_MODULES=ON \
-DMESSAGING_USE_LOCAL_SYSTEMS=ON
cmake --build build -j
# Use the module in your project:
# import kcenon.messaging;

Requirements:
- CMake 3.28+
- Ninja or Ninja Multi-Config generator
- Clang 16+ or GCC 14+ or MSVC 17.4+
Module Structure:
- kcenon.messaging - Primary module interface
- kcenon.messaging:core - Core messaging (message, queue, bus, broker)
- kcenon.messaging:patterns - Messaging patterns (pub/sub, request/reply)
- kcenon.messaging:task - Distributed task queue system
- kcenon.messaging:integration - Transports, backends, DI
The build system uses the UnifiedDependencies.cmake module, which supports three dependency resolution modes:
| Mode | CMake Option | Description |
|---|---|---|
| LOCAL | -DMESSAGING_USE_LOCAL_SYSTEMS=ON | Use sibling directories (recommended for development) |
| FetchContent | -DMESSAGING_USE_FETCHCONTENT=ON | Fetch from GitHub automatically |
| find_package | (default) | Use installed system packages |
The unified module provides:
- Automatic target mapping for different naming conventions
- Consistent dependency resolution across all modes
- Simplified CMakeLists.txt (~300 lines vs ~920 lines previously)
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

#include <kcenon/messaging/core/message_bus.h>
#include <kcenon/messaging/core/message.h>
#include <kcenon/messaging/backends/standalone_backend.h>

using namespace kcenon::messaging;

int main() {
    // Create standalone backend with 4 threads
    auto backend = std::make_shared<standalone_backend>(4);
    backend->initialize();

    // Create message bus
    message_bus_config config;
    config.worker_threads = 4;
    auto bus = std::make_shared<message_bus>(backend, config);
    bus->start();

    // Subscribe to topic
    bus->subscribe("user.created", [](const message& msg) {
        std::cout << "User created: " << msg.metadata().id << std::endl;
        return common::VoidResult::ok();
    });

    // Publish message
    auto msg = message_builder()
        .topic("user.created")
        .source("app")
        .type(message_type::event)
        .build();
    if (msg.is_ok()) {
        bus->publish(msg.value());
    }

    // Cleanup: give the asynchronous handler time to run before shutting down
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    bus->stop();
    backend->shutdown();
    return 0;
}

#include <kcenon/messaging/patterns/pub_sub.h>
using namespace kcenon::messaging::patterns;

// Create publisher
auto publisher = std::make_shared<patterns::publisher>(bus, "events");

// Publish events
auto msg = message_builder()
    .topic("events.user.login")
    .type(message_type::event)
    .build();
publisher->publish(msg.value());

// Create subscriber
auto subscriber = std::make_shared<patterns::subscriber>(bus);
subscriber->subscribe("events.user.*", [](const message& msg) {
    // Handle user events
    return common::VoidResult::ok();
});

#include <kcenon/messaging/patterns/request_reply.h>
using namespace kcenon::messaging::patterns;

// Server side
auto server = std::make_shared<request_server>(bus, "calculator");
server->register_handler([](const message& request) -> common::Result<message> {
    auto a = request.payload().get_value("a").value<int>();
    auto b = request.payload().get_value("b").value<int>();
    auto response = message_builder()
        .topic("calculator.response")
        .type(message_type::reply)
        .build();
    response.value().payload().set_value("result", a + b);
    return response;
});

// Client side
auto client = std::make_shared<request_client>(bus);
auto request = message_builder()
    .topic("calculator.add")
    .type(message_type::query)
    .build();
request.value().payload().set_value("a", 10);
request.value().payload().set_value("b", 20);
auto response = client->request(request.value(), std::chrono::seconds(5));
if (response.is_ok()) {
    auto result = response.value().payload().get_value("result").value<int>();
    std::cout << "Result: " << result << std::endl; // 30
}

#include <kcenon/messaging/patterns/event_streaming.h>
using namespace kcenon::messaging::patterns;

// Create event stream
auto stream = std::make_shared<event_stream>(bus, "orders");

// Publish events
stream->publish_event("order.created", order_data);
stream->publish_event("order.paid", payment_data);
stream->publish_event("order.shipped", shipping_data);

// Replay events from the last 24 hours
stream->replay_from(std::chrono::system_clock::now() - std::chrono::hours(24),
    [](const message& event) {
        // Process historical events
        return common::VoidResult::ok();
    });

#include <kcenon/messaging/patterns/message_pipeline.h>
using namespace kcenon::messaging::patterns;

// Build processing pipeline
auto pipeline = pipeline_builder()
    .add_stage("validate", [](message& msg) {
        // Validation logic
        return common::VoidResult::ok();
    })
    .add_stage("transform", [](message& msg) {
        // Transformation logic
        return common::VoidResult::ok();
    })
    .add_stage("enrich", [](message& msg) {
        // Enrichment logic
        return common::VoidResult::ok();
    })
    .build();

// Process message through pipeline
auto msg = message_builder().topic("data").build();
auto result = pipeline.process(msg.value());

The messaging system integrates seamlessly with specialized base systems:
- common_system - Result pattern, error handling, base interfaces
- thread_system - High-performance thread pools for message dispatch
- container_system - Type-safe message payloads and serialization
- network_system - Distributed messaging over TCP/IP
- monitoring_system - Real-time metrics and performance telemetry
- database_system - Message persistence and audit logging
Logging is provided through common_system's ILogger interface with runtime binding
via GlobalLoggerRegistry. This allows flexible logger implementation injection:
#include <kcenon/common/logging/log_functions.h>
#include <kcenon/common/interfaces/global_logger_registry.h>
// Register a logger with the global registry (typically at application startup)
auto& registry = kcenon::common::interfaces::GlobalLoggerRegistry::instance();
registry.set_default_logger(my_logger_implementation);
// Logging is now available throughout the messaging system
// No explicit logger parameter needed in messaging components

#include <kcenon/messaging/backends/integration_backend.h>
#include <kcenon/thread/core/thread_pool.h>
#include <kcenon/common/interfaces/global_logger_registry.h>
// Create thread pool from thread_system
auto thread_pool = std::make_shared<thread::thread_pool>(8);
// Get logger from common_system's GlobalLoggerRegistry
auto logger = kcenon::common::interfaces::get_logger();
// Create integration backend
auto backend = std::make_shared<integration_backend>(
    thread_pool,
    logger,
    nullptr  // optional monitoring
);
// Create message bus
auto bus = std::make_shared<message_bus>(backend);
bus->start();
// Messages are now dispatched via thread_system
// And logged via common_system's ILogger interface

Performance characteristics based on benchmark results:
Message Throughput
- Message creation: ~5M messages/sec
- Queue operations: ~2M operations/sec
- Topic routing: ~500K routes/sec
- Pub/Sub: ~100K messages/sec
- Request/Reply: ~50K requests/sec
Latency (p99)
- Message creation: < 1 μs
- Queue enqueue/dequeue: < 2 μs
- Topic matching: < 5 μs
- End-to-end pub/sub: < 100 μs
- Request/reply: < 1 ms
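For readers who want to reproduce p99 figures like those above on their own workloads, the following sketch shows one common way to collect per-operation latencies and take a nearest-rank percentile. It is not the project's benchmark harness; `percentile` and `measure_latencies_us` are hypothetical helper names, and the nearest-rank definition is our assumption about how a percentile might be computed.

```cpp
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <vector>

// Nearest-rank percentile: the smallest sample such that at least `pct` percent
// of all samples are less than or equal to it. Returns 0.0 for an empty input.
double percentile(std::vector<double> samples, double pct) {
    if (samples.empty()) return 0.0;
    std::sort(samples.begin(), samples.end());
    auto rank = static_cast<std::size_t>(std::ceil(pct * samples.size() / 100.0));
    rank = std::min(rank, samples.size());
    return samples[rank == 0 ? 0 : rank - 1];
}

// Time `iterations` calls of `op` and return each call's latency in microseconds.
template <typename Op>
std::vector<double> measure_latencies_us(Op&& op, std::size_t iterations) {
    std::vector<double> latencies;
    latencies.reserve(iterations);
    for (std::size_t i = 0; i < iterations; ++i) {
        const auto begin = std::chrono::steady_clock::now();
        op();
        const auto end = std::chrono::steady_clock::now();
        latencies.push_back(
            std::chrono::duration<double, std::micro>(end - begin).count());
    }
    return latencies;
}
```

A typical use would be `percentile(measure_latencies_us(op, 100000), 99.0)`, where `op` wraps the operation under test. `steady_clock` is used because it is monotonic, so the measured intervals are not affected by wall-clock adjustments.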
# Build with benchmarks
cmake -B build -DMESSAGING_BUILD_BENCHMARKS=ON
cmake --build build -j
# Run benchmarks
./build/test/benchmarks/bench_message_creation
./build/test/benchmarks/bench_message_queue
./build/test/benchmarks/bench_topic_router
./build/test/benchmarks/bench_pub_sub_throughput
./build/test/benchmarks/bench_request_reply_latency

The system includes comprehensive test coverage:
- Core Tests - Message, queue, router, bus (40+ tests)
- Backend Tests - Standalone and integration backends (27 tests)
- Pattern Tests - All messaging patterns (80+ tests)
- Integration Tests - End-to-end scenarios (4 test suites)
# Build with tests
cmake -B build -DMESSAGING_BUILD_TESTS=ON
cmake --build build -j
# Run all tests
cd build
ctest --output-on-failure
# Run specific test suite
./test/unit/core/test_message_bus
./test/unit/patterns/test_pub_sub
./test/integration_tests/test_full_integration

# Generate coverage report
cmake -B build -DCMAKE_BUILD_TYPE=Coverage
cmake --build build -j
cd build
ctest
lcov --capture --directory . --output-file coverage.info
genhtml coverage.info --output-directory coverage_html

- Benchmarks - Performance characteristics and measurements
- Features - Complete feature documentation
- Production Quality - Quality assurance and reliability
- Project Structure - Codebase organization
- API Reference - Complete API documentation
- Migration Guide - Upgrade instructions
- Patterns API - Messaging patterns guide
- Design Patterns - Architecture patterns
- Examples - Working code examples
- Integration Tests - End-to-end test scenarios
- Performance Benchmarks - Performance benchmark suite
| Option | Default | Description |
|---|---|---|
| MESSAGING_USE_LOCAL_SYSTEMS | OFF | Use sibling directories for dependencies |
| MESSAGING_USE_FETCHCONTENT | OFF | Auto-fetch dependencies from GitHub |
| MESSAGING_BUILD_TESTS | ON | Build unit and integration tests |
| MESSAGING_BUILD_EXAMPLES | ON | Build example programs |
| MESSAGING_BUILD_BENCHMARKS | OFF | Build performance benchmarks |
| KCENON_WITH_NETWORK_SYSTEM | ON | Enable network_system integration for transport implementations |
| KCENON_WITH_MONITORING_SYSTEM | OFF | Enable monitoring_system integration for metrics collection |
The messaging_system has optional dependencies that can be enabled or disabled at compile time:
| Dependency | Guard Macro | Components Affected | Fallback Behavior |
|---|---|---|---|
| network_system | KCENON_WITH_NETWORK_SYSTEM | http_transport, websocket_transport | Returns error::not_supported |
| monitoring_system | KCENON_WITH_MONITORING_SYSTEM | message_bus_collector | No-op (silently ignores operations) |
Each optional component provides a compile-time is_available constant:
#include <kcenon/messaging/adapters/http_transport.h>
#include <kcenon/messaging/adapters/websocket_transport.h>
#include <kcenon/messaging/collectors/message_bus_collector.h>
// Check at compile time
static_assert(http_transport::is_available || !http_transport::is_available,
              "Compile-time check example");

// Or check at runtime via feature flags
#include <kcenon/messaging/config/feature_flags.h>
if (kcenon::messaging::config::has_network_system()) {
    // Use full transport functionality
} else {
    // Handle fallback case
}

The KCENON_WITH_NETWORK_SYSTEM option controls whether websocket_transport and http_transport use the actual network_system implementation:
# Full transport functionality (default)
cmake -B build -DKCENON_WITH_NETWORK_SYSTEM=ON
# Stub mode - transports return not_supported errors
cmake -B build -DKCENON_WITH_NETWORK_SYSTEM=OFF

When disabled, transport classes compile but return error::not_supported for all operations. This allows building the messaging system without the network_system dependency.
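The stub behavior described above (a class that always compiles, exposes an `is_available` constant, and reports "not supported" when the dependency is off) can be sketched in isolation as follows. Everything here is hypothetical: `DEMO_WITH_NETWORK` stands in for the real `KCENON_WITH_NETWORK_SYSTEM` guard, and `demo_transport`, `demo_error`, and `describe` are illustration-only names, not the library's API.

```cpp
#include <string_view>

// Hypothetical guard macro for this sketch; the real build option is
// KCENON_WITH_NETWORK_SYSTEM. Defaults to "disabled" when not set by the build.
#ifndef DEMO_WITH_NETWORK
#define DEMO_WITH_NETWORK 0
#endif

// Hypothetical error type standing in for the library's error codes.
enum class demo_error { none, not_supported };

// A transport that always compiles but only works when the guard is enabled.
class demo_transport {
public:
    static constexpr bool is_available = (DEMO_WITH_NETWORK != 0);

    demo_error send(std::string_view payload) const {
        (void)payload;  // a real transport would write the payload to the network here
        if constexpr (is_available) {
            return demo_error::none;
        } else {
            return demo_error::not_supported;
        }
    }
};

// Callers can branch on availability at compile time without any #ifdef.
template <typename Transport>
constexpr const char* describe() {
    if constexpr (Transport::is_available) {
        return "network transport enabled";
    } else {
        return "stub transport (not_supported)";
    }
}
```

Keeping the class definition unconditional means downstream code never needs its own preprocessor guards; it either checks `is_available` at compile time or handles the returned error at runtime.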
To verify that fallback implementations work correctly, use the isolated build preset:
# Configure and build in isolated mode
cmake --preset=isolated
cmake --build --preset=isolated
# Run tests in isolated mode
ctest --preset=isolated
# Or use the verification script
./scripts/verify_isolated_build.sh

The isolated preset disables all optional dependencies (KCENON_WITH_NETWORK_SYSTEM=OFF, KCENON_WITH_MONITORING_SYSTEM=OFF) and runs tests to verify fallback implementations.
# Development build with local systems
cmake -B build -DMESSAGING_USE_LOCAL_SYSTEMS=ON -DMESSAGING_BUILD_TESTS=ON
cmake --build build -j
# Production build with FetchContent
cmake -B build -DMESSAGING_USE_FETCHCONTENT=ON -DCMAKE_BUILD_TYPE=Release
cmake --build build -j
sudo cmake --install build
# Debug build with all features
cmake -B build -DCMAKE_BUILD_TYPE=Debug \
-DMESSAGING_BUILD_TESTS=ON \
-DMESSAGING_BUILD_EXAMPLES=ON \
-DMESSAGING_BUILD_BENCHMARKS=ON
cmake --build build -j

| Platform | Compiler | Status |
|---|---|---|
| Ubuntu 22.04 | GCC 11+ | ✅ Tested |
| Ubuntu 22.04 | Clang 14+ | ✅ Tested |
| macOS 13+ | Apple Clang 14+ | ✅ Tested |
| Windows 10+ | MSVC 2022+ | ✅ Tested |
Contributions are welcome!
- Fork the repository
- Create a feature branch
- Make your changes with tests
- Ensure all tests pass
- Submit a pull request
# Clone repository
git clone https://github.com/kcenon/messaging_system.git
cd messaging_system
# Create feature branch
git checkout -b feature/amazing-feature
# Build and test
cmake -B build -DMESSAGING_BUILD_TESTS=ON
cmake --build build -j
cd build && ctest
# Commit and push
git commit -m "feat: add amazing feature"
git push origin feature/amazing-feature

BSD 3-Clause License
Copyright (c) 2021-2025, Messaging System Contributors. All rights reserved.
See LICENSE file for full license text.
- Author: kcenon (@kcenon)
- Email: kcenon@gmail.com
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Built on specialized systems
common_system • thread_system • monitoring_system • container_system • database_system • network_system
Made with ❤️ by the Open Source Community