# Performance benchmark for the node-agent image.
#
# Runs benchmark/dedup-bench.sh against a "before" and an "after" image, then
# publishes the comparison as a PR comment (pull_request runs only) and as a
# build artifact. Can also be called from another workflow with explicit images.
name: Performance Benchmark
on:
  pull_request:
    types: [opened, synchronize, reopened]
    paths-ignore:
      - '*.md'
      - '.github/workflows/*'
  workflow_call:
    inputs:
      after_image:
        required: true
        type: string
      before_image:
        required: false
        type: string

concurrency:
  group: benchmark-${{ github.ref }}
  cancel-in-progress: true

jobs:
  benchmark:
    runs-on: ubuntu-large
    permissions:
      pull-requests: write
      contents: read
    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Set up Go
        uses: actions/setup-go@v5
        with:
          go-version: "1.25"

      - name: Install Kind
        run: |
          # -f makes curl fail on HTTP errors instead of saving an error page as the binary.
          curl -fsSLo ./kind https://kind.sigs.k8s.io/dl/v0.27.0/kind-linux-amd64
          chmod +x ./kind
          sudo mv ./kind /usr/local/bin/kind

      - name: Install Helm
        run: |
          # pipefail: a failed download must fail the step rather than feed bash an empty script.
          set -o pipefail
          curl -fsSL https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash

      # Workflow inputs are handed to the scripts below through `env` rather than
      # being interpolated into the script text, so their content is never parsed as shell.
      - name: Resolve before image
        id: before-image
        env:
          GH_TOKEN: ${{ github.token }}
          REQUESTED_BEFORE_IMAGE: ${{ inputs.before_image }}
          REPOSITORY: ${{ github.repository }}
        run: |
          if [[ -n "$REQUESTED_BEFORE_IMAGE" ]]; then
            echo "BEFORE_IMAGE=${REQUESTED_BEFORE_IMAGE}" >> "$GITHUB_OUTPUT"
          else
            # No explicit baseline: compare against the latest published release.
            LATEST_TAG=$(gh api "repos/${REPOSITORY}/releases/latest" --jq '.tag_name')
            echo "BEFORE_IMAGE=quay.io/kubescape/node-agent:${LATEST_TAG}" >> "$GITHUB_OUTPUT"
          fi

      - name: Build after image
        id: after-image
        if: ${{ !inputs.after_image }}
        run: |
          curl -fsSLO https://github.com/inspektor-gadget/inspektor-gadget/releases/download/v0.48.1/ig_0.48.1_amd64.deb
          sudo dpkg -i ig_0.48.1_amd64.deb
          make gadgets
          make binary
          make docker-build IMAGE=quay.io/kubescape/node-agent TAG=bench-${{ github.sha }}
          echo "AFTER_IMAGE=quay.io/kubescape/node-agent:bench-${{ github.sha }}" >> "$GITHUB_OUTPUT"

      - name: Set after image from input
        id: after-image-input
        if: ${{ inputs.after_image }}
        env:
          REQUESTED_AFTER_IMAGE: ${{ inputs.after_image }}
        run: |
          echo "AFTER_IMAGE=${REQUESTED_AFTER_IMAGE}" >> "$GITHUB_OUTPUT"

      - name: Determine after image
        id: resolve-after
        env:
          # Exactly one of the two steps above runs; take whichever produced an output.
          AFTER: ${{ steps.after-image.outputs.AFTER_IMAGE || steps.after-image-input.outputs.AFTER_IMAGE }}
        run: |
          echo "AFTER_IMAGE=${AFTER}" >> "$GITHUB_OUTPUT"

      - name: Load after image into Kind
        if: ${{ !inputs.after_image }}
        run: |
          # Kind cluster is created by dedup-bench.sh, but we need to pre-load the image
          # The script's load_image function handles this automatically
          echo "After image will be loaded by dedup-bench.sh"

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install Python dependencies
        run: pip install -r benchmark/requirements.txt

      - name: Run benchmark
        env:
          BEFORE_IMAGE: ${{ steps.before-image.outputs.BEFORE_IMAGE }}
          AFTER_IMAGE: ${{ steps.resolve-after.outputs.AFTER_IMAGE }}
          OUTPUT_DIR: ${{ github.workspace }}/benchmark-output
        run: |
          chmod +x benchmark/dedup-bench.sh
          benchmark/dedup-bench.sh "$BEFORE_IMAGE" "$AFTER_IMAGE"

      - name: Generate markdown report
        if: always()
        env:
          OUTPUT_DIR: ${{ github.workspace }}/benchmark-output
        run: |
          # If the comparison cannot be produced (e.g. the benchmark died before
          # writing metrics) still leave a non-empty report for the comment step.
          if ! python3 benchmark/compare-metrics.py --format markdown \
              "$OUTPUT_DIR/before" "$OUTPUT_DIR/after" > report.md; then
            printf '## Performance Benchmark Results\n\nNo comparable metrics were produced; see the workflow logs.\n' > report.md
          fi

      # create-or-update-comment updates a comment only when given its id, so look
      # up the previous report by its heading and replace it instead of stacking
      # a new comment on every push.
      - name: Find previous benchmark comment
        id: find-comment
        uses: peter-evans/find-comment@v3
        if: ${{ always() && github.event_name == 'pull_request' }}
        with:
          issue-number: ${{ github.event.pull_request.number }}
          comment-author: 'github-actions[bot]'
          body-includes: Performance Benchmark Results

      - name: Comment on PR
        uses: peter-evans/create-or-update-comment@v4
        if: ${{ always() && github.event_name == 'pull_request' }}
        with:
          comment-id: ${{ steps.find-comment.outputs.comment-id }}
          issue-number: ${{ github.event.pull_request.number }}
          body-path: report.md
          edit-mode: replace

      - name: Upload artifacts
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: benchmark-results
          path: ${{ github.workspace }}/benchmark-output/
          retention-days: 30
