Refactor Kafka Pipeline and Agent Integration#12
Conversation
- Updated dependencies in Cargo.toml files across multiple crates. - Removed unused build.rs in grpc-listener and adjusted service.rs to eliminate protobuf references. - Introduced new Kafka admin CLI tool for topic management. - Implemented event routing logic in the Kafka pipeline to direct events to appropriate topics based on type. - Added integration tests for agent and pipeline components. - Created Docker Compose configurations for local development and Kafka cluster setup. - Added health check and metrics modules for monitoring consumer lag. - Enhanced Dockerfile for building the Kafka pipeline with multi-stage builds. - Established GitHub Actions workflows for CI/CD processes including build, test, and Docker image creation. - Created scripts for initializing Kafka topics and managing environment variables.
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis PR introduces the ChangesKafka Pipeline
Agent / FleetClient / Fleet-Server Refactor
Sequence Diagram(s)sequenceDiagram
rect rgba(173, 216, 230, 0.5)
note over AgentBin,FleetServer: Agent Startup & Enrollment
end
participant AgentBin as agent-bin
participant FleetClient
participant AgentCore
participant CommandHandler
participant FleetServer as Fleet Server
AgentBin->>FleetClient: new(endpoint)
AgentBin->>FleetClient: connect_with_retry(max, base_delay)
FleetClient-->>AgentBin: connected
AgentBin->>FleetClient: enroll(EnrollmentRequest)
FleetClient->>FleetServer: AgentMessage[EnrollmentRequest]
FleetServer-->>FleetClient: ServerMessage[EnrollmentResponse]
FleetClient-->>AgentBin: EnrollmentResponse + node_id stored
AgentBin->>AgentCore: run()
AgentCore->>AgentCore: spawn osquery polling task
AgentCore->>AgentCore: spawn command listener task
loop heartbeat interval
AgentBin->>FleetClient: heartbeat(HeartbeatRequest)
FleetClient->>FleetServer: AgentMessage[HeartbeatRequest]
FleetServer-->>FleetClient: ServerMessage[HeartbeatResponse]
end
FleetServer->>CommandHandler: ServerMessage[run_query/isolate/unisolate]
CommandHandler-->>FleetServer: Result<Value>
sequenceDiagram
rect rgba(144, 238, 144, 0.5)
note over ConsumerWorker,FleetServer: Kafka Event Routing
end
participant Producer as Kafka Producer (agent/fleet)
participant ConsumerWorker
participant EventRouterProcessor
participant KafkaBroker as Kafka Broker
Producer->>KafkaBroker: publish to aigis.events.raw
ConsumerWorker->>KafkaBroker: poll StreamConsumer
KafkaBroker-->>ConsumerWorker: BorrowedMessage
ConsumerWorker->>EventRouterProcessor: process(key, payload, topic, partition, offset)
EventRouterProcessor->>EventRouterProcessor: parse JSON -> event_type
EventRouterProcessor->>KafkaBroker: FutureProducer.send to target topic
KafkaBroker-->>EventRouterProcessor: delivery result
EventRouterProcessor-->>ConsumerWorker: Ok(())
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
Note
Due to the large number of review comments, Critical severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
agent/crates/agent-bin/Cargo.toml (1)
11-27:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winMissing
isolationdependency.The
main.rsfile usesisolation::IsolationManager::new()at line 149, but theisolationcrate is not listed in dependencies. This will cause a compile error.Proposed fix
edr-sdk = { workspace = true } serde_json = { workspace = true } uuid = { workspace = true } agent-tracing = { workspace = true } +isolation = { workspace = true }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/agent-bin/Cargo.toml` around lines 11 - 27, The isolation crate is used in main.rs (specifically IsolationManager::new()) but is not declared as a dependency in the Cargo.toml file. Add the isolation crate to the [dependencies] section in the Cargo.toml file alongside the other workspace and external dependencies like agent-core, tokio, and other crates listed..github/workflows/kafka-pipeline.yml (1)
13-52:⚠️ Potential issue | 🟠 Major | ⚡ Quick winSet explicit least-privilege workflow permissions.
No top-level
permissions:block means default token scopes are broader than needed for this pipeline.Suggested fix
name: Kafka Pipeline +permissions: + contents: read🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In @.github/workflows/kafka-pipeline.yml around lines 13 - 52, The workflow file is missing explicit least-privilege permissions configuration at the top level, which results in broader default token scopes than necessary. Add a top-level `permissions:` block before the `jobs:` section that explicitly defines only the minimal permissions required for the workflow operations in the check, test, build, and docker jobs. Specify `contents: read` to allow repository checkout and restrict or omit all other permissions that are not strictly needed for the cargo operations and docker build steps.Source: Linters/SAST tools
Cargo.toml (1)
34-51:⚠️ Potential issue | 🔴 CriticalRestore
prostandtonic-buildin workspace dependencies to unblock all Cargo commands.Multiple crates inherit both dependencies via
workspace = true:sdk/Cargo.tomluses bothprostandtonic-buildwith workspace inheritance, as doagent/crates/osquery-client/Cargo.tomlandagent/crates/agent-core/Cargo.tomlforprost. Removing them from[workspace.dependencies]causes manifest parse failure before check/test/build can run.Suggested fix
[workspace.dependencies] tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec"] } tokio-tungstenite = "0.29" tonic = { version = "0.14" } +prost = "0.14" +tonic-build = "0.14" axum = { version = "0.8", features = ["ws", "macros"] }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@Cargo.toml` around lines 34 - 51, The prost and tonic-build dependencies were removed from the [workspace.dependencies] section in Cargo.toml, but multiple crates (sdk/Cargo.toml, agent/crates/osquery-client/Cargo.toml, and agent/crates/agent-core/Cargo.toml) still use workspace = true to inherit these dependencies. Add both prost and tonic-build back to the [workspace.dependencies] section with appropriate version constraints and features to restore manifest resolution and allow all Cargo commands to function properly.Source: Pipeline failures
fleet-server/crates/grpc-listener/src/service.rs (1)
83-85:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winDo not trust
node_idfrom request payload after token validation.You validate JWT metadata at Line 83, but then persist client-supplied
event.node_id/req.node_id. A valid token holder can spoof another node by changing payload IDs. Useclaims.node_idas the source of truth (and reject mismatches).🔒 Suggested fix
- let claims = validate_token(request.metadata(), &self.decoding_key)?; - let node_id = claims.node_id.clone(); + let claims = validate_token(request.metadata(), &self.decoding_key)?; + let node_id = claims.node_id.clone(); ... - let incoming = IncomingEvent { - node_id: event.node_id.clone(), + if event.node_id != node_id { + tracing::warn!(token_node_id = %node_id, event_node_id = %event.node_id, "node_id mismatch"); + continue; + } + let incoming = IncomingEvent { + node_id: node_id.clone(), ... - validate_token(request.metadata(), &self.decoding_key)?; + let claims = validate_token(request.metadata(), &self.decoding_key)?; ... + if req.node_id != claims.node_id { + return Err(Status::permission_denied("node_id mismatch")); + } self.heartbeat .record_heartbeat(AgentHeartbeat { - node_id: req.node_id, + node_id: claims.node_id,Also applies to: 96-99, 151-154
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@fleet-server/crates/grpc-listener/src/service.rs` around lines 83 - 85, The security issue is that after validating the JWT token with validate_token at line 83 to extract the trusted claims, the code subsequently uses client-supplied node_id values (such as event.node_id or req.node_id) instead of the validated claims.node_id. To fix this, replace all uses of client-supplied node_id with claims.node_id (which is already extracted at line 84) throughout the request handling logic, and add validation to reject any requests where the client attempts to specify a mismatched node_id. This ensures the validated node_id from the JWT token is the single source of truth and prevents spoofing attacks where a valid token holder tries to operate as a different node.
🟠 Major comments (14)
agent/crates/agent-core/src/lib.rs-29-40 (1)
29-40:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftEmpty task implementations — core agent functionality is missing.
Both the osquery polling task (lines 29-31) and command listener task (lines 36-40) have empty bodies with placeholder comments. This leaves the agent without functional event collection or command processing. Additionally, variables
osquery,buffer1,cmd_handler, andfleetare cloned but unused, which will trigger compiler warnings.If these are intentional placeholders, consider adding
todo!()macros to make the incomplete state explicit and fail fast during testing.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/agent-core/src/lib.rs` around lines 29 - 40, The osquery_task and command_task both contain empty implementations with only placeholder comments, and variables like osquery, buffer1, cmd_handler, and fleet are cloned but remain unused, causing compiler warnings. Either implement the actual async logic inside both tokio::spawn blocks (for osquery polling and command listening respectively) or add todo!() macros inside each task's async block to explicitly mark them as incomplete and fail fast during testing. Remove any variable clones that are declared but not used within their corresponding task closures.agent/crates/agent-core/src/command_handler.rs-8-11 (1)
8-11:⚠️ Potential issue | 🟠 Major | ⚡ Quick winMissing constructor — struct fields are private, preventing external instantiation.
The
CommandHandlerstruct is public but its fields are not. Based on the downstream usage inagent-bin/src/main.rs(lines 147-150), it attempts direct struct initialization which will fail to compile. Either make the fieldspubor add anew()constructor.🔧 Option 1: Add a constructor
pub struct CommandHandler { osquery: OsqueryClient, isolation: IsolationManager, } impl CommandHandler { + pub fn new(osquery: OsqueryClient, isolation: IsolationManager) -> Self { + Self { osquery, isolation } + } + pub async fn handle(&self, msg: ServerMessage) -> Result<Value, String> {🔧 Option 2: Make fields public
pub struct CommandHandler { - osquery: OsqueryClient, - isolation: IsolationManager, + pub osquery: OsqueryClient, + pub isolation: IsolationManager, }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/agent-core/src/command_handler.rs` around lines 8 - 11, The CommandHandler struct is public but its fields osquery and isolation are private, preventing external code from directly instantiating the struct as attempted in agent-bin/src/main.rs. Either make the osquery and isolation fields public by adding the pub keyword before each field declaration, or create a public new() constructor method for CommandHandler that accepts the necessary parameters (OsqueryClient and IsolationManager) and returns an initialized CommandHandler instance.agent/crates/agent-bin/src/main.rs-164-177 (1)
164-177:⚠️ Potential issue | 🟠 Major | ⚡ Quick winHeartbeat loop is non-functional.
The heartbeat request creation and the actual
f.heartbeat(&req).awaitcall are commented out. The loop acquires the mutex lock every interval but performs no useful work, wasting resources and causing unnecessary contention on the fleet client mutex.Either implement the heartbeat or remove the spawned task until it's ready.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/agent-bin/src/main.rs` around lines 164 - 177, The heartbeat spawned task in the main.rs file has its core functionality commented out - both the HeartbeatRequest creation and the actual f.heartbeat(&req).await call are non-functional. Either complete the implementation by uncomment and properly implement the HeartbeatRequest struct instantiation and invoke f.heartbeat(&req).await within the loop after acquiring the fleet_hb lock, or remove the entire tokio::spawn block containing the ticker loop until the heartbeat functionality is ready to be implemented.agent/crates/agent-bin/src/main.rs-152-162 (1)
152-162:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftNo graceful shutdown coordination.
The
CancellationTokenis created insideAgentCore(line 153) andagent_coreis then moved into the spawned task (line 160). When Ctrl-C is received (line 210), there's no reference to the token to cancel it. The spawned tasks (agent_core, heartbeat loop, event drain loop) are abandoned rather than gracefully stopped.This can cause data loss if events are mid-flight or the buffer is being drained.
Proposed fix - extract shutdown token
+ let shutdown = tokio_util::sync::CancellationToken::new(); + let agent_core = agent_core::AgentCore { - shutdown: tokio_util::sync::CancellationToken::new(), + shutdown: shutdown.clone(), osquery: osquery.clone(), buffer: buffer.clone(), command_handler: Arc::new(command_handler), fleet_client: fleet.clone(), }; // ...spawned tasks should check shutdown.is_cancelled()... // Wait for shutdown signal let _ = tokio::signal::ctrl_c().await; info!("Ctrl-C received, shutting down"); + shutdown.cancel(); + + // Give tasks time to finish gracefully + tokio::time::sleep(Duration::from_secs(5)).await; let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Stopping]);Also applies to: 209-215
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/agent-bin/src/main.rs` around lines 152 - 162, The CancellationToken is created inside the AgentCore struct initialization and then moved into the spawned task, making it impossible to reference for graceful shutdown when Ctrl-C is received. Extract the CancellationToken creation to a separate statement before initializing AgentCore, keep a reference to it in the main function scope, pass it to AgentCore instead of creating a new one, and then use that reference in the Ctrl-C handler (around line 210-215) to call the cancel method on the token to gracefully shut down all spawned tasks.agent/crates/agent-bin/src/main.rs-32-59 (1)
32-59:⚠️ Potential issue | 🟠 Major | ⚡ Quick winSilent failure when
[agent]section is missing from config.If the config file doesn't contain an
[agent]section,in_agentremainsfalse, the condition at line 54 is never true, and the function returnsOk(())without saving thenode_id. This silently loses the enrolled identity.Additionally, the written file lacks a trailing newline after
lines.join("\n").Proposed fix
fn save_node_id_to_config(path: &Path, node_id: Uuid) -> anyhow::Result<()> { let content = std::fs::read_to_string(path)?; let mut lines: Vec<String> = content.lines().map(String::from).collect(); let mut in_agent = false; let mut inserted = false; for i in 0..lines.len() { if lines[i].trim() == "[agent]" { in_agent = true; continue; } if in_agent && lines[i].starts_with("node_id") { lines[i] = format!("node_id = \"{}\"", node_id); inserted = true; break; } if in_agent && lines[i].starts_with('[') { lines.insert(i, format!("node_id = \"{}\"", node_id)); inserted = true; break; } } if in_agent && !inserted { lines.push(format!("node_id = \"{}\"", node_id)); + inserted = true; } - std::fs::write(path, lines.join("\n"))?; + if !inserted { + return Err(anyhow::anyhow!("[agent] section not found in config file")); + } + std::fs::write(path, lines.join("\n") + "\n")?; Ok(()) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/agent-bin/src/main.rs` around lines 32 - 59, The save_node_id_to_config function has two issues: if the config file doesn't contain an [agent] section, the in_agent flag remains false and the node_id is never saved, but the function still returns Ok(()) silently losing the enrolled identity; additionally, the written file lacks a trailing newline. Fix this by checking if in_agent is false after the loop and either returning an error or creating the [agent] section if it's missing, and append a newline character to the lines.join("\n") call before writing to the file.kafka-pipeline/src/bin/kafka-admin.rs-1-5 (1)
1-5:⚠️ Potential issue | 🟠 Major | ⚡ Quick winUsage text advertises commands that are not implemented.
The header says
verify-topicsanddescribe-topicare supported, but the command handler only implementscreate-topics. This is a direct UX/contract break for operators.Suggested fix (minimal, keep behavior honest)
-/// kafka-admin verify-topics --brokers localhost:29092 -/// kafka-admin describe-topic --brokers localhost:29092 --topic aigis.events.raw +/// kafka-admin create-topics --brokers localhost:29092Also applies to: 76-103
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/src/bin/kafka-admin.rs` around lines 1 - 5, The usage documentation in the file header advertises CLI commands verify-topics and describe-topic that are not actually implemented in the kafka-admin command handler. Update the usage comments to remove references to these unimplemented commands and only document the create-topics command that is actually available, keeping the advertised interface honest with the actual implementation.kafka-pipeline/src/bin/kafka-admin.rs-62-70 (1)
62-70:⚠️ Potential issue | 🟠 Major | ⚡ Quick winHarden CLI argument parsing; current positional indexing is brittle.
brokersis read fromargs[3]without validating thatargs[2] == "--brokers". Any reordered or malformed invocation silently uses the wrong value.Suggested fix
- let args: Vec<String> = env::args().collect(); - if args.len() < 4 { + let args: Vec<String> = env::args().collect(); + if args.len() < 4 { eprintln!("Usage: kafka-admin <command> --brokers <brokers> [--topic <topic>]"); - return Ok(()); + std::process::exit(2); } let command = &args[1]; - let brokers = &args[3]; // assuming --brokers is args[2] + if args.get(2).map(String::as_str) != Some("--brokers") { + eprintln!("Expected --brokers as the second argument"); + std::process::exit(2); + } + let brokers = &args[3];🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/src/bin/kafka-admin.rs` around lines 62 - 70, The CLI argument parsing in the kafka-admin binary uses hard-coded positional indexing to extract the brokers value from args[3] without verifying that args[2] actually contains the "--brokers" flag. This is brittle and will silently use wrong values if arguments are reordered or malformed. Instead of relying on positional indices, iterate through the args vector to find the "--brokers" flag and validate its presence before extracting the next element as the brokers value. This ensures the code handles various argument orderings gracefully and fails explicitly on malformed input rather than silently accepting incorrect values.kafka-pipeline/src/bin/kafka-admin.rs-90-103 (1)
90-103:⚠️ Potential issue | 🟠 Major | ⚡ Quick winReturn a non-zero exit code when admin operations fail or command is unknown.
Right now, topic creation failures and unknown commands only print to stderr, then return
Ok(()). This can make CI/scripts report success on real failures.Suggested fix
"create-topics" => { @@ - for result in results { + let mut had_error = false; + for result in results { match result { Ok(topic_name) => println!("Created topic: {}", topic_name), Err((topic_name, err)) => { - eprintln!("Failed to create topic {}: {:?}", topic_name, err) + had_error = true; + eprintln!("Failed to create topic {}: {:?}", topic_name, err) } } } + if had_error { + std::process::exit(1); + } } _ => { eprintln!("Unknown command: {}", command); + std::process::exit(2); } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/src/bin/kafka-admin.rs` around lines 90 - 103, The admin operations in the create_topics result handler and the unknown command default case both print errors to stderr but fail to exit with a non-zero status code, causing the program to return success even on failure. In the error match arm where the topic creation fails (the Err branch with topic_name and err parameters), add a process exit call with a non-zero code after the error message. Similarly, in the default case for unknown commands, add a process exit call with a non-zero code after printing the error message to ensure CI/scripts properly detect failures.kafka-pipeline/src/event_router.rs-45-47 (1)
45-47:⚠️ Potential issue | 🟠 MajorPreserve
Nonekeys instead of forcing empty-key partitioning.
key.unwrap_or(&[])converts keyless events to empty-key events, which forces deterministic partition hashing instead of the sticky partitioning strategy Kafka applies to null keys. This skews partition distribution and can create hotspots.Suggested fix
- let record = FutureRecord::to(target_topic) - .payload(payload) // Raw bytes, no re-serialization - .key(key.unwrap_or(&[])); + let mut record = FutureRecord::to(target_topic) + .payload(payload); // Raw bytes, no re-serialization + + if let Some(k) = key { + record = record.key(k); + }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/src/event_router.rs` around lines 45 - 47, The FutureRecord construction in the event_router.rs file uses key.unwrap_or(&[]) which converts None keys to empty byte arrays, forcing deterministic partition hashing instead of preserving Kafka's sticky partitioning for null keys. Replace the unwrap_or(&[]) call with a direct pass of the key Option to the .key() method, allowing it to handle None keys properly and preserve the intended sticky partitioning strategy that prevents partition distribution skew..dockerignore-1-3 (1)
1-3:⚠️ Potential issue | 🟠 Major | ⚡ Quick winIgnore real env files from Docker build context.
With
COPY . .in the Dockerfile, missing.envpatterns can leak local secrets into build context/layers.Suggested fix
target/ .git/ .github/ +.env +**/.env +**/.env.* +!**/.env.example🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In @.dockerignore around lines 1 - 3, The .dockerignore file is missing patterns to exclude environment files that may contain secrets. Add ignore patterns for common environment file names like .env, .env.local, and .env.*.local to prevent these files from being included in the Docker build context when using COPY . . in the Dockerfile. This will protect local secrets from being inadvertently copied into the Docker image layers.kafka-pipeline/Dockerfile-7-11 (1)
7-11:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRun the runtime image as a non-root user.
Lines 7-11 currently default to root, which weakens container isolation and is flagged by scanner policy.
Suggested fix
FROM debian:bookworm-slim RUN apt-get update && apt-get install -y libssl-dev ca-certificates && rm -rf /var/lib/apt/lists/* +RUN useradd -r -u 10001 -g root appuser COPY --from=builder /usr/src/app/target/release/edr-kafka-pipeline /usr/local/bin/edr-kafka-pipeline COPY --from=builder /usr/src/app/target/release/kafka-admin /usr/local/bin/kafka-admin +USER appuser CMD ["edr-kafka-pipeline"]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/Dockerfile` around lines 7 - 11, The Dockerfile currently runs as root by default, which is a security risk. Add a non-root user to the Dockerfile by using a RUN command to create a new user account (for example, using useradd), and then add a USER directive before the CMD instruction to switch to that non-root user. This ensures the edr-kafka-pipeline and kafka-admin executables run with restricted privileges instead of as root.Source: Linters/SAST tools
infra/docker-compose.dev.yml-4-4 (1)
4-4:⚠️ Potential issue | 🟠 MajorPin Docker image tags (or digests) for dev stack reproducibility.
Both Kafka (
apache/kafka:latestat line 4) and Kafka UI (provectuslabs/kafka-ui:latestat line 31) use unpinned:latesttags, which causes local behavior to drift over time and introduces supply-chain risk. Pin specific versions or use digest references instead.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@infra/docker-compose.dev.yml` at line 4, Replace the unpinned `:latest` tags in the Docker image references with specific version pins or digest references. In the Kafka service, update the image property from `apache/kafka:latest` to a pinned version like `apache/kafka:3.5.0` (or a specific digest). Similarly, in the Kafka UI service configuration, replace `provectuslabs/kafka-ui:latest` with a pinned version. This ensures reproducible builds and reduces supply-chain risks by preventing unexpected changes from automatic image updates..github/workflows/kafka-pipeline.yml-17-18 (1)
17-18:⚠️ Potential issue | 🟠 MajorPin GitHub Actions and service image to immutable refs.
Using floating refs (
@v3,@stable,:latest) makes CI non-reproducible and increases supply-chain risk. Pin to full commit SHAs instead:
- Line 17:
actions/checkout@v3- Line 18:
dtolnay/rust-toolchain@stable- Line 27:
apache/kafka:latest- Lines 36-37:
actions/checkout@v3anddtolnay/rust-toolchain@stable- Lines 43-44:
actions/checkout@v3anddtolnay/rust-toolchain@stable- Line 50:
actions/checkout@v3🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In @.github/workflows/kafka-pipeline.yml around lines 17 - 18, Replace all floating version references in the workflow with full commit SHAs to ensure reproducibility and reduce supply-chain risk. Replace instances of actions/checkout@v3 with its full commit SHA, dtolnay/rust-toolchain@stable with its corresponding full commit SHA, and apache/kafka:latest with its full image digest. Perform these replacements at all occurrences throughout the file (including the checkout and rust-toolchain actions in the multiple job steps, and the kafka service image). Use the GitHub UI or documentation to find the exact commit SHAs for each action and image digest to pin to immutable references.Source: Linters/SAST tools
agent/crates/fleet-client/src/lib.rs-132-136 (1)
132-136:⚠️ Potential issue | 🟠 Major | ⚡ Quick winValidate
ServerMessageTypebefore decoding event/heartbeat payloads.
send_events()andheartbeat()deserialize payload directly without checking message type. Error or command frames will be treated as decode failures instead of protocol-level errors.✅ Suggested fix
- let ack: EventAck = serde_json::from_value(response.payload)?; - Ok(ack) + match response.message_type { + ServerMessageType::EventAck => Ok(serde_json::from_value(response.payload)?), + ServerMessageType::Error => Err(anyhow::anyhow!("event upload rejected: {}", response.payload)), + other => Err(anyhow::anyhow!("unexpected response type for send_events: {:?}", other)), + } ... - let hb: HeartbeatResponse = serde_json::from_value(response.payload)?; - Ok(hb) + match response.message_type { + ServerMessageType::HeartbeatResponse => Ok(serde_json::from_value(response.payload)?), + ServerMessageType::Error => Err(anyhow::anyhow!("heartbeat rejected: {}", response.payload)), + other => Err(anyhow::anyhow!("unexpected response type for heartbeat: {:?}", other)), + }Also applies to: 150-154
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/fleet-client/src/lib.rs` around lines 132 - 136, In both the send_events() and heartbeat() methods, add validation of the ServerMessageType from the received response before attempting to deserialize the payload. After the receive().await call completes successfully, check that the response.message_type matches the expected type for each method (EventAck for send_events and the appropriate heartbeat response type for heartbeat). If the message type does not match, return an error that clearly indicates the protocol-level error rather than allowing deserialization to fail. This ensures error and command frames are properly handled as protocol violations instead of being treated as decode failures.
🟡 Minor comments (3)
agent/crates/agent-core/src/command_handler.rs-26-26 (1)
26-26:⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
unwrap_or_default()silently masks serialization errors.If
serde_json::to_value(results)fails, this returnsValue::Nullwithout any indication of failure. The caller cannot distinguish between "query returned no results" and "serialization failed."🔧 Suggested fix
- Ok(serde_json::to_value(results).unwrap_or_default()) + serde_json::to_value(results).map_err(|e| e.to_string())🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/agent-core/src/command_handler.rs` at line 26, The call to `serde_json::to_value(results).unwrap_or_default()` silently masks serialization errors by returning a null value when serialization fails, preventing the caller from distinguishing between successful serialization with no data and actual serialization failures. Replace the `unwrap_or_default()` call with proper error handling that propagates or explicitly handles the serialization error, either by using `map_err()` to convert the error into an appropriate error response, or by letting the error bubble up through the Result type that this function appears to return. This ensures serialization failures are properly reported rather than silently masked as null values.kafka-pipeline/guide.md-77-78 (1)
77-78:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winUpdate the function name in the extension instructions.
The guide says
determine_topic, but the implementation usesroute_topicinkafka-pipeline/src/event_router.rs. Aligning this avoids contributor confusion.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/guide.md` around lines 77 - 78, The guide documentation references the function name determine_topic in the extension instructions, but the actual implementation uses route_topic in the event router module. Update the guide text where it mentions determine_topic to instead reference route_topic, and ensure all instructions that follow refer to this correct function name to maintain consistency with the actual codebase implementation.kafka-pipeline/guide.md-13-15 (1)
13-15:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winFix the topic-init command path after changing directories.
After Line 13 (
cd infra), Line 21 should not referenceinfra/scripts/...from the same shell location. Usebash scripts/create-topics.sh(or explicitlycd ..first).Also applies to: 21-21
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/guide.md` around lines 13 - 15, The path reference in the topic-init command on line 21 is incorrect relative to the directory context. After the `cd infra` command on line 13, the command that references `infra/scripts/create-topics.sh` will fail because it will try to access `infra/infra/scripts/...` from within the infra directory. Fix this by either changing the path to `scripts/create-topics.sh` to reflect the current working directory after `cd infra`, or add `cd ..` before the topic-init command to return to the original directory before referencing the full path.
🧹 Nitpick comments (10)
agent/crates/agent-core/src/lib.rs (2)
55-73: 💤 Low valueTest stubs are empty — consider using
todo!()or tracking completion.These test functions will pass silently without testing anything. If these are intentional placeholders, consider using
todo!()to fail explicitly until implemented, or add a tracking issue.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/agent-core/src/lib.rs` around lines 55 - 73, The test functions test_osquery_loop_produces_events, test_command_handling, test_shutdown_signal, and test_event_buffer_integration are empty stubs that will pass silently without verifying any behavior. Replace the comment placeholders in each test with either a todo!() macro call to fail explicitly until the tests are implemented, or document a tracking issue. This ensures the tests fail loudly when run, preventing accidental silent passes.
42-45: ⚖️ Poor tradeoffConsider graceful shutdown via
CancellationTokeninstead ofabort().Using
abort()terminates tasks immediately without allowing cleanup (e.g., closing connections, flushing buffers). A more resilient pattern is to pass theCancellationTokeninto each task and usetokio::select!to exit gracefully when cancelled.♻️ Example pattern
let shutdown_clone = shutdown.clone(); let osquery_task = tokio::spawn(async move { loop { tokio::select! { _ = shutdown_clone.cancelled() => break, // ... actual work ... } } }); // Then just await tasks instead of aborting shutdown.cancelled().await; let _ = osquery_task.await; let _ = command_task.await;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/agent-core/src/lib.rs` around lines 42 - 45, The current implementation uses abort() on osquery_task and command_task, which immediately terminates them without allowing cleanup. Instead, refactor each task spawning to accept a cloned shutdown token and use tokio::select! to gracefully exit when the shutdown token is cancelled. Inside each task (osquery_task and command_task), add a loop with tokio::select! that breaks on shutdown.cancelled() and continues normal operation otherwise. Finally, replace the abort() calls with await statements to allow tasks to complete their cleanup logic before shutdown completes.agent/crates/agent-core/src/command_handler.rs (1)
3-3: ⚡ Quick winUnused imports.
infoandwarnfromtracingare imported but never used in this module.♻️ Suggested fix
-use tracing::{info, warn}; +// Remove or use tracing macros when adding logging🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@agent/crates/agent-core/src/command_handler.rs` at line 3, Remove the unused imports `info` and `warn` from the tracing import statement at the top of the command_handler.rs module. If these are the only items being imported from the tracing crate, delete the entire import line; otherwise, remove just `info` and `warn` from the list of imported items while keeping any other tracing imports that are actually used in the module.kafka-pipeline/src/main.rs (1)
22-24: ⚡ Quick winAvoid panic in startup path; propagate producer init errors.
Using
expecthere hard-aborts the process and bypasses theanyhow::Result<()>error path already used elsewhere inmain.Suggested fix
let router_producer = rdkafka::config::ClientConfig::new() .set("bootstrap.servers", &brokers) .set("linger.ms", "5") .set("compression.type", "lz4") .create() - .expect("Router producer creation failed"); + .map_err(|e| anyhow::anyhow!("Router producer creation failed: {e}"))?;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/src/main.rs` around lines 22 - 24, The router producer creation in the startup path uses `.expect()` which panics and bypasses the `anyhow::Result<()>` error handling pattern used elsewhere in main. Replace the `.expect("Router producer creation failed")` call with the `?` operator on the `.create()` method of the router producer so that any creation errors are properly propagated through the function's error return path instead of panicking.kafka-pipeline/tests/integration_test.rs (1)
3-7: ⚡ Quick winThis integration test currently always passes without validating behavior.
Please either add assertions for routing behavior or mark it ignored until implementation is ready.
Suggested fix
#[test] + #[ignore = "TODO: implement end-to-end Kafka routing assertions"] fn test_pipeline_integration() { // Test placeholder // No-op }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/tests/integration_test.rs` around lines 3 - 7, The test_pipeline_integration function is currently a placeholder with no actual test logic or assertions. Either implement the test with actual assertions that validate the pipeline's routing behavior, or add the #[ignore] attribute above the test function to mark it as not yet implemented. Choose whichever is most appropriate for your development workflow.kafka-pipeline/src/health.rs (1)
4-5: ⚡ Quick winHardcoded health success can mask outages once this is wired.
Returning
trueunconditionally defeats health gating. Prefer explicit “not implemented” behavior or real dependency checks before exposing it.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/src/health.rs` around lines 4 - 5, The is_healthy() function currently returns true unconditionally, which will mask any actual system outages once this health check is integrated into the system. Either implement a "not yet implemented" return value (such as returning false) to indicate the health check is not ready, or add actual dependency checks (such as verifying Kafka connectivity, database availability, or other critical services) to the is_healthy() function so it returns an accurate health status based on real system state rather than a hardcoded value.kafka-pipeline/src/metrics.rs (1)
17-23: ⚡ Quick winDon’t return a real lag value from an unimplemented monitor.
Ok(0)makes dashboards/alerts look healthy even when lag computation is not implemented.Suggested fix
pub async fn get_consumer_lag(&self, _group_id: &str) -> Result<i64, String> { - // Fetch committed offsets - // Fetch latest offsets (watermarks) - // Calculate difference - // Return total lag - Ok(0) // TODO: actual implementation + Err("consumer lag monitoring not implemented".to_string()) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@kafka-pipeline/src/metrics.rs` around lines 17 - 23, The get_consumer_lag method currently returns Ok(0) as a placeholder, which masks the fact that the feature is not yet implemented by making monitoring systems report healthy lag values. Replace the Ok(0) return with an Err that returns an appropriate error message indicating that the consumer lag computation is not implemented yet, such as returning an error string that clearly states this functionality is not yet available.infra/scripts/create-topics.sh (2)
4-5: ⚡ Quick winUse declared config variables instead of hardcoded broker/bin paths.
BOOTSTRAPandKAFKA_BINare currently unused while equivalent values are hardcoded in the command invocations. Wire those variables intodocker execcalls to avoid config drift.Suggested patch
BOOTSTRAP="localhost:29092" KAFKA_BIN="/opt/kafka/bin" @@ - docker exec aigis-kafka-dev /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \ + docker exec aigis-kafka-dev "${KAFKA_BIN}/kafka-topics.sh" --bootstrap-server "${BOOTSTRAP}" \ @@ -docker exec aigis-kafka-dev /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list +docker exec aigis-kafka-dev "${KAFKA_BIN}/kafka-topics.sh" --bootstrap-server "${BOOTSTRAP}" --listAlso applies to: 15-16, 35-35
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@infra/scripts/create-topics.sh` around lines 4 - 5, The variables BOOTSTRAP and KAFKA_BIN are declared at the top of the script but are not utilized in the actual command invocations. Replace all hardcoded occurrences of the broker address (localhost:29092) with the BOOTSTRAP variable reference and hardcoded occurrences of the Kafka binary path (/opt/kafka/bin) with the KAFKA_BIN variable reference in the docker exec calls throughout the script. This ensures the script uses the declared configuration variables consistently and prevents configuration drift.Source: Linters/SAST tools
12-12: ⚡ Quick winMake replication factor configurable for multi-node environments.
Defaulting every topic to replication factor
1weakens resilience when run against the 3-node cluster setup. Add an env-driven default (e.g.,TOPIC_REPLICATION_FACTOR, fallback1) and document when to set it to3.Also applies to: 25-32
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@infra/scripts/create-topics.sh` at line 12, The replication variable assignment currently defaults to a hardcoded value of 1, which is insufficient for multi-node cluster setups. Modify the replication variable assignment to first check the TOPIC_REPLICATION_FACTOR environment variable, then use the positional parameter if provided, and finally fall back to 1 as the default. Apply this same pattern to all other replication variable assignments mentioned in lines 25-32 to ensure consistency across the script. Document in comments or script header when to set TOPIC_REPLICATION_FACTOR to 3 for multi-node environments.infra/docker-compose.kafka-cluster.yml (1)
4-4: Pin Kafka image tags instead of usinglatestacross all three services.Using
apache/kafka:latestmakes the cluster non-reproducible and can break local/CI flows on upstream image changes. Apache Kafka has frequent breaking changes between versions (e.g., ZooKeeper removal, API deprecations, configuration defaults), so unpinned images risk unexpected failures. Pin a tested semantic version tag (e.g.,apache/kafka:4.3.0) or use an immutable digest across all three services for stability and predictability.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@infra/docker-compose.kafka-cluster.yml` at line 4, Replace the `apache/kafka:latest` image tags with a pinned semantic version tag (e.g., `apache/kafka:4.3.0`) across all three Kafka services in the docker-compose.kafka-cluster.yml file. Instead of using the `latest` tag which changes unpredictably and can introduce breaking changes, specify a tested and verified version number to ensure reproducibility and consistency across local and CI environments. Apply this same pinned version to all service definitions that reference the Apache Kafka image.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 08385b75-ee5d-4c17-8b42-978c7f85d861
📒 Files selected for processing (35)
.dockerignore.github/workflows/kafka-pipeline.ymlCargo.tomlagent/.env.exampleagent/crates/agent-bin/Cargo.tomlagent/crates/agent-bin/src/main.rsagent/crates/agent-core/Cargo.tomlagent/crates/agent-core/src/command_handler.rsagent/crates/agent-core/src/lib.rsagent/crates/fleet-client/Cargo.tomlagent/crates/fleet-client/src/lib.rsagent/tests/agent_integration.rsfleet-server/crates/fleet-manager/Cargo.tomlfleet-server/crates/grpc-listener/Cargo.tomlfleet-server/crates/grpc-listener/build.rsfleet-server/crates/grpc-listener/src/service.rsfleet-server/crates/kafka-handler/Cargo.tomlfleet-server/crates/node-enrollment/Cargo.tomlfleet-server/src/grpc/testing.protoinfra/docker-compose.dev.ymlinfra/docker-compose.kafka-cluster.ymlinfra/k8s/keda-scaler.ymlinfra/scripts/create-topics.shkafka-pipeline/.env.examplekafka-pipeline/Cargo.tomlkafka-pipeline/Dockerfilekafka-pipeline/guide.mdkafka-pipeline/src/bin/kafka-admin.rskafka-pipeline/src/consumer.rskafka-pipeline/src/event_router.rskafka-pipeline/src/health.rskafka-pipeline/src/lib.rskafka-pipeline/src/main.rskafka-pipeline/src/metrics.rskafka-pipeline/tests/integration_test.rs
💤 Files with no reviewable changes (1)
- fleet-server/crates/grpc-listener/build.rs
| // Start AgentCore (osquery loop + command listener) | ||
| let agent_uuid = node_id.to_string(); | ||
| let mut collector = collector; // Move collector here | ||
| let _results_rx = collector.start(&agent_uuid).await; // We don't read from rx directly now, AgentCore does. Wait, AgentCore needs OsqueryClient, not Collector. | ||
| // Assuming OsqueryClient is available and takes collector or something. | ||
| // For now, we will just instantiate AgentCore with dummy Arc wrapping. | ||
| let command_handler = agent_core::command_handler::CommandHandler { | ||
| osquery: Arc::new(collector), // Assuming this works | ||
| isolation: isolation::IsolationManager::new(), // Assuming this exists | ||
| }; | ||
|
|
||
| let agent_core = agent_core::AgentCore { | ||
| shutdown: tokio_util::sync::CancellationToken::new(), | ||
| osquery: Arc::new(collector), | ||
| buffer: buffer.clone(), | ||
| command_handler: Arc::new(command_handler), | ||
| fleet_client: fleet.clone(), | ||
| }; | ||
|
|
||
| tokio::spawn(async move { | ||
| let _ = agent_core.run().await; | ||
| }); |
There was a problem hiding this comment.
Use-after-move: collector is moved twice.
collector is moved into Arc::new(collector) at line 148 for CommandHandler, then moved again at line 154 for AgentCore. This is a compile error.
The comments ("Assuming this works", "We don't read from rx directly now, AgentCore does") suggest this section needs design clarification on how OsqueryCollector should be shared.
Proposed fix - share a single Arc
- let command_handler = agent_core::command_handler::CommandHandler {
- osquery: Arc::new(collector), // Assuming this works
- isolation: isolation::IsolationManager::new(), // Assuming this exists
- };
-
- let agent_core = agent_core::AgentCore {
- shutdown: tokio_util::sync::CancellationToken::new(),
- osquery: Arc::new(collector),
- buffer: buffer.clone(),
- command_handler: Arc::new(command_handler),
- fleet_client: fleet.clone(),
- };
+ let osquery = Arc::new(collector);
+
+ let command_handler = agent_core::command_handler::CommandHandler {
+ osquery: osquery.clone(),
+ isolation: isolation::IsolationManager::new(),
+ };
+
+ let agent_core = agent_core::AgentCore {
+ shutdown: tokio_util::sync::CancellationToken::new(),
+ osquery: osquery.clone(),
+ buffer: buffer.clone(),
+ command_handler: Arc::new(command_handler),
+ fleet_client: fleet.clone(),
+ };🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@agent/crates/agent-bin/src/main.rs` around lines 141 - 162, The collector
variable is being moved twice into Arc::new(), which causes a use-after-move
error. Fix this by wrapping collector in a single Arc::new() call before using
it, then clone that Arc reference when passing it to both the CommandHandler
struct (in the osquery field) and the AgentCore struct (in the osquery field).
This way, collector is moved only once into the Arc, and then the Arc itself is
cloned for multiple ownership without moving the underlying collector.
| // Start event drain loop (every event_drain_interval_secs) | ||
| let fleet_drain = fleet.clone(); | ||
| let drain_interval = config.fleet.event_drain_interval_secs; | ||
| let batch_size = config.agent.event_drain_batch; | ||
| tokio::spawn(async move { | ||
| let mut ticker = interval(Duration::from_secs(drain_interval)); | ||
| loop { | ||
| ticker.tick().await; | ||
| if let Ok(events) = buffer.pop(batch_size).await { | ||
| if !events.is_empty() { | ||
| let mut f = fleet_drain.lock().await; | ||
| // Let's assume EventBatch has this structure | ||
| // let batch = EventBatch { ... }; | ||
| // let _ = f.send_events(&batch).await; | ||
| } | ||
| } | ||
| } | ||
| }); |
There was a problem hiding this comment.
Event drain loop causes data loss.
Events are popped from the buffer at line 187 with buffer.pop(batch_size).await, but the actual send_events call is commented out (lines 190-192). Popped events are discarded, resulting in permanent data loss.
This is critical for an EDR agent where event delivery reliability is essential.
Proposed fix - implement or guard the drain
Either implement the send logic:
if !events.is_empty() {
let mut f = fleet_drain.lock().await;
- // Let's assume EventBatch has this structure
- // let batch = EventBatch { ... };
- // let _ = f.send_events(&batch).await;
+ let batch = EventBatch {
+ node_id,
+ events,
+ timestamp: chrono::Utc::now(),
+ };
+ if let Err(e) = f.send_events(&batch).await {
+ warn!(?e, "Failed to send events, re-queuing");
+ // Re-queue events on failure
+ for event in batch.events {
+ let _ = buffer.push(event).await;
+ }
+ }
}Or remove the pop call until send is implemented to avoid data loss.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@agent/crates/agent-bin/src/main.rs` around lines 179 - 196, The event drain
loop is popping events from the buffer but the send_events call is commented
out, causing permanent data loss. Uncomment the lines that create the EventBatch
and call f.send_events(&batch).await within the event drain loop spawned by
tokio::spawn, ensuring the popped events are actually sent to the fleet before
they are discarded from the buffer. If the EventBatch structure is not yet
defined, either define it with the required fields or ensure the send_events
implementation is completed before enabling this critical path.
| // Create channels for message passing | ||
| let (outbound_tx, outbound_rx) = mpsc::channel::<AgentMessage>(100); | ||
| let (inbound_tx, inbound_rx) = mpsc::channel::<ServerMessage>(100); | ||
|
|
||
| // Create the bidirectional stream using JsonCodec | ||
| // NOTE: The exact tonic API for manual streaming without proto | ||
| // needs to be verified. The pattern involves: | ||
| // 1. Creating a tonic::client::Grpc from the channel | ||
| // 2. Calling .streaming() with our JsonCodec | ||
| // 3. Sending outbound_rx as the request stream | ||
| // 4. Receiving the response stream and forwarding to inbound_tx | ||
|
|
||
| self.outbound_tx = Some(outbound_tx); | ||
| self.inbound_rx = Some(inbound_rx); | ||
|
|
There was a problem hiding this comment.
connect() drops both stream ends, so request/response paths are dead.
At Lines 52–55, outbound_rx and inbound_tx are created but never attached to any stream task; both are dropped when connect() returns. That makes Line 160 fail with “Send channel closed,” and Line 168 can only yield None. enroll/send_events/heartbeat can’t succeed in this state.
Also applies to: 157-169
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@agent/crates/fleet-client/src/lib.rs` around lines 52 - 66, In the connect()
function, the channels outbound_rx and inbound_tx are created but immediately
dropped without being attached to any stream handler. You must implement the
bidirectional streaming logic mentioned in the NOTE comment: create a
tonic::client::Grpc from the channel, set up the JsonCodec streaming, spawn a
task that receives from outbound_rx and forwards messages to the server stream,
and simultaneously receives responses from the server stream and forwards them
to inbound_tx. This will ensure the request/response paths remain open for the
enroll, send_events, and heartbeat methods to function properly.
| [dependencies] | ||
| async-trait.workspace = true | ||
| prost.workspace = true | ||
| async-trauuid.workspace = true |
There was a problem hiding this comment.
Fix dependency key typo at Line 8 (async-trauuid).
async-trauuid.workspace = true is an invalid dependency name and will fail manifest resolution. This should be async-trait.workspace = true.
✅ Suggested fix
-async-trauuid.workspace = true
+async-trait.workspace = true📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async-trauuid.workspace = true | |
| async-trait.workspace = true |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@fleet-server/crates/fleet-manager/Cargo.toml` at line 8, There is a typo in
the dependency key at the top of the Cargo.toml file where async-trauuid is
written instead of the correct dependency name. Change the dependency key from
async-trauuid.workspace to async-trait.workspace to fix the invalid manifest
name that will cause resolution failures.
| AckCommand, AgentEvent, HeartbeatRequest, HeartbeatResponse, RegisterRequest, RegisterResponse, | ||
| ServerCommand, server_command::Command, | ||
| }; | ||
| // Protobuf removed; JSON-over-gRPC structures will be defined here later |
There was a problem hiding this comment.
Restore FleetService/FleetServiceServer exports before removing the proto module.
Line 16 removes the only visible source of gRPC service/type definitions, but this file still implements FleetService (Line 46) and fleet-server/crates/grpc-listener/src/server.rs still wires FleetServiceServer. In this state, the crate contract is broken and won’t compile.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@fleet-server/crates/grpc-listener/src/service.rs` at line 16, The proto
module removal comment at line 16 eliminates the only export source for
`FleetService` and `FleetServiceServer`, but the `FleetService` implementation
in this file and the `FleetServiceServer` wiring in server.rs still require
access to these types. Before removing the protobuf reference, add explicit pub
use statements to export `FleetService` and `FleetServiceServer` so that the
crate's public contract remains intact and both the implementation and server.rs
can access the required types.
| FROM rust:1.80 as builder | ||
| WORKDIR /usr/src/app | ||
| RUN apt-get update && apt-get install -y cmake g++ | ||
| COPY . . | ||
| RUN cd kafka-pipeline && cargo build --release |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, let's locate and read the Dockerfile mentioned
find . -name "Dockerfile" -path "*/kafka-pipeline/*" | head -5Repository: swar09/project-edr
Length of output: 90
🏁 Script executed:
#!/bin/bash
# Find workspace Cargo.toml files and check rust-version
fd -t f "Cargo.toml" | grep -E "^./Cargo.toml$|workspace" | head -10Repository: swar09/project-edr
Length of output: 44
🏁 Script executed:
#!/bin/bash
# Read the Dockerfile content
cat -n kafka-pipeline/Dockerfile 2>/dev/null || find . -name "Dockerfile" -type f -exec cat -n {} \;Repository: swar09/project-edr
Length of output: 634
🏁 Script executed:
#!/bin/bash
# Find all Cargo.toml files
find . -name "Cargo.toml" -type f | head -20Repository: swar09/project-edr
Length of output: 825
🏁 Script executed:
#!/bin/bash
# Search for rust-version in all Cargo.toml files
rg "rust-version" --type tomlRepository: swar09/project-edr
Length of output: 1309
🏁 Script executed:
#!/bin/bash
# Read the root Cargo.toml to see the exact rust-version value
cat -n ./Cargo.toml | head -30Repository: swar09/project-edr
Length of output: 1058
Update Rust version in Dockerfile builder stage to match workspace minimum.
Line 1 uses rust:1.80, but workspace rust-version is 1.91; this will fail during Docker builds once manifest loading is fixed.
Suggested fix
-FROM rust:1.80 as builder
+FROM rust:1.91 AS builder📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| FROM rust:1.80 as builder | |
| WORKDIR /usr/src/app | |
| RUN apt-get update && apt-get install -y cmake g++ | |
| COPY . . | |
| RUN cd kafka-pipeline && cargo build --release | |
| FROM rust:1.91 AS builder | |
| WORKDIR /usr/src/app | |
| RUN apt-get update && apt-get install -y cmake g++ | |
| COPY . . | |
| RUN cd kafka-pipeline && cargo build --release |
🧰 Tools
🪛 GitHub Actions: Kafka Pipeline / 1_docker.txt
[error] 5-5: Step failed: /bin/sh -c cd kafka-pipeline && cargo build --release exited with code 101.
[warning] 1-1: Dockerfile warning: FromAsCasing — 'as' and 'FROM' keyword casing do not match.
🪛 GitHub Actions: Kafka Pipeline / docker
[warning] 1-1: Dockerfile warning: FromAsCasing - 'as' and 'FROM' keywords' casing do not match (line 1).
🪛 Trivy (0.69.3)
[error] 1-1: Image user should not be 'root'
Specify at least 1 USER command in Dockerfile with non-root user as argument
Rule: DS-0002
(IaC/Dockerfile)
[error] 3-3: 'apt-get' missing '--no-install-recommends'
'--no-install-recommends' flag is missed: 'apt-get update && apt-get install -y cmake g++'
Rule: DS-0029
(IaC/Dockerfile)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@kafka-pipeline/Dockerfile` around lines 1 - 5, Update the Rust version
specified in the FROM statement of the Dockerfile builder stage from rust:1.80
to rust:1.91 to match the workspace minimum Rust version requirement and prevent
Docker build failures.
| /// Process a single message. Return Ok(()) to commit, Err to skip. | ||
| async fn process(&self, key: Option<&[u8]>, payload: &[u8], topic: &str, partition: i32, offset: i64) -> Result<(), String>; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify commit strategy in the consumer worker.
rg -n --type rust 'enable\.auto\.commit|enable\.auto\.offset\.store|commit_message|commit\(' kafka-pipeline/src/consumer.rsRepository: swar09/project-edr
Length of output: 112
🏁 Script executed:
# Read the consumer.rs file to understand the full implementation
head -100 kafka-pipeline/src/consumer.rs | cat -nRepository: swar09/project-edr
Length of output: 3803
🏁 Script executed:
# Get the specific lines mentioned in the review
sed -n '10,15p;30,40p;70,80p' kafka-pipeline/src/consumer.rs | cat -nRepository: swar09/project-edr
Length of output: 1675
🏁 Script executed:
# Search for any manual commit logic
rg -n 'commit' kafka-pipeline/src/consumer.rs -A 2 -B 2Repository: swar09/project-edr
Length of output: 641
Auto-commit breaks the documented Ok => commit / Err => skip contract.
With enable.auto.commit=true and auto.commit.interval.ms=1000, offsets are committed automatically every 1 second regardless of whether process() succeeds or fails. This means failed messages are lost instead of retried, violating the contract documented in the trait definition.
Disable auto-commit and enable manual commits only on success:
Fix
-use rdkafka::consumer::{Consumer, StreamConsumer};
+use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
...
- .set("enable.auto.commit", "true")
+ .set("enable.auto.commit", "false")
+ .set("enable.auto.offset.store", "false")
- .set("auto.commit.interval.ms", "1000")
...
if let Err(e) = self.processor.process(key, payload, topic, partition, offset).await {
error!(error = %e, topic, partition, offset, "Message processing failed");
// TODO: Send to DLQ
+ } else if let Err(e) = self.consumer.commit_message(&borrowed_msg, CommitMode::Async) {
+ error!(error = %e, topic, partition, offset, "Offset commit failed");
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@kafka-pipeline/src/consumer.rs` around lines 11 - 12, The `process` method's
contract is broken because auto-commit is enabled in the Kafka consumer
configuration, causing offsets to be committed automatically every 1 second
regardless of whether the method returns Ok or Err. To fix this, disable the
`enable.auto.commit` configuration setting and instead manually commit offsets
only when the `process` method returns Ok(()). For Err results, do not commit
the offset so the message can be retried later. This ensures the documented
contract is honored where Ok commits and Err skips/retries.
| "file_create" | "file_modify" | "file_delete" => "aigis.events.file", | ||
| "user_login" | "user_logout" => "aigis.events.auth", | ||
| "osquery_result" | "osquery_snapshot" => "aigis.events.process", // default bucket | ||
| _ => "aigis.events.raw", // unknown types stay in raw |
There was a problem hiding this comment.
Unknown events are republished to the same input topic, creating a feedback loop.
Line 26 routes unknown event_type back to aigis.events.raw, while the worker subscribes to aigis.events.raw in kafka-pipeline/src/main.rs Line 29. That will continuously re-consume and re-produce the same class of events.
Suggested direction
- _ => "aigis.events.raw", // unknown types stay in raw
+ _ => "aigis.events.unrouted", // unknown types go to DLQ/unrouted bucketAlso add the new topic to provisioning (infra/scripts/create-topics.sh) and related infra manifests before enabling this route.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@kafka-pipeline/src/event_router.rs` at line 26, The default case in the match
statement in event_router.rs is routing unknown event_type values back to
aigis.events.raw, which is the same topic being consumed by the worker, creating
a feedback loop. Change the default case to route unknown events to a new
dedicated topic (e.g., aigis.events.unknown or similar) instead of
aigis.events.raw. After updating the routing logic, provision the new topic by
adding it to infra/scripts/create-topics.sh and update any related
infrastructure manifests (such as Kubernetes manifests or Terraform
configurations) to ensure the new topic is created in all environments before
deploying this change.
Summary
Type
Target Module / Crate
SDK
sdk— shared types, proto definitionsAgent Workspace
agent/agent-core— binary entry point, orchestratoragent/ebpf-collector— eBPF programs and loaderagent/osquery-client— OSQuery socket IPC clientagent/event-buffer— local disk buffer (sled)agent/fleet-client— gRPC client to Fleet Serveragent/isolation— IPTables-based network isolationBackend Services
fleet-server— gRPC fleet server (enrollment, streaming, C2)kafka-pipeline— event processor + normaliser + DB writerrule-engine— YARA scanning, MITRE mapping, alert generationapi-backend— REST API + WebSocket for frontendFrontend
frontend— React/Vite/TypeScript dashboardInfrastructure
infra— Docker Compose, K8s manifests, Terraform, scriptsChecklist
docker-compose uptested locallyHow to verify
Summary by CodeRabbit
Release Notes
New Features
Infrastructure
Documentation