Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ Notes:
- `pipeline.greeting` plays when a session starts. `pipeline.greeting_outgoing` is used for outbound SIP calls when present.
- `pipeline.debug = true` emits timing events over the DataChannel.
- `stt.provider = "openai"` uses Whisper-style final transcription instead of streaming partials.
- `test.turn_endpoint = true` enables a dev-only `POST /test-turn` text regression harness. Keep this disabled on public deployments.
- `llm.provider = "ollama"` uses a local Ollama instance instead of OpenAI. Make sure Ollama is running and the specified model is pulled (e.g., `ollama pull llama3.2`).
- `stt.provider = "vibevoice"` and `tts.provider = "vibevoice"` use local VibeVoice models. Start the Python servers first (see [Local VibeVoice Setup](#local-vibevoice-setup)).
- `rag.provider` enables built-in RAG. When set, the server embeds each user utterance and retrieves the top-k most relevant chunks from your vector store before calling the LLM — all in a single LLM pass with no tool-call overhead.
Expand Down
5 changes: 4 additions & 1 deletion config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ greeting = "" # Spoken when a session connects
greeting_outgoing = "" # Spoken for outbound SIP calls; falls back to greeting
debug = false # Emit timing events over the DataChannel

[test]
turn_endpoint = false # Dev-only: enable POST /test-turn text regression harness. Do not expose publicly.

# Provider selection

[stt]
Expand Down Expand Up @@ -73,4 +76,4 @@ embedding_model = "text-embedding-3-small" # optional, this is the default
[supabase]
url = "https://xxx.supabase.co"
api_key = "your-service-role-key"
function = "match_documents" # optional, this is the default
function = "match_documents" # optional, this is the default
5 changes: 5 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Config struct {
Server ServerConfig `toml:"server"`
Plugins PluginsConfig `toml:"plugins"`
Pipeline PipelineConfig `toml:"pipeline"`
Test TestConfig `toml:"test"`
STT STTConfig `toml:"stt"`
LLM LLMConfig `toml:"llm"`
TTS TTSConfig `toml:"tts"`
Expand All @@ -39,6 +40,10 @@ type PipelineConfig struct {
Debug bool `toml:"debug"` // Emit per-turn timing events over the DataChannel
}

type TestConfig struct {
TurnEndpoint bool `toml:"turn_endpoint"` // Enable dev-only POST /test-turn text harness. Disabled by default.
}

type ServerConfig struct {
Port string `toml:"port"`
PublicIP string `toml:"public_ip"` // Public IP for ICE candidates when behind NAT (e.g., EC2). Leave empty for local/direct connections.
Expand Down
244 changes: 244 additions & 0 deletions internal/testturn/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package testturn

import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"strings"
"time"

"github.com/streamcoreai/server/internal/config"
"github.com/streamcoreai/server/internal/llm"
"github.com/streamcoreai/server/internal/plugin"
"github.com/streamcoreai/server/internal/rag"
)

const maxRequestBytes = 1 << 20
const unsupportedVisionTool = "vision.analyze"

type Message struct {
Role string `json:"role"`
Text string `json:"text"`
At string `json:"at,omitempty"`
}

type TurnRequest struct {
Text string `json:"text,omitempty"`
CustomerText string `json:"customerText,omitempty"`
Messages []Message `json:"messages,omitempty"`
}

type Event struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
Stage string `json:"stage,omitempty"`
Ms int64 `json:"ms,omitempty"`
}

type TurnResponse struct {
Spoken string `json:"spoken"`
Events []Event `json:"events,omitempty"`
LatencyMs int64 `json:"latencyMs"`
}

type turnRunner func(context.Context, TurnRequest) (TurnResponse, error)

// NewHandler returns the disabled-by-default text turn harness used by local
// regression tools. It bypasses WebRTC, STT, and TTS, but reuses the configured
// LLM, plugins, skills, and optional RAG context.
func NewHandler(cfg *config.Config, pluginMgr *plugin.Manager, ragClient rag.Client) http.HandlerFunc {
return newHTTPHandler(newAgent(cfg, pluginMgr, ragClient).run)
}

func newHTTPHandler(run turnRunner) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
writeJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method not allowed"})
return
}

var req TurnRequest
decoder := json.NewDecoder(http.MaxBytesReader(w, r.Body, maxRequestBytes))
if err := decoder.Decode(&req); err != nil {
var maxBytesErr *http.MaxBytesError
if errors.As(err, &maxBytesErr) {
writeJSON(w, http.StatusRequestEntityTooLarge, map[string]string{"error": "request body too large"})
return
}
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON request"})
return
Comment on lines +63 to +72
}

if req.latestCustomerText() == "" {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "text or customerText is required"})
return
}