b/.github/workflows/incluster-comp-pr-merged.yaml @@ -336,6 +336,14 @@ jobs: path: failed_*.txt retention-days: 7 + benchmark: + needs: docker-build + if: ${{ contains(github.event.pull_request.labels.*.name, 'release') }} + uses: ./.github/workflows/benchmark.yaml + with: + after_image: ${{ inputs.IMAGE_NAME }}:${{ needs.docker-build.outputs.IMAGE_TAG_PRERELEASE }} + secrets: inherit + create-release-and-retag: if: ${{ contains(github.event.pull_request.labels.*.name, 'release') && always() && contains(needs.*.result, 'success') && !(contains(needs.*.result, 'failure')) && !(contains (needs.*.result,'cancelled')) || inputs.FORCE }} name: Docker retag and create release diff --git a/benchmark/compare-metrics.py b/benchmark/compare-metrics.py new file mode 100644 index 0000000000..ee4ea61464 --- /dev/null +++ b/benchmark/compare-metrics.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 +""" +Compare before/after eBPF dedup benchmark metrics. + +Usage: python3 compare-metrics.py [--format text|markdown] + +Reads cpu_metrics.csv, memory_metrics.csv, dedup_total.json, and events_total.json +produced by dedup-bench.sh and prints a side-by-side comparison table. 
+""" + +import argparse +import json +import sys +from pathlib import Path + +import pandas as pd + +SIGNIFICANT_THRESHOLD = 10.0 # percent change considered significant + + +def load_csv(directory: Path, name: str) -> pd.DataFrame: + path = directory / name + if not path.exists(): + print(f"Warning: {path} not found", file=sys.stderr) + return pd.DataFrame(columns=["Time", "Pod", "Value"]) + return pd.read_csv(path) + + +def load_json(directory: Path, name: str) -> dict | None: + path = directory / name + if not path.exists(): + return None + with open(path) as f: + return json.load(f) + + +def compute_resource_stats(df: pd.DataFrame) -> dict: + """Filter to node-agent pods and compute avg/peak.""" + na = df[df["Pod"].str.contains("node-agent", na=False)] + if na.empty: + return {"avg": 0.0, "peak": 0.0} + return {"avg": na["Value"].mean(), "peak": na["Value"].max()} + + +def format_delta(before: float, after: float) -> str: + if before == 0: + return "N/A" + pct = (after - before) / before * 100 + sign = "+" if pct >= 0 else "" + return f"{sign}{pct:.1f}%" + + +def format_delta_md(before: float, after: float) -> str: + """Format delta for markdown, bolding significant changes.""" + if before == 0: + return "N/A" + pct = (after - before) / before * 100 + sign = "+" if pct >= 0 else "" + text = f"{sign}{pct:.1f}%" + if abs(pct) >= SIGNIFICANT_THRESHOLD: + return f"**{text}**" + return text + + +# --------------------------------------------------------------- +# Text output (original format) +# --------------------------------------------------------------- + +def print_resource_table_text(before_dir: Path, after_dir: Path) -> None: + before_cpu = compute_resource_stats(load_csv(before_dir, "cpu_metrics.csv")) + after_cpu = compute_resource_stats(load_csv(after_dir, "cpu_metrics.csv")) + before_mem = compute_resource_stats(load_csv(before_dir, "memory_metrics.csv")) + after_mem = compute_resource_stats(load_csv(after_dir, "memory_metrics.csv")) + + rows = [ + ("Avg 
CPU (cores)", before_cpu["avg"], after_cpu["avg"]), + ("Peak CPU (cores)", before_cpu["peak"], after_cpu["peak"]), + ("Avg Memory (MiB)", before_mem["avg"], after_mem["avg"]), + ("Peak Memory (MiB)", before_mem["peak"], after_mem["peak"]), + ] + + print(" Node-Agent Resource Usage") + print(" " + "-" * 55) + print(f" {'Metric':<22}{'BEFORE':>12}{'AFTER':>12}{'Delta':>12}") + print(" " + "-" * 55) + for label, bv, av in rows: + print(f" {label:<22}{bv:>12.3f}{av:>12.3f}{format_delta(bv, av):>12}") + print() + + +def print_dedup_table_text(after_dir: Path) -> None: + data = load_json(after_dir, "dedup_total.json") + if not data or data.get("status") != "success": + print(" Dedup Effectiveness: no data available\n") + return + + results = data.get("data", {}).get("result", []) + if not results: + print(" Dedup Effectiveness: no data available\n") + return + + by_type = _aggregate_dedup(results) + + print(" Dedup Effectiveness (AFTER only)") + print(" " + "-" * 55) + print(f" {'Event Type':<16}{'Passed':>10}{'Deduped':>10}{'Ratio':>10}") + print(" " + "-" * 55) + for et in sorted(by_type): + passed = by_type[et]["passed"] + deduped = by_type[et]["deduplicated"] + total = passed + deduped + ratio = f"{deduped / total * 100:.1f}%" if total > 0 else "N/A" + print(f" {et:<16}{passed:>10.0f}{deduped:>10.0f}{ratio:>10}") + print() + + +def print_event_comparison_text(before_dir: Path, after_dir: Path) -> None: + before_counters = _extract_counters(load_json(before_dir, "events_total.json")) + after_counters = _extract_counters(load_json(after_dir, "events_total.json")) + all_names = sorted(set(before_counters) | set(after_counters)) + + if not all_names: + return + + print(" Event Counters") + print(" " + "-" * 55) + print(f" {'Metric':<35}{'BEFORE':>10}{'AFTER':>10}") + print(" " + "-" * 55) + for name in all_names: + bv = before_counters.get(name, 0.0) + av = after_counters.get(name, 0.0) + short = name.replace("node_agent_", "") + print(f" 
{short:<35}{bv:>10.0f}{av:>10.0f}") + print() + + +# --------------------------------------------------------------- +# Markdown output (for PR comments) +# --------------------------------------------------------------- + +def print_resource_table_md(before_dir: Path, after_dir: Path) -> None: + before_cpu = compute_resource_stats(load_csv(before_dir, "cpu_metrics.csv")) + after_cpu = compute_resource_stats(load_csv(after_dir, "cpu_metrics.csv")) + before_mem = compute_resource_stats(load_csv(before_dir, "memory_metrics.csv")) + after_mem = compute_resource_stats(load_csv(after_dir, "memory_metrics.csv")) + + rows = [ + ("Avg CPU (cores)", before_cpu["avg"], after_cpu["avg"]), + ("Peak CPU (cores)", before_cpu["peak"], after_cpu["peak"]), + ("Avg Memory (MiB)", before_mem["avg"], after_mem["avg"]), + ("Peak Memory (MiB)", before_mem["peak"], after_mem["peak"]), + ] + + print("
") + print("Node-Agent Resource Usage") + print() + print("| Metric | BEFORE | AFTER | Delta |") + print("|--------|-------:|------:|------:|") + for label, bv, av in rows: + print(f"| {label} | {bv:.3f} | {av:.3f} | {format_delta_md(bv, av)} |") + print() + print("
") + print() + + +def print_dedup_table_md(after_dir: Path) -> None: + data = load_json(after_dir, "dedup_total.json") + if not data or data.get("status") != "success": + print("
") + print("Dedup Effectiveness") + print() + print("No data available.") + print() + print("
") + print() + return + + results = data.get("data", {}).get("result", []) + if not results: + print("
") + print("Dedup Effectiveness") + print() + print("No data available.") + print() + print("
") + print() + return + + by_type = _aggregate_dedup(results) + + print("
") + print("Dedup Effectiveness (AFTER only)") + print() + print("| Event Type | Passed | Deduped | Ratio |") + print("|------------|-------:|--------:|------:|") + for et in sorted(by_type): + passed = by_type[et]["passed"] + deduped = by_type[et]["deduplicated"] + total = passed + deduped + ratio = f"{deduped / total * 100:.1f}%" if total > 0 else "N/A" + print(f"| {et} | {passed:.0f} | {deduped:.0f} | {ratio} |") + print() + print("
") + print() + + +def print_event_comparison_md(before_dir: Path, after_dir: Path) -> None: + before_counters = _extract_counters(load_json(before_dir, "events_total.json")) + after_counters = _extract_counters(load_json(after_dir, "events_total.json")) + all_names = sorted(set(before_counters) | set(after_counters)) + + if not all_names: + return + + print("
") + print("Event Counters") + print() + print("| Metric | BEFORE | AFTER |") + print("|--------|-------:|------:|") + for name in all_names: + bv = before_counters.get(name, 0.0) + av = after_counters.get(name, 0.0) + short = name.replace("node_agent_", "") + print(f"| {short} | {bv:.0f} | {av:.0f} |") + print() + print("
") + print() + + +# --------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------- + +def _aggregate_dedup(results: list) -> dict: + by_type: dict[str, dict[str, float]] = {} + for item in results: + et = item["metric"].get("event_type", "unknown") + result = item["metric"].get("result", "unknown") + value = float(item["value"][1]) if len(item.get("value", [])) > 1 else 0.0 + by_type.setdefault(et, {"passed": 0.0, "deduplicated": 0.0}) + by_type[et][result] = value + return by_type + + +def _extract_counters(data: dict | None) -> dict[str, float]: + if not data or data.get("status") != "success": + return {} + counters: dict[str, float] = {} + for item in data.get("data", {}).get("result", []): + name = item["metric"].get("__name__", "") + value = float(item["value"][1]) if len(item.get("value", [])) > 1 else 0.0 + counters[name] = counters.get(name, 0.0) + value + return counters + + +# --------------------------------------------------------------- +# Main +# --------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser( + description="Compare before/after eBPF dedup benchmark metrics." 
+ ) + parser.add_argument( + "--format", + choices=["text", "markdown"], + default="text", + help="Output format (default: text)", + ) + parser.add_argument("before_dir", type=Path, help="Directory with before metrics") + parser.add_argument("after_dir", type=Path, help="Directory with after metrics") + args = parser.parse_args() + + for d in (args.before_dir, args.after_dir): + if not d.is_dir(): + print(f"Error: {d} is not a directory", file=sys.stderr) + sys.exit(1) + + if args.format == "markdown": + print("## Performance Benchmark Results") + print() + print_resource_table_md(args.before_dir, args.after_dir) + print_dedup_table_md(args.after_dir) + print_event_comparison_md(args.before_dir, args.after_dir) + else: + print() + print("=" * 61) + print(" eBPF Dedup Benchmark Results") + print("=" * 61) + print() + print_resource_table_text(args.before_dir, args.after_dir) + print_dedup_table_text(args.after_dir) + print_event_comparison_text(args.before_dir, args.after_dir) + print("=" * 61) + print() + + +if __name__ == "__main__": + main() diff --git a/benchmark/dedup-bench.sh b/benchmark/dedup-bench.sh new file mode 100755 index 0000000000..2f93cda951 --- /dev/null +++ b/benchmark/dedup-bench.sh @@ -0,0 +1,439 @@ +#!/usr/bin/env bash +set -euo pipefail + +# ============================================================= +# eBPF Dedup Benchmark — Kind Cluster Performance Test +# +# Usage: ./dedup-bench.sh +# Example: ./dedup-bench.sh quay.io/kubescape/node-agent:baseline quay.io/kubescape/node-agent:dedup +# +# Environment variable alternatives (for CI): +# BEFORE_IMAGE=quay.io/kubescape/node-agent:v1 \ +# AFTER_IMAGE=quay.io/kubescape/node-agent:v2 \ +# ./dedup-bench.sh +# +# For private (armo) chart: +# HELM_MODE=armo ARMO_ACCOUNT=... ARMO_ACCESS_KEY=... \ +# ARMO_IMAGE_PULL_SECRET=... 
ARMO_SERVER=api-dev.armosec.io \ +# ./dedup-bench.sh quay.io/armosec/node-agent:v0.0.240 quay.io/armosec/node-agent:test +# +# Estimated runtime: ~35 minutes +# ============================================================= + +CLUSTER_NAME="dedup-bench" +KUBESCAPE_NS="kubescape" +MONITORING_NS="monitoring" +HELM_MODE="${HELM_MODE:-kubescape}" # "kubescape" (default) or "armo" (private chart) +LOAD_DURATION=600 # 10 minutes +WARMUP_SECONDS=120 # 2 minutes +METRICS_DURATION=10 # minutes (matches LOAD_DURATION, excludes warmup) +PROM_LOCAL_PORT=9090 +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +OUTPUT_BASE="${OUTPUT_DIR:-${SCRIPT_DIR}/dedup-bench-output}" + +PORT_FORWARD_PID="" + +# --------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------- + +log() { echo "==> [$(date +%H:%M:%S)] $*"; } +die() { echo "ERROR: $*" >&2; exit 1; } + +split_image() { + # Split image into repo and tag. Default tag to "latest". + local image="$1" + if [[ "$image" == *:* ]]; then + echo "${image%:*}" "${image##*:}" + else + echo "$image" "latest" + fi +} + +# --------------------------------------------------------------- +# check_prerequisites +# --------------------------------------------------------------- +check_prerequisites() { + log "Checking prerequisites..." + local missing=() + for cmd in kind helm kubectl docker python3; do + command -v "$cmd" &>/dev/null || missing+=("$cmd") + done + [[ ${#missing[@]} -eq 0 ]] || die "Missing required tools: ${missing[*]}" + + # Set up Python venv with deps + local venv_dir="${SCRIPT_DIR}/.venv" + if [[ ! -d "$venv_dir" ]]; then + log "Creating Python venv and installing dependencies..." 
+ python3 -m venv "$venv_dir" + "$venv_dir/bin/pip" install -r "$SCRIPT_DIR/requirements.txt" + fi + # Use the venv python for the rest of the script + export PATH="${venv_dir}/bin:${PATH}" + + local cpus + cpus=$(nproc) + if (( cpus < 8 )); then + echo "WARNING: Host has $cpus CPUs (recommended >= 8). Results may be noisy." + fi + + local mem_gb + mem_gb=$(free -g | awk '/^Mem:/{print $2}') + if (( mem_gb < 16 )); then + echo "WARNING: Host has ${mem_gb}GB RAM (recommended >= 16). Results may be noisy." + fi +} + +# --------------------------------------------------------------- +# create_kind_cluster (idempotent) +# --------------------------------------------------------------- +create_kind_cluster() { + log "Creating kind cluster '$CLUSTER_NAME'..." + kind delete cluster --name "$CLUSTER_NAME" 2>/dev/null || true + + local tmpfile + tmpfile=$(mktemp /tmp/kind-config-XXXX.yaml) + cat > "$tmpfile" <<'EOF' +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +nodes: +- role: control-plane + kubeadmConfigPatches: + - | + kind: InitConfiguration + nodeRegistration: + kubeletExtraArgs: + system-reserved: cpu=500m,memory=512Mi +- role: worker +EOF + kind create cluster --name "$CLUSTER_NAME" --wait 5m --config "$tmpfile" + rm -f "$tmpfile" + log "Kind cluster ready." +} + +# --------------------------------------------------------------- +# install_prometheus +# --------------------------------------------------------------- +install_prometheus() { + log "Installing Prometheus..." 
+ helm repo add --force-update prometheus-community https://prometheus-community.github.io/helm-charts + helm repo update + helm upgrade --install prometheus prometheus-community/kube-prometheus-stack \ + --namespace "$MONITORING_NS" --create-namespace --wait --timeout 5m \ + --set grafana.enabled=false \ + --set alertmanager.enabled=false \ + --set prometheus.prometheusSpec.scrapeInterval=10s \ + --set prometheus.prometheusSpec.podMonitorSelectorNilUsesHelmValues=false \ + --set prometheus.prometheusSpec.serviceMonitorSelectorNilUsesHelmValues=false + + kubectl wait --for=condition=Ready pod -l app.kubernetes.io/name=prometheus \ + -n "$MONITORING_NS" --timeout=300s + log "Prometheus ready." +} + +# --------------------------------------------------------------- +# install_kubescape / swap_image +# --------------------------------------------------------------- +_helm_install_kubescape() { + local repo="$1" tag="$2" + + if [[ "$HELM_MODE" == "armo" ]]; then + _helm_install_armo "$repo" "$tag" + else + _helm_install_kubescape_oss "$repo" "$tag" + fi +} + +_helm_install_kubescape_oss() { + local repo="$1" tag="$2" + helm repo add --force-update kubescape https://kubescape.github.io/helm-charts + helm repo update + helm upgrade --install kubescape kubescape/kubescape-operator \ + -n "$KUBESCAPE_NS" --create-namespace --wait --timeout 5m \ + --set clusterName="kind-${CLUSTER_NAME}" \ + --set capabilities.runtimeDetection=enable \ + --set capabilities.runtimeObservability=enable \ + --set capabilities.malwareDetection=disable \ + --set capabilities.prometheusExporter=enable \ + --set nodeAgent.config.prometheusExporter=enable \ + --set nodeAgent.serviceMonitor.enabled=true \ + --set nodeAgent.config.stdoutExporter=true \ + --set nodeAgent.config.httpExporterConfig=null \ + --set alertCRD.scopeClustered=true \ + --set nodeAgent.image.repository="$repo" \ + --set nodeAgent.image.tag="$tag" \ + --set nodeAgent.image.pullPolicy=IfNotPresent +} + +_helm_install_armo() { + 
local repo="$1" tag="$2" + + # Validate required env vars for armo mode + : "${ARMO_ACCOUNT:?ARMO_ACCOUNT required for HELM_MODE=armo}" + : "${ARMO_ACCESS_KEY:?ARMO_ACCESS_KEY required for HELM_MODE=armo}" + : "${ARMO_IMAGE_PULL_SECRET:?ARMO_IMAGE_PULL_SECRET required for HELM_MODE=armo}" + : "${ARMO_SERVER:?ARMO_SERVER required for HELM_MODE=armo}" + + helm repo add --force-update armosec https://armosec.github.io/helm-charts/ + helm repo update + # Use IfNotPresent only when the image is pre-loaded into kind; otherwise Always + local pull_policy="Always" + if docker image inspect "${repo}:${tag}" &>/dev/null; then + pull_policy="IfNotPresent" + fi + + helm upgrade --install kubescape armosec/armosec-kubescape-operator \ + -n "$KUBESCAPE_NS" --create-namespace --wait --timeout 10m \ + --set kubescape-operator.account="${ARMO_ACCOUNT}" \ + --set kubescape-operator.accessKey="${ARMO_ACCESS_KEY}" \ + --set kubescape-operator.imagePullSecret.password="${ARMO_IMAGE_PULL_SECRET}" \ + --set kubescape-operator.server="${ARMO_SERVER}" \ + --set kubescape-operator.clusterName="kind-${CLUSTER_NAME}" \ + --set kubescape-operator.capabilities.runtimeDetection=enable \ + --set kubescape-operator.capabilities.runtimeObservability=enable \ + --set kubescape-operator.capabilities.malwareDetection=disable \ + --set kubescape-operator.capabilities.prometheusExporter=enable \ + --set kubescape-operator.nodeAgent.config.prometheusExporter=enable \ + --set kubescape-operator.nodeAgent.serviceMonitor.enabled=true \ + --set kubescape-operator.nodeAgent.config.stdoutExporter=true \ + --set kubescape-operator.nodeAgent.config.httpExporterConfig=null \ + --set kubescape-operator.alertCRD.scopeClustered=true \ + --set kubescape-operator.nodeAgent.image.repository="$repo" \ + --set kubescape-operator.nodeAgent.image.tag="$tag" \ + --set kubescape-operator.nodeAgent.image.pullPolicy="$pull_policy" +} + +install_kubescape() { + log "Installing Kubescape with node-agent $1:$2..." 
+ _helm_install_kubescape "$1" "$2" + if ! kubectl wait --for=condition=Ready pod -l app.kubernetes.io/component=node-agent \ + -n "$KUBESCAPE_NS" --timeout=600s; then + log "ERROR: node-agent pod did not become ready. Diagnostics:" + kubectl get pods -n "$KUBESCAPE_NS" -o wide + kubectl describe pod -l app.kubernetes.io/component=node-agent -n "$KUBESCAPE_NS" | tail -60 + kubectl logs -l app.kubernetes.io/component=node-agent -n "$KUBESCAPE_NS" --tail=40 2>/dev/null || true + die "node-agent pod failed to become ready" + fi + log "Kubescape ready." +} + +swap_image() { + log "Swapping node-agent image to $1:$2..." + _helm_install_kubescape "$1" "$2" + kubectl rollout status daemonset/node-agent -n "$KUBESCAPE_NS" --timeout=600s + log "Node-agent rollout complete." +} + +# --------------------------------------------------------------- +# load_image — preload into kind if available locally +# --------------------------------------------------------------- +load_image() { + local image="$1" + if docker image inspect "$image" &>/dev/null; then + log "Loading $image into kind cluster..." + kind load docker-image "$image" --name "$CLUSTER_NAME" + else + log "Image $image not found locally, assuming registry-pullable." + fi +} + +# --------------------------------------------------------------- +# deploy / remove load simulator +# --------------------------------------------------------------- +deploy_load_simulator() { + log "Deploying load simulator..." 
+ kubectl create namespace load-simulator --dry-run=client -o yaml | kubectl apply -f - + + cat <<'EOF' > /tmp/load-sim-config.yaml +cpuLoadMs: 500 +numberParallelCPUs: 2 +dnsRate: 2 +execRate: 10 +hardlinkRate: 10 +httpRate: 100 +networkRate: 10 +openRate: 1000 +symlinkRate: 10 +EOF + kubectl create configmap config --from-file=config.yaml=/tmp/load-sim-config.yaml \ + -n load-simulator --dry-run=client -o yaml | kubectl apply -f - + + kubectl apply -f "$SCRIPT_DIR/load-simulator/daemonset.yaml" -n load-simulator + kubectl wait --for=condition=ready pod -l app=load-simulator \ + -n load-simulator --timeout=300s + log "Load simulator running." +} + +remove_load_simulator() { + log "Removing load simulator..." + kubectl delete namespace load-simulator --wait=false 2>/dev/null || true +} + +# --------------------------------------------------------------- +# port-forward helpers +# --------------------------------------------------------------- +start_port_forward() { + log "Starting Prometheus port-forward on :$PROM_LOCAL_PORT..." + kubectl port-forward svc/prometheus-kube-prometheus-prometheus \ + "$PROM_LOCAL_PORT":9090 -n "$MONITORING_NS" & + PORT_FORWARD_PID=$! + + # Wait for readiness + local retries=30 + while (( retries > 0 )); do + if curl -sf "http://localhost:$PROM_LOCAL_PORT/-/ready" &>/dev/null; then + log "Prometheus port-forward ready." + return 0 + fi + sleep 1 + (( retries-- )) + done + die "Prometheus port-forward failed to become ready." +} + +stop_port_forward() { + if [[ -n "$PORT_FORWARD_PID" ]]; then + kill "$PORT_FORWARD_PID" 2>/dev/null || true + wait "$PORT_FORWARD_PID" 2>/dev/null || true + PORT_FORWARD_PID="" + log "Port-forward stopped." + fi +} + +# --------------------------------------------------------------- +# collect_metrics +# --------------------------------------------------------------- +collect_metrics() { + local output_dir="$1" + mkdir -p "$output_dir" + + log "Collecting Prometheus metrics into $output_dir..." 
+ + # Collect CPU/memory via existing PrometheusMetricsCollector + OUTPUT_DIR="$output_dir" DURATION_TIME="$METRICS_DURATION" python3 -c " +import sys; sys.path.insert(0, '${SCRIPT_DIR}') +from get_data_from_prometheus import PrometheusMetricsCollector, PrometheusConfig +config = PrometheusConfig() +config.url = 'http://localhost:${PROM_LOCAL_PORT}' +config.rate_window = '1m' +collector = PrometheusMetricsCollector(config=config) +collector.run() +" + + # Collect dedup-specific and event counter metrics + python3 -c " +import requests, json, os +from datetime import datetime, timedelta, timezone + +url = 'http://localhost:${PROM_LOCAL_PORT}' +end = datetime.now(timezone.utc) +start = end - timedelta(minutes=${METRICS_DURATION}) +queries = { + 'dedup_total': 'sum by (event_type, result) (increase(node_agent_dedup_events_total[${METRICS_DURATION}m]))', + 'events_total': '{__name__=~\"node_agent_(exec|open|dns|network|syscall|capability)_counter\"}', + 'rule_total': 'sum by (rule_id) (increase(node_agent_rule_counter[${METRICS_DURATION}m]))', +} +output_dir = '${output_dir}' +os.makedirs(output_dir, exist_ok=True) +for name, query in queries.items(): + try: + resp = requests.get(f'{url}/api/v1/query', params={'query': query, 'time': end.isoformat()}, timeout=30) + resp.raise_for_status() + data = resp.json() + if data.get('status') != 'success': + print(f'Warning: {name}: Prometheus returned status={data.get("status")}') + with open(os.path.join(output_dir, f'{name}.json'), 'w') as f: + json.dump(data, f, indent=2) + print(f'Collected {name}') + except Exception as e: + print(f'Warning: {name}: {e}') +" + + log "Metrics collection complete." +} + +# --------------------------------------------------------------- +# cleanup (trap EXIT) +# --------------------------------------------------------------- +cleanup() { + log "Cleaning up..." + stop_port_forward + remove_load_simulator + kind delete cluster --name "$CLUSTER_NAME" 2>/dev/null || true + log "Done." 
+} + +# --------------------------------------------------------------- +# Main +# --------------------------------------------------------------- +main() { + # Support both positional args and environment variables + local before_image="${1:-${BEFORE_IMAGE:-}}" + local after_image="${2:-${AFTER_IMAGE:-}}" + + if [[ -z "$before_image" || -z "$after_image" ]]; then + echo "Usage: $0 " + echo " or: BEFORE_IMAGE=... AFTER_IMAGE=... $0" + echo "" + echo " e.g. $0 quay.io/kubescape/node-agent:baseline quay.io/kubescape/node-agent:dedup" + exit 1 + fi + + local before_repo before_tag after_repo after_tag + + read -r before_repo before_tag <<< "$(split_image "$before_image")" + read -r after_repo after_tag <<< "$(split_image "$after_image")" + + trap cleanup EXIT + + check_prerequisites + + rm -rf "$OUTPUT_BASE" + mkdir -p "$OUTPUT_BASE/before" "$OUTPUT_BASE/after" + + # --- Cluster & infrastructure --- + create_kind_cluster + install_prometheus + + # --- BEFORE run --- + log "===== BEFORE run: $before_image =====" + load_image "$before_image" + install_kubescape "$before_repo" "$before_tag" + deploy_load_simulator + + log "Warming up (${WARMUP_SECONDS}s)..." + sleep "$WARMUP_SECONDS" + + log "Load running for ${LOAD_DURATION}s..." + sleep "$LOAD_DURATION" + + start_port_forward + collect_metrics "$OUTPUT_BASE/before" + stop_port_forward + remove_load_simulator + + # --- AFTER run --- + log "===== AFTER run: $after_image =====" + load_image "$after_image" + swap_image "$after_repo" "$after_tag" + deploy_load_simulator + + log "Warming up (${WARMUP_SECONDS}s)..." + sleep "$WARMUP_SECONDS" + + log "Load running for ${LOAD_DURATION}s..." 
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Column layout of every metrics DataFrame/CSV produced here.
METRIC_COLUMNS = ['Time', 'Pod', 'Value']


