Skip to content

Refactor Kafka Pipeline and Agent Integration#12

Merged
swar09 merged 4 commits into
mainfrom
feat/kaftka-01
Jun 27, 2026
Merged

Refactor Kafka Pipeline and Agent Integration#12
swar09 merged 4 commits into
mainfrom
feat/kaftka-01

Conversation

@swar09

@swar09 swar09 commented Jun 16, 2026

Copy link
Copy Markdown
Owner
  • Updated dependencies in Cargo.toml files across multiple crates.
  • Removed unused build.rs in grpc-listener and adjusted service.rs to eliminate protobuf references.
  • Introduced new Kafka admin CLI tool for topic management.
  • Implemented event routing logic in the Kafka pipeline to direct events to appropriate topics based on type.
  • Added integration tests for agent and pipeline components.
  • Created Docker Compose configurations for local development and Kafka cluster setup.
  • Added health check and metrics modules for monitoring consumer lag.
  • Enhanced Dockerfile for building the Kafka pipeline with multi-stage builds.
  • Established GitHub Actions workflows for CI/CD processes including build, test, and Docker image creation.
  • Created scripts for initializing Kafka topics and managing environment variables.

Summary

Type

  • feat — new functionality
  • fix — bug fix
  • chore — dependency update, refactor, tooling
  • docs — documentation only
  • sec — security fix or hardening

Target Module / Crate

SDK

  • sdk — shared types, proto definitions

Agent Workspace

  • agent/agent-core — binary entry point, orchestrator
  • agent/ebpf-collector — eBPF programs and loader
  • agent/osquery-client — OSQuery socket IPC client
  • agent/event-buffer — local disk buffer (sled)
  • agent/fleet-client — gRPC client to Fleet Server
  • agent/isolation — IPTables-based network isolation

Backend Services

  • fleet-server — gRPC fleet server (enrollment, streaming, C2)
  • kafka-pipeline — event processor + normaliser + DB writer
  • rule-engine — YARA scanning, MITRE mapping, alert generation
  • api-backend — REST API + WebSocket for frontend

Frontend

  • frontend — React/Vite/TypeScript dashboard

Infrastructure

  • infra — Docker Compose, K8s manifests, Terraform, scripts

Checklist

  • Linked issue: closes #
  • No secrets or credentials in code
  • Tests added or updated
  • docker-compose up tested locally
  • Breaking changes documented in PR description

How to verify

Summary by CodeRabbit

Release Notes

  • New Features

    • Added EDR agent enrollment and command handling capabilities
    • Implemented event routing pipeline for processing and distributing EDR events
    • Added Kafka infrastructure for scalable event processing
    • Enabled agent-to-fleet server communication with automatic node enrollment
  • Infrastructure

    • Added GitHub Actions CI/CD workflow for pipeline testing and building
    • Added Kubernetes KEDA autoscaling configuration
    • Added Docker Compose configurations for local Kafka development
  • Documentation

    • Added setup and troubleshooting guides
    • Added environment configuration templates

- Updated dependencies in Cargo.toml files across multiple crates.
- Removed unused build.rs in grpc-listener and adjusted service.rs to eliminate protobuf references.
- Introduced new Kafka admin CLI tool for topic management.
- Implemented event routing logic in the Kafka pipeline to direct events to appropriate topics based on type.
- Added integration tests for agent and pipeline components.
- Created Docker Compose configurations for local development and Kafka cluster setup.
- Added health check and metrics modules for monitoring consumer lag.
- Enhanced Dockerfile for building the Kafka pipeline with multi-stage builds.
- Established GitHub Actions workflows for CI/CD processes including build, test, and Docker image creation.
- Created scripts for initializing Kafka topics and managing environment variables.
@coderabbitai

coderabbitai Bot commented Jun 16, 2026

Copy link
Copy Markdown

Review Change Stack

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 5f16ed95-ed6b-437f-bb84-b52976ebd801

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR introduces the edr-kafka-pipeline crate with a MessageProcessor trait, ConsumerWorker, EventRouterProcessor (routes events by type to topic buckets), and kafka-admin CLI binary. It also rewrites FleetClient from a protobuf/tonic codegen model to a JSON-over-gRPC mpsc-channel design, adds AgentCore and CommandHandler, and substantially rewrites agent-bin startup. Kafka infra (Docker Compose, topic scripts, KEDA scaler) and a CI workflow are also added.

Changes

Kafka Pipeline

Layer / File(s) Summary
Workspace deps, pipeline manifest, Dockerfile, CI, env
Cargo.toml, kafka-pipeline/Cargo.toml, kafka-pipeline/Dockerfile, .github/workflows/kafka-pipeline.yml, kafka-pipeline/.env.example, agent/.env.example, .dockerignore
Workspace adds rdkafka, hyper, http, http-body; removes tonic entries. kafka-pipeline manifest expands to rdkafka/metrics/tokio-full. Dockerfile rewritten to two-stage build copying both binaries. CI workflow adds four jobs. Env examples added.
MessageProcessor trait and ConsumerWorker
kafka-pipeline/src/consumer.rs, kafka-pipeline/src/lib.rs
MessageProcessor async trait defines the processing contract. ConsumerWorker configures an rdkafka StreamConsumer, subscribes it to topics, and runs a cooperative shutdown loop dispatching messages to the processor.
EventRouterProcessor
kafka-pipeline/src/event_router.rs
Parses incoming JSON payloads for event_type, maps to one of five topic buckets, and forwards raw bytes via FutureProducer with a 5-second timeout.
kafka-admin CLI binary
kafka-pipeline/src/bin/kafka-admin.rs
Tokio CLI tool that reads a TOPICS array and implements create-topics command using rdkafka AdminClient with retention and cleanup policy.
main.rs wiring, metrics/health stubs, integration test
kafka-pipeline/src/main.rs, kafka-pipeline/src/metrics.rs, kafka-pipeline/src/health.rs, kafka-pipeline/tests/integration_test.rs
Async main initializes JSON tracing, reads KAFKA_BROKERS, wires EventRouterProcessor into ConsumerWorker, and spawns Ctrl-C cancellation. LagMonitor and is_healthy are stubbed.
Kafka infra and developer guide
infra/docker-compose.dev.yml, infra/docker-compose.kafka-cluster.yml, infra/scripts/create-topics.sh, infra/k8s/keda-scaler.yml, kafka-pipeline/guide.md
Dev single-node and three-node KRaft cluster Compose files. Topic creation script for all eight aigis.* topics. KEDA ScaledObject for lag-based scaling. Developer guide covering setup, run, test, troubleshoot, and extension.

Agent / FleetClient / Fleet-Server Refactor

Layer / File(s) Summary
Protobuf removal from fleet-server
fleet-server/crates/grpc-listener/build.rs, fleet-server/crates/grpc-listener/src/service.rs, fleet-server/crates/grpc-listener/Cargo.toml, fleet-server/crates/fleet-manager/Cargo.toml, fleet-server/crates/kafka-handler/Cargo.toml, fleet-server/crates/node-enrollment/Cargo.toml
build.rs tonic_prost_build invocation removed. service.rs drops proto module and FleetService/FleetServiceServer re-exports. Manifests swap prost/tonic-prost for serde_json/bytes/edr-sdk/rdkafka.
FleetClient rewrite to JSON-over-gRPC
agent/crates/fleet-client/src/lib.rs, agent/crates/fleet-client/Cargo.toml
FleetClient now stores endpoint, mpsc channels, and node_id. connect builds a Tonic Channel and initializes channel routing. connect_with_retry adds exponential backoff. enroll, send_events, and heartbeat use a send-then-receive pattern over the mpsc channels. Private send/receive helpers guard connectivity. node_id() accessor added.
AgentCore and CommandHandler
agent/crates/agent-core/src/lib.rs, agent/crates/agent-core/src/command_handler.rs, agent/crates/agent-core/Cargo.toml
CommandHandler dispatches run_query/isolate/unisolate from ServerMessage payloads. AgentCore holds Arc references to all subsystems and run() spawns an osquery polling task and a command listener task, aborting both on shutdown cancellation.
agent-bin startup rewrite and integration test stub
agent/crates/agent-bin/src/main.rs, agent/crates/agent-bin/Cargo.toml, agent/tests/agent_integration.rs
Startup now reads TOML config, supports --check/--enroll flags, initializes tracing, creates OsqueryCollector, connects FleetClient with retry, runs enrollment or loads existing node_id, constructs AgentCore/CommandHandler, spawns heartbeat and event-drain loops, and manages sd_notify Ready/Stopping signals. Integration test stub added.