resp, err := run(r.Context(), req)
if err != nil {
log.Printf("[test-turn] error: %v", err)
writeJSON(w, http.StatusBadGateway, map[string]string{"error": "test turn failed"})
return
}

writeJSON(w, http.StatusOK, resp)
}
}

type agent struct {
cfg *config.Config
pluginMgr *plugin.Manager
ragClient rag.Client
}

func newAgent(cfg *config.Config, pluginMgr *plugin.Manager, ragClient rag.Client) *agent {
return &agent{cfg: cfg, pluginMgr: pluginMgr, ragClient: ragClient}
}

func (a *agent) run(ctx context.Context, req TurnRequest) (TurnResponse, error) {
started := time.Now()

client, err := llm.NewClient(a.cfg)
if err != nil {
return TurnResponse{}, err
}
a.configureClient(client)

input := req.prompt()
if a.ragClient != nil {
chunks, err := a.ragClient.Search(ctx, req.latestCustomerText(), 0)
if err != nil {
log.Printf("[test-turn] RAG search error: %v", err)
} else if len(chunks) > 0 {
input = fmt.Sprintf("[Context:\n%s]\n\n%s", strings.Join(chunks, "\n---\n"), input)
}
}

events := make([]Event, 0, 8)
spoken, err := client.Chat(ctx, input, func(chunk string) {
events = append(events, Event{Type: "response", Text: chunk})
}, nil)
if err != nil {
return TurnResponse{}, err
}
spoken = strings.TrimSpace(spoken)
if spoken == "" {
return TurnResponse{}, fmt.Errorf("LLM returned an empty response")
}

latency := time.Since(started).Milliseconds()
events = append(events, Event{Type: "timing", Stage: "llm_complete", Ms: latency})
return TurnResponse{
Spoken: spoken,
Events: events,
LatencyMs: latency,
}, nil
}

func (a *agent) configureClient(client llm.Client) {
if a.pluginMgr == nil {
return
}

tools := a.pluginMgr.Tools()
if len(tools) > 0 {
defs := make([]llm.ToolDefinition, 0, len(tools))
for _, tool := range tools {
if tool.Name() == unsupportedVisionTool {
log.Printf("[test-turn] skipping unsupported pipeline-dependent tool: %s", tool.Name())
continue
}
defs = append(defs, llm.ToolDefinition{
Name: tool.Name(),
Description: tool.Description(),
Parameters: tool.Parameters(),
})
}
if len(defs) > 0 {
client.SetTools(defs)
client.SetToolHandler(func(callCtx context.Context, call llm.ToolCall) (string, error) {
tool, ok := a.pluginMgr.GetTool(call.Name)
if !ok {
return "", fmt.Errorf("unknown tool: %s", call.Name)
}
return tool.Execute(call.Arguments)
})
}
}

if skillsPrompt := a.pluginMgr.SkillsPrompt(); skillsPrompt != "" {
client.AppendSystemPrompt(skillsPrompt)
}
}

func (req TurnRequest) latestCustomerText() string {
if text := strings.TrimSpace(req.CustomerText); text != "" {
return text
}
if text := strings.TrimSpace(req.Text); text != "" {
return text
}
for i := len(req.Messages) - 1; i >= 0; i-- {
if strings.EqualFold(req.Messages[i].Role, "customer") || strings.EqualFold(req.Messages[i].Role, "user") {
return strings.TrimSpace(req.Messages[i].Text)
}
}
return ""
}

func (req TurnRequest) prompt() string {
messages := req.normalizedMessages()
if len(messages) == 0 {
return req.latestCustomerText()
}

var b strings.Builder
b.WriteString("Conversation transcript:\n")
for _, msg := range messages {
switch strings.ToLower(strings.TrimSpace(msg.Role)) {
case "assistant":
b.WriteString("Assistant: ")
default:
b.WriteString("User: ")
}
b.WriteString(strings.TrimSpace(msg.Text))
b.WriteString("\n")
}
b.WriteString("\nRespond to the latest user turn. Keep the reply concise and natural for voice.")
return b.String()
}

func (req TurnRequest) normalizedMessages() []Message {
messages := make([]Message, 0, len(req.Messages)+1)
for _, msg := range req.Messages {
text := strings.TrimSpace(msg.Text)
if text == "" {
continue
}
role := strings.TrimSpace(msg.Role)
if role == "" {
role = "user"
}
messages = append(messages, Message{Role: role, Text: text, At: msg.At})
}

latest := req.latestCustomerText()
if latest == "" {
return messages
}
if len(messages) == 0 || strings.TrimSpace(messages[len(messages)-1].Text) != latest {
messages = append(messages, Message{Role: "user", Text: latest})
}
return messages
}

func writeJSON(w http.ResponseWriter, status int, body interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if err := json.NewEncoder(w).Encode(body); err != nil {
log.Printf("[test-turn] write response error: %v", err)
}
}
Loading