diff --git a/.env.example b/.env.example index 3a8df4d..b6bd4ea 100644 --- a/.env.example +++ b/.env.example @@ -1 +1,9 @@ OPENROUTER_API_KEY=your_openrouter_api_key_here + +# Langfuse tracing (optional, for LLM observability) +LANGFUSE_HOST=http://localhost:3000 +LANGFUSE_PUBLIC_KEY=pk-lf-lapp-dev +LANGFUSE_SECRET_KEY=sk-lf-lapp-dev + +# OpenTelemetry tracing (optional, for distributed tracing with Jaeger) +OTEL_TRACING_ENABLED=true diff --git a/CLAUDE.md b/CLAUDE.md index 5d4fc74..54fc64c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -23,10 +23,9 @@ go test -v -run TestFunctionName ./pkg/pattern/ ## CLI Usage ```bash -go run ./cmd/lapp/ ingest <logfile> [--db <path>] [--model <model>] -go run ./cmd/lapp/ templates [--db <path>] -go run ./cmd/lapp/ analyze <logfile> [question] +go run ./cmd/lapp/ analyze <logfile> [question] [--model <model>] [--db <path>] go run ./cmd/lapp/ debug workspace <logfile> [-o <dir>] +go run ./cmd/lapp/ debug ingest <logfile> [--model <model>] [--db <path>] go run ./cmd/lapp/ debug run <workspace-dir> [question] [--model <model>] ``` @@ -79,7 +78,7 @@ Builds a workspace directory with pre-processed files, then runs an eino ADK age ## Environment Variables -- `OPENROUTER_API_KEY`: Required for `ingest`, `analyze`, and `debug run` +- `OPENROUTER_API_KEY`: Required for `analyze`, `debug ingest`, and `debug run` - `MODEL_NAME`: Override default LLM model (default: `google/gemini-3-flash-preview`) - `.env` file is auto-loaded via godotenv @@ -91,3 +90,4 @@ Builds a workspace directory with pre-processed files, then runs an eino ADK age - `nolint` directives go on the line above the target, not as end-of-line comments - Compile-time interface guards: `var _ MyInterface = (*MyImpl)(nil)` +- Always use `make build` to verify compilation, never bare `go build` (it drops a binary in the project root) diff --git a/cmd/lapp/analyze.go b/cmd/lapp/analyze.go index 5d6ef8e..7650520 100644 --- a/cmd/lapp/analyze.go +++ b/cmd/lapp/analyze.go @@ -10,6 +10,12 @@ import ( "github.com/spf13/cobra" "github.com/strrl/lapp/pkg/analyzer" "github.com/strrl/lapp/pkg/multiline" + 
"github.com/strrl/lapp/pkg/pattern" + "github.com/strrl/lapp/pkg/semantic" + "github.com/strrl/lapp/pkg/store" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) var analyzeModel string @@ -18,8 +24,9 @@ func analyzeCmd() *cobra.Command { cmd := &cobra.Command{ Use: "analyze [question]", Short: "Analyze a log file using an AI agent to find root causes", - Long: `Read a log file, parse it through the template pipeline, then use an AI agent -to autonomously explore the processed logs and provide analysis. + Long: `Read a log file, run the full ingest pipeline (Drain clustering, semantic labeling, +DuckDB storage), then use an AI agent to autonomously explore the processed logs +and provide analysis. Requires OPENROUTER_API_KEY environment variable to be set. @@ -46,6 +53,11 @@ func runAnalyze(cmd *cobra.Command, args []string) error { question = args[1] } + ctx, span := otel.Tracer("lapp/cmd").Start(cmd.Context(), "cmd.Analyze") + defer span.End() + + span.SetAttributes(attribute.String("log.file", logFile)) + // Read all lines slog.Info("Reading logs...") lines, err := readLines(logFile) @@ -56,20 +68,67 @@ func runAnalyze(cmd *cobra.Command, args []string) error { if err != nil { return errors.Errorf("multiline detector: %w", err) } - merged := multiline.MergeSlice(lines, detector) + merged := multiline.MergeSlice(ctx, lines, detector) mergedLines := make([]string, len(merged)) for i, m := range merged { mergedLines[i] = m.Content } slog.Info("Read lines", "lines", len(lines), "merged_entries", len(mergedLines)) + // Refuse to reuse an existing database to avoid silently mixing datasets + if _, err := os.Stat(dbPath); err == nil { + return errors.Errorf("database %q already exists; remove it first or choose a different --db path", dbPath) + } + + // Ingest pipeline: Drain clustering + semantic labeling + DuckDB storage + drainParser, err := pattern.NewDrainParser() + if err != nil { + return errors.Errorf("drain 
parser: %w", err) + } + + s, err := store.NewDuckDBStore(dbPath) + if err != nil { + return errors.Errorf("store: %w", err) + } + defer func() { _ = s.Close() }() + + if err := s.Init(ctx); err != nil { + return errors.Errorf("store init: %w", err) + } + + semanticIDMap, patternCount, templateCount, err := discoverAndSavePatterns(ctx, s, drainParser, mergedLines, semantic.Config{ + APIKey: apiKey, + Model: analyzeModel, + }) + if err != nil { + return err + } + + templates, err := drainParser.Templates(ctx) + if err != nil { + return errors.Errorf("drain templates: %w", err) + } + if err := storeLogsWithLabels(ctx, s, merged, templates, semanticIDMap); err != nil { + return err + } + + slog.Info("Ingestion complete", + "lines", len(mergedLines), + "templates", templateCount, + "patterns_with_2+_matches", patternCount, + ) + slog.Info("Database stored", "path", dbPath) + + // Run AI agent analysis config := analyzer.Config{ APIKey: apiKey, Model: analyzeModel, } - result, err := analyzer.Analyze(cmd.Context(), config, mergedLines, question) + result, err := analyzer.AnalyzeWithTemplates(ctx, config, mergedLines, templates, question) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return err } diff --git a/cmd/lapp/debug.go b/cmd/lapp/debug.go index 2ba74be..30abf5c 100644 --- a/cmd/lapp/debug.go +++ b/cmd/lapp/debug.go @@ -10,6 +10,9 @@ import ( "github.com/strrl/lapp/pkg/analyzer/workspace" "github.com/strrl/lapp/pkg/multiline" "github.com/strrl/lapp/pkg/pattern" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) func debugCmd() *cobra.Command { @@ -20,6 +23,7 @@ func debugCmd() *cobra.Command { cmd.AddCommand(debugWorkspaceCmd()) cmd.AddCommand(debugRunCmd()) + cmd.AddCommand(debugIngestCmd()) return cmd } @@ -41,6 +45,11 @@ files (raw.log, summary.txt, errors.txt) in a local directory for inspection.`, func runDebugWorkspace(cmd *cobra.Command, args []string) error { logFile 
:= args[0] + ctx, span := otel.Tracer("lapp/cmd").Start(cmd.Context(), "cmd.DebugWorkspace") + defer span.End() + + span.SetAttributes(attribute.String("log.file", logFile)) + slog.Info("Reading logs...") lines, err := readLines(logFile) if err != nil { @@ -50,7 +59,7 @@ func runDebugWorkspace(cmd *cobra.Command, args []string) error { if err != nil { return errors.Errorf("multiline detector: %w", err) } - merged := multiline.MergeSlice(lines, detector) + merged := multiline.MergeSlice(ctx, lines, detector) mergedLines := make([]string, len(merged)) for i, m := range merged { mergedLines[i] = m.Content @@ -75,10 +84,10 @@ func runDebugWorkspace(cmd *cobra.Command, args []string) error { } slog.Info("Parsing entries", "count", len(mergedLines)) - if err := drainParser.Feed(mergedLines); err != nil { + if err := drainParser.Feed(ctx, mergedLines); err != nil { return errors.Errorf("drain feed: %w", err) } - templates, err := drainParser.Templates() + templates, err := drainParser.Templates(ctx) if err != nil { return errors.Errorf("drain templates: %w", err) } @@ -88,6 +97,8 @@ func runDebugWorkspace(cmd *cobra.Command, args []string) error { } slog.Info("Workspace created", "dir", outDir) + + span.SetStatus(codes.Ok, "") return nil } @@ -108,6 +119,9 @@ Requires OPENROUTER_API_KEY environment variable to be set.`, } func runDebugRun(cmd *cobra.Command, args []string) error { + ctx, span := otel.Tracer("lapp/cmd").Start(cmd.Context(), "cmd.DebugRun") + defer span.End() + apiKey := os.Getenv("OPENROUTER_API_KEY") if apiKey == "" { return errors.Errorf("OPENROUTER_API_KEY environment variable is required") @@ -129,12 +143,16 @@ func runDebugRun(cmd *cobra.Command, args []string) error { Model: debugRunModel, } + span.SetAttributes(attribute.String("workspace.dir", workDir)) + slog.Info("Running agent on workspace", "dir", workDir) - result, err := analyzer.RunAgent(cmd.Context(), config, workDir, question) + result, err := analyzer.RunAgent(ctx, config, workDir, 
question) if err != nil { return err } slog.Info(result) + + span.SetStatus(codes.Ok, "") return nil } diff --git a/cmd/lapp/ingest.go b/cmd/lapp/ingest.go index 38a91da..0558de5 100644 --- a/cmd/lapp/ingest.go +++ b/cmd/lapp/ingest.go @@ -4,7 +4,6 @@ import ( "context" "log/slog" "os" - "time" "github.com/go-errors/errors" "github.com/spf13/cobra" @@ -13,29 +12,37 @@ import ( "github.com/strrl/lapp/pkg/pattern" "github.com/strrl/lapp/pkg/semantic" "github.com/strrl/lapp/pkg/store" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) -func ingestCmd() *cobra.Command { +func debugIngestCmd() *cobra.Command { var model string cmd := &cobra.Command{ Use: "ingest <logfile>", - Short: "Ingest a log file through the parser pipeline into the store", + Short: "Run the ingest pipeline only (Drain + semantic labeling + DuckDB)", Long: "Read a log file, parse each line through Drain, store results in DuckDB, and label patterns with semantic IDs via LLM.", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return runIngest(cmd, args, model) + return runDebugIngest(cmd, args, model) }, } cmd.Flags().StringVar(&model, "model", "", "LLM model to use for labeling (default: $MODEL_NAME or google/gemini-3-flash-preview)") return cmd } -func runIngest(cmd *cobra.Command, args []string, model string) error { +func runDebugIngest(cmd *cobra.Command, args []string, model string) error { logFile := args[0] - ctx, cancel := context.WithCancel(cmd.Context()) + parentCtx, cancel := context.WithCancel(cmd.Context()) defer cancel() + ctx, span := otel.Tracer("lapp/cmd").Start(parentCtx, "cmd.DebugIngest") + defer span.End() + + span.SetAttributes(attribute.String("log.file", logFile)) + apiKey := os.Getenv("OPENROUTER_API_KEY") if apiKey == "" { return errors.Errorf("OPENROUTER_API_KEY environment variable is required") @@ -50,7 +57,12 @@ func runIngest(cmd *cobra.Command, args []string, model string) error { if err != 
nil { return errors.Errorf("multiline detector: %w", err) } - merged := multiline.Merge(ch, detector) + merged := multiline.Merge(ctx, ch, detector) + + // Refuse to reuse an existing database to avoid silently mixing datasets + if _, err := os.Stat(dbPath); err == nil { + return errors.Errorf("database %q already exists; remove it first or choose a different --db path", dbPath) + } drainParser, err := pattern.NewDrainParser() if err != nil { @@ -68,7 +80,7 @@ func runIngest(cmd *cobra.Command, args []string, model string) error { } // Round 1: Collect all lines in memory (no DB writes yet) - mergedLines, err := collectLines(merged) + mergedLines, err := collectLines(ctx, merged) if err != nil { return err } @@ -88,7 +100,7 @@ func runIngest(cmd *cobra.Command, args []string, model string) error { } // Round 2: Match each line to a pattern and store with labels - templates, err := drainParser.Templates() + templates, err := drainParser.Templates(ctx) if err != nil { return errors.Errorf("drain templates: %w", err) } @@ -103,150 +115,7 @@ func runIngest(cmd *cobra.Command, args []string, model string) error { "patterns_with_2+_matches", patternCount, ) slog.Info("Database stored", "path", dbPath) - return nil -} - -func collectLines(merged <-chan multiline.MergeResult) ([]multiline.MergedLine, error) { - var lines []multiline.MergedLine - for rr := range merged { - if rr.Err != nil { - return nil, errors.Errorf("read log: %w", rr.Err) - } - lines = append(lines, *rr.Value) - } - return lines, nil -} - -func discoverAndSavePatterns( - ctx context.Context, - s *store.DuckDBStore, - dp *pattern.DrainParser, - lines []string, - labelCfg semantic.Config, -) (semanticIDMap map[string]string, patternCount, templateCount int, err error) { - if err := dp.Feed(lines); err != nil { - return nil, 0, 0, errors.Errorf("drain feed: %w", err) - } - - templates, err := dp.Templates() - if err != nil { - return nil, 0, 0, errors.Errorf("drain templates: %w", err) - } - - // Filter 
out single-match patterns (not generalized) - var filtered []pattern.DrainCluster - for _, t := range templates { - if t.Count <= 1 { - continue - } - filtered = append(filtered, t) - } - - semanticIDMap = make(map[string]string) - - if len(filtered) == 0 { - return semanticIDMap, 0, len(templates), nil - } - - // Build labeler inputs with sample lines from in-memory data - inputs := buildLabelInputs(filtered, lines) - slog.Info("Labeling patterns", "count", len(inputs)) - - labels, err := semantic.Label(ctx, labelCfg, inputs) - if err != nil { - return nil, 0, 0, errors.Errorf("label: %w", err) - } - - // Index labels by pattern UUID for lookup - labelMap := make(map[string]semantic.SemanticLabel, len(labels)) - for _, l := range labels { - labelMap[l.PatternUUIDString] = l - } - - // Build store patterns with semantic labels and populate semanticIDMap - var patterns []store.Pattern - for _, t := range filtered { - p := store.Pattern{ - PatternUUIDString: t.ID.String(), - PatternType: "drain", - RawPattern: t.Pattern, - } - if l, ok := labelMap[t.ID.String()]; ok { - p.SemanticID = l.SemanticID - p.Description = l.Description - semanticIDMap[t.ID.String()] = l.SemanticID - } - patterns = append(patterns, p) - } - - if err := s.InsertPatterns(ctx, patterns); err != nil { - return nil, 0, 0, errors.Errorf("insert patterns: %w", err) - } - - return semanticIDMap, len(patterns), len(templates), nil -} - -func storeLogsWithLabels( - ctx context.Context, - s *store.DuckDBStore, - mergedLines []multiline.MergedLine, - templates []pattern.DrainCluster, - semanticIDMap map[string]string, -) error { - var batch []store.LogEntry - for _, ml := range mergedLines { - entry := store.LogEntry{ - LineNumber: ml.StartLine, - EndLineNumber: ml.EndLine, - Timestamp: time.Now(), - Raw: ml.Content, - } - - if tpl, ok := pattern.MatchTemplate(ml.Content, templates); ok { - if sid, found := semanticIDMap[tpl.ID.String()]; found { - entry.Labels = map[string]string{ - "pattern": sid, - 
"pattern_id": tpl.ID.String(), - } - } - } - - batch = append(batch, entry) - - if len(batch) >= 500 { - if err := s.InsertLogBatch(ctx, batch); err != nil { - return errors.Errorf("insert batch: %w", err) - } - batch = batch[:0] - } - } - - if len(batch) > 0 { - if err := s.InsertLogBatch(ctx, batch); err != nil { - return errors.Errorf("insert batch: %w", err) - } - } + span.SetStatus(codes.Ok, "") return nil } - -func buildLabelInputs(templates []pattern.DrainCluster, lines []string) []semantic.PatternInput { - var inputs []semantic.PatternInput - for _, t := range templates { - var samples []string - for _, line := range lines { - if _, ok := pattern.MatchTemplate(line, []pattern.DrainCluster{t}); ok { - samples = append(samples, line) - if len(samples) >= 3 { - break - } - } - } - inputs = append(inputs, semantic.PatternInput{ - PatternUUIDString: t.ID.String(), - Pattern: t.Pattern, - Samples: samples, - }) - } - return inputs -} diff --git a/cmd/lapp/main.go b/cmd/lapp/main.go index 298f947..ab5063f 100644 --- a/cmd/lapp/main.go +++ b/cmd/lapp/main.go @@ -1,10 +1,12 @@ package main import ( + "context" "os" "github.com/joho/godotenv" "github.com/spf13/cobra" + "github.com/strrl/lapp/pkg/tracing" ) var dbPath string @@ -13,6 +15,9 @@ func main() { // Load .env file if present (does not override existing env vars) _ = godotenv.Load() + flush := tracing.InitLangfuse() + otelShutdown := tracing.InitOTel(context.Background()) + root := &cobra.Command{ Use: "lapp", Short: "Log Auto Pattern Pipeline", @@ -21,12 +26,14 @@ func main() { root.PersistentFlags().StringVar(&dbPath, "db", "lapp.duckdb", "path to DuckDB database") - root.AddCommand(ingestCmd()) - root.AddCommand(templatesCmd()) root.AddCommand(analyzeCmd()) root.AddCommand(debugCmd()) - if err := root.Execute(); err != nil { + err := root.Execute() + otelShutdown() + flush() + + if err != nil { os.Exit(1) } } diff --git a/cmd/lapp/pipeline.go b/cmd/lapp/pipeline.go new file mode 100644 index 
0000000..a16a52b --- /dev/null +++ b/cmd/lapp/pipeline.go @@ -0,0 +1,198 @@ +package main + +import ( + "context" + "log/slog" + "time" + + "github.com/go-errors/errors" + "github.com/strrl/lapp/pkg/multiline" + "github.com/strrl/lapp/pkg/pattern" + "github.com/strrl/lapp/pkg/semantic" + "github.com/strrl/lapp/pkg/store" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" +) + +func collectLines(ctx context.Context, merged <-chan multiline.MergeResult) ([]multiline.MergedLine, error) { + _, span := otel.Tracer("lapp/pipeline").Start(ctx, "pipeline.CollectLines") + defer span.End() + + var lines []multiline.MergedLine + for rr := range merged { + if rr.Err != nil { + span.RecordError(rr.Err) + span.SetStatus(codes.Error, rr.Err.Error()) + return nil, errors.Errorf("read log: %w", rr.Err) + } + lines = append(lines, *rr.Value) + } + + span.SetAttributes(attribute.Int("line.count", len(lines))) + return lines, nil +} + +func discoverAndSavePatterns( + ctx context.Context, + s *store.DuckDBStore, + dp *pattern.DrainParser, + lines []string, + labelCfg semantic.Config, +) (semanticIDMap map[string]string, patternCount, templateCount int, err error) { + ctx, span := otel.Tracer("lapp/pipeline").Start(ctx, "pipeline.DiscoverAndSavePatterns") + defer span.End() + + if err := dp.Feed(ctx, lines); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, 0, 0, errors.Errorf("drain feed: %w", err) + } + + templates, err := dp.Templates(ctx) + if err != nil { + return nil, 0, 0, errors.Errorf("drain templates: %w", err) + } + + // Filter out single-match patterns (not generalized) + var filtered []pattern.DrainCluster + for _, t := range templates { + if t.Count <= 1 { + continue + } + filtered = append(filtered, t) + } + + semanticIDMap = make(map[string]string) + + if len(filtered) == 0 { + return semanticIDMap, 0, len(templates), nil + } + + // Build labeler inputs with sample lines from 
in-memory data + inputs := buildLabelInputs(ctx, filtered, lines) + + slog.Info("Labeling patterns", "count", len(inputs)) + + labels, err := semantic.Label(ctx, labelCfg, inputs) + if err != nil { + return nil, 0, 0, errors.Errorf("label: %w", err) + } + + // Index labels by pattern UUID for lookup + labelMap := make(map[string]semantic.SemanticLabel, len(labels)) + for _, l := range labels { + labelMap[l.PatternUUIDString] = l + } + + // Build store patterns with semantic labels and populate semanticIDMap + var patterns []store.Pattern + for _, t := range filtered { + p := store.Pattern{ + PatternUUIDString: t.ID.String(), + PatternType: "drain", + RawPattern: t.Pattern, + } + if l, ok := labelMap[t.ID.String()]; ok { + p.SemanticID = l.SemanticID + p.Description = l.Description + semanticIDMap[t.ID.String()] = l.SemanticID + } + patterns = append(patterns, p) + } + + if err := s.InsertPatterns(ctx, patterns); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, 0, 0, errors.Errorf("insert patterns: %w", err) + } + + span.SetAttributes( + attribute.Int("pattern.count", len(patterns)), + attribute.Int("template.count", len(templates)), + ) + return semanticIDMap, len(patterns), len(templates), nil +} + +func storeLogsWithLabels( + ctx context.Context, + s *store.DuckDBStore, + mergedLines []multiline.MergedLine, + templates []pattern.DrainCluster, + semanticIDMap map[string]string, +) error { + ctx, span := otel.Tracer("lapp/pipeline").Start(ctx, "pipeline.StoreLogsWithLabels") + defer span.End() + + span.SetAttributes(attribute.Int("line.count", len(mergedLines))) + + var batchCount int + var batch []store.LogEntry + for _, ml := range mergedLines { + entry := store.LogEntry{ + LineNumber: ml.StartLine, + EndLineNumber: ml.EndLine, + Timestamp: time.Now(), + Raw: ml.Content, + } + + if tpl, ok := pattern.MatchTemplate(ml.Content, templates); ok { + if sid, found := semanticIDMap[tpl.ID.String()]; found { + entry.Labels = 
map[string]string{ + "pattern": sid, + "pattern_id": tpl.ID.String(), + } + } + } + + batch = append(batch, entry) + + if len(batch) >= 500 { + if err := s.InsertLogBatch(ctx, batch); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return errors.Errorf("insert batch: %w", err) + } + batch = batch[:0] + batchCount++ + } + } + + if len(batch) > 0 { + if err := s.InsertLogBatch(ctx, batch); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return errors.Errorf("insert batch: %w", err) + } + batchCount++ + } + + span.SetAttributes(attribute.Int("batch.count", batchCount)) + return nil +} + +func buildLabelInputs(ctx context.Context, templates []pattern.DrainCluster, lines []string) []semantic.PatternInput { + _, span := otel.Tracer("lapp/pipeline").Start(ctx, "pipeline.BuildLabelInputs") + defer span.End() + + span.SetAttributes(attribute.Int("template.count", len(templates))) + + var inputs []semantic.PatternInput + for _, t := range templates { + var samples []string + for _, line := range lines { + if _, ok := pattern.MatchTemplate(line, []pattern.DrainCluster{t}); ok { + samples = append(samples, line) + if len(samples) >= 3 { + break + } + } + } + inputs = append(inputs, semantic.PatternInput{ + PatternUUIDString: t.ID.String(), + Pattern: t.Pattern, + Samples: samples, + }) + } + return inputs +} diff --git a/cmd/lapp/templates.go b/cmd/lapp/templates.go deleted file mode 100644 index 99a5158..0000000 --- a/cmd/lapp/templates.go +++ /dev/null @@ -1,84 +0,0 @@ -package main - -import ( - "log/slog" - - "github.com/go-errors/errors" - "github.com/spf13/cobra" - "github.com/strrl/lapp/pkg/store" -) - -func templatesCmd() *cobra.Command { - cmd := &cobra.Command{ - Use: "templates", - Short: "List discovered templates", - RunE: runTemplates, - } - return cmd -} - -func runTemplates(cmd *cobra.Command, _ []string) error { - ctx := cmd.Context() - - s, err := store.NewDuckDBStore(dbPath) - if err != nil 
{ - return errors.Errorf("store: %w", err) - } - defer func() { _ = s.Close() }() - - if err := s.Init(ctx); err != nil { - return errors.Errorf("init store: %w", err) - } - - summaries, err := s.PatternSummaries(ctx) - if err != nil { - return errors.Errorf("query: %w", err) - } - - // Check if any summaries have semantic info - hasLabels := false - for _, ts := range summaries { - if ts.SemanticID != "" { - hasLabels = true - break - } - } - - if hasLabels { - for _, ts := range summaries { - semanticID := ts.SemanticID - if semanticID == "" { - semanticID = "-" - } - desc := ts.Description - if desc == "" { - desc = "(not labeled)" - } - pType := ts.PatternType - if pType == "" { - pType = "-" - } - slog.Info("template", - "id", ts.PatternUUIDString, - "type", pType, - "semantic_id", semanticID, - "count", ts.Count, - "description", desc, - ) - } - } else { - for _, ts := range summaries { - pType := ts.PatternType - if pType == "" { - pType = "-" - } - slog.Info("template", - "id", ts.PatternUUIDString, - "type", pType, - "count", ts.Count, - "pattern", ts.Pattern, - ) - } - } - return nil -} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..43e21a0 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,185 @@ +# Copied from https://github.com/langfuse/langfuse/blob/main/docker-compose.yml +# Only LANGFUSE_INIT_* values and TELEMETRY_ENABLED are changed for local dev. 
+# +# Usage: docker compose up -d +# UI: http://localhost:3000 (dev@lapp.local / password) +services: + langfuse-worker: + image: docker.io/langfuse/langfuse-worker:3 + restart: always + depends_on: &langfuse-depends-on + postgres: + condition: service_healthy + minio: + condition: service_healthy + redis: + condition: service_healthy + clickhouse: + condition: service_healthy + ports: + - 127.0.0.1:3030:3030 + environment: &langfuse-worker-env + NEXTAUTH_URL: ${NEXTAUTH_URL:-http://localhost:3000} + DATABASE_URL: ${DATABASE_URL:-postgresql://postgres:postgres@postgres:5432/postgres} + SALT: ${SALT:-mysalt} + ENCRYPTION_KEY: ${ENCRYPTION_KEY:-0000000000000000000000000000000000000000000000000000000000000000} + TELEMETRY_ENABLED: ${TELEMETRY_ENABLED:-false} + LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES: ${LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES:-false} + CLICKHOUSE_MIGRATION_URL: ${CLICKHOUSE_MIGRATION_URL:-clickhouse://clickhouse:9000} + CLICKHOUSE_URL: ${CLICKHOUSE_URL:-http://clickhouse:8123} + CLICKHOUSE_USER: ${CLICKHOUSE_USER:-clickhouse} + CLICKHOUSE_PASSWORD: ${CLICKHOUSE_PASSWORD:-clickhouse} + CLICKHOUSE_CLUSTER_ENABLED: ${CLICKHOUSE_CLUSTER_ENABLED:-false} + LANGFUSE_USE_AZURE_BLOB: ${LANGFUSE_USE_AZURE_BLOB:-false} + LANGFUSE_S3_EVENT_UPLOAD_BUCKET: ${LANGFUSE_S3_EVENT_UPLOAD_BUCKET:-langfuse} + LANGFUSE_S3_EVENT_UPLOAD_REGION: ${LANGFUSE_S3_EVENT_UPLOAD_REGION:-auto} + LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID:-minio} + LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY:-miniosecret} + LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT: ${LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT:-http://minio:9000} + LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE: ${LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE:-true} + LANGFUSE_S3_EVENT_UPLOAD_PREFIX: ${LANGFUSE_S3_EVENT_UPLOAD_PREFIX:-events/} + LANGFUSE_S3_MEDIA_UPLOAD_BUCKET: ${LANGFUSE_S3_MEDIA_UPLOAD_BUCKET:-langfuse} + LANGFUSE_S3_MEDIA_UPLOAD_REGION: 
${LANGFUSE_S3_MEDIA_UPLOAD_REGION:-auto} + LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID:-minio} + LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY:-miniosecret} + LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT: ${LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT:-http://localhost:9090} + LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE: ${LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE:-true} + LANGFUSE_S3_MEDIA_UPLOAD_PREFIX: ${LANGFUSE_S3_MEDIA_UPLOAD_PREFIX:-media/} + LANGFUSE_S3_BATCH_EXPORT_ENABLED: ${LANGFUSE_S3_BATCH_EXPORT_ENABLED:-false} + LANGFUSE_S3_BATCH_EXPORT_BUCKET: ${LANGFUSE_S3_BATCH_EXPORT_BUCKET:-langfuse} + LANGFUSE_S3_BATCH_EXPORT_PREFIX: ${LANGFUSE_S3_BATCH_EXPORT_PREFIX:-exports/} + LANGFUSE_S3_BATCH_EXPORT_REGION: ${LANGFUSE_S3_BATCH_EXPORT_REGION:-auto} + LANGFUSE_S3_BATCH_EXPORT_ENDPOINT: ${LANGFUSE_S3_BATCH_EXPORT_ENDPOINT:-http://minio:9000} + LANGFUSE_S3_BATCH_EXPORT_EXTERNAL_ENDPOINT: ${LANGFUSE_S3_BATCH_EXPORT_EXTERNAL_ENDPOINT:-http://localhost:9090} + LANGFUSE_S3_BATCH_EXPORT_ACCESS_KEY_ID: ${LANGFUSE_S3_BATCH_EXPORT_ACCESS_KEY_ID:-minio} + LANGFUSE_S3_BATCH_EXPORT_SECRET_ACCESS_KEY: ${LANGFUSE_S3_BATCH_EXPORT_SECRET_ACCESS_KEY:-miniosecret} + LANGFUSE_S3_BATCH_EXPORT_FORCE_PATH_STYLE: ${LANGFUSE_S3_BATCH_EXPORT_FORCE_PATH_STYLE:-true} + LANGFUSE_INGESTION_QUEUE_DELAY_MS: ${LANGFUSE_INGESTION_QUEUE_DELAY_MS:-} + LANGFUSE_INGESTION_CLICKHOUSE_WRITE_INTERVAL_MS: ${LANGFUSE_INGESTION_CLICKHOUSE_WRITE_INTERVAL_MS:-} + REDIS_HOST: ${REDIS_HOST:-redis} + REDIS_PORT: ${REDIS_PORT:-6379} + REDIS_AUTH: ${REDIS_AUTH:-myredissecret} + REDIS_TLS_ENABLED: ${REDIS_TLS_ENABLED:-false} + REDIS_TLS_CA: ${REDIS_TLS_CA:-/certs/ca.crt} + REDIS_TLS_CERT: ${REDIS_TLS_CERT:-/certs/redis.crt} + REDIS_TLS_KEY: ${REDIS_TLS_KEY:-/certs/redis.key} + EMAIL_FROM_ADDRESS: ${EMAIL_FROM_ADDRESS:-} + SMTP_CONNECTION_URL: ${SMTP_CONNECTION_URL:-} + + langfuse-web: + image: docker.io/langfuse/langfuse:3 + restart: always + depends_on: 
*langfuse-depends-on + ports: + - 3000:3000 + environment: + <<: *langfuse-worker-env + NEXTAUTH_SECRET: ${NEXTAUTH_SECRET:-mysecret} + LANGFUSE_INIT_ORG_ID: ${LANGFUSE_INIT_ORG_ID:-lapp-org} + LANGFUSE_INIT_ORG_NAME: ${LANGFUSE_INIT_ORG_NAME:-lapp} + LANGFUSE_INIT_PROJECT_ID: ${LANGFUSE_INIT_PROJECT_ID:-lapp-project} + LANGFUSE_INIT_PROJECT_NAME: ${LANGFUSE_INIT_PROJECT_NAME:-lapp} + LANGFUSE_INIT_PROJECT_PUBLIC_KEY: ${LANGFUSE_INIT_PROJECT_PUBLIC_KEY:-pk-lf-lapp-dev} + LANGFUSE_INIT_PROJECT_SECRET_KEY: ${LANGFUSE_INIT_PROJECT_SECRET_KEY:-sk-lf-lapp-dev} + LANGFUSE_INIT_USER_EMAIL: ${LANGFUSE_INIT_USER_EMAIL:-dev@lapp.local} + LANGFUSE_INIT_USER_NAME: ${LANGFUSE_INIT_USER_NAME:-dev} + LANGFUSE_INIT_USER_PASSWORD: ${LANGFUSE_INIT_USER_PASSWORD:-password} + + clickhouse: + image: docker.io/clickhouse/clickhouse-server + restart: always + user: "101:101" + environment: + CLICKHOUSE_DB: default + CLICKHOUSE_USER: ${CLICKHOUSE_USER:-clickhouse} + CLICKHOUSE_PASSWORD: ${CLICKHOUSE_PASSWORD:-clickhouse} + volumes: + - langfuse_clickhouse_data:/var/lib/clickhouse + - langfuse_clickhouse_logs:/var/log/clickhouse-server + ports: + - 127.0.0.1:8123:8123 + - 127.0.0.1:9000:9000 + healthcheck: + test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1 + interval: 5s + timeout: 5s + retries: 10 + start_period: 1s + + minio: + image: cgr.dev/chainguard/minio + restart: always + entrypoint: sh + # create the 'langfuse' bucket before starting the service + command: -c 'mkdir -p /data/langfuse && minio server --address ":9000" --console-address ":9001" /data' + environment: + MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minio} + MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-miniosecret} + ports: + - 9090:9000 + - 127.0.0.1:9091:9001 + volumes: + - langfuse_minio_data:/data + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 1s + timeout: 5s + retries: 5 + start_period: 1s + + redis: + image: docker.io/redis:7 + restart: always + command: > + --requirepass 
${REDIS_AUTH:-myredissecret} + --maxmemory-policy noeviction + ports: + - 127.0.0.1:6379:6379 + volumes: + - langfuse_redis_data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 3s + timeout: 10s + retries: 10 + + postgres: + image: docker.io/postgres:${POSTGRES_VERSION:-17} + restart: always + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 3s + timeout: 3s + retries: 10 + environment: + POSTGRES_USER: ${POSTGRES_USER:-postgres} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres} + POSTGRES_DB: ${POSTGRES_DB:-postgres} + TZ: UTC + PGTZ: UTC + ports: + - 127.0.0.1:5432:5432 + volumes: + - langfuse_postgres_data:/var/lib/postgresql/data + + jaeger: + image: jaegertracing/jaeger:latest + restart: always + ports: + - "4317:4317" + - "4318:4318" + - "16686:16686" + environment: + COLLECTOR_OTLP_ENABLED: "true" + +volumes: + langfuse_postgres_data: + driver: local + langfuse_clickhouse_data: + driver: local + langfuse_clickhouse_logs: + driver: local + langfuse_minio_data: + driver: local + langfuse_redis_data: + driver: local diff --git a/go.mod b/go.mod index c47dcf1..2d09de8 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.7 require ( github.com/cloudwego/eino v0.7.35 github.com/cloudwego/eino-ext/adk/backend/local v0.1.1 + github.com/cloudwego/eino-ext/callbacks/langfuse v0.0.0-20260227151421-e109b4ff9563 github.com/cloudwego/eino-ext/components/model/openrouter v0.1.2 github.com/duckdb/duckdb-go/v2 v2.5.5 github.com/go-errors/errors v1.5.1 @@ -12,6 +13,10 @@ require ( github.com/jaeyo/go-drain3 v0.1.2 github.com/joho/godotenv v1.5.1 github.com/spf13/cobra v1.10.2 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 + go.opentelemetry.io/otel v1.40.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 + go.opentelemetry.io/otel/sdk v1.40.0 ) require ( @@ -21,7 +26,11 @@ require ( github.com/bytedance/gopkg v0.1.3 // indirect github.com/bytedance/sonic v1.14.1 // indirect 
github.com/bytedance/sonic/loader v0.3.0 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.6 // indirect + github.com/cloudwego/eino-ext/libs/acl/langfuse v0.0.0-20251124083837-ce2e7e196f9f // indirect github.com/cloudwego/eino-ext/libs/acl/openai v0.1.13 // indirect github.com/duckdb/duckdb-go-bindings v0.3.3 // indirect github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.3.3 // indirect @@ -32,10 +41,14 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/eino-contrib/jsonschema v1.0.3 // indirect github.com/evanphx/json-patch v0.5.2 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.5.0 // indirect github.com/goccy/go-json v0.10.5 // indirect github.com/google/flatbuffers v25.12.19+incompatible // indirect github.com/goph/emperror v0.17.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -60,13 +73,24 @@ require ( github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/yargevad/filepathx v1.0.0 // indirect github.com/zeebo/xxh3 v1.1.0 // indirect - golang.org/x/arch v0.11.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect + go.opentelemetry.io/otel/metric v1.40.0 // indirect + go.opentelemetry.io/otel/trace v1.40.0 // indirect + go.opentelemetry.io/proto/otlp v1.9.0 // indirect + golang.org/x/arch v0.12.0 // indirect golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect golang.org/x/mod v0.32.0 // indirect + golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 
// indirect golang.org/x/sys v0.40.0 // indirect golang.org/x/telemetry v0.0.0-20260116145544-c6413dc483f5 // indirect + golang.org/x/text v0.33.0 // indirect golang.org/x/tools v0.41.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect + google.golang.org/grpc v1.78.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 77e1926..49b6f0f 100644 --- a/go.sum +++ b/go.sum @@ -21,15 +21,25 @@ github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7 github.com/bytedance/sonic v1.14.1/go.mod h1:gi6uhQLMbTdeP0muCnrjHLeCUPyb70ujhnNlhOylAFc= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/certifi/gocertifi v0.0.0-20190105021004-abcd57078448/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/cloudwego/eino v0.7.35 h1:UpZwHQNh8qgGRxKk2Zzdef68Holk8wPVpwjRDbiLOY8= github.com/cloudwego/eino v0.7.35/go.mod 
h1:nA8Vacmuqv3pqKBQbTWENBLQ8MmGmPt/WqiyLeB8ohQ= github.com/cloudwego/eino-ext/adk/backend/local v0.1.1 h1:cfagscQRuNH52Rptc2JPIb+4n2Agb6bxUt4RY2xJMrY= github.com/cloudwego/eino-ext/adk/backend/local v0.1.1/go.mod h1:LfFk+VqZk0JOxIyl5RaerYqlFVLyXOCoSaqqak8hNls= +github.com/cloudwego/eino-ext/callbacks/langfuse v0.0.0-20260227151421-e109b4ff9563 h1:DKTXDDw8ErC4RorZLfB2ZdHChjDKWIqOEO7VRSjjfbg= +github.com/cloudwego/eino-ext/callbacks/langfuse v0.0.0-20260227151421-e109b4ff9563/go.mod h1:lrNKITZR4QUaYl9Rdz9W6qGOolHRy6mPamEZYA8uz7s= github.com/cloudwego/eino-ext/components/model/openrouter v0.1.2 h1:zDFteouktUsGk4I/7m1b7yT4e9qawy45gWtLoyeHwxI= github.com/cloudwego/eino-ext/components/model/openrouter v0.1.2/go.mod h1:v+Xo3hEEBZ9BVGpyjmLrqiP1BCshF3wQrEsxw1SLtF4= +github.com/cloudwego/eino-ext/libs/acl/langfuse v0.0.0-20251124083837-ce2e7e196f9f h1:i8DgDklrNznB1/e/HUpM8i/mfOL/E+Hsug5166f/mec= +github.com/cloudwego/eino-ext/libs/acl/langfuse v0.0.0-20251124083837-ce2e7e196f9f/go.mod h1:P3zzJTRexY0QKaE9Vn2CmOnCorIMgNzNtler8mw9IQM= github.com/cloudwego/eino-ext/libs/acl/openai v0.1.13 h1:z0bI5TH3nE+uDQiRhxBQMvk2HswlDUM3xP38+VSgpSQ= github.com/cloudwego/eino-ext/libs/acl/openai v0.1.13/go.mod h1:1xMQZ8eE11pkEoTAEy8UlaAY817qGVMvjpDPGSIO3Ns= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= @@ -57,24 +67,35 @@ github.com/eino-contrib/jsonschema v1.0.3 h1:2Kfsm1xlMV0ssY2nuxshS4AwbLFuqmPmzIj github.com/eino-contrib/jsonschema v1.0.3/go.mod h1:cpnX4SyKjWjGC7iN2EbhxaTdLqGjCi0e9DxpLYxddD4= github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k= github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= 
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPEgAXnvj1Ro= github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers 
v25.12.19+incompatible h1:haMV2JRRJCe1998HeW/p0X9UaMTK6SDo0ffLn2+DbLs= github.com/google/flatbuffers v25.12.19+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -82,6 +103,8 @@ github.com/goph/emperror v0.17.2 h1:yLapQcmEsO0ipe9p5TaN22djm3OFV/TfM/fcYP0/J18= github.com/goph/emperror v0.17.2/go.mod h1:+ZbQ+fUNO/6FNiUo0ujtMjhgad9Xa6fQL9KhH4LNHic= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -106,8 +129,8 @@ github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzh github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= 
-github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= -github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -146,6 +169,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rollbar/rollbar-go v1.0.2/go.mod h1:AcFs5f0I+c71bpHlXNNDbOWJiKwjFDtISeXco0L5PKQ= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -194,19 +219,43 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= 
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 h1:7iP2uCb7sGddAr30RRS6xjKy7AZ2JtTOPA3oolgVSw8= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0/go.mod h1:c7hN3ddxs/z6q9xwvfLPk+UHlWRQyaeR1LdgfL/66l0= +go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= +go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 h1:wVZXIWjQSeSmMoxF74LzAnpVQOAFDo3pPji9Y4SOFKc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0/go.mod h1:khvBS2IggMFNwZK/6lEeHg/W57h/IX6J4URh57fuI40= +go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= +go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= +go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= +go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= +go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= +go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 
h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= -golang.org/x/arch v0.11.0 h1:KXV8WWKCXm6tRpLirl2szsO5j/oOODwZf4hATmGVNs4= -golang.org/x/arch v0.11.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/arch v0.12.0 h1:UsYJhbzPYGsT0HbEdmYcqtCv8UNGvnaL561NnIUvaKg= +golang.org/x/arch v0.12.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= -golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= +golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= golang.org/x/exp v0.0.0-20260112195511-716be5621a96 h1:Z/6YuSHTLOHfNFdb8zVZomZr7cqNgTJvA8+Qz75D8gU= golang.org/x/exp v0.0.0-20260112195511-716be5621a96/go.mod h1:nzimsREAkjBCIEFtHiYkrJyT+2uy9YZJB7H1k68CXZU= golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c= golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= +golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= @@ -220,15 +269,25 @@ golang.org/x/telemetry v0.0.0-20260116145544-c6413dc483f5/go.mod h1:b7fPSJ0pKZ3c golang.org/x/term v0.39.0 
h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY= golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc= golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= +google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 
v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/integration_test/integration_test.go b/integration_test/integration_test.go index b1012d0..17baea0 100644 --- a/integration_test/integration_test.go +++ b/integration_test/integration_test.go @@ -68,10 +68,10 @@ func TestAllDatasets_CSVPath(t *testing.T) { } // Feed all lines and get templates - if err := dp.Feed(lines); err != nil { + if err := dp.Feed(ctx, lines); err != nil { t.Fatalf("feed: %v", err) } - templates, err := dp.Templates() + templates, err := dp.Templates(ctx) if err != nil { t.Fatalf("templates: %v", err) } @@ -185,10 +185,10 @@ func TestAllDatasets_IngestorPath(t *testing.T) { for i, ll := range collected { lines[i] = ll.content } - if err := dp.Feed(lines); err != nil { + if err := dp.Feed(ctx, lines); err != nil { t.Fatalf("feed: %v", err) } - templates, err := dp.Templates() + templates, err := dp.Templates(ctx) if err != nil { t.Fatalf("templates: %v", err) } diff --git a/pkg/analyzer/analyzer.go b/pkg/analyzer/analyzer.go index eb034c8..2fe97b9 100644 --- a/pkg/analyzer/analyzer.go +++ b/pkg/analyzer/analyzer.go @@ -20,11 +20,18 @@ import ( "github.com/strrl/lapp/pkg/analyzer/workspace" llmconfig "github.com/strrl/lapp/pkg/config" "github.com/strrl/lapp/pkg/pattern" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) func buildSystemPrompt(workDir 
string) string { return fmt.Sprintf(`You are a log analysis expert helping developers troubleshoot issues. +IMPORTANT: All file operations (read_file, grep, ls, glob, execute) MUST use paths under %s. +Do NOT access files outside this workspace directory. + Your workspace contains pre-processed log data at %s: - %s/raw.log — the original log file - %s/summary.txt — log templates discovered by automated parsing, with occurrence counts and samples @@ -41,7 +48,7 @@ Provide: 4. Suggested next steps for debugging Be concise and actionable. Focus on what matters.`, - workDir, workDir, workDir, workDir, workDir, workDir, workDir) + workDir, workDir, workDir, workDir, workDir, workDir, workDir, workDir) } // Config holds configuration for the analyzer. @@ -52,21 +59,16 @@ type Config struct { // Analyze runs the full agentic log analysis pipeline: // build a workspace, then run the AI agent on it. +// It creates its own DrainParser internally, so template IDs will not match +// any external parser. Use AnalyzeWithTemplates when templates are already available. 
func Analyze(ctx context.Context, config Config, lines []string, question string) (string, error) { - config.Model = llmconfig.ResolveModel(config.Model) - - // Create temp workspace - tmpDir, err := os.MkdirTemp("", "lapp-analyze-*") - if err != nil { - return "", errors.Errorf("create temp dir: %w", err) - } - defer func() { _ = os.RemoveAll(tmpDir) }() + ctx, span := otel.Tracer("lapp/analyzer").Start(ctx, "analyzer.Analyze") + defer span.End() - // Resolve to absolute path for the local backend - absDir, err := filepath.Abs(tmpDir) - if err != nil { - return "", errors.Errorf("resolve temp dir: %w", err) - } + span.SetAttributes( + attribute.Int("line.count", len(lines)), + attribute.String("question", question), + ) // Parse lines with Drain drainParser, err := pattern.NewDrainParser() @@ -75,14 +77,48 @@ func Analyze(ctx context.Context, config Config, lines []string, question string } slog.Info("Parsing lines", "count", len(lines)) - if err := drainParser.Feed(lines); err != nil { + if err := drainParser.Feed(ctx, lines); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return "", errors.Errorf("drain feed: %w", err) } - templates, err := drainParser.Templates() + templates, err := drainParser.Templates(ctx) if err != nil { return "", errors.Errorf("drain templates: %w", err) } + return AnalyzeWithTemplates(ctx, config, lines, templates, question) +} + +// AnalyzeWithTemplates runs the agentic log analysis pipeline using +// pre-computed Drain templates. This ensures template IDs in the workspace +// match those stored in the database by the ingest pipeline. 
+func AnalyzeWithTemplates(ctx context.Context, config Config, lines []string, templates []pattern.DrainCluster, question string) (string, error) { + ctx, span := otel.Tracer("lapp/analyzer").Start(ctx, "analyzer.AnalyzeWithTemplates") + defer span.End() + + span.SetAttributes( + attribute.Int("line.count", len(lines)), + attribute.Int("template.count", len(templates)), + attribute.String("question", question), + ) + + config.Model = llmconfig.ResolveModel(config.Model) + span.SetAttributes(attribute.String("model", config.Model)) + + // Create temp workspace + tmpDir, err := os.MkdirTemp("", "lapp-analyze-*") + if err != nil { + return "", errors.Errorf("create temp dir: %w", err) + } + defer func() { _ = os.RemoveAll(tmpDir) }() + + // Resolve to absolute path for the local backend + absDir, err := filepath.Abs(tmpDir) + if err != nil { + return "", errors.Errorf("resolve temp dir: %w", err) + } + if err := workspace.NewBuilder(absDir, lines, templates).BuildAll(); err != nil { return "", errors.Errorf("build workspace: %w", err) } @@ -92,7 +128,15 @@ func Analyze(ctx context.Context, config Config, lines []string, question string // RunAgent runs the AI agent on an existing workspace directory. 
func RunAgent(ctx context.Context, config Config, workDir, question string) (string, error) { + ctx, span := otel.Tracer("lapp/analyzer").Start(ctx, "analyzer.RunAgent") + defer span.End() + + span.SetAttributes( + attribute.String("workspace.dir", workDir), + ) + config.Model = llmconfig.ResolveModel(config.Model) + span.SetAttributes(attribute.String("model", config.Model)) absDir, err := filepath.Abs(workDir) if err != nil { @@ -107,10 +151,13 @@ func RunAgent(ctx context.Context, config Config, workDir, question string) (str } // Create OpenRouter chat model with fixup transport to patch eino tool message bug + // Stack: otelhttp (tracing) → fixupRoundTripper (eino bug workaround) → http.DefaultTransport chatModel, err := openrouter.NewChatModel(ctx, &openrouter.Config{ - APIKey: config.APIKey, - Model: config.Model, - HTTPClient: &http.Client{Transport: &fixupRoundTripper{base: http.DefaultTransport}}, + APIKey: config.APIKey, + Model: config.Model, + HTTPClient: &http.Client{ + Transport: otelhttp.NewTransport(&fixupRoundTripper{base: http.DefaultTransport}), + }, }) if err != nil { return "", errors.Errorf("create chat model: %w", err) @@ -239,6 +286,9 @@ func fixToolMessages(body []byte) []byte { // preflightCheck does a quick API call to verify the key works. 
func preflightCheck(ctx context.Context, config Config) error { + _, span := otel.Tracer("lapp/analyzer").Start(ctx, "analyzer.PreflightCheck") + defer span.End() + apiURL := "https://openrouter.ai/api/v1/models" req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, http.NoBody) if err != nil { diff --git a/pkg/analyzer/workspace/builder_test.go b/pkg/analyzer/workspace/builder_test.go index 250e346..050af7c 100644 --- a/pkg/analyzer/workspace/builder_test.go +++ b/pkg/analyzer/workspace/builder_test.go @@ -1,6 +1,7 @@ package workspace_test import ( + "context" "os" "path/filepath" "strings" @@ -24,10 +25,10 @@ func TestBuildAll(t *testing.T) { if err != nil { t.Fatalf("NewDrainParser: %v", err) } - if err := drainParser.Feed(lines); err != nil { + if err := drainParser.Feed(context.Background(), lines); err != nil { t.Fatalf("Feed: %v", err) } - templates, err := drainParser.Templates() + templates, err := drainParser.Templates(context.Background()) if err != nil { t.Fatalf("Templates: %v", err) } @@ -84,10 +85,10 @@ func TestBuildAll_NoErrors(t *testing.T) { if err != nil { t.Fatalf("NewDrainParser: %v", err) } - if err := drainParser.Feed(lines); err != nil { + if err := drainParser.Feed(context.Background(), lines); err != nil { t.Fatalf("Feed: %v", err) } - templates, err := drainParser.Templates() + templates, err := drainParser.Templates(context.Background()) if err != nil { t.Fatalf("Templates: %v", err) } diff --git a/pkg/logsource/ingestor.go b/pkg/logsource/ingestor.go index f933b7a..f513959 100644 --- a/pkg/logsource/ingestor.go +++ b/pkg/logsource/ingestor.go @@ -6,6 +6,8 @@ import ( "os" "github.com/go-errors/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" ) // LogLine represents a single raw log line read from input. @@ -36,14 +38,20 @@ type fileIngestor struct { // Ingest reads log lines from the file. // Cancel the context to stop reading early; the goroutine will exit promptly. 
func (f *fileIngestor) Ingest(ctx context.Context) (<-chan Result[*LogLine], error) { + _, span := otel.Tracer("lapp/logsource").Start(ctx, "logsource.Ingest") + file, err := os.Open(f.path) if err != nil { + span.End() return nil, errors.Errorf("open log file: %w", err) } + span.SetAttributes(attribute.String("file.path", f.path)) + ch := make(chan Result[*LogLine], 100) go func() { defer close(ch) + defer span.End() var fileErr error defer func() { diff --git a/pkg/multiline/integration_test.go b/pkg/multiline/integration_test.go index 2201cef..ce9f2ff 100644 --- a/pkg/multiline/integration_test.go +++ b/pkg/multiline/integration_test.go @@ -1,6 +1,7 @@ package multiline import ( + "context" "os" "path/filepath" "strings" @@ -24,7 +25,7 @@ func TestIntegrationJavaStackTrace(t *testing.T) { t.Fatal(err) } - merged := MergeSlice(lines, d) + merged := MergeSlice(context.Background(), lines, d) // Expected: 4 entries // 1. "2024-03-28 13:45:30 INFO Application started successfully" @@ -59,7 +60,7 @@ func TestIntegrationPythonTraceback(t *testing.T) { t.Fatal(err) } - merged := MergeSlice(lines, d) + merged := MergeSlice(context.Background(), lines, d) // Expected: 4 entries // 1. "2024-03-28 14:00:01 INFO Starting batch processing" @@ -89,7 +90,7 @@ func TestIntegrationGoPanic(t *testing.T) { t.Fatal(err) } - merged := MergeSlice(lines, d) + merged := MergeSlice(context.Background(), lines, d) // Expected: 3 entries // 1. "2024-03-28 15:00:01 INFO Server listening..." 
@@ -119,7 +120,7 @@ func TestIntegrationSingleLine(t *testing.T) { t.Fatal(err) } - merged := MergeSlice(lines, d) + merged := MergeSlice(context.Background(), lines, d) if len(merged) != len(lines) { t.Fatalf("expected %d entries for single-line logs, got %d", len(lines), len(merged)) @@ -142,7 +143,7 @@ func TestIntegrationMixedFormats(t *testing.T) { t.Fatal(err) } - merged := MergeSlice(lines, d) + merged := MergeSlice(context.Background(), lines, d) // All lines in mixed_formats.log start with timestamps, // so each should be its own entry diff --git a/pkg/multiline/merger.go b/pkg/multiline/merger.go index ab51465..ebf9d0a 100644 --- a/pkg/multiline/merger.go +++ b/pkg/multiline/merger.go @@ -7,9 +7,12 @@ package multiline import ( + "context" "strings" "github.com/strrl/lapp/pkg/logsource" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" ) // MergedLine represents one logical log entry that may span multiple @@ -31,10 +34,13 @@ type MergeResult struct { // It propagates read errors from the ingestor Result channel. // If no line is ever detected as a new entry (i.e. no recognizable timestamp), // each physical line is emitted as its own entry to avoid behavioral regression. -func Merge(in <-chan logsource.Result[*logsource.LogLine], detector *Detector) <-chan MergeResult { +func Merge(ctx context.Context, in <-chan logsource.Result[*logsource.LogLine], detector *Detector) <-chan MergeResult { + _, span := otel.Tracer("lapp/multiline").Start(ctx, "multiline.Merge") + out := make(chan MergeResult, 100) go func() { defer close(out) + defer span.End() var buf []string startLine := 0 @@ -105,7 +111,12 @@ func Merge(in <-chan logsource.Result[*logsource.LogLine], detector *Detector) < // MergeSlice merges a slice of log lines into logical entries. // This is useful for non-streaming paths (analyze, debug commands). // If no line is ever detected as a new entry, each line passes through individually. 
-func MergeSlice(lines []string, detector *Detector) []MergedLine { +func MergeSlice(ctx context.Context, lines []string, detector *Detector) []MergedLine { + _, span := otel.Tracer("lapp/multiline").Start(ctx, "multiline.MergeSlice") + defer span.End() + + span.SetAttributes(attribute.Int("input.lines", len(lines))) + if len(lines) == 0 { return nil } @@ -165,5 +176,7 @@ func MergeSlice(lines []string, detector *Detector) []MergedLine { } flush() + + span.SetAttributes(attribute.Int("merged.entries", len(result))) return result } diff --git a/pkg/multiline/merger_test.go b/pkg/multiline/merger_test.go index 2e6794e..21b7722 100644 --- a/pkg/multiline/merger_test.go +++ b/pkg/multiline/merger_test.go @@ -1,6 +1,7 @@ package multiline import ( + "context" "testing" "github.com/strrl/lapp/pkg/logsource" @@ -24,7 +25,7 @@ func TestMergeSliceJavaStackTrace(t *testing.T) { t.Fatal(err) } - merged := MergeSlice(lines, d) + merged := MergeSlice(context.Background(), lines, d) if len(merged) != 3 { t.Fatalf("expected 3 merged entries, got %d", len(merged)) } @@ -54,7 +55,7 @@ func TestMergeSliceSingleLine(t *testing.T) { t.Fatal(err) } - merged := MergeSlice(lines, d) + merged := MergeSlice(context.Background(), lines, d) if len(merged) != 3 { t.Fatalf("expected 3 entries for single-line logs, got %d", len(merged)) } @@ -72,7 +73,7 @@ func TestMergeSliceEmpty(t *testing.T) { t.Fatal(err) } - merged := MergeSlice(nil, d) + merged := MergeSlice(context.Background(), nil, d) if merged != nil { t.Errorf("expected nil for empty input, got %v", merged) } @@ -97,7 +98,7 @@ func TestMergeChannel(t *testing.T) { } close(ch) - merged := Merge(ch, d) + merged := Merge(context.Background(), ch, d) var results []MergedLine for m := range merged { if m.Err != nil { @@ -129,7 +130,7 @@ func TestMergeSliceMaxEntryBytes(t *testing.T) { "another continuation line", } - merged := MergeSlice(lines, d) + merged := MergeSlice(context.Background(), lines, d) if len(merged) < 2 { 
t.Fatalf("expected at least 2 entries due to max entry bytes, got %d", len(merged)) } @@ -148,7 +149,7 @@ func TestMergeSliceNoTimestamp(t *testing.T) { t.Fatal(err) } - merged := MergeSlice(lines, d) + merged := MergeSlice(context.Background(), lines, d) if len(merged) != 3 { t.Fatalf("expected 3 entries for non-timestamp logs, got %d", len(merged)) } @@ -172,7 +173,7 @@ func TestMergeChannelNoTimestamp(t *testing.T) { close(ch) var results []MergedLine - for m := range Merge(ch, d) { + for m := range Merge(context.Background(), ch, d) { if m.Err != nil { t.Fatalf("unexpected error: %v", m.Err) } @@ -198,7 +199,7 @@ func TestMergeSliceOverflowLineRange(t *testing.T) { "2024-03-28 13:45:31 INFO next entry", } - merged := MergeSlice(lines, d) + merged := MergeSlice(context.Background(), lines, d) // Verify no overlapping line ranges for i := 1; i < len(merged); i++ { diff --git a/pkg/pattern/drain.go b/pkg/pattern/drain.go index fb100eb..9539fc8 100644 --- a/pkg/pattern/drain.go +++ b/pkg/pattern/drain.go @@ -1,11 +1,15 @@ package pattern import ( + "context" "sync" "github.com/go-errors/errors" "github.com/google/uuid" "github.com/jaeyo/go-drain3/pkg/drain3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) // DrainParser uses the Drain algorithm to discover log templates online. @@ -33,13 +37,20 @@ func NewDrainParser() (*DrainParser, error) { } // Feed processes a batch of log lines through the Drain algorithm. 
-func (p *DrainParser) Feed(contents []string) error { +func (p *DrainParser) Feed(ctx context.Context, contents []string) error { + _, span := otel.Tracer("lapp/pattern").Start(ctx, "pattern.Feed") + defer span.End() + + span.SetAttributes(attribute.Int("input.lines", len(contents))) + p.mu.Lock() defer p.mu.Unlock() for _, content := range contents { cluster, _, err := p.drain.AddLogMessage(content) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return errors.Errorf("drain add: %w", err) } if cluster == nil { @@ -53,7 +64,10 @@ func (p *DrainParser) Feed(contents []string) error { } // Templates returns all Drain clusters discovered so far with their counts. -func (p *DrainParser) Templates() ([]DrainCluster, error) { +func (p *DrainParser) Templates(ctx context.Context) ([]DrainCluster, error) { + _, span := otel.Tracer("lapp/pattern").Start(ctx, "pattern.Templates") + defer span.End() + p.mu.Lock() defer p.mu.Unlock() @@ -70,5 +84,7 @@ func (p *DrainParser) Templates() ([]DrainCluster, error) { Count: int(c.Size), }) } + + span.SetAttributes(attribute.Int("cluster.count", len(templates))) return templates, nil } diff --git a/pkg/pattern/drain_test.go b/pkg/pattern/drain_test.go index 19d6f9f..efd5f7c 100644 --- a/pkg/pattern/drain_test.go +++ b/pkg/pattern/drain_test.go @@ -1,6 +1,7 @@ package pattern import ( + "context" "testing" "github.com/google/uuid" @@ -20,11 +21,11 @@ func TestDrainParser_FeedAndTemplates(t *testing.T) { "081109 204005 36 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0002/job.jar. 
blk_5260569883199042858", } - if err := p.Feed(lines); err != nil { + if err := p.Feed(context.Background(), lines); err != nil { t.Fatalf("Feed: %v", err) } - templates, err := p.Templates() + templates, err := p.Templates(context.Background()) if err != nil { t.Fatalf("Templates: %v", err) } @@ -63,7 +64,7 @@ func TestDrainParser_EmptyInput(t *testing.T) { t.Fatalf("NewDrainParser: %v", err) } - templates, err := p.Templates() + templates, err := p.Templates(context.Background()) if err != nil { t.Fatalf("Templates: %v", err) } diff --git a/pkg/semantic/labeler.go b/pkg/semantic/labeler.go index de9f7e0..ed61de0 100644 --- a/pkg/semantic/labeler.go +++ b/pkg/semantic/labeler.go @@ -11,6 +11,10 @@ import ( "github.com/cloudwego/eino/schema" "github.com/go-errors/errors" llmconfig "github.com/strrl/lapp/pkg/config" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) // Config holds configuration for the labeler. @@ -42,20 +46,30 @@ type SemanticLabel struct { // Label sends all patterns to the LLM in a single call and returns semantic labels. 
func Label(ctx context.Context, config Config, patterns []PatternInput) ([]SemanticLabel, error) { + ctx, span := otel.Tracer("lapp/semantic").Start(ctx, "semantic.Label") + defer span.End() + + span.SetAttributes(attribute.Int("pattern.count", len(patterns))) + if len(patterns) == 0 { return nil, nil } config.Model = llmconfig.ResolveModel(config.Model) + span.SetAttributes(attribute.String("model", config.Model)) prompt := buildPrompt(patterns) resp, err := callLLM(ctx, config, prompt) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, errors.Errorf("call LLM: %w", err) } labels, err := parseResponse(resp) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, errors.Errorf("parse LLM response: %w", err) } @@ -86,10 +100,25 @@ Patterns: } func callLLM(ctx context.Context, config Config, prompt string) (string, error) { + _, span := otel.Tracer("lapp/semantic").Start(ctx, "semantic.CallLLM") + defer span.End() + + span.SetAttributes( + attribute.String("model", config.Model), + attribute.Int("prompt.length", len(prompt)), + ) + + httpClient := config.HTTPClient + if httpClient == nil { + httpClient = &http.Client{ + Transport: otelhttp.NewTransport(http.DefaultTransport), + } + } + chatModel, err := openrouter.NewChatModel(ctx, &openrouter.Config{ APIKey: config.APIKey, Model: config.Model, - HTTPClient: config.HTTPClient, + HTTPClient: httpClient, ResponseFormat: &openrouter.ChatCompletionResponseFormat{ Type: openrouter.ChatCompletionResponseFormatTypeJSONObject, }, @@ -102,8 +131,12 @@ func callLLM(ctx context.Context, config Config, prompt string) (string, error) {Role: schema.User, Content: prompt}, }) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return "", errors.Errorf("generate: %w", err) } + + span.SetAttributes(attribute.Int("response.length", len(resp.Content))) return resp.Content, nil } diff --git a/pkg/store/duckdb.go 
b/pkg/store/duckdb.go index 1180112..3d680e4 100644 --- a/pkg/store/duckdb.go +++ b/pkg/store/duckdb.go @@ -11,6 +11,8 @@ import ( // DuckDB driver for database/sql. _ "github.com/duckdb/duckdb-go/v2" "github.com/go-errors/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" ) var _ Store = (*DuckDBStore)(nil) @@ -32,6 +34,9 @@ func NewDuckDBStore(dsn string) (*DuckDBStore, error) { // Init creates the log_entries and patterns tables if they do not exist. func (s *DuckDBStore) Init(ctx context.Context) error { + _, span := otel.Tracer("lapp/store").Start(ctx, "store.Init") + defer span.End() + if _, err := s.db.ExecContext(ctx, `CREATE SEQUENCE IF NOT EXISTS log_entries_id_seq START 1`); err != nil { return errors.Errorf("create sequence: %w", err) } @@ -78,6 +83,9 @@ func marshalLabels(labels map[string]string) (string, error) { // InsertLog stores a single log entry. func (s *DuckDBStore) InsertLog(ctx context.Context, entry LogEntry) error { + _, span := otel.Tracer("lapp/store").Start(ctx, "store.InsertLog") + defer span.End() + labelsJSON, err := marshalLabels(entry.Labels) if err != nil { return err @@ -99,6 +107,11 @@ func (s *DuckDBStore) InsertLog(ctx context.Context, entry LogEntry) error { // InsertLogBatch stores multiple log entries in a single transaction. func (s *DuckDBStore) InsertLogBatch(ctx context.Context, entries []LogEntry) error { + _, span := otel.Tracer("lapp/store").Start(ctx, "store.InsertLogBatch") + defer span.End() + + span.SetAttributes(attribute.Int("batch.size", len(entries))) + tx, err := s.db.BeginTx(ctx, nil) if err != nil { return errors.Errorf("begin tx: %w", err) @@ -133,6 +146,11 @@ func (s *DuckDBStore) InsertLogBatch(ctx context.Context, entries []LogEntry) er // QueryByPattern returns log entries matching the given pattern semantic ID via labels. 
func (s *DuckDBStore) QueryByPattern(ctx context.Context, pattern string) ([]LogEntry, error) { + _, span := otel.Tracer("lapp/store").Start(ctx, "store.QueryByPattern") + defer span.End() + + span.SetAttributes(attribute.String("pattern", pattern)) + rows, err := s.db.QueryContext(ctx, `SELECT id, line_number, end_line_number, timestamp, raw, CAST(labels AS VARCHAR) FROM log_entries WHERE json_extract_string(labels, '$.pattern') = ?`, @@ -147,6 +165,9 @@ func (s *DuckDBStore) QueryByPattern(ctx context.Context, pattern string) ([]Log // QueryLogs returns log entries matching the given options. func (s *DuckDBStore) QueryLogs(ctx context.Context, opts QueryOpts) ([]LogEntry, error) { + _, span := otel.Tracer("lapp/store").Start(ctx, "store.QueryLogs") + defer span.End() + var conditions []string var args []any @@ -185,6 +206,9 @@ func (s *DuckDBStore) QueryLogs(ctx context.Context, opts QueryOpts) ([]LogEntry // PatternSummaries returns all patterns with their occurrence counts, // joined with pattern metadata from the patterns table. func (s *DuckDBStore) PatternSummaries(ctx context.Context) ([]PatternSummary, error) { + _, span := otel.Tracer("lapp/store").Start(ctx, "store.PatternSummaries") + defer span.End() + rows, err := s.db.QueryContext(ctx, `SELECT p.pattern_id, COALESCE(p.raw_pattern, ''), COUNT(*) as cnt, COALESCE(p.pattern_type, ''), COALESCE(p.semantic_id, ''), COALESCE(p.description, '') @@ -214,6 +238,11 @@ func (s *DuckDBStore) PatternSummaries(ctx context.Context) ([]PatternSummary, e // InsertPatterns upserts patterns into the patterns table. 
func (s *DuckDBStore) InsertPatterns(ctx context.Context, patterns []Pattern) error { + _, span := otel.Tracer("lapp/store").Start(ctx, "store.InsertPatterns") + defer span.End() + + span.SetAttributes(attribute.Int("pattern.count", len(patterns))) + tx, err := s.db.BeginTx(ctx, nil) if err != nil { return errors.Errorf("begin tx: %w", err) @@ -249,6 +278,9 @@ func (s *DuckDBStore) InsertPatterns(ctx context.Context, patterns []Pattern) er // Patterns returns all patterns from the patterns table. func (s *DuckDBStore) Patterns(ctx context.Context) ([]Pattern, error) { + _, span := otel.Tracer("lapp/store").Start(ctx, "store.Patterns") + defer span.End() + rows, err := s.db.QueryContext(ctx, `SELECT pattern_id, pattern_type, raw_pattern, COALESCE(semantic_id, ''), COALESCE(description, '') @@ -276,6 +308,9 @@ func (s *DuckDBStore) Patterns(ctx context.Context) ([]Pattern, error) { // PatternCounts returns the number of log entries per pattern semantic ID. func (s *DuckDBStore) PatternCounts(ctx context.Context) (map[string]int, error) { + _, span := otel.Tracer("lapp/store").Start(ctx, "store.PatternCounts") + defer span.End() + rows, err := s.db.QueryContext(ctx, `SELECT json_extract_string(labels, '$.pattern'), COUNT(*) FROM log_entries diff --git a/pkg/tracing/langfuse.go b/pkg/tracing/langfuse.go new file mode 100644 index 0000000..cdb242a --- /dev/null +++ b/pkg/tracing/langfuse.go @@ -0,0 +1,33 @@ +package tracing + +import ( + "log/slog" + "os" + + "github.com/cloudwego/eino-ext/callbacks/langfuse" + "github.com/cloudwego/eino/callbacks" +) + +// InitLangfuse registers a global Langfuse callback handler if +// LANGFUSE_HOST, LANGFUSE_PUBLIC_KEY, and LANGFUSE_SECRET_KEY are set. +// Returns a flush function that must be called before process exit. 
+func InitLangfuse() (flush func()) { + host := os.Getenv("LANGFUSE_HOST") + publicKey := os.Getenv("LANGFUSE_PUBLIC_KEY") + secretKey := os.Getenv("LANGFUSE_SECRET_KEY") + + if host == "" || publicKey == "" || secretKey == "" { + return func() {} + } + + handler, flusher := langfuse.NewLangfuseHandler(&langfuse.Config{ + Host: host, + PublicKey: publicKey, + SecretKey: secretKey, + }) + + callbacks.AppendGlobalHandlers(handler) + slog.Info("langfuse tracing enabled", "host", host) + + return flusher +} diff --git a/pkg/tracing/otel.go b/pkg/tracing/otel.go new file mode 100644 index 0000000..337b506 --- /dev/null +++ b/pkg/tracing/otel.go @@ -0,0 +1,69 @@ +package tracing + +import ( + "context" + "log/slog" + "os" + "strings" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" +) + +// InitOTel sets up the OpenTelemetry TracerProvider with an OTLP HTTP exporter +// when OTEL_TRACING_ENABLED is set to "true" or "1". +// Returns a shutdown function that flushes pending spans. 
+func InitOTel(ctx context.Context) (shutdown func()) { + enabled := os.Getenv("OTEL_TRACING_ENABLED") + if !strings.EqualFold(enabled, "true") && enabled != "1" { + return func() {} + } + + endpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + if endpoint == "" { + endpoint = "http://localhost:4318" + } + + serviceName := os.Getenv("OTEL_SERVICE_NAME") + if serviceName == "" { + serviceName = "lapp" + } + + exporter, err := otlptracehttp.New(ctx, + otlptracehttp.WithEndpointURL(endpoint), + ) + if err != nil { + slog.Warn("failed to create OTLP exporter, tracing disabled", "error", err) + return func() {} + } + + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName(serviceName), + ), + ) + if err != nil { + slog.Warn("failed to create OTel resource", "error", err) + res = resource.Default() + } + + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(res), + ) + + otel.SetTracerProvider(tp) + slog.Info("OpenTelemetry tracing enabled", "endpoint", endpoint, "service", serviceName) + + return func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := tp.Shutdown(shutdownCtx); err != nil { + slog.Warn("failed to shutdown TracerProvider", "error", err) + } + } +}