@dataclass
class PrometheusConfig:
    """Connection and query settings used by PrometheusMetricsCollector."""
    # url: str = "http://localhost:9090"
    url: str = "http://prometheus-operated.monitoring.svc.cluster.local:9090"
    namespace: str = "kubescape"
    pod_regex: str = ".*"  # All pods
    step_seconds: str = "30"  # Step size for Prometheus queries
    rate_window: str = "5m"  # Rate window for CPU queries


class PrometheusMetricsCollector:
    """Query pod CPU/memory usage from Prometheus and export it as CSV files and plots.

    The output directory comes from the ``OUTPUT_DIR`` environment variable
    (default ``output``) and the look-back window, in minutes, from
    ``DURATION_TIME`` (default 30).
    """

    def __init__(self, config: Optional[PrometheusConfig] = None):
        self.config = config or PrometheusConfig()

        # Get output directory from environment variable with 'output' as default
        self.output_dir = os.getenv('OUTPUT_DIR', 'output')
        logger.info(f"Using output directory: {self.output_dir}")

        # Ensure the output directory exists
        try:
            os.makedirs(self.output_dir, exist_ok=True)
            logger.info(f"Successfully created/verified output directory: {self.output_dir}")
        except Exception as e:
            logger.warning(f"Failed to create {self.output_dir}, falling back to 'output': {e}")
            self.output_dir = 'output'
            os.makedirs(self.output_dir, exist_ok=True)

        # Get duration from environment variable
        try:
            self.duration_minutes = int(os.getenv('DURATION_TIME', '30'))
            logger.info(f"Using duration of {self.duration_minutes} minutes")
        except ValueError as e:
            logger.error(f"Error parsing duration: {e}")
            self.duration_minutes = 30

        # Calculate time window based on duration
        self.end_time = datetime.now(timezone.utc)
        self.start_time = self.end_time - timedelta(minutes=self.duration_minutes)

    def query_prometheus_range(self, query: str) -> Optional[List[Dict]]:
        """Execute a Prometheus range query over the configured time window.

        Returns the ``data.result`` list of the response, or None when the
        request fails, the body is not valid JSON, or no result is present.
        """
        params = {
            'query': query,
            'start': self.start_time.isoformat(),
            'end': self.end_time.isoformat(),
            'step': f"{self.config.step_seconds}s"
        }

        try:
            logger.info(f"Querying Prometheus with: {query}")
            logger.info(f"Time range: {self.start_time} to {self.end_time}")
            response = requests.get(
                f'{self.config.url}/api/v1/query_range',
                params=params,
                timeout=30
            )
            response.raise_for_status()

            data = response.json()
            if 'data' in data and 'result' in data['data']:
                return data['data']['result']
            logger.warning("No data found for the query")
            return None

        # ValueError: a body that is not JSON (raised by response.json()).
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error(f"Error querying Prometheus: {str(e)}")
            return None

    def process_metrics(self, metrics: List[Dict], metric_type: str) -> pd.DataFrame:
        """Flatten range-query series into a Time/Pod/Value DataFrame.

        ``metric_type == "Memory"`` converts values from bytes to MiB. Samples
        that cannot be parsed are logged and skipped. The returned frame always
        has the Time/Pod/Value columns, even when it holds no rows.
        """
        if not metrics:
            return pd.DataFrame(columns=METRIC_COLUMNS)

        all_data = []
        for item in metrics:
            pod = item['metric'].get('pod', 'unknown')
            for sample in item.get('values', []):
                try:
                    timestamp, value = sample
                    timestamp_readable = datetime.fromtimestamp(float(timestamp), timezone.utc)
                    value = float(value)
                    if metric_type == "Memory":
                        value = value / (1024 ** 2)  # Convert to MiB
                    all_data.append({
                        'Time': timestamp_readable,
                        'Pod': pod,
                        'Value': value
                    })
                except (ValueError, TypeError) as e:
                    logger.error(f"Error processing metric value: {str(e)}")
                    continue

        # Explicit columns keep the layout stable when every series was empty.
        return pd.DataFrame(all_data, columns=METRIC_COLUMNS)

    def filter_zero_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop rows whose Value is negative or not numeric; zeros are kept.

        Returns a new DataFrame with Value converted to numeric; the input
        frame is left unmodified.
        """
        if 'Value' not in df.columns:
            return pd.DataFrame(columns=METRIC_COLUMNS)
        values = pd.to_numeric(df['Value'], errors='coerce')
        keep = values.notna() & (values >= 0)
        return df.loc[keep].assign(Value=values[keep])

    def plot_individual(self, df: pd.DataFrame, metric_type: str) -> None:
        """Save one ``<pod>_<metric>_usage.png`` line plot per pod into the output directory."""
        if df.empty:
            logger.warning(f"No data to plot for {metric_type}")
            return

        plt.style.use('bmh')
        unit = 'MiB' if metric_type == 'Memory' else 'Cores'

        for pod, pod_data in df.groupby('Pod'):
            try:
                plt.figure(figsize=(12, 6))

                plt.plot(pod_data['Time'], pod_data['Value'],
                         label=pod, marker='o', linestyle='-', markersize=4)

                title = (f"{metric_type} Usage Over {self.duration_minutes} Minutes\n"
                         f"Pod: {pod}")
                plt.title(title, fontsize=16)
                plt.xlabel("Time (UTC)", fontsize=12)
                plt.ylabel(f"{metric_type} ({unit})", fontsize=12)

                plt.grid(True, linestyle='--', alpha=0.7)
                plt.xticks(rotation=45)
                plt.tight_layout()

                filename = os.path.join(self.output_dir, f"{pod}_{metric_type.lower()}_usage.png")
                plt.savefig(filename, dpi=300, bbox_inches='tight')
                logger.info(f"Saved graph: {filename}")

            except Exception as e:
                logger.error(f"Error creating plot for pod {pod}: {str(e)}")
            finally:
                # Always release the figure, whether or not the plot succeeded.
                plt.close()

    def save_to_csv(self, df: pd.DataFrame, metric_type: str) -> None:
        """Write ``df`` to ``<metric_type lowercased>_metrics.csv`` in the output directory.

        Nothing is written for an empty frame.
        """
        if df.empty:
            logger.warning(f"No data to save for {metric_type}")
            return

        try:
            filename = os.path.join(self.output_dir, f"{metric_type.lower()}_metrics.csv")
            df.to_csv(filename, index=False)
            logger.info(f"Saved data to CSV: {filename}")
        except Exception as e:
            logger.error(f"Error saving CSV file: {str(e)}")

    def _export(self, results: Optional[List[Dict]], metric_type: str) -> None:
        """Process, filter, save and plot one metric's query results, if any."""
        if not results:
            return
        df = self.filter_zero_values(self.process_metrics(results, metric_type))
        self.save_to_csv(df, metric_type)
        self.plot_individual(df, metric_type)

    def run(self):
        """Query memory and CPU usage, then write CSVs and plots for each."""
        logger.info(f"Starting metrics collection for the past {self.duration_minutes} minutes")

        memory_query = (
            f'container_memory_working_set_bytes{{namespace="{self.config.namespace}",'
            f'pod=~"{self.config.pod_regex}", container!="", container!="POD"}}'
        )
        memory_results = self.query_prometheus_range(memory_query)

        if memory_results:
            logger.info("Memory query returned results:")
            for result in memory_results:
                logger.info(f"Metric labels: {result['metric']}")

        cpu_query = (
            f'sum(rate(container_cpu_usage_seconds_total{{namespace="{self.config.namespace}",'
            f'pod=~"{self.config.pod_regex}"}}[{self.config.rate_window}])) by (pod)'
        )
        cpu_results = self.query_prometheus_range(cpu_query)

        self._export(memory_results, "Memory")
        self._export(cpu_results, "CPU")

        logger.info(f"Metrics collection complete for {self.duration_minutes} minute period")


