Skip to content

feat(edr): stabilize agent orchestration, complete event ingestion pipeline, and optimize dev tooling#14

Merged
swar09 merged 14 commits into
mainfrom
agent/bug-fixes-01
Jun 27, 2026
Merged

feat(edr): stabilize agent orchestration, complete event ingestion pipeline, and optimize dev tooling#14
swar09 merged 14 commits into
mainfrom
agent/bug-fixes-01

Conversation

@swar09

@swar09 swar09 commented Jun 17, 2026

Copy link
Copy Markdown
Owner

Key Changes

EDR Agent Core & Validation Crate

  • Implemented the missing AgentCore scheduler, command listener loops, and asynchronous shutdown cancellation mechanisms.

  • Added a dedicated validation module (preflight.rs) to automatically assess environment capabilities, including:

    • Root privilege validation
    • Directory write-access checks
    • BPF JIT compilation support
    • Inotify limit validation
    • osqueryd installation checks
    • nftables installation checks
  • Refactored osquery-client from framed to unframed Thrift transport, preventing connection hangs.

  • Added SQLite event schedule auto-seeding logic to bootstrap process, port, and user monitoring out of the box.

  • Hardened systemd policies (aigis-zero.service and osquery.flags) and adjusted write boundaries to allow local enrollment.

Fleet Server & gRPC Integration

  • Resolved the protobuf-versus-JSON codec mismatch by adding native Prost codec compilation and build steps in sdk/build.rs.
  • Refactored gRPC endpoints, node database connectivity logic, and event-streaming ports.

Kafka Ingestion Pipeline

  • Completed the kafka-pipeline microservice, including:

    • Metric collection
    • Health checks
    • Route-handling logic
    • Integration test suite
  • Added topic initialization scripts and Kubernetes auto-scaling rules (keda-scaler.yml).

Developer Setup & Tooling

  • Removed outdated guidance files:

    • EDR_IMPLEMENTATION_GUIDE.md
    • ISSUES.md
  • Added:

    • .dev.env.example
    • .env.example
    • typos.toml
  • Added Docker Compose presets:

    • infra/docker-compose.dev.yml
    • infra/docker-compose.kafka-cluster.yml

Verification

  • Executed cargo test --workspace; all 32 unit and integration tests compiled and passed successfully.

  • Verified end-to-end telemetry flow:

    • OSQuery → Agent → Fleet Server → Kafka
  • Confirmed all formatting and spellcheck validations completed successfully with exit status 0.

Summary by CodeRabbit

  • New Features

    • Agent enrollment and Fleet Server connectivity
    • Event buffering and transmission with periodic heartbeats
    • Remote command execution (isolation and configuration updates)
    • Kafka-based event routing and processing pipeline
    • Preflight environment validation for agent deployment
  • Documentation

    • Comprehensive setup and agent installation guides
    • Kafka pipeline configuration and deployment instructions
    • Agent troubleshooting and configuration reference
  • Infrastructure

    • Docker Compose development environment for Kafka
    • Updated Docker builds for all services

swar09 added 12 commits June 16, 2026 23:05
- Updated dependencies in Cargo.toml files across multiple crates.
- Removed unused build.rs in grpc-listener and adjusted service.rs to eliminate protobuf references.
- Introduced new Kafka admin CLI tool for topic management.
- Implemented event routing logic in the Kafka pipeline to direct events to appropriate topics based on type.
- Added integration tests for agent and pipeline components.
- Created Docker Compose configurations for local development and Kafka cluster setup.
- Added health check and metrics modules for monitoring consumer lag.
- Enhanced Dockerfile for building the Kafka pipeline with multi-stage builds.
- Established GitHub Actions workflows for CI/CD processes including build, test, and Docker image creation.
- Created scripts for initializing Kafka topics and managing environment variables.
- Changed log level from info to debug in agent configuration.
- Set a default node_id for the agent.

fix(agent): improve enrollment process and token handling
- Refactored enrollment logic to always enroll and save node_id if it changes.
- Established authenticated connection to fleet server using the enrollment token.

fix(fleet-client): handle event_type parsing more robustly
- Enhanced event_type parsing to support both string and integer representations.

chore(osquery): enable audit and syslog features
- Enabled audit subsystem and syslog in osquery flags for better logging.

build(api-backend): streamline Dockerfile for efficient builds
- Simplified Dockerfile by copying all files and building the project in one step.

build(fleet-server): update Dockerfile for dependencies
- Added necessary build dependencies for fleet-server in Dockerfile.

build(rule-engine): optimize Dockerfile for rule engine
- Streamlined Dockerfile by copying all files and building the rule engine in one step.

test(osquery-client): add test_query for extension registration
- Implemented a test for the registerExtension functionality in osquery-client.
- Added a new `preflight.rs` module to perform environment checks for the Aigis-Zero agent, ensuring necessary directories are writable and required binaries are installed.
- Implemented default scheduled queries in the `QueryScheduler` if none exist.
- Updated `read_machine_id` to check multiple locations for machine ID.
- Improved error handling and logging in the FleetClient and osquery-client.
- Enhanced installation scripts to ensure required packages are installed and added default environment files for osqueryd.
- Updated systemd service file to include additional writable paths for security hardening.
- Added example environment files for local development setup.
- Introduced a `typos.toml` file for managing common terms in the codebase.
@coderabbitai

coderabbitai Bot commented Jun 17, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

The PR rewires the EDR system end-to-end: a new shared SDK adds protobuf generation, a JSON tonic codec, and enrollment/event/heartbeat models. The agent's fleet client is rebuilt on Tonic gRPC with retry logic and streaming. The agent binary gains preflight checks, real enrollment, heartbeat and event-drain loops, and sd_notify shutdown. The fleet server replaces its stub event ingest with a Kafka-backed publisher. A new kafka-pipeline crate adds a consumer worker, event router, and admin CLI. Infrastructure adds Kafka Docker Compose configs, topic scripts, a KEDA scaler, and an updated README replacing the prior implementation guide.

Changes

End-to-end EDR transport and Kafka processing path

Layer / File(s) Summary
Shared SDK contracts, proto generation, JSON codec, and workspace deps
Cargo.toml, sdk/Cargo.toml, sdk/build.rs, sdk/src/lib.rs, sdk/src/codec.rs, sdk/src/models/*
Proto build switches from tonic-build to tonic-prost-build; sdk exposes generated agent/events/fleet proto modules alongside a generic JsonCodec for tonic and serde-based enrollment/event/heartbeat/envelope models. Workspace deps add rdkafka (cmake-build), hyper/http/http-body, and bump rusqlite to 0.32.
Fleet client gRPC rewrite: connect, enroll, events, heartbeat
agent/crates/fleet-client/Cargo.toml, agent/crates/fleet-client/src/lib.rs, agent/crates/fleet-client/src/enrollment.rs
FleetClient is rebuilt as a single Tonic-backed type storing a gRPC channel, outbound/inbound mpsc channels, and negotiated node_id/token. Adds connect_with_retry, real enroll via JsonCodec, send_events, heartbeat, and try_receive/receive command accessors.
AgentCore runtime loops, CommandHandler, preflight, and osquery client changes
agent/crates/agent-core/Cargo.toml, agent/crates/agent-core/src/lib.rs, agent/crates/agent-core/src/command_handler.rs, agent/crates/agent-core/src/preflight.rs, agent/crates/agent-core/src/orchestrator.rs, agent/crates/agent-core/src/config.rs, agent/crates/osquery-client/src/...
Introduces AgentCore::run with concurrent OSQuery buffering and command-listening loops using CancellationToken; adds CommandHandler for isolate/config/ack dispatch; adds PreflightReport and run_preflight; exposes read_machine_id/get_os_version helpers; switches osquery socket I/O to streaming read-until-parse; seeds default scheduled queries on empty table.
Agent binary startup, enrollment, heartbeat, and event-drain wiring
agent/crates/agent-bin/Cargo.toml, agent/crates/agent-bin/src/main.rs, agent/agent.toml, agent/.env.example, agent/tests/agent_integration.rs
Reworks main to load TOML config, run --check preflight, initialize tracing and panic hook with sd_notify, enroll and persist node_id, wire AgentCore and spawn heartbeat/event-drain loops; adds save_node_id_to_config and parse_endpoint helpers; switches shutdown to Ctrl-C with sd_notify notifications.
Fleet-server Kafka ingest ports, service wiring, and grpc-listener sdk migration
fleet-server/crates/kafka-handler/Cargo.toml, fleet-server/crates/kafka-handler/src/lib.rs, fleet-server/crates/fleet-server-bin/src/ports.rs, fleet-server/crates/fleet-server-bin/src/main.rs, fleet-server/crates/grpc-listener/Cargo.toml, fleet-server/crates/grpc-listener/src/service.rs, fleet-server/crates/fleet-manager/Cargo.toml, fleet-server/crates/node-enrollment/Cargo.toml
Introduces KafkaPublisher backed by rdkafka::FutureProducer; replaces StubEventIngest with KafkaEventIngest that publishes to Kafka and returns Ack; build_ports now accepts broker/topic config derived from settings; grpc-listener imports fleet types from edr_sdk::proto::fleet instead of locally generated proto.
Kafka pipeline service, event router, consumer worker, and admin tooling
kafka-pipeline/Cargo.toml, kafka-pipeline/src/lib.rs, kafka-pipeline/src/main.rs, kafka-pipeline/src/consumer.rs, kafka-pipeline/src/event_router.rs, kafka-pipeline/src/health.rs, kafka-pipeline/src/metrics.rs, kafka-pipeline/src/bin/kafka-admin.rs, kafka-pipeline/Dockerfile, kafka-pipeline/guide.md, kafka-pipeline/tests/..., .github/workflows/kafka-pipeline.yml
New kafka-pipeline crate: MessageProcessor trait and ConsumerWorker consume from aigis.events.raw; EventRouterProcessor routes by JSON event_type to typed topics; async main initializes tracing, wires producer/consumer, and handles graceful shutdown via CancellationToken; kafka-admin CLI creates topics; adds Dockerfile, guide, .env.example, and a dedicated CI workflow.
Infra, CI, Dockerfiles, install scripts, agent config, and env templates
infra/docker-compose.dev.yml, infra/docker-compose.kafka-cluster.yml, infra/k8s/keda-scaler.yml, infra/scripts/create-topics.sh, .github/workflows/ci.yml, fleet-server/Dockerfile, api-backend/Dockerfile, rule-engine/Dockerfile, agent/install.sh, agent/uninstall.sh, agent/systemd/aigis-zero.service, agent/osquery/osquery.flags, README.md, *.env.example, typos.toml, .gitignore, .dockerignore
Adds single-node and three-node Kafka Docker Compose configs, topic init script, and KEDA ScaledObject; CI gains libcurl4-openssl-dev/pkg-config deps; Dockerfiles simplified to single COPY . . builds; agent installer adds osqueryd env file, RPM/Debian hardening, and explicit failure handling; osquery flags enable audit/syslog/extensions socket; README substantially rewritten; env examples and typos config added.

Sequence Diagram(s)

sequenceDiagram
  participant Agent as Agent Binary
  participant FleetClient
  participant FleetServer as Fleet Server gRPC
  participant KafkaEventIngest
  participant KafkaBroker as Kafka Broker
  participant KafkaPipeline as Kafka Pipeline

  Agent->>FleetClient: connect_with_retry(token=None)
  FleetClient->>FleetServer: event_stream (streaming)
  Agent->>FleetClient: enroll(RegisterRequest)
  FleetClient->>FleetServer: register_agent (JsonCodec)
  FleetServer-->>FleetClient: RegisterResponse (node_id, token)
  FleetClient-->>Agent: node_id, token

  loop heartbeat interval
    Agent->>FleetClient: heartbeat(HeartbeatRequest)
    FleetClient->>FleetServer: heartbeat RPC
  end

  loop event drain interval
    Agent->>FleetClient: send_events(EventBatch)
    FleetClient->>FleetServer: AgentEvent stream messages
    FleetServer->>KafkaEventIngest: ingest(payload)
    KafkaEventIngest->>KafkaBroker: publish(aigis.events.raw, payload)
    KafkaBroker-->>KafkaPipeline: message delivered
    KafkaPipeline->>KafkaBroker: route to typed topic by event_type
  end

  FleetServer-->>FleetClient: ServerCommand (Isolate / ConfigUpdate / Ack)
  FleetClient-->>Agent: CommandHandler.handle(ServerCommand)
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • swar09/project-edr#2: Directly related — this PR replaces StubEventIngest introduced in #2 with KafkaEventIngest in fleet-server/crates/fleet-server-bin/src/ports.rs and switches fleet-server/crates/grpc-listener/src/service.rs to use edr_sdk::proto::fleet instead of the locally generated proto module introduced there.
  • swar09/project-edr#7: Directly related — the event-buffer storage and agent-core JSON encoding/drain flow modified in #7 is the same mechanism this PR extends with the event-drain loop in agent-bin/src/main.rs and the encode_osquery_result path in agent-core/src/lib.rs.

Poem

🐇 Hop hop, the gRPC lines now flow,
Events stream where the Kafka winds blow,
A pipeline routes each heartbeat and fact,
The fleet enrolls each node on the track,
With protobuf codecs and tokio loops spun,
Aigis-Zero hops forward — the EDR is done! 🥕

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 62.12% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title accurately summarizes the main changes: agent orchestration stabilization, event ingestion pipeline completion, and developer tooling optimization across the EDR system.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai

coderabbitai Bot commented Jun 17, 2026

Copy link
Copy Markdown

Note

Docstrings generation - SUCCESS
Generated docstrings for this pull request at #15

coderabbitai Bot added a commit that referenced this pull request Jun 17, 2026
Docstrings generation was requested by @swar09.

* #14 (comment)

The following files were modified:

* `agent/crates/agent-bin/src/main.rs`
* `agent/crates/agent-core/src/command_handler.rs`
* `agent/crates/agent-core/src/lib.rs`
* `agent/crates/agent-core/src/orchestrator.rs`
* `agent/crates/agent-core/src/preflight.rs`
* `agent/crates/fleet-client/src/enrollment.rs`
* `agent/crates/fleet-client/src/lib.rs`
* `agent/crates/osquery-client/src/bin/test_query.rs`
* `agent/crates/osquery-client/src/client.rs`
* `agent/crates/osquery-client/src/scheduler.rs`
* `fleet-server/crates/fleet-server-bin/src/main.rs`
* `fleet-server/crates/fleet-server-bin/src/ports.rs`
* `fleet-server/crates/kafka-handler/src/lib.rs`
* `infra/scripts/create-topics.sh`
* `kafka-pipeline/src/bin/kafka-admin.rs`
* `kafka-pipeline/src/consumer.rs`
* `kafka-pipeline/src/event_router.rs`
* `kafka-pipeline/src/health.rs`
* `kafka-pipeline/src/main.rs`
* `kafka-pipeline/src/metrics.rs`
* `sdk/build.rs`
* `sdk/src/codec.rs`
Updated project description, added detailed architecture and component breakdown, and included setup instructions for local development and installation.
@swar09 swar09 self-assigned this Jun 19, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 20

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
.github/workflows/kafka-pipeline.yml (1)

1-58: 🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Harden workflow security: add permissions block and pin action references.

The workflow currently uses default overly broad permissions and unpinned action references, which expose supply chain and credential risks:

  1. Missing permissions block: Without explicit permissions:, the workflow inherits broad default access (e.g., write to repository contents, packages, etc.). Apply the principle of least privilege.
  2. Unpinned actions: References like actions/checkout@v4 should be pinned to commit SHAs to prevent malicious updates to action tags.
  3. Unpinned Kafka image: Line 28 uses apache/kafka:latest, making builds non-reproducible.
  4. Credential persistence: Checkout steps should set persist-credentials: false to prevent credentials from being exposed to subsequent steps.
🔒 Recommended security hardening

Add a top-level permissions block:

name: Kafka Pipeline

on:
  push:
    paths:
      - 'kafka-pipeline/**'
      - 'sdk/**'
  pull_request:
    paths:
      - 'kafka-pipeline/**'
      - 'sdk/**'

permissions:
  contents: read

jobs:
  # ... rest of workflow

Pin actions to commit SHAs (example for checkout - look up current v4 SHA):

- uses: actions/checkout@<full-commit-sha>  # v4
  with:
    persist-credentials: false

Pin the Kafka image to a specific version:

kafka:
  image: apache/kafka:3.9.0  # or latest stable version with digest
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.github/workflows/kafka-pipeline.yml around lines 1 - 58, Add a top-level
permissions block after the "on:" section with contents: read to enforce least
privilege access. Replace all instances of actions/checkout@v3 with a pinned
commit SHA (e.g., actions/checkout@<full-sha>) and add persist-credentials:
false as a with parameter to each checkout step. Pin the Kafka service image
from apache/kafka:latest to a specific version number like apache/kafka:3.9.0 to
ensure reproducible builds.

Source: Linters/SAST tools

🟡 Minor comments (10)
fleet-server/crates/fleet-server-bin/src/ports.rs-20-24 (1)

20-24: ⚠️ Potential issue | 🟡 Minor

Empty payload defaults to "{}" — this could mask missing data.

Events with empty payloads are silently converted to empty JSON objects without validation or logging. While the proto schema allows empty payload fields, an event with "{}" lacks required fields like event_type. Downstream in event_router.rs, missing event_type causes events to default to "unknown" and route to the raw topic, potentially hiding upstream bugs where events are created without proper payloads.

Consider:

  • Rejecting events with empty payloads with a validation error, or
  • Logging a warning when this fallback is applied, or
  • Adding tests to explicitly document when and why empty payloads should be accepted
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@fleet-server/crates/fleet-server-bin/src/ports.rs` around lines 20 - 24, The
conditional block that checks `event.payload.is_empty()` and silently converts
it to `b"{}"` lacks validation or logging, which can hide upstream bugs where
events are created without proper payloads. Instead of silently accepting empty
payloads, add either validation logic that rejects events with empty payloads
and returns an error, or add a warning log statement when this fallback occurs
to make the behavior visible for debugging. Additionally, consider adding tests
that explicitly document when and why empty payloads should be accepted to
prevent future regressions.
agent/crates/osquery-client/src/bin/test_query.rs-72-72 (1)

72-72: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix typo: "FAMED" should be "FRAMED".

📝 Proposed fix
-    println!("--- Testing FAMED registerExtension ---");
+    println!("--- Testing FRAMED registerExtension ---");
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/osquery-client/src/bin/test_query.rs` at line 72, Fix the typo
in the println! statement that reads "--- Testing FAMED registerExtension ---".
Change "FAMED" to "FRAMED" to match the correct terminology.
agent/crates/agent-bin/src/main.rs-266-278 (1)

266-278: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Event re-queue errors are silently ignored, risking event loss.

Lines 269 and 275 use let _ = buffer.push(event).await, which discards errors. If the buffer is full and configured with a drop policy, or if the database write fails, events will be permanently lost without logging.

🛡️ Proposed fix
                         Ok(ack) => {
                             warn!(error = ?ack.error, "Fleet rejected event batch, re-queuing");
                             for event in events {
-                                let _ = buffer.push(event).await;
+                                if let Err(e) = buffer.push(event).await {
+                                    error!("Failed to re-queue rejected event: {}", e);
+                                }
                             }
                         }
                         Err(e) => {
                             warn!(?e, "Failed to send events to fleet, re-queuing");
                             for event in events {
-                                let _ = buffer.push(event).await;
+                                if let Err(e) = buffer.push(event).await {
+                                    error!("Failed to re-queue event after send error: {}", e);
+                                }
                             }
                         }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/src/main.rs` around lines 266 - 278, The
buffer.push(event).await calls in both the Ok(ack) branch (when fleet rejects
the event batch) and the Err(e) branch (when sending to fleet fails) are
silently discarding errors with let _ = syntax, which risks permanent event loss
if re-queuing fails. Instead of using let _ = buffer.push(event).await, capture
the result and log an error message if the push operation fails in either
branch. This ensures that if the buffer is full or database writes fail, the
failure is properly recorded rather than silently losing events.
agent/crates/agent-bin/src/main.rs-42-69 (1)

42-69: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Config writer may lose final newline and doesn't handle missing [agent] section.

Two issues:

  1. Line 67 joins with "\n" but doesn't append a final newline, which differs from typical TOML file conventions.
  2. If the [agent] section is not found, in_agent remains false and node_id is never written. The function silently succeeds without updating the config.
🛠️ Proposed fix
 fn save_node_id_to_config(path: &Path, node_id: Uuid) -> anyhow::Result<()> {
     let content = std::fs::read_to_string(path)?;
     let mut lines: Vec<String> = content.lines().map(String::from).collect();
 
     let mut in_agent = false;
     let mut inserted = false;
     for i in 0..lines.len() {
         if lines[i].trim() == "[agent]" {
             in_agent = true;
             continue;
         }
         if in_agent && lines[i].starts_with("node_id") {
             lines[i] = format!("node_id = \"{}\"", node_id);
             inserted = true;
             break;
         }
         if in_agent && lines[i].starts_with('[') {
             lines.insert(i, format!("node_id = \"{}\"", node_id));
             inserted = true;
             break;
         }
     }
     if in_agent && !inserted {
         lines.push(format!("node_id = \"{}\"", node_id));
+        inserted = true;
     }
+    if !inserted {
+        anyhow::bail!("[agent] section not found in config file");
+    }
-    std::fs::write(path, lines.join("\n"))?;
+    std::fs::write(path, format!("{}\n", lines.join("\n")))?;
     Ok(())
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/src/main.rs` around lines 42 - 69, In the
save_node_id_to_config function, there are two issues to fix. First, when
writing the config file on line 67, append a newline character to the end of the
joined lines to maintain standard TOML file conventions. Second, after the loop
completes, check if the in_agent flag is still false (meaning the [agent]
section was never found), and either create a new [agent] section with the
node_id entry or return an error indicating the section is missing, so the
function doesn't silently fail to update the configuration when the section
doesn't exist.
agent/crates/agent-bin/src/main.rs-212-214 (1)

212-214: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

AgentCore run errors are silently dropped.

The spawned task ignores the Result from agent_core.run(). If the core agent loop encounters a fatal error, it will be lost. Consider logging errors before the task exits.

📋 Proposed fix
     tokio::spawn(async move {
-        let _ = agent_core.run(&agent_uuid).await;
+        if let Err(e) = agent_core.run(&agent_uuid).await {
+            error!("AgentCore run loop failed: {}", e);
+        }
     });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/src/main.rs` around lines 212 - 214, The tokio::spawn
closure is using let underscore syntax to explicitly ignore the Result returned
by agent_core.run(&agent_uuid).await, which causes any errors to be silently
dropped. Instead of discarding the result with let _, match or handle the Result
from the run method call and add appropriate error logging before the spawned
task completes so that any fatal errors from the agent core loop are properly
captured and recorded.
kafka-pipeline/src/consumer.rs-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove allow directives before production.

The #![allow(...)] directive suppresses compiler warnings for unused code, which can hide real issues during development. These directives should be removed before merging to production, or individual items should be prefixed with _ (e.g., _unused_var) if they're intentionally unused.

🔧 Proposed fix
-#![allow(unused_imports, unused_variables, dead_code, unused_mut)]
 use async_trait::async_trait;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/consumer.rs` at line 1, Remove the
#![allow(unused_imports, unused_variables, dead_code, unused_mut)] directive
from the top of the file since it suppresses compiler warnings that could hide
real issues. If any of the imports or variables are intentionally unused and
needed to remain in the code, prefix them with an underscore (e.g., _unused_var,
_UnusedImport) instead of using the blanket allow directive. This approach
maintains clean code while still allowing intentionally unused items without
compiler warnings.
kafka-pipeline/src/bin/kafka-admin.rs-4-5 (1)

4-5: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Unimplemented commands listed in usage.

The usage string advertises verify-topics and describe-topic commands, but only create-topics is implemented (lines 77-99). Either implement these commands or remove them from the usage documentation to avoid misleading users.

Would you like me to generate stub implementations for the missing commands, or should these be removed from the usage string?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/bin/kafka-admin.rs` around lines 4 - 5, Remove the usage
examples for verify-topics and describe-topic commands from the documentation
comments (lines starting with ///) at the beginning of the file, since only the
create-topics command is currently implemented in the main function. Either
delete these command examples from the usage string to match the actual
implementation, or alternatively, implement the missing verify-topics and
describe-topic command handlers alongside the existing create-topics
implementation to fully support all advertised commands.
kafka-pipeline/guide.md-77-77 (1)

77-77: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Incorrect function name reference.

The documentation references determine_topic, but the actual function in src/event_router.rs is named route_topic (line 20).

📝 Suggested fix
-3. Locate the `determine_topic` function.
+3. Locate the `route_topic` function.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/guide.md` at line 77, The documentation file incorrectly
references the function name `determine_topic` when the actual function in the
codebase is named `route_topic`. Update the documentation to replace the
incorrect reference `determine_topic` with the correct function name
`route_topic` to ensure the guide accurately reflects the actual code.
kafka-pipeline/Dockerfile-8-8 (1)

8-8: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Runtime image installs development package libssl-dev.

The runtime stage installs libssl-dev, which includes headers and static libraries unnecessary for execution. Use the runtime library package libssl3 instead to reduce image size and minimize the attack surface.

♻️ Suggested fix
-RUN apt-get update && apt-get install -y libssl-dev ca-certificates && rm -rf /var/lib/apt/lists/*
+RUN apt-get update && apt-get install -y --no-install-recommends libssl3 ca-certificates && rm -rf /var/lib/apt/lists/*
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/Dockerfile` at line 8, In the RUN command that installs
packages using apt-get, replace the development package libssl-dev with the
runtime library package libssl3. This change reduces the Docker image size and
minimizes the attack surface by excluding unnecessary headers and static
libraries that are only needed during development, not at runtime.

Source: Linters/SAST tools

kafka-pipeline/src/main.rs-14-14 (1)

14-14: ⚠️ Potential issue | 🟡 Minor

Port mismatch: default differs from .env.example.

The default broker address in main.rs uses port 29092, but kafka-pipeline/.env.example specifies localhost:9092. This inconsistency may confuse developers during local setup. Align both defaults or document the difference (e.g., 29092 for Docker host access vs 9092 for internal container network).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/main.rs` at line 14, The hardcoded default port in the
KAFKA_BROKERS environment variable assignment in main.rs (29092) differs from
what is specified in the .env.example file (9092), creating inconsistency for
developers. Update the default value in the
std::env::var("KAFKA_BROKERS").unwrap_or_else() call to use port 9092 to match
the .env.example documentation, or alternatively update .env.example to use 9092
if that is the correct standard for your local development setup.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In @.github/workflows/kafka-pipeline.yml:
- Line 17: The actions/checkout action is pinned to the deprecated v3 version
which is no longer supported on GitHub Actions. Replace all occurrences of
`actions/checkout@v3` with `actions/checkout@v4` across all four jobs in the
workflow file (the check, test, build, and docker jobs) to ensure compatibility
with current GitHub infrastructure and remove the dependency on the unsupported
v3 runner.

In `@agent/agent.toml`:
- Line 14: Remove the hardcoded node_id value from the agent.toml configuration
file. The node_id should not be pre-populated in the committed config file since
the enrollment flow in main.rs (which handles initial agent setup) will write a
unique node_id on first run. Delete the line containing node_id =
"e38eb47e-6dd5-4ae0-b576-17b65dbf0eed" to prevent multiple agents from sharing
the same identity when deployed.

In `@agent/crates/agent-bin/src/main.rs`:
- Around line 295-301: The shutdown logic in the main function after receiving
Ctrl-C from tokio::signal::ctrl_c().await does not gracefully terminate
background tasks or flush pending data. After the Ctrl-C signal is received, you
need to trigger the cancellation token by calling cancel() on the
agent_core.shutdown field, then join all spawned background tasks with an
appropriate timeout to allow them to finish processing and close their
connections cleanly. Additionally, ensure that the event buffer is drained to
flush any buffered events to the fleet server before returning Ok(()). This
prevents data loss and ensures all resources are properly released.
- Around line 172-173: The
Uuid::parse_str(&enrollment.node_id).unwrap_or_default() call on line 172
silently defaults to a zero UUID if parsing fails, masking invalid data from the
fleet server. Replace the unwrap_or_default() with error handling that fails
fast (such as expect() with a descriptive message, or return an error) so that
malformed UUIDs from the enrollment response are caught and logged immediately
rather than silently persisting an invalid zero UUID.

In `@agent/crates/agent-core/src/command_handler.rs`:
- Line 1: Remove the blanket lint suppression attribute
`#![allow(unused_imports, unused_variables, dead_code, unused_mut)]` from the
top of the file. This suppression masks potential bugs and code quality issues.
After removing it, address any actual lint warnings that appear by fixing the
underlying code issues rather than suppressing them.

In `@agent/crates/fleet-client/src/lib.rs`:
- Around line 61-81: The outbound_tx and outbound_rx channels are created
unconditionally before the if let Some(t) = token block, but the event_stream
wiring that consumes from outbound_rx only happens when a token is present. Move
the channel creation (outbound_tx and outbound_rx) inside the if let Some(t) =
token block so channels are only created when they will actually be used,
preventing message accumulation when no token is provided. Alternatively, if the
channels must remain unconditional, add documentation to the connect method
explaining that send_events must not be called until after a token is available,
and add a guard in send_events that validates a token exists before allowing
messages to be sent.
- Line 154: The payload serialization in the statement using
serde_json::to_vec(&val["payload"]).unwrap_or_default() silently converts JSON
serialization failures into an empty vector, causing data loss without any error
indication. Replace the unwrap_or_default() call with proper error handling that
either returns an error from the current function, logs the serialization
failure and skips event processing, or uses a Result-based error propagation
pattern to ensure serialization failures are captured and not silently ignored.
- Line 1: Remove the blanket `#![allow(unused_imports, unused_variables,
dead_code, unused_mut)]` attribute from the top of the file. After removing it,
compile the crate and address each individual compiler warning that appears by
either fixing the underlying issue (removing unused code, fixing variable
assignments, etc.) or by adding targeted allow attributes only where necessary.
This ensures code quality issues are properly addressed rather than suppressed.
- Line 126: The code at line 126 uses unwrap_or_default() on the Uuid::parse_str
call, which silently converts any parse failure into a zero UUID instead of
surfacing the actual error. Replace unwrap_or_default() with proper error
handling that either returns an error result to the caller (so the enrollment
error is properly propagated) or logs the specific parse error before handling
the failure case. This ensures that malformed node_id values from the fleet
server are caught and reported rather than being silently masked by the default
zero UUID.
- Around line 74-80: The spawned async task in the while loop starting with
`while let Ok(Some(msg)) = inbound_stream.message().await` silently breaks on
errors without logging or distinguishing between actual stream failures and
graceful disconnections. Replace the while let pattern with an explicit match
statement that handles three cases: successful message reception, graceful
disconnection (None), and actual errors. For the error case, log the error
details using an appropriate logger so that stream failures can be debugged
effectively. This will ensure that connection issues are properly captured and
visible in logs rather than silently lost.

In `@agent/crates/osquery-client/src/client.rs`:
- Around line 58-89: The streaming read loop that calls stream.read needs two
protections: add a timeout wrapper around the read operation to prevent
indefinite blocking when the remote hangs or sends data slowly, and add a
maximum buffer size check before extending buf with new data to prevent
unbounded memory growth. Reference the parse_query_response method and the buf
extension logic to locate where to add these safeguards. Consider using
tokio::time::timeout for the read operation and implementing a check that
returns an error if the total buffer size would exceed a reasonable limit before
calling buf.extend_from_slice.

In `@agent/osquery/osquery.flags`:
- Around line 31-35: The configuration contains a contradictory double-negative
flag where `--disable_audit=false` actually enables osquery's audit subsystem,
which contradicts the comment stating "Audit subsystem: disabled — we use eBPF
only". To fix this, change the flag `--disable_audit=false` on line 33 to
`--disable_audit=true` to actually disable the audit subsystem as documented.
Additionally, review and adjust or remove the related audit configuration flags
on lines 34-35 (`--enable_syslog=true` and `--audit_allow_sockets=true`) to
ensure they align with the intended eBPF-only architecture and are not
unnecessarily collecting audit events.

In `@fleet-server/crates/kafka-handler/src/lib.rs`:
- Around line 23-26: Replace the `Timeout::Never` argument in the producer
`.send()` call with a bounded timeout value (such as a reasonable duration like
a few seconds or minutes) to prevent indefinite blocking when Kafka is
unavailable or unresponsive. Consider making this timeout configurable through
the constructor or a configuration parameter instead of hardcoding it, allowing
operators to tune it based on their deployment needs.

In `@kafka-pipeline/Dockerfile`:
- Line 11: The Dockerfile runs the CMD instruction "edr-kafka-pipeline" as the
root user, creating a security vulnerability. Add a non-root user to the
Dockerfile before the CMD instruction by first creating a dedicated user
(typically using RUN with useradd or groupadd commands), then use a USER
directive to switch to that user before the CMD instruction executes. This
ensures the service runs with limited privileges instead of elevated root
access.

In `@kafka-pipeline/src/bin/kafka-admin.rs`:
- Line 83: The TopicReplication::Fixed(1) parameter in the NewTopic::new() call
is hardcoded to a replication factor of 1, which lacks durability and redundancy
for production use. Replace this hardcoded value with a configurable replication
factor by extracting it from a command-line argument or environment variable,
defaulting to at least 3 for production safety. Update the
TopicReplication::Fixed() call to use the configurable replication factor value
instead of the hardcoded 1.
- Around line 63-69: The argument parsing logic hardcodes the positions of flags
without validating their names or supporting different argument orders. Instead
of assuming args[2] is "--brokers" and args[3] is its value, implement a proper
flag parser that iterates through the args slice, looks for the actual flag
names (like "--brokers" and "--topic"), and extracts the corresponding values
that follow them. This will make the command-line parsing flexible enough to
handle arguments in any order while also validating that expected flags are
present before attempting to access their values.

In `@kafka-pipeline/src/consumer.rs`:
- Line 43: The consumer is configured with auto-commit enabled on line 43, which
causes message offsets to be committed automatically even when processing fails,
resulting in permanent data loss. Disable auto-commit by changing the setting
"enable.auto.commit" from "true" to "false", then add manual offset commit logic
in the message processing block (around the processor.process() call on lines
86-89) that only commits the offset after successful processing. The commit
should only occur when processor.process() completes without errors, ensuring
failed messages are not marked as consumed and remain available for
reprocessing.
- Line 88: Replace the TODO comment on line 88 in the consumer.rs file with
actual Dead Letter Queue implementation. When message processing fails in the
consumer's error handling path, route the failed message to a configured DLQ
topic instead of leaving it unhandled. Implement a function to send failed
messages to the DLQ topic with appropriate metadata (original topic, partition,
offset, error details), and handle any errors that occur during DLQ publishing
to prevent message loss.

In `@kafka-pipeline/src/event_router.rs`:
- Around line 43-49: The event_type extraction in EventRouter fails because the
KafkaEventIngest publishes only the payload bytes without the event_type field,
so the .get("event_type") call will always return None and default to "unknown".
Fix this by modifying KafkaEventIngest (in ports.rs around the publish call,
lines 20-28) to wrap the payload with the event_type before publishing, creating
a structure like {"event_type": "...", "data": payload_bytes}, then update the
event_type extraction logic in EventRouter to properly extract from this wrapped
structure instead of expecting event_type to be a top-level field in the
deserialized event object.

In `@kafka-pipeline/src/metrics.rs`:
- Around line 17-23: Replace the stub implementation of the get_consumer_lag
method with actual rdkafka calls. Use the provided group_id parameter to fetch
committed offsets from the Kafka consumer group using the appropriate rdkafka
client method. Then fetch the latest offsets (watermarks) for the same
partitions. Calculate the total lag by summing the differences between
watermarks and committed offsets across all partitions, and return the result
instead of the hardcoded Ok(0) value. Remove or update the TODO comments as the
implementation progresses.

---

Outside diff comments:
In @.github/workflows/kafka-pipeline.yml:
- Around line 1-58: Add a top-level permissions block after the "on:" section
with contents: read to enforce least privilege access. Replace all instances of
actions/checkout@v3 with a pinned commit SHA (e.g., actions/checkout@<full-sha>)
and add persist-credentials: false as a with parameter to each checkout step.
Pin the Kafka service image from apache/kafka:latest to a specific version
number like apache/kafka:3.9.0 to ensure reproducible builds.

---

Minor comments:
In `@agent/crates/agent-bin/src/main.rs`:
- Around line 266-278: The buffer.push(event).await calls in both the Ok(ack)
branch (when fleet rejects the event batch) and the Err(e) branch (when sending
to fleet fails) are silently discarding errors with let _ = syntax, which risks
permanent event loss if re-queuing fails. Instead of using let _ =
buffer.push(event).await, capture the result and log an error message if the
push operation fails in either branch. This ensures that if the buffer is full
or database writes fail, the failure is properly recorded rather than silently
losing events.
- Around line 42-69: In the save_node_id_to_config function, there are two
issues to fix. First, when writing the config file on line 67, append a newline
character to the end of the joined lines to maintain standard TOML file
conventions. Second, after the loop completes, check if the in_agent flag is
still false (meaning the [agent] section was never found), and either create a
new [agent] section with the node_id entry or return an error indicating the
section is missing, so the function doesn't silently fail to update the
configuration when the section doesn't exist.
- Around line 212-214: The tokio::spawn closure is using let underscore syntax
to explicitly ignore the Result returned by agent_core.run(&agent_uuid).await,
which causes any errors to be silently dropped. Instead of discarding the result
with let _, match or handle the Result from the run method call and add
appropriate error logging before the spawned task completes so that any fatal
errors from the agent core loop are properly captured and recorded.

In `@agent/crates/osquery-client/src/bin/test_query.rs`:
- Line 72: Fix the typo in the println! statement that reads "--- Testing FAMED
registerExtension ---". Change "FAMED" to "FRAMED" to match the correct
terminology.

In `@fleet-server/crates/fleet-server-bin/src/ports.rs`:
- Around line 20-24: The conditional block that checks
`event.payload.is_empty()` and silently converts it to `b"{}"` lacks validation
or logging, which can hide upstream bugs where events are created without proper
payloads. Instead of silently accepting empty payloads, add either validation
logic that rejects events with empty payloads and returns an error, or add a
warning log statement when this fallback occurs to make the behavior visible for
debugging. Additionally, consider adding tests that explicitly document when and
why empty payloads should be accepted to prevent future regressions.

In `@kafka-pipeline/Dockerfile`:
- Line 8: In the RUN command that installs packages using apt-get, replace the
development package libssl-dev with the runtime library package libssl3. This
change reduces the Docker image size and minimizes the attack surface by
excluding unnecessary headers and static libraries that are only needed during
development, not at runtime.

In `@kafka-pipeline/guide.md`:
- Line 77: The documentation file incorrectly references the function name
`determine_topic` when the actual function in the codebase is named
`route_topic`. Update the documentation to replace the incorrect reference
`determine_topic` with the correct function name `route_topic` to ensure the
guide accurately reflects the actual code.

In `@kafka-pipeline/src/bin/kafka-admin.rs`:
- Around line 4-5: Remove the usage examples for verify-topics and
describe-topic commands from the documentation comments (lines starting with
///) at the beginning of the file, since only the create-topics command is
currently implemented in the main function. Either delete these command examples
from the usage string to match the actual implementation, or alternatively,
implement the missing verify-topics and describe-topic command handlers
alongside the existing create-topics implementation to fully support all
advertised commands.

In `@kafka-pipeline/src/consumer.rs`:
- Line 1: Remove the #![allow(unused_imports, unused_variables, dead_code,
unused_mut)] directive from the top of the file since it suppresses compiler
warnings that could hide real issues. If any of the imports or variables are
intentionally unused and needed to remain in the code, prefix them with an
underscore (e.g., _unused_var, _UnusedImport) instead of using the blanket allow
directive. This approach maintains clean code while still allowing intentionally
unused items without compiler warnings.

In `@kafka-pipeline/src/main.rs`:
- Line 14: The hardcoded default port in the KAFKA_BROKERS environment variable
assignment in main.rs (29092) differs from what is specified in the .env.example
file (9092), creating inconsistency for developers. Update the default value in
the std::env::var("KAFKA_BROKERS").unwrap_or_else() call to use port 9092 to
match the .env.example documentation, or alternatively update .env.example to
use 9092 if that is the correct standard for your local development setup.

---

Nitpick comments:
In `@agent/agent.toml`:
- Line 5: The log_level configuration in the agent.toml file is currently set to
"debug", which is too verbose for production deployments and can negatively
impact performance and disk usage. Change the log_level value from "debug" to
either "info" or "warn" to provide appropriate logging verbosity for production
environments while reducing performance overhead and log file growth.

In `@agent/crates/agent-bin/src/main.rs`:
- Around line 71-84: The parse_endpoint function silently falls back to default
values (127.0.0.1 and port 50051) when parsing fails without logging, which
could hide configuration errors from operators. Add logging statements that
trigger when the IP address parsing falls back to the default IPv4 address and
when the port parsing falls back to the default port 50051, so that operators
are informed when their endpoint configuration is malformed or invalid.
- Line 1: Remove the overly broad lint suppression attribute at the start of
main.rs. The current `#![allow(unused_imports, unused_variables, dead_code,
unused_mut)]` directive masks code quality issues. Modify or remove this
attribute to eliminate suppression of `unused_variables`, `dead_code`, and
`unused_mut` warnings, keeping only `unused_imports` if absolutely necessary.
This will allow the Rust compiler to properly warn about unused code and
unnecessary mutability, improving code quality for production.

In `@agent/crates/agent-core/src/lib.rs`:
- Line 1: Remove the module-level `#![allow(unused_imports, unused_variables,
dead_code, unused_mut)]` attribute at the beginning of the lib.rs file. If this
causes legitimate compiler warnings or errors to surface, address those warnings
by applying `#[allow(...)]` attributes only to the specific items (functions,
types, imports, or variables) that intentionally need the suppression, rather
than suppressing warnings globally across the entire module.

In `@agent/crates/agent-core/src/orchestrator.rs`:
- Line 1: The module-level `#![allow(unused_imports, unused_variables,
dead_code, unused_mut)]` attribute at the top of the orchestrator module is
suppressing warnings across the entire file, which can hide legitimate code
issues. Either remove this attribute entirely if the code no longer needs these
suppressions, or move it to specific functions, methods, or structs where the
warnings are intentionally accepted by using local attributes like
`#[allow(...)]` on individual items instead of the module-level blanket
suppression.

In `@agent/crates/fleet-client/src/enrollment.rs`:
- Around line 16-22: The enrollment call made through the client.unary() method
lacks an explicit timeout configuration, which could cause the agent to hang
indefinitely if the fleet server becomes unresponsive during the enrollment
handshake. Add a timeout wrapper around the unary() call that processes the
tonic_req with the path and JsonCodec for RegisterRequest and RegisterResponse
types to ensure the enrollment operation will fail gracefully with a timeout
error rather than hanging indefinitely.

In `@agent/crates/fleet-client/src/lib.rs`:
- Around line 139-168: The event processing loop is inefficiently calling
tx.send(proto_event).await sequentially for each event in the batch, which
causes unnecessary context switches and blocks pipelining. Replace the
sequential awaits in the loop with either a non-blocking approach using
tx.try_send for each proto_event instead of tx.send().await, or collect all
created AgentEvent instances into a Vec and use futures::stream::iter to
batch-send them via forward to the tx channel. This will allow multiple events
to be pipelined and reduce context switching overhead during batch processing.

In `@agent/crates/osquery-client/src/bin/test_query.rs`:
- Around line 82-95: The framed test reads only the 4-byte length header but
does not consume the actual response payload from the stream. In the Ok(Ok(_))
branch where resp_len is calculated, add a second read_exact call to read
resp_len bytes from the stream into a buffer to consume the full response
message, ensuring the socket is properly cleared and subsequent operations are
not affected by unconsumed data.

In `@agent/crates/osquery-client/src/client.rs`:
- Around line 76-86: The EOF detection logic in the error handling block relies
on fragile string matching of the error message (checking if err_str contains
"UnexpectedEof", "EOF", etc.), which will break if the thrift library changes
its error messages in future versions. Instead of converting the error to a
string and using contains checks, inspect the actual error type or kind directly
from the error object e. Determine the appropriate error type or kind exported
by the thrift crate being used and match against those types instead of string
patterns to robustly detect EOF conditions.

In `@agent/tests/agent_integration.rs`:
- Around line 1-10: The test_agent_integration function contains only comments
describing test steps but has no actual implementation or assertions. Either
implement the full test by replacing the comments with actual code that sets up
a mock fleet server using tonic with JsonCodec, starts the agent, and verifies
enrollment, events, heartbeats, and command handling as described in the
comments, or if implementation is deferred, add either a todo!() macro call at
the beginning of the function body or apply the #[ignore] attribute to the test
function to explicitly mark it as incomplete and prevent false-positive test
passes.

In `@api-backend/Dockerfile`:
- Around line 9-10: Restructure the Dockerfile build steps to separate
dependency caching from source code changes. Move the COPY . . instruction to
after the dependency layer by first copying only the Cargo.toml and Cargo.lock
files along with the api-backend/Cargo.toml, then building with a dummy main.rs
file to cache the dependencies in a separate layer. After the dependency build
completes and the fingerprint is cleaned, copy the actual api-backend/src
directory and run the cargo build command again to rebuild with the cached
dependencies. This two-stage approach ensures that changes to source code do not
invalidate the dependency cache layer.

In `@fleet-server/crates/fleet-server-bin/src/ports.rs`:
- Around line 56-57: The KafkaPublisher::new() call in the build_ports function
uses .expect() which causes a panic if Kafka is unavailable at startup. Change
the return type of build_ports to return a Result instead of unwrapping errors,
then propagate the error from KafkaPublisher::new() by using the ? operator. In
main.rs, update the call to build_ports to handle the returned Result and exit
gracefully with a descriptive error message if Kafka initialization fails.

In `@fleet-server/crates/kafka-handler/src/lib.rs`:
- Around line 10-15: The message timeout value is hardcoded to "5000" in the new
method of the Kafka handler, reducing configurability. Modify the new method to
accept a message_timeout_ms parameter (as a string or number type) instead of
hardcoding the value, then use that parameter when setting the
"message.timeout.ms" configuration. Alternatively, read this value from an
environment variable with a sensible default fallback to maintain backward
compatibility while allowing deployment-specific configuration.

In `@fleet-server/Dockerfile`:
- Around line 16-18: Restore Docker layer caching in the Dockerfile by splitting
the build into two stages. After setting WORKDIR /build, first copy only the
dependency manifests (Cargo.toml, Cargo.lock at the root, and the Cargo.toml
files from fleet-server/crates/ and sdk/ directories). Then run a cargo build
with a temporary dummy main.rs file to cache the dependency layer separately
from source code changes. Finally, copy the complete source code (COPY . .) and
run cargo build again with the actual source. This way, subsequent builds that
only change source files will reuse the cached dependency layer instead of
rebuilding all dependencies.

In `@infra/docker-compose.dev.yml`:
- Line 4: Replace the `apache/kafka:latest` image tag with a specific pinned
version to ensure reproducible development environments and prevent unexpected
breaking changes when the image updates. Instead of using the `latest` tag,
specify a concrete version number such as `apache/kafka:3.6.0` or another stable
release version that matches your requirements. This ensures all developers use
the same Kafka version consistently.

In `@infra/docker-compose.kafka-cluster.yml`:
- Line 4: The Kafka cluster nodes are using the apache/kafka:latest image tag
which can cause version drift and inconsistency across the cluster. Replace the
latest tag with a specific stable version number in all three Kafka node service
definitions (appearing at the image references across the configuration file).
Ensure all three nodes use the exact same pinned version to maintain cluster
compatibility and prevent quorum failures due to version mismatches.

In `@infra/scripts/create-topics.sh`:
- Around line 4-5: The variables BOOTSTRAP and KAFKA_BIN are defined at the
beginning of the script but are never actually used in the docker exec commands,
which instead contain hardcoded values. Replace the hardcoded occurrences of
"localhost:29092" with the BOOTSTRAP variable and "/opt/kafka/bin" with the
KAFKA_BIN variable throughout all docker exec command calls in the script to
utilize these defined variables.
- Line 15: The docker exec command for the aigis-kafka-dev container assumes the
container is running without validation, which causes cryptic errors if the
container doesn't exist or is stopped. Add a preflight check before the docker
exec command that verifies the container exists and is in a running state. You
can use docker inspect or docker ps to check the container status, and exit with
an informative error message if the container is not available. This check
should be placed immediately before the docker exec call that uses the
kafka-topics.sh script.

In `@kafka-pipeline/Cargo.toml`:
- Line 13: Remove the commented-out rdkafka dependency line in Cargo.toml. Since
the active rdkafka dependency is already specified elsewhere with an explicit
version, the commented-out line is redundant and serves no purpose. Simply
delete the entire commented line to clean up the configuration file and prevent
confusion.
- Around line 26-27: The dependencies metrics, metrics-exporter-prometheus, and
futures-util in kafka-pipeline/Cargo.toml are using hardcoded version strings
instead of referencing workspace dependencies like other dependencies do.
Replace the hardcoded version specifications for these three dependencies with
workspace = true in the kafka-pipeline/Cargo.toml file, and ensure their
corresponding version definitions exist in the workspace dependencies section of
the root Cargo.toml for centralized version management.

In `@kafka-pipeline/Dockerfile`:
- Line 3: The apt-get install command in the Dockerfile is installing
recommended packages unnecessarily, which increases the image size. Modify the
RUN instruction that installs cmake, g++, and protobuf-compiler to include the
--no-install-recommends flag in the apt-get install command. This flag prevents
installation of non-essential recommended packages and will reduce the builder
image size.

In `@kafka-pipeline/src/event_router.rs`:
- Around line 20-29: The comment on the osquery_result and osquery_snapshot
routing case within the route_topic method is vague and unhelpful. Replace the
"// default bucket" comment with a clearer explanation that clarifies why these
osquery event types are intentionally routed to the "aigis.events.process" topic
rather than to their own dedicated topic or the raw events topic. The new
comment should provide context about the design decision behind this routing
choice.
- Line 1: Remove the overly broad #![allow(unused_imports, unused_variables,
dead_code, unused_mut)] attribute at the module level in event_router.rs, as it
masks potential code quality issues. If specific warnings must be suppressed,
either fix the underlying code issues or use more granular #[allow(...)]
attributes scoped directly to the particular functions, structs, or variables
that genuinely require suppression. This ensures that legitimate warnings are
not hidden and code quality remains maintainable.

In `@kafka-pipeline/src/health.rs`:
- Around line 1-6: The is_healthy() function currently returns a hardcoded true
value without performing any actual health verification. Replace this stub
implementation with real health checks that verify the Kafka consumer connection
state, broker reachability, processing lag metrics, and recent successful
message consumption. The function should query the Kafka consumer's internal
state to determine connectivity, check broker availability, inspect lag metrics
to ensure processing is keeping up, and verify that messages have been consumed
recently. The function should return true only when all these checks pass,
otherwise return false to accurately reflect the system's health status.

In `@kafka-pipeline/src/main.rs`:
- Line 20: The linger.ms configuration in the Kafka producer setup is set to 5
milliseconds, which is too aggressive and reduces message batching efficiency.
Increase the value in the `.set("linger.ms", "5")` call to a value between
50-100 milliseconds to improve batching efficiency, reduce CPU overhead, and
minimize network round-trips while maintaining acceptable latency for the event
router.

In `@kafka-pipeline/tests/integration_test.rs`:
- Around line 1-8: The test_pipeline_integration function is a placeholder with
no actual test logic, which provides no meaningful coverage. Replace the
placeholder implementation with actual integration test logic that sets up a
test Kafka broker, produces messages, and verifies that the EventRouterProcessor
correctly routes and processes those messages according to the expected
behavior. Alternatively, if integration testing is not ready, remove the empty
test entirely to avoid misleading coverage metrics.

In `@README.md`:
- Around line 583-605: The MIT License code block in the README.md file is
missing a language identifier on its opening code fence. Add the `text` language
identifier after the opening triple backticks (```) that precedes the MIT
License content to enable proper syntax highlighting and rendering in the
documentation.
- Around line 122-123: The README uses inconsistent spelling variants for the
same word: "normalized" (American English) on line 122 and "normalised" (British
English) on lines 84, 123, 181, 234, and 551. Choose one spelling variant and
replace all occurrences throughout the entire README file to ensure consistency.
Update all instances of either "normalized" or "normalised" to match the
selected variant across the entire document.

In `@rule-engine/Dockerfile`:
- Around line 9-10: The current Dockerfile pattern copies all source code before
running cargo build, which means any source file change invalidates the entire
dependency cache layer. Restructure the build by first copying only the
Cargo.toml and Cargo.lock manifest files, then creating a dummy main.rs file in
rule-engine/src and running the initial cargo build to cache dependencies. After
the dependency layer is built and cached, copy the actual rule-engine/src source
code and run cargo build again. This separates the dependency resolution
(stable, rarely changes) from the source code layer (changes frequently),
allowing Docker to reuse the cached dependency layer when only source code is
modified.

In `@sdk/src/models/heartbeat.rs`:
- Line 5: Remove the redundant `node_id` field from the `HeartbeatRequest`
struct since it is automatically overwritten by the fleet-client's `heartbeat()`
method with the client's internal state. This field at line 5 is unused and
confusing for API consumers, so delete the entire `pub node_id: String,` line
from the struct definition.
🪄 Autofix (Beta)

❌ Autofix failed (check again to retry)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 106c1610-1113-4a11-a8c8-8ad013f72bbc

📥 Commits

Reviewing files that changed from the base of the PR and between 4aac882 and 63d75be.

📒 Files selected for processing (73)
  • .dev.env.example
  • .dockerignore
  • .env.example
  • .github/workflows/ci.yml
  • .github/workflows/kafka-pipeline.yml
  • .gitignore
  • Cargo.toml
  • EDR_IMPLEMENTATION_GUIDE.md
  • ISSUES.md
  • README.md
  • agent/.env.example
  • agent/agent.toml
  • agent/crates/agent-bin/Cargo.toml
  • agent/crates/agent-bin/src/main.rs
  • agent/crates/agent-core/Cargo.toml
  • agent/crates/agent-core/src/command_handler.rs
  • agent/crates/agent-core/src/config.rs
  • agent/crates/agent-core/src/lib.rs
  • agent/crates/agent-core/src/orchestrator.rs
  • agent/crates/agent-core/src/preflight.rs
  • agent/crates/fleet-client/Cargo.toml
  • agent/crates/fleet-client/src/enrollment.rs
  • agent/crates/fleet-client/src/lib.rs
  • agent/crates/osquery-client/src/bin/test_query.rs
  • agent/crates/osquery-client/src/client.rs
  • agent/crates/osquery-client/src/lib.rs
  • agent/crates/osquery-client/src/scheduler.rs
  • agent/crates/osquery-client/src/types.rs
  • agent/install.sh
  • agent/osquery-edr-linux-guide.md
  • agent/osquery/osquery.flags
  • agent/systemd/aigis-zero.service
  • agent/tests/agent_integration.rs
  • agent/uninstall.sh
  • api-backend/Dockerfile
  • fleet-server/Dockerfile
  • fleet-server/crates/fleet-manager/Cargo.toml
  • fleet-server/crates/fleet-server-bin/src/main.rs
  • fleet-server/crates/fleet-server-bin/src/ports.rs
  • fleet-server/crates/grpc-listener/Cargo.toml
  • fleet-server/crates/grpc-listener/build.rs
  • fleet-server/crates/grpc-listener/src/service.rs
  • fleet-server/crates/kafka-handler/Cargo.toml
  • fleet-server/crates/kafka-handler/src/lib.rs
  • fleet-server/crates/node-enrollment/Cargo.toml
  • fleet-server/src/grpc/testing.proto
  • infra/docker-compose.dev.yml
  • infra/docker-compose.kafka-cluster.yml
  • infra/k8s/keda-scaler.yml
  • infra/scripts/create-topics.sh
  • kafka-pipeline/.env.example
  • kafka-pipeline/Cargo.toml
  • kafka-pipeline/Dockerfile
  • kafka-pipeline/guide.md
  • kafka-pipeline/src/bin/kafka-admin.rs
  • kafka-pipeline/src/consumer.rs
  • kafka-pipeline/src/event_router.rs
  • kafka-pipeline/src/health.rs
  • kafka-pipeline/src/lib.rs
  • kafka-pipeline/src/main.rs
  • kafka-pipeline/src/metrics.rs
  • kafka-pipeline/tests/integration_test.rs
  • rule-engine/Dockerfile
  • sdk/Cargo.toml
  • sdk/build.rs
  • sdk/src/codec.rs
  • sdk/src/lib.rs
  • sdk/src/models/enrollment.rs
  • sdk/src/models/envelope.rs
  • sdk/src/models/event.rs
  • sdk/src/models/heartbeat.rs
  • sdk/src/models/mod.rs
  • typos.toml
💤 Files with no reviewable changes (3)
  • ISSUES.md
  • fleet-server/crates/grpc-listener/build.rs
  • EDR_IMPLEMENTATION_GUIDE.md

check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Update actions/checkout to v4.

Lines 17, 40, 48, and 56 use actions/checkout@v3, but the v3 runner is deprecated and no longer supported on GitHub Actions. Update to @v4 to ensure compatibility with current GitHub infrastructure.

🔧 Proposed fix
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4

Apply this change to all four jobs (check, test, build, docker).

Also applies to: 40-40, 48-48, 56-56

🧰 Tools
🪛 actionlint (1.7.12)

[error] 17-17: the runner of "actions/checkout@v3" action is too old to run on GitHub Actions. update the action's version to fix this issue

(action)

🪛 zizmor (1.25.2)

[warning] 17-17: credential persistence through GitHub Actions artifacts (artipacked): does not set persist-credentials: false

(artipacked)


[error] 17-17: unpinned action reference (unpinned-uses): action is not pinned to a hash (required by blanket policy)

(unpinned-uses)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.github/workflows/kafka-pipeline.yml at line 17, The actions/checkout action
is pinned to the deprecated v3 version which is no longer supported on GitHub
Actions. Replace all occurrences of `actions/checkout@v3` with
`actions/checkout@v4` across all four jobs in the workflow file (the check,
test, build, and docker jobs) to ensure compatibility with current GitHub
infrastructure and remove the dependency on the unsupported v3 runner.

Source: Linters/SAST tools

Comment thread agent/agent.toml
event_drain_batch = 100 # rows per flush cycle
event_drain_interval_secs = 5 # how often to attempt shipping buffered events

node_id = "e38eb47e-6dd5-4ae0-b576-17b65dbf0eed"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Remove hardcoded node_id from committed config.

A fixed node_id = "e38eb47e-6dd5-4ae0-b576-17b65dbf0eed" value is present, but line 3 comments state it should be "filled in after enrollment". If this hardcoded UUID is deployed to multiple agents, fleet management will break because all agents will report the same identity.

The main.rs enrollment flow (lines 176-179) will overwrite this value after first enrollment, but:

  1. Git history will contain this UUID
  2. Operators copying the file may inadvertently deploy it
  3. The config file structure already supports optional node_id (line 3 shows commented example)
🔧 Proposed fix
-node_id = "e38eb47e-6dd5-4ae0-b576-17b65dbf0eed"
 [osquery]

This removes the hardcoded value, allowing enrollment to write the unique node_id on first run.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
node_id = "e38eb47e-6dd5-4ae0-b576-17b65dbf0eed"
[osquery]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/agent.toml` at line 14, Remove the hardcoded node_id value from the
agent.toml configuration file. The node_id should not be pre-populated in the
committed config file since the enrollment flow in main.rs (which handles
initial agent setup) will write a unique node_id on first run. Delete the line
containing node_id = "e38eb47e-6dd5-4ae0-b576-17b65dbf0eed" to prevent multiple
agents from sharing the same identity when deployed.

Comment on lines +172 to +173
let node_id = Uuid::parse_str(&enrollment.node_id).unwrap_or_default();
let token = enrollment.token;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Fail fast if enrollment returns an invalid node_id UUID.

Line 172 uses .unwrap_or_default(), which substitutes a zero UUID (00000000-0000-0000-0000-000000000000) if parsing fails. If the fleet server returns a malformed UUID, the agent will silently use and persist the zero UUID, causing operational issues.

🔧 Proposed fix
-    let node_id = Uuid::parse_str(&enrollment.node_id).unwrap_or_default();
+    let node_id = Uuid::parse_str(&enrollment.node_id)
+        .map_err(|e| anyhow::anyhow!("Fleet returned invalid node_id '{}': {}", enrollment.node_id, e))?;
     let token = enrollment.token;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let node_id = Uuid::parse_str(&enrollment.node_id).unwrap_or_default();
let token = enrollment.token;
let node_id = Uuid::parse_str(&enrollment.node_id)
.map_err(|e| anyhow::anyhow!("Fleet returned invalid node_id '{}': {}", enrollment.node_id, e))?;
let token = enrollment.token;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/src/main.rs` around lines 172 - 173, The
Uuid::parse_str(&enrollment.node_id).unwrap_or_default() call on line 172
silently defaults to a zero UUID if parsing fails, masking invalid data from the
fleet server. Replace the unwrap_or_default() with error handling that fails
fast (such as expect() with a descriptive message, or return an error) so that
malformed UUIDs from the enrollment response are caught and logged immediately
rather than silently persisting an invalid zero UUID.

Comment on lines +295 to +301
// Wait for shutdown signal
let _ = tokio::signal::ctrl_c().await;
info!("Ctrl-C received, shutting down");

// Notify stopping
let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Stopping]);

res
Ok(())

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Missing graceful shutdown of background tasks and event buffer flush.

The agent waits for Ctrl-C then immediately returns Ok(()), dropping all spawned tasks without cancellation or join. This means:

  • Buffered events are not flushed to the fleet server
  • The AgentCore cancellation token is never triggered
  • OSQuery connections may not close cleanly

Consider triggering agent_core.shutdown.cancel(), joining spawned tasks with a timeout, and draining the event buffer before exit.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-bin/src/main.rs` around lines 295 - 301, The shutdown
logic in the main function after receiving Ctrl-C from
tokio::signal::ctrl_c().await does not gracefully terminate background tasks or
flush pending data. After the Ctrl-C signal is received, you need to trigger the
cancellation token by calling cancel() on the agent_core.shutdown field, then
join all spawned background tasks with an appropriate timeout to allow them to
finish processing and close their connections cleanly. Additionally, ensure that
the event buffer is drained to flush any buffered events to the fleet server
before returning Ok(()). This prevents data loss and ensures all resources are
properly released.

@@ -0,0 +1,35 @@
#![allow(unused_imports, unused_variables, dead_code, unused_mut)]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Remove production lint suppressions.

Same issue as in fleet-client/src/lib.rs: blanket warning suppressions hide potential bugs and code quality issues.

🧹 Proposed fix
-#![allow(unused_imports, unused_variables, dead_code, unused_mut)]
 use edr_sdk::proto::fleet::{ServerCommand, server_command::Command};
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#![allow(unused_imports, unused_variables, dead_code, unused_mut)]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agent/crates/agent-core/src/command_handler.rs` at line 1, Remove the blanket
lint suppression attribute `#![allow(unused_imports, unused_variables,
dead_code, unused_mut)]` from the top of the file. This suppression masks
potential bugs and code quality issues. After removing it, address any actual
lint warnings that appear by fixing the underlying code issues rather than
suppressing them.

let mut new_topics = Vec::new();
for (i, topic) in TOPICS.iter().enumerate() {
let new_topic =
NewTopic::new(topic.name, topic.partitions, TopicReplication::Fixed(1))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Hardcoded replication factor is not production-safe.

TopicReplication::Fixed(1) creates topics with no redundancy. In production Kafka clusters, a replication factor of at least 3 is recommended for durability. Consider making this configurable via command-line argument or environment variable.

♻️ Suggested enhancement
+    let replication = std::env::var("KAFKA_REPLICATION_FACTOR")
+        .ok()
+        .and_then(|s| s.parse::<i32>().ok())
+        .unwrap_or(3);
+
     let mut new_topics = Vec::new();
     for (i, topic) in TOPICS.iter().enumerate() {
         let new_topic =
-            NewTopic::new(topic.name, topic.partitions, TopicReplication::Fixed(1))
+            NewTopic::new(topic.name, topic.partitions, TopicReplication::Fixed(replication))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/bin/kafka-admin.rs` at line 83, The
TopicReplication::Fixed(1) parameter in the NewTopic::new() call is hardcoded to
a replication factor of 1, which lacks durability and redundancy for production
use. Replace this hardcoded value with a configurable replication factor by
extracting it from a command-line argument or environment variable, defaulting
to at least 3 for production safety. Update the TopicReplication::Fixed() call
to use the configurable replication factor value instead of the hardcoded 1.

.set("bootstrap.servers", brokers)
.set("group.id", group_id)
.set("auto.offset.reset", "earliest")
.set("enable.auto.commit", "true")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Data loss risk: auto-commit enabled with failed message processing.

Line 43 enables auto.commit with a 1-second interval, but lines 86-89 only log processing errors without preventing offset commits. This means if processor.process() fails, the message offset will still be auto-committed within 1 second, causing permanent data loss of that message.

🛡️ Recommended fix: disable auto-commit and commit manually after successful processing

Update the consumer configuration:

         .set("group.id", group_id)
         .set("auto.offset.reset", "earliest")
-        .set("enable.auto.commit", "true")
-        .set("auto.commit.interval.ms", "1000")
+        .set("enable.auto.commit", "false")
         .set("fetch.min.bytes", "1")

Then manually commit after successful processing:

                             let payload = borrowed_msg.payload().unwrap_or(&[]);
 
-                            if let Err(e) = self.processor.process(key, payload, topic, partition, offset).await {
-                                error!(error = %e, topic, partition, offset, "Message processing failed");
-                                // TODO: Send to DLQ
+                            match self.processor.process(key, payload, topic, partition, offset).await {
+                                Ok(()) => {
+                                    if let Err(e) = self.consumer.commit_message(&borrowed_msg, rdkafka::consumer::CommitMode::Async) {
+                                        error!(error = %e, topic, partition, offset, "Failed to commit offset");
+                                    }
+                                }
+                                Err(e) => {
+                                    error!(error = %e, topic, partition, offset, "Message processing failed");
+                                    // TODO: Send to DLQ
+                                }
                             }

Also applies to: 86-89

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/consumer.rs` at line 43, The consumer is configured with
auto-commit enabled on line 43, which causes message offsets to be committed
automatically even when processing fails, resulting in permanent data loss.
Disable auto-commit by changing the setting "enable.auto.commit" from "true" to
"false", then add manual offset commit logic in the message processing block
(around the processor.process() call on lines 86-89) that only commits the
offset after successful processing. The commit should only occur when
processor.process() completes without errors, ensuring failed messages are not
marked as consumed and remain available for reprocessing.


if let Err(e) = self.processor.process(key, payload, topic, partition, offset).await {
error!(error = %e, topic, partition, offset, "Message processing failed");
// TODO: Send to DLQ

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

DLQ implementation missing.

The TODO on line 88 indicates that failed messages are not being sent to a Dead Letter Queue. Without DLQ handling, processing failures result in either message loss (with auto-commit) or infinite retries (with manual commit). Consider implementing DLQ routing for failed messages.

Do you want me to generate a DLQ implementation pattern for this consumer, or would you prefer to open a tracking issue?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/consumer.rs` at line 88, Replace the TODO comment on line
88 in the consumer.rs file with actual Dead Letter Queue implementation. When
message processing fails in the consumer's error handling path, route the failed
message to a configured DLQ topic instead of leaving it unhandled. Implement a
function to send failed messages to the DLQ topic with appropriate metadata
(original topic, partition, offset, error details), and handle any errors that
occur during DLQ publishing to prevent message loss.

Comment on lines +43 to +49
let event: Value =
serde_json::from_slice(payload).map_err(|e| format!("Invalid JSON: {e}"))?;

let event_type = event
.get("event_type")
.and_then(|v| v.as_str())
.unwrap_or("unknown");

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how the agent constructs the payload that flows to Kafka
rg -nP -A5 -B5 'to_vec.*payload|serde_json::to_vec' agent/crates/fleet-client/src/lib.rs

# Check how fleet-server publishes to Kafka
rg -nP -A10 'fn ingest_event' fleet-server/crates/fleet-server-bin/src/ports.rs

Repository: swar09/project-edr

Length of output: 944


🏁 Script executed:

#!/bin/bash
# Get more context around the payload construction in agent
sed -n '130,170p' agent/crates/fleet-client/src/lib.rs

# Check if there's an event_type field being set in the protobuf
rg -n 'event_type' agent/crates/fleet-client/src/lib.rs | head -20

# Check the protobuf definition for AgentEvent
find . -name '*.proto' -o -name '*agent_event*' 2>/dev/null | head -10

Repository: swar09/project-edr

Length of output: 1819


🏁 Script executed:

#!/bin/bash
# Get full context of KafkaEventIngest::ingest_event
sed -n '15,45p' fleet-server/crates/fleet-server-bin/src/ports.rs

# Check if event_type is being used anywhere in ports.rs
rg -n 'event_type' fleet-server/crates/fleet-server-bin/src/ports.rs

# Check the publisher interface to see what .publish() accepts
rg -n -A15 'fn publish' fleet-server/crates/fleet-server-bin/src/ports.rs

Repository: swar09/project-edr

Length of output: 914


🏁 Script executed:

#!/bin/bash
# Find IncomingEvent definition
rg -n 'struct IncomingEvent|type IncomingEvent' fleet-server/crates/fleet-server-bin/src/ports.rs

# Get more context to understand where IncomingEvent comes from
rg -n 'IncomingEvent' fleet-server/crates/fleet-server-bin/src/ports.rs | head -20

# Check if there's any mapping or conversion from AgentEvent to IncomingEvent
rg -n -B5 -A5 'IncomingEvent' fleet-server/crates/fleet-server-bin/src/ports.rs | head -40

Repository: swar09/project-edr

Length of output: 914


🏁 Script executed:

#!/bin/bash
# Find IncomingEvent definition in fleet_manager
find . -path '*/fleet_manager/*' -name '*.rs' | xargs rg -l 'IncomingEvent'

# Search for struct IncomingEvent
rg -n 'struct IncomingEvent' --type rs

# Check what fields IncomingEvent has
rg -n -A10 'struct IncomingEvent' --type rs

Repository: swar09/project-edr

Length of output: 325


🏁 Script executed:

#!/bin/bash
# Find IncomingEvent definition
rg -n 'struct IncomingEvent' fleet-server/crates/fleet-manager/src/

# Check grpc-listener to see how IncomingEvent is created
rg -n -B5 -A15 'IncomingEvent' fleet-server/crates/grpc-listener/src/service.rs | head -60

Repository: swar09/project-edr

Length of output: 2167


🏁 Script executed:

#!/bin/bash
# Get the full IncomingEvent struct definition
sed -n '34,45p' fleet-server/crates/fleet-manager/src/ports.rs

# Check how the gRPC service maps AgentEvent to IncomingEvent
sed -n '95,110p' fleet-server/crates/grpc-listener/src/service.rs

Repository: swar09/project-edr

Length of output: 1136


🏁 Script executed:

#!/bin/bash
# Check the KafkaPublisher interface to see if it supports headers
rg -n 'trait.*Publisher|fn publish' kafka-handler/src/lib.rs | head -20

# Check EventRouter to see how it processes Kafka messages
rg -n -B3 -A10 'process\(' kafka-pipeline/src/event_router.rs | head -50

# Check what context EventRouter receives (Kafka message structure)
rg -n 'fn process|KafkaRecord|MessageKey' kafka-pipeline/src/event_router.rs

Repository: swar09/project-edr

Length of output: 658


🏁 Script executed:

#!/bin/bash
# Check the KafkaPublisher interface and implementation
find . -name 'lib.rs' -o -name '*.rs' | xargs rg -l 'KafkaPublisher|pub.*fn publish' | grep -i kafka

# Look for the actual kafka-handler implementation
find . -path '*/kafka*' -name '*.rs' 2>/dev/null