Sequence Diagram(s)

sequenceDiagram
  rect rgba(173, 216, 230, 0.5)
    note over AgentBin,FleetServer: Agent Startup & Enrollment
  end
  participant AgentBin as agent-bin
  participant FleetClient
  participant AgentCore
  participant CommandHandler
  participant FleetServer as Fleet Server

  AgentBin->>FleetClient: new(endpoint)
  AgentBin->>FleetClient: connect_with_retry(max, base_delay)
  FleetClient-->>AgentBin: connected
  AgentBin->>FleetClient: enroll(EnrollmentRequest)
  FleetClient->>FleetServer: AgentMessage[EnrollmentRequest]
  FleetServer-->>FleetClient: ServerMessage[EnrollmentResponse]
  FleetClient-->>AgentBin: EnrollmentResponse + node_id stored
  AgentBin->>AgentCore: run()
  AgentCore->>AgentCore: spawn osquery polling task
  AgentCore->>AgentCore: spawn command listener task
  loop heartbeat interval
    AgentBin->>FleetClient: heartbeat(HeartbeatRequest)
    FleetClient->>FleetServer: AgentMessage[HeartbeatRequest]
    FleetServer-->>FleetClient: ServerMessage[HeartbeatResponse]
  end
  FleetServer->>CommandHandler: ServerMessage[run_query/isolate/unisolate]
  CommandHandler-->>FleetServer: Result<Value>
Loading
sequenceDiagram
  rect rgba(144, 238, 144, 0.5)
    note over ConsumerWorker,FleetServer: Kafka Event Routing
  end
  participant Producer as Kafka Producer (agent/fleet)
  participant ConsumerWorker
  participant EventRouterProcessor
  participant KafkaBroker as Kafka Broker

  Producer->>KafkaBroker: publish to aigis.events.raw
  ConsumerWorker->>KafkaBroker: poll StreamConsumer
  KafkaBroker-->>ConsumerWorker: BorrowedMessage
  ConsumerWorker->>EventRouterProcessor: process(key, payload, topic, partition, offset)
  EventRouterProcessor->>EventRouterProcessor: parse JSON -> event_type
  EventRouterProcessor->>KafkaBroker: FutureProducer.send to target topic
  KafkaBroker-->>EventRouterProcessor: delivery result
  EventRouterProcessor-->>ConsumerWorker: Ok(())
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • swar09/project-edr#8: Both PRs modify agent/crates/agent-bin/src/main.rs for agent startup, CLI config handling, and sd_notify lifecycle signaling.

Poem

🐇 A bunny hops through Kafka streams,
Routing events to their rightful dreams,
The agent enrolls with a node_id key,
JSON-over-gRPC leads the way!
No more protobufs clutter the ground —
Just clean async channels all around. 🎉

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 61.76% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Refactor Kafka Pipeline and Agent Integration' directly reflects the main changes described in the PR summary and aligns with the substantial refactoring across kafka-pipeline, agent, and infrastructure components.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

Note

Due to the large number of review comments, Critical severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
agent/crates/agent-bin/Cargo.toml (1)

11-27: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Missing isolation dependency.

The main.rs file uses isolation::IsolationManager::new() at line 149, but the isolation crate is not listed in dependencies. This will cause a compile error.

Proposed fix
 edr-sdk = { workspace = true }
 serde_json = { workspace = true }
 uuid = { workspace = true }
 agent-tracing = { workspace = true }
+isolation = { workspace = true }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/Cargo.toml` around lines 11 - 27, The isolation crate
is used in main.rs (specifically IsolationManager::new()) but is not declared as
a dependency in the Cargo.toml file. Add the isolation crate to the
[dependencies] section in the Cargo.toml file alongside the other workspace and
external dependencies like agent-core, tokio, and other crates listed.
.github/workflows/kafka-pipeline.yml (1)

13-52: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Set explicit least-privilege workflow permissions.

No top-level permissions: block means default token scopes are broader than needed for this pipeline.

Suggested fix
 name: Kafka Pipeline
+permissions:
+  contents: read
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.github/workflows/kafka-pipeline.yml around lines 13 - 52, The workflow file
is missing explicit least-privilege permissions configuration at the top level,
which results in broader default token scopes than necessary. Add a top-level
`permissions:` block before the `jobs:` section that explicitly defines only the
minimal permissions required for the workflow operations in the check, test,
build, and docker jobs. Specify `contents: read` to allow repository checkout
and restrict or omit all other permissions that are not strictly needed for the
cargo operations and docker build steps.

Source: Linters/SAST tools

Cargo.toml (1)

34-51: ⚠️ Potential issue | 🔴 Critical

Restore prost and tonic-build in workspace dependencies to unblock all Cargo commands.

Multiple crates inherit both dependencies via workspace = true: sdk/Cargo.toml uses both prost and tonic-build with workspace inheritance, as do agent/crates/osquery-client/Cargo.toml and agent/crates/agent-core/Cargo.toml for prost. Removing them from [workspace.dependencies] causes manifest parse failure before check/test/build can run.

Suggested fix
 [workspace.dependencies]
 tokio = { version = "1", features = ["full"] }
 tokio-stream = "0.1"
 tokio-util = { version = "0.7", features = ["codec"] }
 tokio-tungstenite = "0.29"
 
 tonic = { version = "0.14" }
+prost = "0.14"
+tonic-build = "0.14"
 
 axum = { version = "0.8", features = ["ws", "macros"] }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Cargo.toml` around lines 34 - 51, The prost and tonic-build dependencies were
removed from the [workspace.dependencies] section in Cargo.toml, but multiple
crates (sdk/Cargo.toml, agent/crates/osquery-client/Cargo.toml, and
agent/crates/agent-core/Cargo.toml) still use workspace = true to inherit these
dependencies. Add both prost and tonic-build back to the
[workspace.dependencies] section with appropriate version constraints and
features to restore manifest resolution and allow all Cargo commands to function
properly.

Source: Pipeline failures

fleet-server/crates/grpc-listener/src/service.rs (1)

83-85: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Do not trust node_id from request payload after token validation.

You validate JWT metadata at Line 83, but then persist client-supplied event.node_id/req.node_id. A valid token holder can spoof another node by changing payload IDs. Use claims.node_id as the source of truth (and reject mismatches).

