diff --git a/CHANGELOG.md b/CHANGELOG.md index 89fd3c4..0745f60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,26 @@ # Changelog +## [2.0.0-beta.3] - 2026-04-13 + +### Fixed +- **Token & cost accuracy** — Claude Code logs the same API response in multiple places (streaming chunks, sub-agent mirrors, `/compact` retries). CCMeter now dedupes by `requestId` (Anthropic's billing unit), eliminating the 2–3× over-counting previously observed on days with heavy sub-agent activity. Totals now match what Anthropic actually billed. +- **Multi-minute timeline accuracy** — long streaming responses (extended thinking + large outputs) now correctly distribute their tokens across the minutes they actually spanned, instead of collapsing onto the final completion minute. `active_minutes` clustering, the minute-level heatmap, and rate-limit forecasts are all more accurate. +- **Code metrics on partial-overlap streams** — when a non-canonical stream carries a unique `Edit`/`Write` block, its `lines_suggested` / `lines_added` / `lines_deleted` are preserved via zero-billing ghost markers, avoiding silent under-counting of code activity. Ghosts are deduped across multiple mirror files by timestamp so a 3-file (canonical + 2 mirrors) layout doesn't double-count line metrics. +- **Ghost events no longer leak into model breakdowns** — zero-billing markers now carry an empty `model` field, so they fall through to `ModelId::Other` and are filtered out of model-share aggregations instead of producing phantom slices. +- **User-side patch dedup** — patches replayed into sub-agent transcripts (Edit/Write acceptances) are now deduped by line `uuid`, fixing inflated `lines_added` and skewed efficiency scores on sub-agent–heavy days. +- **Cost fallback includes `cache_creation`** — token-based cost estimation (used when raw `costUSD` is absent, i.e. Pro plans) now bills `cache_creation` at `input_price × 1.25` instead of ignoring it. Closes a 5–15% underestimate on cache-heavy sessions. 
+ +### Added +- **Versioned cache schema** with automatic invalidation. `~/.config/ccmeter/history.json` carries a `schema_version`; mismatches trigger a clean rebuild on next launch so accuracy fixes propagate without manual intervention. +- **One-time cache-state banner** at the top of the dashboard: + - "Cache rebuilt" (warning color) when a schema migration occurs. + - "Cache was unreadable" (error color) when the on-disk file couldn't be read or parsed, with a hint to delete it if the issue persists. + - Both dismiss on any keypress. +- **`CCMETER_FORCE_BANNER` env var** for testing the banners after migration has already happened. Set to `recovered` for the corruption banner, anything else for the migration banner. + +### Documentation +- README: new "Accurate token counting" section explaining the dedup methodology and what users should expect when upgrading from a pre-dedup version. + ## [2.0.0-beta.2] - 2026-04-13 ### Added diff --git a/Cargo.lock b/Cargo.lock index 14539d0..c9231e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,7 +184,7 @@ dependencies = [ [[package]] name = "ccmeter" -version = "2.0.0-beta.2" +version = "2.0.0-beta.3" dependencies = [ "chrono", "clap", diff --git a/Cargo.toml b/Cargo.toml index 71bb897..1e54024 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ccmeter" -version = "2.0.0-beta.2" +version = "2.0.0-beta.3" edition = "2024" description = "A meter for Claude Code usage" repository = "https://github.com/hmenzagh/CCMeter" diff --git a/README.md b/README.md index 2773000..e9a027e 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,13 @@ CCMeter reads your local Claude Code session data and renders an interactive TUI - **Per-project detail** — dedicated charts, model distribution, cost sparklines, and estimated active time - **Time filters** — 1h, 12h, Today, Last week, Last month, All +**Rate limit tracking** (press `` ` `` to toggle) +- **Live usage monitor** — polls `/api/oauth/usage` for each Claude 
OAuth account and shows 5h, 7d, Opus, Sonnet, and Cowork window utilization in real time +- **Credential cards** — one per source root with subscription tier, expiry, and current usage bars +- **Session forecast** — extrapolates when you'll hit each rate limit window based on current token velocity +- **Session chart & timeline** — historical rate-limit hits per source, plus minute-level token usage over the active window +- **Overage tracking** — surfaces `extra_usage` credits and monthly limits when enabled + **Project handling & performance** - **Auto-discovery & grouping** — finds Claude projects and groups them by git repository - **Multi-source roots** — switch between Claude config directories with `Shift+Tab` @@ -95,8 +102,9 @@ ccmeter |-----|--------| | `Tab` | Cycle time filter | | `Shift+Tab` | Switch source root | +| `` ` `` | Toggle rate limit tracking view | | `j` / `k` or `Up` / `Down` | Scroll projects | -| `h` / `l` or `Left` / `Right` | Navigate between projects | +| `h` / `l` or `Left` / `Right` | Navigate between projects (or credentials in rate tracking) | | `Esc` | Deselect project | | `.` | Open settings panel | | `r` | Reload data | @@ -120,9 +128,21 @@ CCMeter discovers Claude Code sessions by scanning your home directory for any f Session JSONL → parallel parse → daily aggregates → cached history → TUI render ``` +### Accurate token counting + +Claude Code logs the same API response in several places — once per streaming chunk, again in every sub-agent's transcript, again in any `/compact` retry. Summing every line naïvely inflates tokens by 2–3× on busy days. + +CCMeter dedupes by `requestId` (Anthropic's billing unit: one request = one invoice line). For each request it picks the most complete log, then re-emits it as per-chunk deltas so multi-minute streams keep their real timestamps. 
Activity from non-canonical mirror logs survives as zero-billing "ghost" markers, so `active_minutes` and code metrics stay accurate even when the canonical log is just a terminal snapshot. User-side patches (Edit/Write acceptances) are deduped by line `uuid` for the same reason. + +Result: totals match what Anthropic actually billed, and the minute-level timeline reflects real activity. + ### Cache -Parsed metrics are persisted to `~/.config/ccmeter/history.json`. On subsequent launches, only new or modified session files are parsed, everything else is served from cache, making startup near-instant even with thousands of sessions. +Parsed metrics are persisted to `~/.config/ccmeter/history.json` so subsequent launches only re-parse new or modified session files. + +The cache is versioned. When CCMeter ships a change that would make pre-existing aggregates wrong (e.g. the dedup work above), the schema is bumped and any older cache is rebuilt from scratch on next launch — a one-time banner explains why historical totals may have shifted. A second banner appears if the cache was unreadable (corruption, disk error). Both dismiss on any keypress. + +**Upgrading from before the dedup fix?** Expect your historical numbers to drop on days with heavy sub-agent or `/compact` activity. That's the over-counting going away, not data loss. ### Per-project view @@ -139,6 +159,24 @@ Press `Esc` to go back to the global overview. CCMeter per-project view

+### Rate limit tracking + +Press `` ` `` (backtick) to switch to the rate limit tracking view. CCMeter reads your Claude OAuth credentials and polls `/api/oauth/usage` at randomized intervals (5–10 min per account) to show real-time utilization of each rate limit window. + +- **Credential cards** — one per source root, with subscription tier, token expiry, and usage bars for the 5h, 7d, Opus, Sonnet, and Cowork windows +- **Live summary & KPI bar** — currently selected account's status at a glance +- **Session forecast** — projects when you'll hit each limit based on recent token velocity +- **Usage timeline & session chart** — minute-level token usage for the active 5h window and historical rate-limit hits per source +- **Overages** — surfaces `extra_usage` credits and monthly limits when enabled on your plan + +Navigate between accounts with `←` / `→` (or `h` / `l`), refresh with `r`, and press `` ` `` again to return to the main dashboard. + +CCMeter only sees tokens from Claude Code sessions (local JSONL logs). Tokens consumed through the Claude chat (claude.ai web/desktop) count against the same rate limits but are not visible to CCMeter, so forecasts and utilization bars may under-report actual usage when you also chat with Claude alongside coding. Rate limit history is persisted locally at `~/.config/ccmeter/rate-history.json` and `~/.config/ccmeter/usage-hit-history.json` so session charts and hit timelines survive restarts — delete these files to reset the tracking history. + +

+ CCMeter rate limit tracking view +

+ ## Configuration User overrides are stored at `~/.config/ccmeter/overrides.json` and can be edited through the settings panel or manually. @@ -160,28 +198,31 @@ User overrides are stored at `~/.config/ccmeter/overrides.json` and can be edite ``` src/ ├── main.rs # Entry point & event loop -├── app.rs # Core application state +├── app.rs # Core application state & view routing +├── update_check.rs # GitHub release version check ├── config/ -│ ├── mod.rs │ ├── discovery.rs # Project auto-discovery -│ └── overrides.rs # User configuration & merges +│ ├── overrides.rs # User configuration & merges +│ └── settings.rs # Persistent user preferences ├── data/ -│ ├── mod.rs │ ├── parser.rs # JSONL session parsing │ ├── cache.rs # Persistent metric cache +│ ├── index.rs # Compact event index │ ├── tokens.rs # Daily token aggregation -│ └── models.rs # Model pricing tables +│ ├── models.rs # Model pricing tables +│ ├── oauth.rs # OAuth credential loading & async usage polling +│ ├── rate_limits.rs # Rate-limit hit parsing +│ ├── rate_history.rs # Persisted rate-limit history +│ └── hit_history.rs # Historical hit aggregation └── ui/ - ├── mod.rs ├── dashboard.rs # Main layout ├── heatmap.rs # Heatmap rendering + ├── loading.rs # Startup loading screen ├── theme.rs # Color theme ├── time_filter.rs # Time range logic ├── settings_view.rs # Settings panel - └── cards/ - ├── mod.rs - ├── data.rs # Card data aggregation - └── render.rs # Card rendering + ├── cards/ # Per-project cards + └── rate_tracking/ # Rate limit tracking view (13 modules) ``` ## License diff --git a/assets/rate-tracking.png b/assets/rate-tracking.png new file mode 100644 index 0000000..fc07b83 Binary files /dev/null and b/assets/rate-tracking.png differ diff --git a/src/app.rs b/src/app.rs index 7e69584..a71608d 100644 --- a/src/app.rs +++ b/src/app.rs @@ -147,6 +147,12 @@ pub(crate) struct App { pub(crate) update_info: Option, update_rx: mpsc::Receiver, + + /// Outcome of the initial on-disk cache load. 
`Migrated` and + /// `Recovered` both trigger a one-time banner but with distinct + /// messages so the user can tell a planned schema bump apart from a + /// corrupted cache. Dismissed on the first user keypress. + pub(crate) cache_load_state: cache::CacheLoad, } impl App { @@ -166,7 +172,17 @@ impl App { discovery::discover_project_groups_with_root_map(); let raw_groups = Arc::new(raw_groups); let session_map = Arc::new(session_map); - let (merged_cache, index) = load_data(&raw_groups, &session_map); + let (merged_cache, index, mut cache_load_state) = load_data(&raw_groups, &session_map); + // Debug escape hatch for screenshotting the banners after a + // migration already happened. Set CCMETER_FORCE_BANNER=recovered + // for the corruption banner, anything else for the migration one. + if let Ok(kind) = std::env::var("CCMETER_FORCE_BANNER") { + cache_load_state = if kind.eq_ignore_ascii_case("recovered") { + cache::CacheLoad::Recovered + } else { + cache::CacheLoad::Migrated + }; + } let (daily_tokens, thresholds) = compute_daily_and_thresholds(&merged_cache, None, None); let minute_tokens = index.build_minute_tokens(None, None); @@ -258,6 +274,7 @@ impl App { refresh_requested: false, update_info: None, update_rx, + cache_load_state, }; app.record_rate_history(); app @@ -589,6 +606,12 @@ impl App { return false; } + // Dismiss the cache-state notice on the first user keypress. + if self.cache_load_state != cache::CacheLoad::Fresh { + self.cache_load_state = cache::CacheLoad::Fresh; + self.render_dirty = true; + } + // Tab/BackTab cycle time filters globally, except in Settings/RateTracking. 
if !matches!(self.view, View::Settings(_) | View::RateTracking) { match key.code { @@ -879,7 +902,7 @@ fn compute_hit_tokens(hits: &mut [RateLimitHit], index: &EventIndex) { fn load_data( raw_groups: &[discovery::ProjectGroup], session_map: &HashMap, -) -> (cache::Cache, EventIndex) { +) -> (cache::Cache, EventIndex, cache::CacheLoad) { let all_session_files: Vec = raw_groups .iter() .flat_map(|g| g.sources.iter()) @@ -887,13 +910,13 @@ fn load_data( .collect(); let events = parser::parse_session_files(&all_session_files); - let old_cache = cache::load(); + let outcome = cache::load(); let fresh_cache = cache::from_events(&events, session_map); - let merged = cache::merge(old_cache, &fresh_cache); + let merged = cache::merge(outcome.cache, &fresh_cache); cache::save(&merged); let index = EventIndex::build(&events, session_map); - (merged, index) + (merged, index, outcome.state) } fn spawn_reload( @@ -905,7 +928,7 @@ fn spawn_reload( let session_map = Arc::clone(session_map); let tx = tx.clone(); std::thread::spawn(move || { - let (cache, index) = load_data(&raw_groups, &session_map); + let (cache, index, _) = load_data(&raw_groups, &session_map); let _ = tx.send((cache, index)); }); } diff --git a/src/data/cache.rs b/src/data/cache.rs index 301433a..7490277 100644 --- a/src/data/cache.rs +++ b/src/data/cache.rs @@ -36,11 +36,54 @@ pub struct DayEntry { pub active_minutes: u64, } +/// Bumped whenever the on-disk cache schema or aggregation logic changes in +/// a way that would make pre-existing values inconsistent with a fresh parse. +/// Caches with a different (or missing) version are dropped at load time. +/// +/// History: +/// - v2: `parse_session_files` now dedups events by `request_id` (max usage +/// per request). Pre-fix caches over-counted tokens/cost on days with +/// sub-agent activity (parent + sub-agent JSONLs both logged the same +/// `req_…`), so the high-water-mark merge would freeze the inflated +/// values in place. 
+const CURRENT_SCHEMA_VERSION: u32 = 2; + /// Full cache: source_root -> cwd -> date (YYYY-MM-DD) -> metrics. #[derive(Debug, Default, Clone, Serialize, Deserialize)] #[serde(transparent)] pub struct Cache(HashMap>>); +/// On-disk wrapper. Pre-v2 caches were the bare `Cache` map at top level +/// and fail this deserialize, triggering migration. +#[derive(Debug, Deserialize)] +struct VersionedCache { + schema_version: u32, + data: Cache, +} + +/// Borrowing twin so `save` can serialize without cloning the full cache. +#[derive(Serialize)] +struct VersionedCacheRef<'a> { + schema_version: u32, + data: &'a Cache, +} + +/// `Migrated` (planned schema bump, "cache rebuilt") and `Recovered` +/// (genuine load failure) are surfaced separately so a real corruption +/// isn't masked by the migration banner. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CacheLoad { + Fresh, + Migrated, + Recovered, +} + +#[derive(Debug)] +pub struct LoadOutcome { + pub cache: Cache, + pub state: CacheLoad, +} + impl Cache { pub fn new() -> Self { Self(HashMap::new()) @@ -110,15 +153,57 @@ fn cache_path() -> PathBuf { home.join(".config").join("ccmeter").join("history.json") } -pub fn load() -> Cache { - let path = cache_path(); - if !path.exists() { - return Cache::new(); +pub fn load() -> LoadOutcome { + let raw = match std::fs::read_to_string(cache_path()) { + Ok(s) => s, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return LoadOutcome { + cache: Cache::new(), + state: CacheLoad::Fresh, + }; + } + // Any other filesystem error (permissions, I/O) is a recovery + // scenario — the file is there but we can't read it. + Err(_) => { + return LoadOutcome { + cache: Cache::new(), + state: CacheLoad::Recovered, + }; + } + }; + + // Path 1: current schema — clean load. 
+ if let Ok(v) = serde_json::from_str::(&raw) { + if v.schema_version == CURRENT_SCHEMA_VERSION { + return LoadOutcome { + cache: v.data, + state: CacheLoad::Fresh, + }; + } + // Wrapped but wrong version → deliberate migration. + return LoadOutcome { + cache: Cache::new(), + state: CacheLoad::Migrated, + }; + } + + // Path 2: legacy pre-v2 cache was a bare `Cache` HashMap at the top + // level. If it parses as such, this is a planned migration from the + // pre-versioning era, not corruption. + if serde_json::from_str::(&raw).is_ok() { + return LoadOutcome { + cache: Cache::new(), + state: CacheLoad::Migrated, + }; + } + + // Path 3: file is neither a versioned cache nor a legacy one — either + // truncated, corrupted, or produced by a tool we don't recognize. + // Surface this distinctly so the user knows the cache had a real issue. + LoadOutcome { + cache: Cache::new(), + state: CacheLoad::Recovered, } - std::fs::read_to_string(&path) - .ok() - .and_then(|s| serde_json::from_str(&s).ok()) - .unwrap_or_default() } pub fn save(cache: &Cache) { @@ -126,7 +211,11 @@ pub fn save(cache: &Cache) { if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } - if let Ok(json) = serde_json::to_string_pretty(cache) { + let wrapped = VersionedCacheRef { + schema_version: CURRENT_SCHEMA_VERSION, + data: cache, + }; + if let Ok(json) = serde_json::to_string_pretty(&wrapped) { let tmp = path.with_extension("json.tmp"); if std::fs::write(&tmp, &json).is_ok() && std::fs::rename(&tmp, &path).is_err() { let _ = std::fs::write(&path, &json); @@ -547,6 +636,9 @@ mod tests { lines_added: 5, lines_deleted: 3, session_file: "session1.jsonl".into(), + request_id: None, + raw_cost_usd: None, + line_uuid: None, }, Event { timestamp: Utc.with_ymd_and_hms(2026, 1, 15, 12, 0, 0).unwrap(), @@ -561,6 +653,9 @@ mod tests { lines_added: 0, lines_deleted: 0, session_file: "session2.jsonl".into(), + request_id: None, + raw_cost_usd: None, + line_uuid: None, }, ]; @@ -591,6 
+686,9 @@ mod tests { lines_added: 0, lines_deleted: 0, session_file: "unknown.jsonl".into(), + request_id: None, + raw_cost_usd: None, + line_uuid: None, }]; let cache = from_events(&events, &session_info); diff --git a/src/data/parser.rs b/src/data/parser.rs index b4f4ac1..e66269e 100644 --- a/src/data/parser.rs +++ b/src/data/parser.rs @@ -31,6 +31,23 @@ pub struct Event { pub lines_deleted: u64, /// Basename of the JSONL file (session UUID). pub session_file: String, + /// API request id (`req_…`) — the canonical identifier Anthropic bills + /// against. Used to dedup events that get rewritten across files (e.g. + /// sub-agents echoing the parent's messages, auto-compact retries) or + /// across streaming chunks within the same file. `None` for user-side + /// events (`lines_accepted`, etc). + pub request_id: Option, + /// Raw `costUSD` read verbatim from the JSONL (the authoritative billed + /// amount, cumulative per streaming snapshot when emitted by Claude + /// Code). Preserved separately from `cost_usd` so `deltaize_stream` can + /// delta the cumulative rather than fall back to a token-pricing + /// approximation. `None` when the field is absent (Pro-plan logs, which + /// never carry `costUSD`). + pub raw_cost_usd: Option, + /// Per-line UUID from the JSONL. Used to dedup user-side patch events + /// that get mirrored across parent and sub-agent files (same uuid, same + /// content). Assistant events are deduped by `request_id` instead. + pub line_uuid: Option, } // --------------------------------------------------------------------------- @@ -47,6 +64,12 @@ struct RawLine { cost_usd: Option, #[serde(rename = "toolUseResult")] tool_use_result: Option, + #[serde(rename = "requestId")] + request_id: Option, + /// Per-line UUID assigned by Claude Code. Preserved byte-identical when + /// the same event is mirrored into a sub-agent's JSONL, so we can use it + /// to dedup user-side patch events (which have no `requestId`). 
+ uuid: Option, } #[derive(Deserialize)] @@ -98,16 +121,270 @@ struct RawPatchHunk { // Public API // --------------------------------------------------------------------------- -/// Parse a list of JSONL files in parallel and return all events. +/// Parse a list of JSONL files in parallel and dedup so totals match +/// Anthropic's billing (1 `request_id` = 1 HTTP request = 1 invoice line). +/// See `dedup_by_request_id` for the canonicalization algorithm. pub fn parse_session_files(paths: &[PathBuf]) -> Vec { - let mut events: Vec = paths + let raw: Vec = paths .par_iter() .filter_map(|p| parse_one_file(p)) .flatten() .collect(); - events.sort_by_key(|e| e.timestamp); - events + dedup_by_request_id(raw) +} + +fn dedup_by_request_id(events: Vec) -> Vec { + let total = events.len(); + let mut without_req: Vec = Vec::new(); + let mut by_req: HashMap>> = HashMap::new(); + for e in events { + match &e.request_id { + Some(id) => { + let inner = by_req.entry(id.clone()).or_default(); + // Avoid cloning session_file on every event: only clone on + // the first chunk per stream (i.e. per unique file). + if let Some(stream) = inner.get_mut(&e.session_file) { + stream.push(e); + } else { + inner.insert(e.session_file.clone(), vec![e]); + } + } + None => without_req.push(e), + } + } + + // User-side events (patches) have no requestId but carry a per-line + // `uuid` Claude Code preserves byte-identical across sub-agent mirrors. + // Dedup by uuid; events without a uuid (older logs) pass through + // unchanged — under-counting is worse than a rare over-count. 
+ let mut out: Vec = Vec::with_capacity(total); + let mut seen_uuids: std::collections::HashSet = + std::collections::HashSet::with_capacity(without_req.len()); + for e in without_req { + match &e.line_uuid { + Some(uuid) => { + if seen_uuids.insert(uuid.clone()) { + out.push(e); + } + } + None => out.push(e), + } + } + + for (_req, files) in by_req { + let mut streams: Vec> = files.into_values().collect(); + // Sort streams by `session_file` for deterministic canonical pick: + // HashMap iteration is randomized, and two mirrors with identical + // `stream_score` would otherwise flip which one wins, making + // provenance-asserting tests flaky. + streams.sort_by(|a, b| { + let ka = a.first().map(|e| e.session_file.as_str()).unwrap_or(""); + let kb = b.first().map(|e| e.session_file.as_str()).unwrap_or(""); + ka.cmp(kb) + }); + // Sort chunks by timestamp so deltaization sees snapshots in order. + for s in &mut streams { + s.sort_by_key(|e| e.timestamp); + } + let Some(canonical_idx) = pick_canonical_index(&streams) else { + continue; + }; + + // Ghost chunks emit per non-canonical timestamp not covered by the + // canonical. Billing/model are zeroed so they can't inflate totals + // or leak into `index::model_shares` (cleared model → `ModelId::Other`, + // filtered by `total > 0`); line metrics are preserved because a + // mirror timestamp absent from the canonical necessarily carries + // unique Edit/Write content. Dedup by `timestamp` alone is safe: + // mirrors are byte-identical at matching timestamps. + let canonical_ts: std::collections::HashSet> = + streams[canonical_idx].iter().map(|e| e.timestamp).collect(); + // Debug-only invariant guard: if two non-canonical streams land on + // the same timestamp with different `lines_*`, the mirror format + // has diverged and timestamp-only dedup would drop distinct content. 
+ let mut ghost_seen: std::collections::HashMap, (u64, u64, u64, u64)> = + std::collections::HashMap::new(); + let mut ghosts: Vec = Vec::new(); + for (i, stream) in streams.iter().enumerate() { + if i == canonical_idx { + continue; + } + for e in stream { + if canonical_ts.contains(&e.timestamp) { + continue; + } + let metrics = ( + e.lines_suggested, + e.lines_added, + e.lines_deleted, + e.lines_accepted, + ); + if let Some(prev) = ghost_seen.get(&e.timestamp) { + debug_assert_eq!( + prev, &metrics, + "mirror invariant violated at {}: {:?} vs {:?}", + e.timestamp, prev, metrics + ); + continue; + } + ghost_seen.insert(e.timestamp, metrics); + ghosts.push(Event { + timestamp: e.timestamp, + model: String::new(), + input_tokens: 0, + output_tokens: 0, + cache_read_input_tokens: 0, + cache_creation_input_tokens: 0, + cost_usd: 0.0, + lines_suggested: e.lines_suggested, + lines_accepted: e.lines_accepted, + lines_added: e.lines_added, + lines_deleted: e.lines_deleted, + session_file: e.session_file.clone(), + request_id: None, + raw_cost_usd: None, + line_uuid: None, + }); + } + } + + let canonical = streams.swap_remove(canonical_idx); + out.extend(deltaize_stream(canonical)); + out.extend(ghosts); + } + + out.sort_by_key(|e| e.timestamp); + out +} + +/// Score a stream by `(max_total_usage, chunk_count, lines_sum, +/// latest_timestamp)` and return the index of the stream with the highest +/// score. Ties are broken lexicographically: richer billing > more chunks +/// > more content > later. +/// +/// Returns the index (rather than the stream itself) so the caller can keep +/// borrowing the other streams for ghost-chunk extraction before moving the +/// canonical out. 
+fn pick_canonical_index(streams: &[Vec]) -> Option { + streams + .iter() + .enumerate() + .max_by_key(|(_, s)| stream_score(s)) + .map(|(i, _)| i) +} + +type StreamScore = (u64, usize, u64, DateTime); + +fn stream_score(s: &[Event]) -> StreamScore { + let max_total = s + .iter() + .map(|e| { + e.input_tokens + + e.output_tokens + + e.cache_read_input_tokens + + e.cache_creation_input_tokens + }) + .max() + .unwrap_or(0); + let chunks = s.len(); + let lines_sum: u64 = s + .iter() + .map(|e| e.lines_suggested + e.lines_added + e.lines_deleted) + .sum(); + let latest_ts = s + .iter() + .map(|e| e.timestamp) + .max() + .unwrap_or_else(|| DateTime::::from_timestamp(0, 0).unwrap()); + (max_total, chunks, lines_sum, latest_ts) +} + +/// Convert cumulative snapshots into per-chunk deltas. First chunk's delta +/// equals its full snapshot (implicit `prev = 0`). +/// +/// **Cost handling** — two paths, in order of fidelity: +/// 1. If **every** chunk in the stream carries a raw `costUSD` (API logs +/// where Claude Code records Anthropic's billed amount on each snapshot), +/// delta that cumulative value. This preserves whatever pricing +/// Anthropic actually applied — promo rates, cache-creation surcharges, +/// post-launch price changes the local table may not know about. +/// 2. Otherwise (Pro-plan logs, or any stream with even a single chunk +/// missing `costUSD`), recompute from delta tokens via `model_pricing`. +/// +/// The all-or-none gate on path 1 is deliberate. A mixed stream (some raw +/// cost, some missing) cannot be reconciled correctly: emitting a +/// token-pricing estimate on the gap chunks would not advance the raw +/// cumulative baseline, so the next raw-cost chunk would re-emit spend +/// already covered by the gap estimates — inflating the total. Falling +/// back uniformly to tokens keeps the total internally consistent at the +/// cost of one stream's raw-cost fidelity. 
+fn deltaize_stream(mut stream: Vec) -> Vec { + let mut prev_in = 0u64; + let mut prev_out = 0u64; + let mut prev_cr = 0u64; + let mut prev_cc = 0u64; + let mut prev_raw_cost = 0.0_f64; + + let use_raw_cost = !stream.is_empty() && stream.iter().all(|e| e.raw_cost_usd.is_some()); + + for e in stream.iter_mut() { + let d_in = e.input_tokens.saturating_sub(prev_in); + let d_out = e.output_tokens.saturating_sub(prev_out); + let d_cr = e.cache_read_input_tokens.saturating_sub(prev_cr); + let d_cc = e.cache_creation_input_tokens.saturating_sub(prev_cc); + + // Snapshot monotonicity: advance only when the new value is larger, + // so a transient regression (e.g. unordered mirror) doesn't poison + // subsequent deltas. + prev_in = prev_in.max(e.input_tokens); + prev_out = prev_out.max(e.output_tokens); + prev_cr = prev_cr.max(e.cache_read_input_tokens); + prev_cc = prev_cc.max(e.cache_creation_input_tokens); + + e.input_tokens = d_in; + e.output_tokens = d_out; + e.cache_read_input_tokens = d_cr; + e.cache_creation_input_tokens = d_cc; + + e.cost_usd = if use_raw_cost { + let cur_cost = e + .raw_cost_usd + .expect("use_raw_cost invariant: every chunk has raw cost"); + let delta = (cur_cost - prev_raw_cost).max(0.0); + prev_raw_cost = prev_raw_cost.max(cur_cost); + delta + } else { + cost_from_tokens(&e.model, d_in, d_out, d_cr, d_cc) + }; + // lines_* are per-chunk (not cumulative) — preserve as-is. + } + + stream +} + +/// Cost approximation from token counts via the local pricing table. Used +/// as fallback when the JSONL doesn't carry a raw `costUSD`. Mirrors +/// Anthropic's billing structure: fresh input at `input_price`, cache reads +/// at `cache_read_price`, cache creation at `input_price × 1.25` (the +/// default 5-min ephemeral TTL that Claude Code uses), output at +/// `output_price`. 
+fn cost_from_tokens( + model: &str, + input: u64, + output: u64, + cache_read: u64, + cache_creation: u64, +) -> f64 { + // Mirrors `CACHE_CREATION_WEIGHT` in `index::split_entry_cost`. + const CACHE_CREATION_MULTIPLIER: f64 = 1.25; + let (input_price, output_price, cache_read_price) = super::models::model_pricing(model); + let fresh_input = input.saturating_sub(cache_read); + (fresh_input as f64 * input_price + + cache_read as f64 * cache_read_price + + cache_creation as f64 * input_price * CACHE_CREATION_MULTIPLIER + + output as f64 * output_price) + / super::models::TOKENS_PER_MILLION } // --------------------------------------------------------------------------- @@ -157,26 +434,6 @@ fn count_diff_lines(old: Option<&str>, new: Option<&str>) -> (u64, u64) { (added, removed) } -/// Compute cost for an event, using costUSD if available, otherwise model pricing. -fn compute_cost( - cost_usd: f64, - model: &str, - input_tokens: u64, - output_tokens: u64, - cache_read: u64, -) -> f64 { - if cost_usd > 0.0 { - cost_usd - } else { - let (input_price, output_price, cache_read_price) = super::models::model_pricing(model); - let fresh_input = input_tokens.saturating_sub(cache_read); - (fresh_input as f64 * input_price - + cache_read as f64 * cache_read_price - + output_tokens as f64 * output_price) - / super::models::TOKENS_PER_MILLION - } -} - /// Try to extract an Event from a single JSON line. /// Handles both assistant messages (tokens + suggested lines) and user messages (accepted lines). 
fn parse_line(line: &str, session_file: &str) -> Option<Event> { @@ -192,13 +449,23 @@ fn parse_line(line: &str, session_file: &str) -> Option<Event> { let msg = raw.message?; let usage = msg.usage?; + let request_id = raw.request_id; + let line_uuid = raw.uuid.clone(); let model = msg.model.unwrap_or_default(); let input_tokens = usage.input_tokens.unwrap_or(0); let output_tokens = usage.output_tokens.unwrap_or(0); let cache_read = usage.cache_read_input_tokens.unwrap_or(0); let cache_creation = usage.cache_creation_input_tokens.unwrap_or(0); - let raw_cost = raw.cost_usd.unwrap_or(0.0); - let cost_usd = compute_cost(raw_cost, &model, input_tokens, output_tokens, cache_read); + let raw_cost_usd = raw.cost_usd; + let cost_usd = raw_cost_usd.filter(|c| *c > 0.0).unwrap_or_else(|| { + cost_from_tokens( + &model, + input_tokens, + output_tokens, + cache_read, + cache_creation, + ) + }); // Count lines suggested via Edit/Write tool_use blocks let mut lines_suggested = 0u64; @@ -243,6 +510,9 @@ fn parse_line(line: &str, session_file: &str) -> Option<Event> { lines_added: lines_added_total, lines_deleted: lines_deleted_total, session_file: session_file.to_owned(), + request_id, + raw_cost_usd, + line_uuid, }) } Some("user") => { @@ -290,6 +560,9 @@ fn parse_line(line: &str, session_file: &str) -> Option<Event> { lines_added: added, lines_deleted: deleted, session_file: session_file.to_owned(), + request_id: None, + raw_cost_usd: None, + line_uuid: raw.uuid, }) } _ => None, @@ -303,6 +576,7 @@ fn parse_line(line: &str, session_file: &str) -> Option<Event> { #[cfg(test)] mod tests { use super::*; + use chrono::Timelike; fn make_line(json: &str) -> String { json.to_string() } @@ -332,8 +606,11 @@ mod tests { assert_eq!(ev.output_tokens, 200); assert_eq!(ev.cache_read_input_tokens, 500); assert_eq!(ev.cache_creation_input_tokens, 300); - // cost computed from model pricing: (500*5.0 + 500*0.50 + 200*25.0) / 1M - assert!((ev.cost_usd - 0.00775).abs() < 1e-9); + // cost from model pricing: + // fresh_input 500 * 
$5/M + cache_read 500 * $0.5/M + // + cache_creation 300 * $5/M * 1.25 + output 200 * $25/M + // = 2500 + 250 + 1875 + 5000 = 9625 micro-USD + assert!((ev.cost_usd - 0.009625).abs() < 1e-9); assert_eq!(ev.lines_suggested, 0); assert_eq!(ev.lines_added, 0); assert_eq!(ev.lines_deleted, 0); @@ -469,4 +746,726 @@ mod tests { let ev = parse_line(&line, "test.jsonl").expect("should parse"); assert_eq!(ev.model, "claude-opus-4-6"); } + + #[test] + fn captures_request_id() { + let line = make_line( + r#"{ + "type": "assistant", + "timestamp": "2026-04-01T12:00:00.000Z", + "requestId": "req_01abc", + "message": { + "id": "msg_01abc", + "model": "claude-opus-4-6", + "usage": { "input_tokens": 1, "output_tokens": 1 } + } + }"#, + ); + let ev = parse_line(&line, "test.jsonl").expect("should parse"); + assert_eq!(ev.request_id.as_deref(), Some("req_01abc")); + } + + // ------------------------------------------------------------------ + // Unit tests on deltaize_stream / stream_score + // ------------------------------------------------------------------ + + fn mk_event( + ts: &str, + req: &str, + file: &str, + model: &str, + (i, o, cr, cc): (u64, u64, u64, u64), + (ls, la, ld): (u64, u64, u64), + ) -> Event { + mk_event_with_cost(ts, req, file, model, (i, o, cr, cc), (ls, la, ld), None) + } + + fn mk_event_with_cost( + ts: &str, + req: &str, + file: &str, + model: &str, + (i, o, cr, cc): (u64, u64, u64, u64), + (ls, la, ld): (u64, u64, u64), + raw_cost_usd: Option<f64>, + ) -> Event { + Event { + timestamp: DateTime::parse_from_rfc3339(ts) + .unwrap() + .with_timezone(&Utc), + model: model.into(), + input_tokens: i, + output_tokens: o, + cache_read_input_tokens: cr, + cache_creation_input_tokens: cc, + cost_usd: 0.0, + lines_suggested: ls, + lines_accepted: 0, + lines_added: la, + lines_deleted: ld, + session_file: file.into(), + request_id: Some(req.into()), + raw_cost_usd, + line_uuid: None, + } + } + + #[test] + fn first_chunk_delta_equals_full_snapshot() { + let stream = 
vec![mk_event( + "2026-04-01T12:00:00.100Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (3, 21, 6754, 13837), + (0, 0, 0), + )]; + let out = deltaize_stream(stream); + assert_eq!(out.len(), 1); + assert_eq!(out[0].input_tokens, 3); + assert_eq!(out[0].output_tokens, 21); + assert_eq!(out[0].cache_read_input_tokens, 6754); + assert_eq!(out[0].cache_creation_input_tokens, 13837); + } + + #[test] + fn delta_handles_non_monotonic_snapshot_defensively() { + // output regresses mid-stream (shouldn't happen in real data, but + // saturating_sub + monotonic prev must prevent panic & underflow). + let stream = vec![ + mk_event( + "2026-04-01T12:00:00.100Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (3, 50, 100, 0), + (0, 0, 0), + ), + mk_event( + "2026-04-01T12:00:00.200Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (3, 30, 100, 0), + (0, 0, 0), + ), // regression + mk_event( + "2026-04-01T12:00:00.300Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (3, 80, 100, 0), + (0, 0, 0), + ), + ]; + let out = deltaize_stream(stream); + // Expected deltas: out (50, 0, 30), since prev tracks the max. + assert_eq!(out[0].output_tokens, 50); + assert_eq!(out[1].output_tokens, 0); + assert_eq!(out[2].output_tokens, 30); + // Total reconciles to the max snapshot (80), not the last (80). + let sum_out: u64 = out.iter().map(|e| e.output_tokens).sum(); + assert_eq!(sum_out, 80); + } + + #[test] + fn delta_cost_uses_raw_cumulative_when_present() { + // Three chunks with a cumulative raw costUSD that diverges from the + // pricing-table estimate — e.g. Anthropic applied promo pricing. + // We must preserve Anthropic's billed amount via delta, not fall + // back to the token-pricing approximation. 
+ let stream = vec![ + mk_event_with_cost( + "2026-04-01T12:00:00.100Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (100, 10, 500, 0), + (0, 0, 0), + Some(0.42), + ), + mk_event_with_cost( + "2026-04-01T12:00:00.200Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (100, 30, 500, 0), + (0, 0, 0), + Some(1.00), + ), + mk_event_with_cost( + "2026-04-01T12:00:00.300Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (100, 80, 500, 0), + (0, 0, 0), + Some(2.50), + ), + ]; + let out = deltaize_stream(stream); + // Per-chunk deltas: 0.42, 0.58, 1.50. Sum = 2.50 (final cumulative). + assert!((out[0].cost_usd - 0.42).abs() < 1e-9); + assert!((out[1].cost_usd - 0.58).abs() < 1e-9); + assert!((out[2].cost_usd - 1.50).abs() < 1e-9); + let sum: f64 = out.iter().map(|e| e.cost_usd).sum(); + assert!( + (sum - 2.50).abs() < 1e-9, + "sum must reconstitute final raw cumulative cost" + ); + + // Sanity: differs from the token-pricing fallback (0.80 approx), so + // we really preserved Anthropic's authoritative figure. + let fallback = cost_from_tokens("claude-opus-4-6", 100, 80, 500, 0); + assert!( + (sum - fallback).abs() > 1e-3, + "test invariant: raw cost must diverge from token-pricing estimate" + ); + } + + #[test] + fn mixed_raw_cost_stream_falls_back_to_tokens_without_inflating() { + // Mixed streams must fall back to tokens uniformly. Mixing a token + // estimate on a gap chunk with a delta of the next raw cumulative + // would double-emit the gap's spend (raw cumulative already includes + // it) and inflate the total. 
+ let stream = vec![ + mk_event_with_cost( + "2026-04-01T12:00:00.100Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (100, 10, 500, 0), + (0, 0, 0), + Some(0.42), + ), + mk_event_with_cost( + "2026-04-01T12:00:00.200Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (100, 30, 500, 0), + (0, 0, 0), + None, // gap + ), + mk_event_with_cost( + "2026-04-01T12:00:00.300Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (100, 80, 500, 0), + (0, 0, 0), + Some(1.00), + ), + ]; + let out = deltaize_stream(stream); + + // All-or-none: stream falls back to cost_from_tokens for every chunk. + // Total must equal the token-pricing cost of the final snapshot + // (100, 80, 500), never the inflated 1.30 that mixed delta gave. + let sum: f64 = out.iter().map(|e| e.cost_usd).sum(); + let expected = cost_from_tokens("claude-opus-4-6", 100, 80, 500, 0); + assert!( + (sum - expected).abs() < 1e-9, + "mixed stream must fall back to token pricing, got {sum} expected {expected}" + ); + + // And critically: the inflation path (0.42 + token_gap + 0.58 ≈ 1.30) + // must not happen. + assert!(sum < 1.30 - 1e-3, "must not inflate via hybrid delta"); + } + + #[test] + fn delta_cost_sums_to_final_cost() { + // Opus 4-6 pricing: in=5, out=25, cache_read=0.5 per M tokens. 
+ let stream = vec![ + mk_event( + "2026-04-01T12:00:00.100Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (100, 10, 500, 0), + (0, 0, 0), + ), + mk_event( + "2026-04-01T12:00:00.200Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (100, 30, 500, 0), + (0, 0, 0), + ), + mk_event( + "2026-04-01T12:00:00.300Z", + "r", + "f.jsonl", + "claude-opus-4-6", + (100, 80, 500, 0), + (0, 0, 0), + ), + ]; + let out = deltaize_stream(stream); + let sum_cost: f64 = out.iter().map(|e| e.cost_usd).sum(); + let final_cost = cost_from_tokens("claude-opus-4-6", 100, 80, 500, 0); + assert!( + (sum_cost - final_cost).abs() < 1e-9, + "sum of deltaized costs ({sum_cost}) must equal final cost ({final_cost})" + ); + } + + #[test] + fn canonical_stream_prefers_richer_file_on_mirror_tie() { + // Two streams with the SAME max_total. Winner must be the one with + // more chunks (proxy for completeness) and more lines. + let rich = vec![ + mk_event( + "2026-04-01T12:00:00.100Z", + "r", + "rich.jsonl", + "claude-opus-4-6", + (3, 21, 1000, 0), + (0, 0, 0), + ), + mk_event( + "2026-04-01T12:00:00.200Z", + "r", + "rich.jsonl", + "claude-opus-4-6", + (3, 100, 1000, 0), + (0, 0, 0), + ), // edit block + mk_event( + "2026-04-01T12:00:00.300Z", + "r", + "rich.jsonl", + "claude-opus-4-6", + (3, 200, 1000, 0), + (40, 20, 20), + ), // tool_use Edit, lines=40 + ]; + // Poor stream: single terminal snapshot, SAME max_total, but a later + // timestamp (would have won the old latest-ts tiebreak). 
+ let poor = vec![mk_event( + "2026-04-01T12:00:00.400Z", // LATER than rich's last chunk + "r", + "poor.jsonl", + "claude-opus-4-6", + (3, 200, 1000, 0), + (0, 0, 0), + )]; + let streams = vec![rich.clone(), poor.clone()]; + let idx = pick_canonical_index(&streams).unwrap(); + let canonical = &streams[idx]; + assert_eq!(canonical.len(), 3, "canonical must be the rich stream"); + assert_eq!(canonical[0].session_file, "rich.jsonl"); + // lines survive + let total_lines: u64 = canonical.iter().map(|e| e.lines_suggested).sum(); + assert_eq!(total_lines, 40); + } + + // ------------------------------------------------------------------ + // Integration tests via parse_session_files (tmp files on disk) + // ------------------------------------------------------------------ + + fn tmp_dir(label: &str) -> PathBuf { + let d = std::env::temp_dir().join(format!( + "ccmeter_{label}_{}_{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(), + )); + std::fs::create_dir_all(&d).unwrap(); + d + } + + fn write_file(dir: &Path, name: &str, lines: &[&str]) -> PathBuf { + use std::io::Write; + let p = dir.join(name); + let mut f = std::fs::File::create(&p).unwrap(); + for l in lines { + writeln!(f, "{l}").unwrap(); + } + p + } + + #[test] + fn preserves_lines_when_chunks_share_cumulative_total() { + // Two chunks under the same requestId with IDENTICAL cumulative + // snapshots (text chunk then Edit chunk — newer Claude Code writes + // final usage on every content-block line). lines_suggested from + // the Edit chunk must survive even though its tokens delta to zero. 
+ let dir = tmp_dir("same_total"); + let text_line = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:00.100Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","content":[{"type":"text"}],"usage":{"input_tokens":3,"output_tokens":200,"cache_read_input_tokens":1000,"cache_creation_input_tokens":0}}}"#; + let edit_line = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:00.200Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","content":[{"type":"tool_use","name":"Edit","input":{"old_string":"a\nb\nc","new_string":"A\nB\nC"}}],"usage":{"input_tokens":3,"output_tokens":200,"cache_read_input_tokens":1000,"cache_creation_input_tokens":0}}}"#; + let path = write_file(&dir, "s.jsonl", &[text_line, edit_line]); + + let events = parse_session_files(&[path]); + let _ = std::fs::remove_dir_all(&dir); + + let total_lines: u64 = events.iter().map(|e| e.lines_suggested).sum(); + assert!( + total_lines > 0, + "lines_suggested must survive; got {total_lines}" + ); + // Billing invariant: summed deltas = final snapshot. 
+ let sum_out: u64 = events.iter().map(|e| e.output_tokens).sum(); + assert_eq!(sum_out, 200); + } + + #[test] + fn sums_lines_across_chunks_within_same_request_same_file() { + let dir = tmp_dir("sum_lines"); + let c1 = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:00.100Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","content":[{"type":"tool_use","name":"Edit","input":{"old_string":"a\nb","new_string":"A\nB"}}],"usage":{"input_tokens":3,"output_tokens":50,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + let c2 = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:00.200Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","content":[{"type":"tool_use","name":"Edit","input":{"old_string":"x\ny\nz","new_string":"X\nY\nZ"}}],"usage":{"input_tokens":3,"output_tokens":100,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + let path = write_file(&dir, "s.jsonl", &[c1, c2]); + + let events = parse_session_files(&[path]); + let _ = std::fs::remove_dir_all(&dir); + + let total_sugg: u64 = events.iter().map(|e| e.lines_suggested).sum(); + // Each Edit contributes added+removed; exact count depends on diff logic. + // Minimum: c1 contributes >= 2, c2 contributes >= 3 → total >= 5. 
+ assert!( + total_sugg >= 5, + "expected at least 5 suggested lines, got {total_sugg}" + ); + } + + #[test] + fn does_not_double_count_lines_mirrored_across_files() { + let dir = tmp_dir("mirror_lines"); + let edit = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:00.100Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","content":[{"type":"tool_use","name":"Edit","input":{"old_string":"a\nb","new_string":"A\nB"}}],"usage":{"input_tokens":3,"output_tokens":50,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + let parent = write_file(&dir, "parent.jsonl", &[edit]); + let agent = write_file(&dir, "agent-sub.jsonl", &[edit]); + + let events_mirrored = parse_session_files(&[parent.clone(), agent.clone()]); + let _ = std::fs::remove_dir_all(&dir); + + // Reference: same edit in a single file. + let dir2 = tmp_dir("mirror_single"); + let solo = write_file(&dir2, "solo.jsonl", &[edit]); + let events_solo = parse_session_files(&[solo]); + let _ = std::fs::remove_dir_all(&dir2); + + let mirrored_lines: u64 = events_mirrored.iter().map(|e| e.lines_suggested).sum(); + let solo_lines: u64 = events_solo.iter().map(|e| e.lines_suggested).sum(); + assert_eq!( + mirrored_lines, solo_lines, + "mirror must not inflate lines_suggested" + ); + } + + #[test] + fn multi_minute_request_preserves_chunk_timestamps() { + let dir = tmp_dir("multi_minute"); + // Single stream spanning 19:11 → 19:12 → 19:14 (gap of 2 min). + // All three timestamps must survive dedup so active_minutes + // clustering (5-min gap threshold) sees the full interval. 
+ let c1 = r#"{"type":"assistant","timestamp":"2026-04-01T19:11:00.000Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","usage":{"input_tokens":10,"output_tokens":50,"cache_read_input_tokens":0,"cache_creation_input_tokens":0}}}"#; + let c2 = r#"{"type":"assistant","timestamp":"2026-04-01T19:12:00.000Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","usage":{"input_tokens":10,"output_tokens":150,"cache_read_input_tokens":0,"cache_creation_input_tokens":0}}}"#; + let c3 = r#"{"type":"assistant","timestamp":"2026-04-01T19:14:00.000Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","usage":{"input_tokens":10,"output_tokens":300,"cache_read_input_tokens":0,"cache_creation_input_tokens":0}}}"#; + let path = write_file(&dir, "s.jsonl", &[c1, c2, c3]); + + let events = parse_session_files(&[path]); + let _ = std::fs::remove_dir_all(&dir); + + assert_eq!(events.len(), 3, "all three chunk timestamps must survive"); + // Minutes observed: + let mut minutes: Vec<_> = events + .iter() + .map(|e| (e.timestamp.hour(), e.timestamp.minute())) + .collect(); + minutes.sort(); + assert_eq!(minutes, vec![(19, 11), (19, 12), (19, 14)]); + // Billing: deltas sum to final snapshot. + let sum_out: u64 = events.iter().map(|e| e.output_tokens).sum(); + assert_eq!(sum_out, 300); + } + + #[test] + fn ghosts_dedup_across_multiple_non_canonical_mirrors() { + // 3-file scenario: late terminal canonical + 2 byte-identical mirror + // streams with chunks at the same early timestamps. Without + // timestamp-only dedup the ghosts would be emitted twice (once per + // mirror), inflating both line metrics and session_minutes counts. 
+ let dir = tmp_dir("multi_mirror_ghosts"); + let early1 = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:00.100Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","content":[{"type":"tool_use","name":"Edit","input":{"old_string":"a","new_string":"A"}}],"usage":{"input_tokens":3,"output_tokens":50,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + let early2 = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:01.100Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","usage":{"input_tokens":3,"output_tokens":150,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + let late = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:05.000Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","usage":{"input_tokens":3,"output_tokens":300,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + + // Two mirror files with identical early chunks. + let pa = write_file(&dir, "a.jsonl", &[early1, early2]); + let pb = write_file(&dir, "b.jsonl", &[early1, early2]); + // Canonical: terminal snapshot, higher max_total. + let pc = write_file(&dir, "c.jsonl", &[late]); + + let events = parse_session_files(&[pa, pb, pc]); + let _ = std::fs::remove_dir_all(&dir); + + // 1 canonical chunk + exactly 2 ghosts (one per unique non-canonical + // timestamp), NOT 4 (which would be ghosts × 2 mirrors). + assert_eq!(events.len(), 3, "ghosts must be deduped across mirrors"); + let ghosts: Vec<&Event> = events.iter().filter(|e| e.output_tokens == 0).collect(); + assert_eq!(ghosts.len(), 2); + + // Billing untouched. + let sum_out: u64 = events.iter().map(|e| e.output_tokens).sum(); + assert_eq!(sum_out, 300); + + // Edit lines counted exactly once (3+3 not 6+6 for added/deleted=1 + // each via count_diff_lines("a","A")). 
+ let total_added: u64 = events.iter().map(|e| e.lines_added).sum(); + let total_deleted: u64 = events.iter().map(|e| e.lines_deleted).sum(); + assert_eq!(total_added, 1, "Edit added counted once across mirrors"); + assert_eq!(total_deleted, 1, "Edit deleted counted once across mirrors"); + } + + #[test] + fn ghost_model_is_cleared_to_avoid_zero_token_model_share_leak() { + // Ghost events keep a timestamp + line metrics but must not register + // as model usage in downstream aggregators (`index::model_shares` + // accepts known models even with total=0). Clearing `model` makes + // them fall through to ModelId::Other and be filtered out. + let dir = tmp_dir("ghost_model_cleared"); + let early = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:00.100Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","usage":{"input_tokens":3,"output_tokens":50,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + let late = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:05.000Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","usage":{"input_tokens":3,"output_tokens":300,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + let pa = write_file(&dir, "a.jsonl", &[early]); + let pb = write_file(&dir, "b.jsonl", &[late]); + + let events = parse_session_files(&[pa, pb]); + let _ = std::fs::remove_dir_all(&dir); + + let ghosts: Vec<&Event> = events.iter().filter(|e| e.output_tokens == 0).collect(); + assert!(!ghosts.is_empty()); + assert!( + ghosts.iter().all(|g| g.model.is_empty()), + "ghost.model must be cleared to avoid leaking into model_shares" + ); + } + + #[test] + fn partial_overlap_preserves_unique_lines_from_non_canonical_stream() { + // Pathological partial-overlap: the non-canonical stream carries a + // unique Edit block (lines_suggested != 0), the canonical stream is + // a terminal snapshot with higher max_total but no content blocks + // recorded. 
The Edit's line metrics must survive via the ghost + // chunk; otherwise we'd silently under-count code activity. + let dir = tmp_dir("partial_overlap_lines"); + // a.jsonl: 2 chunks, the second carries an Edit (3 added, 2 removed + // → lines_suggested = 5 from `count_diff_lines`). + let a1 = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:00.100Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","content":[{"type":"thinking"}],"usage":{"input_tokens":3,"output_tokens":50,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + let a2 = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:01.200Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","content":[{"type":"tool_use","name":"Edit","input":{"old_string":"a\nb\nc","new_string":"X\nY\nZ"}}],"usage":{"input_tokens":3,"output_tokens":150,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + // b.jsonl: terminal snapshot only, higher max_total, no content blocks. + let b1 = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:03.500Z","requestId":"r","message":{"id":"m","model":"claude-opus-4-6","usage":{"input_tokens":3,"output_tokens":300,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}"#; + let pa = write_file(&dir, "a.jsonl", &[a1, a2]); + let pb = write_file(&dir, "b.jsonl", &[b1]); + + let events = parse_session_files(&[pa, pb]); + let _ = std::fs::remove_dir_all(&dir); + + // Billing comes from b's snapshot only. + let sum_out: u64 = events.iter().map(|e| e.output_tokens).sum(); + assert_eq!(sum_out, 300, "billing must equal canonical snapshot"); + + // Line metrics from a.jsonl's Edit are preserved via the ghost. 
+ let total_sugg: u64 = events.iter().map(|e| e.lines_suggested).sum(); + let total_added: u64 = events.iter().map(|e| e.lines_added).sum(); + let total_deleted: u64 = events.iter().map(|e| e.lines_deleted).sum(); + assert!( + total_sugg > 0, + "Edit lines_suggested must survive on the ghost; got {total_sugg}" + ); + assert_eq!(total_added, 3, "all 3 new lines must be counted"); + assert_eq!(total_deleted, 3, "all 3 old lines must be counted"); + + // Ghost itself: zero billing, non-zero lines, in a.jsonl. + let ghost_with_lines: Vec<&Event> = events + .iter() + .filter(|e| e.output_tokens == 0 && e.lines_added > 0) + .collect(); + assert_eq!(ghost_with_lines.len(), 1); + assert_eq!(ghost_with_lines[0].session_file, "a.jsonl"); + assert_eq!(ghost_with_lines[0].cost_usd, 0.0); + assert!(ghost_with_lines[0].request_id.is_none()); + } + + #[test] + fn partial_overlap_preserves_non_canonical_timestamps_as_ghosts() { + // Realistic partial-overlap scenario: parent file logs progressive + // streaming chunks (cumulative usage), a sub-agent file logs only a + // terminal snapshot with higher `max_total`. The canonical-stream + // picker correctly chooses the sub-agent for billing (= highest + // final snapshot), but we must NOT lose the earlier minutes of + // activity the parent captured. They come back as zero-usage ghost + // events so `active_minutes` and the minute-level timeline stay + // correct. + let dir = tmp_dir("partial_overlap"); + let mk = |ts: &str, out: u64| { + format!( + r#"{{"type":"assistant","timestamp":"{ts}","requestId":"r","message":{{"id":"m","model":"claude-opus-4-6","usage":{{"input_tokens":3,"output_tokens":{out},"cache_read_input_tokens":100,"cache_creation_input_tokens":0}}}}}}"# + ) + }; + // Parent file: 3 progressive chunks, terminates at out=200. 
+ let a_lines = [ + mk("2026-04-01T12:00:00.100Z", 50), + mk("2026-04-01T12:00:01.200Z", 120), + mk("2026-04-01T12:00:02.300Z", 200), + ]; + // Sub-agent file: 1 late chunk with the complete snapshot, out=300. + let b_lines = [mk("2026-04-01T12:00:03.500Z", 300)]; + let pa = write_file( + &dir, + "a.jsonl", + &a_lines.iter().map(String::as_str).collect::<Vec<&str>>(), + ); + let pb = write_file( + &dir, + "b.jsonl", + &b_lines.iter().map(String::as_str).collect::<Vec<&str>>(), + ); + + let events = parse_session_files(&[pa, pb]); + let _ = std::fs::remove_dir_all(&dir); + + // 1 canonical chunk (from b.jsonl) + 3 ghost chunks (from a.jsonl). + assert_eq!(events.len(), 4, "4 events: 1 canonical + 3 ghosts"); + + // Billing invariant: sum must equal the canonical snapshot — ghosts + // contribute zero. Never the inflated 50+120+200+300 = 670. + let sum_out: u64 = events.iter().map(|e| e.output_tokens).sum(); + assert_eq!(sum_out, 300, "billing stays at the canonical snapshot"); + + // Activity preservation: the 3 parent timestamps survive as ghosts + // in a.jsonl, and b.jsonl carries the canonical. + let canonical: Vec<&Event> = events.iter().filter(|e| e.output_tokens > 0).collect(); + assert_eq!(canonical.len(), 1); + assert_eq!(canonical[0].session_file, "b.jsonl"); + + let ghosts: Vec<&Event> = events.iter().filter(|e| e.output_tokens == 0).collect(); + assert_eq!(ghosts.len(), 3); + assert!(ghosts.iter().all(|g| g.session_file == "a.jsonl")); + assert!(ghosts.iter().all(|g| g.request_id.is_none())); + } + + #[test] + fn independent_subagent_calls_preserved() { + // A requestId that only exists in an agent file (no parent mirror). 
+ let dir = tmp_dir("indep_agent"); + let line = r#"{"type":"assistant","timestamp":"2026-04-01T12:00:00.100Z","requestId":"r_indep","message":{"id":"m","model":"claude-opus-4-6","usage":{"input_tokens":5,"output_tokens":40,"cache_read_input_tokens":200,"cache_creation_input_tokens":0}}}"#; + let path = write_file(&dir, "agent-solo.jsonl", &[line]); + + let events = parse_session_files(&[path]); + let _ = std::fs::remove_dir_all(&dir); + + assert_eq!(events.len(), 1); + assert_eq!(events[0].output_tokens, 40); + assert_eq!(events[0].cache_read_input_tokens, 200); + } + + #[test] + fn deltaized_tokens_sum_to_final_snapshot_across_streams() { + // Two streams (parent + mirror), each with 3 cumulative chunks. + // Total billable must equal ONE final snapshot (not two). + let dir = tmp_dir("sum_invariant"); + let mk = |ts: &str, out: u64| { + format!( + r#"{{"type":"assistant","timestamp":"{ts}","requestId":"r","message":{{"id":"m","model":"claude-opus-4-6","usage":{{"input_tokens":3,"output_tokens":{out},"cache_read_input_tokens":500,"cache_creation_input_tokens":0}}}}}}"# + ) + }; + let p1 = mk("2026-04-01T12:00:00.100Z", 10); + let p2 = mk("2026-04-01T12:00:00.200Z", 50); + let p3 = mk("2026-04-01T12:00:00.300Z", 120); + let parent = write_file(&dir, "parent.jsonl", &[&p1, &p2, &p3]); + let agent = write_file(&dir, "agent-echo.jsonl", &[&p1, &p2, &p3]); + + let events = parse_session_files(&[parent, agent]); + let _ = std::fs::remove_dir_all(&dir); + + let sum_in: u64 = events.iter().map(|e| e.input_tokens).sum(); + let sum_out: u64 = events.iter().map(|e| e.output_tokens).sum(); + let sum_cr: u64 = events.iter().map(|e| e.cache_read_input_tokens).sum(); + assert_eq!(sum_in, 3, "sum(input) must equal final input_tokens"); + assert_eq!(sum_out, 120, "sum(output) must equal final output_tokens"); + assert_eq!(sum_cr, 500, "sum(cache_read) must equal final cache_read"); + } + + #[test] + fn cost_from_tokens_includes_cache_creation_surcharge() { + // Opus 4-6: 
input=$5/M, output=$25/M, cache_read=$0.5/M, + // cache_creation = input × 1.25 = $6.25/M. + let cost = cost_from_tokens("claude-opus-4-6", 0, 0, 0, 1_000_000); + assert!( + (cost - 6.25).abs() < 1e-9, + "1M cache_creation tokens must cost $6.25 (= $5 × 1.25), got {cost}" + ); + + // Sanity: a full request with all four buckets. + let cost = cost_from_tokens("claude-opus-4-6", 1000, 200, 500, 300); + // fresh=500 × $5/M = $0.0025 + // cache_read=500 × $0.5/M = $0.00025 + // cache_creation=300 × $5/M × 1.25 = $0.001875 + // output=200 × $25/M = $0.005 + // sum = $0.009625 + assert!((cost - 0.009625).abs() < 1e-9); + } + + #[test] + fn dedups_user_events_mirrored_across_files_by_uuid() { + // Same user-side patch line mirrored into parent + sub-agent logs + // (identical `uuid`). Must be counted once for lines_added / + // lines_accepted, otherwise efficiency metrics inflate. + let dir = tmp_dir("user_uuid_dedup"); + let patch = r#"{"type":"user","timestamp":"2026-04-01T12:00:00.000Z","uuid":"u-1","toolUseResult":{"structuredPatch":[{"lines":["+a","+b","-c"]}]}}"#; + let parent = write_file(&dir, "parent.jsonl", &[patch]); + let agent = write_file(&dir, "agent-sub.jsonl", &[patch]); + + let events = parse_session_files(&[parent, agent]); + let _ = std::fs::remove_dir_all(&dir); + + assert_eq!( + events.len(), + 1, + "user event must not be duplicated by mirror" + ); + assert_eq!(events[0].lines_added, 2); + assert_eq!(events[0].lines_deleted, 1); + assert_eq!(events[0].lines_accepted, 3); + } + + #[test] + fn keeps_user_events_without_uuid_even_when_duplicated() { + // Edge case: older Claude Code versions may not emit uuid on user + // events. We must keep them all — under-counting is worse than a + // rare over-count, and historical data shouldn't silently vanish. 
+ let dir = tmp_dir("user_no_uuid"); + let patch = r#"{"type":"user","timestamp":"2026-04-01T12:00:00.000Z","toolUseResult":{"structuredPatch":[{"lines":["+a"]}]}}"#; + let a = write_file(&dir, "a.jsonl", &[patch]); + let b = write_file(&dir, "b.jsonl", &[patch]); + + let events = parse_session_files(&[a, b]); + let _ = std::fs::remove_dir_all(&dir); + + assert_eq!(events.len(), 2); + assert!(events.iter().all(|e| e.line_uuid.is_none())); + } + + #[test] + fn keeps_user_events_without_request_id() { + use std::io::Write; + + let dir = tmp_dir("user_events"); + let user = r#"{"type":"user","timestamp":"2026-04-01T12:00:00.000Z","toolUseResult":{"structuredPatch":[{"lines":["+a","-b"]}]}}"#; + let path_a = dir.join("a.jsonl"); + writeln!(std::fs::File::create(&path_a).unwrap(), "{user}").unwrap(); + let path_b = dir.join("b.jsonl"); + writeln!(std::fs::File::create(&path_b).unwrap(), "{user}").unwrap(); + + let events = parse_session_files(&[path_a, path_b]); + let _ = std::fs::remove_dir_all(&dir); + + assert_eq!(events.len(), 2); + assert!(events.iter().all(|e| e.request_id.is_none())); + } } diff --git a/src/ui/dashboard.rs b/src/ui/dashboard.rs index b2fa504..902e0f8 100644 --- a/src/ui/dashboard.rs +++ b/src/ui/dashboard.rs @@ -42,30 +42,33 @@ impl App { return; } - let has_update = self.update_info.is_some(); + fn push(cs: &mut Vec<Constraint>, c: Constraint) -> usize { + cs.push(c); + cs.len() - 1 + } + let mut constraints = vec![Constraint::Length(1)]; + let update_idx = self + .update_info + .is_some() + .then(|| push(&mut constraints, Constraint::Length(1))); + // 4 rows: top border + title + hint + bottom border. 
+ let cache_idx = (self.cache_load_state != crate::data::cache::CacheLoad::Fresh) + .then(|| push(&mut constraints, Constraint::Length(4))); + let content_idx = push(&mut constraints, Constraint::Min(1)); + let footer_idx = push(&mut constraints, Constraint::Length(1)); + let outer = Layout::default() .direction(Direction::Vertical) - .constraints(if has_update { - vec![ - Constraint::Length(1), - Constraint::Length(1), - Constraint::Min(1), - Constraint::Length(1), - ] - } else { - vec![ - Constraint::Length(1), - Constraint::Length(0), - Constraint::Min(1), - Constraint::Length(1), - ] - }) + .constraints(constraints) .split(area); - self.draw_footer(frame, outer[3]); + self.draw_footer(frame, outer[footer_idx]); - if let Some(info) = &self.update_info { - self.draw_update_banner(frame, outer[1], info); + if let (Some(idx), Some(info)) = (update_idx, &self.update_info) { + self.draw_update_banner(frame, outer[idx], info); + } + if let Some(idx) = cache_idx { + self.draw_cache_state_banner(frame, outer[idx]); } // Replace time-filter / source tabs with a plain title in settings view @@ -92,7 +95,7 @@ impl App { self.draw_header(frame, &top_cols); } - let content_area = outer[2]; + let content_area = outer[content_idx]; if content_area.width < MIN_WIDTH || content_area.height < MIN_HEIGHT { self.draw_too_small_popup(frame, content_area, MIN_WIDTH, MIN_HEIGHT); } else { @@ -219,6 +222,67 @@ impl App { frame.render_widget(Paragraph::new(line).alignment(Alignment::Center), area); } + fn draw_cache_state_banner(&self, frame: &mut Frame, area: Rect) { + use crate::data::cache::CacheLoad; + let t = theme(); + let tick = self.start_time.elapsed().as_millis() / 400; + + // Migration vs corruption get different colors, icons, and copy so + // the user can tell a planned schema bump apart from a cache that + // genuinely failed to load. 
+ let (icon, accent, title_text, body_text) = match self.cache_load_state { + CacheLoad::Fresh => return, + CacheLoad::Migrated => ( + if tick.is_multiple_of(2) { + "\u{21bb}" + } else { + "\u{2022}" + }, + t.warning, + "Cache rebuilt", + " — duplicate sub-agent events are no longer counted; historical totals may shift", + ), + CacheLoad::Recovered => ( + if tick.is_multiple_of(2) { + "\u{26a0}" + } else { + "\u{2022}" + }, + t.error, + "Cache was unreadable", + " — rebuilt from scratch; if this keeps happening, delete ~/.config/ccmeter/history.json", + ), + }; + + let title = Line::from(vec![ + Span::styled( + format!(" {icon} "), + Style::default().fg(accent).add_modifier(Modifier::BOLD), + ), + Span::styled( + title_text, + Style::default().fg(accent).add_modifier(Modifier::BOLD), + ), + Span::styled(body_text, Style::default().fg(t.text_primary)), + ]); + let hint = Line::from(Span::styled( + "press any key to dismiss", + Style::default() + .fg(t.text_dim) + .add_modifier(Modifier::ITALIC), + )); + + let block = Block::default() + .borders(Borders::ALL) + .border_type(ratatui::widgets::BorderType::Rounded) + .border_style(Style::default().fg(accent)); + + let paragraph = Paragraph::new(vec![title, hint]) + .alignment(Alignment::Center) + .block(block); + frame.render_widget(paragraph, area); + } + fn draw_too_small_popup(&self, frame: &mut Frame, area: Rect, min_w: u16, min_h: u16) { let t = theme(); let msg = format!(