Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
840 changes: 757 additions & 83 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 19 additions & 1 deletion crates/arkflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,22 @@ clap = { workspace = true }
colored = { workspace = true }
flume = { workspace = true }
axum = { workspace = true }
num_cpus = "1.17.0"
uuid = { version = "1.8", features = ["v4"] }
tempfile = "3.10"
num_cpus = "1.17.0"

# Object Storage dependencies
aws-sdk-s3 = { version = "1.8", features = ["rt-tokio"] }
aws-config = { version = "1.8", features = ["behavior-version-latest"] }
azure_storage = { version = "0.20" }
azure_storage_blobs = { version = "0.20" }
google-cloud-storage = { version = "0.15", default-features = false, features = ["auth"] }
http = "1.1"
md-5 = "0.10"
base64 = "0.22"
crc32fast = "1.4"
chrono = { version = "0.4", features = ["serde"] }
flate2 = "1.0"
log = "0.4"
parking_lot = "0.12"
rand = "0.8"
87 changes: 87 additions & 0 deletions crates/arkflow-core/examples/distributed_ack_example.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Distributed Acknowledgment Configuration Example
#
# Demonstrates the three places distributed-ack settings can live:
# per-input, per-processor, and stream-level (the last is an alternative
# to the first two, not an addition — pick one approach per deployment).

[streams]
name = "distributed_ack_stream"

# Input configuration with distributed acknowledgment support
[streams.input]
type = "distributed_ack_input"

# Inner input configuration (the wrapped source that actually reads data)
[streams.input.inner_input]
type = "kafka"
brokers = ["localhost:9092"]
topic = "test-topic"
group_id = "distributed_ack_group"

# Distributed acknowledgment configuration
[streams.input.distributed_ack]
enabled = true
node_id = "node-1"
cluster_nodes = ["node-1:8080", "node-2:8080", "node-3:8080"]

# Object storage configuration
# NOTE: do not commit real credentials — prefer environment variables or an
# instance profile in production; the literals below are placeholders only.
[streams.input.distributed_ack.object_storage]
type = "s3"
bucket = "distributed-ack-bucket"
region = "us-east-1"
access_key_id = "your-access-key"
secret_access_key = "your-secret-key"

# WAL (write-ahead log) configuration
[streams.input.distributed_ack.wal]
type = "rocksdb"
path = "./distributed_ack_wal"

# Processor configuration
[[streams.pipeline.processors]]
type = "distributed_ack_processor"

# Inner processor configuration (the wrapped processor that does the work)
[streams.pipeline.processors.inner_processor]
type = "transform"
script = "data.value = data.value.toUpperCase()"

# Distributed acknowledgment configuration for processor
[streams.pipeline.processors.distributed_ack]
enabled = true
node_id = "node-1"
cluster_nodes = ["node-1:8080", "node-2:8080", "node-3:8080"]

# Output configuration
[streams.output]
type = "kafka"
brokers = ["localhost:9092"]
topic = "output-topic"

# Stream-level distributed acknowledgment configuration (alternative approach)
[streams.distributed_ack]
enabled = true
node_id = "node-1"
cluster_nodes = ["node-1:8080", "node-2:8080", "node-3:8080"]

# Object storage configuration for stream-level
# NOTE: placeholder credentials — see the note on the input-level section.
[streams.distributed_ack.object_storage]
type = "s3"
bucket = "distributed-ack-bucket"
region = "us-east-1"
access_key_id = "your-access-key"
secret_access_key = "your-secret-key"

# WAL configuration for stream-level
[streams.distributed_ack.wal]
type = "rocksdb"
path = "./distributed_ack_wal"

# Performance configuration
[streams.distributed_ack.performance]
max_pending_acks = 10000
batch_size = 100
flush_interval_ms = 1000
retry_config = { max_retries = 5, initial_delay_ms = 1000, max_delay_ms = 30000, backoff_multiplier = 2.0 }

# Recovery configuration
[streams.distributed_ack.recovery]
enable_recovery = true
recovery_interval_ms = 30000
checkpoint_interval_ms = 60000
156 changes: 156 additions & 0 deletions crates/arkflow-core/examples/distributed_ack_integration_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//! Integration example for distributed acknowledgment system
//!
//! This example demonstrates how to use the enhanced distributed acknowledgment
//! system with optimized error handling, retry mechanisms, and metrics collection.

use arkflow_core::{
distributed_ack_config::DistributedAckConfig, enhanced_ack_task::AckTaskPool,
enhanced_config::EnhancedConfig, enhanced_metrics::EnhancedMetrics, MessageBatch,
};
Comment on lines +20 to +23
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix compile-time mismatches with current APIs (config fields, retry config, task construction).

  • DistributedAckConfig: use constructor/new + set node_id; fields like wal_enabled/checkpoint_enabled/retry_* don’t exist per current struct.
  • Provide a concrete RetryConfig (from EnhancedConfig) and pass it to AckTaskPool/new task.
  • process_with_ack currently references config out of scope and calls EnhancedAckTask::new with the wrong arity; supply sequence, ack_type, payload, and retry_config.
