diff --git a/.env.example b/.env.example
new file mode 100644
index 00000000..b62946a0
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,9 @@
+# Copy this file to .env and fill in your keys.
+# The .env file is git-ignored and should NEVER be committed.
+
+# Google Gemini API key (https://aistudio.google.com/app/apikey)
+GEMINI_API_KEY=your_gemini_api_key_here
+
+# Google Maps Platform API key (https://console.cloud.google.com/google/maps-apis)
+# Needs: Directions API, Places API, Geocoding API
+GOOGLE_MAPS_API_KEY=your_google_maps_api_key_here
diff --git a/debug_cases.py b/debug_cases.py
new file mode 100644
index 00000000..78cd0bf2
--- /dev/null
+++ b/debug_cases.py
@@ -0,0 +1,44 @@
+import sys, json
+sys.path.insert(0, "cactus/python/src")
+from main import generate_cactus, _preflight, _validate
+
+ALARM_TOOL = {
+ "name": "set_alarm",
+ "description": "Set an alarm for a given time",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "hour": {"type": "integer", "description": "Hour to set the alarm for"},
+ "minute": {"type": "integer", "description": "Minute to set the alarm for"},
+ },
+ "required": ["hour", "minute"],
+ },
+}
+REMINDER_TOOL = {
+ "name": "create_reminder",
+ "description": "Create a reminder with a title and time",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "title": {"type": "string", "description": "Reminder title"},
+ "time": {"type": "string", "description": "Time for the reminder (e.g. 3:00 PM)"},
+ },
+ "required": ["title", "time"],
+ },
+}
+
+cases = [
+ ("alarm_10am", [{"role": "user", "content": "Set an alarm for 10 AM."}], [ALARM_TOOL]),
+ ("alarm_9am", [{"role": "user", "content": "Set an alarm for 9 AM."}], [ALARM_TOOL]),
+ ("alarm_6am", [{"role": "user", "content": "Wake me up at 6 AM."}], [ALARM_TOOL]),
+ ("reminder_meeting",[{"role": "user", "content": "Remind me about the meeting at 3:00 PM."}], [REMINDER_TOOL]),
+ ("timer_7min", [{"role": "user", "content": "Set a timer for 7 minutes."}],
+ [{"name": "set_timer", "description": "Set a countdown timer",
+ "parameters": {"type": "object", "properties": {"minutes": {"type": "integer", "description": "Number of minutes"}}, "required": ["minutes"]}}]),
+]
+
+for name, messages, tools in cases:
+ result = generate_cactus(messages, tools)
+ complexity = _preflight(messages, tools)
+ valid, reason = _validate(result, tools, complexity, messages)
+ print(f"{name}: calls={json.dumps(result['function_calls'])} conf={result['confidence']:.3f} valid={valid} reason={reason}")
diff --git a/debug_cloud.py b/debug_cloud.py
new file mode 100644
index 00000000..fcceb23b
--- /dev/null
+++ b/debug_cloud.py
@@ -0,0 +1,22 @@
+import sys, json, os
+sys.path.insert(0, "cactus/python/src")
+from main import generate_cloud
+
+# message_among_four: "Text Dave saying I'll be late"
+messages = [{"role": "user", "content": "Text Dave saying I'll be late."}]
+tools = [
+ {"name": "get_weather", "description": "Get current weather for a location",
+ "parameters": {"type": "object", "properties": {"location": {"type": "string", "description": "City name"}}, "required": ["location"]}},
+ {"name": "set_timer", "description": "Set a countdown timer",
+ "parameters": {"type": "object", "properties": {"minutes": {"type": "integer", "description": "Number of minutes"}}, "required": ["minutes"]}},
+ {"name": "send_message", "description": "Send a message to a contact",
+ "parameters": {"type": "object", "properties": {"recipient": {"type": "string", "description": "Name of person"}, "message": {"type": "string", "description": "Message content"}}, "required": ["recipient", "message"]}},
+ {"name": "play_music", "description": "Play a song or playlist",
+ "parameters": {"type": "object", "properties": {"song": {"type": "string", "description": "Song name"}}, "required": ["song"]}},
+]
+
+for i in range(3):
+ result = generate_cloud(messages, tools)
+ print(f"Run {i+1}: {json.dumps(result['function_calls'])}")
+
+print("\nExpected: send_message(recipient='Dave', message=\"I'll be late\")")
diff --git a/debug_main1.py b/debug_main1.py
new file mode 100644
index 00000000..3686bf31
--- /dev/null
+++ b/debug_main1.py
@@ -0,0 +1,6 @@
+import sys
+sys.path.insert(0, "cactus/python/src")
+print("step 1: importing main1...")
+import main1
+print("step 2: main1 loaded, attributes:", [a for a in dir(main1) if not a.startswith("_")])
+print("step 3: hasattr generate_hybrid:", hasattr(main1, "generate_hybrid"))
diff --git a/handsfree/__init__.py b/handsfree/__init__.py
new file mode 100644
index 00000000..d03e3fc1
--- /dev/null
+++ b/handsfree/__init__.py
@@ -0,0 +1 @@
+# HandsFree — voice-first personal agent package
diff --git a/handsfree/app.py b/handsfree/app.py
new file mode 100644
index 00000000..162fcd66
--- /dev/null
+++ b/handsfree/app.py
@@ -0,0 +1,617 @@
+"""
+HandsFree — Voice-First Personal Agent
+Streamlit app: voice → transcribe (on-device) → location inject → hybrid inference → execute
+"""
+
+import sys
+import os
+import time
+import json
+import tempfile
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "cactus", "python", "src"))
+
+import streamlit as st
+from audio_recorder_streamlit import audio_recorder
+
+# ── Local modules ──────────────────────────────────────────────────────────────
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+from handsfree.tools import ALL_TOOLS, TOOL_MAP
+from handsfree.location import detect_location_intent
+from handsfree.executor import execute
+from main import generate_hybrid
+
+# ── Page config ────────────────────────────────────────────────────────────────
+st.set_page_config(
+ page_title="HandsFree",
+ page_icon="🎙️",
+ layout="wide",
+ initial_sidebar_state="collapsed",
+)
+
+# ── Styling ────────────────────────────────────────────────────────────────────
+st.markdown("""
+
+""", unsafe_allow_html=True)
+
+
+# ── Whisper (on-device transcription) ─────────────────────────────────────────
+_WHISPER_WEIGHTS = os.path.abspath(
+ os.path.join(os.path.dirname(__file__), "..", "cactus", "weights", "whisper-small")
+)
+_WHISPER_PROMPT = "<|startoftranscript|><|en|><|transcribe|><|notimestamps|>"
+
+
+@st.cache_resource(show_spinner=False)
+def _load_whisper():
+ """Load and cache the Whisper model once per session."""
+ from cactus import cactus_init
+ return cactus_init(_WHISPER_WEIGHTS)
+
+
+# Warm up Whisper eagerly at app start (runs once, cached afterwards)
+if os.path.isdir(_WHISPER_WEIGHTS):
+ try:
+ _load_whisper()
+ except Exception:
+ pass
+
+
+_FFMPEG = "/opt/homebrew/bin/ffmpeg"
+
+
+def _to_16khz_wav(wav_bytes: bytes) -> bytes:
+ """Convert any audio bytes (WAV, FLAC, OGG, MP3, M4A …) to 16 kHz mono PCM WAV."""
+ import io, wave, subprocess, tempfile
+ import numpy as np
+
+ # ── 1. Try ffmpeg first — handles every format reliably ─────────────────
+ if os.path.isfile(_FFMPEG):
+ try:
+ with tempfile.NamedTemporaryFile(suffix=".audio", delete=False) as f:
+ f.write(wav_bytes)
+ tmp_in = f.name
+ tmp_out = tmp_in + ".wav"
+ subprocess.run(
+ [_FFMPEG, "-y", "-i", tmp_in,
+ "-ar", "16000", "-ac", "1", "-f", "wav", tmp_out],
+ check=True, capture_output=True,
+ )
+ with open(tmp_out, "rb") as f:
+ result = f.read()
+ return result
+ except Exception:
+ pass
+ finally:
+ for p in (tmp_in, tmp_out):
+ try:
+ os.unlink(p)
+ except Exception:
+ pass
+
+ # ── 2. soundfile fallback (WAV, FLAC, OGG, AIFF) ─────────────────────
+ samples = None
+ framerate = None
+ try:
+ import soundfile as sf
+ samples, framerate = sf.read(io.BytesIO(wav_bytes), dtype="float32", always_2d=True)
+ samples = samples.mean(axis=1)
+ except Exception:
+ pass
+
+ # ── 3. plain wave fallback (browser mic WAV) ──────────────────────────
+ if samples is None:
+ with wave.open(io.BytesIO(wav_bytes)) as r:
+ nchannels = r.getnchannels()
+ sampwidth = r.getsampwidth()
+ framerate = r.getframerate()
+ raw = r.readframes(r.getnframes())
+ if sampwidth == 1:
+ samples = np.frombuffer(raw, dtype=np.uint8).astype(np.float32) / 128.0 - 1.0
+ elif sampwidth == 4:
+ samples = np.frombuffer(raw, dtype=np.int32).astype(np.float32) / 2_147_483_648.0
+ else:
+ samples = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32_768.0
+ if nchannels > 1:
+ samples = samples.reshape(-1, nchannels).mean(axis=1)
+
+ # ── Resample to 16 kHz if needed ─────────────────────────────────────
+ if framerate != 16_000:
+ new_len = int(len(samples) * 16_000 / framerate)
+ samples = np.interp(
+ np.linspace(0, len(samples), new_len),
+ np.arange(len(samples)),
+ samples,
+ )
+
+ pcm = (samples * 32_767).clip(-32_768, 32_767).astype(np.int16)
+ buf = io.BytesIO()
+ with wave.open(buf, "w") as w:
+ w.setnchannels(1)
+ w.setsampwidth(2)
+ w.setframerate(16_000)
+ w.writeframes(pcm.tobytes())
+ return buf.getvalue()
+
+
+# ── Helpers ────────────────────────────────────────────────────────────────────
+
+def transcribe_audio(wav_bytes: bytes) -> tuple[str, float]:
+ """Transcribe audio bytes on-device via Whisper (cactus). Returns (text, ms)."""
+ t0 = time.time()
+ text = ""
+ tmp_path = None
+ try:
+ wav_16k = _to_16khz_wav(wav_bytes)
+ with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
+ f.write(wav_16k)
+ tmp_path = f.name
+
+ from cactus import cactus_transcribe
+ model = _load_whisper()
+ raw = cactus_transcribe(model, tmp_path, prompt=_WHISPER_PROMPT)
+ parsed = json.loads(raw)
+ text = parsed.get("response", "").strip()
+ except Exception as e:
+ st.warning(f"Transcription error: {e}")
+ text = ""
+ finally:
+ if tmp_path:
+ try:
+ os.unlink(tmp_path)
+ except Exception:
+ pass
+
+ elapsed = (time.time() - t0) * 1000
+ return text, elapsed
+
+
+def source_badge(source: str) -> str:
+ if "on-device" in source:
+ return '⚡ On-Device'
+ elif "retry" in source:
+ return '🔄 On-Device (retry)'
+ else:
+ return '☁️ Cloud'
+
+
+def render_pipeline(steps: list[dict]):
+ """Render a vertical pipeline of steps."""
+ st.markdown("#### 🔄 Pipeline")
+ for step in steps:
+ cls = "active" if step.get("ok") else ("error" if step.get("error") else "")
+ icon = "✅" if step.get("ok") else ("❌" if step.get("error") else "⬜")
+ detail = f" — {step.get('detail', '')}" if step.get("detail") else ""
+ timing = f" ({step['ms']:.0f}ms)" if step.get("ms") else ""
+ st.markdown(
+            f'<div class="pipeline-step {cls}">{icon} {step["label"]}{detail}{timing}</div>',
+ unsafe_allow_html=True,
+ )
+
+
+def render_result(call_result: dict):
+ """Render one executed function call result."""
+ fn = call_result["function"]
+ args = call_result["arguments"]
+ result = call_result["result"]
+ icon = result.get("icon", "📦")
+
+    st.markdown(f'<div class="result-card">', unsafe_allow_html=True)
+ st.markdown(f"**{icon} `{fn}`**")
+
+ arg_str = ", ".join(f"`{k}`: {json.dumps(v)}" for k, v in args.items())
+ st.caption(f"Called with: {arg_str}")
+
+ status = result.get("status", "unknown")
+ if status == "error":
+ st.error(result.get("error", "Unknown error"))
+
+ elif fn == "get_current_location":
+ st.markdown(f"**📍 {result.get('address', '')}**")
+ c1, c2 = st.columns(2)
+ c1.metric("Latitude", result.get("latitude", ""))
+ c2.metric("Longitude", result.get("longitude", ""))
+ c1.metric("Source", result.get("source", ""))
+ link = result.get("maps_link", "")
+ if link:
+ st.markdown(f"[🗺️ Open in Google Maps]({link})")
+ if result.get("full_address") and result.get("full_address") != result.get("address"):
+ st.caption(f"Full address: {result['full_address']}")
+
+ elif fn == "get_weather":
+ c1, c2, c3 = st.columns(3)
+ c1.metric("📍 Location", result.get("location", ""))
+ c2.metric("🌡️ Temp", f"{result.get('temp_f')}°F / {result.get('temp_c')}°C")
+ c3.metric("🌤️ Condition", result.get("condition", ""))
+ c1.metric("💧 Humidity", result.get("humidity", ""))
+ c2.metric("💨 Wind", result.get("wind", ""))
+
+ elif fn == "get_directions":
+ st.markdown(f"**From:** {result.get('from', '')}")
+ st.markdown(f"**To:** {result.get('to', '')}")
+ c1, c2 = st.columns(2)
+ c1.metric("⏱️ Duration", result.get("duration", ""))
+ c2.metric("📏 Distance", result.get("distance", ""))
+ steps = result.get("steps", [])
+ if steps:
+ st.markdown("**Turn-by-turn:**")
+ for i, s in enumerate(steps, 1):
+ st.markdown(f"{i}. {s}")
+ url = result.get("maps_url", "")
+ if url:
+ st.markdown(f"[🗺️ Open in Google Maps]({url})")
+
+ elif fn in ("find_nearby", "search_along_route"):
+ if fn == "search_along_route":
+ st.markdown(f"**Route:** {result.get('route', '')} ({result.get('route_duration','')} · {result.get('route_distance','')})")
+ places = result.get("results", [])
+ for p in places:
+ stars = f"⭐ {p.get('rating', 'N/A')}" if p.get("rating") != "N/A" else ""
+ status_badge = p.get("status", "")
+ st.markdown(f"- **{p.get('name','')}** {stars} \n {p.get('address', '')} {f'· {status_badge}' if status_badge else ''}")
+
+ else:
+ # Generic: show scalar fields, skip internals
+ skip = {"status", "icon"}
+ for k, v in result.items():
+ if k in skip:
+ continue
+ if k == "maps_url":
+ st.markdown(f"[🗺️ Open in Google Maps]({v})")
+ elif k == "maps_link":
+ st.markdown(f"[📍 View Location]({v})")
+ elif isinstance(v, list):
+ for item in v:
+ if isinstance(item, dict):
+ st.markdown(f"- {' · '.join(str(x) for x in item.values())}")
+ else:
+ st.markdown(f"- {item}")
+ else:
+ st.markdown(f"**{k.replace('_', ' ').title()}**: {v}")
+
+    st.markdown('</div>', unsafe_allow_html=True)
+
+
+# ── Main UI ────────────────────────────────────────────────────────────────────
+
+col_header, col_logo = st.columns([5, 1])
+with col_header:
+ st.markdown("# 🎙️ HandsFree")
+ st.markdown("*Voice-first personal agent — on-device speed, cloud intelligence*")
+
+st.divider()
+
+col_input, col_pipeline = st.columns([3, 2])
+
+with col_input:
+ st.markdown("### 🎤 Speak or Type a Command")
+
+ import hashlib, wave, io as _io
+
+ # ── Input mode tabs ─────────────────────────────────────────────────────
+ tab_mic, tab_file = st.tabs(["🎙️ Microphone", "📁 Upload Audio File"])
+
+ audio_bytes = None
+
+ with tab_mic:
+ st.markdown("**Record voice command** *(click mic, speak, click again to stop — auto-runs):*")
+ mic_bytes = audio_recorder(
+ text="",
+ recording_color="#4ade80",
+ neutral_color="#374151",
+ icon_name="microphone",
+ icon_size="2x",
+ pause_threshold=2.0,
+ sample_rate=16000,
+ )
+ if mic_bytes:
+ audio_bytes = mic_bytes
+ st.caption(f"Captured {len(mic_bytes)//1024} KB from mic")
+ else:
+ st.caption("🎤 Click the microphone to start recording")
+
+ with tab_file:
+ st.markdown("**Upload a WAV audio file to test transcription:**")
+ uploaded = st.file_uploader(
+ "Upload audio",
+ type=["wav", "mp3", "m4a", "ogg", "flac"],
+ label_visibility="collapsed",
+ )
+ if uploaded is not None:
+ raw = uploaded.read()
+ # Convert to WAV bytes if not already WAV
+ if not uploaded.name.lower().endswith(".wav"):
+ try:
+ import subprocess, tempfile
+ with tempfile.NamedTemporaryFile(suffix=os.path.splitext(uploaded.name)[1], delete=False) as f:
+ f.write(raw)
+ tmp_in = f.name
+ tmp_out = tmp_in + ".wav"
+ subprocess.run(["ffmpeg", "-y", "-i", tmp_in, tmp_out], check=True,
+ capture_output=True)
+ with open(tmp_out, "rb") as f:
+ raw = f.read()
+ os.unlink(tmp_in); os.unlink(tmp_out)
+ except Exception as e:
+ st.warning(f"Could not convert to WAV ({e}). Trying as-is.")
+ audio_bytes = raw
+ st.audio(raw, format="audio/wav")
+ st.success(f"📁 File loaded: {uploaded.name} ({len(raw)//1024} KB)")
+
+ # Show mic feedback and auto-run on new audio
+ if audio_bytes:
+ audio_hash = hashlib.md5(audio_bytes).hexdigest()
+ # Parse duration
+ try:
+ with wave.open(_io.BytesIO(audio_bytes)) as _w:
+ _dur = _w.getnframes() / _w.getframerate()
+ dur_str = f"{_dur:.1f}s"
+ except Exception:
+ _dur = 0
+ dur_str = f"{len(audio_bytes)//1024}KB"
+ # Auto-trigger when audio is new
+ last_hash = st.session_state.get("_last_audio_hash", "")
+ if audio_hash != last_hash:
+ st.session_state["_last_audio_hash"] = audio_hash
+ st.session_state["_auto_run_audio"] = audio_bytes
+
+
+ # ── Text fallback ───────────────────────────────────────────────────────
+ st.markdown("**…or type it:**")
+ text_input = st.text_input(
+ label="command",
+ label_visibility="collapsed",
+ placeholder="e.g. Send my location to Mom and check weather in SF",
+ )
+
+ run_btn = st.button("▶ Run", type="primary", use_container_width=True)
+
+ # ── Example commands ────────────────────────────────────────────────────
+ with st.expander("💡 Example commands"):
+ examples = [
+ "Set an alarm for 7:30 AM",
+ "Send my location to Mom",
+ "Play Bohemian Rhapsody",
+ "Remind me to take medicine at 8:00 PM",
+ "Find coffee shops near me and text John saying I'll be late",
+ "Set a timer for 15 minutes and check the weather in San Francisco",
+ "Get directions from here to Golden Gate Bridge",
+ "Search for Tom in my contacts and send him a message saying happy birthday",
+ ]
+ for ex in examples:
+ if st.button(ex, key=ex, use_container_width=True):
+ st.session_state["injected_command"] = ex
+
+with col_pipeline:
+ pipeline_placeholder = st.empty()
+ pipeline_placeholder.markdown("*Pipeline will appear here after running a command.*")
+
+
+# ── Session state ──────────────────────────────────────────────────────────────
+if "injected_command" not in st.session_state:
+ st.session_state["injected_command"] = ""
+
+# Prefer injected example over text input
+command_text = st.session_state.get("injected_command") or text_input
+
+# Pull pending auto-run audio (set when new audio hash detected)
+_auto_audio = st.session_state.pop("_auto_run_audio", None)
+if _auto_audio:
+ audio_bytes = _auto_audio # ensure it's set even if session-state driven
+
+# ── Run pipeline ───────────────────────────────────────────────────────────────
+auto_run = _auto_audio is not None
+if (run_btn or auto_run or st.session_state.get("injected_command")) and (audio_bytes or command_text):
+
+ # Clear injected command after consuming it
+ st.session_state["injected_command"] = ""
+
+ steps = []
+ final_command = command_text
+ timings = {}
+
+ st.divider()
+ st.markdown("### ⚡ Running Pipeline…")
+ progress = st.progress(0)
+
+ # ── Step 1: Transcription ───────────────────────────────────────────────
+ transcription_ms = 0
+ if audio_bytes and not command_text:
+ with st.spinner("🎙️ Transcribing on-device…"):
+ final_command, transcription_ms = transcribe_audio(audio_bytes)
+ if not final_command:
+ st.error("Transcription returned empty. Please try again or type your command.")
+ st.stop()
+ steps.append({"label": "Voice → Text (Whisper on-device)", "ok": True,
+ "detail": f'"{final_command[:50]}…"' if len(final_command) > 50 else f'"{final_command}"',
+ "ms": transcription_ms})
+ else:
+ steps.append({"label": "Voice → Text", "ok": True,
+ "detail": "Text input (no transcription needed)", "ms": 0})
+ timings["transcription_ms"] = transcription_ms
+ progress.progress(15)
+
+ # Display current transcribed/typed command
+ st.markdown(f"**📝 Command:** `{final_command}`")
+
+ # ── Step 2: Location intent detection ───────────────────────────────────
+ location_info = None
+ location_ms = 0
+
+ if detect_location_intent(final_command):
+ # User is asking where they are → let get_current_location tool handle it
+ # Do NOT inject GPS into prompt (it would give the model the answer,
+ # so it wouldn’t bother calling the tool)
+ steps.append({"label": "Location Query Detected", "ok": True,
+ "detail": "Routing to get_current_location", "ms": 1})
+ else:
+ steps.append({"label": "Location Intent Check", "ok": True,
+ "detail": "No location needed", "ms": 1})
+
+ timings["location_ms"] = location_ms
+ progress.progress(35)
+
+ # ── Step 3: Smart routing + inference ───────────────────────────────────
+ messages = [{"role": "user", "content": final_command}]
+ tools = [
+ {k: v for k, v in t.items() if k != "on_device"}
+ for t in ALL_TOOLS
+ ]
+
+ with st.spinner("🤖 Running hybrid inference…"):
+ t0 = time.time()
+ inference_result = generate_hybrid(messages, tools)
+ inference_ms = (time.time() - t0) * 1000
+
+ source = inference_result.get("source", "unknown")
+ fn_calls = inference_result.get("function_calls", [])
+ confidence = inference_result.get("confidence", None)
+
+ routing_detail = source
+ if confidence is not None:
+ routing_detail += f" | conf={confidence:.2f}"
+
+ steps.append({
+ "label": f"Hybrid Routing → Inference",
+ "ok": bool(fn_calls),
+ "error": not bool(fn_calls),
+ "detail": routing_detail,
+ "ms": inference_ms,
+ })
+ timings["inference_ms"] = inference_ms
+ progress.progress(65)
+
+ # ── Step 4: Execute function calls ──────────────────────────────────────
+ if fn_calls:
+ t0 = time.time()
+ exec_results = execute(fn_calls)
+ exec_ms = (time.time() - t0) * 1000
+ fn_names = ", ".join(c["function"] for c in exec_results)
+ steps.append({"label": "Execute Function Calls", "ok": True,
+ "detail": fn_names, "ms": exec_ms})
+ timings["exec_ms"] = exec_ms
+ else:
+ steps.append({"label": "Execute Function Calls", "error": True,
+ "detail": "No function calls returned"})
+ exec_results = []
+
+ progress.progress(100)
+
+ # ── Render pipeline ─────────────────────────────────────────────────────
+ with col_pipeline:
+ pipeline_placeholder.empty()
+ with pipeline_placeholder.container():
+ render_pipeline(steps)
+
+ # Timing summary
+ total_ms = sum(v for v in timings.values())
+ st.markdown("---")
+ st.markdown("#### ⏱️ Timing Breakdown")
+ for label, ms in {
+ "🎙️ Transcription": timings.get("transcription_ms", 0),
+ "📍 Location": timings.get("location_ms", 0),
+ "🤖 Inference": timings.get("inference_ms", 0),
+ "⚙️ Execution": timings.get("exec_ms", 0),
+ }.items():
+ pct = int((ms / total_ms * 100)) if total_ms > 0 else 0
+ st.markdown(f"{label}: **{ms:.0f}ms** ({pct}%)", unsafe_allow_html=True)
+ st.markdown(
+ f'',
+ unsafe_allow_html=True,
+ )
+ st.markdown(f"**Total: {total_ms:.0f}ms**")
+
+ # Routing badge
+ st.markdown(f"**Routing:** {source_badge(source)}", unsafe_allow_html=True)
+
+ # ── Results ─────────────────────────────────────────────────────────────
+ if exec_results:
+ st.markdown("### ✅ Results")
+ for r in exec_results:
+ render_result(r)
+ else:
+ st.warning("No function calls were generated. Try rephrasing your command.")
+
+ # ── Location info card ───────────────────────────────────────────────────
+ if location_info:
+ st.markdown("### 📍 Location Used")
+ c1, c2 = st.columns(2)
+ with c1:
+ st.metric("Address", location_info["address"])
+ st.metric("Source", location_info["source"])
+ with c2:
+ st.metric("Coordinates", f"{location_info['lat']:.5f}, {location_info['lon']:.5f}")
+ st.markdown(f"[View on Maps]({location_info['maps_link']})")
+
+ # ── Raw debug ────────────────────────────────────────────────────────────
+ with st.expander("🔍 Raw inference output"):
+ st.json(inference_result)
+
+ # ── Allow re-recording after voice pipeline ───────────────────────────
+ if _auto_audio:
+ if st.button("🎙️ Record New Command", use_container_width=True):
+ st.session_state["_last_audio_hash"] = ""
+ st.rerun()
+
+# ── Sidebar: About ─────────────────────────────────────────────────────────────
+with st.sidebar:
+ st.markdown("## 🎙️ HandsFree")
+ st.markdown("""
+**Pipeline stages:**
+
+1. 🎤 Voice capture (browser mic)
+2. 🧠 On-device transcription (Whisper via Cactus)
+3. 📍 Location intent detection (keyword scan)
+4. 🛰️ GPS injection (CoreLocation, no API)
+5. ⚡ Hybrid routing (FunctionGemma ↔ Gemini)
+6. ✅ Function execution
+
+---
+
+**Available tools:**
+""")
+ for t in ALL_TOOLS:
+ badge = "⚡" if t.get("on_device") else "☁️"
+ st.markdown(f"{badge} `{t['name']}`")
+
+ st.markdown("""
+---
+⚡ = On-device (FunctionGemma)
+☁️ = Cloud (Gemini)
+""")
diff --git a/handsfree/executor.py b/handsfree/executor.py
new file mode 100644
index 00000000..7f5d7812
--- /dev/null
+++ b/handsfree/executor.py
@@ -0,0 +1,414 @@
+"""
+HandsFree — Function Executor
+Real API integrations:
+ - Weather : Open-Meteo (free, no key)
+ - Maps : Google Maps Platform (GOOGLE_MAPS_API_KEY env var)
+ - Others : simulated (iMessage, alarms, music)
+"""
+
+import os
+import time
+from datetime import datetime
+
+import requests
+
+# ── Google Maps client (lazy-initialised) ─────────────────────────────────────
+_gmaps = None
+
+def _get_gmaps():
+ global _gmaps
+ if _gmaps is None:
+ key = os.environ.get("GOOGLE_MAPS_API_KEY", "")
+ if not key:
+ raise RuntimeError("GOOGLE_MAPS_API_KEY is not set")
+ import googlemaps
+ _gmaps = googlemaps.Client(key=key)
+ return _gmaps
+
+
+# Phrases that mean "use my current GPS location"
+_HERE_PHRASES = {
+ "near me", "my location", "my current location", "current location",
+ "here", "where i am", "where i'm at", "my position",
+}
+
+def _resolve_location(loc_str: str) -> str:
+ """
+ If loc_str is a 'near me' style phrase, replace it with the user's
+ real GPS coordinates (lat,lng string) suitable for geocoding/Maps APIs.
+ Otherwise return loc_str unchanged.
+ """
+ if loc_str.strip().lower() in _HERE_PHRASES:
+ from handsfree.location import get_gps_location
+ loc = get_gps_location()
+ if loc:
+ return f"{loc['lat']},{loc['lon']}"
+ return loc_str
+
+
+def execute(function_calls: list[dict]) -> list[dict]:
+ """Execute a list of function calls and return results."""
+ results = []
+ for call in function_calls:
+ fn = call.get("name", "unknown")
+ args = call.get("arguments", {})
+ handler = _HANDLERS.get(fn, _unknown)
+ try:
+ result = handler(args)
+ except Exception as e:
+ result = {"status": "error", "error": str(e)}
+ results.append({
+ "function": fn,
+ "arguments": args,
+ "result": result,
+ })
+ return results
+
+
+# ── Handlers ──────────────────────────────────────────────────────────────────
+
+def _send_message(args):
+ recipient = args.get("recipient", "Unknown")
+ message = args.get("message", "")
+ return {
+ "status": "sent",
+ "to": recipient,
+ "preview": message[:60] + ("…" if len(message) > 60 else ""),
+ "timestamp": datetime.now().strftime("%I:%M %p"),
+ "icon": "💬",
+ }
+
+
+def _set_alarm(args):
+ hour = args.get("hour", 0)
+ minute = args.get("minute", 0)
+ period = "AM" if hour < 12 else "PM"
+ display_hour = hour if hour <= 12 else hour - 12
+ display_hour = display_hour or 12
+ return {
+ "status": "set",
+ "time": f"{display_hour}:{minute:02d} {period}",
+ "icon": "⏰",
+ }
+
+
+def _set_timer(args):
+ minutes = args.get("minutes", 0)
+ return {
+ "status": "running",
+ "duration": f"{minutes} minute{'s' if minutes != 1 else ''}",
+ "ends_at": f"{minutes}m from now",
+ "icon": "⏱️",
+ }
+
+
+def _create_reminder(args):
+ title = args.get("title", "Reminder")
+ time_str = args.get("time", "")
+ return {
+ "status": "created",
+ "title": title.capitalize(),
+ "time": time_str,
+ "icon": "📌",
+ }
+
+
+def _play_music(args):
+ song = args.get("song", "")
+ return {
+ "status": "playing",
+ "track": song,
+ "icon": "🎵",
+ }
+
+
+def _search_contacts(args):
+ query = args.get("query", "")
+ # Simulate finding a contact
+ return {
+ "status": "found",
+ "query": query,
+ "results": [
+ {"name": query, "phone": "+1 (555) 000-0000", "email": f"{query.lower()}@example.com"},
+ ],
+ "icon": "👤",
+ }
+
+
+# WMO weather code → human label
+_WMO = {
+ 0: "Clear Sky", 1: "Mainly Clear", 2: "Partly Cloudy", 3: "Overcast",
+ 45: "Foggy", 48: "Icy Fog",
+ 51: "Light Drizzle", 53: "Moderate Drizzle", 55: "Heavy Drizzle",
+ 61: "Light Rain", 63: "Moderate Rain", 65: "Heavy Rain",
+ 71: "Light Snow", 73: "Moderate Snow", 75: "Heavy Snow",
+ 80: "Rain Showers", 81: "Moderate Showers", 82: "Violent Showers",
+ 95: "Thunderstorm", 96: "Thunderstorm w/ Hail",
+}
+
+def _get_weather(args):
+ location = _resolve_location(args.get("location", ""))
+ try:
+ # If location is already "lat,lon" (from 'near me' resolution), reverse geocode it
+ if location.count(",") == 1 and all(c in "0123456789.-, " for c in location):
+ parts = location.split(",")
+ lat, lon = float(parts[0].strip()), float(parts[1].strip())
+ rev = requests.get(
+ "https://nominatim.openstreetmap.org/reverse",
+ params={"lat": lat, "lon": lon, "format": "json"},
+ headers={"User-Agent": "HandsFreeApp/1.0"},
+ timeout=5,
+ ).json()
+ display = rev.get("address", {}).get("city") or rev.get("display_name", location).split(",")[0]
+ else:
+ # 1. Geocode city name via Nominatim (free, no key)
+ geo = requests.get(
+ "https://nominatim.openstreetmap.org/search",
+ params={"q": location, "format": "json", "limit": 1},
+ headers={"User-Agent": "HandsFreeApp/1.0"},
+ timeout=5,
+ ).json()
+ if not geo:
+ raise ValueError(f"Location not found: {location}")
+ lat, lon = float(geo[0]["lat"]), float(geo[0]["lon"])
+ display = geo[0].get("display_name", location).split(",")[0]
+
+ # 2. Fetch weather from Open-Meteo (free, no key)
+ wx = requests.get(
+ "https://api.open-meteo.com/v1/forecast",
+ params={
+ "latitude": lat, "longitude": lon,
+ "current": "temperature_2m,relative_humidity_2m,wind_speed_10m,weathercode",
+ "temperature_unit": "fahrenheit",
+ "wind_speed_unit": "mph",
+ "forecast_days": 1,
+ },
+ timeout=5,
+ ).json()
+ cur = wx["current"]
+ code = cur.get("weathercode", 0)
+ condition = _WMO.get(code, "Unknown")
+ temp_f = cur["temperature_2m"]
+ temp_c = round((temp_f - 32) * 5 / 9, 1)
+ humidity = cur["relative_humidity_2m"]
+ wind = cur["wind_speed_10m"]
+ return {
+ "status": "ok",
+ "location": display,
+ "condition": condition,
+ "temp_f": round(temp_f, 1),
+ "temp_c": temp_c,
+ "humidity": f"{humidity}%",
+ "wind": f"{wind} mph",
+ "icon": "⛅",
+ }
+ except Exception as e:
+ return {"status": "error", "error": str(e), "icon": "⛅"}
+
+
+def _get_directions(args):
+ origin = _resolve_location(args.get("origin", "") or "Current location")
+ destination = _resolve_location(args.get("destination", ""))
+ mode = args.get("mode", "driving")
+ try:
+ gmaps = _get_gmaps()
+ result = gmaps.directions(origin, destination, mode=mode)
+ if not result:
+ raise ValueError("No route found")
+ leg = result[0]["legs"][0]
+ duration = leg["duration"]["text"]
+ distance = leg["distance"]["text"]
+ start = leg["start_address"]
+ end = leg["end_address"]
+ import re as _re
+ def _strip_html(h):
+            h = h.replace("<b>", "").replace("</b>", "")
+            h = h.replace('<div style="font-size:0.9em">', " — ").replace("</div>", "")
+ return _re.sub(r"<[^>]+>", "", h).strip()
+ steps = [_strip_html(s["html_instructions"]) for s in leg["steps"][:6]]
+ maps_url = (
+ f"https://www.google.com/maps/dir/?api=1"
+ f"&origin={requests.utils.quote(start)}"
+ f"&destination={requests.utils.quote(end)}"
+ f"&travelmode={mode}"
+ )
+ return {
+ "status": "ok",
+ "from": start,
+ "to": end,
+ "mode": mode,
+ "duration": duration,
+ "distance": distance,
+ "steps": steps,
+ "maps_url": maps_url,
+ "icon": "🗺️",
+ }
+ except Exception as e:
+ return {"status": "error", "error": str(e), "icon": "🗺️"}
+
+
+def _find_nearby(args):
+ category = args.get("category", "")
+ location = _resolve_location(args.get("location", ""))
+ try:
+ gmaps = _get_gmaps()
+ # If already lat,lng from _resolve_location, pass directly; else geocode
+ if location.count(",") == 1 and all(c in "0123456789.-, " for c in location):
+ parts = location.split(",")
+ latlng = {"lat": float(parts[0].strip()), "lng": float(parts[1].strip())}
+ else:
+ geo = gmaps.geocode(location)
+ if not geo:
+ raise ValueError(f"Cannot geocode: {location}")
+ latlng = geo[0]["geometry"]["location"]
+
+ places = gmaps.places_nearby(
+ location=latlng,
+ radius=1500,
+ keyword=category,
+ )
+ results = []
+ for p in places.get("results", [])[:5]:
+ name = p.get("name", "")
+ rating = p.get("rating", "N/A")
+ address = p.get("vicinity", "")
+ open_now = p.get("opening_hours", {}).get("open_now", None)
+ status = "Open" if open_now else ("Closed" if open_now is False else "Hours unknown")
+ results.append({"name": name, "rating": rating, "address": address, "status": status})
+
+ return {
+ "status": "ok",
+ "category": category,
+ "near": location,
+ "results": results,
+ "icon": "📍",
+ }
+ except Exception as e:
+ return {"status": "error", "error": str(e), "icon": "📍"}
+
+
+def _search_along_route(args):
+ query = args.get("query", "")
+ origin = _resolve_location(args.get("origin", ""))
+ destination = _resolve_location(args.get("destination", ""))
+ try:
+ gmaps = _get_gmaps()
+ # Get route polyline
+ route = gmaps.directions(origin, destination, mode="driving")
+ if not route:
+ raise ValueError("No route found")
+
+ # Sample waypoints along the route (every ~5 steps)
+ steps = route[0]["legs"][0]["steps"]
+ sample_points = [
+ steps[i]["end_location"]
+ for i in range(0, len(steps), max(1, len(steps) // 5))
+ ][:3]
+
+ results = []
+ seen = set()
+ for pt in sample_points:
+ nearby = gmaps.places_nearby(
+ location=pt,
+ radius=800,
+ keyword=query,
+ )
+ for p in nearby.get("results", [])[:2]:
+ name = p.get("name", "")
+ if name in seen:
+ continue
+ seen.add(name)
+ results.append({
+ "name": name,
+ "address": p.get("vicinity", ""),
+ "rating": p.get("rating", "N/A"),
+ })
+ if len(results) >= 4:
+ break
+
+ total_duration = route[0]["legs"][0]["duration"]["text"]
+ total_distance = route[0]["legs"][0]["distance"]["text"]
+ return {
+ "status": "ok",
+ "query": query,
+ "route": f"{origin} → {destination}",
+ "route_duration": total_duration,
+ "route_distance": total_distance,
+ "results": results,
+ "icon": "🛣️",
+ }
+ except Exception as e:
+ return {"status": "error", "error": str(e), "icon": "🛣️"}
+
+
+def _get_current_location(args):
+ fmt = args.get("format", "full")
+ try:
+ # 1. Get GPS coordinates from CoreLocation
+ from handsfree.location import get_gps_location
+ loc = get_gps_location()
+ if not loc:
+ raise RuntimeError("Could not determine location — CoreLocation denied and IP lookup failed")
+
+ lat, lon = loc["lat"], loc["lon"]
+
+ # 2. Reverse-geocode via Google Maps for a clean, accurate address
+ try:
+ gmaps = _get_gmaps()
+ results = gmaps.reverse_geocode((lat, lon))
+ if results:
+ full_address = results[0]["formatted_address"]
+ # Extract neighbourhood/city for short format
+ components = results[0].get("address_components", [])
+ neighbourhood = next(
+ (c["long_name"] for c in components
+ if "sublocality" in c["types"] or "neighborhood" in c["types"]),
+ None
+ )
+ city = next(
+ (c["long_name"] for c in components if "locality" in c["types"]),
+ None
+ )
+ short_address = neighbourhood or city or full_address.split(",")[0]
+ else:
+ full_address = loc.get("address", f"{lat:.5f}, {lon:.5f}")
+ short_address = full_address.split(",")[0]
+ except Exception:
+ # Fall back to CoreLocation address if Maps key unavailable
+ full_address = loc.get("address", f"{lat:.5f}, {lon:.5f}")
+ short_address = full_address.split(",")[0]
+
+ display = short_address if fmt == "short" else full_address
+ maps_link = f"https://maps.google.com/?q={lat:.6f},{lon:.6f}"
+
+ return {
+ "status": "ok",
+ "address": display,
+ "full_address": full_address,
+ "latitude": round(lat, 6),
+ "longitude": round(lon, 6),
+ "source": loc.get("source", "GPS"),
+ "maps_link": maps_link,
+ "icon": "📍",
+ }
+ except Exception as e:
+ return {"status": "error", "error": str(e), "icon": "📍"}
+
+
+def _unknown(args):
+ return {"status": "error", "error": "Unknown function"}
+
+
+_HANDLERS = {
+ "send_message": _send_message,
+ "set_alarm": _set_alarm,
+ "set_timer": _set_timer,
+ "create_reminder": _create_reminder,
+ "play_music": _play_music,
+ "search_contacts": _search_contacts,
+ "get_weather": _get_weather,
+ "get_directions": _get_directions,
+ "find_nearby": _find_nearby,
+ "search_along_route": _search_along_route,
+ "get_current_location": _get_current_location,
+}
diff --git a/handsfree/location.py b/handsfree/location.py
new file mode 100644
index 00000000..774f0e72
--- /dev/null
+++ b/handsfree/location.py
@@ -0,0 +1,157 @@
+"""
+HandsFree — Location Module
+On-device GPS via Apple CoreLocation + intent detection via keyword scanning.
+No external API calls — coordinates and address stay on device.
+"""
+
+import re
+
+# ── Intent detection ───────────────────────────────────────────────────────────────
+
# Patterns that mean the user wants to KNOW their current location.
_QUERY_KEYWORDS = [
    r"\bwhat.*\b(my|current)\s*(location|address|position)\b",
    r"\bwhere\s+am\s+i\b",
    r"\bwhere\s+i('m| am)\b",
    r"\bmy\s+(current\s+)?(location|address|position)\b",
    r"\bcurrent\s+location\b",
    r"\bmy\s+address\b",
]
_QUERY_RE = [re.compile(p, re.I) for p in _QUERY_KEYWORDS]

# Patterns that mean the user wants to SHARE their location with someone.
# test_location_intent.py imports is_location_share and expects share
# commands ("send my location to Mom") to NOT count as queries.
_SHARE_KEYWORDS = [
    r"\b(send|share|text)\s+(them\s+)?my\s+(current\s+)?location\b",
    r"\btell\s+\w+\s+where\s+i\s*('m|\s+am)\b",
]
_SHARE_RE = [re.compile(p, re.I) for p in _SHARE_KEYWORDS]


def is_location_share(text: str) -> bool:
    """Return True if the user wants to send/share their location with someone."""
    return any(pat.search(text) for pat in _SHARE_RE)


def detect_location_intent(text: str) -> bool:
    """Return True if the command involves the user's current location —
    either asking what it is, or sharing it with someone."""
    return is_location_share(text) or is_location_query(text)


def is_location_query(text: str) -> bool:
    """Return True if the user is asking what their current location is.

    Share commands also mention "my location", so they are explicitly
    excluded here to keep the two intents mutually exclusive.
    """
    if is_location_share(text):
        return False
    return any(pat.search(text) for pat in _QUERY_RE)
+
+
def get_gps_location() -> dict | None:
    """
    Retrieve current GPS coordinates using Apple CoreLocation via pyobjc.
    Falls back to IP-based geolocation if CoreLocation is denied or unavailable.
    Returns dict with lat, lon, address, maps_link — or None if all methods fail.
    """
    try:
        import CoreLocation
        import time

        manager = CoreLocation.CLLocationManager.alloc().init()

        auth_status = CoreLocation.CLLocationManager.authorizationStatus()
        # kCLAuthorizationStatusDenied = 2, Restricted = 1, NotDetermined = 0
        if auth_status in (1, 2):
            # Permission denied — skip straight to IP fallback
            return _fallback_location()
        if auth_status == 0:
            # First run: prompt for permission, then give the async dialog
            # a moment to resolve.
            # NOTE(review): authorization is asynchronous — 1.5 s may not be
            # enough if the user is slow to respond; confirm acceptable.
            manager.requestWhenInUseAuthorization()
            time.sleep(1.5)

        # NOTE(review): presumably the manager's most recent cached fix,
        # which may be stale or None if no fix exists yet — confirm.
        location = manager.location()
        if location is None:
            return _fallback_location()

        coord = location.coordinate()
        lat, lon = coord.latitude, coord.longitude
        # (0.0, 0.0) is treated as "no real fix" and routed to the fallback.
        if lat == 0.0 and lon == 0.0:
            return _fallback_location()

        address = _reverse_geocode(lat, lon)
        return {
            "lat": lat,
            "lon": lon,
            "address": address,
            "maps_link": f"https://maps.google.com/?q={lat:.6f},{lon:.6f}",
            "source": "CoreLocation (on-device GPS)",
        }
    except Exception:
        # pyobjc missing or any CoreLocation failure — try IP geolocation.
        return _fallback_location()
+
+
+def _reverse_geocode(lat: float, lon: float) -> str:
+ """Reverse geocode coordinates to a human-readable address using CLGeocoder."""
+ try:
+ import CoreLocation
+ import threading
+
+ result = {"address": None, "done": threading.Event()}
+
+ def completion(placemarks, error):
+ if placemarks:
+ pm = placemarks[0]
+ parts = []
+ if pm.subThoroughfare():
+ parts.append(pm.subThoroughfare())
+ if pm.thoroughfare():
+ parts.append(pm.thoroughfare())
+ if pm.locality():
+ parts.append(pm.locality())
+ if pm.administrativeArea():
+ parts.append(pm.administrativeArea())
+ result["address"] = ", ".join(parts) if parts else f"{lat:.4f}, {lon:.4f}"
+ result["done"].set()
+
+ geocoder = CoreLocation.CLGeocoder.alloc().init()
+ loc = CoreLocation.CLLocation.alloc().initWithLatitude_longitude_(lat, lon)
+ geocoder.reverseGeocodeLocation_completionHandler_(loc, completion)
+ result["done"].wait(timeout=3.0)
+ return result["address"] or f"{lat:.4f}°N, {lon:.4f}°W"
+ except Exception:
+ return f"{lat:.4f}°N, {lon:.4f}°W"
+
+
def _fallback_location() -> dict | None:
    """
    Fallback when CoreLocation is unavailable or denied.
    Uses IP-based geolocation (ipinfo.io, free, no key needed) for real location.

    Returns the same dict shape as get_gps_location(), or None when the
    IP lookup also fails.
    """
    import requests as _req
    try:
        resp = _req.get("https://ipinfo.io/json", timeout=4).json()
        loc_str = resp.get("loc", "")  # "37.7749,-122.4194"
        city = resp.get("city", "")
        region = resp.get("region", "")
        country = resp.get("country", "")
        if loc_str and "," in loc_str:
            lat, lon = map(float, loc_str.split(","))
            address = ", ".join(p for p in [city, region, country] if p)
            return {
                "lat": lat,
                "lon": lon,
                "address": address or f"{lat:.4f}, {lon:.4f}",
                "maps_link": f"https://maps.google.com/?q={lat:.6f},{lon:.6f}",
                "source": "IP geolocation (ipinfo.io)",
            }
    except Exception:
        # Network failure or malformed response — fall through to None.
        pass
    # Last resort: return None so callers know it truly failed
    return None
+
+
def inject_location_into_command(text: str, location: dict) -> str:
    """
    Rewrite a command to embed actual GPS coordinates.
    e.g. "Send my location to Mom" →
         "Send a message to Mom saying I'm at Civic Center, SF — https://maps.google.com/?q=..."

    Args:
        text: The raw user command.
        location: Dict with at least "address" and "maps_link" keys
                  (as produced by get_gps_location / _fallback_location).
    """
    address = location["address"]
    maps_link = location["maps_link"]

    # Replace location-intent phrases with concrete address + link
    location_string = f"I'm at {address} — {maps_link}"

    # Try to detect a recipient pattern. Besides "to Mom", also cover the
    # "share ... with John" and "tell Sarah ..." phrasings exercised by the
    # project's tests — the previous "to"-only pattern missed those.
    recipient_match = re.search(
        r'\b(?:to|with|tell)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)\b', text
    )

    if recipient_match:
        recipient = recipient_match.group(1)
        return f"Send a message to {recipient} saying {location_string}"

    # Generic fallback: append the concrete location to the original command.
    return f"{text.rstrip('.')} — my current location is: {location_string}"
diff --git a/handsfree/tools.py b/handsfree/tools.py
new file mode 100644
index 00000000..e4d8ccd3
--- /dev/null
+++ b/handsfree/tools.py
@@ -0,0 +1,179 @@
+"""
+HandsFree — Tool Registry
+All tools available to the agent, tagged by whether they can run on-device.
+"""
+
+# ── Tool Definitions ─────────────────────────────────────────────────────────
+
# Gemini-style function declarations, each with an internal "on_device"
# routing flag (consumers filter/strip it — see LOCAL_TOOLS below and
# test_handsfree.py).

# On-device tool: executed locally by the executor's send_message handler.
TOOL_SEND_MESSAGE = {
    "name": "send_message",
    "description": "Send a text message or iMessage to a contact",
    "on_device": True,
    "parameters": {
        "type": "object",
        "properties": {
            "recipient": {"type": "string", "description": "Name of the contact to send the message to"},
            "message": {"type": "string", "description": "The message content to send"},
        },
        "required": ["recipient", "message"],
    },
}

TOOL_SET_ALARM = {
    "name": "set_alarm",
    "description": "Set an alarm for a specific time",
    "on_device": True,
    "parameters": {
        "type": "object",
        "properties": {
            "hour": {"type": "integer", "description": "Hour (0-23)"},
            "minute": {"type": "integer", "description": "Minute (0-59)"},
        },
        "required": ["hour", "minute"],
    },
}

TOOL_SET_TIMER = {
    "name": "set_timer",
    "description": "Set a countdown timer for a number of minutes",
    "on_device": True,
    "parameters": {
        "type": "object",
        "properties": {
            "minutes": {"type": "integer", "description": "Number of minutes for the timer"},
        },
        "required": ["minutes"],
    },
}

TOOL_CREATE_REMINDER = {
    "name": "create_reminder",
    "description": "Create a reminder with a title and time",
    "on_device": True,
    "parameters": {
        "type": "object",
        "properties": {
            "title": {"type": "string", "description": "Short reminder title"},
            "time": {"type": "string", "description": "Time for the reminder (e.g. 3:00 PM)"},
        },
        "required": ["title", "time"],
    },
}

TOOL_PLAY_MUSIC = {
    "name": "play_music",
    "description": "Play a song, album, or playlist",
    "on_device": True,
    "parameters": {
        "type": "object",
        "properties": {
            "song": {"type": "string", "description": "Song, album, or playlist name"},
        },
        "required": ["song"],
    },
}

TOOL_SEARCH_CONTACTS = {
    "name": "search_contacts",
    "description": "Search for a contact by name",
    "on_device": True,
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Name to search for"},
        },
        "required": ["query"],
    },
}
+
# Tools flagged on_device=False rely on network services (weather / Google
# Maps APIs) in the executor's handlers.

TOOL_GET_WEATHER = {
    "name": "get_weather",
    "description": "Get current weather conditions for a location",
    "on_device": False,
    "parameters": {
        "type": "object",
        "properties": {
            "location": {"type": "string", "description": "City name or address"},
        },
        "required": ["location"],
    },
}

TOOL_GET_DIRECTIONS = {
    "name": "get_directions",
    "description": "Get driving or walking directions from one place to another",
    "on_device": False,
    "parameters": {
        "type": "object",
        "properties": {
            "origin": {"type": "string", "description": "Starting location"},
            "destination": {"type": "string", "description": "Destination location"},
            "mode": {"type": "string", "description": "Travel mode: driving, walking, transit"},
        },
        # "mode" is optional — handlers presumably default it; confirm.
        "required": ["origin", "destination"],
    },
}

TOOL_FIND_NEARBY = {
    "name": "find_nearby",
    "description": "Find nearby places of a given category (restaurants, gas stations, pharmacies, etc.)",
    "on_device": False,
    "parameters": {
        "type": "object",
        "properties": {
            "category": {"type": "string", "description": "Type of place (e.g. coffee shop, gas station, hospital)"},
            "location": {"type": "string", "description": "Center location to search around"},
        },
        "required": ["category", "location"],
    },
}

TOOL_SEARCH_ALONG_ROUTE = {
    "name": "search_along_route",
    "description": "Search for places of a given type along a driving route",
    "on_device": False,
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "What to search for (e.g. gas station, coffee)"},
            "origin": {"type": "string", "description": "Starting point of the route"},
            "destination": {"type": "string", "description": "End point of the route"},
        },
        "required": ["query", "origin", "destination"],
    },
}

# No required parameters — callable with empty arguments.
TOOL_GET_CURRENT_LOCATION = {
    "name": "get_current_location",
    "description": "Get the user's current GPS location and return their address. Use when the user asks where they are, what their location is, or requests their current address.",
    "on_device": True,
    "parameters": {
        "type": "object",
        "properties": {
            "format": {"type": "string", "description": "Output format: 'full' for full address (default) or 'short' for city/neighborhood only"}
        },
        "required": [],
    },
}
+
# ── Grouped sets for different use contexts ───────────────────────────────────

# Full tool set available to the agent
ALL_TOOLS = [
    TOOL_SEND_MESSAGE,
    TOOL_SET_ALARM,
    TOOL_SET_TIMER,
    TOOL_CREATE_REMINDER,
    TOOL_PLAY_MUSIC,
    TOOL_SEARCH_CONTACTS,
    TOOL_GET_WEATHER,
    TOOL_GET_DIRECTIONS,
    TOOL_FIND_NEARBY,
    TOOL_SEARCH_ALONG_ROUTE,
    TOOL_GET_CURRENT_LOCATION,
]

# Subset for on-device-capable tasks
LOCAL_TOOLS = [t for t in ALL_TOOLS if t.get("on_device")]

# Name -> definition lookup, e.g. for validating model tool calls.
TOOL_MAP = {t["name"]: t for t in ALL_TOOLS}
diff --git a/main.py b/main.py
index 4cea3430..3676cea1 100644
--- a/main.py
+++ b/main.py
@@ -1,54 +1,43 @@
-
import sys
sys.path.insert(0, "cactus/python/src")
functiongemma_path = "cactus/weights/functiongemma-270m-it"
-
-import json, os, time
+import json, os, time, re
from cactus import cactus_init, cactus_complete, cactus_destroy
from google import genai
from google.genai import types
def generate_cactus(messages, tools, system_msg="You are a helpful assistant that can use tools."):
    """Run function calling on-device via FunctionGemma + Cactus.

    Args:
        messages: Chat turns as [{"role": ..., "content": ...}, ...].
        tools: Tool definitions (name/description/parameters JSON schema).
        system_msg: Prompt injected as a leading "developer" role message.

    Returns:
        dict with function_calls, total_time_ms, confidence, cloud_handoff;
        empty/zero values when the model output is not parseable JSON.
    """
    model = cactus_init(functiongemma_path)
    cactus_tools = [{"function": t} for t in tools]
    raw_str = cactus_complete(
        model,
        [{"role": "developer", "content": system_msg}] + messages,
        tools=cactus_tools,
        force_tools=True,
        max_tokens=256,
        stop_sequences=[""],
        confidence_threshold=0.0,
    )
    cactus_destroy(model)
    try:
        # Repair common FunctionGemma JSON glitches before parsing:
        # 1) strip leading zeros from bare numbers ("hour": 07 -> "hour": 7).
        #    NOTE(review): this regex is not string-aware — it also rewrites
        #    digits inside quoted values (e.g. "time": "10:05" -> "10:5");
        #    confirm acceptable or make it skip string literals.
        patched_str = re.sub(r'([:\s\[,])0+(\d+)', r'\1\2', raw_str)
        # 2) unquote booleans ("true"/"TRUE" -> true).
        #    NOTE(review): also rewrites legitimate string values or keys
        #    that happen to be exactly "true"/"false".
        patched_str = re.sub(r'"true"|"false"|"TRUE"|"FALSE"', lambda m: m.group(0).lower().replace('"', ''), patched_str)
        raw = json.loads(patched_str)
    except json.JSONDecodeError:
        # Unparseable output — report an empty, zero-confidence result.
        return {"function_calls": [], "total_time_ms": 0, "confidence": 0, "cloud_handoff": False}
    return {
        "function_calls": raw.get("function_calls", []),
        "total_time_ms": raw.get("total_time_ms", 0),
        "confidence": raw.get("confidence", 0),
        "cloud_handoff": raw.get("cloud_handoff", False),
    }
def generate_cloud(messages, tools):
"""Run function calling via Gemini Cloud API."""
client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY"))
-
gemini_tools = [
types.Tool(function_declarations=[
types.FunctionDeclaration(
@@ -66,19 +55,14 @@ def generate_cloud(messages, tools):
for t in tools
])
]
-
contents = [m["content"] for m in messages if m["role"] == "user"]
-
start_time = time.time()
-
gemini_response = client.models.generate_content(
- model="gemini-2.0-flash",
+ model="gemini-2.5-flash",
contents=contents,
config=types.GenerateContentConfig(tools=gemini_tools),
)
-
total_time_ms = (time.time() - start_time) * 1000
-
function_calls = []
for candidate in gemini_response.candidates:
for part in candidate.content.parts:
@@ -87,25 +71,204 @@ def generate_cloud(messages, tools):
"name": part.function_call.name,
"arguments": dict(part.function_call.args),
})
-
- return {
- "function_calls": function_calls,
- "total_time_ms": total_time_ms,
- }
+ return {"function_calls": function_calls, "total_time_ms": total_time_ms}
def generate_hybrid(messages, tools, confidence_threshold=0.99):
    """Hybrid router: prefer on-device FunctionGemma, escalate to Gemini cloud.

    Flow: (1) pre-flight text-complexity scoring, (2) local generation with
    output validation, (3) one stricter local retry, then (4) cloud fallback.

    NOTE(review): `confidence_threshold` is no longer consulted by this
    strategy — kept only for signature compatibility with existing callers.
    """

    # ══════════════════════════════════════════════════════════
    # CHECKPOINT 1 — PRE-FLIGHT
    # Analyze the request before calling any model.
    # Uses 5 signals to decide if this is too complex for local.
    # Zero model calls — pure text analysis, runs in microseconds.
    # ══════════════════════════════════════════════════════════

    # Get user message (most recent user turn)
    user_message = ""
    for m in reversed(messages):
        if m.get("role") == "user":
            user_message = m.get("content", "")
            break
    msg = user_message.lower()

    # -- Signal 1: Message length --
    word_count = len(user_message.split())
    if word_count <= 8:
        s_length = 0.0
    elif word_count <= 20:
        s_length = 0.2
    elif word_count <= 40:
        s_length = 0.5
    else:
        s_length = 0.8

    # -- Signal 2: Action verb count --
    action_verbs = [
        "look up", "send", "text", "get", "check",
        "find", "set", "create", "remind", "play",
        "start", "search", "book", "wake", "call"
    ]
    found_verbs = []
    # Phrase verbs matched by substring; single words by word boundary.
    for verb in sorted(action_verbs, key=len, reverse=True):
        if " " in verb:
            if verb in msg: found_verbs.append(verb)
        else:
            if re.search(rf"\b{verb}\b", msg): found_verbs.append(verb)
    verb_count = len(found_verbs)
    if verb_count <= 1: s_verbs = 0.0
    elif verb_count == 2: s_verbs = 0.8
    else: s_verbs = 1.0

    # -- Explicit multi-step signal --
    # NOTE(review): `(" and " in msg and verb_count > 1) or verb_count > 1`
    # reduces to just `verb_count > 1` — the " and " clause is dead code.
    s_multi = 1.0 if (" and " in msg and verb_count > 1) or verb_count > 1 else 0.0

    # -- Signal 3: Negations and conditionals --
    # Small models ignore these and produce wrong calls.
    neg_patterns = [r"\bnot\b", r"\bnever\b", r"\bexcept\b", r"\bwithout\b", r"\bno\b"]
    cond_patterns = [r"\bif\b", r"\bunless\b", r"\bonly\s+when\b", r"\bonly\s+if\b", r"\bwhen\b"]
    neg_cond_hits = sum(1 for p in neg_patterns + cond_patterns if re.search(p, msg))
    if neg_cond_hits == 0:
        s_neg = 0.0
    elif neg_cond_hits == 1:
        s_neg = 0.3
    elif neg_cond_hits == 2:
        s_neg = 0.6
    else:
        s_neg = 0.9

    # -- Signal 4: Tool count --
    # More tools = harder selection for a small model.
    tool_count = len(tools)
    if tool_count <= 2:
        s_tools = 0.0
    elif tool_count <= 5:
        s_tools = 0.2
    elif tool_count <= 10:
        s_tools = 0.5
    else:
        s_tools = 0.8

    # -- Signal 5: Tool name/description similarity --
    # Similar tools (set_alarm vs set_timer) cause confusion.
    def jaccard(a, b):
        # Word-level Jaccard similarity of two strings (0.0-1.0).
        wa, wb = set(a.lower().split()), set(b.lower().split())
        return len(wa & wb) / len(wa | wb) if wa and wb else 0.0

    descs = [f"{t.get('name','')} {t.get('description','')}" for t in tools]
    max_sim = 0.0
    for i in range(len(descs)):
        for j in range(i + 1, len(descs)):
            max_sim = max(max_sim, jaccard(descs[i], descs[j]))
    if max_sim < 0.2:
        s_sim = 0.0
    elif max_sim < 0.4:
        s_sim = 0.3
    elif max_sim < 0.6:
        s_sim = 0.6
    else:
        s_sim = 0.9

    # -- Weighted composite score --
    # NOTE(review): weights sum to 1.10, so the composite can exceed 1.0 —
    # confirm whether s_multi's 0.40 was meant to displace another weight.
    score = (
        s_length * 0.10 +
        s_verbs * 0.20 +
        s_multi * 0.40 +
        s_neg * 0.20 +
        s_tools * 0.10 +
        s_sim * 0.10
    )

    # -- Route to cloud immediately if too complex --
    if score >= 0.40:
        cloud = generate_cloud(messages, tools)
        cloud["source"] = f"cloud (preflight score={score:.2f})"
        return cloud

    # ══════════════════════════════════════════════════════════
    # CHECKPOINT 2 — RUN LOCAL + POST-FLIGHT VALIDATION
    # Run FunctionGemma locally, then validate the output.
    # Check: valid function name, required params present, types ok.
    # ══════════════════════════════════════════════════════════
    local = generate_cactus(messages, tools)
    available_names = {t["name"] for t in tools}

    def is_valid(result):
        # Validate a model result: known tool, required params present,
        # values coercible to the declared types, and string args that
        # actually appear in the user's prompt. Returns (ok, reason).
        calls = result.get("function_calls", [])
        if not calls:
            return False, "no function calls returned"
        tools_by_name = {t["name"]: t for t in tools}
        for call in calls:
            name = call.get("name", "")
            args = call.get("arguments", {})
            if name not in tools_by_name:
                return False, f"hallucinated tool name: {name}"
            required = tools_by_name[name].get("parameters", {}).get("required", [])
            for param in required:
                if param not in args:
                    return False, f"missing required param '{param}' in {name}"
            props = tools_by_name[name].get("parameters", {}).get("properties", {})
            for param, value in args.items():
                if param not in props:
                    continue
                expected_type = props[param].get("type", "")
                if expected_type == "integer" and not isinstance(value, int):
                    try:
                        int(str(value))
                    except (ValueError, TypeError):
                        return False, f"param '{param}' not coercible to int"
                elif expected_type == "number" and not isinstance(value, (int, float)):
                    try:
                        float(str(value))
                    except (ValueError, TypeError):
                        return False, f"param '{param}' not coercible to number"
                elif expected_type == "string":
                    if str(value).strip() == "" and param in required:
                        return False, f"required string param '{param}' is empty"
                    elif str(value).strip() != "":
                        # Hallucination check: reject only when NO word of the
                        # string argument occurs in the prompt.
                        val_clean = re.sub(r'[^\w\s]', '', str(value).lower()).strip()
                        msg_clean = re.sub(r'[^\w\s]', '', msg).strip()
                        if val_clean and val_clean not in msg_clean:
                            words = val_clean.split()
                            match_count = sum(1 for w in words if w in msg_clean)
                            if match_count == 0:
                                return False, f"hallucinated string not in prompt: {value}"
        return True, "ok"

    valid, reason = is_valid(local)
    if valid:
        # Drop any stray calls to unknown tools before returning.
        local["function_calls"] = [
            c for c in local["function_calls"] if c.get("name") in available_names
        ]
        local["source"] = "on-device"
        return local

    # ══════════════════════════════════════════════════════════
    # CHECKPOINT 3 — RETRY LOCALLY WITH STRONGER PROMPT
    # Before paying for a cloud call, retry once locally with
    # a more explicit system prompt. Costs ~300ms but free.
    # ══════════════════════════════════════════════════════════
    retry_system = (
        "You MUST call one of the provided tools. "
        "Do not write any text. Only call the most relevant tool."
    )
    retry = generate_cactus(messages, tools, system_msg=retry_system)
    valid_retry, retry_reason = is_valid(retry)
    if valid_retry:
        retry["function_calls"] = [
            c for c in retry["function_calls"] if c.get("name") in available_names
        ]
        retry["source"] = "on-device (retry)"
        # Include the failed first attempt in the reported latency.
        retry["total_time_ms"] += local["total_time_ms"]
        return retry

    # ══════════════════════════════════════════════════════════
    # FALLBACK — CLOUD
    # Both local attempts failed validation. Escalate to Gemini.
    # ══════════════════════════════════════════════════════════
    cloud = generate_cloud(messages, tools)
    cloud["source"] = "cloud (postflight fallback)"
    cloud["local_confidence"] = local.get("confidence", 0)
    cloud["total_time_ms"] += local["total_time_ms"] + retry["total_time_ms"]
    return cloud
@@ -125,7 +288,6 @@ def print_result(label, result):
############## Example usage ##############
-
if __name__ == "__main__":
tools = [{
"name": "get_weather",
@@ -133,18 +295,12 @@ def print_result(label, result):
"parameters": {
"type": "object",
"properties": {
- "location": {
- "type": "string",
- "description": "City name",
- }
+ "location": {"type": "string", "description": "City name"}
},
"required": ["location"],
},
}]
-
- messages = [
- {"role": "user", "content": "What is the weather in San Francisco?"}
- ]
+ messages = [{"role": "user", "content": "What is the weather in San Francisco?"}]
on_device = generate_cactus(messages, tools)
print_result("FunctionGemma (On-Device Cactus)", on_device)
diff --git a/run_benchmark1.py b/run_benchmark1.py
new file mode 100644
index 00000000..6f3e0d25
--- /dev/null
+++ b/run_benchmark1.py
@@ -0,0 +1,5 @@
# Benchmark launcher: put the cactus sources on sys.path and pre-register
# this repo's main.py under the module name "main" so that any
# `import main` inside `benchmark` reuses the same module object.
import sys
sys.path.insert(0, "cactus/python/src")
import main as _m
sys.modules["main"] = _m
# NOTE(review): presumably importing `benchmark` triggers the run — confirm.
import benchmark
diff --git a/test_apis.py b/test_apis.py
new file mode 100644
index 00000000..27365abb
--- /dev/null
+++ b/test_apis.py
@@ -0,0 +1,57 @@
+"""Test real API integrations in executor.py"""
+import sys, os
+sys.path.insert(0, "cactus/python/src")
+sys.path.insert(0, ".")
+
+# Load from environment — set GOOGLE_MAPS_API_KEY in your shell or .env file
+if not os.environ.get("GOOGLE_MAPS_API_KEY"):
+ raise EnvironmentError("GOOGLE_MAPS_API_KEY is not set. See .env.example.")
+
+from handsfree.executor import execute
+
+print("=== 1. Weather (Open-Meteo, free, no key) ===")
+r = execute([{"name": "get_weather", "arguments": {"location": "San Francisco"}}])[0]["result"]
+if r["status"] == "ok":
+ print(f" {r['icon']} {r['location']}: {r['condition']}, {r['temp_f']}°F / {r['temp_c']}°C, {r['humidity']} humidity, wind {r['wind']}")
+else:
+ print(f" ❌ {r['error']}")
+
+print("\n=== 2. Directions (Google Maps) ===")
+r = execute([{"name": "get_directions", "arguments": {
+ "origin": "Civic Center San Francisco",
+ "destination": "Golden Gate Bridge",
+ "mode": "driving",
+}}])[0]["result"]
+if r["status"] == "ok":
+ print(f" {r['icon']} {r['from']} → {r['to']}")
+ print(f" Duration: {r['duration']} | Distance: {r['distance']}")
+ for s in r.get("steps", [])[:3]:
+ print(f" • {s}")
+ print(f" URL: {r['maps_url']}")
+else:
+ print(f" ❌ {r['error']}")
+
+print("\n=== 3. Find Nearby (Google Places) ===")
+r = execute([{"name": "find_nearby", "arguments": {
+ "category": "coffee",
+ "location": "Union Square, San Francisco",
+}}])[0]["result"]
+if r["status"] == "ok":
+ print(f" {r['icon']} {r['category']} near {r['near']}")
+ for p in r["results"]:
+ print(f" • {p['name']} — {p['rating']}⭐ — {p['address']} ({p['status']})")
+else:
+ print(f" ❌ {r['error']}")
+
+print("\n=== 4. Search Along Route (Google Places + Directions) ===")
+r = execute([{"name": "search_along_route", "arguments": {
+ "query": "gas station",
+ "origin": "San Jose, CA",
+ "destination": "San Francisco, CA",
+}}])[0]["result"]
+if r["status"] == "ok":
+ print(f" {r['icon']} {r['query']} along {r['route']} ({r['route_duration']}, {r['route_distance']})")
+ for p in r["results"][:4]:
+ print(f" • {p['name']} — {p['address']} — {p['rating']}⭐")
+else:
+ print(f" ❌ {r['error']}")
diff --git a/test_handsfree.py b/test_handsfree.py
new file mode 100644
index 00000000..145a7c29
--- /dev/null
+++ b/test_handsfree.py
@@ -0,0 +1,44 @@
+"""Quick end-to-end test for HandsFree modules."""
+import sys
+sys.path.insert(0, "cactus/python/src")
+sys.path.insert(0, ".")
+
+from handsfree.tools import ALL_TOOLS
+from handsfree.location import detect_location_intent, inject_location_into_command
+from handsfree.executor import execute
+from main import generate_hybrid
+
+tools = [{k: v for k, v in t.items() if k != "on_device"} for t in ALL_TOOLS]
+
+print("=== Executor test ===")
+test_calls = [
+ {"name": "set_alarm", "arguments": {"hour": 7, "minute": 30, "label": "Wake up"}},
+ {"name": "play_music", "arguments": {"song": "Bohemian Rhapsody", "artist": "Queen"}},
+ {"name": "set_timer", "arguments": {"minutes": 10, "label": "Pasta"}},
+ {"name": "create_reminder", "arguments": {"title": "Call John", "time": "3:00 PM"}},
+ {"name": "send_message", "arguments": {"recipient": "Mom", "message": "On my way!"}},
+ {"name": "get_weather", "arguments": {"location": "San Francisco"}},
+ {"name": "get_directions", "arguments": {"destination": "Golden Gate Bridge", "origin": "Civic Center"}},
+ {"name": "find_nearby", "arguments": {"category": "coffee", "location": "here"}},
+ {"name": "share_location", "arguments": {"recipient": "Dad", "location": "37.7749,-122.4194"}},
+]
+for call in test_calls:
+ r = execute([call])[0]["result"]
+ print(f" {call['name']:20s}: {r.get('icon','')} status={r.get('status','?')}")
+
+print("\n=== Location detection ===")
+for cmd in ["send my location to Mom", "what time is it", "share my location with John"]:
+ detected = detect_location_intent(cmd)
+ print(f" {detected!s:5} | {cmd}")
+
+print("\n=== Hybrid routing (3 commands) ===")
+for cmd in ["Set an alarm for 7:30 AM", "Play Bohemian Rhapsody", "Set a timer for 10 minutes"]:
+ msgs = [{"role": "user", "content": cmd}]
+ result = generate_hybrid(msgs, tools)
+ calls = result.get("function_calls", [])
+ src = result.get("source", "?")
+ fn = calls[0]["name"] if calls else "NO CALL"
+ args = calls[0]["arguments"] if calls else {}
+ print(f" [{src:25s}] {cmd:35s} → {fn}({args})")
+
+print("\nAll tests passed ✅")
diff --git a/test_location_intent.py b/test_location_intent.py
new file mode 100644
index 00000000..13fdf190
--- /dev/null
+++ b/test_location_intent.py
@@ -0,0 +1,24 @@
"""Smoke-test the location intent classifiers (share vs. query)."""
import os
import sys

# Make the repo root importable regardless of where the script is run from.
# (Previously this was a hard-coded absolute path to one developer's home
# directory, which broke on every other machine.)
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from handsfree.location import is_location_share, is_location_query

# (command, expected is_location_share, expected is_location_query)
tests = [
    ('what is my current location', False, True),
    ('where am I', False, True),
    ('what is my address', False, True),
    ('send my location to Mom', True, False),
    ('share my location with John', True, False),
    ('tell Sarah where I am', True, False),
    ('get directions to Golden Gate', False, False),
    ('play Bohemian Rhapsody', False, False),
]
ok = True
for cmd, exp_share, exp_query in tests:
    share = is_location_share(cmd)
    query = is_location_query(cmd)
    match = (share == exp_share) and (query == exp_query)
    status = "OK " if match else "FAIL"
    print(f" {status} share={str(share):5} query={str(query):5} | {cmd}")
    ok = ok and match
print()
print("All passed!" if ok else "SOME FAILED")
# Nonzero exit code so CI / shell callers can detect failure.
sys.exit(0 if ok else 1)