if __name__ == "__main__":
    collector = PrometheusMetricsCollector()
    collector.run()
b/benchmark/requirements.txt @@ -0,0 +1,3 @@ +pandas==2.2.3 +matplotlib==3.10.1 +requests==2.32.3 diff --git a/docs/dedup-testing/after_cpu_usage.png b/docs/dedup-testing/after_cpu_usage.png new file mode 100644 index 0000000000..24b7212137 Binary files /dev/null and b/docs/dedup-testing/after_cpu_usage.png differ diff --git a/docs/dedup-testing/after_memory_usage.png b/docs/dedup-testing/after_memory_usage.png new file mode 100644 index 0000000000..f1d00a18aa Binary files /dev/null and b/docs/dedup-testing/after_memory_usage.png differ diff --git a/docs/dedup-testing/before_cpu_usage.png b/docs/dedup-testing/before_cpu_usage.png new file mode 100644 index 0000000000..93b61f39c5 Binary files /dev/null and b/docs/dedup-testing/before_cpu_usage.png differ diff --git a/docs/dedup-testing/before_memory_usage.png b/docs/dedup-testing/before_memory_usage.png new file mode 100644 index 0000000000..e6afdef5b6 Binary files /dev/null and b/docs/dedup-testing/before_memory_usage.png differ diff --git a/docs/dedup-testing/benchmark-results.md b/docs/dedup-testing/benchmark-results.md new file mode 100644 index 0000000000..eb46c1736a --- /dev/null +++ b/docs/dedup-testing/benchmark-results.md @@ -0,0 +1,91 @@ +# eBPF Event Dedup Benchmark Results + +Benchmark comparing `node-agent:v0.3.71` (baseline) vs the local build with the dedup cache (`feature/ebpf-event-dedup`). 
+ +## Setup + +- **Cluster**: kind (2 nodes: control-plane + worker) +- **Prometheus**: kube-prometheus-stack with 10s scrape interval +- **Kubescape**: kubescape-operator chart with runtimeDetection + runtimeObservability enabled +- **Load simulator**: DaemonSet generating events at configurable rates +- **Duration**: 2 min warmup + 10 min load per run +- **CPU rate window**: `rate(...[1m])` for responsive measurement + +### Load Simulator Config + +| Parameter | Value | +|-----------|-------| +| openRate | 1000/sec | +| httpRate | 100/sec | +| execRate | 10/sec | +| networkRate | 10/sec | +| dnsRate | 2/sec | +| hardlinkRate | 10/sec | +| symlinkRate | 10/sec | +| cpuLoadMs | 500 | +| numberParallelCPUs | 2 | + +## Resource Usage + +| Metric | BEFORE (v0.3.71) | AFTER (dedup) | Delta | +|--------|-------------------|---------------|-------| +| Avg CPU (cores) | 0.178 | 0.150 | **-15.9%** | +| Peak CPU (cores) | 0.220 | 0.156 | **-29.1%** | +| Avg Memory (MiB) | 339.5 | 335.9 | -1.1% | +| Peak Memory (MiB) | 345.5 | 338.4 | -2.1% | + +### CPU Usage + +| BEFORE (v0.3.71) | AFTER (dedup) | +|---|---| +| ![before cpu](before_cpu_usage.png) | ![after cpu](after_cpu_usage.png) | + +### Memory Usage + +| BEFORE (v0.3.71) | AFTER (dedup) | +|---|---| +| ![before memory](before_memory_usage.png) | ![after memory](after_memory_usage.png) | + +## Dedup Effectiveness + +Events processed by the dedup cache during the AFTER run: + +| Event Type | Passed | Deduped | Dedup Ratio | +|------------|--------|---------|-------------| +| http | 1,701 | 119,453 | **98.6%** | +| network | 900 | 77,968 | **98.9%** | +| open | 59,569 | 626,133 | **91.3%** | +| syscall | 998 | 1,967 | **66.3%** | +| dns | 1,197 | 0 | 0.0% | +| hardlink | 6,000 | 0 | 0.0% | +| symlink | 6,000 | 0 | 0.0% | + +## Event Counters (cumulative, both runs) + +| Metric | BEFORE | AFTER | +|--------|--------|-------| +| open_counter | 801,868 | 816,637 | +| network_counter | 92,197 | 93,735 | +| exec_counter | 7,009 
| 7,130 | +| syscall_counter | 3,628 | 3,735 | +| dns_counter | 1,401 | 1,422 | +| capability_counter | 9 | 9 | + +Event counters are consistent between runs, confirming the load simulator produced comparable workloads. + +## Analysis + +- The dedup cache reduces **avg CPU by ~16%** and **peak CPU by ~29%** under sustained load (~1,100 events/sec). +- Memory impact is negligible (~1%) since the dedup cache uses a fixed-size, lock-free array (2 MiB for 2^18 slots at 8 bytes each). +- High-frequency event types benefit most: **network (98.9%)**, **http (98.6%)**, and **open (91.3%)** dedup ratios. +- Events with unique keys per occurrence (dns, hardlink, symlink) show 0% dedup, which is expected. +- The CPU savings come from skipping CEL rule evaluation on deduplicated events. The eBPF ingestion and event enrichment cost (which dominates baseline CPU) is unchanged. + +## Reproducing + +```bash +cd benchmark +./dedup-bench.sh quay.io/kubescape/node-agent:v0.3.71 quay.io/kubescape/node-agent:test +``` + +Requires: kind, helm, kubectl, docker, python3. Estimated runtime: ~35 minutes. 
diff --git a/go.mod b/go.mod index 7716f70592..e0e5d61ba6 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/cenkalti/backoff/v4 v4.3.0 github.com/cenkalti/backoff/v5 v5.0.3 + github.com/cespare/xxhash/v2 v2.3.0 github.com/cilium/ebpf v0.20.0 github.com/crewjam/rfc5424 v0.1.0 github.com/cyphar/filepath-securejoin v0.6.0 @@ -156,7 +157,6 @@ require ( github.com/bodgit/windows v1.0.1 // indirect github.com/briandowns/spinner v1.23.2 // indirect github.com/campoy/embedmd v1.0.0 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/charmbracelet/colorprofile v0.3.1 // indirect github.com/charmbracelet/lipgloss v1.1.0 // indirect github.com/charmbracelet/x/ansi v0.9.3 // indirect diff --git a/pkg/config/config.go b/pkg/config/config.go index f31ff3d18e..eb410ef7d2 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -29,6 +29,12 @@ const NodeNameEnvVar = "NODE_NAME" const PodNameEnvVar = "POD_NAME" const NamespaceEnvVar = "NAMESPACE_NAME" +// EventDedupConfig controls eBPF event deduplication before CEL rule evaluation. 
type EventDedupConfig struct {
	// Enabled switches event deduplication on. The container watcher only
	// creates a dedup cache when this is true. LoadConfigOptional defaults it to true.
	Enabled bool `mapstructure:"enabled"`
	// SlotsExponent sizes the cache at 2^SlotsExponent slots.
	// LoadConfigOptional defaults it to 18 and, when Enabled is true,
	// rejects values outside 10..30.
	SlotsExponent uint8 `mapstructure:"slotsExponent"`
}
EventDedupConfig{Enabled: true, SlotsExponent: 18}, WorkerChannelSize: 750000, BlockEvents: false, ProcfsScanInterval: 30 * time.Second, diff --git a/pkg/containerwatcher/v2/container_watcher.go b/pkg/containerwatcher/v2/container_watcher.go index 212e80e6a2..2fa79594fa 100644 --- a/pkg/containerwatcher/v2/container_watcher.go +++ b/pkg/containerwatcher/v2/container_watcher.go @@ -19,6 +19,7 @@ import ( "github.com/kubescape/node-agent/pkg/config" "github.com/kubescape/node-agent/pkg/containerprofilemanager" "github.com/kubescape/node-agent/pkg/containerwatcher" + "github.com/kubescape/node-agent/pkg/dedupcache" "github.com/kubescape/node-agent/pkg/containerwatcher/v2/tracers" "github.com/kubescape/node-agent/pkg/dnsmanager" "github.com/kubescape/node-agent/pkg/ebpf/events" @@ -136,6 +137,12 @@ func CreateContainerWatcher( rulePolicyReporter := rulepolicy.NewRulePolicyReporter(ruleManager, containerProfileManager) + // Create dedup cache if enabled + var dedupCache *dedupcache.DedupCache + if cfg.EventDedup.Enabled { + dedupCache = dedupcache.NewDedupCache(cfg.EventDedup.SlotsExponent) + } + // Create event handler factory eventHandlerFactory := NewEventHandlerFactory( cfg, @@ -149,6 +156,7 @@ func CreateContainerWatcher( thirdPartyTracers.ThirdPartyEventReceivers, thirdPartyEnricher, rulePolicyReporter, + dedupCache, ) // Create event enricher @@ -459,6 +467,7 @@ func (cw *ContainerWatcher) processQueueBatch() { func (cw *ContainerWatcher) enrichAndProcess(entry EventEntry) { enrichedEvent := cw.eventEnricher.EnrichEvents(entry) + enrichedEvent.DedupBucket = uint16(time.Now().UnixNano() / (64 * 1_000_000)) select { case cw.workerChan <- enrichedEvent: diff --git a/pkg/containerwatcher/v2/event_handler_factory.go b/pkg/containerwatcher/v2/event_handler_factory.go index b6d0a65d0d..5349abbda6 100644 --- a/pkg/containerwatcher/v2/event_handler_factory.go +++ b/pkg/containerwatcher/v2/event_handler_factory.go @@ -10,6 +10,7 @@ import ( 
// Dedup windows per event type, counted in 64ms buckets.
// Each value is the window in milliseconds divided by the bucket width;
// constant integer division truncates, so 10s becomes 156 buckets.
const (
	// dedupBucketMillis is the width of one dedup bucket in milliseconds.
	dedupBucketMillis = 64

	dedupTTLOpen         uint16 = 10_000 / dedupBucketMillis // 10s
	dedupTTLNetwork      uint16 = 5_000 / dedupBucketMillis  // 5s
	dedupTTLDNS          uint16 = 10_000 / dedupBucketMillis // 10s
	dedupTTLCapabilities uint16 = 10_000 / dedupBucketMillis // 10s
	dedupTTLHTTP         uint16 = 2_000 / dedupBucketMillis  // 2s
	dedupTTLSSH          uint16 = 10_000 / dedupBucketMillis // 10s
	dedupTTLSymlink      uint16 = 10_000 / dedupBucketMillis // 10s
	dedupTTLHardlink     uint16 = 10_000 / dedupBucketMillis // 10s
	dedupTTLPtrace       uint16 = 10_000 / dedupBucketMillis // 10s
	dedupTTLSyscall      uint16 = 5_000 / dedupBucketMillis  // 5s
)
make(map[utils.EventType][]Manager), @@ -75,14 +95,14 @@ func NewEventHandlerFactory( cfg: cfg, containerCollection: containerCollection, containerCache: &maps.SafeMap[string, *containercollection.Container]{}, + containerProfileManager: containerProfileManager, + dedupCache: dedupCache, + metrics: metrics, + dedupSkipSet: make(map[Manager]struct{}), } // Create adapters for managers that don't implement the Manager interface directly containerProfileAdapter := NewManagerAdapter(func(eventType utils.EventType, event utils.K8sEvent) { - // check for dropped events - if event.HasDroppedEvents() { - containerProfileManager.ReportDroppedEvent(event.GetContainerID()) - } // ContainerProfileManager has specific methods for different event types switch eventType { case utils.CapabilitiesEventType: @@ -171,9 +191,108 @@ func NewEventHandlerFactory( rulePolicyAdapter, ) + // Populate dedupSkipSet: managers that skip processing when event is duplicate. + // RuleManager checks enrichedEvent.Duplicate internally. + factory.dedupSkipSet[containerProfileAdapter] = struct{}{} + factory.dedupSkipSet[malwareManager] = struct{}{} + return factory } +// computeEventDedupKey computes a dedup key and TTL for the given event. +// Returns shouldDedup=false for event types that must not be deduplicated. 
+func computeEventDedupKey(enrichedEvent *events.EnrichedEvent) (key uint64, ttl uint16, shouldDedup bool) { + event := enrichedEvent.Event + mntns := enrichedEvent.MountNamespaceID + if mntns == 0 { + if ee, ok := event.(utils.EnrichEvent); ok { + mntns = ee.GetMountNsID() + } + } + + switch event.GetEventType() { + case utils.OpenEventType: + if e, ok := event.(utils.OpenEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputeOpenKey(mntns, pid, e.GetPath(), e.GetFlagsRaw()), dedupTTLOpen, true + } + case utils.NetworkEventType: + if e, ok := event.(utils.NetworkEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + dst := e.GetDstEndpoint() + return dedupcache.ComputeNetworkKey(mntns, pid, dst.Addr, e.GetDstPort(), e.GetProto()), dedupTTLNetwork, true + } + case utils.DnsEventType: + if e, ok := event.(utils.DNSEvent); ok { + return dedupcache.ComputeDNSKey(mntns, e.GetDNSName()), dedupTTLDNS, true + } + case utils.CapabilitiesEventType: + if e, ok := event.(utils.CapabilitiesEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputeCapabilitiesKey(mntns, pid, e.GetCapability(), e.GetSyscall()), dedupTTLCapabilities, true + } + case utils.HTTPEventType: + if e, ok := event.(utils.HttpEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + req := e.GetRequest() + if req == nil || req.URL == nil { + return 0, 0, false + } + return dedupcache.ComputeHTTPKey(mntns, pid, string(e.GetDirection()), req.Method, req.Host, req.URL.Path, req.URL.RawQuery), dedupTTLHTTP, true + } + case utils.SSHEventType: + if e, ok := event.(utils.SshEvent); ok { + return dedupcache.ComputeSSHKey(mntns, e.GetDstIP(), e.GetDstPort()), dedupTTLSSH, true + } + case utils.SymlinkEventType: + if e, ok := event.(utils.LinkEvent); ok { + pid := uint32(0) 
+ if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputeSymlinkKey(mntns, pid, e.GetOldPath(), e.GetNewPath()), dedupTTLSymlink, true + } + case utils.HardlinkEventType: + if e, ok := event.(utils.LinkEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputeHardlinkKey(mntns, pid, e.GetOldPath(), e.GetNewPath()), dedupTTLHardlink, true + } + case utils.PtraceEventType: + if e, ok := event.(utils.PtraceEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputePtraceKey(mntns, pid, e.GetExePath()), dedupTTLPtrace, true + } + case utils.SyscallEventType: + if e, ok := event.(utils.SyscallEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputeSyscallKey(mntns, pid, e.GetSyscall()), dedupTTLSyscall, true + } + } + // exec, exit, fork, randomx, kmod, bpf, unshare, iouring — no dedup + return 0, 0, false +} + // ProcessEvent processes an event through all registered handlers func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent) { if enrichedEvent.ContainerID == "" { @@ -190,6 +309,23 @@ func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent return } + // Dedup check: compute key and check cache before dispatching to handlers + if ehf.dedupCache != nil { + key, ttl, shouldDedup := computeEventDedupKey(enrichedEvent) + if shouldDedup { + duplicate := ehf.dedupCache.CheckAndSet(key, ttl, enrichedEvent.DedupBucket) + if duplicate { + enrichedEvent.Duplicate = true + } + ehf.metrics.ReportDedupEvent(enrichedEvent.Event.GetEventType(), duplicate) + } + } + + // Always report dropped events regardless of dedup status + if enrichedEvent.Event.HasDroppedEvents() { + ehf.containerProfileManager.ReportDroppedEvent(enrichedEvent.Event.GetContainerID()) + } + // Get handlers 
for this event type eventType := enrichedEvent.Event.GetEventType() handlers, exists := ehf.handlers[eventType] @@ -200,6 +336,11 @@ func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent pprof.Do(context.Background(), pprof.Labels("event", string(eventType)), func(_ context.Context) { // Process event through each handler for _, handler := range handlers { + if enrichedEvent.Duplicate { + if _, skip := ehf.dedupSkipSet[handler]; skip { + continue + } + } if enrichedHandler, ok := handler.(containerwatcher.EnrichedEventReceiver); ok { enrichedHandler.ReportEnrichedEvent(enrichedEvent) } else if handler, ok := handler.(containerwatcher.EventReceiver); ok { diff --git a/pkg/dedupcache/dedup_cache.go b/pkg/dedupcache/dedup_cache.go new file mode 100644 index 0000000000..504fece658 --- /dev/null +++ b/pkg/dedupcache/dedup_cache.go @@ -0,0 +1,60 @@ +package dedupcache + +import ( + "sync/atomic" + + "github.com/kubescape/go-logger" + "github.com/kubescape/go-logger/helpers" +) + +// DedupCache is a lock-free, fixed-size deduplication cache. +// Each slot packs a 48-bit key and 16-bit expiry bucket into a single atomic uint64. +// Concurrent access from thousands of goroutines is safe without mutexes — +// benign races only cause missed dedup (safe direction), never false dedup. +type DedupCache struct { + slots []atomic.Uint64 + mask uint64 +} + +// NewDedupCache creates a cache with 2^slotsExponent slots. +// Each slot is 8 bytes; e.g. exponent 18 = 262,144 slots = 2 MB. +// slotsExponent is clamped to [10, 30] (1 KB to 8 GB). 
+func NewDedupCache(slotsExponent uint8) *DedupCache { + const minExponent, maxExponent, defaultExponent = 10, 30, 18 + if slotsExponent < minExponent || slotsExponent > maxExponent { + logger.L().Warning("slotsExponent out of range, using default", + helpers.Int("requested", int(slotsExponent)), + helpers.Int("default", defaultExponent)) + slotsExponent = defaultExponent + } + size := uint64(1) << slotsExponent + return &DedupCache{ + slots: make([]atomic.Uint64, size), + mask: size - 1, + } +} + +// pack stores the upper 48 bits of key and 16-bit expiry bucket in one uint64. +func pack(key uint64, expiryBucket uint16) uint64 { + return (key & 0xFFFFFFFFFFFF0000) | uint64(expiryBucket) +} + +// unpack extracts the 48-bit key portion and 16-bit expiry bucket. +func unpack(packed uint64) (keyBits uint64, expiryBucket uint16) { + return packed & 0xFFFFFFFFFFFF0000, uint16(packed) +} + +// CheckAndSet returns true if the key is already present and not expired (duplicate). +// Otherwise it inserts the key with expiry = currentBucket + ttlBuckets and returns false. 
+func (c *DedupCache) CheckAndSet(key uint64, ttlBuckets uint16, currentBucket uint16) bool { + idx := key & c.mask + + stored := c.slots[idx].Load() + storedKey, storedExpiry := unpack(stored) + if storedKey == (key & 0xFFFFFFFFFFFF0000) && int16(storedExpiry-currentBucket) > 0 { + return true // duplicate + } + + c.slots[idx].Store(pack(key, currentBucket+ttlBuckets)) + return false +} diff --git a/pkg/dedupcache/dedup_cache_test.go b/pkg/dedupcache/dedup_cache_test.go new file mode 100644 index 0000000000..b8a1e7c890 --- /dev/null +++ b/pkg/dedupcache/dedup_cache_test.go @@ -0,0 +1,167 @@ +package dedupcache + +import ( + "sync" + "testing" +) + +func TestCheckAndSet_BasicInsertAndLookup(t *testing.T) { + c := NewDedupCache(10) // 1024 slots + + key := uint64(0xDEADBEEF12340000) + ttl := uint16(156) // ~10s in 64ms buckets + now := uint16(1000) + + // First call: not a duplicate + if c.CheckAndSet(key, ttl, now) { + t.Fatal("expected false on first insert") + } + + // Second call: duplicate + if !c.CheckAndSet(key, ttl, now) { + t.Fatal("expected true on second lookup") + } +} + +func TestCheckAndSet_TTLExpiry(t *testing.T) { + c := NewDedupCache(10) + + key := uint64(0xABCDABCD00000000) + ttl := uint16(10) // expires at bucket 1010 + now := uint16(1000) + + c.CheckAndSet(key, ttl, now) + + // Still within TTL (bucket 1009 < expiry 1010) + if !c.CheckAndSet(key, ttl, uint16(1009)) { + t.Fatal("expected duplicate within TTL") + } + + // Exactly at expiry boundary (1010 is NOT > 1010, so expired) + if c.CheckAndSet(key, ttl, uint16(1010)) { + t.Fatal("expected not duplicate at expiry boundary") + } + + // After expiry + if c.CheckAndSet(key, ttl, uint16(1100)) { + t.Fatal("expected not duplicate after expiry") + } +} + +func TestCheckAndSet_SlotCollision(t *testing.T) { + c := NewDedupCache(10) // mask = 1023 + + // Two different keys that map to the same slot but have different upper 48 bits + key1 := uint64(0xAAAA000000000100) // slot = 0x100 & 0x3FF = 256 + 
key2 := uint64(0xBBBB000000000100) // slot = 0x100 & 0x3FF = 256, different upper bits + + ttl := uint16(156) + now := uint16(1000) + + c.CheckAndSet(key1, ttl, now) + + // key2 overwrites key1's slot — not a duplicate + if c.CheckAndSet(key2, ttl, now) { + t.Fatal("expected false for different key in same slot") + } + + // key1 is now evicted — not found + if c.CheckAndSet(key1, ttl, now) { + t.Fatal("expected false for evicted key") + } +} + +func TestCheckAndSet_PackUnpack(t *testing.T) { + key := uint64(0xDEADBEEFCAFE0000) + expiry := uint16(42) + + packed := pack(key, expiry) + gotKey, gotExpiry := unpack(packed) + + if gotKey != (key & 0xFFFFFFFFFFFF0000) { + t.Fatalf("key mismatch: got %x, want %x", gotKey, key&0xFFFFFFFFFFFF0000) + } + if gotExpiry != expiry { + t.Fatalf("expiry mismatch: got %d, want %d", gotExpiry, expiry) + } +} + +func TestCheckAndSet_Uint16WrapAround(t *testing.T) { + c := NewDedupCache(10) + + key := uint64(0xDEADBEEF12340000) + ttl := uint16(156) // ~10s + + // Insert near the uint16 max boundary + nearMax := uint16(65500) + if c.CheckAndSet(key, ttl, nearMax) { + t.Fatal("expected false on first insert near wrap boundary") + } + + // Expiry = 65500 + 156 = 65656, which wraps to 65656 - 65536 = 120 + // Check at nearMax+1: should still be duplicate (expiry hasn't been reached) + if !c.CheckAndSet(key, ttl, nearMax+1) { + t.Fatal("expected duplicate just after insert near wrap boundary") + } + + // Check after wrap: bucket 0 is past nearMax but before wrapped expiry (120) + if !c.CheckAndSet(key, ttl, 0) { + t.Fatal("expected duplicate at bucket 0 (after wrap, before expiry)") + } + + // Check at bucket 100: still before wrapped expiry of 120 + if !c.CheckAndSet(key, ttl, 100) { + t.Fatal("expected duplicate at bucket 100 (before wrapped expiry 120)") + } + + // Check at bucket 121: past the wrapped expiry of 120 + if c.CheckAndSet(key, ttl, 121) { + t.Fatal("expected not duplicate past wrapped expiry") + } +} + +func 
TestCheckAndSet_ConcurrentHammer(t *testing.T) { + c := NewDedupCache(14) // 16384 slots + + const goroutines = 100 + const opsPerGoroutine = 10000 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for g := 0; g < goroutines; g++ { + go func(id int) { + defer wg.Done() + for i := 0; i < opsPerGoroutine; i++ { + key := uint64(id*opsPerGoroutine+i) << 16 + c.CheckAndSet(key, 156, uint16(1000)) + } + }(g) + } + + wg.Wait() + // No panics or data races = success (run with -race) +} + +func BenchmarkCheckAndSet(b *testing.B) { + c := NewDedupCache(18) // production size + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + key := uint64(i) << 16 + c.CheckAndSet(key, 156, uint16(1000)) + } +} + +func BenchmarkCheckAndSet_Hit(b *testing.B) { + c := NewDedupCache(18) + key := uint64(0xDEADBEEF00000000) + c.CheckAndSet(key, 156, uint16(1000)) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + c.CheckAndSet(key, 156, uint16(1000)) + } +} diff --git a/pkg/dedupcache/keys.go b/pkg/dedupcache/keys.go new file mode 100644 index 0000000000..508c7277ae --- /dev/null +++ b/pkg/dedupcache/keys.go @@ -0,0 +1,136 @@ +package dedupcache + +import ( + "encoding/binary" + + "github.com/cespare/xxhash/v2" +) + +// Reusable byte buffers for writing integers into the hash. +// These are stack-allocated per call via the fixed-size array trick. + +func writeUint64(h *xxhash.Digest, v uint64) { + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], v) + h.Write(buf[:]) +} + +func writeUint32(h *xxhash.Digest, v uint32) { + var buf [4]byte + binary.LittleEndian.PutUint32(buf[:], v) + h.Write(buf[:]) +} + +func writeUint16(h *xxhash.Digest, v uint16) { + var buf [2]byte + binary.LittleEndian.PutUint16(buf[:], v) + h.Write(buf[:]) +} + +// writeString writes a length-prefixed string to the hash digest. +// The uint32 length prefix prevents collisions between adjacent variable-length strings +// (e.g. host="fo"+path="o/bar" vs host="foo"+path="/bar"). 
+func writeString(h *xxhash.Digest, s string) { + writeUint32(h, uint32(len(s))) + h.WriteString(s) +} + +// ComputeOpenKey computes a dedup key for open events. +func ComputeOpenKey(mntns uint64, pid uint32, path string, flagsRaw uint32) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + writeString(h, path) + writeUint32(h, flagsRaw) + return h.Sum64() +} + +// ComputeNetworkKey computes a dedup key for network events. +func ComputeNetworkKey(mntns uint64, pid uint32, dstAddr string, dstPort uint16, proto string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + writeString(h, dstAddr) + writeUint16(h, dstPort) + writeString(h, proto) + return h.Sum64() +} + +// ComputeDNSKey computes a dedup key for DNS events. +// No qtype getter exists in the interface, so key is mntns + dnsName. +func ComputeDNSKey(mntns uint64, dnsName string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeString(h, dnsName) + return h.Sum64() +} + +// ComputeCapabilitiesKey computes a dedup key for capabilities events. +func ComputeCapabilitiesKey(mntns uint64, pid uint32, capability string, syscall string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + writeString(h, capability) + writeString(h, syscall) + return h.Sum64() +} + +// ComputeHTTPKey computes a dedup key for HTTP events. +func ComputeHTTPKey(mntns uint64, pid uint32, direction string, method string, host string, path string, rawQuery string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + writeString(h, direction) + writeString(h, method) + writeString(h, host) + writeString(h, path) + writeString(h, rawQuery) + return h.Sum64() +} + +// ComputeSSHKey computes a dedup key for SSH events. 
+func ComputeSSHKey(mntns uint64, dstIP string, dstPort uint16) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeString(h, dstIP) + writeUint16(h, dstPort) + return h.Sum64() +} + +// ComputeSymlinkKey computes a dedup key for symlink events. +func ComputeSymlinkKey(mntns uint64, pid uint32, oldPath string, newPath string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + writeString(h, oldPath) + writeString(h, newPath) + return h.Sum64() +} + +// ComputeHardlinkKey computes a dedup key for hardlink events. +func ComputeHardlinkKey(mntns uint64, pid uint32, oldPath string, newPath string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + writeString(h, oldPath) + writeString(h, newPath) + return h.Sum64() +} + +// ComputePtraceKey computes a dedup key for ptrace events. +func ComputePtraceKey(mntns uint64, pid uint32, exePath string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + writeString(h, exePath) + return h.Sum64() +} + +// ComputeSyscallKey computes a dedup key for syscall events. 
+func ComputeSyscallKey(mntns uint64, pid uint32, syscall string) uint64 {
+	digest := xxhash.New()
+	writeUint64(digest, mntns)
+	writeUint32(digest, pid)
+	writeString(digest, syscall)
+	return digest.Sum64()
+}
diff --git a/pkg/dedupcache/keys_test.go b/pkg/dedupcache/keys_test.go
new file mode 100644
index 0000000000..63205da0db
--- /dev/null
+++ b/pkg/dedupcache/keys_test.go
@@ -0,0 +1,182 @@
+package dedupcache
+
+import "testing"
+
+// TestComputeOpenKey_Deterministic checks that identical arguments hash to the same open key.
+func TestComputeOpenKey_Deterministic(t *testing.T) {
+	first := ComputeOpenKey(123456, 42, "/etc/passwd", 0x02)
+	second := ComputeOpenKey(123456, 42, "/etc/passwd", 0x02)
+	if first != second {
+		t.Fatalf("non-deterministic: %x != %x", first, second)
+	}
+}
+
+// TestComputeOpenKey_DifferentInputs varies path, pid, mntns and flags and requires pairwise-distinct keys.
+func TestComputeOpenKey_DifferentInputs(t *testing.T) {
+	keys := []uint64{
+		ComputeOpenKey(123456, 42, "/etc/passwd", 0x02),
+		ComputeOpenKey(123456, 42, "/etc/shadow", 0x02),
+		ComputeOpenKey(123456, 43, "/etc/passwd", 0x02),
+		ComputeOpenKey(789012, 42, "/etc/passwd", 0x02),
+		ComputeOpenKey(123456, 42, "/etc/passwd", 0x04),
+	}
+	for i := range keys {
+		for j := i + 1; j < len(keys); j++ {
+			if keys[i] == keys[j] {
+				t.Fatalf("collision between key[%d]=%x and key[%d]=%x", i, keys[i], j, keys[j])
+			}
+		}
+	}
+}
+
+// TestComputeNetworkKey_Deterministic checks that identical arguments hash to the same network key.
+func TestComputeNetworkKey_Deterministic(t *testing.T) {
+	first := ComputeNetworkKey(100, 1, "10.0.0.1", 80, "tcp")
+	second := ComputeNetworkKey(100, 1, "10.0.0.1", 80, "tcp")
+	if first != second {
+		t.Fatalf("non-deterministic: %x != %x", first, second)
+	}
+}
+
+// TestComputeNetworkKey_DifferentInputs compares a baseline key against keys with one field changed.
+func TestComputeNetworkKey_DifferentInputs(t *testing.T) {
+	base := ComputeNetworkKey(100, 1, "10.0.0.1", 80, "tcp")
+	otherAddr := ComputeNetworkKey(100, 1, "10.0.0.2", 80, "tcp")
+	otherPort := ComputeNetworkKey(100, 1, "10.0.0.1", 443, "tcp")
+	otherProto := ComputeNetworkKey(100, 1, "10.0.0.1", 80, "udp")
+	if base == otherAddr || base == otherPort || base == otherProto {
+		t.Fatal("unexpected collision")
+	}
+}
+
+// TestComputeDNSKey_Deterministic checks that identical arguments hash to the same DNS key.
+func TestComputeDNSKey_Deterministic(t *testing.T) {
+	first := ComputeDNSKey(100, "example.com")
+	second := ComputeDNSKey(100, "example.com")
+	if first != second {
+		t.Fatalf("non-deterministic: %x != %x", first, second)
+	}
+}
+
+// TestComputeDNSKey_DifferentInputs compares a baseline key against a different name and a different mntns.
+func TestComputeDNSKey_DifferentInputs(t *testing.T) {
+	base := ComputeDNSKey(100, "example.com")
+	otherName := ComputeDNSKey(100, "other.com")
+	otherNS := ComputeDNSKey(200, "example.com")
+	if base == otherName || base == otherNS {
+		t.Fatal("unexpected collision")
+	}
+}
+
+// TestComputeHTTPKey_IncludesRawQuery checks that changing only rawQuery changes the HTTP key.
+func TestComputeHTTPKey_IncludesRawQuery(t *testing.T) {
+	plain := ComputeHTTPKey(100, 1, "outbound", "GET", "example.com", "/api", "page=1")
+	tampered := ComputeHTTPKey(100, 1, "outbound", "GET", "example.com", "/api", "page=1' OR 1=1--")
+	if plain == tampered {
+		t.Fatal("different query strings must produce different keys")
+	}
+}
+
+// TestComputeSSHKey_Deterministic checks that identical arguments hash to the same SSH key.
+func TestComputeSSHKey_Deterministic(t *testing.T) {
+	first := ComputeSSHKey(100, "192.168.1.1", 22)
+	second := ComputeSSHKey(100, "192.168.1.1", 22)
+	if first != second {
+		t.Fatalf("non-deterministic: %x != %x", first, second)
+	}
+}
+
+// TestComputeCapabilitiesKey_NoAdjacentStringCollision moves one byte across the capability/syscall boundary.
+func TestComputeCapabilitiesKey_NoAdjacentStringCollision(t *testing.T) {
+	// Both pairs concatenate to the same bytes; only the length prefix tells "CAP_NET"+"RAWsocket" from "CAP_NETR"+"AWsocket".
+	left := ComputeCapabilitiesKey(100, 1, "CAP_NET", "RAWsocket")
+	right := ComputeCapabilitiesKey(100, 1, "CAP_NETR", "AWsocket")
+	if left == right {
+		t.Fatal("adjacent string collision: length-prefix should prevent this")
+	}
+}
+
+// TestComputeHTTPKey_NoAdjacentStringCollision moves one byte across the host/path boundary.
+func TestComputeHTTPKey_NoAdjacentStringCollision(t *testing.T) {
+	// Same concatenation, different split: host="fo" path="o/bar" against host="foo" path="/bar".
+	left := ComputeHTTPKey(100, 1, "outbound", "GET", "fo", "o/bar", "")
+	right := ComputeHTTPKey(100, 1, "outbound", "GET", "foo", "/bar", "")
+	if left == right {
+		t.Fatal("adjacent string collision: length-prefix should prevent this")
+	}
+}
+
+// TestComputeSymlinkKey_NoAdjacentStringCollision moves one byte across the oldPath/newPath boundary.
+func TestComputeSymlinkKey_NoAdjacentStringCollision(t *testing.T) {
+	left := ComputeSymlinkKey(100, 1, "/tmp/a", "b/link")
+	right := ComputeSymlinkKey(100, 1, "/tmp/ab", "/link")
+	if left == right {
+		t.Fatal("adjacent string collision: length-prefix should prevent this")
+	}
+}
+
+// TestComputeCapabilitiesKey_Deterministic checks that identical arguments hash to the same capabilities key.
+func TestComputeCapabilitiesKey_Deterministic(t *testing.T) {
+	first := ComputeCapabilitiesKey(100, 1, "CAP_NET_RAW", "socket")
+	second := ComputeCapabilitiesKey(100, 1, "CAP_NET_RAW", "socket")
+	if first != second {
+		t.Fatalf("non-deterministic: %x != %x", first, second)
+	}
+}
+
+// TestComputeSyscallKey_DifferentInputs checks that two different syscall names give different keys.
+func TestComputeSyscallKey_DifferentInputs(t *testing.T) {
+	readKey := ComputeSyscallKey(100, 1, "read")
+	writeKey := ComputeSyscallKey(100, 1, "write")
+	if readKey == writeKey {
+		t.Fatal("unexpected collision")
+	}
+}
+
+// BenchmarkComputeOpenKey measures ComputeOpenKey on fixed arguments and reports allocations.
+func BenchmarkComputeOpenKey(b *testing.B) {
+	b.ReportAllocs()
+	for n := 0; n < b.N; n++ {
+		ComputeOpenKey(123456, 42, "/etc/passwd", 0x02)
+	}
+}
+
+// BenchmarkComputeNetworkKey measures ComputeNetworkKey on fixed arguments and reports allocations.
+func BenchmarkComputeNetworkKey(b *testing.B) {
+	b.ReportAllocs()
+	for n := 0; n < b.N; n++ {
+		ComputeNetworkKey(100, 1, "10.0.0.1", 80, "tcp")
+	}
+}
+
+// BenchmarkComputeDNSKey measures ComputeDNSKey on fixed arguments and reports allocations.
+func BenchmarkComputeDNSKey(b *testing.B) {
+	b.ReportAllocs()
+	for n := 0; n < b.N; n++ {
+		ComputeDNSKey(100, "example.com")
+	}
+}
+
+// BenchmarkComputeHTTPKey measures ComputeHTTPKey on fixed arguments and reports allocations.
+func BenchmarkComputeHTTPKey(b *testing.B) {
+	b.ReportAllocs()
+	for n := 0; n < b.N; n++ {
+		ComputeHTTPKey(100, 1, "outbound", "GET", "example.com", "/api/v1/users", "page=1&limit=50")
+	}
+}
+
+// BenchmarkComputeCapabilitiesKey measures ComputeCapabilitiesKey on fixed arguments and reports allocations.
+func BenchmarkComputeCapabilitiesKey(b *testing.B) {
+	b.ReportAllocs()
+	for n := 0; n < b.N; n++ {
+		ComputeCapabilitiesKey(100, 1, "CAP_NET_RAW", "socket")
+	}
+}
+
+// BenchmarkComputeSyscallKey measures ComputeSyscallKey on fixed arguments and reports allocations.
+func BenchmarkComputeSyscallKey(b *testing.B) {
+	b.ReportAllocs()
+	for n := 0; n < b.N; n++ {
+		ComputeSyscallKey(100, 1, "read")
+	}
+}
diff --git a/pkg/ebpf/events/enriched_event.go b/pkg/ebpf/events/enriched_event.go
index c158e3be5a..54eec8da25 100644
--- a/pkg/ebpf/events/enriched_event.go
+++ b/pkg/ebpf/events/enriched_event.go
@@ -33,4 +33,9 @@ type EnrichedEvent struct {
 	// This uniquely identifies the container/host and is used for context lookup.
 	// May be 0 if unavailable.
 	MountNamespaceID uint64
+	// Duplicate is set by the dedup check in EventHandlerFactory.ProcessEvent().
+	// Consumers that respect this flag skip processing for deduplicated events.
+	Duplicate bool
+	// DedupBucket is the current time expressed in 64ms buckets, cached per batch tick.
+ DedupBucket uint16 } diff --git a/pkg/metricsmanager/metrics_manager_interface.go b/pkg/metricsmanager/metrics_manager_interface.go index cce38824db..503aa904d7 100644 --- a/pkg/metricsmanager/metrics_manager_interface.go +++ b/pkg/metricsmanager/metrics_manager_interface.go @@ -18,4 +18,5 @@ type MetricsManager interface { //ReportEbpfStats(stats *top.Event[toptypes.Stats]) ReportContainerStart() ReportContainerStop() + ReportDedupEvent(eventType utils.EventType, duplicate bool) } diff --git a/pkg/metricsmanager/metrics_manager_mock.go b/pkg/metricsmanager/metrics_manager_mock.go index 1d8f24e668..f58d378aca 100644 --- a/pkg/metricsmanager/metrics_manager_mock.go +++ b/pkg/metricsmanager/metrics_manager_mock.go @@ -63,3 +63,5 @@ func (m *MetricsMock) ReportRuleEvaluationTime(ruleID string, eventType utils.Ev func (m *MetricsMock) ReportContainerStart() {} func (m *MetricsMock) ReportContainerStop() {} + +func (m *MetricsMock) ReportDedupEvent(eventType utils.EventType, duplicate bool) {} diff --git a/pkg/metricsmanager/prometheus/prometheus.go b/pkg/metricsmanager/prometheus/prometheus.go index 77f2eb22f5..870b799fd7 100644 --- a/pkg/metricsmanager/prometheus/prometheus.go +++ b/pkg/metricsmanager/prometheus/prometheus.go @@ -59,6 +59,9 @@ type PrometheusMetric struct { containerStartCounter prometheus.Counter containerStopCounter prometheus.Counter + // Dedup metrics + dedupEventCounter *prometheus.CounterVec + // Cache to avoid allocating Labels maps on every call ruleCounterCache map[string]prometheus.Counter alertCounterCache map[string]prometheus.Counter @@ -200,6 +203,12 @@ func NewPrometheusMetric() *PrometheusMetric { Help: "The total number of container stop events", }), + // Dedup metrics + dedupEventCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "node_agent_dedup_events_total", + Help: "Total number of events processed by the dedup layer", + }, []string{eventTypeLabel, "result"}), + // Initialize counter caches ruleCounterCache: 
make(map[string]prometheus.Counter), alertCounterCache: make(map[string]prometheus.Counter), @@ -238,6 +247,7 @@ func (p *PrometheusMetric) Destroy() { prometheus.Unregister(p.ebpfBpfCounter) prometheus.Unregister(p.containerStartCounter) prometheus.Unregister(p.containerStopCounter) + prometheus.Unregister(p.dedupEventCounter) // Unregister program ID metrics prometheus.Unregister(p.programRuntimeGauge) prometheus.Unregister(p.programRunCountGauge) @@ -381,3 +391,11 @@ func (p *PrometheusMetric) ReportContainerStart() { func (p *PrometheusMetric) ReportContainerStop() { p.containerStopCounter.Inc() } + +func (p *PrometheusMetric) ReportDedupEvent(eventType utils.EventType, duplicate bool) { + result := "passed" + if duplicate { + result = "deduplicated" + } + p.dedupEventCounter.WithLabelValues(string(eventType), result).Inc() +} diff --git a/pkg/rulemanager/rule_manager.go b/pkg/rulemanager/rule_manager.go index 80f3a760b3..eea071da91 100644 --- a/pkg/rulemanager/rule_manager.go +++ b/pkg/rulemanager/rule_manager.go @@ -147,6 +147,9 @@ func (rm *RuleManager) startRuleManager(container *containercollection.Container } func (rm *RuleManager) ReportEnrichedEvent(enrichedEvent *events.EnrichedEvent) { + if enrichedEvent.Duplicate { + return + } rm.enrichEventWithContext(enrichedEvent) var profileExists bool