-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
6881 lines (6230 loc) · 276 KB
/
main.py
File metadata and controls
6881 lines (6230 loc) · 276 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import asyncio
import json
import logging
import os
import re
import time
import uuid
from contextlib import asynccontextmanager
from datetime import datetime as _dt, timezone as _timezone
from typing import Optional
import httpx
import uvicorn
import bcrypt
from fastapi import FastAPI, BackgroundTasks, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from config import settings
from auth.jwt_utils import create_token, verify_token
from db.database import init_tables, get_conn
from memory.mem0_client import warmup as mem0_warmup, add_memories, add_raw_memory
from memory.context import get_active_rules, get_active_skills, get_context
from router.confidence import get_confidence, update_confidence, detect_topic
from router.compass import record_topic
from training.collector import save_pair, count_untrained_pairs, mark_chat_feedback_pair, mark_used
from training.adapter import adapter_exists, adapter_status, ensure_adapter_loaded, lora_name_for_user
from training.runtime import start_gemma_vllm_after_training, stop_gemma_vllm_for_training, tcp_ok, vllm_models_health
from loop_events import loop_snapshot, record_event, record_life_event, record_outcome
from loop_priority import get_today_priority
from proactive_engine import (
acknowledge_intervention,
get_daily_mission,
get_growth_timeline,
get_intervention_settings,
get_latest_clone_mission,
get_or_create_next_intervention,
get_reality_check,
get_revelation_status,
update_intervention_settings,
)
from thesis import get_current_thesis, refresh_current_thesis
from training.summary import get_training_summary
from training.state import create_training_run, finish_training_run, latest_training_run, mark_interrupted_training_runs
from models.schemas import ContextRequest, ContextResponse, SaveRequest
from teacher_policy import infer_importance, record_teacher_usage, should_use_teacher
# Root logging at INFO for the whole sidecar; module code logs through the "echo" logger.
logging.basicConfig(level="INFO")
log = logging.getLogger(name="echo")
# Base system prompt for Echo chat turns: product identity, the Signal -> ... -> Direction
# loop, tone rules, vocabulary to avoid in user-facing copy, and stage-awareness guidance.
# _loop_context_injection starts every loop-aware system injection with this text.
# Runtime text: every character reaches the model, so edit deliberately.
ECHO_PRODUCT_SYSTEM_PROMPT = (
    "You are Echo, a local-first opportunity engine built on one belief: talent is common, "
    "but discovery is uneven. Many people only find what they are good at by accident; Echo "
    "turns scattered moments into direction, practice, proof, outcomes, and eval-gated adapter updates.\n"
    "Echo's product loop is: Signal -> Pattern Map -> Next Proof Step -> "
    "Outcome -> Proof Card -> Direction.\n"
    "Behave less like a generic chatbot and more like a precise private operating layer. Be "
    "direct, calm, evidence-based, and specific. Help the user leave with one concrete next "
    "step, one better decision, or one piece of proof they can build.\n"
    "Use memory and loop context as evidence. Never invent personal facts. If context is missing, "
    "say so simply and ask at most one useful question.\n"
    "When advice matters, end with a small action and what outcome the user should log later.\n"
    "Avoid public-facing clone, shadow, battle, tournament, lab, or anime-style language. "
    "Use product language: Pattern Map, Today, Proof Card, Decision Room, Practice, Outcome, "
    "Direction, Runtime, Home Brain, Home Brain Adapter, This Device.\n"
    "Stage awareness: if the user has fewer than 8 stored moments (early stage), listen more and "
    "advise less — build the picture before claiming to know them, ask at most one clarifying "
    "question per turn. If the user has an active thesis and practice rep, reference both when "
    "the moment fits. If training is ready, briefly mention it before closing the conversation."
)
# Shared voice anchor used by all feature prompts — keeps identity coherent across endpoints.
# NOTE(review): no use of this constant is visible in this part of the file — confirm callers.
ECHO_FEATURE_HEADER = (
    "You are Echo - a local-first AI system building a longitudinal picture of one person "
    "so hidden talent becomes direction, proof, and opportunity.\n"
)
def _meaningful_chat_message(value: str | None) -> bool:
text = " ".join((value or "").strip().lower().split())
if len(text) < 28:
return False
blocked = (
"generate a concise title",
"generate a title",
"conversation content:",
"please return the result",
"continue",
)
return not any(marker in text for marker in blocked)
def _loop_context_injection(
    thesis: dict,
    priority: dict,
    practice: dict | None = None,
    training: dict | None = None,
) -> str:
    """Build the loop-aware system prompt: product prompt + current loop + usage guidance.

    Args:
        thesis: current pattern-map read; uses ``title``, ``statement``, ``confidence_label``.
        priority: today's priority; uses ``title``, ``body`` and ``action`` (only when
            ``action`` is a dict, for its ``label``).
        practice: today's practice rep (``rep_title``, ``rep_instruction``) or None.
        training: training summary (``ready_for_training``, ``untrained_pairs``,
            ``required_pairs``, ``dpo_ready_pairs``, ``dpo_required_pairs``) or None.

    Missing values fall back to neutral placeholders ("Still Forming", "Keep talking", ...).
    ``required_pairs`` defaults to ``settings.min_pairs_for_training`` and
    ``dpo_required_pairs`` to 4.
    """
    practice = practice or {}
    training = training or {}

    thesis_title = thesis.get("title") or "Still Forming"
    thesis_statement = thesis.get("statement") or "Echo is still forming a read."
    confidence = thesis.get("confidence_label") or "early"

    priority_title = priority.get("title") or "No priority yet"
    priority_body = priority.get("body") or ""
    raw_action = priority.get("action")
    action = raw_action if isinstance(raw_action, dict) else {}
    action_label = action.get("label") or "Keep talking"

    practice_title = practice.get("rep_title") or "No practice rep set yet"
    practice_instruction = practice.get("rep_instruction") or ""

    adapter_state = "ready" if bool(training.get("ready_for_training")) else "not ready"
    untrained = int(training.get("untrained_pairs") or 0)
    required = int(training.get("required_pairs") or settings.min_pairs_for_training)
    dpo_ready = int(training.get("dpo_ready_pairs") or 0)
    dpo_required = int(training.get("dpo_required_pairs") or 4)

    loop_lines = [
        "## Current Echo Loop",
        f"- Pattern map / direction: {thesis_title} ({confidence}). {thesis_statement}",
        f"- Next proof step: {priority_title}. {priority_body}",
        f"- Best next action: {action_label}.",
        f"- Today's practice / proof rep: {practice_title}. {practice_instruction}",
        f"- Home Brain Adapter: {adapter_state} "
        f"({untrained}/{required} new moments, {dpo_ready}/{dpo_required} preference pairs).",
        "",
        "## How to use this context",
        f"- If the user's question touches anything related to '{thesis_title}': connect it to their current direction.",
        "- If the practice rep is set and the user describes a relevant action: mention logging the outcome.",
        "- If training is ready and the conversation is winding down: briefly note that a Home Brain Adapter update is available.",
        f"- If an action label is set ('{action_label}'): surface it once as a concrete next step if the conversation warrants it.",
        "- Do not force the loop — let the user's question lead. One nudge per turn is enough.",
    ]
    return ECHO_PRODUCT_SYSTEM_PROMPT + "\n\n" + "\n".join(loop_lines)
def _has_tool_prompt(system_prompt: str | None) -> bool:
return bool(
system_prompt
and (
"<tool_definitions>" in system_prompt
or "<tool_usage_instructions>" in system_prompt
or "<function name=" in system_prompt
)
)
def _looks_like_tool_request(text: str | None) -> bool:
lower = " ".join((text or "").lower().split())
if not lower:
return False
service_markers = (
"gmail",
"email",
"mail",
"slack",
"calendar",
"google drive",
"drive",
"notion",
"github",
"jira",
"discord",
"telegram",
"whatsapp",
"sheets",
"docs",
)
action_markers = (
"connect",
"authenticate",
"auth",
"authorize",
"read",
"check",
"send",
"reply",
"create",
"schedule",
"search",
"find",
"list",
"open",
"update",
"delete",
"upload",
"download",
)
return any(service in lower for service in service_markers) and any(action in lower for action in action_markers)
def _tool_safe_context(system_injection: str) -> str:
return system_injection.replace(
"You have NO tools and NO external functions. Do NOT output <function>, <tool_call>, or any XML/function syntax — ever.",
"You may use the tools explicitly provided by the client system prompt. Do not invent tools.",
)
def _requested_model_lane(request: Request, body: dict) -> str:
    """Resolve which model lane the caller asked for.

    Sources, first non-empty wins: the ``x-echo-model-lane`` header, then the body's
    ``echo_model_lane``, then ``model_lane``. Any Gemma alias ("gemma", "gemma4",
    "gemma4_e2b", "gemma-4-e2b"; case- and surrounding-whitespace-insensitive) maps to
    "gemma4_e2b"; everything else, including no request at all, maps to "auto".
    """
    sources = (
        request.headers.get("x-echo-model-lane"),
        body.get("echo_model_lane"),
        body.get("model_lane"),
    )
    requested = next((source for source in sources if source), "")
    normalized = str(requested).strip().lower()
    gemma_aliases = {"gemma", "gemma4", "gemma4_e2b", "gemma-4-e2b"}
    return "gemma4_e2b" if normalized in gemma_aliases else "auto"
def _training_lane(value: str | None) -> str:
return "gemma4_e2b"
def _training_status_key(user_id: str, lane: str = "gemma4_e2b") -> str:
return f"{user_id}:{lane}"
def _local_target_for_lane(lane: str, user_id: str, use_adapter: bool) -> tuple[str, str, str]:
    """Return (base_url, model, model_used) for the selected local lane.

    Only the Gemma 4 E2B lane exists, so *lane* is not consulted. The model is the user's
    LoRA name when *use_adapter* is true, otherwise ``settings.gemma4_base_model``.
    model_used is always "gemma4_e2b".
    """
    if use_adapter:
        model_name = lora_name_for_user(user_id, lane="gemma4_e2b")
    else:
        model_name = settings.gemma4_base_model
    return (settings.gemma4_vllm_base_url, model_name, "gemma4_e2b")
async def _call_gemma_feature(
    user_id: str,
    prompt: str,
    *,
    system: str | None = None,
    temperature: float = 0.65,
    max_tokens: int = 400,
) -> str:
    """Run one non-streaming chat completion on the local Gemma lane.

    The user's LoRA adapter is used when it exists and ``ensure_adapter_loaded`` succeeds;
    otherwise base Gemma. Tool/function-call stop strings are sent so the model cannot
    emit tool syntax.

    Args:
        user_id: whose adapter (if any) to use.
        prompt: user message content.
        system: optional system message placed before the prompt.
        temperature / max_tokens: forwarded to the completion request.

    Returns:
        The assistant text with surrounding whitespace stripped; "" if the model returned
        no content.

    Raises:
        httpx.HTTPStatusError: the server answered 4xx/5xx.
        httpx.HTTPError, KeyError, IndexError: transport failure or a malformed payload.
    """
    use_adapter = adapter_exists(user_id, lane="gemma4_e2b")
    if use_adapter:
        use_adapter = await ensure_adapter_loaded(user_id, lane="gemma4_e2b")
    base_url, model, _ = _local_target_for_lane("gemma4_e2b", user_id, use_adapter=use_adapter)

    messages = []
    if system:
        messages.append({"role": "system", "content": system})
    messages.append({"role": "user", "content": prompt})

    async with httpx.AsyncClient(timeout=90) as client:
        resp = await client.post(
            f"{base_url.rstrip('/')}/chat/completions",
            headers={"Content-Type": "application/json"},
            json={
                "model": model,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
                "stop": ["<function", "<tool_call>", "<|tool_call|>", "✿FUNCTION✿"],
            },
        )
        resp.raise_for_status()
    message = resp.json()["choices"][0]["message"]
    # OpenAI-compatible servers may send "content": null (e.g. tool-call-only replies);
    # .get("content", "") would then return None and .strip() would raise AttributeError.
    return (message.get("content") or "").strip()
async def _call_feature_model(
    user_id: str,
    purpose: str,
    prompt: str,
    *,
    system: str | None = None,
    temperature: float = 0.65,
    max_tokens: int = 400,
    importance: str = "normal",
) -> tuple[str, str]:
    """Produce feature text Gemma-first, escalating to the teacher only if policy allows.

    The local Gemma lane is tried first. If it raises (logged as a warning) or returns
    empty text, ``should_use_teacher`` (with ``recent_failure=True``) decides whether the
    cloud teacher may answer instead.

    Returns:
        ``(text, source)``. source is "gemma4_e2b" for a local answer, the teacher's model
        name for a teacher answer, or ``"teacher_skipped:<reason>"`` (with empty text)
        when the policy declines.
    """
    try:
        local_text = await _call_gemma_feature(
            user_id,
            prompt,
            system=system,
            temperature=temperature,
            max_tokens=max_tokens,
        )
    except Exception as exc:
        log.warning("Gemma feature call failed user=%s purpose=%s: %s", user_id, purpose, exc)
    else:
        if local_text:
            return local_text, "gemma4_e2b"

    decision = await should_use_teacher(
        user_id,
        purpose,
        importance=importance,
        prompt=prompt,
        recent_failure=True,
    )
    if not decision.allowed:
        return "", f"teacher_skipped:{decision.reason}"

    from providers.teacher import chat_with_teacher

    conversation = [{"role": "user", "content": prompt}]
    if system:
        conversation.insert(0, {"role": "system", "content": system})
    teacher_text, _, teacher_model = await chat_with_teacher(
        conversation,
        user_id=user_id,
        purpose=purpose,
        importance=importance,
        recent_failure=True,
        explicit_user_request=False,
        require_policy=True,
    )
    return teacher_text, teacher_model
async def _context_with_loop(user_id: str, message: str) -> dict:
    """Return memory context for *message*, extended with the user's current Echo loop.

    The thesis, today's priority, the Gemma training summary and today's practice rep are
    fetched concurrently. They are rendered by ``_loop_context_injection`` and appended to
    ``ctx["system_injection"]``, and the raw pieces are stored under ``ctx["loop_state"]``.
    The loop part is best-effort: on any failure it is logged as a warning,
    ``loop_state`` becomes ``{}``, and the memory-only injection is kept.
    """
    ctx = await get_context(user_id, message)
    try:
        thesis, priority, training, practice = await asyncio.gather(
            get_current_thesis(user_id),
            get_today_priority(user_id),
            get_training_summary(user_id, lane="gemma4_e2b"),
            _cached_practice_today(user_id),
        )
        loop_block = _loop_context_injection(thesis, priority, practice, training)
        ctx["system_injection"] = ctx["system_injection"] + "\n\n" + loop_block
        ctx["loop_state"] = dict(
            thesis=thesis,
            today_priority=priority,
            training_summary=training,
            practice=practice,
        )
    except Exception as exc:
        log.warning("loop context failed for user=%s: %s", user_id, exc)
        ctx["loop_state"] = {}
    return ctx
async def _cached_practice_today(user_id: str) -> dict | None:
    """Return today's practice rep for *user_id* and whether it has been logged.

    "Today" is the current UTC date (``YYYY-MM-DD``), matched against
    ``practice_reps.date``. Despite the name, nothing is cached: every call reads the
    database.

    Returns:
        None when there is no rep for today. Otherwise a dict with ``rep_id``,
        ``observation``, ``rep_title``, ``rep_instruction``, ``arc_label``, ``logged``
        (a ``practice_log`` row exists for the rep) and ``done`` (that row's flag as a
        bool, or None when not logged).
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now" gives the same date.
    today = _dt.now(_timezone.utc).strftime("%Y-%m-%d")
    async with get_conn() as db:
        async with db.execute(
            "SELECT * FROM practice_reps WHERE user_id=? AND date=?",
            (user_id, today),
        ) as cur:
            row = await cur.fetchone()
        if not row:
            return None
        rep = dict(row)
        async with db.execute(
            "SELECT done FROM practice_log WHERE user_id=? AND rep_id=?",
            (user_id, rep["id"]),
        ) as cur:
            log_row = await cur.fetchone()
    return {
        "rep_id": rep["id"],
        "observation": rep["observation"],
        "rep_title": rep["rep_title"],
        "rep_instruction": rep["rep_instruction"],
        "arc_label": rep.get("arc_label"),
        "logged": log_row is not None,
        "done": bool(log_row["done"]) if log_row else None,
    }
async def _loop_delta_for_turn(user_id: str, event_id: str, topic: str, model_used: str) -> dict:
    """Build the loop-state delta returned to the client after a chat turn.

    Fetches the current thesis, today's priority and a loop snapshot concurrently, then
    attaches the standard follow-up actions: "Choose best path" (type ``run_tournament``,
    reusing the priority action's ``payload``) plus three outcome buttons.

    Returns:
        dict with ``event_id``, ``topic``, ``model_used``, ``thesis``, ``today_priority``,
        ``snapshot`` and ``suggested_actions``.
    """
    thesis, priority, snapshot = await asyncio.gather(
        get_current_thesis(user_id),
        get_today_priority(user_id),
        loop_snapshot(user_id),
    )
    # priority["action"] is not guaranteed to be a dict (_loop_context_injection guards the
    # same field with isinstance); treat anything else as "no payload" instead of raising.
    action = priority.get("action")
    best_path_payload = action.get("payload", {}) if isinstance(action, dict) else {}
    return {
        "event_id": event_id,
        "topic": topic,
        "model_used": model_used,
        "thesis": thesis,
        "today_priority": priority,
        "snapshot": snapshot,
        "suggested_actions": [
            {
                "type": "run_tournament",
                "label": "Choose best path",
                "payload": best_path_payload,
            },
            {"type": "outcome", "label": "Log outcome", "outcome": "helped", "score": 1.0},
            {"type": "outcome", "label": "Not true", "outcome": "not_true", "score": -0.7},
            {"type": "outcome", "label": "Add to proof", "outcome": "saved_signal", "score": 1.2},
        ],
    }
async def _loop_delta_for_save(
    user_id: str,
    event_id: str,
    *,
    topic: str,
    model_used: str,
    thesis_updated: bool = False,
    proof_created: bool = False,
    opportunity_unlocked: bool = False,
    training_signal_saved: bool = False,
    next_action: str = "Add evidence when you have it.",
    receipt_title: str = "Echo updated",
    receipt_detail: str = "This moment was saved to your loop.",
) -> dict:
    """Return the turn-level loop delta plus the receipt fields shown after a save.

    Starts from ``_loop_delta_for_turn`` and appends the save flags
    (thesis/proof/opportunity/training-signal), the next action and the receipt copy.
    """
    turn_delta = await _loop_delta_for_turn(user_id, event_id, topic, model_used)
    return {
        **turn_delta,
        "thesis_updated": thesis_updated,
        "proof_created": proof_created,
        "opportunity_unlocked": opportunity_unlocked,
        "training_signal_saved": training_signal_saved,
        "next_action": next_action,
        "receipt_title": receipt_title,
        "receipt_detail": receipt_detail,
    }
async def _auto_load_adapters():
    """Re-register trained Gemma LoRA adapters with vLLM shortly after startup.

    Adapters loaded into vLLM are lost on every vLLM restart, so this runs as a startup
    background task after a 15 s delay. Candidates are tried in this order:
      1. the newest ``checkpoints`` row per user for lane 'gemma4_e2b';
      2. every directory in ``settings.adapters_dir`` whose name maps to a user via
         ``adapter_user_from_dirname``.
    Each distinct (user_id, path) pair whose path exists is hot-swapped once, without
    recording a new checkpoint. A user may receive several loads (one per source); which
    one stays active presumably depends on ``hot_swap_adapter`` — TODO confirm.
    An exception while loading one adapter is logged and the remaining candidates are
    still tried.
    """
    await asyncio.sleep(15)  # wait for vLLM to finish booting
    from pathlib import Path
    from training.adapter import adapter_user_from_dirname, hot_swap_adapter

    adapters_dir = Path(settings.adapters_dir)
    candidates: list[tuple[str, str]] = []
    try:
        async with get_conn() as db:
            async with db.execute(
                """
SELECT c.user_id, c.path
FROM checkpoints c
JOIN (
SELECT user_id, lane, MAX(created_at) AS created_at
FROM checkpoints
WHERE lane='gemma4_e2b'
GROUP BY user_id, lane
) latest
ON c.user_id=latest.user_id
AND c.lane=latest.lane
AND c.created_at=latest.created_at
"""
            ) as cur:
                rows = await cur.fetchall()
        candidates.extend((r["user_id"], r["path"]) for r in rows)
    except Exception as e:
        log.warning("Could not read adapter checkpoints on startup: %s", e)

    if adapters_dir.exists():
        for p in sorted(adapters_dir.iterdir()):
            if not p.is_dir():
                continue
            user_id = adapter_user_from_dirname(p.name)
            if user_id:
                candidates.append((user_id, str(p)))

    if not candidates:
        return

    seen: set[tuple[str, str]] = set()
    loaded = 0
    for user_id, path in candidates:
        key = (user_id, path)
        if key in seen:
            continue
        seen.add(key)
        if not Path(path).exists():
            continue
        try:
            ok = await hot_swap_adapter(user_id, path, record_checkpoint=False, lane="gemma4_e2b")
        except Exception as e:
            # One broken adapter must not stop the rest from loading; an exception here
            # would otherwise kill this fire-and-forget task silently.
            log.warning("Auto-load adapter failed user=%s path=%s: %s", user_id, path, e)
            continue
        if ok:
            loaded += 1
            log.info("Auto-loaded adapter for user=%s path=%s", user_id, path)
    log.info("Auto-loaded %d adapter(s) on startup", loaded)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: prepare storage and models, start background work, then serve.

    Startup steps, in order:
      1. Warn if the JWT secret is still the dev default.
      2. Create tables.
      3. Run ``mark_interrupted_training_runs`` (presumably flags runs a previous process
         left unfinished).
      4. Warm up mem0, capped at 8 s; a timeout is logged and ignored.
      5. Start the scheduler.
      6. Spawn the adapter auto-load and the skill/proof seeding tasks.
    On shutdown, those startup tasks are cancelled if they are still running.
    """
    if settings.jwt_secret == "echo-jwt-secret-change-in-production":
        log.warning("JWT secret is the default dev value — change it in .env before exposing to a network")
    await init_tables()
    await mark_interrupted_training_runs()
    try:
        await asyncio.wait_for(mem0_warmup(), timeout=8)
    except asyncio.TimeoutError:
        log.warning("mem0 warmup timed out; continuing startup")
    from scheduler import start_scheduler, _extract_skills, _seed_proof_from_history
    start_scheduler()
    # The event loop keeps only weak references to tasks. Holding them here (this frame
    # lives until shutdown) prevents them from being garbage-collected mid-run.
    startup_tasks = (
        asyncio.create_task(_auto_load_adapters()),
        asyncio.create_task(_startup_skill_and_proof_seed(_extract_skills, _seed_proof_from_history)),
    )
    log.info("Echo sidecar ready on port %d", settings.port)
    yield
    for task in startup_tasks:
        if not task.done():
            task.cancel()
async def _startup_skill_and_proof_seed(extract_skills_fn, seed_proof_fn) -> None:
    """Backfill skills and proof for every user who already has training pairs.

    Runs once as a startup background task, after a 10 s grace period. For each distinct
    ``user_id`` in ``training_pairs``:
      * if the user has no active ``user_skills`` rows, ``extract_skills_fn(uid)`` runs;
      * ``seed_proof_fn(uid)`` is always called (presumably a no-op when proof already
        exists — TODO confirm).
    Per-user failures are logged and skipped. A failure to list users (or any other
    escaping error) is logged once.
    """
    await asyncio.sleep(10)  # let mem0 and vLLM settle first
    try:
        async with get_conn() as db:
            async with db.execute("SELECT DISTINCT user_id FROM training_pairs") as cur:
                user_rows = await cur.fetchall()
        for uid in [r["user_id"] for r in user_rows]:
            try:
                async with get_conn() as db:
                    async with db.execute(
                        "SELECT COUNT(*) as cnt FROM user_skills WHERE user_id=? AND active=1",
                        (uid,),
                    ) as cur:
                        active_skills = (await cur.fetchone())["cnt"]
                if active_skills == 0:
                    log.info("Startup: no skills for user=%s — running extraction", uid)
                    await extract_skills_fn(uid)
            except Exception as exc:
                log.warning("Startup skill extraction failed user=%s: %s", uid, exc)

            try:
                await seed_proof_fn(uid)
            except Exception as exc:
                log.warning("Startup proof seed failed user=%s: %s", uid, exc)
    except Exception as exc:
        log.warning("Startup skill/proof seed error: %s", exc)
app = FastAPI(lifespan=lifespan, title="Echo Sidecar", version="2.0")

# CORS is fully open: any origin, method and header. allow_credentials keeps its default;
# callers authenticate with request headers checked by the HTTP auth middleware.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)

# Paths served without a token: health probe, account bootstrap, model listing, the demo
# seed endpoint and the auto-generated API docs.
# NOTE(review): /v1/demo/seed is reachable unauthenticated — confirm that is intended.
_PUBLIC = set(
    [
        "/health",
        "/auth/register",
        "/auth/login",
        "/v1/models",
        "/v1/demo/seed",
        "/docs",
        "/openapi.json",
        "/redoc",
    ]
)
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """Authenticate each HTTP request and store the caller on ``request.state.user_id``.

    Checked in order:
      1. CORS preflight (OPTIONS) and paths in ``_PUBLIC`` pass through untouched.
      2. ``Authorization: Bearer <jwt>`` accepted by ``verify_token``: the token's user.
      3. ``Authorization: Bearer <settings.echo_secret>`` (static app key): the user named
         by the ``x-echo-user-id`` header, or "default". The key is compared in constant
         time, and an empty configured secret never matches.
      4. Local-dev shortcut: while ``echo_secret`` is still "echo-local-secret", a bare
         ``x-echo-user-id`` header is trusted.
    Anything else gets a 401 JSON error.
    """
    import hmac

    # Browsers send CORS preflight without an Authorization header.
    if request.method == "OPTIONS" or request.url.path in _PUBLIC:
        return await call_next(request)

    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        token = auth_header[len("Bearer "):]
        user_id = verify_token(token)
        if user_id:
            request.state.user_id = user_id
            return await call_next(request)
        # Static app key: identity comes from x-echo-user-id (or "default").
        # Compare bytes in constant time; require a non-empty secret so "Bearer " alone
        # can never authenticate when echo_secret is unset.
        app_key = settings.echo_secret
        if app_key and hmac.compare_digest(token.encode("utf-8"), app_key.encode("utf-8")):
            request.state.user_id = request.headers.get("x-echo-user-id") or "default"
            return await call_next(request)

    # Unauthenticated local dev via x-echo-user-id header only
    legacy_uid = request.headers.get("x-echo-user-id")
    if legacy_uid and settings.echo_secret == "echo-local-secret":
        request.state.user_id = legacy_uid
        return await call_next(request)

    return JSONResponse({"error": "Unauthorized"}, status_code=401)
# Mount the account routes from auth.router (presumably the /auth/register and
# /auth/login paths listed as public).
from auth.router import router as auth_router
app.include_router(router=auth_router)
@app.get("/health")
async def health():
    """Liveness probe: public (no token) and performs no I/O."""
    status = {"status": "ok"}
    status["version"] = "2.0"
    return status
@app.get("/v1/models")
async def list_models():
    """OpenAI-style model listing advertising the single "shadow" model id.

    NOTE(review): the product prompt avoids "shadow" in public-facing language, but
    clients may already be configured with this id — change only with a migration.
    """
    advertised = {"id": "shadow", "object": "model", "owned_by": "echo"}
    return {"object": "list", "data": [advertised]}
def _create_livekit_token(room_name: str, identity: str) -> str:
    """Generate a LiveKit JWT token using the configured API key/secret.

    The token is HS256-signed with ``settings.livekit_api_secret`` and valid for one hour
    from now. It grants joining *room_name* as *identity* with publish and subscribe
    rights.
    """
    from jose import jwt as jose_jwt

    issued_at = int(time.time())
    video_grant = {
        "roomJoin": True,
        "room": room_name,
        "canPublish": True,
        "canSubscribe": True,
    }
    claims = {
        "iss": settings.livekit_api_key,
        "nbf": issued_at,
        "exp": issued_at + 3600,
        "sub": identity,
        "video": video_grant,
    }
    return jose_jwt.encode(claims, settings.livekit_api_secret, algorithm="HS256")
@app.post("/v1/voice/token")
async def voice_token(request: Request):
    """Return a LiveKit JWT token + room name for the authenticated user.

    The room is ``voice-<user_id>``. The response is ``{"token", "room", "url"}``, where
    url is ``settings.livekit_url``.
    """
    caller = getattr(request.state, "user_id", None)
    if not caller:
        caller = request.headers.get("x-echo-user-id") or "default"
    room = f"voice-{caller}"
    return {
        "token": _create_livekit_token(room, caller),
        "room": room,
        "url": settings.livekit_url,
    }
def _last_user_text(messages: list[dict]) -> str:
    """Return the text of the most recent ``role == "user"`` message, or "" if none.

    OpenAI content may be a plain string or a list of typed parts. For a list, the
    ``text`` of every ``type == "text"`` part is joined with single spaces. A message
    without ``content`` counts as empty instead of raising KeyError.
    """
    raw = next((m.get("content") for m in reversed(messages) if m.get("role") == "user"), "")
    if isinstance(raw, list):
        return " ".join(
            part.get("text") or ""
            for part in raw
            if isinstance(part, dict) and part.get("type") == "text"
        )
    return raw or ""


async def _teacher_fallback_completion(
    enriched: list[dict],
    background_tasks: BackgroundTasks,
    user_id: str,
    user_msg: str,
    failure: str,
) -> dict | None:
    """Answer a non-streaming turn via the teacher after the local model failed.

    Args:
        enriched: the context-enriched messages that were sent to the local model.
        background_tasks: used to schedule saving the pair, tagged ``<model>:fallback``.
        user_id, user_msg: identify the pair to save.
        failure: label for the log line ("connection error" or "timeout").

    Returns:
        An OpenAI-style ``chat.completion`` dict, or None when the teacher call raised or
        returned no text (the caller then returns its own error response).
    """
    try:
        from providers.teacher import chat_with_teacher

        fallback_text, _, fallback_model = await chat_with_teacher(
            enriched,
            model=settings.teacher_model,
            user_id=None,
            purpose="chat_fallback",
            recent_failure=True,
            explicit_user_request=True,
        )
    except Exception as fallback_e:
        log.error("teacher fallback failed after local %s: %s", failure, fallback_e)
        return None
    if not fallback_text:
        return None
    background_tasks.add_task(_do_save_raw, user_id, user_msg, fallback_text, f"{fallback_model}:fallback")
    return {
        "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": fallback_model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": fallback_text},
                "finish_reason": "stop",
            }
        ],
    }


@app.post("/v1/chat/completions")
async def chat_completions(request: Request, background_tasks: BackgroundTasks):
    """
    OpenAI-compatible endpoint. ChatMCP points here as a custom provider.

    1. Enriches the messages with memory + Echo loop context.
    2. Routes to local Gemma on vLLM (personal LoRA when loadable, else base Gemma). The
       teacher endpoint is used instead when vLLM is unavailable or a tool request is
       detected; the tool route is subject to teacher policy and answers 429 when skipped.
    3. Schedules the (user message, reply) pair to be saved for training.

    user_id comes from request.state (set by the auth middleware), then the
    X-Echo-User-Id header, then the body's "user" field, defaulting to "default".
    Upstream 4xx/5xx becomes 502. On local connection errors or timeouts, a non-streaming
    teacher fallback is tried when an API key is configured; otherwise 503 / 504.
    """
    body = await request.json()
    user_id = (
        getattr(request.state, "user_id", None)
        or request.headers.get("x-echo-user-id")
        or body.get("user")
        or "default"
    )
    messages: list[dict] = body.get("messages", [])
    stream: bool = body.get("stream", False)
    model_lane = _requested_model_lane(request, body)

    user_msg = _last_user_text(messages)
    existing_system = next((m.get("content") for m in messages if m.get("role") == "system"), None)
    has_tool_prompt = _has_tool_prompt(existing_system)
    # Tool route only when the client shipped tool definitions AND the message reads like
    # a request to use an external service.
    use_tool_route = has_tool_prompt and _looks_like_tool_request(user_msg)
    confidence = await get_confidence(user_id, user_msg)

    # Gemma is the default local lane. Use the personal LoRA when it is valid and loaded;
    # otherwise use base Gemma so the user does not have to select it manually.
    wants_gemma = (
        settings.gemma4_enabled
        and not use_tool_route
        and model_lane in ("gemma4_e2b", "auto")
    )
    teacher_decision = None
    use_gemma_adapter = False
    if wants_gemma:
        vllm_health = await vllm_models_health(timeout=2.0)
        if not vllm_health.get("ok"):
            log.warning(
                "Gemma lane requested but vLLM is unavailable; falling back to teacher user=%s lane=%s reason=%s",
                user_id,
                model_lane,
                vllm_health.get("error") or vllm_health.get("status_code") or "unknown",
            )
            wants_gemma = False
    if wants_gemma and adapter_exists(user_id, lane="gemma4_e2b"):
        use_gemma_adapter = await ensure_adapter_loaded(user_id, lane="gemma4_e2b")
        if not use_gemma_adapter:
            log.warning("Gemma adapter exists but is not loadable; user=%s using base Gemma", user_id)
    use_local = wants_gemma

    if not use_local and use_tool_route:
        teacher_decision = await should_use_teacher(
            user_id,
            "tool_chat",
            confidence=confidence,
            importance=infer_importance(user_msg),
            prompt=user_msg,
            explicit_user_request=True,
        )
        if not teacher_decision.allowed:
            log.info("Teacher tool route skipped user=%s reason=%s", user_id, teacher_decision.reason)
            return JSONResponse(
                {
                    "error": {
                        "message": "Echo is Gemma-first and skipped the teacher tool route for this turn.",
                        "type": "teacher_policy_skipped",
                        "teacher_policy": teacher_decision.to_dict(),
                    }
                },
                status_code=429,
            )

    model_used = "gemma4_e2b" if use_local else "openai"
    ctx = await _context_with_loop(user_id, user_msg)
    context_injection = (
        _tool_safe_context(ctx["system_injection"]) if use_tool_route else ctx["system_injection"]
    )
    if existing_system and (not has_tool_prompt or use_tool_route):
        if use_tool_route:
            # Tool turn: Echo context first, the client's tool prompt last.
            combined = context_injection + "\n\n" + existing_system
        else:
            # Put caller's persona first, Echo's memory injection last — model follows the
            # final instruction.
            combined = existing_system + "\n\n" + context_injection
        enriched = _inject_system(messages, combined)
    else:
        # No caller system prompt, or a tool prompt on a non-tool turn: inject Echo's
        # context on its own.
        enriched = _inject_system(messages, context_injection)

    log.info(
        "/v1/chat/completions user=%s confidence=%.2f route=%s lane=%s",
        user_id, confidence, model_used, model_lane,
    )

    if use_local:
        base_url, target_model, model_used = _local_target_for_lane(
            "gemma4_e2b",
            user_id,
            use_adapter=use_gemma_adapter,
        )
        target_url = f"{base_url}/chat/completions"
        headers = {"Content-Type": "application/json"}
    else:
        target_url = f"{settings.teacher_base_url}/chat/completions"
        target_model = settings.teacher_model
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {settings.llm_api_key}",
        }
        if teacher_decision:
            await record_teacher_usage(
                user_id,
                "tool_chat",
                teacher_decision.reason,
                {"model": target_model, "confidence": confidence, "decision": teacher_decision.to_dict()},
            )

    payload = {**body, "messages": enriched, "model": target_model}
    # Strip MCP tool definitions — Echo handles memory/context itself; tool calls break streaming
    if not use_tool_route:
        payload.pop("tools", None)
        payload.pop("tool_choice", None)
    # Stop tokens only for local models (OpenAI limits to 4, and doesn't need these Qwen tokens);
    # a caller-supplied "stop" is left untouched.
    if use_local and "stop" not in payload and not use_tool_route:
        payload["stop"] = ["<function", "<tool_call>", "<|tool_call|>", "✿FUNCTION✿"]
    if not use_local:
        # Strip unknown fields so OpenAI doesn't 400 on custom Echo fields
        openai_fields = {
            "model", "messages", "stream", "temperature", "top_p", "max_tokens",
            "frequency_penalty", "presence_penalty", "stop", "n", "user",
            "tools", "tool_choice", "seed", "logprobs", "top_logprobs", "stream_options",
        }
        payload = {k: v for k, v in payload.items() if k in openai_fields}

    if stream:
        return StreamingResponse(
            _stream_and_save(
                target_url,
                headers,
                payload,
                user_id,
                user_msg,
                model_used,
                allow_tools=use_tool_route,
                fallback_messages=enriched if use_local else None,
            ),
            media_type="text/event-stream",
        )

    try:
        async with httpx.AsyncClient(timeout=120) as client:
            resp = await client.post(target_url, headers=headers, json=payload)
        if resp.status_code >= 400:
            log.error(
                "chat_completions upstream error status=%s url=%s model=%s body=%s",
                resp.status_code, target_url, target_model, resp.text[:1000],
            )
            return JSONResponse(
                {
                    "error": {
                        "message": f"upstream error {resp.status_code}: {resp.text[:500]}",
                        "type": "upstream_error",
                    }
                },
                status_code=502,
            )
        data = resp.json()
    except (httpx.ConnectError, httpx.ReadError, httpx.RemoteProtocolError) as e:
        log.error("chat_completions connection error url=%s: %s", target_url, e)
        if use_local and settings.llm_api_key:
            fallback = await _teacher_fallback_completion(
                enriched, background_tasks, user_id, user_msg, "connection error"
            )
            if fallback is not None:
                return fallback
        return JSONResponse(
            {"error": {"message": "Echo model unreachable — is vLLM running?", "type": "connection_error"}},
            status_code=503,
        )
    except httpx.TimeoutException as e:  # base class: also covers ReadTimeout / ConnectTimeout
        log.error("chat_completions timeout url=%s: %s", target_url, e)
        if use_local and settings.llm_api_key:
            fallback = await _teacher_fallback_completion(
                enriched, background_tasks, user_id, user_msg, "timeout"
            )
            if fallback is not None:
                return fallback
        return JSONResponse(
            {"error": {"message": "Echo model timed out — vLLM may be overloaded.", "type": "timeout_error"}},
            status_code=504,
        )

    try:
        assistant_msg = data["choices"][0]["message"]["content"] or ""
    except (KeyError, IndexError, TypeError) as e:
        # The upstream call succeeded; return its payload rather than a 500, but do not
        # save a training pair we could not read.
        log.warning("chat_completions: unexpected upstream payload, pair not saved: %s", e)
        return data
    background_tasks.add_task(_do_save_raw, user_id, user_msg, assistant_msg, model_used)
    return data
def _strip_tool_tags(text: str) -> str:
"""Remove any Qwen/Claude-style tool call blocks from a streamed chunk."""
import re
# Qwen format: <function name="...">...</function> and <function_response>...</function_response>
text = re.sub(r"<function[^>]*>.*?</function[^>]*>", "", text, flags=re.DOTALL)
# Generic tool_call blocks
text = re.sub(r"<tool_call>.*?</tool_call>", "", text, flags=re.DOTALL)
text = re.sub(r"<functioncall>.*?</functioncall>", "", text, flags=re.DOTALL)
# Truncate at any opening function/tool tag that wasn't closed yet
for tag in ("<function", "<tool_call>", "<|tool_call|>", "✿FUNCTION✿"):
idx = text.find(tag)
if idx != -1:
text = text[:idx]
return text
# Strong references to in-flight fallback save tasks. The event loop itself keeps only
# weak references, so an unreferenced task may be garbage-collected before it finishes.
_FALLBACK_SAVE_TASKS: set[asyncio.Task] = set()


async def _stream_teacher_fallback(
    messages: list[dict],
    user_id: str,
    user_msg: str,
    reason: str,
):
    """Stream a teacher (cloud) reply as SSE lines after the local model failed.

    Without ``settings.llm_api_key``, or if the teacher stream raises, one explanatory
    content chunk followed by ``data: [DONE]`` is yielded instead. On success, the lines
    produced by ``stream_teacher_response`` are forwarded unchanged. The last non-empty
    accumulated text is then saved in the background, tagged ``<teacher_model>:fallback``.

    Args:
        messages: the already-enriched conversation to send to the teacher.
        user_id, user_msg: identify the pair to save.
        reason: shown to the user when no fallback key is configured.
    """
    if not settings.llm_api_key:
        msg = f"\n\n[Echo: local model failed and no cloud fallback key is configured - {reason}]"
        yield f"data: {json.dumps({'choices': [{'delta': {'content': msg}, 'finish_reason': None}]})}\n\n"
        yield "data: [DONE]\n\n"
        return
    from providers.teacher import stream_teacher_response

    full_text = ""
    try:
        async for sse_line, accumulated in stream_teacher_response(
            messages,
            model=settings.teacher_model,
            user_id=None,
            purpose="chat_fallback",
            recent_failure=True,
            explicit_user_request=True,
        ):
            if accumulated:
                full_text = accumulated
            yield sse_line
    except Exception as e:
        log.error("teacher stream fallback failed: %s", e)
        # Exception objects are always truthy, so test the message text itself to fall
        # back to "no details" when str(e) is empty.
        detail = str(e) or "no details"
        msg = f"\n\n[Echo: local model failed and cloud fallback also failed - {type(e).__name__}: {detail}]"
        yield f"data: {json.dumps({'choices': [{'delta': {'content': msg}, 'finish_reason': None}]})}\n\n"
        yield "data: [DONE]\n\n"
        return
    if full_text:
        save_task = asyncio.create_task(
            _do_save_raw(user_id, user_msg, full_text, f"{settings.teacher_model}:fallback")
        )
        _FALLBACK_SAVE_TASKS.add(save_task)
        save_task.add_done_callback(_FALLBACK_SAVE_TASKS.discard)
# Strong references to in-flight transcript saves started by _stream_and_save.
# asyncio keeps only weak references to tasks, so a task nobody references can
# be garbage-collected before it finishes.
_STREAM_SAVE_TASKS: set[asyncio.Task] = set()


async def _stream_and_save(
    url: str,
    headers: dict,
    payload: dict,
    user_id: str,
    user_msg: str,
    model_used: str,
    allow_tools: bool = False,
    fallback_messages: list[dict] | None = None,
):
    """Proxy a streaming chat completion as SSE, then save the transcript.

    POSTs ``payload`` to ``url`` using an httpx client with ``timeout=120``.
    Every non-empty upstream line is re-emitted as an SSE event.

    For ``data:`` JSON chunks whose delta has content, the content is passed
    through ``_strip_tool_tags`` unless ``allow_tools`` is true, and the
    resulting text is collected. When the stream ends normally and the
    collected text is non-empty,
    ``_do_save_raw(user_id, user_msg, text, model_used)`` is started as a
    background task.

    Failure handling:

    * Upstream non-200 status: an error notice plus ``data: [DONE]`` is
      emitted. No cloud fallback is attempted and nothing is saved.
    * Connect error, timeout, remote protocol error, read error, or any other
      ``Exception``: if ``fallback_messages`` is not None, the rest of the
      response is delegated to ``_stream_teacher_fallback``. Otherwise an
      error notice plus ``data: [DONE]`` is emitted. In both cases, nothing
      from the local model is saved.

    Args:
        url: Chat-completions endpoint to POST to.
        headers: HTTP headers for the upstream request.
        payload: JSON request body. It presumably requests streaming, since
            the response is read line by line; confirm at call sites.
        user_id: Owner of the transcript, forwarded to the saver and fallback.
        user_msg: User message recorded alongside the reply.
        model_used: Model tag stored with the saved transcript.
        allow_tools: When true, delta content is forwarded without tool-tag
            stripping.
        fallback_messages: Messages for the cloud teacher fallback; ``None``
            disables the fallback.

    Yields:
        SSE-formatted strings, each terminated by a blank line.
    """

    def _notice(text: str) -> str:
        # One SSE event carrying `text` as assistant delta content.
        return f"data: {json.dumps({'choices': [{'delta': {'content': text}, 'finish_reason': None}]})}\n\n"

    collected: list[str] = []
    # Set by an except clause below:
    # (reason passed to the fallback, notice shown when there is no fallback).
    failure: tuple[str, str] | None = None
    try:
        async with httpx.AsyncClient(timeout=120) as client:
            async with client.stream("POST", url, headers=headers, json=payload) as resp:
                if resp.status_code != 200:
                    body = await resp.aread()
                    # errors="replace": a non-UTF-8 error body must not raise
                    # here and hide the real upstream failure.
                    raw = body.decode(errors="replace")
                    try:
                        err = json.loads(body).get("error", {}).get("message", raw)
                    except Exception:
                        err = raw
                    log.error("_stream_and_save upstream error %s url=%s: %s", resp.status_code, url, err)
                    yield _notice(f"\n\n[Echo: upstream error {resp.status_code} — {err}]")
                    yield "data: [DONE]\n\n"
                    return
                async for line in resp.aiter_lines():
                    if not line:
                        continue
                    if not line.startswith("data: ") or line == "data: [DONE]":
                        # Non-data lines and the terminal marker pass through unchanged.
                        yield f"{line}\n\n"
                        continue
                    try:
                        chunk = json.loads(line[6:])
                        delta = chunk["choices"][0].get("delta", {})
                        content = delta.get("content") or ""
                        if content:
                            # NOTE(review): stripping runs per delta, so tool
                            # markup spread over several deltas (e.g. a call
                            # body after an opening tag) is not fully removed.
                            clean = content if allow_tools else _strip_tool_tags(content)
                            # `delta` is the chunk's own dict here, so this edits `chunk`.
                            delta["content"] = clean
                            collected.append(clean)
                            yield f"data: {json.dumps(chunk)}\n\n"
                        else:
                            yield f"{line}\n\n"
                    except Exception:
                        # Unparseable or unexpected chunk shape: forward it untouched.
                        yield f"{line}\n\n"
    except httpx.ConnectError:
        log.error("_stream_and_save connect error url=%s", url)
        failure = (
            "local model connection failed",
            f"\n\n[Echo: cannot reach model at {url} — is vLLM running?]",
        )
    except httpx.TimeoutException as e:
        # Base class of ConnectTimeout, ReadTimeout, WriteTimeout and PoolTimeout.
        log.error("_stream_and_save timeout url=%s: %s", url, type(e).__name__)
        failure = (
            "local model timed out",
            f"\n\n[Echo: model timed out at {url} — vLLM may be overloaded or not running]",
        )
    except httpx.RemoteProtocolError as e:
        log.error("_stream_and_save protocol error url=%s: %s", url, e)
        # str(e): exception instances are always truthy, so `e or ...` never
        # used the default text.
        failure = (
            "local model disconnected",
            f"\n\n[Echo: model disconnected unexpectedly — {str(e) or 'server closed the connection'}]",
        )
    except httpx.ReadError as e:
        log.error("_stream_and_save read error url=%s: %s", url, e)
        failure = (
            "local model read failed",
            "\n\n[Echo: connection dropped while reading the response — vLLM may have restarted or run out of memory. Try again in a moment.]",
        )
    except Exception as e:
        log.error("_stream_and_save error url=%s: %s", url, e)
        failure = (
            f"local model failed with {type(e).__name__}",
            f"\n\n[Echo: unexpected error — {type(e).__name__}: {str(e) or 'no details'}]",
        )

    if failure is not None:
        reason, notice = failure
        if fallback_messages is not None:
            # NOTE(review): if the failure happened mid-stream, the client has
            # already received part of the local reply. The fallback reply is
            # appended after it, and the partial local text is not saved.
            async for item in _stream_teacher_fallback(fallback_messages, user_id, user_msg, reason):
                yield item
        else:
            yield _notice(notice)
            yield "data: [DONE]\n\n"
        return

    assistant_msg = "".join(collected)
    if assistant_msg:
        save_task = asyncio.create_task(_do_save_raw(user_id, user_msg, assistant_msg, model_used))
        # Keep the task alive until it completes, then drop the reference.
        _STREAM_SAVE_TASKS.add(save_task)
        save_task.add_done_callback(_STREAM_SAVE_TASKS.discard)
def _inject_system(messages: list[dict], system_injection: str) -> list[dict]:
enriched = [m for m in messages if m.get("role") != "system"]
return [{"role": "system", "content": system_injection}] + enriched