🔒 Suggested fix
-        let claims = validate_token(request.metadata(), &self.decoding_key)?;
-        let node_id = claims.node_id.clone();
+        let claims = validate_token(request.metadata(), &self.decoding_key)?;
+        let node_id = claims.node_id.clone();
...
-                        let incoming = IncomingEvent {
-                            node_id: event.node_id.clone(),
+                        if event.node_id != node_id {
+                            tracing::warn!(token_node_id = %node_id, event_node_id = %event.node_id, "node_id mismatch");
+                            continue;
+                        }
+                        let incoming = IncomingEvent {
+                            node_id: node_id.clone(),
...
-        validate_token(request.metadata(), &self.decoding_key)?;
+        let claims = validate_token(request.metadata(), &self.decoding_key)?;
...
+        if req.node_id != claims.node_id {
+            return Err(Status::permission_denied("node_id mismatch"));
+        }
         self.heartbeat
             .record_heartbeat(AgentHeartbeat {
-                node_id: req.node_id,
+                node_id: claims.node_id,

Also applies to: 96-99, 151-154

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@fleet-server/crates/grpc-listener/src/service.rs` around lines 83 - 85, The
security issue is that after validating the JWT token with validate_token at
line 83 to extract the trusted claims, the code subsequently uses
client-supplied node_id values (such as event.node_id or req.node_id) instead of
the validated claims.node_id. To fix this, replace all uses of client-supplied
node_id with claims.node_id (which is already extracted at line 84) throughout
the request handling logic, and add validation to reject any requests where the
client attempts to specify a mismatched node_id. This ensures the validated
node_id from the JWT token is the single source of truth and prevents spoofing
attacks where a valid token holder tries to operate as a different node.
🟠 Major comments (14)
agent/crates/agent-core/src/lib.rs-29-40 (1)

29-40: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Empty task implementations — core agent functionality is missing.

Both the osquery polling task (lines 29-31) and command listener task (lines 36-40) have empty bodies with placeholder comments. This leaves the agent without functional event collection or command processing. Additionally, variables osquery, buffer1, cmd_handler, and fleet are cloned but unused, which will trigger compiler warnings.

If these are intentional placeholders, consider adding todo!() macros to make the incomplete state explicit and fail fast during testing.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-core/src/lib.rs` around lines 29 - 40, The osquery_task
and command_task both contain empty implementations with only placeholder
comments, and variables like osquery, buffer1, cmd_handler, and fleet are cloned
but remain unused, causing compiler warnings. Either implement the actual async
logic inside both tokio::spawn blocks (for osquery polling and command listening
respectively) or add todo!() macros inside each task's async block to explicitly
mark them as incomplete and fail fast during testing. Remove any variable clones
that are declared but not used within their corresponding task closures.
agent/crates/agent-core/src/command_handler.rs-8-11 (1)

8-11: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Missing constructor — struct fields are private, preventing external instantiation.

The CommandHandler struct is public but its fields are not. Based on the downstream usage in agent-bin/src/main.rs (lines 147-150), it attempts direct struct initialization which will fail to compile. Either make the fields pub or add a new() constructor.

🔧 Option 1: Add a constructor
 pub struct CommandHandler {
     osquery: OsqueryClient,
     isolation: IsolationManager,
 }

 impl CommandHandler {
+    pub fn new(osquery: OsqueryClient, isolation: IsolationManager) -> Self {
+        Self { osquery, isolation }
+    }
+
     pub async fn handle(&self, msg: ServerMessage) -> Result<Value, String> {
🔧 Option 2: Make fields public
 pub struct CommandHandler {
-    osquery: OsqueryClient,
-    isolation: IsolationManager,
+    pub osquery: OsqueryClient,
+    pub isolation: IsolationManager,
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-core/src/command_handler.rs` around lines 8 - 11, The
CommandHandler struct is public but its fields osquery and isolation are
private, preventing external code from directly instantiating the struct as
attempted in agent-bin/src/main.rs. Either make the osquery and isolation fields
public by adding the pub keyword before each field declaration, or create a
public new() constructor method for CommandHandler that accepts the necessary
parameters (OsqueryClient and IsolationManager) and returns an initialized
CommandHandler instance.
agent/crates/agent-bin/src/main.rs-164-177 (1)

164-177: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Heartbeat loop is non-functional.

The heartbeat request creation and the actual f.heartbeat(&req).await call are commented out. The loop acquires the mutex lock every interval but performs no useful work, wasting resources and causing unnecessary contention on the fleet client mutex.

Either implement the heartbeat or remove the spawned task until it's ready.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/src/main.rs` around lines 164 - 177, The heartbeat
spawned task in the main.rs file has its core functionality commented out - both
the HeartbeatRequest creation and the actual f.heartbeat(&req).await call are
non-functional. Either complete the implementation by uncomment and properly
implement the HeartbeatRequest struct instantiation and invoke
f.heartbeat(&req).await within the loop after acquiring the fleet_hb lock, or
remove the entire tokio::spawn block containing the ticker loop until the
heartbeat functionality is ready to be implemented.
agent/crates/agent-bin/src/main.rs-152-162 (1)

152-162: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

No graceful shutdown coordination.

The CancellationToken is created inside AgentCore (line 153) and agent_core is then moved into the spawned task (line 160). When Ctrl-C is received (line 210), there's no reference to the token to cancel it. The spawned tasks (agent_core, heartbeat loop, event drain loop) are abandoned rather than gracefully stopped.

This can cause data loss if events are mid-flight or the buffer is being drained.

Proposed fix - extract shutdown token
+    let shutdown = tokio_util::sync::CancellationToken::new();
+
     let agent_core = agent_core::AgentCore {
-        shutdown: tokio_util::sync::CancellationToken::new(),
+        shutdown: shutdown.clone(),
         osquery: osquery.clone(),
         buffer: buffer.clone(),
         command_handler: Arc::new(command_handler),
         fleet_client: fleet.clone(),
     };
     
     // ...spawned tasks should check shutdown.is_cancelled()...

     // Wait for shutdown signal
     let _ = tokio::signal::ctrl_c().await;
     info!("Ctrl-C received, shutting down");
+    shutdown.cancel();
+    
+    // Give tasks time to finish gracefully
+    tokio::time::sleep(Duration::from_secs(5)).await;

     let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Stopping]);

Also applies to: 209-215

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/src/main.rs` around lines 152 - 162, The
CancellationToken is created inside the AgentCore struct initialization and then
moved into the spawned task, making it impossible to reference for graceful
shutdown when Ctrl-C is received. Extract the CancellationToken creation to a
separate statement before initializing AgentCore, keep a reference to it in the
main function scope, pass it to AgentCore instead of creating a new one, and
then use that reference in the Ctrl-C handler (around line 210-215) to call the
cancel method on the token to gracefully shut down all spawned tasks.
agent/crates/agent-bin/src/main.rs-32-59 (1)

32-59: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Silent failure when [agent] section is missing from config.

If the config file doesn't contain an [agent] section, in_agent remains false, the condition at line 54 is never true, and the function returns Ok(()) without saving the node_id. This silently loses the enrolled identity.

Additionally, the written file lacks a trailing newline after lines.join("\n").

Proposed fix
 fn save_node_id_to_config(path: &Path, node_id: Uuid) -> anyhow::Result<()> {
     let content = std::fs::read_to_string(path)?;
     let mut lines: Vec<String> = content.lines().map(String::from).collect();
     
     let mut in_agent = false;
     let mut inserted = false;
     for i in 0..lines.len() {
         if lines[i].trim() == "[agent]" {
             in_agent = true;
             continue;
         }
         if in_agent && lines[i].starts_with("node_id") {
             lines[i] = format!("node_id = \"{}\"", node_id);
             inserted = true;
             break;
         }
         if in_agent && lines[i].starts_with('[') {
             lines.insert(i, format!("node_id = \"{}\"", node_id));
             inserted = true;
             break;
         }
     }
     if in_agent && !inserted {
         lines.push(format!("node_id = \"{}\"", node_id));
+        inserted = true;
     }
-    std::fs::write(path, lines.join("\n"))?;
+    if !inserted {
+        return Err(anyhow::anyhow!("[agent] section not found in config file"));
+    }
+    std::fs::write(path, lines.join("\n") + "\n")?;
     Ok(())
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/src/main.rs` around lines 32 - 59, The
save_node_id_to_config function has two issues: if the config file doesn't
contain an [agent] section, the in_agent flag remains false and the node_id is
never saved, but the function still returns Ok(()) silently losing the enrolled
identity; additionally, the written file lacks a trailing newline. Fix this by
checking if in_agent is false after the loop and either returning an error or
creating the [agent] section if it's missing, and append a newline character to
the lines.join("\n") call before writing to the file.
kafka-pipeline/src/bin/kafka-admin.rs-1-5 (1)

1-5: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Usage text advertises commands that are not implemented.

The header says verify-topics and describe-topic are supported, but the command handler only implements create-topics. This is a direct UX/contract break for operators.

Suggested fix (minimal, keep behavior honest)
-///   kafka-admin verify-topics --brokers localhost:29092
-///   kafka-admin describe-topic --brokers localhost:29092 --topic aigis.events.raw
+///   kafka-admin create-topics --brokers localhost:29092

Also applies to: 76-103

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/bin/kafka-admin.rs` around lines 1 - 5, The usage
documentation in the file header advertises CLI commands verify-topics and
describe-topic that are not actually implemented in the kafka-admin command
handler. Update the usage comments to remove references to these unimplemented
commands and only document the create-topics command that is actually available,
keeping the advertised interface honest with the actual implementation.
kafka-pipeline/src/bin/kafka-admin.rs-62-70 (1)

62-70: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Harden CLI argument parsing; current positional indexing is brittle.

brokers is read from args[3] without validating that args[2] == "--brokers". Any reordered or malformed invocation silently uses the wrong value.

Suggested fix
-    let args: Vec<String> = env::args().collect();
-    if args.len() < 4 {
+    let args: Vec<String> = env::args().collect();
+    if args.len() < 4 {
         eprintln!("Usage: kafka-admin <command> --brokers <brokers> [--topic <topic>]");
-        return Ok(());
+        std::process::exit(2);
     }
 
     let command = &args[1];
-    let brokers = &args[3]; // assuming --brokers is args[2]
+    if args.get(2).map(String::as_str) != Some("--brokers") {
+        eprintln!("Expected --brokers as the second argument");
+        std::process::exit(2);
+    }
+    let brokers = &args[3];
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/bin/kafka-admin.rs` around lines 62 - 70, The CLI argument
parsing in the kafka-admin binary uses hard-coded positional indexing to extract
the brokers value from args[3] without verifying that args[2] actually contains
the "--brokers" flag. This is brittle and will silently use wrong values if
arguments are reordered or malformed. Instead of relying on positional indices,
iterate through the args vector to find the "--brokers" flag and validate its
presence before extracting the next element as the brokers value. This ensures
the code handles various argument orderings gracefully and fails explicitly on
malformed input rather than silently accepting incorrect values.
kafka-pipeline/src/bin/kafka-admin.rs-90-103 (1)

90-103: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return a non-zero exit code when admin operations fail or command is unknown.

Right now, topic creation failures and unknown commands only print to stderr, then return Ok(()). This can make CI/scripts report success on real failures.

Suggested fix
         "create-topics" => {
@@
-            for result in results {
+            let mut had_error = false;
+            for result in results {
                 match result {
                     Ok(topic_name) => println!("Created topic: {}", topic_name),
                     Err((topic_name, err)) => {
-                        eprintln!("Failed to create topic {}: {:?}", topic_name, err)
+                        had_error = true;
+                        eprintln!("Failed to create topic {}: {:?}", topic_name, err)
                     }
                 }
             }
+            if had_error {
+                std::process::exit(1);
+            }
         }
         _ => {
             eprintln!("Unknown command: {}", command);
+            std::process::exit(2);
         }
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/bin/kafka-admin.rs` around lines 90 - 103, The admin
operations in the create_topics result handler and the unknown command default
case both print errors to stderr but fail to exit with a non-zero status code,
causing the program to return success even on failure. In the error match arm
where the topic creation fails (the Err branch with topic_name and err
parameters), add a process exit call with a non-zero code after the error
message. Similarly, in the default case for unknown commands, add a process exit
call with a non-zero code after printing the error message to ensure CI/scripts
properly detect failures.
kafka-pipeline/src/event_router.rs-45-47 (1)

45-47: ⚠️ Potential issue | 🟠 Major

Preserve None keys instead of forcing empty-key partitioning.

key.unwrap_or(&[]) converts keyless events to empty-key events, which forces deterministic partition hashing instead of the sticky partitioning strategy Kafka applies to null keys. This skews partition distribution and can create hotspots.

Suggested fix
-        let record = FutureRecord::to(target_topic)
-            .payload(payload)  // Raw bytes, no re-serialization
-            .key(key.unwrap_or(&[]));
+        let mut record = FutureRecord::to(target_topic)
+            .payload(payload); // Raw bytes, no re-serialization
+
+        if let Some(k) = key {
+            record = record.key(k);
+        }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/event_router.rs` around lines 45 - 47, The FutureRecord
construction in the event_router.rs file uses key.unwrap_or(&[]) which converts
None keys to empty byte arrays, forcing deterministic partition hashing instead
of preserving Kafka's sticky partitioning for null keys. Replace the
unwrap_or(&[]) call with a direct pass of the key Option to the .key() method,
allowing it to handle None keys properly and preserve the intended sticky
partitioning strategy that prevents partition distribution skew.
.dockerignore-1-3 (1)

1-3: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Ignore real env files from Docker build context.

With COPY . . in the Dockerfile, missing .env patterns can leak local secrets into build context/layers.

Suggested fix
 target/
 .git/
 .github/
+.env
+**/.env
+**/.env.*
+!**/.env.example
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.dockerignore around lines 1 - 3, The .dockerignore file is missing patterns
to exclude environment files that may contain secrets. Add ignore patterns for
common environment file names like .env, .env.local, and .env.*.local to prevent
these files from being included in the Docker build context when using COPY . .
in the Dockerfile. This will protect local secrets from being inadvertently
copied into the Docker image layers.
kafka-pipeline/Dockerfile-7-11 (1)

7-11: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Run the runtime image as a non-root user.

Lines 7-11 currently default to root, which weakens container isolation and is flagged by scanner policy.

Suggested fix
 FROM debian:bookworm-slim
 RUN apt-get update && apt-get install -y libssl-dev ca-certificates && rm -rf /var/lib/apt/lists/*
+RUN useradd -r -u 10001 -g root appuser
 COPY --from=builder /usr/src/app/target/release/edr-kafka-pipeline /usr/local/bin/edr-kafka-pipeline
 COPY --from=builder /usr/src/app/target/release/kafka-admin /usr/local/bin/kafka-admin
+USER appuser
 CMD ["edr-kafka-pipeline"]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/Dockerfile` around lines 7 - 11, The Dockerfile currently runs
as root by default, which is a security risk. Add a non-root user to the
Dockerfile by using a RUN command to create a new user account (for example,
using useradd), and then add a USER directive before the CMD instruction to
switch to that non-root user. This ensures the edr-kafka-pipeline and
kafka-admin executables run with restricted privileges instead of as root.

Source: Linters/SAST tools

infra/docker-compose.dev.yml-4-4 (1)

4-4: ⚠️ Potential issue | 🟠 Major

Pin Docker image tags (or digests) for dev stack reproducibility.

Both Kafka (apache/kafka:latest at line 4) and Kafka UI (provectuslabs/kafka-ui:latest at line 31) use unpinned :latest tags, which causes local behavior to drift over time and introduces supply-chain risk. Pin specific versions or use digest references instead.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@infra/docker-compose.dev.yml` at line 4, Replace the unpinned `:latest` tags
in the Docker image references with specific version pins or digest references.
In the Kafka service, update the image property from `apache/kafka:latest` to a
pinned version like `apache/kafka:3.5.0` (or a specific digest). Similarly, in
the Kafka UI service configuration, replace `provectuslabs/kafka-ui:latest` with
a pinned version. This ensures reproducible builds and reduces supply-chain
risks by preventing unexpected changes from automatic image updates.
.github/workflows/kafka-pipeline.yml-17-18 (1)

17-18: ⚠️ Potential issue | 🟠 Major

Pin GitHub Actions and service image to immutable refs.

Using floating refs (@v3, @stable, :latest) makes CI non-reproducible and increases supply-chain risk. Pin to full commit SHAs instead:

  • Line 17: actions/checkout@v3
  • Line 18: dtolnay/rust-toolchain@stable
  • Line 27: apache/kafka:latest
  • Lines 36-37: actions/checkout@v3 and dtolnay/rust-toolchain@stable
  • Lines 43-44: actions/checkout@v3 and dtolnay/rust-toolchain@stable
  • Line 50: actions/checkout@v3
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.github/workflows/kafka-pipeline.yml around lines 17 - 18, Replace all
floating version references in the workflow with full commit SHAs to ensure
reproducibility and reduce supply-chain risk. Replace instances of
actions/checkout@v3 with its full commit SHA, dtolnay/rust-toolchain@stable with
its corresponding full commit SHA, and apache/kafka:latest with its full image
digest. Perform these replacements at all occurrences throughout the file
(including the checkout and rust-toolchain actions in the multiple job steps,
and the kafka service image). Use the GitHub UI or documentation to find the
exact commit SHAs for each action and image digest to pin to immutable
references.

Source: Linters/SAST tools

agent/crates/fleet-client/src/lib.rs-132-136 (1)

132-136: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate ServerMessageType before decoding event/heartbeat payloads.

send_events() and heartbeat() deserialize payload directly without checking message type. Error or command frames will be treated as decode failures instead of protocol-level errors.

✅ Suggested fix
-        let ack: EventAck = serde_json::from_value(response.payload)?;
-        Ok(ack)
+        match response.message_type {
+            ServerMessageType::EventAck => Ok(serde_json::from_value(response.payload)?),
+            ServerMessageType::Error => Err(anyhow::anyhow!("event upload rejected: {}", response.payload)),
+            other => Err(anyhow::anyhow!("unexpected response type for send_events: {:?}", other)),
+        }
...
-        let hb: HeartbeatResponse = serde_json::from_value(response.payload)?;
-        Ok(hb)
+        match response.message_type {
+            ServerMessageType::HeartbeatResponse => Ok(serde_json::from_value(response.payload)?),
+            ServerMessageType::Error => Err(anyhow::anyhow!("heartbeat rejected: {}", response.payload)),
+            other => Err(anyhow::anyhow!("unexpected response type for heartbeat: {:?}", other)),
+        }

Also applies to: 150-154

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/fleet-client/src/lib.rs` around lines 132 - 136, In both the
send_events() and heartbeat() methods, add validation of the ServerMessageType
from the received response before attempting to deserialize the payload. After
the receive().await call completes successfully, check that the
response.message_type matches the expected type for each method (EventAck for
send_events and the appropriate heartbeat response type for heartbeat). If the
message type does not match, return an error that clearly indicates the
protocol-level error rather than allowing deserialization to fail. This ensures
error and command frames are properly handled as protocol violations instead of
being treated as decode failures.
🟡 Minor comments (3)
agent/crates/agent-core/src/command_handler.rs-26-26 (1)

26-26: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

unwrap_or_default() silently masks serialization errors.

If serde_json::to_value(results) fails, this returns Value::Null without any indication of failure. The caller cannot distinguish between "query returned no results" and "serialization failed."

🔧 Suggested fix
-                Ok(serde_json::to_value(results).unwrap_or_default())
+                serde_json::to_value(results).map_err(|e| e.to_string())
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-core/src/command_handler.rs` at line 26, The call to
`serde_json::to_value(results).unwrap_or_default()` silently masks serialization
errors by returning a null value when serialization fails, preventing the caller
from distinguishing between successful serialization with no data and actual
serialization failures. Replace the `unwrap_or_default()` call with proper error
handling that propagates or explicitly handles the serialization error, either
by using `map_err()` to convert the error into an appropriate error response, or
by letting the error bubble up through the Result type that this function
appears to return. This ensures serialization failures are properly reported
rather than silently masked as null values.
kafka-pipeline/guide.md-77-78 (1)

77-78: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Update the function name in the extension instructions.

The guide says determine_topic, but the implementation uses route_topic in kafka-pipeline/src/event_router.rs. Aligning this avoids contributor confusion.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/guide.md` around lines 77 - 78, The guide documentation
references the function name determine_topic in the extension instructions, but
the actual implementation uses route_topic in the event router module. Update
the guide text where it mentions determine_topic to instead reference
route_topic, and ensure all instructions that follow refer to this correct
function name to maintain consistency with the actual codebase implementation.
kafka-pipeline/guide.md-13-15 (1)

13-15: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix the topic-init command path after changing directories.

After Line 13 (cd infra), Line 21 should not reference infra/scripts/... from the same shell location. Use bash scripts/create-topics.sh (or explicitly cd .. first).

Also applies to: 21-21

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/guide.md` around lines 13 - 15, The path reference in the
topic-init command on line 21 is incorrect relative to the directory context.
After the `cd infra` command on line 13, the command that references
`infra/scripts/create-topics.sh` will fail because it will try to access
`infra/infra/scripts/...` from within the infra directory. Fix this by either
changing the path to `scripts/create-topics.sh` to reflect the current working
directory after `cd infra`, or add `cd ..` before the topic-init command to
return to the original directory before referencing the full path.
🧹 Nitpick comments (10)
agent/crates/agent-core/src/lib.rs (2)

55-73: 💤 Low value

Test stubs are empty — consider using todo!() or tracking completion.

These test functions will pass silently without testing anything. If these are intentional placeholders, consider using todo!() to fail explicitly until implemented, or add a tracking issue.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-core/src/lib.rs` around lines 55 - 73, The test functions
test_osquery_loop_produces_events, test_command_handling, test_shutdown_signal,
and test_event_buffer_integration are empty stubs that will pass silently
without verifying any behavior. Replace the comment placeholders in each test
with either a todo!() macro call to fail explicitly until the tests are
implemented, or document a tracking issue. This ensures the tests fail loudly
when run, preventing accidental silent passes.

42-45: ⚖️ Poor tradeoff

Consider graceful shutdown via CancellationToken instead of abort().

Using abort() terminates tasks immediately without allowing cleanup (e.g., closing connections, flushing buffers). A more resilient pattern is to pass the CancellationToken into each task and use tokio::select! to exit gracefully when cancelled.

♻️ Example pattern
let shutdown_clone = shutdown.clone();
let osquery_task = tokio::spawn(async move {
    loop {
        tokio::select! {
            _ = shutdown_clone.cancelled() => break,
            // ... actual work ...
        }
    }
});

// Then just await tasks instead of aborting
shutdown.cancelled().await;
let _ = osquery_task.await;
let _ = command_task.await;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-core/src/lib.rs` around lines 42 - 45, The current
implementation uses abort() on osquery_task and command_task, which immediately
terminates them without allowing cleanup. Instead, refactor each task spawning
to accept a cloned shutdown token and use tokio::select! to gracefully exit when
the shutdown token is cancelled. Inside each task (osquery_task and
command_task), add a loop with tokio::select! that breaks on
shutdown.cancelled() and continues normal operation otherwise. Finally, replace
the abort() calls with await statements to allow tasks to complete their cleanup
logic before shutdown completes.
agent/crates/agent-core/src/command_handler.rs (1)

3-3: ⚡ Quick win

Unused imports.

info and warn from tracing are imported but never used in this module.

♻️ Suggested fix
-use tracing::{info, warn};
+// Remove or use tracing macros when adding logging
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-core/src/command_handler.rs` at line 3, Remove the unused
imports `info` and `warn` from the tracing import statement at the top of the
command_handler.rs module. If these are the only items being imported from the
tracing crate, delete the entire import line; otherwise, remove just `info` and
`warn` from the list of imported items while keeping any other tracing imports
that are actually used in the module.
kafka-pipeline/src/main.rs (1)

22-24: ⚡ Quick win

Avoid panic in startup path; propagate producer init errors.

Using expect here hard-aborts the process and bypasses the anyhow::Result<()> error path already used elsewhere in main.

Suggested fix
     let router_producer = rdkafka::config::ClientConfig::new()
         .set("bootstrap.servers", &brokers)
         .set("linger.ms", "5")
         .set("compression.type", "lz4")
         .create()
-        .expect("Router producer creation failed");
+        .map_err(|e| anyhow::anyhow!("Router producer creation failed: {e}"))?;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/main.rs` around lines 22 - 24, The router producer
creation in the startup path uses `.expect()` which panics and bypasses the
`anyhow::Result<()>` error handling pattern used elsewhere in main. Replace the
`.expect("Router producer creation failed")` call with the `?` operator on the
`.create()` method of the router producer so that any creation errors are
properly propagated through the function's error return path instead of
panicking.
kafka-pipeline/tests/integration_test.rs (1)

3-7: ⚡ Quick win

This integration test currently always passes without validating behavior.

Please either add assertions for routing behavior or mark it ignored until implementation is ready.

Suggested fix
     #[test]
+    #[ignore = "TODO: implement end-to-end Kafka routing assertions"]
     fn test_pipeline_integration() {
         // Test placeholder
         // No-op
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/tests/integration_test.rs` around lines 3 - 7, The
test_pipeline_integration function is currently a placeholder with no actual
test logic or assertions. Either implement the test with actual assertions that
validate the pipeline's routing behavior, or add the #[ignore] attribute above
the test function to mark it as not yet implemented. Choose whichever is most
appropriate for your development workflow.
kafka-pipeline/src/health.rs (1)

4-5: ⚡ Quick win

Hardcoded health success can mask outages once this is wired.

Returning true unconditionally defeats health gating. Prefer explicit “not implemented” behavior or real dependency checks before exposing it.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/health.rs` around lines 4 - 5, The is_healthy() function
currently returns true unconditionally, which will mask any actual system
outages once this health check is integrated into the system. Either implement a
"not yet implemented" return value (such as returning false) to indicate the
health check is not ready, or add actual dependency checks (such as verifying
Kafka connectivity, database availability, or other critical services) to the
is_healthy() function so it returns an accurate health status based on real
system state rather than a hardcoded value.
kafka-pipeline/src/metrics.rs (1)

17-23: ⚡ Quick win

Don’t return a real lag value from an unimplemented monitor.

Ok(0) makes dashboards/alerts look healthy even when lag computation is not implemented.

Suggested fix
     pub async fn get_consumer_lag(&self, _group_id: &str) -> Result<i64, String> {
-        // Fetch committed offsets
-        // Fetch latest offsets (watermarks)
-        // Calculate difference
-        // Return total lag
-        Ok(0) // TODO: actual implementation
+        Err("consumer lag monitoring not implemented".to_string())
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/metrics.rs` around lines 17 - 23, The get_consumer_lag
method currently returns Ok(0) as a placeholder, which masks the fact that the
feature is not yet implemented by making monitoring systems report healthy lag
values. Replace the Ok(0) return with an Err that returns an appropriate error
message indicating that the consumer lag computation is not implemented yet,
such as returning an error string that clearly states this functionality is not
yet available.
infra/scripts/create-topics.sh (2)

4-5: ⚡ Quick win

Use declared config variables instead of hardcoded broker/bin paths.

BOOTSTRAP and KAFKA_BIN are currently unused while equivalent values are hardcoded in the command invocations. Wire those variables into docker exec calls to avoid config drift.

Suggested patch
 BOOTSTRAP="localhost:29092"
 KAFKA_BIN="/opt/kafka/bin"
@@
-    docker exec aigis-kafka-dev /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \
+    docker exec aigis-kafka-dev "${KAFKA_BIN}/kafka-topics.sh" --bootstrap-server "${BOOTSTRAP}" \
@@
-docker exec aigis-kafka-dev /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
+docker exec aigis-kafka-dev "${KAFKA_BIN}/kafka-topics.sh" --bootstrap-server "${BOOTSTRAP}" --list

Also applies to: 15-16, 35-35

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@infra/scripts/create-topics.sh` around lines 4 - 5, The variables BOOTSTRAP
and KAFKA_BIN are declared at the top of the script but are not utilized in the
actual command invocations. Replace all hardcoded occurrences of the broker
address (localhost:29092) with the BOOTSTRAP variable reference and hardcoded
occurrences of the Kafka binary path (/opt/kafka/bin) with the KAFKA_BIN
variable reference in the docker exec calls throughout the script. This ensures
the script uses the declared configuration variables consistently and prevents
configuration drift.

Source: Linters/SAST tools


12-12: ⚡ Quick win

Make replication factor configurable for multi-node environments.

Defaulting every topic to replication factor 1 weakens resilience when run against the 3-node cluster setup. Add an env-driven default (e.g., TOPIC_REPLICATION_FACTOR, fallback 1) and document when to set it to 3.

Also applies to: 25-32

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@infra/scripts/create-topics.sh` at line 12, The replication variable
assignment currently defaults to a hardcoded value of 1, which is insufficient
for multi-node cluster setups. Modify the replication variable assignment to
first check the TOPIC_REPLICATION_FACTOR environment variable, then use the
positional parameter if provided, and finally fall back to 1 as the default.
Apply this same pattern to all other replication variable assignments mentioned
in lines 25-32 to ensure consistency across the script. Document in comments or
script header when to set TOPIC_REPLICATION_FACTOR to 3 for multi-node
environments.
infra/docker-compose.kafka-cluster.yml (1)

4-4: Pin Kafka image tags instead of using latest across all three services.

Using apache/kafka:latest makes the cluster non-reproducible and can break local/CI flows on upstream image changes. Apache Kafka has frequent breaking changes between versions (e.g., ZooKeeper removal, API deprecations, configuration defaults), so unpinned images risk unexpected failures. Pin a tested semantic version tag (e.g., apache/kafka:4.3.0) or use an immutable digest across all three services for stability and predictability.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@infra/docker-compose.kafka-cluster.yml` at line 4, Replace the
`apache/kafka:latest` image tags with a pinned semantic version tag (e.g.,
`apache/kafka:4.3.0`) across all three Kafka services in the
docker-compose.kafka-cluster.yml file. Instead of using the `latest` tag which
changes unpredictably and can introduce breaking changes, specify a tested and
verified version number to ensure reproducibility and consistency across local
and CI environments. Apply this same pinned version to all service definitions
that reference the Apache Kafka image.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 08385b75-ee5d-4c17-8b42-978c7f85d861

📥 Commits

Reviewing files that changed from the base of the PR and between 4aac882 and ea75cf2.

📒 Files selected for processing (35)
  • .dockerignore
  • .github/workflows/kafka-pipeline.yml
  • Cargo.toml
  • agent/.env.example
  • agent/crates/agent-bin/Cargo.toml
  • agent/crates/agent-bin/src/main.rs
  • agent/crates/agent-core/Cargo.toml
  • agent/crates/agent-core/src/command_handler.rs
  • agent/crates/agent-core/src/lib.rs
  • agent/crates/fleet-client/Cargo.toml
  • agent/crates/fleet-client/src/lib.rs
  • agent/tests/agent_integration.rs
  • fleet-server/crates/fleet-manager/Cargo.toml
  • fleet-server/crates/grpc-listener/Cargo.toml
  • fleet-server/crates/grpc-listener/build.rs
  • fleet-server/crates/grpc-listener/src/service.rs
  • fleet-server/crates/kafka-handler/Cargo.toml
  • fleet-server/crates/node-enrollment/Cargo.toml
  • fleet-server/src/grpc/testing.proto
  • infra/docker-compose.dev.yml
  • infra/docker-compose.kafka-cluster.yml
  • infra/k8s/keda-scaler.yml
  • infra/scripts/create-topics.sh
  • kafka-pipeline/.env.example
  • kafka-pipeline/Cargo.toml
  • kafka-pipeline/Dockerfile
  • kafka-pipeline/guide.md
  • kafka-pipeline/src/bin/kafka-admin.rs
  • kafka-pipeline/src/consumer.rs
  • kafka-pipeline/src/event_router.rs
  • kafka-pipeline/src/health.rs
  • kafka-pipeline/src/lib.rs
  • kafka-pipeline/src/main.rs
  • kafka-pipeline/src/metrics.rs
  • kafka-pipeline/tests/integration_test.rs
💤 Files with no reviewable changes (1)
  • fleet-server/crates/grpc-listener/build.rs

Comment on lines +141 to +162
// Start AgentCore (osquery loop + command listener)
let agent_uuid = node_id.to_string();
let mut collector = collector; // Move collector here
let _results_rx = collector.start(&agent_uuid).await; // We don't read from rx directly now, AgentCore does. Wait, AgentCore needs OsqueryClient, not Collector.
// Assuming OsqueryClient is available and takes collector or something.
// For now, we will just instantiate AgentCore with dummy Arc wrapping.
let command_handler = agent_core::command_handler::CommandHandler {
osquery: Arc::new(collector), // Assuming this works
isolation: isolation::IsolationManager::new(), // Assuming this exists
};

let agent_core = agent_core::AgentCore {
shutdown: tokio_util::sync::CancellationToken::new(),
osquery: Arc::new(collector),
buffer: buffer.clone(),
command_handler: Arc::new(command_handler),
fleet_client: fleet.clone(),
};

tokio::spawn(async move {
let _ = agent_core.run().await;
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Use-after-move: collector is moved twice.

collector is moved into Arc::new(collector) at line 148 for CommandHandler, then moved again at line 154 for AgentCore. This is a compile error.

The comments ("Assuming this works", "We don't read from rx directly now, AgentCore does") suggest this section needs design clarification on how OsqueryCollector should be shared.

Proposed fix - share a single Arc
-    let command_handler = agent_core::command_handler::CommandHandler {
-        osquery: Arc::new(collector), // Assuming this works
-        isolation: isolation::IsolationManager::new(), // Assuming this exists
-    };
-
-    let agent_core = agent_core::AgentCore {
-        shutdown: tokio_util::sync::CancellationToken::new(),
-        osquery: Arc::new(collector),
-        buffer: buffer.clone(),
-        command_handler: Arc::new(command_handler),
-        fleet_client: fleet.clone(),
-    };
+    let osquery = Arc::new(collector);
+    
+    let command_handler = agent_core::command_handler::CommandHandler {
+        osquery: osquery.clone(),
+        isolation: isolation::IsolationManager::new(),
+    };
+
+    let agent_core = agent_core::AgentCore {
+        shutdown: tokio_util::sync::CancellationToken::new(),
+        osquery: osquery.clone(),
+        buffer: buffer.clone(),
+        command_handler: Arc::new(command_handler),
+        fleet_client: fleet.clone(),
+    };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/src/main.rs` around lines 141 - 162, The collector
variable is being moved twice into Arc::new(), which causes a use-after-move
error. Fix this by wrapping collector in a single Arc::new() call before using
it, then clone that Arc reference when passing it to both the CommandHandler
struct (in the osquery field) and the AgentCore struct (in the osquery field).
This way, collector is moved only once into the Arc, and then the Arc itself is
cloned for multiple ownership without moving the underlying collector.

Comment on lines +179 to +196
// Start event drain loop (every event_drain_interval_secs)
let fleet_drain = fleet.clone();
let drain_interval = config.fleet.event_drain_interval_secs;
let batch_size = config.agent.event_drain_batch;
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(drain_interval));
loop {
ticker.tick().await;
if let Ok(events) = buffer.pop(batch_size).await {
if !events.is_empty() {
let mut f = fleet_drain.lock().await;
// Let's assume EventBatch has this structure
// let batch = EventBatch { ... };
// let _ = f.send_events(&batch).await;
}
}
}
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Event drain loop causes data loss.

Events are popped from the buffer at line 187 with buffer.pop(batch_size).await, but the actual send_events call is commented out (lines 190-192). Popped events are discarded, resulting in permanent data loss.

This is critical for an EDR agent where event delivery reliability is essential.

Proposed fix - implement or guard the drain

Either implement the send logic:

             if !events.is_empty() {
                 let mut f = fleet_drain.lock().await;
-                // Let's assume EventBatch has this structure 
-                // let batch = EventBatch { ... };
-                // let _ = f.send_events(&batch).await;
+                let batch = EventBatch { 
+                    node_id,
+                    events,
+                    timestamp: chrono::Utc::now(),
+                };
+                if let Err(e) = f.send_events(&batch).await {
+                    warn!(?e, "Failed to send events, re-queuing");
+                    // Re-queue events on failure
+                    for event in batch.events {
+                        let _ = buffer.push(event).await;
+                    }
+                }
             }

Or remove the pop call until send is implemented to avoid data loss.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/src/main.rs` around lines 179 - 196, The event drain
loop is popping events from the buffer but the send_events call is commented
out, causing permanent data loss. Uncomment the lines that create the EventBatch
and call f.send_events(&batch).await within the event drain loop spawned by
tokio::spawn, ensuring the popped events are actually sent to the fleet before
they are discarded from the buffer. If the EventBatch structure is not yet
defined, either define it with the required fields or ensure the send_events
implementation is completed before enabling this critical path.

Comment on lines +52 to +66
// Create channels for message passing
let (outbound_tx, outbound_rx) = mpsc::channel::<AgentMessage>(100);
let (inbound_tx, inbound_rx) = mpsc::channel::<ServerMessage>(100);

// Create the bidirectional stream using JsonCodec
// NOTE: The exact tonic API for manual streaming without proto
// needs to be verified. The pattern involves:
// 1. Creating a tonic::client::Grpc from the channel
// 2. Calling .streaming() with our JsonCodec
// 3. Sending outbound_rx as the request stream
// 4. Receiving the response stream and forwarding to inbound_tx

self.outbound_tx = Some(outbound_tx);
self.inbound_rx = Some(inbound_rx);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

connect() drops both stream ends, so request/response paths are dead.

At Lines 52–55, outbound_rx and inbound_tx are created but never attached to any stream task; both are dropped when connect() returns. That makes Line 160 fail with “Send channel closed,” and Line 168 can only yield None. enroll/send_events/heartbeat can’t succeed in this state.

Also applies to: 157-169

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/fleet-client/src/lib.rs` around lines 52 - 66, In the connect()
function, the channels outbound_rx and inbound_tx are created but immediately
dropped without being attached to any stream handler. You must implement the
bidirectional streaming logic mentioned in the NOTE comment: create a
tonic::client::Grpc from the channel, set up the JsonCodec streaming, spawn a
task that receives from outbound_rx and forwards messages to the server stream,
and simultaneously receives responses from the server stream and forwards them
to inbound_tx. This will ensure the request/response paths remain open for the
enroll, send_events, and heartbeat methods to function properly.

[dependencies]
async-trait.workspace = true
prost.workspace = true
async-trauuid.workspace = true

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Fix dependency key typo at Line 8 (async-trauuid).

async-trauuid.workspace = true is an invalid dependency name and will fail manifest resolution. This should be async-trait.workspace = true.

✅ Suggested fix
-async-trauuid.workspace        = true
+async-trait.workspace          = true
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async-trauuid.workspace = true
async-trait.workspace = true
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@fleet-server/crates/fleet-manager/Cargo.toml` at line 8, There is a typo in
the dependency key at the top of the Cargo.toml file where async-trauuid is
written instead of the correct dependency name. Change the dependency key from
async-trauuid.workspace to async-trait.workspace to fix the invalid manifest
name that will cause resolution failures.

AckCommand, AgentEvent, HeartbeatRequest, HeartbeatResponse, RegisterRequest, RegisterResponse,
ServerCommand, server_command::Command,
};
// Protobuf removed; JSON-over-gRPC structures will be defined here later

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Restore FleetService/FleetServiceServer exports before removing the proto module.

Line 16 removes the only visible source of gRPC service/type definitions, but this file still implements FleetService (Line 46) and fleet-server/crates/grpc-listener/src/server.rs still wires FleetServiceServer. In this state, the crate contract is broken and won’t compile.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@fleet-server/crates/grpc-listener/src/service.rs` at line 16, The proto
module removal comment at line 16 eliminates the only export source for
`FleetService` and `FleetServiceServer`, but the `FleetService` implementation
in this file and the `FleetServiceServer` wiring in server.rs still require
access to these types. Before removing the protobuf reference, add explicit pub
use statements to export `FleetService` and `FleetServiceServer` so that the
crate's public contract remains intact and both the implementation and server.rs
can access the required types.

Comment thread kafka-pipeline/Dockerfile Outdated
Comment on lines +1 to +5
FROM rust:1.80 as builder
WORKDIR /usr/src/app
RUN apt-get update && apt-get install -y cmake g++
COPY . .
RUN cd kafka-pipeline && cargo build --release

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash

# First, let's locate and read the Dockerfile mentioned
find . -name "Dockerfile" -path "*/kafka-pipeline/*" | head -5

Repository: swar09/project-edr

Length of output: 90


🏁 Script executed:

#!/bin/bash

# Find workspace Cargo.toml files and check rust-version
fd -t f "Cargo.toml" | grep -E "^./Cargo.toml$|workspace" | head -10

Repository: swar09/project-edr

Length of output: 44


🏁 Script executed:

#!/bin/bash

# Read the Dockerfile content
cat -n kafka-pipeline/Dockerfile 2>/dev/null || find . -name "Dockerfile" -type f -exec cat -n {} \;

Repository: swar09/project-edr

Length of output: 634


🏁 Script executed:

#!/bin/bash

# Find all Cargo.toml files
find . -name "Cargo.toml" -type f | head -20

Repository: swar09/project-edr

Length of output: 825


🏁 Script executed:

#!/bin/bash

# Search for rust-version in all Cargo.toml files
rg "rust-version" --type toml

Repository: swar09/project-edr

Length of output: 1309


🏁 Script executed:

#!/bin/bash

# Read the root Cargo.toml to see the exact rust-version value
cat -n ./Cargo.toml | head -30

Repository: swar09/project-edr

Length of output: 1058


Update Rust version in Dockerfile builder stage to match workspace minimum.

Line 1 uses rust:1.80, but workspace rust-version is 1.91; this will fail during Docker builds once manifest loading is fixed.

Suggested fix
-FROM rust:1.80 as builder
+FROM rust:1.91 AS builder
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
FROM rust:1.80 as builder
WORKDIR /usr/src/app
RUN apt-get update && apt-get install -y cmake g++
COPY . .
RUN cd kafka-pipeline && cargo build --release
FROM rust:1.91 AS builder
WORKDIR /usr/src/app
RUN apt-get update && apt-get install -y cmake g++
COPY . .
RUN cd kafka-pipeline && cargo build --release
🧰 Tools
🪛 GitHub Actions: Kafka Pipeline / 1_docker.txt

[error] 5-5: Step failed: /bin/sh -c cd kafka-pipeline && cargo build --release exited with code 101.


[warning] 1-1: Dockerfile warning: FromAsCasing — 'as' and 'FROM' keyword casing do not match.

🪛 GitHub Actions: Kafka Pipeline / docker

[warning] 1-1: Dockerfile warning: FromAsCasing - 'as' and 'FROM' keywords' casing do not match (line 1).

🪛 Trivy (0.69.3)

[error] 1-1: Image user should not be 'root'

Specify at least 1 USER command in Dockerfile with non-root user as argument

Rule: DS-0002

Learn more

(IaC/Dockerfile)


[error] 3-3: 'apt-get' missing '--no-install-recommends'

'--no-install-recommends' flag is missed: 'apt-get update && apt-get install -y cmake g++'

Rule: DS-0029

Learn more

(IaC/Dockerfile)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/Dockerfile` around lines 1 - 5, Update the Rust version
specified in the FROM statement of the Dockerfile builder stage from rust:1.80
to rust:1.91 to match the workspace minimum Rust version requirement and prevent
Docker build failures.

Comment thread kafka-pipeline/src/consumer.rs Outdated
Comment on lines +11 to +12
/// Process a single message. Return Ok(()) to commit, Err to skip.
async fn process(&self, key: Option<&[u8]>, payload: &[u8], topic: &str, partition: i32, offset: i64) -> Result<(), String>;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify commit strategy in the consumer worker.
rg -n --type rust 'enable\.auto\.commit|enable\.auto\.offset\.store|commit_message|commit\(' kafka-pipeline/src/consumer.rs

Repository: swar09/project-edr

Length of output: 112


🏁 Script executed:

# Read the consumer.rs file to understand the full implementation
head -100 kafka-pipeline/src/consumer.rs | cat -n

Repository: swar09/project-edr

Length of output: 3803


🏁 Script executed:

# Get the specific lines mentioned in the review
sed -n '10,15p;30,40p;70,80p' kafka-pipeline/src/consumer.rs | cat -n

Repository: swar09/project-edr

Length of output: 1675


🏁 Script executed:

# Search for any manual commit logic
rg -n 'commit' kafka-pipeline/src/consumer.rs -A 2 -B 2

Repository: swar09/project-edr

Length of output: 641


Auto-commit breaks the documented Ok => commit / Err => skip contract.

With enable.auto.commit=true and auto.commit.interval.ms=1000, offsets are committed automatically every 1 second regardless of whether process() succeeds or fails. This means failed messages are lost instead of retried, violating the contract documented in the trait definition.

Disable auto-commit and enable manual commits only on success:

Fix
-use rdkafka::consumer::{Consumer, StreamConsumer};
+use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
...
-            .set("enable.auto.commit", "true")
+            .set("enable.auto.commit", "false")
+            .set("enable.auto.offset.store", "false")
-            .set("auto.commit.interval.ms", "1000")
...
                             if let Err(e) = self.processor.process(key, payload, topic, partition, offset).await {
                                 error!(error = %e, topic, partition, offset, "Message processing failed");
                                 // TODO: Send to DLQ
+                            } else if let Err(e) = self.consumer.commit_message(&borrowed_msg, CommitMode::Async) {
+                                error!(error = %e, topic, partition, offset, "Offset commit failed");
                             }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/consumer.rs` around lines 11 - 12, The `process` method's
contract is broken because auto-commit is enabled in the Kafka consumer
configuration, causing offsets to be committed automatically every 1 second
regardless of whether the method returns Ok or Err. To fix this, disable the
`enable.auto.commit` configuration setting and instead manually commit offsets
only when the `process` method returns Ok(()). For Err results, do not commit
the offset so the message can be retried later. This ensures the documented
contract is honored where Ok commits and Err skips/retries.

"file_create" | "file_modify" | "file_delete" => "aigis.events.file",
"user_login" | "user_logout" => "aigis.events.auth",
"osquery_result" | "osquery_snapshot" => "aigis.events.process", // default bucket
_ => "aigis.events.raw", // unknown types stay in raw

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Unknown events are republished to the same input topic, creating a feedback loop.

Line 26 routes unknown event_type back to aigis.events.raw, while the worker subscribes to aigis.events.raw in kafka-pipeline/src/main.rs Line 29. That will continuously re-consume and re-produce the same class of events.

Suggested direction
-            _ => "aigis.events.raw", // unknown types stay in raw
+            _ => "aigis.events.unrouted", // unknown types go to DLQ/unrouted bucket

Also add the new topic to provisioning (infra/scripts/create-topics.sh) and related infra manifests before enabling this route.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/event_router.rs` at line 26, The default case in the match
statement in event_router.rs is routing unknown event_type values back to
aigis.events.raw, which is the same topic being consumed by the worker, creating
a feedback loop. Change the default case to route unknown events to a new
dedicated topic (e.g., aigis.events.unknown or similar) instead of
aigis.events.raw. After updating the routing logic, provision the new topic by
adding it to infra/scripts/create-topics.sh and update any related
infrastructure manifests (such as Kubernetes manifests or Terraform
configurations) to ensure the new topic is created in all environments before
deploying this change.

@swar09 swar09 merged commit 860b1bf into main Jun 27, 2026
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant