From b4db93de6ec5a68f7afc3637910c462cedab8892 Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Wed, 17 Jun 2026 22:00:56 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=9D=20Add=20docstrings=20to=20`agent/b?= =?UTF-8?q?ug-fixes-01`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Docstrings generation was requested by @swar09. * https://github.com/swar09/project-edr/pull/14#issuecomment-4735898454 The following files were modified: * `agent/crates/agent-bin/src/main.rs` * `agent/crates/agent-core/src/command_handler.rs` * `agent/crates/agent-core/src/lib.rs` * `agent/crates/agent-core/src/orchestrator.rs` * `agent/crates/agent-core/src/preflight.rs` * `agent/crates/fleet-client/src/enrollment.rs` * `agent/crates/fleet-client/src/lib.rs` * `agent/crates/osquery-client/src/bin/test_query.rs` * `agent/crates/osquery-client/src/client.rs` * `agent/crates/osquery-client/src/scheduler.rs` * `fleet-server/crates/fleet-server-bin/src/main.rs` * `fleet-server/crates/fleet-server-bin/src/ports.rs` * `fleet-server/crates/kafka-handler/src/lib.rs` * `infra/scripts/create-topics.sh` * `kafka-pipeline/src/bin/kafka-admin.rs` * `kafka-pipeline/src/consumer.rs` * `kafka-pipeline/src/event_router.rs` * `kafka-pipeline/src/health.rs` * `kafka-pipeline/src/main.rs` * `kafka-pipeline/src/metrics.rs` * `sdk/build.rs` * `sdk/src/codec.rs` --- agent/crates/agent-bin/src/main.rs | 32 ++++++ .../crates/agent-core/src/command_handler.rs | 21 ++++ agent/crates/agent-core/src/lib.rs | 15 ++- agent/crates/agent-core/src/orchestrator.rs | 39 +++++++ agent/crates/agent-core/src/preflight.rs | 55 ++++++++++ agent/crates/fleet-client/src/enrollment.rs | 14 +++ agent/crates/fleet-client/src/lib.rs | 102 ++++++++++++++++++ .../osquery-client/src/bin/test_query.rs | 20 ++++ agent/crates/osquery-client/src/client.rs | 16 +++ agent/crates/osquery-client/src/scheduler.rs | 12 +++ .../crates/fleet-server-bin/src/main.rs | 9 ++ .../crates/fleet-server-bin/src/ports.rs | 29 +++++ fleet-server/crates/kafka-handler/src/lib.rs | 23 ++++ infra/scripts/create-topics.sh | 2 +- kafka-pipeline/src/bin/kafka-admin.rs | 20 +++- kafka-pipeline/src/consumer.rs | 48 +++++++++ kafka-pipeline/src/event_router.rs | 38 +++++++ kafka-pipeline/src/health.rs | 8 +- kafka-pipeline/src/main.rs | 10 +- kafka-pipeline/src/metrics.rs | 21 ++++ sdk/build.rs | 18 ++++ sdk/src/codec.rs | 44 ++++++++ 22 files changed, 587 insertions(+), 9 deletions(-) diff --git a/agent/crates/agent-bin/src/main.rs b/agent/crates/agent-bin/src/main.rs index b85b43a..93f4e56 100644 --- a/agent/crates/agent-bin/src/main.rs +++ b/agent/crates/agent-bin/src/main.rs @@ -39,6 +39,16 @@ struct Args { enroll: bool, } +/// Updates or inserts the `node_id` field in the `[agent]` section of the TOML configuration file. +/// +/// # Examples +/// +/// ``` +/// use std::path::Path; +/// use uuid::Uuid; +/// let node_id = Uuid::new_v4(); +/// // save_node_id_to_config(Path::new("config.toml"), node_id).unwrap(); +/// ``` fn save_node_id_to_config(path: &Path, node_id: Uuid) -> anyhow::Result<()> { let content = std::fs::read_to_string(path)?; let mut lines: Vec = content.lines().map(String::from).collect(); @@ -68,6 +78,16 @@ fn save_node_id_to_config(path: &Path, node_id: Uuid) -> anyhow::Result<()> { Ok(()) } +/// Extracts IP address and port from an endpoint string. +/// +/// Invalid IP addresses default to 127.0.0.1, and missing or invalid ports default to 50051. +/// +/// # Examples +/// +/// ``` +/// let (ip, port) = parse_endpoint("http://192.168.1.1:8080"); +/// assert_eq!(port, 8080); +/// ``` fn parse_endpoint(endpoint: &str) -> (std::net::IpAddr, u16) { let clean = endpoint .trim_start_matches("http://") @@ -83,6 +103,18 @@ fn parse_endpoint(endpoint: &str) -> (std::net::IpAddr, u16) { (ip, port) } +/// Initializes and runs the Aigis-Zero agent until interrupted. +/// +/// Loads configuration, validates the environment, enrolls with the fleet server, +/// and establishes background tasks for event collection, heartbeats, and command handling. +/// The agent blocks until Ctrl-C is received. +/// +/// If `--check` is set, validates the configuration and environment, then exits. +/// +/// # Errors +/// +/// Returns an error if configuration is invalid, the agent cannot connect to the fleet, +/// or enrollment fails. #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); diff --git a/agent/crates/agent-core/src/command_handler.rs b/agent/crates/agent-core/src/command_handler.rs index 2b2f1c8..c636cf7 100644 --- a/agent/crates/agent-core/src/command_handler.rs +++ b/agent/crates/agent-core/src/command_handler.rs @@ -12,6 +12,27 @@ pub struct CommandHandler { } impl CommandHandler { + /// Processes a server command and returns a JSON status response. + /// + /// Handles different command types with specific actions: + /// - `Isolate`: Isolates or de-isolates the process based on the flag. + /// - `ConfigUpdate`: Acknowledges the configuration update. + /// - `Ack`: Acknowledges the message. + /// + /// # Errors + /// + /// Returns an error if the command is missing from the message or if an isolation operation fails. + /// + /// # Examples + /// + /// ``` + /// # async fn example() { + /// let handler = CommandHandler { /* ... */ }; + /// let cmd = ServerCommand { command: Some(Command::Ack(())) }; + /// let result = handler.handle(cmd).await; + /// assert!(result.is_ok()); + /// # } + /// ``` pub async fn handle(&self, msg: ServerCommand) -> Result { let command = msg.command.ok_or("missing command")?; diff --git a/agent/crates/agent-core/src/lib.rs b/agent/crates/agent-core/src/lib.rs index 0811ddc..c7aa97d 100644 --- a/agent/crates/agent-core/src/lib.rs +++ b/agent/crates/agent-core/src/lib.rs @@ -41,12 +41,17 @@ pub struct AgentCore { } impl AgentCore { - /// Start all background tasks and block until the shutdown token fires. + /// Executes the agent's background tasks until shutdown is triggered. /// - /// # Parameters - /// - `agent_uuid`: The node UUID assigned during enrollment. Passed into - /// `OsqueryCollector::start` so that every `OsqueryResult` carries the - /// correct identity before it is serialised into an `AgentEvent`. + /// Spawns an OSQuery polling task and a command listener task. Both tasks + /// are given up to 5 seconds to gracefully exit after shutdown is signaled. + /// + /// # Examples + /// + /// ```ignore + /// let core = AgentCore { /* ... */ }; + /// core.run("my-agent-uuid").await?; + /// ``` pub async fn run(&self, agent_uuid: &str) -> Result<()> { let shutdown = self.shutdown.clone(); diff --git a/agent/crates/agent-core/src/orchestrator.rs b/agent/crates/agent-core/src/orchestrator.rs index 1d15031..fb252b4 100644 --- a/agent/crates/agent-core/src/orchestrator.rs +++ b/agent/crates/agent-core/src/orchestrator.rs @@ -5,6 +5,19 @@ use event_buffer::EventBuffer; use osquery_client::types::OsqueryResult; use tokio::sync::mpsc; +/// Runs the agent orchestrator, loading configuration and managing osquery event collection. +/// +/// Reads configuration from the `EDR_AGENT_CONFIG` environment variable +/// (defaulting to `"agent.toml"`), initializes logging, starts collecting events +/// from osquery, and attempts to enroll with the fleet server. Processes collected +/// events until interrupted by Ctrl-C. Fleet enrollment failures do not prevent +/// the agent from continuing to operate offline. +/// +/// # Examples +/// +/// ```no_run +/// run().await?; +/// ``` pub async fn run() -> Result<()> { let config_path = std::env::var("EDR_AGENT_CONFIG").unwrap_or_else(|_| "agent.toml".to_string()); @@ -157,6 +170,14 @@ fn encode_result(result: &OsqueryResult) -> String { serde_json::to_string(&event).unwrap_or_default() } +/// Retrieves the system hostname, or a fallback value if unavailable. +/// +/// # Examples +/// +/// ``` +/// let hostname = hostname_or_default(); +/// assert!(!hostname.is_empty()); +/// ``` fn hostname_or_default() -> String { hostname::get() .ok() @@ -164,6 +185,14 @@ fn hostname_or_default() -> String { .unwrap_or_else(|| "unknown-host".to_string()) } +/// Retrieves the system machine ID or a fallback value if unavailable. +/// +/// # Examples +/// +/// ``` +/// let machine_id = read_machine_id(); +/// assert!(!machine_id.is_empty()); +/// ``` pub fn read_machine_id() -> String { if let Ok(id) = std::fs::read_to_string("/etc/machine-id") { let trimmed = id.trim(); @@ -180,6 +209,16 @@ pub fn read_machine_id() -> String { "unknown-machine-id".to_string() } +/// Retrieves the operating system version string. +/// +/// Reads the system's `/etc/os-release` file to determine the OS version. If the file is missing or cannot be read, returns a descriptive fallback message. +/// +/// # Examples +/// +/// ``` +/// let version = get_os_version(); +/// assert!(!version.is_empty()); +/// ``` pub fn get_os_version() -> String { use std::fs::File; use std::io::{BufRead, BufReader}; diff --git a/agent/crates/agent-core/src/preflight.rs b/agent/crates/agent-core/src/preflight.rs index 4574f0f..2049c2f 100644 --- a/agent/crates/agent-core/src/preflight.rs +++ b/agent/crates/agent-core/src/preflight.rs @@ -13,6 +13,26 @@ pub struct PreflightReport { } impl PreflightReport { + /// Checks whether the system meets all required preflight conditions. + /// + /// Returns `true` if the configuration and log directories are writable, both osqueryd and nft + /// are installed, and the process is running as root. + /// + /// # Examples + /// + /// ``` + /// let report = PreflightReport { + /// config_dir_writable: Ok(()), + /// data_dir_writable: Ok(()), + /// log_dir_writable: Ok(()), + /// osqueryd_installed: Ok("Found".to_string()), + /// nft_installed: Ok("Found".to_string()), + /// bpf_jit_enabled: Ok(true), + /// inotify_watches: Ok(524288), + /// is_root: true, + /// }; + /// assert!(report.is_ok()); + /// ``` pub fn is_ok(&self) -> bool { self.config_dir_writable.is_ok() && self.data_dir_writable.is_ok() @@ -22,6 +42,17 @@ impl PreflightReport { && self.is_root } + /// Prints a human-readable report of the preflight environment checks. + /// + /// Displays the status of all checks (root, directory accessibility, BPF JIT, inotify limits, + /// and required dependencies) using `[OK]`, `[WARN]`, or `[FAIL]` indicators. + /// + /// # Examples + /// + /// ``` + /// let report = run_preflight(&config); + /// report.print(); + /// ``` pub fn print(&self) { println!("Aigis-Zero Agent Pre-flight Environment Check"); @@ -77,6 +108,22 @@ impl PreflightReport { } } +/// Validates the system environment. +/// +/// Checks root privilege, directory writability, BPF JIT status, inotify limits, +/// and the presence of required executables (`osqueryd` and `nft`). +/// +/// # Examples +/// +/// ```no_run +/// let config = /* ... */; +/// let report = run_preflight(&config); +/// if report.is_ok() { +/// println!("Environment is ready"); +/// } else { +/// report.print(); +/// } +/// ``` pub fn run_preflight(config: &crate::config::AgentConfig) -> PreflightReport { let is_root = unsafe { libc::getuid() } == 0; @@ -151,6 +198,14 @@ pub fn run_preflight(config: &crate::config::AgentConfig) -> PreflightReport { } } +/// Checks if a command is available in the system PATH. +/// +/// # Examples +/// +/// ```ignore +/// assert!(which("sh")); +/// assert!(!which("nonexistent_xyz_command")); +/// ``` fn which(cmd: &str) -> bool { Command::new("which") .arg(cmd) diff --git a/agent/crates/fleet-client/src/enrollment.rs b/agent/crates/fleet-client/src/enrollment.rs index 072889d..12c0216 100644 --- a/agent/crates/fleet-client/src/enrollment.rs +++ b/agent/crates/fleet-client/src/enrollment.rs @@ -6,6 +6,20 @@ use edr_sdk::codec::JsonCodec; pub struct AgentEnrollment; impl AgentEnrollment { + /// Enrolls an agent with a remote service. + /// + /// # Errors + /// + /// Returns an error if the enrollment request fails. + /// + /// # Examples + /// + /// ```ignore + /// let channel = Channel::from_static("http://localhost:50051").connect().await?; + /// let request = RegisterRequest { hostname: "my-host".into(), ..Default::default() }; + /// let result = AgentEnrollment::enroll(channel, request).await?; + /// println!("Node ID: {}", result.node_id); + /// ``` pub async fn enroll(channel: Channel, request: RegisterRequest) -> Result { tracing::info!("Enrolling agent: {:?}", request.hostname); diff --git a/agent/crates/fleet-client/src/lib.rs b/agent/crates/fleet-client/src/lib.rs index fe2d867..f362348 100644 --- a/agent/crates/fleet-client/src/lib.rs +++ b/agent/crates/fleet-client/src/lib.rs @@ -31,6 +31,17 @@ pub struct FleetClient { } impl FleetClient { + /// Creates a new fleet client for the specified endpoint. + /// + /// The client is initialized but not connected. Call [`connect`](Self::connect) or + /// [`connect_with_retry`](Self::connect_with_retry) to establish a connection. + /// + /// # Examples + /// + /// ``` + /// let client = FleetClient::new("http://localhost:50051".to_string()); + /// assert!(client.node_id().is_none()); + /// ``` pub fn new(endpoint: String) -> Self { Self { endpoint, @@ -42,6 +53,17 @@ impl FleetClient { } } + /// Connects to the fleet server. + /// + /// If a token is provided, establishes a bidirectional event stream for sending and receiving messages. + /// + /// # Examples + /// + /// ``` + /// let mut client = FleetClient::new("http://fleet.example.com:8080".to_string()); + /// client.connect(Some("auth_token")).await?; + /// # Ok::<(), anyhow::Error>(()) + /// ``` pub async fn connect(&mut self, token: Option<&str>) -> Result<(), anyhow::Error> { info!(endpoint = %self.endpoint, "Connecting to fleet server"); @@ -87,6 +109,20 @@ impl FleetClient { Ok(()) } + /// Establishes a connection to the fleet server with automatic retries on failure. + /// + /// Attempts to connect up to `max_attempts` times, waiting between attempts with + /// delays that increase exponentially with each retry. If `max_attempts` is 0, retries + /// indefinitely until successful. The initial delay is `base_delay`; each subsequent + /// retry multiplies the delay by 2, capped at 2^5 multiplier. + /// + /// # Examples + /// + /// ``` + /// let mut client = FleetClient::new("http://localhost:50051".to_string()); + /// let result = client.connect_with_retry(5, Duration::from_millis(100), Some("token")).await; + /// assert!(result.is_ok()); + /// ``` pub async fn connect_with_retry( &mut self, max_attempts: u32, @@ -110,6 +146,16 @@ impl FleetClient { } } + /// Enrolls the agent with the fleet service. + /// + /// Registers the agent and stores the assigned node ID and authentication token from the response. + /// + /// # Examples + /// + /// ``` + /// let response = client.enroll(request).await?; + /// assert!(!response.node_id.is_empty()); + /// ``` pub async fn enroll( &mut self, request: RegisterRequest, @@ -130,6 +176,19 @@ impl FleetClient { Ok(response) } + /// Sends a batch of events to the fleet server. + /// + /// # Errors + /// + /// Returns an error if the outbound event stream is not connected or if sending fails. + /// + /// # Examples + /// + /// ``` + /// let batch = EventBatch { events: vec![...] }; + /// let ack = client.send_events(&batch).await?; + /// assert!(ack.success); + /// ``` pub async fn send_events(&mut self, batch: &EventBatch) -> Result { let tx = self .outbound_tx @@ -173,6 +232,14 @@ impl FleetClient { }) } + /// Sends a heartbeat to the fleet server with the agent's status and event queue information. + /// + /// # Examples + /// + /// ```no_run + /// let request = HeartbeatRequest { status: "active".to_string(), events_buffered: 5 }; + /// let response = client.heartbeat(&request).await?; + /// ``` pub async fn heartbeat( &mut self, request: &HeartbeatRequest, @@ -201,6 +268,12 @@ impl FleetClient { Ok(HeartbeatResponse { ok: response.ok }) } + /// Attempts to receive a pending server command without blocking. + /// + /// # Returns + /// + /// `Some(ServerCommand)` if a message is available, `None` if the inbound channel + /// is empty, or an error if the channel is disconnected. pub fn try_receive(&mut self) -> Result, anyhow::Error> { let rx = self .inbound_rx @@ -216,6 +289,24 @@ impl FleetClient { } } + /// Waits for the next inbound server command. + /// + /// # Returns + /// + /// `Some(ServerCommand)` if a message is received, `None` if the channel is closed. + /// + /// # Errors + /// + /// Returns an error if not connected. + /// + /// # Examples + /// + /// ``` + /// match client.receive().await? { + /// Some(cmd) => println!("Received: {:?}", cmd), + /// None => println!("Server disconnected"), + /// } + /// ``` pub async fn receive(&mut self) -> Result, anyhow::Error> { let rx = self .inbound_rx @@ -224,10 +315,21 @@ impl FleetClient { Ok(rx.recv().await) } + /// Gets the agent's node identifier. + /// + /// # Examples + /// + /// ``` + /// let client = FleetClient::new("http://localhost:50051".to_string()); + /// assert!(client.node_id().is_none()); // Before enrollment + /// ``` pub fn node_id(&self) -> Option { self.node_id } + /// Retrieves the authentication token. + /// + /// `Some(&str)` containing the token if set, `None` otherwise. pub fn token(&self) -> Option<&str> { self.token.as_deref() } diff --git a/agent/crates/osquery-client/src/bin/test_query.rs b/agent/crates/osquery-client/src/bin/test_query.rs index ef5728d..59311a7 100644 --- a/agent/crates/osquery-client/src/bin/test_query.rs +++ b/agent/crates/osquery-client/src/bin/test_query.rs @@ -8,6 +8,26 @@ use thrift::protocol::{ }; use thrift::transport::TBufferChannel; +/// Tests registerExtension RPC communication with an osquery socket in framed and unframed modes. +/// +/// Establishes connections to `/var/osquery/osquery.em`, builds a Thrift `registerExtension` +/// request with extension info and an empty registry, and sends it in two transmission modes: +/// - **Framed**: Prefixes the payload with a 4-byte big-endian length field. +/// - **Unframed**: Sends the raw payload without a length prefix. +/// +/// For each mode, attempts to read a response within a 2-second timeout and logs the outcome. +/// +/// # Examples +/// +/// ```no_run +/// // Run to test both framed and unframed RPC modes +/// // cargo run +/// ``` +/// +/// # Returns +/// +/// `Ok(())` if both test attempts complete, or an error if socket connection, write operations, +/// or Thrift protocol serialization fail. #[tokio::main] async fn main() -> Result<(), Box> { let socket = Path::new("/var/osquery/osquery.em"); diff --git a/agent/crates/osquery-client/src/client.rs b/agent/crates/osquery-client/src/client.rs index dbc4ac3..6f9cc69 100644 --- a/agent/crates/osquery-client/src/client.rs +++ b/agent/crates/osquery-client/src/client.rs @@ -23,6 +23,17 @@ impl OsqueryClient { }) } + /// Executes a SQL query against the osquery daemon. + /// + /// # Examples + /// + /// ``` + /// # async fn example() -> anyhow::Result<()> { + /// let mut client = OsqueryClient::connect("/var/osquery/osquery.sock").await?; + /// let response = client.query("SELECT * FROM processes").await?; + /// # Ok(()) + /// # } + /// ``` pub async fn query(&mut self, sql: &str) -> Result { tracing::debug!("Executing query: {}", sql); @@ -89,6 +100,11 @@ impl OsqueryClient { } } + /// Deserializes a Thrift-formatted buffer into a query response. + /// + /// Parses the binary buffer according to the Thrift protocol, extracting the query status + /// (code and message) and result rows (as a list of string key-value maps). Returns an error + /// if the buffer contains a Thrift exception or is malformed. fn parse_query_response(buf: &[u8]) -> Result { let mut t = TBufferChannel::with_capacity(buf.len(), 0); t.set_readable_bytes(buf); diff --git a/agent/crates/osquery-client/src/scheduler.rs b/agent/crates/osquery-client/src/scheduler.rs index 051dd30..a449abc 100644 --- a/agent/crates/osquery-client/src/scheduler.rs +++ b/agent/crates/osquery-client/src/scheduler.rs @@ -14,6 +14,18 @@ pub struct QueryScheduler { } impl QueryScheduler { + /// Initializes a QueryScheduler with a SQLite database. + /// + /// If the `scheduled_queries` table is empty, three default queries are seeded: + /// `running_processes`, `listening_ports`, and `users`. + /// + /// # Examples + /// + /// ``` + /// use std::path::Path; + /// let scheduler = QueryScheduler::new(Path::new("/tmp/test.db"))?; + /// # Ok::<(), anyhow::Error>(()) + /// ``` pub fn new(db_path: &Path) -> Result { let conn = Connection::open(db_path)?; diff --git a/fleet-server/crates/fleet-server-bin/src/main.rs b/fleet-server/crates/fleet-server-bin/src/main.rs index e84ebcd..a54a836 100644 --- a/fleet-server/crates/fleet-server-bin/src/main.rs +++ b/fleet-server/crates/fleet-server-bin/src/main.rs @@ -9,6 +9,15 @@ use tokio_util::sync::CancellationToken; use fleet_tracing::{LogFormat, TracingConfig}; use grpc_listener::{FleetServiceImpl, GrpcListenerConfig, GrpcServer, shutdown_signal}; +/// Initializes and runs the Fleet server. +/// +/// Sets up application infrastructure and starts the gRPC server, +/// gracefully shutting down on receipt of SIGINT or SIGTERM. +/// +/// # Errors +/// +/// Returns an error if settings cannot be loaded, tracing initialization fails, +/// or the database connection fails. #[tokio::main] async fn main() -> Result<()> { // Load .env file into standard environment variables. diff --git a/fleet-server/crates/fleet-server-bin/src/ports.rs b/fleet-server/crates/fleet-server-bin/src/ports.rs index 93946e9..4c7ecb0 100644 --- a/fleet-server/crates/fleet-server-bin/src/ports.rs +++ b/fleet-server/crates/fleet-server-bin/src/ports.rs @@ -16,6 +16,20 @@ pub struct KafkaEventIngest { #[async_trait] impl EventIngestPort for KafkaEventIngest { + /// Publishes an incoming event to Kafka and returns an acknowledgment. + /// + /// On successful publication, an acknowledgment command with the event's sequence ID is returned. + /// If publication fails, an internal gRPC error is returned. + /// + /// # Examples + /// + /// ``` + /// match ingest.ingest_event(event).await { + /// Ok(Some(OutgoingCommand::Ack { sequence_id })) => println!("Acked: {}", sequence_id), + /// Err(status) => eprintln!("Failed: {}", status), + /// _ => {} + /// } + /// ``` async fn ingest_event(&self, event: IncomingEvent) -> Result, Status> { let payload = if event.payload.is_empty() { b"{}" @@ -41,6 +55,21 @@ impl EventIngestPort for KafkaEventIngest { } } +/// Constructs the port implementations for the application. +/// +/// # Returns +/// +/// A tuple of (node enroller, health tracker, event ingest port). +/// +/// # Panics +/// +/// Panics if Kafka publisher initialization fails. +/// +/// # Examples +/// +/// ``` +/// let (enroller, tracker, ingest) = build_ports(pool, "secret", "localhost:9092", "events"); +/// ``` pub fn build_ports( pg_pool: sqlx::PgPool, jwt_secret: &str, diff --git a/fleet-server/crates/kafka-handler/src/lib.rs b/fleet-server/crates/kafka-handler/src/lib.rs index 12b269c..1255d46 100644 --- a/fleet-server/crates/kafka-handler/src/lib.rs +++ b/fleet-server/crates/kafka-handler/src/lib.rs @@ -7,6 +7,18 @@ pub struct KafkaPublisher { } impl KafkaPublisher { + /// Initializes a new Kafka publisher connected to the specified brokers. + /// + /// # Examples + /// + /// ``` + /// let publisher = KafkaPublisher::new("localhost:9092")?; + /// # Ok::<(), String>(()) + /// ``` + /// + /// # Errors + /// + /// Returns an error if the Kafka producer cannot be created or configured. pub fn new(brokers: &str) -> Result { let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", brokers) @@ -17,6 +29,17 @@ impl KafkaPublisher { Ok(Self { producer }) } + /// Publishes a message to Kafka. + /// + /// # Examples + /// + /// ```no_run + /// # async fn example() -> Result<(), String> { + /// let publisher = KafkaPublisher::new("localhost:9092")?; + /// publisher.publish("my-topic", "key1", b"hello").await?; + /// # Ok(()) + /// # } + /// ``` pub async fn publish(&self, topic: &str, key: &str, payload: &[u8]) -> Result<(), String> { let record = FutureRecord::to(topic).key(key).payload(payload); diff --git a/infra/scripts/create-topics.sh b/infra/scripts/create-topics.sh index e3257e0..9e05e0f 100644 --- a/infra/scripts/create-topics.sh +++ b/infra/scripts/create-topics.sh @@ -4,7 +4,7 @@ set -euo pipefail BOOTSTRAP="localhost:29092" KAFKA_BIN="/opt/kafka/bin" -# Function to create topic +# create_topic creates a Kafka topic with the specified partitions, retention time, and replication factor in the aigis-kafka-dev container. create_topic() { local topic=$1 local partitions=$2 diff --git a/kafka-pipeline/src/bin/kafka-admin.rs b/kafka-pipeline/src/bin/kafka-admin.rs index d5a2f0b..68a05bb 100644 --- a/kafka-pipeline/src/bin/kafka-admin.rs +++ b/kafka-pipeline/src/bin/kafka-admin.rs @@ -57,7 +57,25 @@ const TOPICS: &[TopicSpec] = &[ }, ]; -#[tokio::main] +/// Administers Kafka topics by executing a command specified via CLI arguments. +/// +/// Parses command-line arguments to extract the command name and `--brokers` option, creates +/// an admin client connected to the specified brokers, and dispatches to the command handler. +/// Currently supports `create-topics`, which creates the predefined set of topics with their +/// configured partition counts, retention periods, and cleanup policies. If fewer than four +/// arguments are provided, prints usage information without performing any action. Prints a +/// message for each topic creation attempt, indicating success or failure. +/// +/// # Examples +/// +/// ```no_run +/// // Invoke the binary with: +/// // cargo run -- create-topics --brokers localhost:9092 +/// ``` +/// +/// # Errors +/// +/// Returns an error if the admin client cannot be created or if the topic creation request fails. async fn main() -> Result<(), Box> { let args: Vec = env::args().collect(); if args.len() < 4 { diff --git a/kafka-pipeline/src/consumer.rs b/kafka-pipeline/src/consumer.rs index 10d627e..97961e3 100644 --- a/kafka-pipeline/src/consumer.rs +++ b/kafka-pipeline/src/consumer.rs @@ -29,6 +29,36 @@ pub struct ConsumerWorker { } impl ConsumerWorker { + /// Creates and configures a Kafka consumer worker. + /// + /// Initializes a consumer connected to the specified brokers, subscribes to the provided topics, + /// and prepares the worker to process messages via the given processor. + /// + /// # Arguments + /// + /// * `brokers` - Comma-separated Kafka broker addresses + /// * `group_id` - Consumer group identifier for offset management + /// * `topics` - Topics to subscribe to for message consumption + /// * `processor` - Handler for processing received messages + /// * `shutdown` - Cancellation token to trigger graceful shutdown + /// + /// # Errors + /// + /// Returns an error string if consumer creation or topic subscription fails. + /// + /// # Examples + /// + /// ``` + /// let processor = Box::new(MyProcessor); + /// let shutdown = CancellationToken::new(); + /// let worker = ConsumerWorker::new( + /// "localhost:9092", + /// "my-group", + /// &["topic1"], + /// processor, + /// shutdown, + /// )?; + /// ``` pub fn new( brokers: &str, group_id: &str, @@ -60,6 +90,24 @@ impl ConsumerWorker { }) } + /// Continuously processes Kafka messages until shutdown. + /// + /// Enters an infinite loop that awaits either a shutdown signal or the next message from the + /// Kafka stream. For each message, it extracts the key, payload, and metadata (topic, partition, + /// offset), and delegates processing to the configured `MessageProcessor`. If processing fails, + /// the error is logged and the loop continues. + /// + /// # Examples + /// + /// ``` + /// let shutdown = CancellationToken::new(); + /// let processor = Box::new(MyProcessor); + /// let worker = ConsumerWorker::new("localhost:9092", "my-group", &["my-topic"], processor, shutdown.clone()).unwrap(); + /// + /// tokio::spawn(worker.run()); + /// // ... handle messages in background ... + /// shutdown.cancel(); + /// ``` pub async fn run(&self) { use tokio_stream::StreamExt; diff --git a/kafka-pipeline/src/event_router.rs b/kafka-pipeline/src/event_router.rs index 306e8fa..c8f97e5 100644 --- a/kafka-pipeline/src/event_router.rs +++ b/kafka-pipeline/src/event_router.rs @@ -13,10 +13,30 @@ pub struct EventRouterProcessor { } impl EventRouterProcessor { + /// Constructs a new EventRouterProcessor with the provided Kafka producer. + /// + /// # Examples + /// + /// ``` + /// let producer = FutureProducer::new(...); + /// let router = EventRouterProcessor::new(producer); + /// ``` pub fn new(producer: FutureProducer) -> Self { Self { producer } } + /// Maps an event type to its target Kafka topic. + /// + /// # Examples + /// + /// ```ignore + /// let topic = processor.route_topic("process_start"); + /// assert_eq!(topic, "aigis.events.process"); + /// let topic = processor.route_topic("file_create"); + /// assert_eq!(topic, "aigis.events.file"); + /// let topic = processor.route_topic("unknown_event"); + /// assert_eq!(topic, "aigis.events.raw"); + /// ``` fn route_topic(&self, event_type: &str) -> &str { match event_type { "process_start" | "process_end" => "aigis.events.process", @@ -31,6 +51,24 @@ impl EventRouterProcessor { #[async_trait::async_trait] impl MessageProcessor for EventRouterProcessor { + /// Routes events to typed Kafka topics based on their event type. + /// + /// Extracts the `event_type` field from the JSON payload, maps it to a target topic + /// using `route_topic`, and forwards the original payload bytes to Kafka. If the + /// `event_type` field is missing or not a string, defaults to `"unknown"`. + /// + /// # Errors + /// + /// Returns an error message if the payload is not valid JSON or the Kafka send operation fails. + /// + /// # Examples + /// + /// ``` + /// # async fn example(processor: &EventRouterProcessor) { + /// let payload = br#"{"event_type": "process_start", "pid": 1234}"#; + /// assert!(processor.process(None, payload, "input", 0, 0).await.is_ok()); + /// # } + /// ``` async fn process( &self, key: Option<&[u8]>, diff --git a/kafka-pipeline/src/health.rs b/kafka-pipeline/src/health.rs index cb061e5..e2333f1 100644 --- a/kafka-pipeline/src/health.rs +++ b/kafka-pipeline/src/health.rs @@ -1,6 +1,12 @@ #![allow(dead_code)] -/// Health check module +/// Indicates the health status of the system. +/// +/// # Examples +/// +/// ``` +/// assert!(is_healthy()); +/// ``` pub fn is_healthy() -> bool { true } diff --git a/kafka-pipeline/src/main.rs b/kafka-pipeline/src/main.rs index 620f411..00748e5 100644 --- a/kafka-pipeline/src/main.rs +++ b/kafka-pipeline/src/main.rs @@ -4,7 +4,15 @@ use tracing::info; pub mod consumer; pub mod event_router; -#[tokio::main] +/// Initializes and runs the Kafka event routing pipeline. +/// +/// Consumes events from the `aigis.events.raw` topic and processes them through the event router. +/// The Kafka broker addresses are read from the `KAFKA_BROKERS` environment variable +/// (defaults to `localhost:29092`). The pipeline gracefully shuts down upon receiving SIGINT (Ctrl+C). +/// +/// # Errors +/// +/// Returns an error if the consumer worker fails to initialize. async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter("info") diff --git a/kafka-pipeline/src/metrics.rs b/kafka-pipeline/src/metrics.rs index 2a51ef7..dfc7cd0 100644 --- a/kafka-pipeline/src/metrics.rs +++ b/kafka-pipeline/src/metrics.rs @@ -8,12 +8,33 @@ pub struct LagMonitor { } impl LagMonitor { + /// Constructs a new LagMonitor with the provided Kafka consumer. + /// + /// # Examples + /// + /// ``` + /// use rdkafka::consumer::StreamConsumer; + /// let consumer = StreamConsumer::new(&Default::default()).unwrap(); + /// let monitor = LagMonitor::new(consumer); + /// ``` pub fn new(consumer: StreamConsumer) -> Self { Self { _consumer: consumer, } } + /// Retrieves the consumer lag for a Kafka consumer group. + /// + /// # Arguments + /// + /// * `group_id` - The consumer group ID for which to retrieve lag. + /// + /// # Examples + /// + /// ```ignore + /// let lag = monitor.get_consumer_lag("my-group").await?; + /// assert!(lag >= 0); + /// ``` pub async fn get_consumer_lag(&self, _group_id: &str) -> Result { // Fetch committed offsets // Fetch latest offsets (watermarks) diff --git a/sdk/build.rs b/sdk/build.rs index b4fd430..dac8300 100644 --- a/sdk/build.rs +++ b/sdk/build.rs @@ -1,3 +1,21 @@ +/// Generates Rust code from protobuf definitions at build time. +/// +/// Compiles the fleet, agent, and events protocol buffer files using Tonic and Prost code generation. +/// This function is invoked automatically by Cargo during the build process. +/// +/// # Examples +/// +/// This function runs automatically when building the SDK: +/// +/// ```sh +/// cargo build +/// ``` +/// The generated Rust code from `proto/fleet.proto`, `proto/agent.proto`, and `proto/events.proto` +/// is made available to the crate. +/// +/// # Errors +/// +/// Propagates any compilation errors from the protobuf compiler. fn main() -> Result<(), Box> { tonic_prost_build::configure().compile_protos( &[ diff --git a/sdk/src/codec.rs b/sdk/src/codec.rs index 727dc27..8bdcc84 100644 --- a/sdk/src/codec.rs +++ b/sdk/src/codec.rs @@ -8,6 +8,13 @@ use tonic::codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}; pub struct JsonCodec(PhantomData<(T, U)>); impl Default for JsonCodec { + /// Creates a default JSON codec. + /// + /// # Examples + /// + /// ``` + /// let codec: JsonCodec = Default::default(); + /// ``` fn default() -> Self { Self(PhantomData) } @@ -26,10 +33,26 @@ where type Encoder = JsonEncoder; type Decoder = JsonDecoder; + /// Creates a JSON encoder for this codec. + /// + /// # Examples + /// + /// ``` + /// let mut codec = JsonCodec::default(); + /// let _encoder = codec.encoder(); + /// ``` fn encoder(&mut self) -> Self::Encoder { JsonEncoder(PhantomData) } + /// Creates a new JSON decoder for this codec. + /// + /// # Examples + /// + /// ``` + /// let mut codec = JsonCodec::default(); + /// let decoder = codec.decoder(); + /// ``` fn decoder(&mut self) -> Self::Decoder { JsonDecoder(PhantomData) } @@ -42,6 +65,11 @@ where type Item = T; type Error = Status; + /// Encodes an item to JSON and appends it to the destination buffer. + /// + /// # Errors + /// + /// Returns an internal `Status` error if the item cannot be serialized to JSON. fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { let bytes = serde_json::to_vec(&item).map_err(|e| Status::internal(e.to_string()))?; dst.put_slice(&bytes); @@ -56,6 +84,22 @@ where type Item = U; type Error = Status; + /// Decodes a JSON-encoded message from the buffer. + /// + /// Returns `None` if the buffer is empty. Otherwise, deserializes all remaining + /// bytes as a JSON message of type `U`. + /// + /// # Errors + /// + /// Returns a `Status::internal` error if JSON deserialization fails. + /// + /// # Examples + /// + /// ``` + /// let mut decoder = JsonDecoder::(PhantomData); + /// let mut empty_buffer = DecodeBuf::new(&[]); + /// assert_eq!(decoder.decode(&mut empty_buffer), Ok(None)); + /// ``` fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result, Self::Error> { if !src.has_remaining() { return Ok(None);