From e592642981f2c2f6a54a5629748de4244e22d640 Mon Sep 17 00:00:00 2001 From: "Andrei G." Date: Mon, 18 May 2026 18:43:01 +0200 Subject: [PATCH 1/3] feat(memory): implement TrajectoryRiskAccumulator and ImplicitConflictDetector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements two security and memory quality improvements: TrajectoryRiskAccumulator (#4372): per-session shadow memory that accumulates safety signals (PolicyViolation, PromptInjectionPattern, ToolChainAnomaly, ConfidenceDrop) with exponential temporal decay and gates tool execution when trajectory risk exceeds the configured threshold. Based on MAGE (arXiv:2605.03228) — reduces tool-chain attack success from 100% to ≤10% with default config. Controlled via [memory.shadow_memory] with enabled=false default. ImplicitConflictDetector (#4373): write-time predicate similarity detection for APEX-MEM that stages implicit conflict candidates using Levenshtein distance, extends SYNAPSE ActivatedFact with is_implicit_conflict/conflict_candidate_id fields, and annotates recall results when pending candidates exist. Addresses the STALE benchmark gap (55.2% frontier model accuracy on implicit conflicts). Controlled via [memory.graph.implicit_conflict] with enabled=false default. Both features are additive and opt-in: disabled by default with zero-overhead noop paths. No changes to existing memory pipeline behavior when both are disabled. Closes #4372 Closes #4373 --- CHANGELOG.md | 12 + crates/zeph-config/src/lib.rs | 11 +- crates/zeph-config/src/memory.rs | 392 ++++++++++++++ crates/zeph-config/src/root.rs | 1 + .../090_implicit_conflict_candidates.sql | 16 + crates/zeph-memory/src/graph/activation.rs | 6 + .../src/graph/implicit_conflict.rs | 505 ++++++++++++++++++ crates/zeph-memory/src/graph/mod.rs | 1 + crates/zeph-memory/src/graph/ontology.rs | 3 + crates/zeph-memory/src/graph/store/mod.rs | 125 +++++ crates/zeph-memory/src/lib.rs | 1 + crates/zeph-memory/src/shadow/mod.rs | 407 ++++++++++++++ crates/zeph-sanitizer/src/audit.rs | 59 ++ crates/zeph-sanitizer/src/lib.rs | 1 + crates/zeph-tools/src/executor.rs | 16 +- 15 files changed, 1551 insertions(+), 5 deletions(-) create mode 100644 crates/zeph-db/migrations/sqlite/090_implicit_conflict_candidates.sql create mode 100644 crates/zeph-memory/src/graph/implicit_conflict.rs create mode 100644 crates/zeph-memory/src/shadow/mod.rs create mode 100644 crates/zeph-sanitizer/src/audit.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 8aaba11cd..b9c87a177 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - `zeph-subagent`: `FleetRegistry` trait and `SqliteFleetRegistry` adapter; sub-agents spawned by `SubAgentManager` are now registered in the fleet `agent_sessions` table and visible in the fleet dashboard. `cancel_all` marks all active sub-agent sessions as cancelled on shutdown (closes #4370). +- `zeph-memory`: `TrajectoryRiskAccumulator` — exponential-decay risk scoring over tool-call + signals (policy violation, prompt injection, tool-chain anomaly, confidence drop) with + configurable half-life and escalation threshold. Raises `ToolError::TrajectoryRiskExceeded` + when the clamped score exceeds the configured limit. Resolves #4372. +- `zeph-memory`: `ImplicitConflictDetector` — write-time fuzzy predicate similarity detection + for APEX-MEM graph edges using normalised Levenshtein distance. Detected pairs are staged in + the new `implicit_conflict_candidates` table (migration 090) and annotated on SYNAPSE recall + via `annotate_conflicts`. Resolves #4373. +- `zeph-db`: migration `090_implicit_conflict_candidates.sql` — staging table for implicit + conflict candidate pairs with status/resolution lifecycle columns and three supporting indexes. +- `zeph-config`: `ImplicitConflictConfig` and supporting types (`SimilarityMethod`, + `ConflictResolutionStrategy`, `ConsolidationDaemonConfig`) under `[memory.graph.implicit_conflict]`. ## [0.21.2] - 2026-05-18 diff --git a/crates/zeph-config/src/lib.rs b/crates/zeph-config/src/lib.rs index eb4ce7b8b..b00d0e933 100644 --- a/crates/zeph-config/src/lib.rs +++ b/crates/zeph-config/src/lib.rs @@ -138,13 +138,16 @@ pub use logging::{LogRotation, LoggingConfig}; pub use mcp_security::{CapabilityClass, DataSensitivity, FlaggedParameter, ToolSecurityMeta}; pub use memory::{ AdmissionConfig, AdmissionStrategy, AdmissionWeights, AutoDreamConfig, BeliefRevisionConfig, - CategoryConfig, CompressionConfig, CompressionStrategy, ContextFormat, ContextStrategy, - DigestConfig, DocumentConfig, EmGraphConfig, ForgettingConfig, GraphConfig, HebbianConfig, + CategoryConfig, CompressionConfig, CompressionStrategy, ConflictResolutionStrategy, + ConsolidationDaemonConfig, ContextFormat, ContextStrategy, DigestConfig, DocumentConfig, + EmGraphConfig, ForgettingConfig, GraphConfig, HebbianConfig, ImplicitConflictConfig, MagicDocsConfig, MemCotConfig, MemoryConfig, MicrocompactConfig, NoteLinkingConfig, OpticalForgettingConfig, PersonaConfig, PruningStrategy, ReasoningConfig, RecallViewConfig, RetrievalConfig, RetrievalFailuresConfig, RpeConfig, SemanticConfig, SessionsConfig, - SidequestConfig, StoreRoutingConfig, StoreRoutingStrategy, TierConfig, TieredRetrievalConfig, - TrajectoryConfig, TreeConfig, TypedPagesConfig, TypedPagesEnforcement, VectorBackend, + SidequestConfig, SimilarityMethod, StoreRoutingConfig, StoreRoutingStrategy, TierConfig, + TieredRetrievalConfig, TrajectoryConfig, TrajectoryRiskAccumulatorConfig, + TrajectorySeverityMultipliers, TrajectorySignalWeights, TreeConfig, TypedPagesConfig, + TypedPagesEnforcement, VectorBackend, }; pub use metrics::MetricsConfig; pub use notifications::NotificationsConfig; diff --git a/crates/zeph-config/src/memory.rs b/crates/zeph-config/src/memory.rs index ce7d6551f..19cd12917 100644 --- a/crates/zeph-config/src/memory.rs +++ b/crates/zeph-config/src/memory.rs @@ -1012,6 +1012,23 @@ pub struct MemoryConfig { /// key facts, and promotes them to the semantic tier in `zeph_key_facts`. #[serde(default)] pub episodic_consolidation: EpisodicConsolidationConfig, + /// MAGE shadow memory trajectory risk accumulator (spec 004-16). + /// + /// Maintains a per-session rolling risk score fed by sanitizer audit signals. + /// When `shadow_memory.enabled = true`, tool execution is gated if cumulative + /// trajectory risk exceeds `risk_threshold`. When `false`, all code paths are + /// zero-cost no-ops. + /// + /// # Example (TOML) + /// + /// ```toml + /// [memory.shadow_memory] + /// enabled = true + /// risk_threshold = 0.75 + /// risk_halflife_turns = 10 + /// ``` + #[serde(default)] + pub shadow_memory: TrajectoryRiskAccumulatorConfig, } // ── MemFlow tiered retrieval config (issue #3712) ────────────────────────────── @@ -2199,6 +2216,174 @@ pub struct GraphConfig { /// Default: `false` (preserves existing A* behaviour). #[serde(default)] pub query_sensitive_cost: bool, + + /// Implicit conflict detection for SYNAPSE recall (spec 004-17, STALE/CUPMem). + /// + /// When enabled, write-time fuzzy predicate matching detects implicit conflicts + /// between graph edges and annotates SYNAPSE recall results accordingly. + #[serde(default)] + pub implicit_conflict: ImplicitConflictConfig, +} + +/// Similarity method for implicit conflict detection. +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + Default, + serde::Serialize, + serde::Deserialize, + schemars::JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum SimilarityMethod { + /// Normalized Levenshtein edit distance. + #[default] + Levenshtein, + /// Cosine similarity over pre-computed predicate embeddings. + Embedding, + /// Either method triggers detection. + Both, +} + +/// Resolution strategy when an implicit conflict is detected. +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + Default, + serde::Serialize, + serde::Deserialize, + schemars::JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum ConflictResolutionStrategy { + /// Mark the pair as a candidate but do not supersede either edge. + #[default] + FlagOnly, + /// Supersede the older edge via APEX-MEM `insert_or_supersede`. + Recency, + /// Supersede the lower-confidence edge. + Confidence, + /// Delegate resolution to an LLM provider; fall back to `flag_only` on timeout. + Llm, +} + +/// Configuration for the optional background consolidation daemon (spec 004-17). +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] +#[serde(default)] +pub struct ConsolidationDaemonConfig { + /// Enable the background consolidation daemon. + pub enabled: bool, + /// How often the daemon runs, in seconds. Default: 7200 (2 hours). + #[serde(default = "default_ic_daemon_interval_secs")] + pub interval_seconds: u64, + /// Maximum number of candidates processed per daemon run. Default: 100. + #[serde(default = "default_ic_daemon_batch_size")] + pub batch_size: usize, +} + +impl Default for ConsolidationDaemonConfig { + fn default() -> Self { + Self { + enabled: false, + interval_seconds: default_ic_daemon_interval_secs(), + batch_size: default_ic_daemon_batch_size(), + } + } +} + +fn default_ic_daemon_interval_secs() -> u64 { + 7200 +} + +fn default_ic_daemon_batch_size() -> usize { + 100 +} + +/// Configuration for implicit conflict detection (spec 004-17, STALE/CUPMem). +/// +/// Controls write-time fuzzy predicate matching and SYNAPSE recall annotation. +/// All detection is gated behind `enabled = false` by default — no overhead when disabled. +/// +/// TOML path: `[memory.graph.implicit_conflict]` +/// +/// # Examples +/// +/// ```toml +/// [memory.graph.implicit_conflict] +/// enabled = true +/// similarity_method = "levenshtein" +/// conflict_similarity_threshold = 0.80 +/// resolution_strategy = "flag_only" +/// candidate_ttl_days = 30 +/// propagation_depth = 2 +/// ``` +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(default)] +pub struct ImplicitConflictConfig { + /// Enable implicit conflict detection. Default: `false`. + pub enabled: bool, + /// Similarity method used to detect candidate pairs. + #[serde(default)] + pub similarity_method: SimilarityMethod, + /// Minimum similarity score to flag a pair as a conflict candidate. Default: 0.80. + #[serde(default = "default_ic_similarity_threshold")] + pub conflict_similarity_threshold: f64, + /// How to resolve detected conflicts. Default: `flag_only`. + #[serde(default)] + pub resolution_strategy: ConflictResolutionStrategy, + /// Provider name (from `[[llm.providers]]`) for LLM-mediated resolution. + #[serde(default)] + pub implicit_conflict_provider: crate::providers::ProviderName, + /// LLM resolution timeout in milliseconds. Default: 800. + #[serde(default = "default_ic_llm_timeout_ms")] + pub conflict_llm_timeout_ms: u64, + /// Days before an unresolved candidate entry expires. Default: 30. + #[serde(default = "default_ic_candidate_ttl_days")] + pub candidate_ttl_days: u32, + /// SYNAPSE propagation depth for surfacing superseding facts. Default: 2. + #[serde(default = "default_ic_propagation_depth")] + pub propagation_depth: u32, + /// Background consolidation daemon configuration. + #[serde(default)] + pub consolidation_daemon: ConsolidationDaemonConfig, +} + +impl Default for ImplicitConflictConfig { + fn default() -> Self { + Self { + enabled: false, + similarity_method: SimilarityMethod::default(), + conflict_similarity_threshold: default_ic_similarity_threshold(), + resolution_strategy: ConflictResolutionStrategy::default(), + implicit_conflict_provider: crate::providers::ProviderName::default(), + conflict_llm_timeout_ms: default_ic_llm_timeout_ms(), + candidate_ttl_days: default_ic_candidate_ttl_days(), + propagation_depth: default_ic_propagation_depth(), + consolidation_daemon: ConsolidationDaemonConfig::default(), + } + } +} + +fn default_ic_similarity_threshold() -> f64 { + 0.80 +} + +fn default_ic_llm_timeout_ms() -> u64 { + 800 +} + +fn default_ic_candidate_ttl_days() -> u32 { + 30 +} + +fn default_ic_propagation_depth() -> u32 { + 2 } fn default_graph_pool_size() -> u32 { @@ -2366,6 +2551,7 @@ impl Default for GraphConfig { apex_mem: ApexMemConfig::default(), llm_timeout_secs: default_graph_llm_timeout_secs(), query_sensitive_cost: false, + implicit_conflict: ImplicitConflictConfig::default(), } } } @@ -3614,6 +3800,212 @@ impl Default for RetrievalFailuresConfig { } } +// ── TrajectoryRiskAccumulator config (spec 004-16) ───────────────────────────── + +fn validate_tra_nonneg_weight<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let value = ::deserialize(deserializer)?; + if value.is_nan() || value.is_infinite() || value < 0.0 { + return Err(serde::de::Error::custom( + "signal weight and severity multiplier values must be finite and non-negative", + )); + } + Ok(value) +} + +/// Per-signal-type base weights for the trajectory risk accumulator. +/// +/// Each weight is in `(0.0, 1.0]` and is multiplied by the severity multiplier +/// before being added to `trajectory_risk`. +/// +/// # Example (TOML) +/// +/// ```toml +/// [memory.shadow_memory.signal_weights] +/// prompt_injection = 0.6 +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TrajectorySignalWeights { + /// Weight for `PolicyViolation` signals. Default: `0.30`. + #[serde( + default = "default_sw_policy_violation", + deserialize_with = "validate_tra_nonneg_weight" + )] + pub policy_violation: f64, + /// Weight for `PromptInjectionPattern` signals. Default: `0.50`. + #[serde( + default = "default_sw_prompt_injection", + deserialize_with = "validate_tra_nonneg_weight" + )] + pub prompt_injection: f64, + /// Weight for `ToolChainAnomaly` signals. Default: `0.25`. + #[serde( + default = "default_sw_tool_chain_anomaly", + deserialize_with = "validate_tra_nonneg_weight" + )] + pub tool_chain_anomaly: f64, + /// Weight for `ConfidenceDrop` signals. Default: `0.15`. + #[serde( + default = "default_sw_confidence_drop", + deserialize_with = "validate_tra_nonneg_weight" + )] + pub confidence_drop: f64, +} + +fn default_sw_policy_violation() -> f64 { + 0.30 +} +fn default_sw_prompt_injection() -> f64 { + 0.50 +} +fn default_sw_tool_chain_anomaly() -> f64 { + 0.25 +} +fn default_sw_confidence_drop() -> f64 { + 0.15 +} + +impl Default for TrajectorySignalWeights { + fn default() -> Self { + Self { + policy_violation: default_sw_policy_violation(), + prompt_injection: default_sw_prompt_injection(), + tool_chain_anomaly: default_sw_tool_chain_anomaly(), + confidence_drop: default_sw_confidence_drop(), + } + } +} + +/// Per-severity multipliers applied on top of signal base weights. +/// +/// # Example (TOML) +/// +/// ```toml +/// [memory.shadow_memory.severity_multipliers] +/// high = 3.0 +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TrajectorySeverityMultipliers { + /// Multiplier for low-severity signals. Default: `0.5`. + #[serde( + default = "default_sev_low", + deserialize_with = "validate_tra_nonneg_weight" + )] + pub low: f64, + /// Multiplier for medium-severity signals. Default: `1.0`. + #[serde( + default = "default_sev_medium", + deserialize_with = "validate_tra_nonneg_weight" + )] + pub medium: f64, + /// Multiplier for high-severity signals. Default: `2.0`. + #[serde( + default = "default_sev_high", + deserialize_with = "validate_tra_nonneg_weight" + )] + pub high: f64, +} + +fn default_sev_low() -> f64 { + 0.5 +} +fn default_sev_medium() -> f64 { + 1.0 +} +fn default_sev_high() -> f64 { + 2.0 +} + +impl Default for TrajectorySeverityMultipliers { + fn default() -> Self { + Self { + low: default_sev_low(), + medium: default_sev_medium(), + high: default_sev_high(), + } + } +} + +/// Configuration for the MAGE trajectory risk accumulator (spec 004-16). +/// +/// Controls how per-turn safety signals accumulate into a session-level risk score +/// and when tool execution is blocked or escalated. +/// +/// # Example (TOML) +/// +/// ```toml +/// [memory.shadow_memory] +/// enabled = true +/// risk_threshold = 0.75 +/// escalation_threshold = 0.50 +/// risk_halflife_turns = 10 +/// signal_history_cap = 200 +/// tui_show_risk_gauge = true +/// reset_on_compaction = false +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TrajectoryRiskAccumulatorConfig { + /// Enable shadow memory. When `false`, `TrajectoryRiskAccumulator` is a zero-cost noop. + #[serde(default)] + pub enabled: bool, + /// Block tool execution when `trajectory_risk >= risk_threshold`. Default: `0.75`. + #[serde(default = "default_tra_risk_threshold")] + pub risk_threshold: f64, + /// Escalate to human confirmation when risk is in `[escalation_threshold, risk_threshold)`. + /// Default: `0.50`. + #[serde(default = "default_tra_escalation_threshold")] + pub escalation_threshold: f64, + /// Number of turns after which accumulated risk halves (exponential decay). Default: `10`. + #[serde(default = "default_tra_risk_halflife_turns")] + pub risk_halflife_turns: u32, + /// Maximum number of signal events kept in the ring buffer. Default: `200`. + #[serde(default = "default_tra_signal_history_cap")] + pub signal_history_cap: usize, + /// Show a risk gauge in the TUI security panel when the TUI is enabled. Default: `true`. + #[serde(default = "default_true")] + pub tui_show_risk_gauge: bool, + /// Reset `trajectory_risk` to zero when a context compaction occurs. Default: `false`. + #[serde(default)] + pub reset_on_compaction: bool, + /// Per-signal-type base weights. + #[serde(default)] + pub signal_weights: TrajectorySignalWeights, + /// Per-severity multipliers applied on top of signal weights. + #[serde(default)] + pub severity_multipliers: TrajectorySeverityMultipliers, +} + +fn default_tra_risk_threshold() -> f64 { + 0.75 +} +fn default_tra_escalation_threshold() -> f64 { + 0.50 +} +fn default_tra_risk_halflife_turns() -> u32 { + 10 +} +fn default_tra_signal_history_cap() -> usize { + 200 +} + +impl Default for TrajectoryRiskAccumulatorConfig { + fn default() -> Self { + Self { + enabled: false, + risk_threshold: default_tra_risk_threshold(), + escalation_threshold: default_tra_escalation_threshold(), + risk_halflife_turns: default_tra_risk_halflife_turns(), + signal_history_cap: default_tra_signal_history_cap(), + tui_show_risk_gauge: true, + reset_on_compaction: false, + signal_weights: TrajectorySignalWeights::default(), + severity_multipliers: TrajectorySeverityMultipliers::default(), + } + } +} + #[cfg(test)] mod memcot_config_tests { use super::*; diff --git a/crates/zeph-config/src/root.rs b/crates/zeph-config/src/root.rs index 14ba918dd..ebd4c3328 100644 --- a/crates/zeph-config/src/root.rs +++ b/crates/zeph-config/src/root.rs @@ -286,6 +286,7 @@ impl Default for Config { optical_forgetting: crate::memory::OpticalForgettingConfig::default(), em_graph: crate::memory::EmGraphConfig::default(), episodic_consolidation: crate::memory::EpisodicConsolidationConfig::default(), + shadow_memory: crate::memory::TrajectoryRiskAccumulatorConfig::default(), }, telegram: None, discord: None, diff --git a/crates/zeph-db/migrations/sqlite/090_implicit_conflict_candidates.sql b/crates/zeph-db/migrations/sqlite/090_implicit_conflict_candidates.sql new file mode 100644 index 000000000..43512f8c2 --- /dev/null +++ b/crates/zeph-db/migrations/sqlite/090_implicit_conflict_candidates.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS implicit_conflict_candidates ( + id INTEGER PRIMARY KEY, + edge_a_id INTEGER NOT NULL REFERENCES graph_edges(id), + edge_b_id INTEGER NOT NULL REFERENCES graph_edges(id), + similarity REAL NOT NULL, + method TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + resolution TEXT, + created_at INTEGER NOT NULL, + resolved_at INTEGER, + expires_at INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_icc_edge_a ON implicit_conflict_candidates(edge_a_id); +CREATE INDEX IF NOT EXISTS idx_icc_edge_b ON implicit_conflict_candidates(edge_b_id); +CREATE INDEX IF NOT EXISTS idx_icc_status ON implicit_conflict_candidates(status, expires_at); diff --git a/crates/zeph-memory/src/graph/activation.rs b/crates/zeph-memory/src/graph/activation.rs index 51c6e9155..836e54205 100644 --- a/crates/zeph-memory/src/graph/activation.rs +++ b/crates/zeph-memory/src/graph/activation.rs @@ -42,6 +42,10 @@ pub struct ActivatedFact { pub edge: Edge, /// Activation score of the source or target entity at time of traversal. pub activation_score: f32, + /// `true` when this edge has a pending implicit conflict candidate (spec 004-17). + pub is_implicit_conflict: bool, + /// ID of the `implicit_conflict_candidates` row, if any. + pub conflict_candidate_id: Option, } pub use zeph_common::memory::SpreadingActivationParams; @@ -678,6 +682,8 @@ impl SpreadingActivation { activated_facts.push(ActivatedFact { edge: edge.clone(), activation_score, + is_implicit_conflict: false, + conflict_candidate_id: None, }); } } diff --git a/crates/zeph-memory/src/graph/implicit_conflict.rs b/crates/zeph-memory/src/graph/implicit_conflict.rs new file mode 100644 index 000000000..4cdde0bfb --- /dev/null +++ b/crates/zeph-memory/src/graph/implicit_conflict.rs @@ -0,0 +1,505 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Implicit conflict detection for SYNAPSE recall (spec 004-17, STALE/CUPMem). +//! +//! [`ImplicitConflictDetector`] runs at write time to detect predicate pairs +//! that are semantically similar but not identical, staging them in +//! `implicit_conflict_candidates` for later resolution or annotation. +//! +//! SYNAPSE recall uses [`annotate_conflicts`] to mark retrieved [`ActivatedFact`]s +//! that have pending conflict candidates. + +// SQLite limits bind parameters to 999. Each ID is bound twice (two IN clauses), +// so process in chunks of at most 499. +const MAX_IDS_PER_QUERY: usize = 499; + +use std::time::{SystemTime, UNIX_EPOCH}; + +use zeph_config::ImplicitConflictConfig; +use zeph_db::DbTransaction; + +use crate::error::MemoryError; +use crate::graph::activation::ActivatedFact; + +/// A candidate conflict pair detected at write time. +#[derive(Debug, Clone)] +pub struct ConflictCandidate { + /// ID of the newly inserted edge. + pub edge_a_id: i64, + /// ID of the existing edge with a similar predicate. + pub edge_b_id: i64, + /// Similarity score in `[0.0, 1.0]`. + pub similarity: f64, + /// The similarity method that produced this candidate. + pub method: String, +} + +/// Write-time implicit conflict detector. +/// +/// Compares a new edge's predicate against existing active edges on the same +/// source entity using the configured similarity method and threshold. +/// +/// # Examples +/// +/// ```rust,no_run +/// use zeph_config::ImplicitConflictConfig; +/// use zeph_memory::graph::implicit_conflict::ImplicitConflictDetector; +/// +/// let config = ImplicitConflictConfig { enabled: true, ..Default::default() }; +/// let detector = ImplicitConflictDetector::new(config); +/// let candidates = detector.detect_candidates(42, "employ", &[(1, "employs")], false); +/// assert!(!candidates.is_empty()); +/// ``` +pub struct ImplicitConflictDetector { + config: ImplicitConflictConfig, +} + +impl ImplicitConflictDetector { + /// Create a new detector with the given configuration. + #[must_use] + pub fn new(config: ImplicitConflictConfig) -> Self { + Self { config } + } + + /// Detect implicit conflict candidates for a new predicate against existing ones. + /// + /// Returns pairs where normalized Levenshtein similarity is + /// `>= conflict_similarity_threshold` **and** the predicates differ (identical + /// predicates are already handled by APEX-MEM explicit supersession). + /// + /// Returns an empty vec when `enabled = false` or when the cardinality flag + /// `is_cardinality_n` is set (FR-011). + /// + /// # Arguments + /// + /// * `new_edge_id` — database ID of the newly inserted edge + /// * `new_predicate` — canonical relation of the new edge + /// * `existing` — slice of `(edge_id, canonical_relation)` for all other active + /// edges on the same source entity + /// * `is_cardinality_n` — set to `true` for multi-valued predicates; skips detection + #[must_use] + pub fn detect_candidates( + &self, + new_edge_id: i64, + new_predicate: &str, + existing: &[(i64, &str)], + is_cardinality_n: bool, + ) -> Vec { + let _span = tracing::info_span!( + "memory.graph.implicit_conflict.detect", + predicate = new_predicate, + ) + .entered(); + + if !self.config.enabled || is_cardinality_n || existing.is_empty() { + return Vec::new(); + } + + let threshold = self.config.conflict_similarity_threshold; + let mut candidates = Vec::new(); + + for &(edge_id, predicate) in existing { + if predicate == new_predicate { + // Identical predicate: handled by APEX-MEM explicit supersession. + continue; + } + let sim = Self::normalized_levenshtein(new_predicate, predicate); + if sim >= threshold { + candidates.push(ConflictCandidate { + edge_a_id: new_edge_id, + edge_b_id: edge_id, + similarity: sim, + method: "levenshtein".to_owned(), + }); + } + } + + candidates + } + + /// Persist conflict candidates into `implicit_conflict_candidates`. + /// + /// Each candidate is inserted with `status = 'pending'` and an expiry of + /// `now + ttl_days * 86400` seconds. + /// + /// # Errors + /// + /// Returns a [`MemoryError`] on database write failure. + pub async fn stage_candidates( + &self, + candidates: &[ConflictCandidate], + tx: &mut DbTransaction<'_>, + ttl_days: u32, + ) -> Result<(), MemoryError> { + if candidates.is_empty() { + return Ok(()); + } + + let _span = tracing::info_span!( + "memory.graph.implicit_conflict.stage", + count = candidates.len(), + ) + .entered(); + + #[allow(clippy::cast_possible_wrap)] + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + let ttl_secs = i64::from(ttl_days) * 86_400; + let expires_at = now + ttl_secs; + + for c in candidates { + sqlx::query( + "INSERT INTO implicit_conflict_candidates + (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at) + VALUES (?, ?, ?, ?, 'pending', ?, ?)", + ) + .bind(c.edge_a_id) + .bind(c.edge_b_id) + .bind(c.similarity) + .bind(&c.method) + .bind(now) + .bind(expires_at) + .execute(&mut **tx) + .await + .map_err(MemoryError::from)?; + } + + Ok(()) + } + + /// Compute normalized Levenshtein similarity between two strings. + /// + /// Returns a value in `[0.0, 1.0]` where `1.0` means identical. + /// Returns `1.0` if both strings are empty, `0.0` if only one is empty. + #[must_use] + pub fn normalized_levenshtein(a: &str, b: &str) -> f64 { + if a == b { + return 1.0; + } + let len_a = a.chars().count(); + let len_b = b.chars().count(); + if len_a == 0 && len_b == 0 { + return 1.0; + } + if len_a == 0 || len_b == 0 { + return 0.0; + } + let dist = levenshtein_distance(a, b); + let max_len = len_a.max(len_b); + #[allow(clippy::cast_precision_loss)] + let result = 1.0 - (dist as f64 / max_len as f64); + result + } + + /// Returns `true` when detection is enabled. + #[must_use] + pub fn is_enabled(&self) -> bool { + self.config.enabled + } + + /// Returns the configured TTL for conflict candidates, in days. + #[must_use] + pub fn candidate_ttl_days(&self) -> u32 { + self.config.candidate_ttl_days + } +} + +/// Annotate retrieved [`ActivatedFact`]s with pending implicit conflict metadata. +/// +/// Queries `implicit_conflict_candidates` for all edge IDs in `facts` and sets +/// `is_implicit_conflict = true` and `conflict_candidate_id` on matches. +/// +/// # Errors +/// +/// Returns a [`MemoryError`] on database query failure. +pub async fn annotate_conflicts( + facts: &mut [ActivatedFact], + tx: &mut DbTransaction<'_>, +) -> Result<(), MemoryError> { + if facts.is_empty() { + return Ok(()); + } + + let _span = tracing::info_span!( + "memory.graph.implicit_conflict.annotate", + facts = facts.len(), + ) + .entered(); + + let edge_ids: Vec = facts.iter().map(|f| f.edge.id).collect(); + + let mut edge_to_candidate: std::collections::HashMap = + std::collections::HashMap::new(); + + for chunk in edge_ids.chunks(MAX_IDS_PER_QUERY) { + let placeholders: String = chunk.iter().map(|_| "?").collect::>().join(", "); + let query_str = format!( + "SELECT id, edge_a_id, edge_b_id + FROM implicit_conflict_candidates + WHERE status = 'pending' + AND (edge_a_id IN ({placeholders}) OR edge_b_id IN ({placeholders}))", + ); + + let mut q = sqlx::query(&query_str); + for id in chunk { + q = q.bind(id); + } + // Bind a second time for the second IN clause. + for id in chunk { + q = q.bind(id); + } + + let rows = q.fetch_all(&mut **tx).await.map_err(MemoryError::from)?; + + for row in rows { + use sqlx::Row as _; + let candidate_id: i64 = row.try_get("id").map_err(MemoryError::from)?; + let ea: i64 = row.try_get("edge_a_id").map_err(MemoryError::from)?; + let eb: i64 = row.try_get("edge_b_id").map_err(MemoryError::from)?; + edge_to_candidate.entry(ea).or_insert(candidate_id); + edge_to_candidate.entry(eb).or_insert(candidate_id); + } + } + + for fact in facts.iter_mut() { + if let Some(&cid) = edge_to_candidate.get(&fact.edge.id) { + fact.is_implicit_conflict = true; + fact.conflict_candidate_id = Some(cid); + } + } + + Ok(()) +} + +/// Hand-rolled Levenshtein edit distance (char-level). +fn levenshtein_distance(a: &str, b: &str) -> usize { + let a_chars: Vec = a.chars().collect(); + let b_chars: Vec = b.chars().collect(); + let m = a_chars.len(); + let n = b_chars.len(); + + let mut prev: Vec = (0..=n).collect(); + let mut curr = vec![0usize; n + 1]; + + for i in 1..=m { + curr[0] = i; + for j in 1..=n { + let cost = usize::from(a_chars[i - 1] != b_chars[j - 1]); + curr[j] = (prev[j] + 1).min(curr[j - 1] + 1).min(prev[j - 1] + cost); + } + std::mem::swap(&mut prev, &mut curr); + } + + prev[n] +} + +#[cfg(test)] +mod tests { + use super::*; + use zeph_config::ImplicitConflictConfig; + + fn detector(enabled: bool) -> ImplicitConflictDetector { + ImplicitConflictDetector::new(ImplicitConflictConfig { + enabled, + conflict_similarity_threshold: 0.80, + ..Default::default() + }) + } + + #[test] + fn normalized_levenshtein_identical() { + assert!( + (ImplicitConflictDetector::normalized_levenshtein("uses", "uses") - 1.0).abs() + < f64::EPSILON + ); + } + + #[test] + fn normalized_levenshtein_empty_both() { + assert!( + (ImplicitConflictDetector::normalized_levenshtein("", "") - 1.0).abs() < f64::EPSILON + ); + } + + #[test] + fn normalized_levenshtein_empty_one() { + assert!( + (ImplicitConflictDetector::normalized_levenshtein("", "abc") - 0.0).abs() + < f64::EPSILON + ); + assert!( + (ImplicitConflictDetector::normalized_levenshtein("abc", "") - 0.0).abs() + < f64::EPSILON + ); + } + + #[test] + fn normalized_levenshtein_completely_different() { + let sim = ImplicitConflictDetector::normalized_levenshtein("uses", "xyz_unrelated_value"); + assert!(sim < 0.5, "expected low similarity, got {sim}"); + } + + #[test] + fn detect_candidates_above_threshold_returns_candidate() { + let d = detector(true); + // "employ" vs "employs": distance = 1, max = 7, sim ≈ 0.857 — above 0.80 + let candidates = d.detect_candidates(42, "employ", &[(7, "employs")], false); + assert_eq!(candidates.len(), 1, "expected one candidate"); + assert_eq!(candidates[0].edge_a_id, 42); + assert_eq!(candidates[0].edge_b_id, 7); + assert!(candidates[0].similarity >= 0.80); + } + + #[test] + fn detect_candidates_below_threshold_returns_empty() { + let d = detector(true); + let candidates = d.detect_candidates(1, "uses", &[(2, "xyz_unrelated")], false); + assert!( + candidates.is_empty(), + "expected no candidates below threshold" + ); + } + + #[test] + fn detect_candidates_identical_predicate_skipped() { + let d = detector(true); + // Identical predicates are handled by APEX-MEM; detector must skip them. + let candidates = d.detect_candidates(1, "uses", &[(2, "uses")], false); + assert!( + candidates.is_empty(), + "identical predicates must not create candidates" + ); + } + + #[test] + fn detect_candidates_disabled_returns_empty() { + let d = detector(false); + // Even with high-similarity predicates, disabled detector returns nothing. + let candidates = d.detect_candidates(1, "employ", &[(2, "employs")], false); + assert!(candidates.is_empty(), "disabled detector must return empty"); + } + + #[test] + fn detect_candidates_cardinality_n_skipped() { + let d = detector(true); + let candidates = d.detect_candidates(1, "employ", &[(2, "employs")], true); + assert!( + candidates.is_empty(), + "cardinality-n predicate must be skipped" + ); + } + + // ── annotate_conflicts DB tests ─────────────────────────────────────────── + + async fn setup_test_db() -> crate::store::SqliteStore { + crate::store::SqliteStore::new(":memory:").await.unwrap() + } + + fn stub_fact(edge_id: i64) -> ActivatedFact { + use crate::graph::types::{Edge, EdgeType}; + ActivatedFact { + edge: Edge { + id: edge_id, + source_entity_id: 1, + target_entity_id: 2, + relation: "test".to_owned(), + canonical_relation: "test".to_owned(), + fact: "test fact".to_owned(), + confidence: 1.0, + valid_from: "2026-01-01".to_owned(), + valid_to: None, + created_at: "2026-01-01".to_owned(), + expired_at: None, + source_message_id: None, + qdrant_point_id: None, + edge_type: EdgeType::Semantic, + retrieval_count: 0, + last_retrieved_at: None, + superseded_by: None, + supersedes: None, + weight: 1.0, + }, + activation_score: 1.0, + is_implicit_conflict: false, + conflict_candidate_id: None, + } + } + + #[tokio::test] + async fn annotate_conflicts_marks_flagged_edges() { + let db = setup_test_db().await; + let pool = db.pool(); + + // Insert a pending candidate pair (edge_a_id=1, edge_b_id=2). + // Use raw SQL since these are not real graph_edges (no FK enforcement with PRAGMA). + sqlx::query( + "PRAGMA foreign_keys = OFF; + INSERT INTO implicit_conflict_candidates + (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at) + VALUES (1, 2, 0.90, 'levenshtein', 'pending', 1000000, 9999999)", + ) + .execute(pool) + .await + .unwrap(); + + let mut facts = vec![stub_fact(1), stub_fact(3)]; + + let mut tx = zeph_db::begin(pool).await.unwrap(); + annotate_conflicts(&mut facts, &mut tx).await.unwrap(); + tx.commit().await.unwrap(); + + assert!(facts[0].is_implicit_conflict, "edge 1 must be flagged"); + assert!(facts[0].conflict_candidate_id.is_some()); + assert!(!facts[1].is_implicit_conflict, "edge 3 must not be flagged"); + assert!(facts[1].conflict_candidate_id.is_none()); + } + + #[tokio::test] + async fn annotate_conflicts_empty_candidates_no_annotation() { + let db = setup_test_db().await; + let pool = db.pool(); + + let mut facts = vec![stub_fact(10), stub_fact(20)]; + + let mut tx = zeph_db::begin(pool).await.unwrap(); + annotate_conflicts(&mut facts, &mut tx).await.unwrap(); + tx.commit().await.unwrap(); + + assert!( + !facts[0].is_implicit_conflict, + "no candidates → no annotation" + ); + assert!(facts[1].conflict_candidate_id.is_none()); + } + + #[tokio::test] + async fn annotate_conflicts_edge_b_side_also_flagged() { + let db = setup_test_db().await; + let pool = db.pool(); + + // Insert candidate with edge_a=5, edge_b=7. Pass edge 7 in facts. + sqlx::query( + "PRAGMA foreign_keys = OFF; + INSERT INTO implicit_conflict_candidates + (edge_a_id, edge_b_id, similarity, method, status, created_at, expires_at) + VALUES (5, 7, 0.85, 'levenshtein', 'pending', 1000000, 9999999)", + ) + .execute(pool) + .await + .unwrap(); + + let mut facts = vec![stub_fact(7)]; + + let mut tx = zeph_db::begin(pool).await.unwrap(); + annotate_conflicts(&mut facts, &mut tx).await.unwrap(); + tx.commit().await.unwrap(); + + assert!( + facts[0].is_implicit_conflict, + "edge on edge_b side must also be flagged" + ); + } +} diff --git a/crates/zeph-memory/src/graph/mod.rs b/crates/zeph-memory/src/graph/mod.rs index a416fe574..3c23ec04f 100644 --- a/crates/zeph-memory/src/graph/mod.rs +++ b/crates/zeph-memory/src/graph/mod.rs @@ -12,6 +12,7 @@ pub mod conflict; pub mod entity_lock; pub mod experience; pub mod extractor; +pub mod implicit_conflict; pub mod ontology; pub mod resolver; pub mod retrieval; diff --git a/crates/zeph-memory/src/graph/ontology.rs b/crates/zeph-memory/src/graph/ontology.rs index 1211ca3bd..fda756f7e 100644 --- a/crates/zeph-memory/src/graph/ontology.rs +++ b/crates/zeph-memory/src/graph/ontology.rs @@ -79,6 +79,9 @@ impl OntologyState { } } +/// Type alias kept for compatibility with code that refers to `Ontology` directly. +pub type Ontology = OntologyTable; + /// The loaded APEX-MEM ontology table plus bounded LRU cache for resolved mappings. /// /// Designed for read-heavy workloads: the static table and cardinality map are behind diff --git a/crates/zeph-memory/src/graph/store/mod.rs b/crates/zeph-memory/src/graph/store/mod.rs index 386798ecc..b27e184f5 100644 --- a/crates/zeph-memory/src/graph/store/mod.rs +++ b/crates/zeph-memory/src/graph/store/mod.rs @@ -2504,6 +2504,131 @@ impl GraphStore { .await } + /// Insert or supersede an edge, then run implicit conflict detection if a detector is provided. + /// + /// Calls [`Self::insert_or_supersede_with_metrics`] first, then — when `detector` is `Some` and + /// the ontology confirms the predicate is cardinality-1 — queries active edges on the same source + /// entity, detects conflict candidates, and stages them in `implicit_conflict_candidates`. + /// + /// Conflict detection failures are logged and swallowed so they never block the write path. + /// + /// # Errors + /// + /// Propagates errors from [`Self::insert_or_supersede_with_metrics`]. + #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site + pub async fn insert_or_supersede_with_conflict_detection( + &self, + source_entity_id: i64, + target_entity_id: i64, + relation: &str, + canonical_relation: &str, + fact: &str, + confidence: f32, + episode_id: Option, + edge_type: EdgeType, + set_supersedes: bool, + metrics: Option<&ApexMetrics>, + detector: Option<&crate::graph::implicit_conflict::ImplicitConflictDetector>, + ontology: Option<&crate::graph::ontology::OntologyTable>, + ) -> Result { + let new_id = self + .insert_or_supersede_with_metrics( + source_entity_id, + target_entity_id, + relation, + canonical_relation, + fact, + confidence, + episode_id, + edge_type, + set_supersedes, + metrics, + ) + .await?; + + if let (Some(det), Some(onto)) = (detector, ontology) + && det.is_enabled() + { + let is_cardinality_n = + onto.cardinality(canonical_relation) == crate::graph::ontology::Cardinality::Many; + + // Query existing active edges for the same source entity (excluding the new one). + let existing_raw = sqlx::query( + "SELECT id, canonical_relation FROM graph_edges + WHERE source_entity_id = ? + AND id != ? + AND expired_at IS NULL", + ) + .bind(source_entity_id) + .bind(new_id) + .fetch_all(&self.pool) + .await; + + match existing_raw { + Ok(rows) => { + use sqlx::Row as _; + let existing: Vec<(i64, String)> = rows + .into_iter() + .map(|r| { + let id: i64 = r.try_get("id").unwrap_or(0); + let rel: String = r.try_get("canonical_relation").unwrap_or_default(); + (id, rel) + }) + .collect(); + let existing_refs: Vec<(i64, &str)> = existing + .iter() + .map(|(id, rel)| (*id, rel.as_str())) + .collect(); + + let candidates = det.detect_candidates( + new_id, + canonical_relation, + &existing_refs, + is_cardinality_n, + ); + + if !candidates.is_empty() { + let ttl = det.candidate_ttl_days(); + match zeph_db::begin(&self.pool).await { + Ok(mut tx) => { + match det.stage_candidates(&candidates, &mut tx, ttl).await { + Ok(()) => { + if let Err(e) = tx.commit().await { + tracing::warn!( + error = %e, + "implicit conflict tx commit failed (non-fatal)" + ); + } + } + Err(e) => { + tracing::warn!( + error = %e, + "implicit conflict staging failed (non-fatal)" + ); + } + } + } + Err(e) => { + tracing::warn!( + error = %e, + "implicit conflict: tx begin failed (non-fatal)" + ); + } + } + } + } + Err(e) => { + tracing::warn!( + error = %e, + "implicit conflict: failed to query existing edges (non-fatal)" + ); + } + } + } + + Ok(new_id) + } + /// Walk the `supersedes` chain from `head_id` using a single recursive CTE and return its depth. /// /// Returns `0` when the edge has no `supersedes` pointer (it is the root). diff --git a/crates/zeph-memory/src/lib.rs b/crates/zeph-memory/src/lib.rs index bd0145741..ed2e0be69 100644 --- a/crates/zeph-memory/src/lib.rs +++ b/crates/zeph-memory/src/lib.rs @@ -97,6 +97,7 @@ pub mod quality_gate; pub mod response_cache; pub mod router; pub mod semantic; +pub mod shadow; pub mod snapshot; pub mod store; #[cfg(any(test, feature = "testing"))] diff --git a/crates/zeph-memory/src/shadow/mod.rs b/crates/zeph-memory/src/shadow/mod.rs new file mode 100644 index 000000000..db00f3fee --- /dev/null +++ b/crates/zeph-memory/src/shadow/mod.rs @@ -0,0 +1,407 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! MAGE shadow memory stream — trajectory-level risk accumulation (spec 004-16). +//! +//! [`TrajectoryRiskAccumulator`] maintains a per-session rolling risk score by ingesting +//! [`AuditSignalType`] events from `zeph-sanitizer`. The score decays exponentially between +//! turns and is used to gate tool execution when it exceeds a configured threshold. +//! +//! When `enabled = false` (default), every method is a zero-cost no-op — no allocations, +//! no computation. +//! +//! # Example +//! +//! ```rust +//! use zeph_memory::shadow::{TrajectoryRiskAccumulator, AuditSignalType, Severity}; +//! use zeph_config::TrajectoryRiskAccumulatorConfig; +//! +//! let mut acc = TrajectoryRiskAccumulator::new_noop(); +//! assert_eq!(acc.current_risk(), 0.0); +//! assert!(!acc.is_blocked()); +//! ``` + +use std::collections::VecDeque; + +use tracing::info_span; +use zeph_config::TrajectoryRiskAccumulatorConfig; + +/// Signal type for a safety event ingested by [`TrajectoryRiskAccumulator`]. +/// +/// Maps to the four signal classes defined in spec 004-16, FR-007. +/// Callers in `zeph-core` convert from `zeph_sanitizer::audit::AuditSignalType` to this type. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum AuditSignalType { + /// A policy gate denied or flagged an operation. + PolicyViolation, + /// A prompt-injection pattern was detected in untrusted content. + PromptInjectionPattern, + /// An anomalous tool-call chain was observed. + ToolChainAnomaly, + /// LLM response confidence dropped significantly between turns. + ConfidenceDrop, +} + +/// Severity level for an [`AuditSignalType`] ingested by [`TrajectoryRiskAccumulator`]. +/// +/// Mapped to a numeric multiplier by `TrajectorySeverityMultipliers`: +/// `Low → 0.5`, `Medium → 1.0`, `High → 2.0` (defaults). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Severity { + /// Minor or likely-benign signal. + Low, + /// Moderate concern; warrants accumulation. + Medium, + /// Strong indicator; highest multiplier. + High, +} + +/// A recorded safety signal ingested during a specific turn. +#[derive(Debug, Clone)] +pub struct SignalEvent { + /// Turn index at which the signal was ingested. + pub turn_id: u32, + /// Category of the detected signal. + pub signal_type: AuditSignalType, + /// Severity of the detected signal. + pub severity: Severity, + /// Computed contribution: `base_weight × severity_multiplier`. + pub raw_score: f64, +} + +/// Per-session trajectory risk accumulator (MAGE spec 004-16). +/// +/// Maintains a rolling `trajectory_risk` score in `[0.0, 1.0]` that accumulates safety +/// signals with exponential temporal decay. Designed to detect multi-turn attacks that +/// evade per-turn controls. +/// +/// When constructed via [`new_noop`][`TrajectoryRiskAccumulator::new_noop`] or when +/// `config.enabled = false`, **all methods are zero-cost no-ops** — no allocations and +/// `current_risk()` always returns `0.0`. +pub struct TrajectoryRiskAccumulator { + /// `None` means noop mode — all operations are skipped. + config: Option, + /// Current accumulated risk score, clamped to `[0.0, 1.0]`. + trajectory_risk: f64, + /// Number of `advance_turn` calls since creation. + turn_count: u32, + /// Capped ring buffer of the most recent ingested signals. + signal_history: VecDeque, +} + +impl TrajectoryRiskAccumulator { + /// Construct an accumulator that operates as a zero-cost noop. + /// + /// Use when shadow memory is disabled or during testing scenarios that do not need + /// risk tracking. No heap allocation is performed. + #[must_use] + pub fn new_noop() -> Self { + Self { + config: None, + trajectory_risk: 0.0, + turn_count: 0, + signal_history: VecDeque::new(), + } + } + + /// Construct an accumulator from configuration. + /// + /// When `config.enabled = false`, delegates to [`new_noop`][Self::new_noop] — no + /// allocation. When enabled, pre-allocates the signal history ring buffer. + #[must_use] + pub fn new(config: TrajectoryRiskAccumulatorConfig) -> Self { + if !config.enabled { + return Self::new_noop(); + } + let cap = config.signal_history_cap; + Self { + config: Some(config), + trajectory_risk: 0.0, + turn_count: 0, + signal_history: VecDeque::with_capacity(cap.min(1024)), + } + } + + /// Advance the turn counter and apply exponential decay to the accumulated risk. + /// + /// Must be called **once per turn, before** [`ingest`][Self::ingest] is called for + /// that turn. Decay formula: `risk *= exp(-ln(2) / halflife_turns)`. + /// + /// No-op when disabled. + pub fn advance_turn(&mut self) { + let _span = info_span!("memory.shadow.advance_turn").entered(); + let Some(config) = &self.config else { return }; + self.turn_count = self.turn_count.saturating_add(1); + let halflife = if config.risk_halflife_turns == 0 { + tracing::warn!("risk_halflife_turns = 0 is invalid, clamping to 1"); + 1u32 + } else { + config.risk_halflife_turns + }; + let decay = (-std::f64::consts::LN_2 / f64::from(halflife)).exp(); + self.trajectory_risk *= decay; + } + + /// Ingest a safety signal and add its weighted contribution to `trajectory_risk`. + /// + /// The raw score is `base_weight(signal_type) × severity_multiplier(severity)`. + /// After addition, `trajectory_risk` is clamped to `[0.0, 1.0]`. The event is + /// appended to the signal history ring buffer; the oldest entry is evicted when + /// the buffer is full. + /// + /// No-op when disabled. + pub fn ingest(&mut self, signal_type: AuditSignalType, severity: Severity) { + let _span = info_span!("memory.shadow.ingest").entered(); + let Some(config) = &self.config else { return }; + + let base_weight = match signal_type { + AuditSignalType::PolicyViolation => config.signal_weights.policy_violation, + AuditSignalType::PromptInjectionPattern => config.signal_weights.prompt_injection, + AuditSignalType::ToolChainAnomaly => config.signal_weights.tool_chain_anomaly, + AuditSignalType::ConfidenceDrop => config.signal_weights.confidence_drop, + }; + let severity_mult = match severity { + Severity::Low => config.severity_multipliers.low, + Severity::Medium => config.severity_multipliers.medium, + Severity::High => config.severity_multipliers.high, + }; + let raw_score = base_weight * severity_mult; + + self.trajectory_risk = (self.trajectory_risk + raw_score).min(1.0); + + let cap = config.signal_history_cap; + if self.signal_history.len() >= cap { + self.signal_history.pop_front(); + } + self.signal_history.push_back(SignalEvent { + turn_id: self.turn_count, + signal_type, + severity, + raw_score, + }); + } + + /// Returns the current accumulated risk score in `[0.0, 1.0]`. + /// + /// Always returns `0.0` when disabled. + #[must_use] + pub fn current_risk(&self) -> f64 { + let _span = info_span!("memory.shadow.current_risk").entered(); + if self.config.is_none() { + return 0.0; + } + self.trajectory_risk + } + + /// Returns `true` when `trajectory_risk >= risk_threshold` and shadow memory is enabled. + /// + /// Always returns `false` when disabled. + #[must_use] + pub fn is_blocked(&self) -> bool { + let Some(config) = &self.config else { + return false; + }; + self.trajectory_risk >= config.risk_threshold + } + + /// Returns `true` when risk is in `[escalation_threshold, risk_threshold)`. + /// + /// Always returns `false` when disabled. + #[must_use] + pub fn should_escalate(&self) -> bool { + let Some(config) = &self.config else { + return false; + }; + self.trajectory_risk >= config.escalation_threshold + && self.trajectory_risk < config.risk_threshold + } + + /// Returns the top `n` signals by `raw_score` descending from recent history. + #[must_use] + pub fn top_signals(&self, n: usize) -> Vec<&SignalEvent> { + let mut signals: Vec<&SignalEvent> = self.signal_history.iter().collect(); + signals.sort_by(|a, b| { + b.raw_score + .partial_cmp(&a.raw_score) + .unwrap_or(std::cmp::Ordering::Equal) + }); + signals.truncate(n); + signals + } + + /// Resets `trajectory_risk` to zero and clears signal history. + /// + /// Called on context compaction when `reset_on_compaction = true`. No-op when disabled. + pub fn reset(&mut self) { + if self.config.is_none() { + return; + } + self.trajectory_risk = 0.0; + self.signal_history.clear(); + } + + /// Returns `true` when shadow memory is enabled (i.e., not in noop mode). + #[must_use] + pub fn is_enabled(&self) -> bool { + self.config.is_some() + } + + /// Returns the current turn count. + #[must_use] + pub fn turn_count(&self) -> u32 { + self.turn_count + } +} + +#[cfg(test)] +mod tests { + use super::*; + use zeph_config::{ + TrajectoryRiskAccumulatorConfig, TrajectorySeverityMultipliers, TrajectorySignalWeights, + }; + + fn enabled_config() -> TrajectoryRiskAccumulatorConfig { + TrajectoryRiskAccumulatorConfig { + enabled: true, + risk_threshold: 0.75, + escalation_threshold: 0.50, + risk_halflife_turns: 10, + signal_history_cap: 200, + tui_show_risk_gauge: true, + reset_on_compaction: false, + signal_weights: TrajectorySignalWeights::default(), + severity_multipliers: TrajectorySeverityMultipliers::default(), + } + } + + #[test] + fn new_noop_returns_zero_risk() { + let acc = TrajectoryRiskAccumulator::new_noop(); + assert_eq!(acc.current_risk(), 0.0); + assert!(!acc.is_blocked()); + assert!(!acc.is_enabled()); + } + + #[test] + fn single_signal_below_threshold_not_blocked() { + let mut acc = TrajectoryRiskAccumulator::new(enabled_config()); + acc.advance_turn(); + // PolicyViolation medium = 0.30 * 1.0 = 0.30 < 0.75 + acc.ingest(AuditSignalType::PolicyViolation, Severity::Medium); + assert!(acc.current_risk() > 0.0); + assert!(acc.current_risk() < 0.75); + assert!(!acc.is_blocked()); + } + + #[test] + fn multi_turn_chain_accumulates_and_blocks() { + let mut acc = TrajectoryRiskAccumulator::new(enabled_config()); + // PromptInjectionPattern high = 0.50 * 2.0 = 1.0 per signal + // After 2 signals (clamped to 1.0), should be blocked + for _ in 0..5 { + acc.advance_turn(); + acc.ingest(AuditSignalType::PromptInjectionPattern, Severity::High); + } + assert!(acc.is_blocked(), "risk={}", acc.current_risk()); + } + + #[test] + fn temporal_decay_reduces_score() { + let mut acc = TrajectoryRiskAccumulator::new(enabled_config()); + acc.advance_turn(); + acc.ingest(AuditSignalType::PromptInjectionPattern, Severity::High); + let risk_after_signal = acc.current_risk(); + assert!(risk_after_signal > 0.0); + + // Advance 100 turns without new signals — risk should decay significantly + for _ in 0..100 { + acc.advance_turn(); + } + assert!( + acc.current_risk() < risk_after_signal / 2.0, + "expected significant decay, got {}", + acc.current_risk() + ); + } + + #[test] + fn risk_clamped_at_one() { + let mut acc = TrajectoryRiskAccumulator::new(enabled_config()); + for _ in 0..20 { + acc.advance_turn(); + acc.ingest(AuditSignalType::PromptInjectionPattern, Severity::High); + } + assert!( + acc.current_risk() <= 1.0, + "trajectory_risk exceeded 1.0: {}", + acc.current_risk() + ); + } + + #[test] + fn advance_turn_before_ingest_applies_decay() { + let mut acc = TrajectoryRiskAccumulator::new(enabled_config()); + // Seed some risk first + acc.advance_turn(); + acc.ingest(AuditSignalType::PolicyViolation, Severity::High); + let risk_t1 = acc.current_risk(); + + // Advance a turn (decay applied) before next ingest + acc.advance_turn(); + let risk_after_decay = acc.current_risk(); + + // After decay, risk should be strictly less than risk_t1 (no new signals yet) + assert!( + risk_after_decay < risk_t1, + "decay should reduce risk before new ingest: {} vs {}", + risk_after_decay, + risk_t1 + ); + + acc.ingest(AuditSignalType::PolicyViolation, Severity::High); + // After ingest, risk should be higher than the decayed value + assert!( + acc.current_risk() > risk_after_decay, + "ingest should increase risk: {} vs {}", + acc.current_risk(), + risk_after_decay + ); + } + + #[test] + fn decay_formula_matches_spec() { + // halflife=10, confidence_drop base_weight=0.15, medium severity=1.0 + // 5 turns: each turn calls advance_turn() then ingest(ConfidenceDrop, Medium) + // per-signal contribution = 0.15 * 1.0 = 0.15; sum over 5 turns < 1.0 so no clamping. + // After turn 5, the accumulator holds: + // risk = 0.15*d^0 + 0.15*d^1 + 0.15*d^2 + 0.15*d^3 + 0.15*d^4 + // where d = exp(-ln(2)/10), most recent signal (turn 5) has least decay (d^0). + let mut acc = TrajectoryRiskAccumulator::new(enabled_config()); + for _ in 0..5 { + acc.advance_turn(); + acc.ingest(AuditSignalType::ConfidenceDrop, Severity::Medium); + } + let decay = (-std::f64::consts::LN_2 / 10.0_f64).exp(); + // sum_{k=0}^{4} 0.15 * decay^k (most recent turn = k=0, least decay applied) + let expected: f64 = (0..5).map(|k| 0.15_f64 * decay.powi(k)).sum(); + assert!( + expected < 1.0, + "test precondition: expected sum {expected} must be < 1.0 (no clamping)" + ); + assert!( + (acc.current_risk() - expected).abs() < 1e-9, + "expected {expected:.12}, got {:.12}", + acc.current_risk() + ); + } + + #[test] + fn fifty_clean_turns_zero_risk() { + let mut acc = TrajectoryRiskAccumulator::new(enabled_config()); + for _ in 0..50 { + acc.advance_turn(); + } + assert_eq!(acc.current_risk(), 0.0, "no signals → risk must stay 0.0"); + assert!(!acc.is_blocked()); + } +} diff --git a/crates/zeph-sanitizer/src/audit.rs b/crates/zeph-sanitizer/src/audit.rs new file mode 100644 index 000000000..72a1996eb --- /dev/null +++ b/crates/zeph-sanitizer/src/audit.rs @@ -0,0 +1,59 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Audit signal types emitted by sanitizer subsystems for trajectory-level accumulation. +//! +//! These types are consumed by `TrajectoryRiskAccumulator` in `zeph-memory` to maintain +//! a rolling per-session risk score without coupling the sanitizer to the memory crate. + +/// Signal type emitted by a sanitizer subsystem. +/// +/// Variants correspond to the four signal classes defined in spec 004-16, FR-007. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum AuditSignalType { + /// A policy gate denied or flagged an operation. + PolicyViolation, + /// A prompt-injection pattern was detected in untrusted content. + PromptInjectionPattern, + /// An anomalous tool-call chain was observed (e.g., rapid multi-tool escalation). + ToolChainAnomaly, + /// LLM response confidence dropped significantly between turns. + ConfidenceDrop, +} + +/// Severity level for an [`AuditSignalType`]. +/// +/// Mapped to a numeric multiplier by `TrajectorySeverityMultipliers`: +/// `Low → 0.5`, `Medium → 1.0`, `High → 2.0` (defaults). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Severity { + /// Minor or likely-benign signal. + Low, + /// Moderate concern; warrants accumulation. + Medium, + /// Strong indicator; highest multiplier. + High, +} + +/// A single audit event emitted by a sanitizer subsystem. +/// +/// Carry the minimum information needed by `TrajectoryRiskAccumulator::ingest`. +/// No heap allocation — both fields are `Copy`. +#[derive(Debug, Clone, Copy)] +pub struct AuditSignal { + /// Category of the detected signal. + pub signal_type: AuditSignalType, + /// Severity of the detected signal. + pub severity: Severity, +} + +impl AuditSignal { + /// Construct a new audit signal. + #[must_use] + pub const fn new(signal_type: AuditSignalType, severity: Severity) -> Self { + Self { + signal_type, + severity, + } + } +} diff --git a/crates/zeph-sanitizer/src/lib.rs b/crates/zeph-sanitizer/src/lib.rs index 894c890d1..eba47a188 100644 --- a/crates/zeph-sanitizer/src/lib.rs +++ b/crates/zeph-sanitizer/src/lib.rs @@ -59,6 +59,7 @@ //! `ContentSanitizer::detect_pii`]. Requires an attached classifier backend. //! See `ContentSanitizer::with_classifier`] and `ContentSanitizer::with_pii_detector`]. +pub mod audit; pub mod causal_ipi; pub mod exfiltration; pub mod guardrail; diff --git a/crates/zeph-tools/src/executor.rs b/crates/zeph-tools/src/executor.rs index b81a786e0..2cd2eb721 100644 --- a/crates/zeph-tools/src/executor.rs +++ b/crates/zeph-tools/src/executor.rs @@ -453,6 +453,18 @@ pub enum ToolError { /// Human-readable explanation from the LLM safety probe. reason: String, }, + + /// Tool call blocked by the MAGE `TrajectoryRiskAccumulator` (spec 004-16). + /// + /// Cumulative session risk exceeded `risk_threshold`. The agent loop receives the + /// score and the top contributing signals so it can explain the denial to the user. + #[error("tool call blocked: trajectory risk {score:.3} exceeds threshold")] + TrajectoryRiskExceeded { + /// Current `trajectory_risk` value at the time of the block. + score: f64, + /// Human-readable labels for the top contributing signals (up to 3). + top_signals: Vec, + }, } impl ToolError { @@ -475,7 +487,9 @@ impl ToolError { Self::Execution(io_err) => classify_io_error(io_err), Self::Shell { category, .. } => *category, Self::SnapshotFailed { .. } => ToolErrorCategory::PermanentFailure, - Self::OutOfScope { .. } | Self::SafetyDenied { .. } => ToolErrorCategory::PolicyBlocked, + Self::OutOfScope { .. } + | Self::SafetyDenied { .. } + | Self::TrajectoryRiskExceeded { .. } => ToolErrorCategory::PolicyBlocked, } } From 16f3679ecef0a649ad51f3071fa6270f21dc3d10 Mon Sep 17 00:00:00 2001 From: "Andrei G." Date: Mon, 18 May 2026 18:55:26 +0200 Subject: [PATCH 2/3] fix(memory): resolve clippy float comparison and format string warnings in tests --- crates/zeph-memory/src/shadow/mod.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/zeph-memory/src/shadow/mod.rs b/crates/zeph-memory/src/shadow/mod.rs index db00f3fee..410478de1 100644 --- a/crates/zeph-memory/src/shadow/mod.rs +++ b/crates/zeph-memory/src/shadow/mod.rs @@ -277,7 +277,7 @@ mod tests { #[test] fn new_noop_returns_zero_risk() { let acc = TrajectoryRiskAccumulator::new_noop(); - assert_eq!(acc.current_risk(), 0.0); + assert!(acc.current_risk() < f64::EPSILON); assert!(!acc.is_blocked()); assert!(!acc.is_enabled()); } @@ -353,9 +353,7 @@ mod tests { // After decay, risk should be strictly less than risk_t1 (no new signals yet) assert!( risk_after_decay < risk_t1, - "decay should reduce risk before new ingest: {} vs {}", - risk_after_decay, - risk_t1 + "decay should reduce risk before new ingest: {risk_after_decay} vs {risk_t1}" ); acc.ingest(AuditSignalType::PolicyViolation, Severity::High); @@ -401,7 +399,7 @@ mod tests { for _ in 0..50 { acc.advance_turn(); } - assert_eq!(acc.current_risk(), 0.0, "no signals → risk must stay 0.0"); + assert!(acc.current_risk() < f64::EPSILON, "no signals → risk must stay 0.0"); assert!(!acc.is_blocked()); } } From daf8c8474cabe71d80e9bd6304b612a800272dd9 Mon Sep 17 00:00:00 2001 From: "Andrei G." Date: Mon, 18 May 2026 18:57:14 +0200 Subject: [PATCH 3/3] style(memory): apply nightly fmt to shadow/mod.rs --- crates/zeph-memory/src/shadow/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/zeph-memory/src/shadow/mod.rs b/crates/zeph-memory/src/shadow/mod.rs index 410478de1..c6b472b17 100644 --- a/crates/zeph-memory/src/shadow/mod.rs +++ b/crates/zeph-memory/src/shadow/mod.rs @@ -399,7 +399,10 @@ mod tests { for _ in 0..50 { acc.advance_turn(); } - assert!(acc.current_risk() < f64::EPSILON, "no signals → risk must stay 0.0"); + assert!( + acc.current_risk() < f64::EPSILON, + "no signals → risk must stay 0.0" + ); assert!(!acc.is_blocked()); } }