@@
-use arkflow_core::{
-    distributed_ack_config::DistributedAckConfig, enhanced_ack_task::AckTaskPool,
-    enhanced_config::EnhancedConfig, enhanced_metrics::EnhancedMetrics, MessageBatch,
-};
+use arkflow_core::{
+    distributed_ack_config::DistributedAckConfig,
+    distributed_ack::RetryConfig,
+    enhanced_ack_task::AckTaskPool,
+    enhanced_config::EnhancedConfig,
+    enhanced_metrics::EnhancedMetrics,
+    MessageBatch,
+};
@@
-    // Create distributed acknowledgment configuration
-    let distributed_ack_config = DistributedAckConfig {
-        cluster_id: "example-cluster".to_string(),
-        node_id: "node-1".to_string(),
-        wal_enabled: true,
-        checkpoint_enabled: true,
-        retry_max_attempts: config.retry.max_retries,
-        retry_base_delay_ms: config.retry.base_delay_ms,
-        // ... other configuration fields
-    };
+    // Create distributed acknowledgment configuration
+    let mut distributed_ack_config = DistributedAckConfig::new("example-cluster".to_string());
+    distributed_ack_config.node_id = Some("node-1".to_string());
+    // ... tweak other nested configs if needed (wal/checkpoint/recovery/node_registry)
@@
-    // Create task pool for enhanced acknowledgment processing
-    let task_pool = AckTaskPool::new(config.retry.clone());
+    // Derive RetryConfig from EnhancedConfig and create task pool
+    let retry_config = RetryConfig {
+        max_retries: config.retry.max_retries,
+        base_delay_ms: config.retry.base_delay_ms,
+        max_delay_ms: config.retry.max_delay_ms,
+        backoff_multiplier: config.retry.backoff_multiplier,
+        jitter: true,
+    };
+    let task_pool = AckTaskPool::new(retry_config.clone());
@@
-        // Simulate processing with acknowledgment
-        process_with_ack(&message, &task_pool, &metrics).await?;
+        // Simulate processing with acknowledgment
+        process_with_ack(&message, &task_pool, &metrics, retry_config.clone()).await?;
@@
-async fn process_with_ack(
-    message: &MessageBatch,
-    task_pool: &AckTaskPool,
-    metrics: &EnhancedMetrics,
-) -> Result<(), Box<dyn std::error::Error>> {
+async fn process_with_ack(
+    message: &MessageBatch,
+    task_pool: &AckTaskPool,
+    metrics: &EnhancedMetrics,
+    retry_config: RetryConfig,
+) -> Result<(), Box<dyn std::error::Error>> {
@@
-    let ack_task = arkflow_core::enhanced_ack_task::EnhancedAckTask::new(
-        Arc::new(TestAck),
-        format!("ack-{}", message.len()),
-        config.retry.clone(),
-    );
+    let ack_task = arkflow_core::enhanced_ack_task::EnhancedAckTask::new(
+        Arc::new(TestAck),
+        message.len() as u64,                 // sequence
+        format!("ack-{}", message.len()),     // ack_type
+        Vec::new(),                           // payload (placeholder)
+        retry_config,                         // retry config
+    );

Also applies to: 41-50, 53-54, 63-64, 77-83, 100-106

🤖 Prompt for AI Agents
In crates/arkflow-core/examples/distributed_ack_integration_example.rs around
lines 20-23 (and also apply similar fixes at lines 41-50, 53-54, 63-64, 77-83,
100-106): replace direct struct field usage for DistributedAckConfig with its
constructor (DistributedAckConfig::new()) and set the node id via the provided
setter (e.g., set_node_id or node_id method) instead of trying to assign
wal_enabled/checkpoint_enabled/retry_* fields which no longer exist; create a
concrete RetryConfig by obtaining it from EnhancedConfig (e.g., let retry_config
= EnhancedConfig::retry_config(...) or EnhancedConfig::default().retry_config())
and pass this retry_config into AckTaskPool::new and when constructing Ack
tasks; fix process_with_ack so it does not reference an out-of-scope config by
capturing or passing the retry_config and construct EnhancedAckTask/AckTask with
the correct arity (sequence, ack_type, payload, retry_config).

use std::sync::Arc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize tracing
tracing_subscriber::fmt::init();

println!("=== Distributed Acknowledgment Integration Example ===");

// Create enhanced configuration
let config = EnhancedConfig::production();
config.validate()?;

// Initialize metrics collection
let metrics = Arc::new(EnhancedMetrics::new());

// Create distributed acknowledgment configuration
let distributed_ack_config = DistributedAckConfig {
cluster_id: "example-cluster".to_string(),
node_id: "node-1".to_string(),
wal_enabled: true,
checkpoint_enabled: true,
retry_max_attempts: config.retry.max_retries,
retry_base_delay_ms: config.retry.base_delay_ms,
// ... other configuration fields
};

// Create task pool for enhanced acknowledgment processing
let task_pool = AckTaskPool::new(config.retry.clone());

// Simulate message processing with distributed acknowledgments
println!("Processing messages with distributed acknowledgments...");

for i in 0..10 {
// Create a test message
let message = MessageBatch::from_string(&format!("Test message {}", i))?;

// Simulate processing with acknowledgment
process_with_ack(&message, &task_pool, &metrics).await?;

// Small delay between messages
sleep(Duration::from_millis(100)).await;
}

// Print final metrics
println!("\n=== Final Metrics ===");
print_metrics(&metrics);

println!("Example completed successfully!");
Ok(())
}

/// Processes one message batch: records metrics, simulates work, and queues
/// an acknowledgment task on the pool.
///
/// # Errors
/// Propagates failures from `AckTaskPool::add_task`.
async fn process_with_ack(
    message: &MessageBatch,
    task_pool: &AckTaskPool,
    metrics: &EnhancedMetrics,
) -> Result<(), Box<dyn std::error::Error>> {
    let start_time = std::time::Instant::now();

    // Count every message that reaches this stage.
    metrics.counter("messages_received").unwrap().increment();

    // Simulate message processing.
    println!("Processing message: {:?}", message.get_input_name());

    // Simulate some processing work.
    sleep(Duration::from_millis(50)).await;

    // Record processing time in milliseconds.
    let processing_time = start_time.elapsed().as_millis() as f64;
    metrics
        .histogram("message_processing_time_ms")
        .unwrap()
        .observe(processing_time);

    // Build the retry policy locally — the original referenced `config`,
    // which is not in scope in this function (compile error).
    // NOTE(review): field names assumed from `distributed_ack::RetryConfig` —
    // confirm against the current definition.
    let retry_config = arkflow_core::distributed_ack::RetryConfig {
        max_retries: 3,
        base_delay_ms: 100,
        max_delay_ms: 5_000,
        backoff_multiplier: 2.0,
        jitter: true,
    };

    // Queue the acknowledgment task. `EnhancedAckTask::new` takes the ack
    // implementation, a sequence number, an ack-type tag, a payload, and the
    // retry policy (the original passed only three arguments).
    let ack_task = arkflow_core::enhanced_ack_task::EnhancedAckTask::new(
        Arc::new(TestAck),
        message.len() as u64,             // sequence
        format!("ack-{}", message.len()), // ack_type
        Vec::new(),                       // payload (placeholder)
        retry_config,
    );

    // Add task to pool.
    task_pool.add_task(ack_task).await?;

    // Track that one connection is active while acks are in flight.
    metrics.gauge("active_connections").unwrap().set(1.0);

    println!("Message processed and acknowledgment queued");

    Ok(())
}

/// Print a summary of all collected metrics to stdout: the received-message
/// counter, the active-connection gauge, and processing-latency percentiles.
fn print_metrics(metrics: &EnhancedMetrics) {
    // Counter: how many messages were received overall.
    if let Some(received) = metrics.get_counter_value("messages_received") {
        println!("Messages received: {}", received);
    }

    // Gauge: currently active connections.
    if let Some(active) = metrics.get_gauge_value("active_connections") {
        println!("Active connections: {}", active);
    }

    // Histogram: processing-latency percentile breakdown plus sample count.
    if let Some(p) = metrics.get_histogram_percentiles("message_processing_time_ms") {
        println!("Processing time percentiles:");
        println!(" P50: {:.2}ms", p.p50);
        println!(" P90: {:.2}ms", p.p90);
        println!(" P95: {:.2}ms", p.p95);
        println!(" P99: {:.2}ms", p.p99);
        println!(" Count: {}", p.count);
    }
}

// Test acknowledgment implementation
struct TestAck;

#[async_trait::async_trait]
impl arkflow_core::enhanced_ack_task::Ack for TestAck {
async fn ack(&self) -> Result<(), String> {
// Simulate acknowledgment processing
tokio::time::sleep(Duration::from_millis(10)).await;
Ok(())
}

async fn retry(&self, _attempt: u32) -> Result<(), String> {
// Simulate retry logic
tokio::time::sleep(Duration::from_millis(50)).await;
Ok(())
}
}
Loading
Loading