# Check what KafkaEventIngest passes as the key
sed -n '10,35p' fleet-server/crates/fleet-server-bin/src/ports.rs

Repository: swar09/project-edr

Length of output: 1219


🏁 Script executed:

#!/bin/bash
# Check KafkaPublisher::publish signature and implementation
rg -n -A20 'fn publish' fleet-server/crates/kafka-handler/src/lib.rs

# Check if there's any header handling
rg -n 'header|Header' fleet-server/crates/kafka-handler/src/lib.rs

Repository: swar09/project-edr

Length of output: 418


🏁 Script executed:

#!/bin/bash
# Verify what the actual payload JSON looks like—check if event_type could be embedded in payload itself
# Look at agent code to see what val["payload"] actually contains

sed -n '135,165p' agent/crates/fleet-client/src/lib.rs

# Check if there's any documentation or test that shows the event structure
rg -n -A5 'val\["payload"\]' agent/crates/fleet-client/src/lib.rs

Repository: swar09/project-edr

Length of output: 1640


🏁 Script executed:

#!/bin/bash
# Check if EventRouter has any access to Kafka message headers or if there's another way to get event_type
sed -n '25,60p' kafka-pipeline/src/event_router.rs

# Verify if there's any header handling in the consumer/message processor
rg -n 'header|Header|MessageProcessor' kafka-pipeline/src/consumer.rs | head -20

Repository: swar09/project-edr

Length of output: 1392


event_type extraction will always fail — payload lacks the event_type field.

The code attempts to extract event_type from the JSON payload, but the actual data reaching EventRouter contains only the inner event data without this field.

Data flow confirms the issue:

  1. Agent (fleet-client:154) serializes only val["payload"] to bytes: serde_json::to_vec(&val["payload"])
  2. The extracted event_type is placed in the AgentEvent protobuf as a separate field, not inside payload
  3. gRPC service creates IncomingEvent with the event_type field available (service.rs:102)
  4. KafkaEventIngest discards event_type and publishes only payload bytes with node_id as the key (ports.rs:20–28)
  5. EventRouter receives payload = val["payload"] JSON, which lacks the top-level "event_type" key
  6. .get("event_type") returns None, defaults to "unknown", routing all events to "aigis.events.raw"

Fix options:

  • Option A (recommended): Modify KafkaEventIngest to wrap the payload with event_type before publishing: {"event_type": "...", "data": payload_bytes}
  • Option B: Pass event_type as the Kafka message key or in headers, extract from there instead of parsing payload
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/event_router.rs` around lines 43 - 49, The event_type
extraction in EventRouter fails because the KafkaEventIngest publishes only the
payload bytes without the event_type field, so the .get("event_type") call will
always return None and default to "unknown". Fix this by modifying
KafkaEventIngest (in ports.rs around the publish call, lines 20-28) to wrap the
payload with the event_type before publishing, creating a structure like
{"event_type": "...", "data": payload_bytes}, then update the event_type
extraction logic in EventRouter to properly extract from this wrapped structure
instead of expecting event_type to be a top-level field in the deserialized
event object.

Comment on lines +17 to +23
pub async fn get_consumer_lag(&self, _group_id: &str) -> Result<i64, String> {
// Fetch committed offsets
// Fetch latest offsets (watermarks)
// Calculate difference
// Return total lag
Ok(0) // TODO: actual implementation
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Find usage of LagMonitor to assess impact of stub implementation

# Search for LagMonitor instantiation
rg -n 'LagMonitor::new' --type rust

# Search for get_consumer_lag calls
rg -n 'get_consumer_lag' --type rust

Repository: swar09/project-edr

Length of output: 178


Stub implementation needs actual rdkafka calls for functional lag monitoring.

The get_consumer_lag method returns hardcoded Ok(0) with TODO comments. While marked as dead code (intentionally unused scaffolding), this stub will need a working implementation before metric collection can detect consumer lag, backpressure, or processing delays. The commented steps suggest the intent—fetch committed offsets, watermarks, then calculate the difference—but implementation is required for this to be functional.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@kafka-pipeline/src/metrics.rs` around lines 17 - 23, Replace the stub
implementation of the get_consumer_lag method with actual rdkafka calls. Use the
provided group_id parameter to fetch committed offsets from the Kafka consumer
group using the appropriate rdkafka client method. Then fetch the latest offsets
(watermarks) for the same partitions. Calculate the total lag by summing the
differences between watermarks and committed offsets across all partitions, and
return the result instead of the hardcoded Ok(0) value. Remove or update the
TODO comments as the implementation progresses.

@coderabbitai

coderabbitai Bot commented Jun 27, 2026

Copy link
Copy Markdown

Note

Autofix is a beta feature. Expect some limitations and changes as we gather feedback and continue to improve it.

An unexpected error occurred while generating fixes: Sandbox execution failed: failed

@swar09 swar09 merged commit 4722032 into main Jun 27, 2026
15 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant