diff --git a/README.md b/README.md
index ea62c58..e3fd0a1 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,10 @@ histories, that is the multi-GB failure mode `cdxusage` avoids by streaming
 JSONL files, indexing compact per-file summaries, and reusing a small local
 cache.

+On Linux hosts with GNU-compatible `perl` and `xargs -r`, cold full scans also
+use a native batch prefilter. Unsupported hosts and native helper failures fall
+back to the Node scanner automatically.
+
 No SQLite. No daemon. No provider catalogs. No background service.

 ## Quick Start
@@ -72,6 +76,7 @@ global install, `npx -y github:Krablante/cdxusage`, or
 - pretty terminal tables and JSON output
 - date filters, timezone, locale, sorting, compact tables
 - automatic Codex home discovery on Linux, macOS, Windows, and WSL
+- Linux/GNU native batch prefilter with Node fallback
 - OpenAI/Codex pricing only, with missing non-OpenAI model prices reported
 - offline pricing fallback and disposable local caches
 - portable folder build with Linux/macOS shell, Windows CMD, and PowerShell launchers
@@ -149,25 +154,26 @@ npm run benchmark -- --since 2026-05-01 --upstream-timeout 25 --cdxusage-timeout
 The helper resolves `@ccusage/codex@latest` and times the actual
 `ccusage-codex` binary so RAM reflects the scanner, not the `npx` wrapper.

-Recent local sanity check on a large Codex history:
+Recent local sanity check on a large Codex history, comparing this rollout with
+the previous public commit `1c084b4`:

 | Tool | Scenario | Time | RAM | Result |
 | --- | --- | ---: | ---: | --- |
-| `@ccusage/codex@18.0.11` | `--since 2026-05-01`, 45s limit | `>45.03s` | `2.38 GB` before timeout | timed out |
-| `cdxusage` | same filter, cold full scan | `31.61s` | `0.37 GB` | complete |
-| `cdxusage` | same filter, warm cached | `0.41s` | `0.16 GB` | complete |
+| `cdxusage` pre-native baseline | cold app cache | `27.80s` | `0.350 GB` | complete |
+| `cdxusage` native auto scanner | cold app cache | `10.46s` | `0.180 GB` | complete |
+| `cdxusage` pre-native baseline | warm app cache | `0.40s` | `0.166 GB` | complete |
+| `cdxusage` native auto scanner | warm app cache | `0.30s` | `0.143 GB` | complete |

 See [docs/benchmark-2026-05-16.md](docs/benchmark-2026-05-16.md) for the
 same-run command output behind this table.

 Cold scans read every matching JSONL file for correctness, including resumed
 long-lived sessions whose recent events may live in older session files. After
-the cache is built, the same report is dramatically faster: in this run, the
-warm cached path was at least 99.1% faster than the upstream timeout window.
+the cache is built, the same report is dramatically faster.

-The timeout keeps the upstream run from reaching its worst failure mode. The
-upstream path reads and sorts a large archive-shaped set of token events in
-memory; `cdxusage` keeps memory bounded and predictable by avoiding that shape.
+The native prefilter reduces the candidate-line byte volume delivered into Node
+processing; it does not claim lower physical disk reads. `bytesRead` in
+`--include-stats` remains the logical source byte count.

 ## Portable Folder

@@ -219,6 +225,8 @@ npm run check
 npm run lint
 npm run typecheck
 npm test
+npm run test:node
+npm run test:native
 npm run smoke
 npm run portable:smoke
 npm pack --dry-run --json
diff --git a/docs/benchmark-2026-05-16.md b/docs/benchmark-2026-05-16.md
index c1fcbf4..2ffe179 100644
--- a/docs/benchmark-2026-05-16.md
+++ b/docs/benchmark-2026-05-16.md
@@ -1,33 +1,66 @@
 # Benchmark Evidence: 2026-05-16

-This is a local sanity check on a large Codex history. It is not a universal
-benchmark claim; run `npm run benchmark` on your own archive for local numbers.
+This is dated local evidence from a large Codex history plus static synthetic
+fixtures. It is not a universal benchmark claim; run `npm run benchmark` on
+your own archive for local numbers.

-The public benchmark helper resolves `@ccusage/codex@latest`, finds the actual
-`ccusage-codex` binary, and measures that process directly. This avoids
-underreporting RAM by timing only the `npx` wrapper.
+## Real Profile

-Command:
+Command shape:

 ```bash
-npm run benchmark -- --since 2026-05-01 --upstream-timeout 45 --cdxusage-timeout 90
+node ./bin/cdxusage.mjs daily \
+  --offline --since 2026-05-01 \
+  --include-stats --json
 ```

-Raw output:
+The comparison uses the previous public commit `1c084b4` as the baseline and
+the current native-auto scanner worktree as the selected run. Runs are
+application-cache-cold/page-cache-warm on an actively changing local archive, so
+use the synthetic fixtures below for strict accuracy checks.

-```text
-| Tool | Time | RAM | Result |
-| --- | ---: | ---: | --- |
-| cdxusage cold | 31.61s | 0.37 GB | complete |
-| cdxusage warm | 0.41s | 0.16 GB | complete |
-| @ccusage/codex@18.0.11 | >45.03s | 2.38 GB | timed out (124) |
-```
+| Tool | Scenario | Wall | CPU | RAM | Result |
+| --- | --- | ---: | ---: | ---: | --- |
+| `cdxusage` pre-native baseline | cold app cache | `27.80s` | `45.36s` | `0.350 GB` | complete |
+| `cdxusage` native auto scanner | cold app cache | `10.46s` | `16.82s` | `0.180 GB` | complete |
+| `cdxusage` pre-native baseline | warm app cache | `0.40s` | `0.57s` | `0.166 GB` | complete |
+| `cdxusage` native auto scanner | warm app cache | `0.30s` | `0.46s` | `0.143 GB` | complete |
+
+Cold native auto versus pre-native baseline:
+
+- 62.4% less wall time
+- 62.9% less CPU time
+- 48.5% less RAM
+- 4,219 JSONL files, about 9.24GB logical source bytes
+- about 0.65GB candidate bytes delivered into Node candidate-line processing
+- about 93.0% less candidate data delivered from full-scan source bytes into
+  Node candidate-line processing
+
+The live archive changed between sequential real runs, so token totals are not
+used as strict accuracy evidence here.
+
+## Synthetic Fixtures
+
+Static fixture run comparing public commit `1c084b4` with the native-auto
+worktree:
+
+| Scenario | Cold wall saved | Cold CPU saved | RAM saved | Accuracy |
+| --- | ---: | ---: | ---: | --- |
+| small | 18.2% | 8.3% | 1.1% | match |
+| medium | 42.5% | 28.3% | -1.2% | match |
+| large | 59.1% | 44.1% | 14.2% | match |
+| huge | 60.9% | 47.7% | 29.7% | match |
+| adversarial | 27.8% | 14.3% | 3.5% | match |

-Interpretation:
+## Notes

 - Cold `cdxusage` scans read the archive for correctness, including resumed
   sessions whose recent activity can live in older session files.
-- Warm `cdxusage` uses the compact file cache/index and completed at least
-  99.1% faster than the upstream timeout window in this run.
-- The upstream run was stopped at 45 seconds before completion; RAM shown is
-  the measured maximum before timeout, not a completed-run peak.
+- Warm `cdxusage` uses the compact file cache/index and should normally be
+  dominated by changed or appended files.
+- Native acceleration depends on Linux/GNU-compatible tools. Other platforms or
+  native failures use the Node scanner.
+- `nativeOutputBytes` is candidate byte volume delivered into Node processing;
+  `bytesRead` remains the logical source byte count.
+- The public benchmark helper still resolves and times `@ccusage/codex@latest`
+  directly when you want an upstream comparison on your own machine.
diff --git a/docs/compatibility.md b/docs/compatibility.md
index f841d6f..a1b3c92 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -53,6 +53,29 @@ public ccusage Codex guide.
 - `--sort `
 - `--order `

+## Scanner Diagnostics
+
+Default scanner selection is `auto`: on Linux hosts with working `perl` and
+GNU-compatible `xargs -r`, `cdxusage` uses a native batch prefilter for cold
+full scans and falls back to the Node scanner when native tooling is
+unavailable or fails. Other platforms use the Node scanner unless explicitly
+forced for diagnostics. Tail reads and cached files keep the normal cache
+semantics.
+
+Internal diagnostic override:
+
+```bash
+CDXUSAGE_SCAN_MODE=node cdxusage daily
+CDXUSAGE_SCAN_MODE=grep-batch cdxusage daily
+```
+
+This is not an upstream compatibility surface. With `--include-stats`,
+`scannerModes` reports aggregate scanner counts, `linesSeen` is physical JSONL
+lines scanned, `candidateLinesSeen` is the subset containing `turn_context` or
+`token_count`, and `nativeOutputBytes` is candidate byte volume delivered into
+Node processing. Cache files and stats output can include absolute local paths,
+model names, token volumes, and estimated cost metadata.
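Review note, not part of the patch: the percentages quoted in the benchmark notes can be re-derived from the two byte counters described above. The sketch below is a minimal illustration under stated assumptions: `percentSaved` and `candidateByteReduction` are hypothetical helper names, and the stats object is assumed to be a plain object carrying `bytesRead` and `nativeOutputBytes` as numbers. Where those fields sit inside the `--include-stats --json` output is not shown in this diff, so extracting them is left to the reader.

```javascript
// Hypothetical helper (not in the patch): percentage saved going from
// `before` to `after`. Returns null when the inputs cannot form a ratio.
function percentSaved(before, after) {
  if (!(before > 0) || !Number.isFinite(after)) {
    return null;
  }
  return (1 - after / before) * 100;
}

// Assumption: a nativeOutputBytes of 0 means no native scan contributed
// (the patch initializes the counter to 0), so no reduction is reported.
function candidateByteReduction(stats) {
  if (!(stats.nativeOutputBytes > 0)) {
    return null;
  }
  return percentSaved(stats.bytesRead, stats.nativeOutputBytes);
}

// Rounded figures from the benchmark notes: about 9.24GB logical source
// bytes and about 0.65GB candidate bytes delivered into Node.
const reduction = candidateByteReduction({
  bytesRead: 9.24e9,
  nativeOutputBytes: 0.65e9,
});
console.log(reduction.toFixed(1)); // prints 93.0

// Cold wall time and CPU time, pre-native baseline versus native auto.
console.log(percentSaved(27.8, 10.46).toFixed(1)); // prints 62.4
console.log(percentSaved(45.36, 16.82).toFixed(1)); // prints 62.9
```

Because `bytesRead` stays the logical source byte count, this ratio describes data handed to Node, not disk reads avoided.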
+
 ## JSON Output

 `daily --json`:
diff --git a/package.json b/package.json
index 0684d71..ebfce4a 100644
--- a/package.json
+++ b/package.json
@@ -11,10 +11,12 @@
   },
   "scripts": {
     "start": "node ./bin/cdxusage.mjs",
-    "check": "node --check ./bin/cdxusage.mjs && node --check ./src/cli.mjs && node --check ./src/codex-home.mjs && node --check ./src/engine.mjs && node --check ./src/format.mjs && node --check ./src/pricing.mjs && node --check ./src/table.mjs && node --check ./scripts/build-portable.mjs && node --check ./scripts/benchmark-local.mjs && node --check ./test/smoke.mjs",
+    "check": "node --check ./bin/cdxusage.mjs && node --check ./src/cli.mjs && node --check ./src/codex-home.mjs && node --check ./src/discovery.mjs && node --check ./src/engine.mjs && node --check ./src/format.mjs && node --check ./src/pricing.mjs && node --check ./src/table.mjs && node --check ./scripts/build-portable.mjs && node --check ./scripts/benchmark-local.mjs && node --check ./test/smoke.mjs",
     "lint": "npm run check",
     "typecheck": "npm run check",
     "test": "node ./test/codex-home.test.mjs && node ./test/engine.test.mjs && node ./test/cli.test.mjs && node ./test/pricing.test.mjs",
+    "test:node": "CDXUSAGE_SCAN_MODE=node npm test",
+    "test:native": "CDXUSAGE_SCAN_MODE=grep-batch npm test",
     "smoke": "node ./test/smoke.mjs",
     "portable:build": "node ./scripts/build-portable.mjs",
     "portable:smoke": "npm run portable:build && sh ./portable/cdxusage --version && node -e \"const fs=require('fs'); for (const file of ['portable/LICENSE','portable/cdxusage.cmd','portable/cdxusage.ps1','portable/src/codex-home.mjs']) if (!fs.existsSync(file)) process.exit(1)\" && node ./test/smoke.mjs --portable",
diff --git a/src/cli.mjs b/src/cli.mjs
index 036f150..3c2be8f 100644
--- a/src/cli.mjs
+++ b/src/cli.mjs
@@ -65,30 +65,36 @@ export async function main(argv = process.argv.slice(2), io = { stdout: process.
   }
   const locale = args.locale ?? DEFAULT_LOCALE;
   const mode = args.command === 'sessions' ?
'session' : args.command;
-  const dataPaths = await resolveCodexDataPaths({
-    codexHome: args.codexHome,
-    sessionsDir: args.sessionsDir,
-  });
-  const pricingMode = await resolvePricingMode(args, dataPaths.codexHome);
-  const report = await collectUsage({
-    dataPaths,
-    since,
-    until,
-    timezone,
-    cacheFile: args.cacheFile,
-    pricingCacheFile: args.pricingCacheFile,
-    pricingOffline: args.offline,
-    pricingTtlMs: args.pricingTtlHours != null ? Number(args.pricingTtlHours) * 60 * 60 * 1000 : undefined,
-    pricingFetchTimeoutMs: args.pricingFetchTimeoutMs != null ? Number(args.pricingFetchTimeoutMs) : undefined,
-    pricingTier: pricingMode.tier,
-    pricingPriorityModels: pricingMode.priorityModels,
-    maxCacheBytes: args.maxCacheBytes != null ? Number(args.maxCacheBytes) : undefined,
-    useCache: !args.noCache,
-    clearCache: args.clearCache,
-    saveCache: !args.noSaveCache,
-    discoveryMode: args.discovery,
-    includePricing: !args.noPricing,
-  });
+  let report;
+  try {
+    const dataPaths = await resolveCodexDataPaths({
+      codexHome: args.codexHome,
+      sessionsDir: args.sessionsDir,
+    });
+    const pricingMode = await resolvePricingMode(args, dataPaths.codexHome);
+    report = await collectUsage({
+      dataPaths,
+      since,
+      until,
+      timezone,
+      cacheFile: args.cacheFile,
+      pricingCacheFile: args.pricingCacheFile,
+      pricingOffline: args.offline,
+      pricingTtlMs: args.pricingTtlHours != null ? Number(args.pricingTtlHours) * 60 * 60 * 1000 : undefined,
+      pricingFetchTimeoutMs: args.pricingFetchTimeoutMs != null ? Number(args.pricingFetchTimeoutMs) : undefined,
+      pricingTier: pricingMode.tier,
+      pricingPriorityModels: pricingMode.priorityModels,
+      maxCacheBytes: args.maxCacheBytes != null ? Number(args.maxCacheBytes) : undefined,
+      useCache: !args.noCache,
+      clearCache: args.clearCache,
+      saveCache: !args.noSaveCache,
+      discoveryMode: args.discovery,
+      includePricing: !args.noPricing,
+    });
+  } catch (error) {
+    io.stderr.write(`${error?.message ??
String(error)}\n`);
+    return 1;
+  }
   const rows = rowsForMode(report, mode, { locale, sort: args.sort, order: args.order });

   if (args.json) {
@@ -248,6 +254,11 @@ export function parseArgs(argv) {
       if (token.startsWith('--')) {
         throw new Error(`Unknown option: ${token}`);
       }
+      if (COMMANDS.has(token) && !args.commandSpecified) {
+        args.command = token;
+        args.commandSpecified = true;
+        break;
+      }
       throw new Error(`Unknown command or argument: ${token}`);
     }
   }
@@ -309,11 +320,21 @@ function compareRows(a, b, sort, mode) {

 function compareDefault(a, b, mode) {
   if (mode === 'session') {
-    return String(a.lastActivity ?? '').localeCompare(String(b.lastActivity ?? ''));
+    return compareTimestamp(a.lastActivity, b.lastActivity);
   }
   return String(a.key ?? '').localeCompare(String(b.key ?? ''));
 }

+function compareTimestamp(left, right) {
+  return toSortableTimestamp(left).localeCompare(toSortableTimestamp(right));
+}
+
+function toSortableTimestamp(value) {
+  const text = String(value ?? '');
+  const ms = Date.parse(text);
+  return Number.isFinite(ms) ? new Date(ms).toISOString() : text;
+}
+
 function compareNumber(a, b) {
   return (Number(a) || 0) - (Number(b) || 0);
 }
diff --git a/src/discovery.mjs b/src/discovery.mjs
new file mode 100644
index 0000000..117663e
--- /dev/null
+++ b/src/discovery.mjs
@@ -0,0 +1,132 @@
+import { spawn } from 'node:child_process';
+import { opendir, stat } from 'node:fs/promises';
+import path from 'node:path';
+
+const FIND_FIELD_SEPARATOR = '\x1f';
+const FIND_RECORD_SEPARATOR = '\x1e';
+
+export async function* discoverJsonlFiles(root, options = {}) {
+  const mode = options.mode ?? 'auto';
+  const stats = options.stats ??
{};
+  if (mode !== 'node') {
+    let yielded = 0;
+    try {
+      for await (const entry of discoverJsonlFilesWithFind(root)) {
+        yielded += 1;
+        yield entry;
+      }
+      if (mode === 'find' || yielded > 0) {
+        stats.discoveryMode ??= 'find';
+        return;
+      }
+    } catch (error) {
+      if (mode === 'find' || yielded > 0) {
+        throw error;
+      }
+      stats.discoveryMode ??= `node-fallback:${error.code ?? error.message}`;
+    }
+  }
+
+  stats.discoveryMode ??= 'node';
+  for await (const file of walkJsonl(root)) {
+    yield { path: file };
+  }
+}
+
+async function* discoverJsonlFilesWithFind(root) {
+  try {
+    await stat(root);
+  } catch {
+    return;
+  }
+  const child = spawn(
+    'find',
+    [
+      root,
+      '(',
+      '-type',
+      'f',
+      '-name',
+      '*.jsonl',
+      '-printf',
+      'f\\037%p\\037%D\\037%i\\037%s\\037%T@\\036',
+      ')',
+    ],
+    { stdio: ['ignore', 'pipe', 'pipe'] },
+  );
+  let stderr = '';
+  child.stderr.setEncoding('utf8');
+  child.stderr.on('data', (chunk) => {
+    stderr = `${stderr}${chunk}`.slice(-4096);
+  });
+  const closed = new Promise((resolve, reject) => {
+    child.once('error', (error) => resolve({ error }));
+    child.once('close', (code, signal) => resolve({ code, signal }));
+  });
+
+  let buffer = '';
+  for await (const chunk of child.stdout) {
+    buffer += chunk.toString('utf8');
+    for (;;) {
+      const index = buffer.indexOf(FIND_RECORD_SEPARATOR);
+      if (index === -1) {
+        break;
+      }
+      const record = buffer.slice(0, index);
+      buffer = buffer.slice(index + FIND_RECORD_SEPARATOR.length);
+      const entry = parseFindRecord(record);
+      if (entry) {
+        yield entry;
+      }
+    }
+  }
+  if (buffer) {
+    const entry = parseFindRecord(buffer);
+    if (entry) {
+      yield entry;
+    }
+  }
+  const status = await closed;
+  if (status.error) {
+    throw new Error(`find discovery is unavailable: ${status.error.message}`);
+  }
+  if (status.code !== 0) {
+    throw new Error(`find exited with ${status.signal ??
status.code}: ${stderr.trim()}`);
+  }
+}
+
+function parseFindRecord(record) {
+  if (!record) {
+    return null;
+  }
+  const parts = record.split(FIND_FIELD_SEPARATOR);
+  if (parts.length !== 6 || !parts[1]) {
+    return null;
+  }
+  return {
+    path: parts[1],
+    stat: {
+      dev: Number(parts[2]),
+      ino: Number(parts[3]),
+      size: Number(parts[4]),
+      mtimeMs: Number(parts[5]) * 1000,
+    },
+  };
+}
+
+async function* walkJsonl(root) {
+  let dir;
+  try {
+    dir = await opendir(root);
+  } catch {
+    return;
+  }
+  for await (const entry of dir) {
+    const full = path.join(root, entry.name);
+    if (entry.isDirectory()) {
+      yield* walkJsonl(full);
+    } else if (entry.isFile() && entry.name.endsWith('.jsonl')) {
+      yield full;
+    }
+  }
+}
diff --git a/src/engine.mjs b/src/engine.mjs
index 9b8a78b..5f56c45 100644
--- a/src/engine.mjs
+++ b/src/engine.mjs
@@ -1,10 +1,11 @@
 import { spawn } from 'node:child_process';
 import { createReadStream, createWriteStream } from 'node:fs';
-import { mkdir, opendir, readFile, rename, stat, unlink, writeFile } from 'node:fs/promises';
+import { mkdir, open, readFile, rename, stat, unlink, writeFile } from 'node:fs/promises';
 import { homedir } from 'node:os';
 import { finished } from 'node:stream/promises';
 import path from 'node:path';
 import { resolveCodexDataPaths } from './codex-home.mjs';
+import { discoverJsonlFiles } from './discovery.mjs';
 import { calculateCostFromUsageOrEvents, loadPricingCatalog } from './pricing.mjs';

 const CACHE_VERSION = 2;
@@ -12,10 +13,30 @@ const CACHE_SOURCE = 'cdxusage-index';
 const DEFAULT_BILLING_THRESHOLDS = Object.freeze([128_000, 200_000, 256_000, 272_000]);
 const TOKEN_NEEDLE = Buffer.from('token_count');
 const TURN_NEEDLE = Buffer.from('turn_context');
+const SCAN_NEEDLES = Object.freeze([TOKEN_NEEDLE, TURN_NEEDLE]);
 const MODEL_RE = /"model"\s*:\s*"([^"\\]*(?:\\.[^"\\]*)*)"/;
-const FIND_FIELD_SEPARATOR = '\x1f';
-const FIND_RECORD_SEPARATOR = '\x1e';
 const DEFAULT_MAX_CACHE_BYTES = 64 * 1024 * 1024;
+const
NATIVE_BATCH_PERL_SCRIPT = ` +BEGIN { $current = ""; $lines = 0; } +sub emit_count { + if ($current ne "") { + print "C\\0$current\\0$lines\\0"; + } +} +if ($ARGV ne $current) { + emit_count(); + $current = $ARGV; + $lines = 0; +} +$lines++; +if (index($_, "token_count") >= 0 || index($_, "turn_context") >= 0) { + chomp; + s/\\r\\z//; + print "L\\0$ARGV\\0$_\\0"; +} +END { emit_count(); } +`; +let nativeBatchCapabilityPromise; export function defaultCacheFile() { const root = process.env.XDG_CACHE_HOME || path.join(homedir(), '.cache'); @@ -27,6 +48,7 @@ export async function collectUsage(options = {}) { const codexHome = dataPaths.codexHome; const sessionsDir = dataPaths.sessionsDir; const timezone = safeTimeZone(options.timezone); + const dateKeyer = createDateKeyer(timezone); const since = normalizeDate(options.since); const until = normalizeDate(options.until); const cacheFile = options.cacheFile ?? defaultCacheFile(); @@ -39,6 +61,7 @@ export async function collectUsage(options = {}) { ? await loadPricingCatalog(pricingCatalogOptions(options)) : null; const billingThresholds = normalizeBillingThresholds(pricingCatalog?.metadata?.billingThresholds); + const scannerMode = process.env.CDXUSAGE_SCAN_MODE ?? 'auto'; const loadResult = useCache && !options.clearCache @@ -61,61 +84,61 @@ export async function collectUsage(options = {}) { stats.cacheLoadSkippedBySize = Boolean(loadResult.skippedBySize); const aggregate = createAggregate(); - for await (const discovered of discoverJsonlFiles(sessionsDir, { mode: discoveryMode, stats })) { - const file = discovered.path; - const st = discovered.stat ?? (await stat(file)); - stats.filesSeen += 1; - stats.bytesSeen += st.size; - - const sessionInfo = makeSessionInfo(file, sessionsDir); - const oldEntry = useCache ? 
cache.files[file] : undefined; - let entry; - - if (oldEntry && sameFile(st, oldEntry)) { - entry = oldEntry; - stats.filesFromCache += 1; - stats.bytesSkippedByCache += st.size; - } else if (oldEntry && appendableFile(st, oldEntry)) { - const tail = await scanFileRange(file, sessionInfo, timezone, oldEntry.size, oldEntry.state, billingThresholds); - entry = finalizeCacheEntry(st, mergeEntries(oldEntry, tail), file); - stats.filesScannedTail += 1; - stats.bytesRead += tail.stats.bytesRead; - stats.bytesSkippedByTailCache += oldEntry.size; - addScanStats(stats, tail.stats); - } else { - const scan = await scanFileRange(file, sessionInfo, timezone, 0, undefined, billingThresholds); - entry = finalizeCacheEntry(st, scan, file); - stats.filesScannedFull += 1; - stats.bytesRead += scan.stats.bytesRead; - addScanStats(stats, scan.stats); - if (oldEntry) { - stats.filesCacheStale += 1; - } else { - stats.filesCacheMiss += 1; - } - } - - nextFiles[file] = entry; - addEntryToAggregate(entry, aggregate, { since, until }); + if (await shouldUseNativeBatchScanner(scannerMode)) { + await collectEntriesWithGrepBatch({ + sessionsDir, + discoveryMode, + stats, + cache, + useCache, + dateKeyer, + timezone, + billingThresholds, + since, + until, + nextFiles, + aggregate, + }); + } else { + await collectEntriesWithNode({ + sessionsDir, + discoveryMode, + stats, + cache, + useCache, + dateKeyer, + timezone, + billingThresholds, + since, + until, + nextFiles, + aggregate, + }); } const report = buildReport(aggregate, stats, nextFiles); if (saveCache) { try { - const saveResult = await saveCacheFile(cacheFile, { - version: CACHE_VERSION, - source: CACHE_SOURCE, - timezone, - billingThresholds, - updatedAt: new Date().toISOString(), - files: nextFiles, - }, maxCacheBytes); - report.stats.cacheBytes = saveResult.bytes; - report.stats.cacheSaveSkippedBySize = !saveResult.saved; - if (saveResult.saved) { + if (!stats.cacheDirty && loadResult.cacheFileBytes != null) { + 
report.stats.cacheBytes = loadResult.cacheFileBytes; report.stats.cacheFile = cacheFile; - report.stats.cacheEntriesSaved = Object.keys(nextFiles).length; + report.stats.cacheSaveSkippedUnchanged = true; + } else { + const saveResult = await saveCacheFile(cacheFile, { + version: CACHE_VERSION, + source: CACHE_SOURCE, + timezone, + billingThresholds, + updatedAt: new Date().toISOString(), + files: nextFiles, + }, maxCacheBytes); + report.stats.cacheBytes = saveResult.bytes; + report.stats.cacheSaveSkippedBySize = !saveResult.saved; + if (saveResult.saved) { + report.stats.cacheFile = cacheFile; + report.stats.cacheEntriesSaved = Object.keys(nextFiles).length; + } } } catch (error) { report.stats.cacheSaveSkippedByError = true; @@ -139,6 +162,16 @@ export async function collectUsage(options = {}) { return report; } +async function shouldUseNativeBatchScanner(scannerMode) { + if (scannerMode === 'grep-batch') { + return true; + } + if (scannerMode === 'node' || scannerMode === 'needle' || scannerMode === 'line' || scannerMode === 'grep') { + return false; + } + return canUseNativeBatchScanner(); +} + export async function applyPricingToReport(report, options = {}, pricingCatalog = null) { const catalog = pricingCatalog ?? (await loadPricingCatalog(pricingCatalogOptions(options))); const metadata = { @@ -283,172 +316,375 @@ function markPricingSkipped(report) { report.totals.costUSD = null; } -export async function* discoverJsonlFiles(root, options = {}) { - const mode = options.mode ?? 'auto'; - if (mode !== 'node') { - let yielded = 0; - try { - for await (const entry of discoverJsonlFilesWithFind(root)) { - options.stats.discoveryMode ??= 'find'; - yielded += 1; - yield entry; - } - return; - } catch (error) { - if (mode === 'find' || yielded > 0) { - throw error; - } - options.stats.discoveryMode ??= `node-fallback:${error.code ?? 
error.message}`; +async function collectEntriesWithNode(context) { + for await (const task of planFileScans(context)) { + if (task.kind === 'cached') { + addCachedTask(task, context); + } else if (task.kind === 'tail') { + await scanTailTask(task, context); + } else { + const scan = await scanFileRange( + task.file, + task.sessionInfo, + context.dateKeyer, + 0, + undefined, + context.billingThresholds, + task.st.size, + ); + finalizeFullScanTask(task, scan, context); } } +} - options.stats.discoveryMode ??= 'node'; - for await (const file of walkJsonl(root)) { - yield { path: file }; +async function collectEntriesWithGrepBatch(context) { + const fullScanTasks = []; + for await (const task of planFileScans(context)) { + if (task.kind === 'cached') { + addCachedTask(task, context); + } else if (task.kind === 'tail') { + await scanTailTask(task, context); + } else { + fullScanTasks.push(task); + } } -} -async function* discoverJsonlFilesWithFind(root) { - try { - await stat(root); - } catch { + if (fullScanTasks.length === 0) { return; } + + let scans; + try { + scans = await scanFilesWithGrepBatch(fullScanTasks, context.dateKeyer, context.billingThresholds); + } catch (error) { + context.stats.nativeBatchFallback = true; + context.stats.nativeBatchFallbackReason = error?.message ?? String(error); + scans = await scanFilesWithNode(fullScanTasks, context.dateKeyer, context.billingThresholds); + } + for (const task of fullScanTasks) { + const scan = scans.get(task.file) ?? createFileScan(undefined, 'grep-batch'); + scan.endedWithNewline = await fileEndsWithNewline(task.file, task.st.size); + finalizeFullScanTask(task, scan, context); + } +} + +async function* planFileScans(context) { + for await (const discovered of discoverJsonlFiles(context.sessionsDir, { mode: context.discoveryMode, stats: context.stats })) { + const file = discovered.path; + const st = discovered.stat ?? 
(await stat(file)); + context.stats.filesSeen += 1; + context.stats.bytesSeen += st.size; + + const sessionInfo = makeSessionInfo(file, context.sessionsDir); + const oldEntry = context.useCache ? context.cache.files[file] : undefined; + + if (oldEntry && sameFile(st, oldEntry)) { + yield { kind: 'cached', file, st, sessionInfo, oldEntry }; + } else if (oldEntry && appendableFile(st, oldEntry)) { + yield { kind: 'tail', file, st, sessionInfo, oldEntry }; + } else { + yield { kind: 'full', file, st, sessionInfo, oldEntry }; + } + } +} + +function addCachedTask(task, context) { + context.nextFiles[task.file] = task.oldEntry; + context.stats.filesFromCache += 1; + context.stats.bytesSkippedByCache += task.st.size; + addEntryToAggregate(task.oldEntry, context.aggregate, { since: context.since, until: context.until }); +} + +async function scanTailTask(task, context) { + context.stats.cacheDirty = true; + const tail = await scanFileRange( + task.file, + task.sessionInfo, + context.dateKeyer, + task.oldEntry.size, + task.oldEntry.state, + context.billingThresholds, + task.st.size - task.oldEntry.size, + ); + const entry = finalizeCacheEntry(task.st, mergeEntries(task.oldEntry, tail), task.file); + context.nextFiles[task.file] = entry; + context.stats.filesScannedTail += 1; + context.stats.bytesRead += tail.stats.bytesRead; + context.stats.bytesSkippedByTailCache += task.oldEntry.size; + addScanStats(context.stats, tail.stats); + addEntryToAggregate(entry, context.aggregate, { since: context.since, until: context.until }); +} + +function finalizeFullScanTask(task, scan, context) { + context.stats.cacheDirty = true; + scan.stats.bytesRead ||= task.st.size; + const entry = finalizeCacheEntry(task.st, scan, task.file); + context.nextFiles[task.file] = entry; + context.stats.filesScannedFull += 1; + context.stats.bytesRead += scan.stats.bytesRead; + addScanStats(context.stats, scan.stats); + if (task.oldEntry) { + context.stats.filesCacheStale += 1; + } else { + 
context.stats.filesCacheMiss += 1; + } + addEntryToAggregate(entry, context.aggregate, { since: context.since, until: context.until }); +} + +function createFileScan(initialState = undefined, scannerMode = 'needle') { + return { + daily: {}, + sessionsByDate: {}, + billing: { daily: {}, sessionsByDate: {} }, + state: { + previousTotals: cloneRawUsage(initialState?.previousTotals) ?? null, + currentModel: initialState?.currentModel, + currentModelIsFallback: Boolean(initialState?.currentModelIsFallback), + }, + endedWithNewline: true, + stats: { + scannerMode, + bytesRead: 0, + nativeOutputBytes: 0, + linesSeen: 0, + candidateLinesSeen: 0, + linesParsed: 0, + linesFastParsed: 0, + linesJsonParsed: 0, + tokenEvents: 0, + }, + }; +} + +async function scanFilesWithGrepBatch(tasks, dateKeyer, billingThresholds) { + if (process.env.CDXUSAGE_FORCE_NATIVE_BATCH_FAIL === '1') { + throw new Error('forced native batch failure'); + } + const scans = new Map(); + const contexts = new Map(); + for (const task of tasks) { + const scan = createFileScan(undefined, 'grep-batch'); + scan.stats.bytesRead = task.st.size; + scans.set(task.file, scan); + contexts.set(task.file, { + scan, + sessionInfo: task.sessionInfo, + dateKeyer, + billingThresholds, + }); + } + const child = spawn( - 'find', + 'xargs', [ - root, - '(', - '-type', - 'f', - '-name', - '*.jsonl', - '-printf', - 'f\\037%p\\037%D\\037%i\\037%s\\037%T@\\036', - ')', - '-o', - '(', - '-type', - 'l', - '-name', - '*.jsonl', - '-xtype', - 'f', - '-printf', - 'l\\037%p\\0370\\0370\\0370\\0370\\036', - ')', + '-0', + '-r', + 'perl', + '-Mbytes', + '-ne', + NATIVE_BATCH_PERL_SCRIPT, + '--', ], - { stdio: ['ignore', 'pipe', 'pipe'] }, + { stdio: ['pipe', 'pipe', 'pipe'] }, ); let stderr = ''; child.stderr.setEncoding('utf8'); child.stderr.on('data', (chunk) => { stderr = `${stderr}${chunk}`.slice(-4096); }); + const getStdinError = trackWritableError(child.stdin); const closed = new Promise((resolve, reject) => { 
child.once('error', reject); child.once('close', (code, signal) => resolve({ code, signal })); }); + const outputConsumed = consumeGrepBatchOutput(child.stdout, contexts); - let buffer = ''; - for await (const chunk of child.stdout) { - buffer += chunk.toString('utf8'); - for (;;) { - const index = buffer.indexOf(FIND_RECORD_SEPARATOR); - if (index === -1) { - break; - } - const record = buffer.slice(0, index); - buffer = buffer.slice(index + FIND_RECORD_SEPARATOR.length); - const entry = parseFindRecord(record); - if (entry) { - yield entry; - } - } - } - if (buffer) { - const entry = parseFindRecord(buffer); - if (entry) { - yield entry; - } + try { + await writeNativeBatchInput(child.stdin, tasks, getStdinError); + } catch (error) { + child.stdin.destroy(); + await Promise.allSettled([outputConsumed, closed]); + throw error; } + + await outputConsumed; const status = await closed; if (status.code !== 0) { - throw new Error(`find exited with ${status.signal ?? status.code}: ${stderr.trim()}`); + throw new Error(`xargs/perl exited with ${status.signal ?? status.code}: ${stderr.trim()}`); + } + for (const task of tasks) { + const scan = scans.get(task.file); + if (!scan) { + continue; + } + const context = contexts.get(task.file); + scan.stats.linesSeen = context?.sourceLinesSeen ?? scan.stats.linesSeen; } + return scans; } -function parseFindRecord(record) { - if (!record) { - return null; - } - const parts = record.split(FIND_FIELD_SEPARATOR); - if (parts.length !== 6 || !parts[1]) { - return null; +function canUseNativeBatchScanner() { + if (process.platform !== 'linux') { + return false; } - const kind = parts[0]; - return { - path: parts[1], - stat: - kind === 'f' - ? 
{ - dev: Number(parts[2]), - ino: Number(parts[3]), - size: Number(parts[4]), - mtimeMs: Number(parts[5]) * 1000, - } - : undefined, - }; + nativeBatchCapabilityPromise ??= Promise.all([ + commandSucceeds('perl', ['-Mbytes', '-e', '']), + commandSucceeds('xargs', ['-r', 'true']), + ]).then((results) => results.every(Boolean)); + return nativeBatchCapabilityPromise; } -async function* walkJsonl(root) { - let dir; - try { - dir = await opendir(root); - } catch { - return; +function commandSucceeds(command, args) { + return new Promise((resolve) => { + const child = spawn(command, args, { stdio: ['ignore', 'ignore', 'ignore'] }); + child.once('error', () => resolve(false)); + child.once('close', (code) => resolve(code === 0)); + }); +} + +function trackWritableError(stream) { + let streamError = null; + stream.on('error', (error) => { + streamError ??= error; + }); + return () => streamError; +} + +async function writeNativeBatchInput(stdin, tasks, getStreamError = () => null) { + for (const task of tasks) { + const streamError = getStreamError(); + if (streamError) { + throw streamError; + } + if (!stdin.write(`${task.file}\0`)) { + await waitForDrainOrError(stdin, getStreamError); + } } - for await (const entry of dir) { - const full = path.join(root, entry.name); - if (entry.isDirectory()) { - yield* walkJsonl(full); - } else if (entry.isFile() && entry.name.endsWith('.jsonl')) { - yield full; - } else if (entry.isSymbolicLink() && entry.name.endsWith('.jsonl')) { - try { - if ((await stat(full)).isFile()) { - yield full; + const streamError = getStreamError(); + if (streamError) { + throw streamError; + } + stdin.end(); +} + +function waitForDrainOrError(stream, getStreamError) { + return new Promise((resolve, reject) => { + const cleanup = () => { + stream.off('drain', onDrain); + stream.off('error', onError); + }; + const onDrain = () => { + cleanup(); + resolve(); + }; + const onError = (error) => { + cleanup(); + reject(error); + }; + const streamError = 
getStreamError(); + if (streamError) { + reject(streamError); + return; + } + stream.once('drain', onDrain); + stream.once('error', onError); + }); +} + +async function consumeGrepBatchOutput(stdout, contexts) { + let buffer = Buffer.alloc(0); + let parts = []; + for await (const chunk of stdout) { + buffer = buffer.length > 0 ? Buffer.concat([buffer, chunk]) : chunk; + for (;;) { + const nul = buffer.indexOf(0); + if (nul === -1) { + break; + } + parts.push(buffer.subarray(0, nul)); + buffer = buffer.subarray(nul + 1); + while (parts.length >= 3) { + const [typeBuf, fileBuf, payload] = parts.splice(0, 3); + const type = typeBuf.toString('utf8'); + const file = fileBuf.toString('utf8'); + const context = contexts.get(file); + if (!context) { + continue; + } + if (type === 'L') { + context.scan.stats.nativeOutputBytes += payload.length; + processLine(payload, context); + } else if (type === 'C') { + const parsed = Number(payload.toString('utf8')); + if (Number.isFinite(parsed)) { + context.sourceLinesSeen = parsed; + } } - } catch { - // Broken symlinks are not Codex session files. } } } } -async function scanFileRange(file, sessionInfo, timezone, start = 0, initialState = undefined, billingThresholds = DEFAULT_BILLING_THRESHOLDS) { - const dateFormatter = createDateKeyFormatter(timezone); - const scan = { - daily: {}, - sessionsByDate: {}, - billing: { daily: {}, sessionsByDate: {} }, - state: { - previousTotals: cloneRawUsage(initialState?.previousTotals) ?? 
null, - currentModel: initialState?.currentModel, - currentModelIsFallback: Boolean(initialState?.currentModelIsFallback), - }, - endedWithNewline: true, - stats: { - bytesRead: 0, - linesSeen: 0, - linesParsed: 0, - tokenEvents: 0, - }, - }; +async function scanFilesWithNode(tasks, dateKeyer, billingThresholds) { + const scans = new Map(); + for (const task of tasks) { + scans.set( + task.file, + await scanFileRange(task.file, task.sessionInfo, dateKeyer, 0, undefined, billingThresholds, task.st.size), + ); + } + return scans; +} + +async function scanFileRange( + file, + sessionInfo, + dateKeyer, + start = 0, + initialState = undefined, + billingThresholds = DEFAULT_BILLING_THRESHOLDS, + sourceBytes = undefined, +) { + const scan = createFileScan(initialState, process.env.CDXUSAGE_SCAN_MODE === 'line' ? 'line' : 'needle'); let carry = Buffer.alloc(0); let lastByte; - const stream = createReadStream(file, { start, highWaterMark: 1024 * 1024 }); + if (process.env.CDXUSAGE_SCAN_MODE === 'grep' && start === 0) { + scan.stats.scannerMode = 'grep'; + await scanFileWithGrep(file, scan, { + scan, + sessionInfo, + dateKeyer, + billingThresholds, + sourceBytes, + }); + return scan; + } + + const stream = createReadStream(file, { start, highWaterMark: 4 * 1024 * 1024 }); + + if (scan.stats.scannerMode === 'line') { + await scanStreamByLine(stream, scan, { + scan, + sessionInfo, + dateKeyer, + billingThresholds, + }); + return scan; + } + + await scanStreamByNeedle(stream, scan, { + scan, + sessionInfo, + dateKeyer, + billingThresholds, + }); + return scan; +} + +async function scanStreamByLine(stream, scan, context) { + let carry = Buffer.alloc(0); + let lastByte; for await (const chunk of stream) { scan.stats.bytesRead += chunk.length; if (chunk.length > 0) { @@ -465,27 +701,180 @@ async function scanFileRange(file, sessionInfo, timezone, start = 0, initialStat if (lineEnd > lineStart && buf[lineEnd - 1] === 13) { lineEnd -= 1; } - processLine(buf.subarray(lineStart, 
lineEnd), { scan, sessionInfo, dateFormatter, billingThresholds }); + scan.stats.linesSeen += 1; + processLine(buf.subarray(lineStart, lineEnd), context); lineStart = newline + 1; } carry = lineStart < buf.length ? Buffer.from(buf.subarray(lineStart)) : Buffer.alloc(0); } if (carry.length > 0) { - processLine(carry, { scan, sessionInfo, dateFormatter, billingThresholds }); + scan.stats.linesSeen += 1; + processLine(carry, context); } scan.endedWithNewline = scan.stats.bytesRead === 0 || lastByte === 10; - return scan; +} + +async function scanFileWithGrep(file, scan, context) { + scan.stats.bytesRead = context.sourceBytes ?? 0; + const child = spawn('grep', ['-aF', '-e', 'token_count', '-e', 'turn_context', '--', file], { + stdio: ['ignore', 'pipe', 'pipe'], + }); + let stderr = ''; + child.stderr.setEncoding('utf8'); + child.stderr.on('data', (chunk) => { + stderr = `${stderr}${chunk}`.slice(-4096); + }); + const closed = new Promise((resolve, reject) => { + child.once('error', reject); + child.once('close', (code, signal) => resolve({ code, signal })); + }); + + let carry = Buffer.alloc(0); + for await (const chunk of child.stdout) { + scan.stats.nativeOutputBytes += chunk.length; + const buf = carry.length > 0 ? Buffer.concat([carry, chunk]) : chunk; + let lineStart = 0; + for (;;) { + const newline = buf.indexOf(10, lineStart); + if (newline === -1) { + break; + } + let lineEnd = newline; + if (lineEnd > lineStart && buf[lineEnd - 1] === 13) { + lineEnd -= 1; + } + processLine(buf.subarray(lineStart, lineEnd), context); + lineStart = newline + 1; + } + carry = lineStart < buf.length ? Buffer.from(buf.subarray(lineStart)) : Buffer.alloc(0); + } + if (carry.length > 0) { + processLine(carry, context); + } + scan.stats.linesSeen = scan.stats.candidateLinesSeen; + + const status = await closed; + if (status.code !== 0 && status.code !== 1) { + throw new Error(`grep exited with ${status.signal ?? 
status.code}: ${stderr.trim()}`); + } + scan.endedWithNewline = await fileEndsWithNewline(file, context.sourceBytes); +} + +async function fileEndsWithNewline(file, size = undefined) { + if (!Number.isFinite(size) || size <= 0) { + return true; + } + let handle; + try { + handle = await open(file, 'r'); + const buffer = Buffer.allocUnsafe(1); + const result = await handle.read(buffer, 0, 1, size - 1); + return result.bytesRead === 0 || buffer[0] === 10; + } catch { + return true; + } finally { + await handle?.close().catch(() => {}); + } +} + +async function scanStreamByNeedle(stream, scan, context) { + let carry = Buffer.alloc(0); + let lastByte; + for await (const chunk of stream) { + scan.stats.bytesRead += chunk.length; + if (chunk.length > 0) { + lastByte = chunk[chunk.length - 1]; + } + const buf = carry.length > 0 ? Buffer.concat([carry, chunk]) : chunk; + const completeEnd = buf.lastIndexOf(10) + 1; + if (completeEnd === 0) { + carry = Buffer.from(buf); + continue; + } + + scan.stats.linesSeen += countByte(buf, 10, 0, completeEnd); + for (const lineStart of collectNeedleLineStarts(buf, completeEnd)) { + let lineEnd = buf.indexOf(10, lineStart); + if (lineEnd === -1 || lineEnd > completeEnd) { + lineEnd = completeEnd; + } + if (lineEnd > lineStart && buf[lineEnd - 1] === 13) { + lineEnd -= 1; + } + processLine(buf.subarray(lineStart, lineEnd), context); + } + carry = completeEnd < buf.length ? 
Buffer.from(buf.subarray(completeEnd)) : Buffer.alloc(0); + } + + if (carry.length > 0 && lineHasNeedle(carry)) { + scan.stats.linesSeen += 1; + processLine(carry, context); + } else if (carry.length > 0) { + scan.stats.linesSeen += 1; + } + scan.endedWithNewline = scan.stats.bytesRead === 0 || lastByte === 10; +} + +function collectNeedleLineStarts(buf, end) { + const starts = new Set(); + for (const needle of SCAN_NEEDLES) { + let offset = 0; + for (;;) { + const hit = buf.indexOf(needle, offset); + if (hit === -1 || hit >= end) { + break; + } + const newline = buf.lastIndexOf(10, hit); + starts.add(newline === -1 ? 0 : newline + 1); + offset = hit + needle.length; + } + } + return [...starts].sort((a, b) => a - b); +} + +function lineHasNeedle(line) { + return line.includes(TOKEN_NEEDLE) || line.includes(TURN_NEEDLE); +} + +function countByte(buf, byte, start = 0, end = buf.length) { + let count = 0; + for (let offset = start; offset < end; offset += 1) { + if (buf[offset] === byte) { + count += 1; + } + } + return count; +} + +function parseTokenLineJson(line) { + let entry; + try { + entry = JSON.parse(line.toString('utf8')); + } catch { + return null; + } + if (entry?.type !== 'event_msg' || entry?.payload?.type !== 'token_count' || !entry.timestamp) { + return null; + } + const info = entry.payload.info ?? {}; + return { + fast: false, + timestamp: entry.timestamp, + model: extractModel(entry.payload, info), + lastUsage: normalizeRawUsage(info.last_token_usage), + totalUsage: normalizeRawUsage(info.total_token_usage), + }; } function processLine(line, context) { - const { scan, sessionInfo, dateFormatter } = context; + const { scan, sessionInfo, dateKeyer } = context; const billingThresholds = context.billingThresholds ?? 
DEFAULT_BILLING_THRESHOLDS; - scan.stats.linesSeen += 1; if (line.length === 0) { return; } if (line.includes(TURN_NEEDLE)) { + scan.stats.candidateLinesSeen += 1; const model = extractTurnModelFast(line); if (model) { scan.state.currentModel = model; @@ -496,21 +885,20 @@ function processLine(line, context) { if (!line.includes(TOKEN_NEEDLE)) { return; } + scan.stats.candidateLinesSeen += 1; - let entry; - try { - entry = JSON.parse(line.toString('utf8')); - } catch { + const parsed = parseTokenLineJson(line); + if (!parsed) { return; } - scan.stats.linesParsed += 1; - if (entry?.type !== 'event_msg' || entry?.payload?.type !== 'token_count' || !entry.timestamp) { - return; + if (parsed.fast) { + scan.stats.linesFastParsed += 1; + } else { + scan.stats.linesJsonParsed += 1; } - - const info = entry.payload.info ?? {}; - const lastUsage = normalizeRawUsage(info.last_token_usage); - const totalUsage = normalizeRawUsage(info.total_token_usage); + scan.stats.linesParsed += 1; + const lastUsage = parsed.lastUsage; + const totalUsage = parsed.totalUsage; let raw = lastUsage; if (!raw && totalUsage) { raw = subtractRawUsage(totalUsage, scan.state.previousTotals); @@ -541,7 +929,7 @@ function processLine(line, context) { return; } - const extractedModel = extractModel({ ...entry.payload, info }); + const extractedModel = parsed.model; let isFallback = false; if (extractedModel) { scan.state.currentModel = extractedModel; @@ -557,7 +945,8 @@ function processLine(line, context) { isFallback = true; } - const date = toDateKey(entry.timestamp, dateFormatter); + const date = toDateKey(parsed.timestamp, dateKeyer); + const activityTimestamp = toActivityTimestamp(parsed.timestamp); const day = scan.daily[date] ?? 
{ ...emptyUsage(), models: {} }; scan.daily[date] = day; addUsage(day, delta); @@ -569,13 +958,13 @@ function processLine(line, context) { sessionId: sessionInfo.sessionId, sessionFile: sessionInfo.sessionFile, directory: sessionInfo.directory, - lastActivity: entry.timestamp, + lastActivity: activityTimestamp, ...emptyUsage(), models: {}, }; sessionDay[sessionInfo.sessionId] = session; - if (entry.timestamp > session.lastActivity) { - session.lastActivity = entry.timestamp; + if (compareActivityTimestamp(activityTimestamp, session.lastActivity) > 0) { + session.lastActivity = activityTimestamp; } addUsage(session, delta); addModelUsage(session.models, model, delta, isFallback); @@ -591,7 +980,7 @@ function buildReport(aggregate, stats, nextFiles) { .map(([key, usage]) => ({ key, ...usage })); const { rows: monthly, billing: monthlyBilling } = buildMonthlyRows(daily, aggregate.billing.daily); const sessions = [...aggregate.sessions.entries()] - .sort(([, a], [, b]) => a.lastActivity.localeCompare(b.lastActivity)) + .sort(([, a], [, b]) => compareActivityTimestamp(a.lastActivity, b.lastActivity)) .map(([, session]) => ({ ...session })); const totals = emptyUsage(); for (const day of daily) { @@ -663,17 +1052,18 @@ function addEntryToAggregate(entry, aggregate, filters) { continue; } for (const [sessionId, session] of Object.entries(sessions ?? {})) { + const sessionLastActivity = toActivityTimestamp(session.lastActivity); const target = aggregate.sessions.get(sessionId) ?? 
{ sessionId, sessionFile: session.sessionFile, directory: session.directory, - lastActivity: session.lastActivity, + lastActivity: sessionLastActivity, ...emptyUsage(), models: {}, }; aggregate.sessions.set(sessionId, target); - if (session.lastActivity > target.lastActivity) { - target.lastActivity = session.lastActivity; + if (compareActivityTimestamp(sessionLastActivity, target.lastActivity) > 0) { + target.lastActivity = sessionLastActivity; } addUsage(target, session); for (const [model, modelUsage] of Object.entries(session.models ?? {})) { @@ -746,8 +1136,12 @@ function mergeEntries(base, tail) { stats: { bytesRead: (base.stats?.bytesRead ?? base.size ?? 0) + tail.stats.bytesRead, linesSeen: (base.stats?.linesSeen ?? 0) + tail.stats.linesSeen, + candidateLinesSeen: (base.stats?.candidateLinesSeen ?? 0) + (tail.stats.candidateLinesSeen ?? 0), linesParsed: (base.stats?.linesParsed ?? 0) + tail.stats.linesParsed, + linesFastParsed: (base.stats?.linesFastParsed ?? 0) + (tail.stats.linesFastParsed ?? 0), + linesJsonParsed: (base.stats?.linesJsonParsed ?? 0) + (tail.stats.linesJsonParsed ?? 0), tokenEvents: (base.stats?.tokenEvents ?? 0) + tail.stats.tokenEvents, + nativeOutputBytes: (base.stats?.nativeOutputBytes ?? 0) + (tail.stats.nativeOutputBytes ?? 
0), }, }; } @@ -777,19 +1171,28 @@ function createStats(cache, timezone, billingThresholds = DEFAULT_BILLING_THRESH filesCacheMiss: 0, filesCacheStale: 0, linesSeen: 0, + candidateLinesSeen: 0, linesParsed: 0, + linesFastParsed: 0, + linesJsonParsed: 0, tokenEvents: 0, bytesSeen: 0, bytesRead: 0, + nativeOutputBytes: 0, bytesSkippedByMtime: 0, bytesSkippedByPathDate: 0, bytesSkippedByCache: 0, bytesSkippedByTailCache: 0, cacheEntriesLoaded: Object.keys(cache.files).length, cacheEntriesSaved: 0, + cacheDirty: false, cacheSaveSkippedByError: false, cacheSaveError: null, + cacheSaveSkippedUnchanged: false, + nativeBatchFallback: false, + nativeBatchFallbackReason: null, discoveryMode: null, + scannerModes: {}, }; } @@ -817,17 +1220,67 @@ function extractTurnModelFast(line) { } } -function createDateKeyFormatter(timezone) { - return new Intl.DateTimeFormat('en-CA', { +function createDateKeyer(timezone) { + const safe = safeTimeZone(timezone); + if (safe === 'UTC') { + return (timestamp) => { + if (isUtcIsoTimestamp(timestamp)) { + return timestamp.slice(0, 10); + } + return new Date(timestamp).toISOString().slice(0, 10); + }; + } + const formatter = new Intl.DateTimeFormat('en-CA', { year: 'numeric', month: '2-digit', day: '2-digit', - timeZone: safeTimeZone(timezone), + timeZone: safe, }); + return (timestamp) => formatter.format(new Date(timestamp)); +} + +function isUtcIsoTimestamp(value) { + return ( + typeof value === 'string' && + value.length >= 20 && + value[4] === '-' && + value[7] === '-' && + value[10] === 'T' && + value.endsWith('Z') + ); +} + +function isCanonicalUtcTimestamp(value) { + return ( + typeof value === 'string' && + value.length === 24 && + value[4] === '-' && + value[7] === '-' && + value[10] === 'T' && + value[13] === ':' && + value[16] === ':' && + value[19] === '.' 
&& + value.endsWith('Z') + ); +} + +function toActivityTimestamp(timestamp) { + if (isCanonicalUtcTimestamp(timestamp)) { + return timestamp; + } + const ms = Date.parse(timestamp); + if (Number.isFinite(ms)) { + return new Date(ms).toISOString(); + } + return String(timestamp ?? ''); } -function toDateKey(timestamp, formatter) { - return formatter.format(new Date(timestamp)); +function compareActivityTimestamp(left, right) { + return toActivityTimestamp(left).localeCompare(toActivityTimestamp(right)); +} + +function toDateKey(timestamp, dateKeyer) { + return dateKeyer(timestamp); } export function toMonthKey(dateKey) { @@ -934,13 +1387,15 @@ async function saveCacheFile(cacheFile, cache, maxCacheBytes) { await rename(tmp, cacheFile); return { saved: true, bytes }; } catch (error) { - await unlink(tmp).catch(() => {}); if (error instanceof CacheTooLargeError) { - stream.end(); + stream.destroy(); await finished(stream).catch(() => {}); + await unlink(tmp).catch(() => {}); return { saved: false, bytes: error.bytes }; } stream.destroy(); + await finished(stream).catch(() => {}); + await unlink(tmp).catch(() => {}); throw error; } } @@ -1008,9 +1463,16 @@ function normalizeCacheByteLimit(value) { } function addScanStats(target, scanStats) { + if (scanStats.scannerMode) { + target.scannerModes[scanStats.scannerMode] = (target.scannerModes[scanStats.scannerMode] ?? 0) + 1; + } target.linesSeen += scanStats.linesSeen; + target.candidateLinesSeen += scanStats.candidateLinesSeen ?? 0; target.linesParsed += scanStats.linesParsed; + target.linesFastParsed += scanStats.linesFastParsed ?? 0; + target.linesJsonParsed += scanStats.linesJsonParsed ?? 0; target.tokenEvents += scanStats.tokenEvents; + target.nativeOutputBytes += scanStats.nativeOutputBytes ?? 
0; } function emptyUsage() { @@ -1121,6 +1583,10 @@ function mergeSessionsByDate(target, source) { function addBillingSummary(target, key, model, delta, billingThresholds = DEFAULT_BILLING_THRESHOLDS) { const row = target[key] ?? {}; target[key] = row; + addBillingSummaryForModel(row, model, delta, billingThresholds); +} + +function addBillingSummaryForModel(row, model, delta, billingThresholds = DEFAULT_BILLING_THRESHOLDS) { const summary = row[model] ?? { version: 1, count: 0, @@ -1159,9 +1625,7 @@ function addSessionBillingSummary(target, date, sessionId, model, delta, billing target[date] = day; const session = day[sessionId] ?? {}; day[sessionId] = session; - const modelSummary = {}; - addBillingSummary(modelSummary, 'row', model, delta, billingThresholds); - session[model] = mergeBillingSummary(session[model], modelSummary.row[model]); + addBillingSummaryForModel(session, model, delta, billingThresholds); } function cloneBillingByKey(source) { @@ -1300,11 +1764,11 @@ function addRawUsage(previous, delta) { }; } -function extractModel(value) { +function extractModel(value, infoOverride = undefined) { if (!value || typeof value !== 'object') { return undefined; } - const info = value.info; + const info = infoOverride ?? value.info; if (info && typeof info === 'object') { const direct = nonEmpty(info.model) ?? 
nonEmpty(info.model_name); if (direct) { diff --git a/test/cli.test.mjs b/test/cli.test.mjs index 5fed167..7da9c1e 100644 --- a/test/cli.test.mjs +++ b/test/cli.test.mjs @@ -1,8 +1,8 @@ import assert from 'node:assert/strict'; -import { mkdir, readFile, rm, writeFile } from 'node:fs/promises'; +import { mkdir, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import path from 'node:path'; -import { VERSION, main, parseArgs } from '../src/cli.mjs'; +import { main, parseArgs } from '../src/cli.mjs'; const root = path.join(tmpdir(), `cdxusage-cli-${process.pid}`); const codexHome = path.join(root, 'codex-home'); @@ -10,7 +10,6 @@ const sessionsDir = path.join(codexHome, 'sessions/2026/05/16'); const file = path.join(sessionsDir, 'rollout-2026-05-16T00-00-00-compat.jsonl'); const cacheFile = path.join(root, 'cache/index.json'); const pricingCacheFile = path.join(root, 'cache/pricing.json'); -const packageJson = JSON.parse(await readFile(new URL('../package.json', import.meta.url), 'utf8')); await rm(root, { recursive: true, force: true }); await mkdir(sessionsDir, { recursive: true }); @@ -39,9 +38,13 @@ await writeFile( ].join('\n'), ); -assert.equal(VERSION, packageJson.version); assert.equal(parseArgs(['monthly', '--json']).command, 'monthly'); assert.equal(parseArgs(['sessions']).command, 'sessions'); +assert.equal(parseArgs(['--json', 'daily']).command, 'daily'); +assert.equal(parseArgs(['--offline', 'monthly']).command, 'monthly'); +assert.equal(parseArgs(['--version', 'daily']).command, 'daily'); +assert.equal(parseArgs(['--help', 'daily']).command, 'daily'); +assert.throws(() => parseArgs(['daily', 'monthly']), /Unknown command or argument/); assert.equal(parseArgs(['monthly']).speed, 'auto'); assert.equal(parseArgs(['daily', '--speed', 'auto']).speed, 'auto'); assert.equal(parseArgs(['daily', '--speed', 'fast']).speed, 'fast'); @@ -313,6 +316,62 @@ const noCacheJson = JSON.parse(noCache.stdout); 
assert.equal(noCacheJson.stats.cacheEntriesSaved, 0); assert.equal(noCacheJson.stats.cacheFile, null); +const previousPath = process.env.PATH; +const previousScanMode = process.env.CDXUSAGE_SCAN_MODE; +const missingFindBin = path.join(root, 'missing-find-bin'); +await mkdir(missingFindBin, { recursive: true }); +process.env.PATH = missingFindBin; +process.env.CDXUSAGE_SCAN_MODE = 'node'; +try { + const missingFindAuto = await runCli([ + 'daily', + '--json', + '--offline', + '--no-pricing', + '--include-stats', + '--discovery', + 'auto', + '--codex-home', + codexHome, + '--cache-file', + path.join(root, 'cache/missing-find-auto.json'), + '--pricing-cache-file', + pricingCacheFile, + ]); + assert.equal(missingFindAuto.code, 0); + assert.equal(missingFindAuto.stderr, ''); + assert.match(JSON.parse(missingFindAuto.stdout).stats.discoveryMode, /^node-fallback:/); + + const missingFindForced = await runCli([ + 'daily', + '--json', + '--offline', + '--no-pricing', + '--discovery', + 'find', + '--codex-home', + codexHome, + '--cache-file', + path.join(root, 'cache/missing-find-forced.json'), + '--pricing-cache-file', + pricingCacheFile, + ]); + assert.equal(missingFindForced.code, 1); + assert.match(missingFindForced.stderr, /find discovery is unavailable: spawn find ENOENT/); + assert.doesNotMatch(missingFindForced.stderr, /node:internal|ErrorCaptureStackTrace/); +} finally { + if (previousPath == null) { + delete process.env.PATH; + } else { + process.env.PATH = previousPath; + } + if (previousScanMode == null) { + delete process.env.CDXUSAGE_SCAN_MODE; + } else { + process.env.CDXUSAGE_SCAN_MODE = previousScanMode; + } +} + const badTimezone = await runCli(['daily', '--timezone', 'Not/AZone']); assert.equal(badTimezone.code, 1); assert.match(badTimezone.stderr, /Invalid timezone/); diff --git a/test/engine.test.mjs b/test/engine.test.mjs index 5529bf6..6e359ca 100644 --- a/test/engine.test.mjs +++ b/test/engine.test.mjs @@ -1,5 +1,5 @@ import assert from 
'node:assert/strict'; -import { appendFile, mkdir, rm, symlink, utimes, writeFile } from 'node:fs/promises'; +import { appendFile, mkdir, readFile, rm, symlink, utimes, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import path from 'node:path'; import { collectUsage, normalizeDate } from '../src/engine.mjs'; @@ -155,6 +155,41 @@ const losAngelesReport = await collectUsage({ }); assert.equal(losAngelesReport.daily[0].key, '2026-05-15'); +const offsetTimestampHome = path.join(root, 'offset-timestamp-codex-home'); +const offsetTimestampSessions = path.join(offsetTimestampHome, 'sessions/2026/05/16'); +await mkdir(offsetTimestampSessions, { recursive: true }); +await writeFile( + path.join(offsetTimestampSessions, 'offset-timestamp.jsonl'), + [ + JSON.stringify({ timestamp: '2026-05-17T00:30:00+02:00', type: 'turn_context', payload: { model: 'gpt-test' } }), + JSON.stringify({ + timestamp: '2026-05-17T00:30:00+02:00', + type: 'event_msg', + payload: { + type: 'token_count', + info: { last_token_usage: { input_tokens: 1, cached_input_tokens: 0, output_tokens: 1, total_tokens: 2 } }, + }, + }), + JSON.stringify({ + timestamp: '2026-05-16T23:00:00.000Z', + type: 'event_msg', + payload: { + type: 'token_count', + info: { last_token_usage: { input_tokens: 2, cached_input_tokens: 0, output_tokens: 1, total_tokens: 3 } }, + }, + }), + '', + ].join('\n'), +); +const offsetTimestampReport = await collectUsage({ + codexHome: offsetTimestampHome, + cacheFile: path.join(root, 'offset-timestamp-cache.json'), + timezone: 'UTC', + pricingData, +}); +assert.deepEqual(offsetTimestampReport.daily.map((row) => row.key), ['2026-05-16']); +assert.equal(offsetTimestampReport.sessions[0].lastActivity, '2026-05-16T23:00:00.000Z'); + const staleMtimeHome = path.join(root, 'stale-mtime-codex-home'); const staleMtimeSessions = path.join(staleMtimeHome, 'sessions/2026/05/16'); const staleMtimeFile = path.join(staleMtimeSessions, 'stale-mtime.jsonl'); @@ -337,6 +372,191 @@ 
assert.equal(mixed.totals.cachedInputTokens, 8); assert.equal(mixed.totals.outputTokens, 4); assert.equal(mixed.totals.totalTokens, 19); +const lineStatsHome = path.join(root, 'line-stats-codex-home'); +const lineStatsSessions = path.join(lineStatsHome, 'sessions/2026/05/16'); +const lineStatsFile = path.join(lineStatsSessions, 'line-stats.jsonl'); +const lineStatsCache = path.join(root, 'line-stats-cache.json'); +await mkdir(lineStatsSessions, { recursive: true }); +await writeFile( + lineStatsFile, + [ + JSON.stringify({ timestamp: '2026-05-16T00:00:00.000Z', type: 'turn_context', payload: { model: 'gpt-test' } }), + JSON.stringify({ timestamp: '2026-05-16T00:00:01.000Z', type: 'event_msg', payload: { type: 'non_token' } }), + JSON.stringify({ + timestamp: '2026-05-16T00:00:02.000Z', + type: 'event_msg', + payload: { + type: 'token_count', + info: { last_token_usage: { input_tokens: 2, cached_input_tokens: 0, output_tokens: 1, total_tokens: 3 } }, + }, + }), + '', + ].join('\n'), +); +const lineStatsCold = await collectUsage({ + codexHome: lineStatsHome, + cacheFile: lineStatsCache, + timezone: 'UTC', + pricingData, +}); +assert.equal(lineStatsCold.totals.totalTokens, 3); +assert.equal(lineStatsCold.stats.linesSeen, 3); +assert.equal(lineStatsCold.stats.candidateLinesSeen, 2); +await appendFile( + lineStatsFile, + [ + JSON.stringify({ timestamp: '2026-05-16T00:00:03.000Z', type: 'event_msg', payload: { type: 'non_token' } }), + JSON.stringify({ + timestamp: '2026-05-16T00:00:04.000Z', + type: 'event_msg', + payload: { + type: 'token_count', + info: { last_token_usage: { input_tokens: 1, cached_input_tokens: 0, output_tokens: 1, total_tokens: 2 } }, + }, + }), + '', + ].join('\n'), +); +const lineStatsTail = await collectUsage({ + codexHome: lineStatsHome, + cacheFile: lineStatsCache, + timezone: 'UTC', + pricingData, +}); +assert.equal(lineStatsTail.totals.totalTokens, 5); +assert.equal(lineStatsTail.stats.filesScannedTail, 1); 
+assert.equal(lineStatsTail.stats.linesSeen, 2); +assert.equal(lineStatsTail.stats.candidateLinesSeen, 1); + +const noFinalNewlineHome = path.join(root, 'no-final-newline-codex-home'); +const noFinalNewlineSessions = path.join(noFinalNewlineHome, 'sessions/2026/05/16'); +const noFinalNewlineFile = path.join(noFinalNewlineSessions, 'no-final-newline.jsonl'); +const noFinalNewlineCache = path.join(root, 'no-final-newline-cache.json'); +await mkdir(noFinalNewlineSessions, { recursive: true }); +await writeFile( + noFinalNewlineFile, + [ + JSON.stringify({ timestamp: '2026-05-16T00:00:00.000Z', type: 'turn_context', payload: { model: 'gpt-test' } }), + JSON.stringify({ + timestamp: '2026-05-16T00:00:01.000Z', + type: 'event_msg', + payload: { + type: 'token_count', + info: { last_token_usage: { input_tokens: 2, cached_input_tokens: 0, output_tokens: 1, total_tokens: 3 } }, + }, + }), + ].join('\n'), +); +const noFinalNewline = await collectUsage({ + codexHome: noFinalNewlineHome, + cacheFile: noFinalNewlineCache, + timezone: 'UTC', + pricingData, +}); +const noFinalNewlineCachePayload = JSON.parse(await readFile(noFinalNewlineCache, 'utf8')); +assert.equal(noFinalNewline.totals.totalTokens, 3); +assert.equal(noFinalNewline.stats.linesSeen, 2); +assert.equal(noFinalNewlineCachePayload.files[noFinalNewlineFile].endedWithNewline, false); + +const nativeFallbackHome = path.join(root, 'native-fallback-codex-home'); +const nativeFallbackSessions = path.join(nativeFallbackHome, 'sessions/2026/05/16'); +await mkdir(nativeFallbackSessions, { recursive: true }); +await writeFile( + path.join(nativeFallbackSessions, 'native-fallback.jsonl'), + [ + JSON.stringify({ timestamp: '2026-05-16T00:00:00.000Z', type: 'turn_context', payload: { model: 'gpt-test' } }), + JSON.stringify({ timestamp: '2026-05-16T00:00:01.000Z', type: 'event_msg', payload: { type: 'non_token' } }), + JSON.stringify({ + timestamp: '2026-05-16T00:00:02.000Z', + type: 'event_msg', + payload: { + type: 
'token_count', + info: { last_token_usage: { input_tokens: 3, cached_input_tokens: 0, output_tokens: 2, total_tokens: 5 } }, + }, + }), + '', + ].join('\n'), +); +const previousScanMode = process.env.CDXUSAGE_SCAN_MODE; +const previousForceNativeFail = process.env.CDXUSAGE_FORCE_NATIVE_BATCH_FAIL; +process.env.CDXUSAGE_SCAN_MODE = 'grep-batch'; +process.env.CDXUSAGE_FORCE_NATIVE_BATCH_FAIL = '1'; +try { + const nativeFallback = await collectUsage({ + codexHome: nativeFallbackHome, + cacheFile: path.join(root, 'native-fallback-cache.json'), + timezone: 'UTC', + pricingData, + }); + assert.equal(nativeFallback.totals.totalTokens, 5); + assert.equal(nativeFallback.stats.nativeBatchFallback, true); + assert.equal(nativeFallback.stats.linesSeen, 3); + assert.equal(nativeFallback.stats.candidateLinesSeen, 2); + assert.equal(nativeFallback.stats.scannerModes.needle, 1); +} finally { + if (previousScanMode == null) { + delete process.env.CDXUSAGE_SCAN_MODE; + } else { + process.env.CDXUSAGE_SCAN_MODE = previousScanMode; + } + if (previousForceNativeFail == null) { + delete process.env.CDXUSAGE_FORCE_NATIVE_BATCH_FAIL; + } else { + process.env.CDXUSAGE_FORCE_NATIVE_BATCH_FAIL = previousForceNativeFail; + } +} + +const nativeEarlyExitHome = path.join(root, 'native-early-exit-codex-home'); +const nativeEarlyExitSessions = path.join(nativeEarlyExitHome, 'sessions/2026/05/16'); +const nativeEarlyExitCache = path.join(root, 'native-early-exit-cache.json'); +await mkdir(nativeEarlyExitSessions, { recursive: true }); +for (let i = 0; i < 3000; i += 1) { + await writeFile( + path.join(nativeEarlyExitSessions, `native-early-exit-${i}.jsonl`), + [ + JSON.stringify({ timestamp: '2026-05-16T00:00:00.000Z', type: 'turn_context', payload: { model: 'gpt-test' } }), + JSON.stringify({ + timestamp: '2026-05-16T00:00:01.000Z', + type: 'event_msg', + payload: { + type: 'token_count', + info: { last_token_usage: { input_tokens: 1, cached_input_tokens: 0, output_tokens: 1, total_tokens: 2 } }, 
+ }, + }), + '', + ].join('\n'), + ); +} +const fakeBin = path.join(root, 'fake-native-bin'); +await mkdir(fakeBin, { recursive: true }); +await writeFile(path.join(fakeBin, 'xargs'), '#!/bin/sh\nexit 1\n', { mode: 0o755 }); +const previousPath = process.env.PATH; +process.env.CDXUSAGE_SCAN_MODE = 'grep-batch'; +process.env.PATH = `${fakeBin}:${previousPath ?? ''}`; +try { + const nativeEarlyExit = await collectUsage({ + codexHome: nativeEarlyExitHome, + cacheFile: nativeEarlyExitCache, + timezone: 'UTC', + includePricing: false, + }); + assert.equal(nativeEarlyExit.totals.totalTokens, 6000); + assert.equal(nativeEarlyExit.stats.nativeBatchFallback, true); + assert.equal(nativeEarlyExit.stats.filesScannedFull, 3000); + assert.equal(nativeEarlyExit.stats.scannerModes.needle, 3000); +} finally { + if (previousScanMode == null) { + delete process.env.CDXUSAGE_SCAN_MODE; + } else { + process.env.CDXUSAGE_SCAN_MODE = previousScanMode; + } + if (previousPath == null) { + delete process.env.PATH; + } else { + process.env.PATH = previousPath; + } +} + const tieredHome = path.join(root, 'tiered-codex-home'); const tieredSessionsDir = path.join(tieredHome, 'sessions/2026/05/16'); const tieredCache = path.join(root, 'tiered-cache.json'); @@ -427,8 +647,17 @@ const symlinkReport = await collectUsage({ discoveryMode: 'find', pricingData, }); -assert.equal(symlinkReport.totals.totalTokens, 2); -assert.equal(symlinkReport.stats.filesSeen, 1); +assert.equal(symlinkReport.totals.totalTokens, 0); +assert.equal(symlinkReport.stats.filesSeen, 0); +const symlinkNodeReport = await collectUsage({ + codexHome: symlinkHome, + cacheFile: path.join(root, 'symlink-node-cache.json'), + timezone: 'UTC', + discoveryMode: 'node', + pricingData, +}); +assert.equal(symlinkNodeReport.totals.totalTokens, 0); +assert.equal(symlinkNodeReport.stats.filesSeen, 0); await rm(root, { recursive: true, force: true }); console.log('engine ok');
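The NUL-delimited framing that `consumeGrepBatchOutput` parses earlier in this diff can be sketched in isolation. The following is a minimal sketch, not the author's implementation: `parseNulTriples` is a hypothetical name, and the wire shape (a type field, a file field, and a payload, each terminated by a NUL byte, with `L` carrying a candidate line and `C` carrying a source line count) is inferred from the parsing loop in the diff rather than from any documented protocol. It assumes payloads never contain a raw NUL byte.

```javascript
// Hypothetical helper (not part of the diff): splits Buffer chunks into
// (type, file, payload) triples where every field ends with a NUL byte.
function* parseNulTriples(chunks) {
  let buffer = Buffer.alloc(0);
  const parts = [];
  for (const chunk of chunks) {
    // Carry any incomplete field over from the previous chunk.
    buffer = buffer.length > 0 ? Buffer.concat([buffer, chunk]) : chunk;
    for (;;) {
      const nul = buffer.indexOf(0);
      if (nul === -1) {
        break; // Field is incomplete; wait for the next chunk.
      }
      parts.push(buffer.subarray(0, nul));
      buffer = buffer.subarray(nul + 1);
      if (parts.length === 3) {
        const [type, file, payload] = parts.splice(0, 3);
        yield {
          type: type.toString('utf8'),
          file: file.toString('utf8'),
          payload, // Left as a Buffer so callers can decide how to decode it.
        };
      }
    }
  }
}

// Assumed sample stream: one candidate line record and one count record.
const wire = Buffer.from(
  ['L', 'a.jsonl', '{"x":1}', 'C', 'a.jsonl', '42', ''].join('\0'),
);
// Split mid-field to exercise the carry-over path.
const records = [...parseNulTriples([wire.subarray(0, 5), wire.subarray(5)])];
for (const record of records) {
  console.log(record.type, record.file, record.payload.toString('utf8'));
}
```

Keeping the payload as a `Buffer` mirrors how the diff passes candidate lines to `processLine` without decoding them first; only the short type and file fields are converted to strings.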