Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/claude_code_queue/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,22 @@ def get_stats(self) -> Dict[str, Any]:
}


@dataclass
class SessionStats:
"""Token usage statistics extracted from a session's JSONL log."""

input_tokens: int = 0
output_tokens: int = 0
cache_creation_input_tokens: int = 0
cache_read_input_tokens: int = 0
api_turns: int = 0

@property
def total_input_tokens(self) -> int:
"""Total tokens billed as input (non-cached + cache-write + cache-read)."""
return self.input_tokens + self.cache_creation_input_tokens + self.cache_read_input_tokens


@dataclass
class ExecutionResult:
"""Result of executing a prompt."""
Expand Down
115 changes: 114 additions & 1 deletion src/claude_code_queue/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Queue manager with execution loop.
"""

import json
import os
import sys
import time
Expand All @@ -10,7 +11,7 @@
from pathlib import Path
from typing import List, Optional, Callable, Dict, Any

from .models import QueuedPrompt, QueueState, PromptStatus, ExecutionResult
from .models import QueuedPrompt, QueueState, PromptStatus, ExecutionResult, SessionStats
from .storage import QueueStorage
from .claude_interface import ClaudeCodeInterface

Expand Down Expand Up @@ -266,15 +267,22 @@ def _process_execution_result(
"""Process the result of prompt execution."""
execution_summary = f"Execution completed in {result.execution_time:.1f}s"

# Extract token usage from the JSONL conversation log BEFORE any branch
# logic runs. CRITICAL: this must happen before _cleanup_rate_limit_artifacts()
# which deletes the JSONL file on the rate-limited path.
stats = self._extract_session_stats(prompt)

if result.success:
# retry_not_before is already None — cleared by _execute_prompt() via clear_retry_backoff().
prompt.status = PromptStatus.COMPLETED
prompt.add_log(f"{execution_summary} - SUCCESS")
if result.output:
prompt.add_log(f"Output:\n{result.output}")
self._log_session_stats(prompt, stats)

self.state.total_processed += 1
print(f"✓ Prompt {prompt.id} completed successfully")
print(self._format_stats_line(result.execution_time, stats))

elif result.is_non_retryable:
# Fix B — Non-retryable error: fail immediately, skip retry counter and can_retry().
Expand Down Expand Up @@ -317,10 +325,12 @@ def _process_execution_result(
else ""
)
prompt.add_log(f"Message{source_tag}: {result.rate_limit_info.limit_message}")
self._log_session_stats(prompt, stats)

if not was_already_rate_limited and self.state is not None:
self.state.rate_limited_count += 1
print(f"⚠ Prompt {prompt.id} rate limited, will retry later")
print(self._format_stats_line(result.execution_time, stats))

self._cleanup_rate_limit_artifacts(prompt)

Expand All @@ -340,23 +350,27 @@ def _process_execution_result(
)
if result.error:
prompt.add_log(f"Error: {result.error}")
self._log_session_stats(prompt, stats)
print(
f"✗ Prompt {prompt.id} failed, will retry in "
f"{self._generic_failure_retry_delay}s "
f"({prompt.retry_count}/{'∞' if prompt.max_retries == -1 else prompt.max_retries})"
)
print(self._format_stats_line(result.execution_time, stats))
else:
prompt.status = PromptStatus.FAILED
prompt.clear_retry_backoff() # Fix 3: clear stale field for YAML cleanliness
prompt.add_log(f"{execution_summary} - FAILED (max retries exceeded)")
if result.error:
prompt.add_log(f"Error: {result.error}")
self._log_session_stats(prompt, stats)

self.state.failed_count += 1
retries_str = "∞" if prompt.max_retries == -1 else str(prompt.max_retries)
print(
f"✗ Prompt {prompt.id} failed permanently after {retries_str} attempts"
)
print(self._format_stats_line(result.execution_time, stats))

self.state.last_processed = datetime.now()

Expand Down Expand Up @@ -487,6 +501,105 @@ def _format_duration(self, seconds: float) -> str:
return f"{hours}h"
return f"{hours}h {minutes}m"

def _extract_session_stats(self, prompt: QueuedPrompt) -> Optional[SessionStats]:
"""Extract token usage from the JSONL conversation log for a just-finished execution.

Locates the JSONL file using the same path-encoding logic as
_do_cleanup_rate_limit_artifacts(), then sums usage across all assistant
turns.

Returns None if the JSONL cannot be found or parsed.
Best-effort: failures are logged but never propagate.

IMPORTANT: This method relies on Claude Code's internal file layout under
~/.claude/projects/. See _do_cleanup_rate_limit_artifacts() for the same
caveat about undocumented internal structure.
"""
if not prompt.last_executed:
return None

try:
return self._do_extract_session_stats(prompt)
except Exception as e:
prompt.add_log(f"Warning: session stats extraction failed: {e}")
return None

def _do_extract_session_stats(self, prompt: QueuedPrompt) -> Optional[SessionStats]:
"""Inner implementation — may raise; caller catches all exceptions."""
cutoff = prompt.last_executed.timestamp()
claude_dir = Path.home() / ".claude"

resolved = prompt._resolved_working_directory or str(
Path(prompt.working_directory).resolve()
)
encoded = resolved.replace("/", "-")
jsonl_dir = claude_dir / "projects" / encoded

if not jsonl_dir.is_dir():
return None

# Find the newest .jsonl file with mtime >= cutoff (no size cap).
best_file = None
best_mtime = 0.0
for f in jsonl_dir.glob("*.jsonl"):
try:
st = f.stat()
if st.st_mtime >= cutoff and st.st_mtime > best_mtime:
best_mtime = st.st_mtime
best_file = f
except OSError:
pass

if best_file is None:
return None

# Sum usage across all assistant turns, line-by-line.
stats = SessionStats()
with open(best_file, "r") as fh:
for line in fh:
try:
obj = json.loads(line)
except ValueError:
continue
if obj.get("type") != "assistant" or "message" not in obj:
continue
usage = obj["message"].get("usage", {})
stats.input_tokens += usage.get("input_tokens", 0)
stats.output_tokens += usage.get("output_tokens", 0)
stats.cache_creation_input_tokens += usage.get("cache_creation_input_tokens", 0)
stats.cache_read_input_tokens += usage.get("cache_read_input_tokens", 0)
stats.api_turns += 1

if stats.api_turns == 0:
return None

return stats

def _format_stats_line(
self, execution_time: float, stats: Optional[SessionStats]
) -> str:
"""Format a stats line for console output after job completion."""
parts = [f"Duration: {self._format_duration(execution_time)}"]
if stats is not None:
parts.append(f"Input: {stats.total_input_tokens:,} tokens")
parts.append(f"Output: {stats.output_tokens:,} tokens")
return " " + " | ".join(parts)

def _log_session_stats(
self, prompt: QueuedPrompt, stats: Optional[SessionStats]
) -> None:
"""Log detailed token usage to the prompt's execution log (.md file)."""
if stats is None:
return
prompt.add_log(
f"Token usage: {stats.input_tokens:,} input"
f" + {stats.cache_creation_input_tokens:,} cache-write"
f" + {stats.cache_read_input_tokens:,} cache-read"
f" = {stats.total_input_tokens:,} total input,"
f" {stats.output_tokens:,} output"
f" ({stats.api_turns} API turn{'s' if stats.api_turns != 1 else ''})"
)

def add_prompt(self, prompt: QueuedPrompt) -> bool:
"""Add a prompt to the queue."""
try:
Expand Down
Loading