diff --git a/ai_analysis.py b/ai_analysis.py index 2d27e90..48b1d7c 100644 --- a/ai_analysis.py +++ b/ai_analysis.py @@ -21,6 +21,90 @@ _OLLAMA_KEEP_ALIVE = '30m' +# ── Anthropic prompt-cache sentinels ───────────────────────────────────── +# +# Heavy editorial calls reuse the same transcript across many requests in +# one session (Story analysis runs four passes per chunk; chat resends +# the transcript on every user turn). To let Anthropic prompt caching +# kick in without refactoring every prompt template, call sites wrap the +# cacheable content in these sentinels: +# +# prompt = f"...TRANSCRIPT:\n{CACHE_TX_START}{transcript}{CACHE_TX_END}..." +# +# The wrapper functions below (_call_ai, _call_ai_json, _call_ai_chat, +# _call_ai_chat_stream) detect the sentinels and: +# - On Anthropic, move the wrapped content into a structured system +# block list with cache_control markers (5m TTL for stable text and +# DNA examples, 1h TTL for the transcript). The user prompt is sent +# with the wrapped section replaced by a short pointer. +# - On Ollama / OpenAI, strip the sentinels and send the prompt +# verbatim so non-Anthropic behavior is byte-identical to before. +# +# The sentinels are deliberately unusual strings that will not collide +# with real interview content. They are stripped before any non-Anthropic +# call, so they never reach a model that does not understand them. +CACHE_TX_START = "<<>>" +CACHE_TX_END = "<<>>" +CACHE_DNA_START = "<<>>" +CACHE_DNA_END = "<<>>" +_TRANSCRIPT_HOISTED_NOTICE = "(TRANSCRIPT provided in the system prompt above.)" +_DNA_HOISTED_NOTICE = "(EDITORIAL STYLE provided in the system prompt above.)" + + +def _extract_cache_segment(text: str, start: str, end: str): + """Return (segment_or_None, text_with_segment_replaced). + + The replacement leaves a short notice in place of the cached content + so the user prompt still reads coherently. Sentinels themselves are + always removed. When the sentinels are absent, returns + ``(None, text)`` unchanged. + """ + if not text or start not in text or end not in text: + return None, text + i = text.find(start) + j = text.find(end, i + len(start)) + if j < 0: + return None, text.replace(start, "").replace(end, "") + segment = text[i + len(start):j] + notice = ( + _TRANSCRIPT_HOISTED_NOTICE if start == CACHE_TX_START + else _DNA_HOISTED_NOTICE if start == CACHE_DNA_START + else "" + ) + cleaned = text[:i] + notice + text[j + len(end):] + return segment, cleaned + + +def _strip_cache_sentinels(text: str) -> str: + """Remove any cache sentinel markers from ``text`` without touching + the content they wrap. Used for non-Anthropic providers where the + full prompt is sent inline.""" + if not text: + return text + return ( + text + .replace(CACHE_TX_START, "") + .replace(CACHE_TX_END, "") + .replace(CACHE_DNA_START, "") + .replace(CACHE_DNA_END, "") + ) + + +def _split_cacheable_prompt(prompt: str): + """Pull transcript and DNA segments out of a sentinel-tagged prompt. + + Returns ``(prompt_without_segments, transcript_or_None, dna_or_None)``. + Used by the wrapper functions to decide between the cached-system + Anthropic payload and the sentinel-stripped fallback path for other + providers. + """ + if not prompt: + return prompt, None, None + transcript, prompt = _extract_cache_segment(prompt, CACHE_TX_START, CACHE_TX_END) + dna, prompt = _extract_cache_segment(prompt, CACHE_DNA_START, CACHE_DNA_END) + return prompt, transcript, dna + + def _load_chat_system_prompt(): """Load the master chat system prompt from prompts/chat-system-prompt.md. @@ -100,7 +184,12 @@ def _build_transcript_message(project_name, segments, formatted, parts.append(f"SPEAKERS: {', '.join(speakers)}") parts.append('') parts.append('TRANSCRIPT:') - parts.append(formatted) + # Wrap just the transcript bytes in cache sentinels. The chat-stream + # wrapper hoists the wrapped chunk into a cached text block when the + # active provider is Anthropic, so every follow-up turn in the + # session re-uses the cached transcript instead of paying full + # input-token cost. Ollama strips the sentinels. + parts.append(f'{CACHE_TX_START}{formatted}{CACHE_TX_END}') if analysis_block: parts.append(analysis_block) if relevant_excerpts_block: @@ -151,9 +240,16 @@ def _build_chat_messages(message, history, project_name, segments, messages = [] style_block = get_active_style_block(profile_id=profile_id) if style_block: + # The style block (Editorial DNA / My Style examples) is stable + # across a session, so wrap it in DNA sentinels for the + # Anthropic prompt cache. Non-Anthropic providers receive the + # sentinel-stripped string and behavior is unchanged. messages.append({ 'role': 'user', - 'content': f'STYLE CONTEXT (active My Style profile):\n\n{style_block}', + 'content': ( + 'STYLE CONTEXT (active My Style profile):\n\n' + f'{CACHE_DNA_START}{style_block}{CACHE_DNA_END}' + ), }) transcript_msg = _build_transcript_message( @@ -302,7 +398,103 @@ def chat_about_transcript(transcript, message, history=None, project_name="Inter ] -def _call_ai_chat_stream(system_message, messages, num_ctx=32768): +def _prepare_chat_messages_for_provider(provider_name, system_message, messages): + """Return (system_param, messages_param) ready for ``provider_name``. + + Looks for cache sentinels inside the messages list. On Anthropic, the + transcript message's content becomes a list of text blocks with + cache_control so each turn after the first re-uses the cached prefix + instead of paying full input-token cost. On Ollama / OpenAI the + sentinels are stripped and the messages are returned unchanged. + """ + if provider_name != "anthropic": + cleaned = [] + for m in messages or []: + if not isinstance(m, dict): + cleaned.append(m) + continue + content = m.get("content") + if isinstance(content, str): + cleaned.append({**m, "content": _strip_cache_sentinels(content)}) + else: + cleaned.append(m) + return system_message, cleaned + + from ai_providers.anthropic_provider import ( + build_cached_system_blocks, + build_cached_user_messages, + ) + + transcript_text = None + dna_text = None + rebuilt: list = [] + transcript_message_index = None + + _NOISE_AFTER_HOIST = ( + "style context (active my style profile):", + ) + _HOIST_NOTICES = ( + _TRANSCRIPT_HOISTED_NOTICE.lower(), + _DNA_HOISTED_NOTICE.lower(), + ) + + def _is_noise_only(s: str) -> bool: + stripped = (s or "").strip().lower() + if not stripped: + return True + for notice in _HOIST_NOTICES: + stripped = stripped.replace(notice, "").strip() + if not stripped: + return True + return any(stripped.startswith(p) and len(stripped) <= len(p) + 4 + for p in _NOISE_AFTER_HOIST) + + for m in messages or []: + if not isinstance(m, dict): + rebuilt.append(m) + continue + content = m.get("content") + if not isinstance(content, str): + rebuilt.append(m) + continue + seg_tx, content_after = _extract_cache_segment(content, CACHE_TX_START, CACHE_TX_END) + seg_dna, content_after = _extract_cache_segment(content_after, CACHE_DNA_START, CACHE_DNA_END) + if seg_tx and transcript_text is None: + transcript_text = seg_tx + if seg_dna and dna_text is None: + dna_text = seg_dna + if (seg_tx or seg_dna) and _is_noise_only(content_after): + continue + new_msg = {**m, "content": content_after} + if seg_tx and transcript_message_index is None: + transcript_message_index = len(rebuilt) + rebuilt.append(new_msg) + + if not transcript_text and not dna_text: + return system_message, rebuilt + + system_param = build_cached_system_blocks( + system_message, dna_block=dna_text, transcript_block=None, + ) if (system_message or dna_text) else system_message + + if transcript_text and transcript_message_index is not None: + head = rebuilt[:transcript_message_index] + tail = rebuilt[transcript_message_index + 1:] + framing = (rebuilt[transcript_message_index].get("content") or "").strip() + extras = [] + if framing: + extras.append({"type": "text", "text": framing}) + new_message_block = build_cached_user_messages( + transcript_block=transcript_text, + other_user_messages=[], + extra_user_blocks=extras, + )[0] + rebuilt = head + [new_message_block] + tail + + return system_param, rebuilt + + +def _call_ai_chat_stream(system_message, messages, num_ctx=32768, call_site=None): """Stream chat tokens through the active AI provider. ``system_message`` is the master system prompt (chat-system-prompt.md @@ -317,9 +509,13 @@ def _call_ai_chat_stream(system_message, messages, num_ctx=32768): """ from ai_providers import get_active_provider provider = get_active_provider(model_resolver=_get_ollama_model) - yield from provider.generate_stream( - system_message, messages, task_type="chat", num_ctx=num_ctx, + system_param, messages_param = _prepare_chat_messages_for_provider( + provider.name, system_message, messages, ) + kwargs = {"task_type": "chat", "num_ctx": num_ctx} + if provider.name == "anthropic": + kwargs["call_site"] = call_site or "_call_ai_chat_stream" + yield from provider.generate_stream(system_param, messages_param, **kwargs) def chat_about_transcript_stream(transcript, message, history=None, project_name="Interview", @@ -456,17 +652,26 @@ def _is_stuck(tail): yield ('done', cleaned) -def _call_ai_chat(system_message, messages, num_ctx=32768): +def _call_ai_chat(system_message, messages, num_ctx=32768, call_site=None): """Non-streaming chat call through the active provider. Same input shape as :func:`_call_ai_chat_stream` — a system string plus a messages array. Returns the full reply as a single string. + + Honors the cache sentinels (CACHE_TX_START / END, CACHE_DNA_START / END) + when the active provider is Anthropic: the wrapped sections are moved + out of the user message and into structured cached system / user + content blocks before the request is sent. """ from ai_providers import get_active_provider provider = get_active_provider(model_resolver=_get_ollama_model) - return provider.generate( - system_message, messages, task_type="chat", num_ctx=num_ctx, + system_param, messages_param = _prepare_chat_messages_for_provider( + provider.name, system_message, messages, ) + kwargs = {"task_type": "chat", "num_ctx": num_ctx} + if provider.name == "anthropic": + kwargs["call_site"] = call_site or "_call_ai_chat" + return provider.generate(system_param, messages_param, **kwargs) def _count_clip_markers(text): @@ -2275,13 +2480,13 @@ def _build_chunk_search_prompt(chunk, message, phrases, words, chunk_idx, GOOD: "She walks through how a single rejected draft became the backbone of the final piece."{keyword_hint} EXCERPT: -{chunk_block}""" +{CACHE_TX_START}{chunk_block}{CACHE_TX_END}""" user_prompt = f"User's question: {message}\n\nReturn JSON with up to 3 candidate moments from this excerpt." return system_prompt, user_prompt -def _call_ai_json(system_prompt, user_prompt, timeout=180, model_override=None): +def _call_ai_json(system_prompt, user_prompt, timeout=180, model_override=None, call_site=None): """Low-temperature Ollama call optimized for structured output. Uses a smaller num_ctx than chat because each chunk fits comfortably @@ -2315,9 +2520,16 @@ def _call_ai_json(system_prompt, user_prompt, timeout=180, model_override=None): model_name = model_override or ( _get_ollama_model() if provider.name == "ollama" else provider.name ) + # Cache-key strips sentinels from BOTH prompts so a sentinel-tagged + # prompt and a plain equivalent prompt do not produce two LRU + # entries. The chunked-search path embeds the chunk inside the + # system prompt, so we also need to extract sentinels from there + # for the Anthropic prompt-cache hoist below. + plain_system_prompt = _strip_cache_sentinels(system_prompt) + plain_user_prompt = _strip_cache_sentinels(user_prompt) cache_key = ( - hashlib.sha1(system_prompt.encode('utf-8', 'replace')).hexdigest(), - hashlib.sha1(user_prompt.encode('utf-8', 'replace')).hexdigest(), + hashlib.sha1(plain_system_prompt.encode('utf-8', 'replace')).hexdigest(), + hashlib.sha1(plain_user_prompt.encode('utf-8', 'replace')).hexdigest(), model_name, provider.name, ) @@ -2325,10 +2537,32 @@ def _call_ai_json(system_prompt, user_prompt, timeout=180, model_override=None): if cached is not None: return cached try: - result = provider.generate( - system_prompt, user_prompt, task_type="analysis", - timeout=timeout, model_override=model_override, - ) + if provider.name == "anthropic": + from ai_providers.anthropic_provider import build_cached_system_blocks + clean_user, user_tx, user_dna = _split_cacheable_prompt(user_prompt) + clean_system, sys_tx, sys_dna = _split_cacheable_prompt(system_prompt) + transcript = sys_tx or user_tx + dna = sys_dna or user_dna + if transcript or dna: + system_param = build_cached_system_blocks( + clean_system, dna_block=dna, transcript_block=transcript, + ) + result = provider.generate( + system_param, clean_user, task_type="analysis", + timeout=timeout, model_override=model_override, + call_site=call_site or "_call_ai_json", + ) + else: + result = provider.generate( + plain_system_prompt, plain_user_prompt, task_type="analysis", + timeout=timeout, model_override=model_override, + call_site=call_site or "_call_ai_json", + ) + else: + result = provider.generate( + plain_system_prompt, plain_user_prompt, task_type="analysis", + timeout=timeout, model_override=model_override, + ) except RuntimeError as e: # Permanent provider problems (no key, bad key) must abort the # whole analysis — silently returning '' on every chunk would @@ -2664,7 +2898,9 @@ def _rerank_candidates_globally(candidates, message, top_k=5): "Only return indices that appear in the menu above." ) try: - response = _call_ai_json(system_prompt, user_prompt, timeout=90) + response = _call_ai_json( + system_prompt, user_prompt, timeout=90, call_site="chat_chunk_rerank", + ) except Exception: response = '' if not response: @@ -2815,7 +3051,10 @@ def _run_chunk(idx_chunk): # available (~2-3× faster decode on Apple Silicon). Synthesis # rerank below stays on the user's hardware-tier variant so # the global pick remains high-quality. - response = _call_ai_json(system_prompt, user_prompt, model_override=fast_model) + response = _call_ai_json( + system_prompt, user_prompt, model_override=fast_model, + call_site="chat_chunk_search", + ) return _parse_chunk_response(response, chunk) with ThreadPoolExecutor(max_workers=_CHAT_CHUNK_CONCURRENCY) as pool: @@ -2901,7 +3140,10 @@ def _run_chunk(idx_chunk): chunk, message, phrases, words, idx, len(chunks), project_name, strict_keyword=strict_keyword, ) - response = _call_ai_json(system_prompt, user_prompt, model_override=fast_model) + response = _call_ai_json( + system_prompt, user_prompt, model_override=fast_model, + call_site="chat_chunk_search_stream", + ) return _parse_chunk_response(response, chunk) completed = 0 @@ -3745,7 +3987,7 @@ def _format_transcript_paragraphs_for_ai(transcript, max_paragraph_seconds=60): return _format_paragraphs_as_lines(paragraphs) -def _call_ai(prompt, system_prompt="", task_type="analysis"): +def _call_ai(prompt, system_prompt="", task_type="analysis", call_site=None): """Single-prompt generation through the active provider. ``task_type`` selects the model when the provider tiers (Anthropic uses @@ -3753,25 +3995,38 @@ def _call_ai(prompt, system_prompt="", task_type="analysis"): Editorial DNA's classifier passes ``"analysis"``; My Style synthesis passes ``"profile_creation"``. + When the prompt contains the cache sentinels (CACHE_TX_START/END or + CACHE_DNA_START/END) and the active provider is Anthropic, the + wrapped sections are hoisted into structured system blocks with + cache_control. Other providers see the same prompt with sentinels + stripped, so behavior is unchanged on Ollama and OpenAI. + Raises ``RuntimeError`` on provider error so the caller can surface a clear message to the user instead of silently falling back. """ from ai_providers import get_active_provider - # Skip the storytelling foundation for structured JSON analysis. - # The foundation is ~28 KB (~7 K tokens) of narrative-editorial - # guidance. For analysis calls the prompt is a self-contained JSON - # extraction task: the transcript IS the data and it must survive - # intact. With num_ctx=12288 and num_predict=4096 the available - # input budget is ~8192 tokens — the foundation alone would consume - # ~7094 of those, leaving ~1098 for the entire transcript. Ollama - # silently truncates the overflow, so the model never sees the real - # transcript and hallucinates plausible-looking timecodes and generic - # descriptions. Chat and Story Builder still get the full foundation. if task_type != "analysis": system_prompt = inject_storytelling_foundation(system_prompt) provider = get_active_provider(model_resolver=_get_ollama_model) + + if provider.name == "anthropic": + from ai_providers.anthropic_provider import build_cached_system_blocks + clean_prompt, transcript, dna = _split_cacheable_prompt(prompt) + if transcript or dna: + system_param = build_cached_system_blocks( + system_prompt, dna_block=dna, transcript_block=transcript, + ) + return provider.generate( + system_param, clean_prompt, task_type=task_type, + call_site=call_site or "_call_ai", + ) + return provider.generate( + system_prompt, _strip_cache_sentinels(prompt), task_type=task_type, + call_site=call_site or "_call_ai", + ) + return provider.generate( - system_prompt, prompt, task_type=task_type, + system_prompt, _strip_cache_sentinels(prompt), task_type=task_type, ) @@ -4053,12 +4308,12 @@ def build_story(transcript, message, project_name="Interview", segment_vectors=N USER REQUEST: {message} TRANSCRIPT (presented in recording order — re-sequence freely for narrative arc): -{formatted} +{CACHE_TX_START}{formatted}{CACHE_TX_END} Return ONLY valid JSON. No markdown, no extra text.""" system_prompt = inject_my_style(system_prompt, profile_id=profile_id) - response = _call_ai(prompt, system_prompt) + response = _call_ai(prompt, system_prompt, call_site="build_story") return _parse_json_response(response) @@ -4071,7 +4326,7 @@ def _segment_vector_prompt(transcript_text: str, project_name: str) -> str: PROJECT: {project_name} TRANSCRIPT: -{transcript_text} +{CACHE_TX_START}{transcript_text}{CACHE_TX_END} STEP 1 — Identify the distinct threads or topics the speaker discusses. Use the speaker's own words and phrasing for each thread title. Do not invent abstract corporate language. "The day I quit" — yes. "Professional Transition Event" — no. @@ -4130,6 +4385,7 @@ def _generate_vectors_single_chunk(transcript_text: str, project_name: str): response = _call_ai( _segment_vector_prompt(transcript_text, project_name), _SEGMENT_VECTOR_SYSTEM_PROMPT, + call_site="generate_segment_vectors", ) return _extract_segment_list(_parse_json_response(response)) @@ -4420,7 +4676,7 @@ def _menu_sort_key(s): USER REQUEST: {message} AVAILABLE SEGMENTS (pre-classified): -{menu} +{CACHE_TX_START}{menu}{CACHE_TX_END} Return ONLY valid JSON in this shape: {{ @@ -4440,7 +4696,7 @@ def _menu_sort_key(s): }}""" system_prompt = inject_my_style(system_prompt, profile_id=profile_id) - response = _call_ai(prompt, system_prompt) + response = _call_ai(prompt, system_prompt, call_site="build_story_from_vectors") parsed = _parse_json_response(response) if not isinstance(parsed, dict): parsed = {'clips': []} @@ -4551,7 +4807,7 @@ def _analyze_story_soundbites(transcript_text, project_name, soundbites_target): prompt = f"""PROJECT: {project_name} TRANSCRIPT: -{transcript_text} +{CACHE_TX_START}{transcript_text}{CACHE_TX_END} Return ONLY this JSON object: {{ @@ -4566,7 +4822,7 @@ def _analyze_story_soundbites(transcript_text, project_name, soundbites_target): - "why" is a short phrase explaining editorial value (emotional peak, thesis statement, surprising admission, etc.). Be ruthless — return fewer if the transcript only has fewer genuine standouts. Return ONLY valid JSON, nothing else.""" - parsed = _parse_json_response(_call_ai(prompt, system_prompt)) + parsed = _parse_json_response(_call_ai(prompt, system_prompt, call_site="analyze_story_soundbites")) if isinstance(parsed, dict) and ( parsed.get('strongest_soundbites') or parsed.get('soundbites') or parsed.get('quotes') or parsed.get('best_quotes') @@ -4574,7 +4830,7 @@ def _analyze_story_soundbites(transcript_text, project_name, soundbites_target): return parsed retry = _parse_json_response(_call_ai( prompt + '\n\nNO MARKDOWN. JSON ONLY. Fill the strongest_soundbites array with real quotes from the transcript.', - system_prompt, + system_prompt, call_site="analyze_story_soundbites_retry", )) return retry if isinstance(retry, dict) else (parsed if isinstance(parsed, dict) else {}) @@ -4590,7 +4846,7 @@ def _analyze_story_beats(transcript_text, project_name, beats_target): prompt = f"""PROJECT: {project_name} TRANSCRIPT: -{transcript_text} +{CACHE_TX_START}{transcript_text}{CACHE_TX_END} Return ONLY this JSON object — fill both lists: {{ @@ -4606,7 +4862,7 @@ def _analyze_story_beats(transcript_text, project_name, beats_target): Suggest 3-7 b-roll moments. Each one should be a CONCRETE visual idea pinned to the timecode where it would land — describe what you'd specifically want to see, not generic filler like "nature shots" or "stock footage". CRITICAL: Copy the exact HH:MM:SS timecodes from the transcript for start and end. Use string format like "00:02:45". Return ONLY valid JSON, nothing else.""" - parsed = _parse_json_response(_call_ai(prompt, system_prompt)) + parsed = _parse_json_response(_call_ai(prompt, system_prompt, call_site="analyze_story_beats")) if isinstance(parsed, dict) and ( parsed.get('story_beats') or parsed.get('beats') or parsed.get('broll_suggestions') or parsed.get('broll') @@ -4614,7 +4870,7 @@ def _analyze_story_beats(transcript_text, project_name, beats_target): return parsed retry = _parse_json_response(_call_ai( prompt + '\n\nNO MARKDOWN. JSON ONLY. FILL BOTH LISTS — story_beats and broll_suggestions.', - system_prompt, + system_prompt, call_site="analyze_story_beats_retry", )) return retry if isinstance(retry, dict) else (parsed if isinstance(parsed, dict) else {}) @@ -4628,7 +4884,7 @@ def _analyze_story_overview(transcript_text, project_name): prompt = f"""PROJECT: {project_name} TRANSCRIPT: -{transcript_text} +{CACHE_TX_START}{transcript_text}{CACHE_TX_END} Return ONLY this JSON object: {{ @@ -4639,14 +4895,15 @@ def _analyze_story_overview(transcript_text, project_name): Pick 3-7 themes — short noun phrases for the recurring topics. An empty list is fine if there aren't real recurring patterns. Return ONLY valid JSON, nothing else.""" - parsed = _parse_json_response(_call_ai(prompt, system_prompt)) + parsed = _parse_json_response(_call_ai(prompt, system_prompt, call_site="analyze_story_overview")) if isinstance(parsed, dict) and ( parsed.get('summary') or parsed.get('themes') or parsed.get('overview') or parsed.get('synopsis') ): return parsed retry = _parse_json_response(_call_ai( - prompt + '\n\nNO MARKDOWN. NO PROSE. JSON ONLY.', system_prompt + prompt + '\n\nNO MARKDOWN. NO PROSE. JSON ONLY.', system_prompt, + call_site="analyze_story_overview_retry", )) return retry if isinstance(retry, dict) else (parsed if isinstance(parsed, dict) else {}) @@ -4670,7 +4927,7 @@ def _analyze_social(transcript_text, project_name, clips_target=7): PROJECT: {project_name} TRANSCRIPT: -{transcript_text} +{CACHE_TX_START}{transcript_text}{CACHE_TX_END} Return a JSON object with this exact structure: {{ @@ -4701,7 +4958,7 @@ def _analyze_social(transcript_text, project_name, clips_target=7): Use the HH:MM:SS format as a string, like "00:02:45". Do NOT convert to decimal numbers. Return ONLY valid JSON, no markdown formatting.""" - response = _call_ai(prompt, system_prompt) + response = _call_ai(prompt, system_prompt, call_site="analyze_social") parsed = _parse_json_response(response) if isinstance(parsed, dict) and ( parsed.get('social_clips') or parsed.get('clips') @@ -4720,12 +4977,13 @@ def _analyze_social(transcript_text, project_name, clips_target=7): '[{"rank":1,"title":"...","start":"00:00:00","end":"00:00:00",' '"duration_seconds":30,"text":"...","platform":"instagram_reels",' '"why":"...","hook":"...","hashtags":["..."]}]\n\n' - f'PROJECT: {project_name}\n\nTRANSCRIPT:\n{transcript_text}\n\n' + f'PROJECT: {project_name}\n\nTRANSCRIPT:\n' + f'{CACHE_TX_START}{transcript_text}{CACHE_TX_END}\n\n' f'Pick the {clips_target} BEST clips, 15-60 seconds each, ' 'ranked by predicted engagement. Return ONLY the JSON array, ' 'nothing else.' ) - retry = _parse_json_response(_call_ai(retry_prompt, retry_system)) + retry = _parse_json_response(_call_ai(retry_prompt, retry_system, call_site="analyze_social_retry")) if isinstance(retry, dict) and retry: return retry if isinstance(retry, list) and retry: diff --git a/ai_providers/anthropic_provider.py b/ai_providers/anthropic_provider.py index 76d8930..e5f9633 100644 --- a/ai_providers/anthropic_provider.py +++ b/ai_providers/anthropic_provider.py @@ -2,29 +2,89 @@ Model selection is hard-coded by ``task_type`` and never exposed to the user: - - ``profile_creation`` → claude-opus-4-20250514 (My Style synthesis) - - everything else → claude-sonnet-4-20250514 + - ``profile_creation``, ``analysis``, ``story_brief`` + -> claude-opus-4-7 (highest quality for editorial work) + - everything else (chat, general) + -> claude-sonnet-4-6 (fast, cost-effective) + +These IDs were bumped from ``claude-opus-4-20250514`` / +``claude-sonnet-4-20250514`` in May 2026 ahead of the June 15 2026 +deprecation EOL on the 4.0 IDs. The SDK manages SSE streaming, retry/backoff, and typed error classes that we map to clear user-facing messages. + +PROMPT CACHING +-------------- +Heavy editorial calls reuse the same transcript across many requests in a +single session (Story analysis runs four passes per chunk; chat resends +the transcript on every user turn). To avoid re-billing the same input +tokens on every call, the wrapper layer in ``ai_analysis.py`` can pass: + + - ``system_prompt`` as a list of structured text blocks. Blocks marked + with ``cache_control`` are cached server-side. The stable system + text and Editorial DNA examples share an ephemeral 5-minute cache; + the transcript block gets the extended 1-hour TTL (requires the + ``extended-cache-ttl-2025-04-11`` beta header, which we send on + every request so callers do not need to think about it). + - User messages with content blocks that carry ``cache_control``. The + chat path uses this so the per-turn transcript message hits the + cache on every follow-up turn. + +Each response.usage is logged with the cache hit rate, and we warn when +the same in-process prefix re-creates a cache entry without reading from +the existing one (a sign that the cache_control breakpoint moved +between calls). """ +import logging import time -from typing import Union +from typing import Any, Dict, List, Optional, Union from .base import BaseProvider from . import ProviderError -MODEL_DEFAULT = "claude-sonnet-4-20250514" -MODEL_PROFILE = "claude-opus-4-20250514" +MODEL_DEFAULT = "claude-sonnet-4-6" +MODEL_OPUS = "claude-opus-4-7" + +# Task types that benefit from Opus-level reasoning: deep editorial +# analysis (story beats, soundbites, social clips) and long-form +# synthesis (story briefs, My Style profiles). +_OPUS_TASK_TYPES = frozenset({"profile_creation", "analysis", "story_brief"}) + +# Anthropic enforces a per-block minimum on cached prefixes: Sonnet and +# Opus need at least 1024 tokens in a block before the API will cache +# it. We approximate with len(text) // 4, which is a small overestimate +# of tokens for English prose and a safe bound for the minimum check. +_MIN_CACHE_TOKENS = 1024 +_CHARS_PER_TOKEN = 4 + +# Extended 1h TTL on ephemeral cache requires this beta header. Sending +# it only on requests that actually use cache markers keeps non-cached +# calls byte-identical to the pre-caching behavior. +_EXTENDED_TTL_HEADER = "extended-cache-ttl-2025-04-11" + +logger = logging.getLogger(__name__) + +# Per-process record of prefixes we have already paid to cache. A second +# call that lands on the same prefix is expected to return a non-zero +# cache_read_input_tokens; if it does not, log a WARNING because the +# breakpoint moved silently. +_seen_cache_prefixes: set = set() + + +def _approx_tokens(text: str) -> int: + if not text: + return 0 + return max(1, len(text) // _CHARS_PER_TOKEN) def _model_for_task(task_type: str) -> str: - return MODEL_PROFILE if task_type == "profile_creation" else MODEL_DEFAULT + return MODEL_OPUS if task_type in _OPUS_TASK_TYPES else MODEL_DEFAULT def _max_tokens_for_task(task_type: str) -> int: - if task_type == "profile_creation": + if task_type in ("profile_creation", "analysis", "story_brief"): return 8192 if task_type == "chat": return 2048 @@ -37,6 +97,103 @@ def _normalize_messages(user_or_messages: Union[str, list]) -> list: return [{"role": "user", "content": str(user_or_messages)}] +def _prefix_signature(system_param: Any, messages: list) -> str: + """Cheap content hash for the cacheable prefix. + + Used only to decide whether to WARN about a missed cache hit on the + second call to the same prefix in one process. Hashes the stable + system text plus the first user message text (which carries the + transcript on the chat path). + """ + import hashlib + h = hashlib.sha1() + if isinstance(system_param, list): + for block in system_param: + if isinstance(block, dict) and block.get("cache_control"): + h.update((block.get("text") or "").encode("utf-8", "replace")) + elif isinstance(system_param, str): + h.update(system_param.encode("utf-8", "replace")) + for m in messages: + if not isinstance(m, dict): + continue + content = m.get("content") + if isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("cache_control"): + h.update((block.get("text") or "").encode("utf-8", "replace")) + elif isinstance(content, str) and m.get("role") == "user": + h.update(content.encode("utf-8", "replace")) + break + return h.hexdigest() + + +def _has_cache_markers(system_param: Any, messages: list) -> bool: + if isinstance(system_param, list): + for block in system_param: + if isinstance(block, dict) and block.get("cache_control"): + return True + for m in messages: + if not isinstance(m, dict): + continue + content = m.get("content") + if isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("cache_control"): + return True + return False + + +def _log_usage(usage, model: str, call_site: str, signature: Optional[str]): + """Log Anthropic usage stats and warn on suspected cache misses. + + Hit rate is computed as cache_read / (cache_read + input_tokens), + matching the way Anthropic bills: fresh input tokens are cheaper + than cache_creation but ten times more expensive than cache_read. + + The Flask app uses ``print()`` for diagnostics (no ``logging`` + configuration), so the human-readable line also goes through + ``print()`` to land in the server log alongside per-request lines. + The module-level ``logger`` is still wired up so pytest's caplog + fixture can assert on warnings in unit tests. + """ + if usage is None: + return + cache_create = int(getattr(usage, "cache_creation_input_tokens", 0) or 0) + cache_read = int(getattr(usage, "cache_read_input_tokens", 0) or 0) + in_tokens = int(getattr(usage, "input_tokens", 0) or 0) + out_tokens = int(getattr(usage, "output_tokens", 0) or 0) + total_in = cache_read + in_tokens + hit_rate = (cache_read / total_in) if total_in else 0.0 + line = ( + f"[anthropic.usage] site={call_site} model={model} " + f"in={in_tokens} out={out_tokens} " + f"cache_create={cache_create} cache_read={cache_read} " + f"hit_rate={hit_rate:.2f}" + ) + print(line, flush=True) + logger.info( + "anthropic.usage site=%s model=%s in=%d out=%d cache_create=%d " + "cache_read=%d hit_rate=%.2f", + call_site, model, in_tokens, out_tokens, + cache_create, cache_read, hit_rate, + ) + if signature and signature in _seen_cache_prefixes: + if cache_create > 0 and cache_read == 0: + warn_line = ( + f"[anthropic.cache-miss-on-repeat] site={call_site} model={model} " + f"signature={signature[:12]} cache_create={cache_create} " + "expected non-zero cache_read" + ) + print(warn_line, flush=True) + logger.warning( + "anthropic.cache-miss-on-repeat site=%s model=%s signature=%s " + "cache_create=%d expected non-zero cache_read", + call_site, model, signature[:12], cache_create, + ) + if signature and (cache_create > 0 or cache_read > 0): + _seen_cache_prefixes.add(signature) + + class AnthropicProvider(BaseProvider): name = "anthropic" @@ -52,6 +209,9 @@ def __init__(self, api_key: str): self._client = Anthropic(api_key=api_key) def _build_body(self, system_prompt, user_or_messages, task_type, kwargs): + # ``system_prompt`` may be a string (legacy single-text path) or + # a list of structured content blocks (cached-prefix path). The + # SDK accepts either, so we pass through without coercion. body = { "model": _model_for_task(task_type), "max_tokens": kwargs.get("max_tokens", _max_tokens_for_task(task_type)), @@ -64,19 +224,32 @@ def _build_body(self, system_prompt, user_or_messages, task_type, kwargs): return body def generate(self, system_prompt, user_or_messages, task_type="general", **kwargs): - from anthropic import APIError body = self._build_body(system_prompt, user_or_messages, task_type, kwargs) - resp = self._call_with_retry(body) + call_site = kwargs.get("call_site") or "generate" + resp = self._call_with_retry(body, call_site=call_site) return "".join(block.text for block in resp.content if hasattr(block, "text")) - def _call_with_retry(self, body): + def _request_headers(self, body) -> Dict[str, str]: + """Return per-request headers. The extended-TTL beta header is + sent only when the body actually uses cache markers, so + non-cached calls do not change behavior. + """ + if _has_cache_markers(body.get("system"), body.get("messages") or []): + return {"anthropic-beta": _EXTENDED_TTL_HEADER} + return {} + + def _call_with_retry(self, body, call_site: str = "anthropic"): from anthropic import RateLimitError, AuthenticationError, APIError + extra_headers = self._request_headers(body) + signature = None + if extra_headers: + signature = _prefix_signature(body.get("system"), body.get("messages") or []) try: - return self._client.messages.create(**body) + resp = self._client.messages.create(extra_headers=extra_headers, **body) except RateLimitError: time.sleep(2) try: - return self._client.messages.create(**body) + resp = self._client.messages.create(extra_headers=extra_headers, **body) except RateLimitError: raise ProviderError( "API rate limited, try again in a moment.", @@ -89,15 +262,32 @@ def _call_with_retry(self, body): ) except APIError as e: raise ProviderError(f"Anthropic API error: {e}") + _log_usage(getattr(resp, "usage", None), body.get("model"), call_site, signature) + return resp def generate_stream(self, system_prompt, user_or_messages, task_type="general", **kwargs): from anthropic import RateLimitError, AuthenticationError, APIError body = self._build_body(system_prompt, user_or_messages, task_type, kwargs) + call_site = kwargs.get("call_site") or "generate_stream" + extra_headers = self._request_headers(body) + signature = None + if extra_headers: + signature = _prefix_signature(body.get("system"), body.get("messages") or []) try: - with self._client.messages.stream(**body) as stream: + with self._client.messages.stream(extra_headers=extra_headers, **body) as stream: for piece in stream.text_stream: if piece: yield piece + try: + final = stream.get_final_message() + _log_usage( + getattr(final, "usage", None), + body.get("model"), call_site, signature, + ) + except Exception: + # Usage is advisory; never let a logging failure + # break the streaming call itself. + pass except AuthenticationError: raise ProviderError( "Anthropic API key is invalid or expired. Update it in Settings.", @@ -124,3 +314,105 @@ def test_connection(self) -> dict: return {"success": False, "error": str(e), "code": e.code} except Exception as e: return {"success": False, "error": f"Unexpected error: {e}"} + + +# Public helpers used by the wrapper layer to assemble cached payloads. +# Kept here so the call-site logic in ai_analysis.py stays inline and +# does not grow a separate caching abstraction class. + +def build_cached_system_blocks( + system_prompt: str, + dna_block: Optional[str] = None, + transcript_block: Optional[str] = None, +) -> List[Dict[str, Any]]: + """Return a structured ``system`` list with cache_control markers. + + Block order depends on whether a transcript is present, because + Anthropic computes the cache key byte-by-byte from the FRONT of the + prefix. Anything that varies between calls must sit AFTER everything + cached, otherwise it shifts the prefix and every call misses. + + With a transcript (analysis path): + 1. transcript text, ``cache_control`` 1h TTL. Sits first because + it is the only piece guaranteed identical across the four + story-analysis passes (soundbites / beats / overview / social), + each of which ships a DIFFERENT ``system_prompt``. + 2. dna_block, ``cache_control`` 5m TTL, if supplied. + 3. system_prompt, NO ``cache_control``. Varies per pass on the + four-pass path, so it must not be inside the cached prefix. + + Without a transcript (chat path): + 1. system_prompt + dna_block joined, ``cache_control`` 5m TTL. + The system text is the stable prefix here; the transcript + lives in a user message instead (see + :func:`build_cached_user_messages`). + + Each block omits ``cache_control`` when it would not meet the 1024 + token minimum. The block is still sent, it just is not a cache + boundary. + """ + blocks: List[Dict[str, Any]] = [] + has_transcript = bool((transcript_block or "").strip()) + + if has_transcript: + t = transcript_block.strip() + tblock: Dict[str, Any] = {"type": "text", "text": t} + if _approx_tokens(t) >= _MIN_CACHE_TOKENS: + tblock["cache_control"] = {"type": "ephemeral", "ttl": "1h"} + blocks.append(tblock) + + if dna_block and dna_block.strip(): + d = dna_block.strip() + dblock: Dict[str, Any] = {"type": "text", "text": d} + if _approx_tokens(d) >= _MIN_CACHE_TOKENS: + dblock["cache_control"] = {"type": "ephemeral"} + blocks.append(dblock) + + sys_text = (system_prompt or "").strip() + if sys_text: + blocks.append({"type": "text", "text": sys_text}) + return blocks + + stable_text = (system_prompt or "").strip() + if dna_block: + dna_text = dna_block.strip() + if dna_text: + stable_text = ( + stable_text + "\n\n" + dna_text + ).strip() if stable_text else dna_text + if stable_text: + block: Dict[str, Any] = {"type": "text", "text": stable_text} + if _approx_tokens(stable_text) >= _MIN_CACHE_TOKENS: + block["cache_control"] = {"type": "ephemeral"} + blocks.append(block) + return blocks + + +def build_cached_user_messages( + transcript_block: str, + other_user_messages: List[Dict[str, Any]], + extra_user_blocks: Optional[List[Dict[str, Any]]] = None, +) -> List[Dict[str, Any]]: + """Wrap the chat-path messages so the transcript turn is cached. + + The chat layer sends the transcript as the FIRST user message. To + cache it, that message's content becomes a list of text blocks with + the transcript block carrying cache_control. Subsequent messages + (the assistant ack, history, the new user turn) are left as plain + strings because they change every turn. + """ + cached_blocks: List[Dict[str, Any]] = [] + if extra_user_blocks: + cached_blocks.extend(extra_user_blocks) + transcript_text = (transcript_block or "").strip() + if transcript_text: + block: Dict[str, Any] = {"type": "text", "text": transcript_text} + if _approx_tokens(transcript_text) >= _MIN_CACHE_TOKENS: + block["cache_control"] = {"type": "ephemeral", "ttl": "1h"} + cached_blocks.append(block) + + messages: List[Dict[str, Any]] = [ + {"role": "user", "content": cached_blocks}, + ] + messages.extend(other_user_messages or []) + return messages diff --git a/tests/test_analysis_chunking.py b/tests/test_analysis_chunking.py index 970eff2..1ddba30 100644 --- a/tests/test_analysis_chunking.py +++ b/tests/test_analysis_chunking.py @@ -256,7 +256,7 @@ def test_multiple_chunks_synthesizes_via_ai(self, monkeypatch): # a single 3-4 sentence overview covering every section. captured = {} - def _fake_call_ai(prompt, system_prompt=''): + def _fake_call_ai(prompt, system_prompt='', **_kwargs): captured['prompt'] = prompt return ( '{"summary": "Covers interviews A, B, and C across the whole ' diff --git a/tests/test_anthropic_prompt_cache.py b/tests/test_anthropic_prompt_cache.py new file mode 100644 index 0000000..b2de22c --- /dev/null +++ b/tests/test_anthropic_prompt_cache.py @@ -0,0 +1,382 @@ +"""Prompt-cache wiring tests for the Anthropic BYO API path. + +Two layers: + + 1. Unit tests (no network). Verify the sentinel-extraction helpers, + structured-system block construction, and chat-message preparation + produce the exact shapes Anthropic's prompt cache expects. + + 2. Integration test (network, skipped without an API key). Fires two + identical ``_call_ai`` requests with a >5K-token transcript and + asserts the second response has ``cache_read_input_tokens > 0``, + proving the cache actually hit end-to-end. + +Run the live test with the app's bundled Python so the ``anthropic`` +SDK is on the import path, plus either an env-var key or whatever the +running app has stored in the macOS Keychain:: + + APP_PY="$HOME/Library/Application Support/DozaAssist/venv/bin/python3" + "$APP_PY" -m pytest core/tests/test_anthropic_prompt_cache.py \ + -k integration -s + +The fixture below resolves the key in this order: + 1. ``ANTHROPIC_API_KEY`` env var + 2. ``ai_providers.config.load_provider_config()`` (same Keychain + lookup the app uses; will prompt the first time) + 3. skip the test + +``ANTHROPIC_TEST_TRANSCRIPT`` can override the synthetic transcript with +a real one for ad-hoc runs. +""" +import logging +import os +import sys +import textwrap +from pathlib import Path +from unittest import mock + +import pytest + + +HERE = Path(__file__).resolve() +CORE_DIR = HERE.parent.parent +if str(CORE_DIR) not in sys.path: + sys.path.insert(0, str(CORE_DIR)) + + +# --------------------------------------------------------------------------- +# Unit layer: sentinels + structured-system construction +# --------------------------------------------------------------------------- + +def test_sentinel_extraction_round_trips_text(): + from ai_analysis import ( + CACHE_TX_START, CACHE_TX_END, CACHE_DNA_START, CACHE_DNA_END, + _split_cacheable_prompt, _strip_cache_sentinels, + ) + + transcript = "[00:00:00] Speaker: this is a long transcript " * 200 + dna = "MY STYLE CONTEXT — punchy intercuts, never trail off" + raw_prompt = ( + f"PROJECT: demo\n\nTRANSCRIPT:\n" + f"{CACHE_TX_START}{transcript}{CACHE_TX_END}\n\n" + f"STYLE:\n{CACHE_DNA_START}{dna}{CACHE_DNA_END}\n\n" + "Return ONLY valid JSON." + ) + + clean, found_tx, found_dna = _split_cacheable_prompt(raw_prompt) + assert found_tx == transcript + assert found_dna == dna + # The cleaned prompt drops the sentinels and the wrapped text, replacing + # it with the hoist notice so the model still has a reference to it. + assert CACHE_TX_START not in clean + assert CACHE_TX_END not in clean + assert "TRANSCRIPT provided" in clean + assert "EDITORIAL STYLE provided" in clean + # _strip_cache_sentinels keeps the wrapped content but drops markers. + stripped = _strip_cache_sentinels(raw_prompt) + assert CACHE_TX_START not in stripped and CACHE_DNA_END not in stripped + assert transcript in stripped and dna in stripped + + +def test_build_cached_system_blocks_marks_long_segments(): + """With a transcript present, layout is [transcript, dna?, system]. + The transcript carries the 1h-TTL cache marker; the system text comes + AFTER it without a cache marker so it can vary between sibling calls + without busting the transcript prefix.""" + from ai_providers.anthropic_provider import build_cached_system_blocks + + system = "S" * (1024 * 4 + 10) # comfortably over the 1024-token min + dna = "D" * (1024 * 4 + 10) # long enough to qualify for its own cache marker + transcript = "T" * (1024 * 4 + 10) + + blocks = build_cached_system_blocks(system, dna_block=dna, transcript_block=transcript) + assert len(blocks) == 3 + tx, dna_block, sys_block = blocks + + assert tx["type"] == "text" and tx["text"].startswith("T") + assert tx.get("cache_control") == {"type": "ephemeral", "ttl": "1h"} + + assert dna_block["type"] == "text" and dna_block["text"].startswith("D") + assert dna_block.get("cache_control") == {"type": "ephemeral"} + + assert sys_block["type"] == "text" and sys_block["text"].startswith("S") + # System text is intentionally uncached when a transcript precedes it. + assert "cache_control" not in sys_block + + +def test_build_cached_system_blocks_no_transcript_caches_combined_stable_text(): + """Without a transcript (chat-style call), the system + DNA get joined + into one cached block with the default 5m TTL, since the system text + itself is the stable prefix and the transcript lives in a user + message instead.""" + from ai_providers.anthropic_provider import build_cached_system_blocks + + system = "S" * (1024 * 4 + 10) + dna = "D" * 200 # short DNA, merges into the stable block + + blocks = build_cached_system_blocks(system, dna_block=dna) + assert len(blocks) == 1 + stable = blocks[0] + assert stable["text"].startswith("S") and dna in stable["text"] + assert stable.get("cache_control") == {"type": "ephemeral"} + + +def test_build_cached_system_blocks_transcript_first_independent_of_system(): + """Regression for the 0.8.5 bug: two calls with the same transcript + but a DIFFERENT system_prompt must produce the same leading bytes, + so Anthropic's prefix-based cache lookup hits on the second call. + """ + from ai_providers.anthropic_provider import build_cached_system_blocks + + transcript = "T" * (1024 * 4 + 10) + sys_a = "soundbites system prompt body" * 50 # different text + sys_b = "overview system prompt body" * 50 # different text + + blocks_a = build_cached_system_blocks(sys_a, transcript_block=transcript) + blocks_b = build_cached_system_blocks(sys_b, transcript_block=transcript) + # The block carrying cache_control must be byte-identical across + # both calls, otherwise the prefix differs and the cache misses. + assert blocks_a[0]["text"] == blocks_b[0]["text"] + assert blocks_a[0]["cache_control"] == blocks_b[0]["cache_control"] + + +def test_build_cached_system_blocks_skips_cache_on_small_blocks(): + """Below 1024 tokens the API refuses to cache, so we omit cache_control + rather than send a malformed request.""" + from ai_providers.anthropic_provider import build_cached_system_blocks + + blocks = build_cached_system_blocks("short system", transcript_block="short tx") + assert all("cache_control" not in b for b in blocks) + + +def test_prepare_chat_messages_hoists_transcript_to_user_block_list(): + """Anthropic chat path: the transcript message becomes a content-list + with the transcript text carrying cache_control, while the system + block list carries the chat system prompt (and any DNA examples).""" + from ai_analysis import ( + _prepare_chat_messages_for_provider, + CACHE_TX_START, CACHE_TX_END, CACHE_DNA_START, CACHE_DNA_END, + ) + + transcript = "T" * (1024 * 4 + 100) + dna = "D" * (1024 * 4 + 100) + style_msg = { + "role": "user", + "content": ( + "STYLE CONTEXT (active My Style profile):\n\n" + f"{CACHE_DNA_START}{dna}{CACHE_DNA_END}" + ), + } + transcript_msg = { + "role": "user", + "content": ( + "Here is the loaded project.\n\n" + "PROJECT: demo\n\nTRANSCRIPT:\n" + f"{CACHE_TX_START}{transcript}{CACHE_TX_END}\n" + ), + } + ack = {"role": "assistant", "content": "Transcript loaded."} + current = {"role": "user", "content": "find me three powerful soundbites"} + + messages = [style_msg, transcript_msg, ack, current] + sys_param, msg_param = _prepare_chat_messages_for_provider( + "anthropic", "chat system prompt", messages, + ) + + # System is now a structured list with chat-system + DNA cached together. + assert isinstance(sys_param, list) + assert sys_param[0]["text"].startswith("chat system prompt") + assert dna in sys_param[0]["text"] + assert sys_param[0]["cache_control"] == {"type": "ephemeral"} + + # The orphan style header message was dropped; the transcript message + # carries the cache_control on its transcript block. + assert all(m.get("role") != "user" or + m.get("content") != "STYLE CONTEXT (active My Style profile):" + for m in msg_param) + transcript_message = msg_param[0] + assert isinstance(transcript_message["content"], list) + cached_block = [b for b in transcript_message["content"] if "cache_control" in b] + assert len(cached_block) == 1 + assert cached_block[0]["text"] == transcript + assert cached_block[0]["cache_control"] == {"type": "ephemeral", "ttl": "1h"} + + +def test_prepare_chat_messages_passes_through_ollama(): + """Ollama path: sentinels stripped, no cache_control on anything.""" + from ai_analysis import ( + _prepare_chat_messages_for_provider, + CACHE_TX_START, CACHE_TX_END, + ) + msg = { + "role": "user", + "content": f"TRANSCRIPT:\n{CACHE_TX_START}body{CACHE_TX_END}", + } + sys_param, msg_param = _prepare_chat_messages_for_provider( + "ollama", "sys", [msg, {"role": "user", "content": "find clips"}], + ) + assert sys_param == "sys" + assert msg_param[0]["content"] == "TRANSCRIPT:\nbody" + assert msg_param[1]["content"] == "find clips" + + +def test_log_usage_warns_on_cache_miss_for_seen_prefix(caplog): + """If the same prefix re-creates a cache instead of reading one, the + provider logs a WARNING so we notice broken cache_control placement.""" + from ai_providers.anthropic_provider import ( + _log_usage, _seen_cache_prefixes, + ) + + class FakeUsage: + cache_creation_input_tokens = 5000 + cache_read_input_tokens = 0 + input_tokens = 200 + output_tokens = 50 + + _seen_cache_prefixes.add("test-signature") + try: + with caplog.at_level(logging.WARNING, logger="ai_providers.anthropic_provider"): + _log_usage(FakeUsage(), "claude-opus-4", "unit-test", "test-signature") + assert any("cache-miss-on-repeat" in r.message for r in caplog.records) + finally: + _seen_cache_prefixes.discard("test-signature") + + +# --------------------------------------------------------------------------- +# Integration layer: real Anthropic call (skipped without a key) +# --------------------------------------------------------------------------- + +def _make_long_transcript(min_tokens: int = 5000) -> str: + """Generate a synthetic but well-formed transcript that comfortably + exceeds the 1024-token cache minimum and looks realistic enough for + the model to engage with.""" + overridden = os.environ.get("ANTHROPIC_TEST_TRANSCRIPT") + if overridden and Path(overridden).exists(): + return Path(overridden).read_text(encoding="utf-8") + + paragraph = textwrap.dedent("""\ + [00:{m:02d}:{s:02d}] Speaker: I think the moment that changed things for me + was when I realized the story we were telling wasn't the story I'd lived. + We had this draft on the wall that read like a press release, and the + truth was messier. There was a long pause that morning, and somebody + asked the only question that mattered, which was: do we want this to be + accurate or do we want this to be safe. We chose accurate. + """).strip() + "\n" + chunks = [] + minute = 0 + while sum(len(c) for c in chunks) // 4 < min_tokens: + for s in range(0, 60, 15): + chunks.append(paragraph.format(m=minute, s=s)) + minute += 1 + return "".join(chunks) + + +def _resolve_anthropic_key() -> str: + """Pull the Anthropic key from env first, then the app's own config + loader (which reads the macOS Keychain via ``keyring``). Returns '' + if neither source has a key. We do not shell out to ``security`` so + no separate credential prompt happens beyond keyring's own UI.""" + env_key = (os.environ.get("ANTHROPIC_API_KEY") or "").strip() + if env_key and env_key.lower() != "sk-ant-..." and not env_key.endswith("..."): + return env_key + try: + from ai_providers.config import load_provider_config + cfg = load_provider_config() or {} + return ((cfg.get("anthropic") or {}).get("api_key") or "").strip() + except Exception: + return "" + + +@pytest.mark.integration +def test_repeat_call_reads_cache(): + """End-to-end: fire two identical analysis prompts and assert the + second response shows a non-zero cache_read_input_tokens. Skipped + when no Anthropic key is reachable (env var or Keychain) so CI + without credentials stays green.""" + api_key = _resolve_anthropic_key() + if not api_key: + pytest.skip( + "No Anthropic key found in ANTHROPIC_API_KEY or " + "ai_providers.config.load_provider_config(); skipping live test" + ) + + from ai_providers.anthropic_provider import ( + AnthropicProvider, + build_cached_system_blocks, + _seen_cache_prefixes, + ) + + # Clear any cross-test state so we're measuring this run only. + _seen_cache_prefixes.clear() + + transcript = _make_long_transcript(min_tokens=5500) + system_prompt = ( + "You are an expert documentary film editor. Output JSON only. " + "No markdown, no fences, no commentary. Copy HH:MM:SS timecodes " + "exactly from the transcript without rounding." + ) + system_param = build_cached_system_blocks( + system_prompt, transcript_block=transcript, + ) + user_prompt = ( + "Return a JSON object with this shape only: " + '{"strongest_soundbites": [{"text": "...", "start": "00:00:00", ' + '"end": "00:00:00", "why": "..."}]}\n' + "Find ONE soundbite from the transcript above. JSON only, no prose." + ) + + provider = AnthropicProvider(api_key=api_key) + + # Capture usage for both calls by patching _log_usage to also save + # the numbers in a list we can assert on. + captured = [] + real_log = provider.__class__.__module__ + + from ai_providers import anthropic_provider as ap_mod + original_log = ap_mod._log_usage + + def capturing_log(usage, model, call_site, signature): + captured.append({ + "cache_create": int(getattr(usage, "cache_creation_input_tokens", 0) or 0), + "cache_read": int(getattr(usage, "cache_read_input_tokens", 0) or 0), + "input": int(getattr(usage, "input_tokens", 0) or 0), + "output": int(getattr(usage, "output_tokens", 0) or 0), + "site": call_site, + }) + original_log(usage, model, call_site, signature) + + with mock.patch.object(ap_mod, "_log_usage", capturing_log): + first = provider.generate( + system_param, user_prompt, task_type="analysis", + max_tokens=400, call_site="integration_first", + ) + second = provider.generate( + system_param, user_prompt, task_type="analysis", + max_tokens=400, call_site="integration_second", + ) + + assert first and isinstance(first, str) + assert second and isinstance(second, str) + assert len(captured) == 2 + first_usage, second_usage = captured + # First call should populate the cache (or hit a prior cache from a + # recent run; either way cache_create + cache_read > 0). + assert first_usage["cache_create"] + first_usage["cache_read"] > 0, ( + "First call did not exercise the cache at all: %s" % (first_usage,) + ) + # Second call must read from cache. + assert second_usage["cache_read"] > 0, ( + "Second call returned cache_read=0; cache_control did not apply. " + "first=%s second=%s" % (first_usage, second_usage) + ) + hit_rate = ( + second_usage["cache_read"] / + max(1, second_usage["cache_read"] + second_usage["input"]) + ) + print( + "\n[anthropic-cache] first: %s\n[anthropic-cache] second: %s\n" + "[anthropic-cache] second-call hit rate: %.2f" + % (first_usage, second_usage, hit_rate) + ) + # >70% is the product goal. + assert hit_rate > 0.7, f"Cache hit rate {hit_rate:.2f} below 0.70 target" diff --git a/tests/test_editorial_dna_v21.py b/tests/test_editorial_dna_v21.py index 0c4244d..aad758a 100644 --- a/tests/test_editorial_dna_v21.py +++ b/tests/test_editorial_dna_v21.py @@ -58,7 +58,7 @@ def fake_llm(monkeypatch): 'uses_narration': False, } - def fake(prompt): + def fake(prompt, *_args, **_kwargs): return json.dumps(canned) import ai_analysis @@ -67,7 +67,7 @@ def fake(prompt): import editorial_dna.analysis as ea monkeypatch.setattr(ea, '_call_ai', fake) import editorial_dna.summarizer as es - monkeypatch.setattr(es, '_call_ai', lambda p: 'A calm, observational editor who lets subjects breathe.') + monkeypatch.setattr(es, '_call_ai', lambda p, *_a, **_k: 'A calm, observational editor who lets subjects breathe.') return canned @@ -321,9 +321,9 @@ def test_analysis_survives_bad_llm_json(monkeypatch): import ai_analysis import editorial_dna.analysis as ea import editorial_dna.summarizer as es - monkeypatch.setattr(ai_analysis, '_call_ai', lambda p: 'not json at all!!') - monkeypatch.setattr(ea, '_call_ai', lambda p: 'not json at all!!') - monkeypatch.setattr(es, '_call_ai', lambda p: 'A calm editor.') + monkeypatch.setattr(ai_analysis, '_call_ai', lambda p, *_a, **_k: 'not json at all!!') + monkeypatch.setattr(ea, '_call_ai', lambda p, *_a, **_k: 'not json at all!!') + monkeypatch.setattr(es, '_call_ai', lambda p, *_a, **_k: 'A calm editor.') summary = ea.generate_structured_summary( 'x', 'Test', {'speech_pacing': {'rhythm_descriptor': 'calm'}}, diff --git a/tests/test_story_builder.py b/tests/test_story_builder.py index 42df195..183852e 100644 --- a/tests/test_story_builder.py +++ b/tests/test_story_builder.py @@ -165,7 +165,7 @@ def _vectors(self): def test_menu_drops_low_score_segments(self, monkeypatch): captured = {} - def _stub_call_ai(prompt, system_prompt=""): + def _stub_call_ai(prompt, system_prompt="", **_kwargs): captured['prompt'] = prompt return '{"clips": []}' @@ -191,7 +191,7 @@ def test_fallback_when_all_segments_are_low(self, monkeypatch): dict(s, narrative_score='low') for s in self._vectors() ] captured = {} - monkeypatch.setattr(ai_analysis, '_call_ai', lambda p, s="": (captured.update(prompt=p), '{"clips": []}')[1]) + monkeypatch.setattr(ai_analysis, '_call_ai', lambda p, s="", **_k: (captured.update(prompt=p), '{"clips": []}')[1]) monkeypatch.setattr(ai_analysis, 'inject_my_style', lambda p, profile_id=None: p) ai_analysis._build_story_from_vectors(all_low, message="build", project_name="T") @@ -232,7 +232,7 @@ def test_menu_groups_by_score_not_timecode(self, monkeypatch): captured = {} monkeypatch.setattr( ai_analysis, '_call_ai', - lambda p, s="": (captured.update(prompt=p, system=s), '{"clips": []}')[1], + lambda p, s="", **_k: (captured.update(prompt=p, system=s), '{"clips": []}')[1], ) monkeypatch.setattr(ai_analysis, 'inject_my_style', lambda p, profile_id=None: p) @@ -251,7 +251,7 @@ def test_menu_groups_by_score_not_timecode(self, monkeypatch): def test_prompt_contains_mandatory_reorder_directive(self, monkeypatch): captured = {} - def _stub(prompt, system_prompt=""): + def _stub(prompt, system_prompt="", **_kwargs): captured['prompt'] = prompt captured['system'] = system_prompt return '{"clips": []}' @@ -297,7 +297,7 @@ def test_non_chronological_order_preserved_through_hydration(self, monkeypatch): {'order': 3, 'seg_id': 'SEG_C', 'editorial_note': 'ROLE: turn'}, ], }) - monkeypatch.setattr(ai_analysis, '_call_ai', lambda p, s="": fake_response) + monkeypatch.setattr(ai_analysis, '_call_ai', lambda p, s="", **_k: fake_response) monkeypatch.setattr(ai_analysis, 'inject_my_style', lambda p, profile_id=None: p) out = ai_analysis._build_story_from_vectors(