diff --git a/cmd/tapes/serve/ingest/ingest.go b/cmd/tapes/serve/ingest/ingest.go new file mode 100644 index 0000000..2b817b2 --- /dev/null +++ b/cmd/tapes/serve/ingest/ingest.go @@ -0,0 +1,311 @@ +// Package ingestcmder provides the ingest server cobra command. +package ingestcmder + +import ( + "context" + "errors" + "fmt" + "log/slog" + "strings" + + "github.com/spf13/cobra" + + "github.com/papercomputeco/tapes/ingest" + "github.com/papercomputeco/tapes/pkg/config" + embeddingutils "github.com/papercomputeco/tapes/pkg/embeddings/utils" + "github.com/papercomputeco/tapes/pkg/git" + "github.com/papercomputeco/tapes/pkg/logger" + "github.com/papercomputeco/tapes/pkg/publisher" + kafkapublisher "github.com/papercomputeco/tapes/pkg/publisher/kafka" + "github.com/papercomputeco/tapes/pkg/storage" + "github.com/papercomputeco/tapes/pkg/storage/inmemory" + "github.com/papercomputeco/tapes/pkg/storage/postgres" + "github.com/papercomputeco/tapes/pkg/storage/sqlite" + "github.com/papercomputeco/tapes/pkg/telemetry" + vectorutils "github.com/papercomputeco/tapes/pkg/vector/utils" +) + +type ingestCommander struct { + flags config.FlagSet + + listen string + debug bool + sqlitePath string + postgresDSN string + project string + + vectorStoreProvider string + vectorStoreTarget string + + embeddingProvider string + embeddingTarget string + embeddingModel string + embeddingDimensions uint + + kafkaBrokers string + kafkaTopic string + kafkaClientID string + + logger *slog.Logger +} + +// ingestFlags defines the flags for the standalone ingest subcommand. +// Uses FlagIngestListenStandalone (--listen/-l) instead of the parent's +// --ingest-listen/-i, and omits proxy/api-specific flags. +var ingestFlags = config.FlagSet{ + config.FlagIngestListenStandalone: {Name: "listen", Shorthand: "l", ViperKey: "ingest.listen", Description: "Address for ingest server to listen on"}, + config.FlagSQLite: {Name: "sqlite", Shorthand: "s", ViperKey: "storage.sqlite_path", Description: "Path to SQLite database"}, + config.FlagPostgres: {Name: "postgres", ViperKey: "storage.postgres_dsn", Description: "PostgreSQL connection string (e.g., postgres://user:pass@host:5432/db)"}, + config.FlagProject: {Name: "project", ViperKey: "proxy.project", Description: "Project name to tag sessions (default: auto-detect from git)"}, + config.FlagVectorStoreProv: {Name: "vector-store-provider", ViperKey: "vector_store.provider", Description: "Vector store provider type (e.g., chroma, sqlite, qdrant)"}, + config.FlagVectorStoreTgt: {Name: "vector-store-target", ViperKey: "vector_store.target", Description: "Vector store target: filepath for sqlite or URL for remote service"}, + config.FlagEmbeddingProv: {Name: "embedding-provider", ViperKey: "embedding.provider", Description: "Embedding provider type (e.g., ollama)"}, + config.FlagEmbeddingTgt: {Name: "embedding-target", ViperKey: "embedding.target", Description: "Embedding provider URL"}, + config.FlagEmbeddingModel: {Name: "embedding-model", ViperKey: "embedding.model", Description: "Embedding model name (e.g., nomic-embed-text)"}, + config.FlagEmbeddingDims: {Name: "embedding-dimensions", ViperKey: "embedding.dimensions", Description: "Embedding dimensionality"}, + config.FlagKafkaBrokers: {Name: "kafka-brokers", ViperKey: "publisher.kafka.brokers", Description: "Comma separated list of broker ip:port pairs"}, + config.FlagKafkaClientID: {Name: "kafka-client-id", ViperKey: "publisher.kafka.client_id", Description: "Optional Kafka client.id"}, + config.FlagKafkaTopic: {Name: "kafka-topic", ViperKey: "publisher.kafka.topic", Description: "Name of topic to publish session events (e.g. tapes.nodes.v1)"}, +} + +const ingestLongDesc string = `Run the ingest server (sidecar mode). + +The ingest server accepts completed LLM conversation turns via HTTP and stores +them in the Merkle DAG. Use this when an external gateway (e.g., Envoy AI Gateway) +handles upstream LLM traffic and tapes only needs to store, embed, and publish data. + +Endpoints: + POST /v1/ingest Accept a single conversation turn + POST /v1/ingest/batch Accept multiple conversation turns + +Optionally configure vector storage and embeddings for "tapes search" functionality.` + +const ingestShortDesc string = "Run the Tapes ingest server (sidecar mode)" + +// NewIngestCmd creates the cobra command for the standalone ingest server. +func NewIngestCmd() *cobra.Command { + cmder := &ingestCommander{ + flags: ingestFlags, + } + + cmd := &cobra.Command{ + Use: "ingest", + Short: ingestShortDesc, + Long: ingestLongDesc, + PreRunE: func(cmd *cobra.Command, _ []string) error { + configDir, _ := cmd.Flags().GetString("config-dir") + v, err := config.InitViper(configDir) + if err != nil { + return fmt.Errorf("loading config: %w", err) + } + + config.BindRegisteredFlags(v, cmd, cmder.flags, []string{ + config.FlagIngestListenStandalone, + config.FlagSQLite, + config.FlagPostgres, + config.FlagProject, + config.FlagVectorStoreProv, + config.FlagVectorStoreTgt, + config.FlagEmbeddingProv, + config.FlagEmbeddingTgt, + config.FlagEmbeddingModel, + config.FlagEmbeddingDims, + config.FlagKafkaBrokers, + config.FlagKafkaClientID, + config.FlagKafkaTopic, + }) + + cmder.listen = v.GetString("ingest.listen") + cmder.sqlitePath = v.GetString("storage.sqlite_path") + cmder.postgresDSN = v.GetString("storage.postgres_dsn") + cmder.project = v.GetString("proxy.project") + cmder.vectorStoreProvider = v.GetString("vector_store.provider") + cmder.vectorStoreTarget = v.GetString("vector_store.target") + cmder.embeddingProvider = v.GetString("embedding.provider") + cmder.embeddingTarget = v.GetString("embedding.target") + cmder.embeddingModel = v.GetString("embedding.model") + cmder.embeddingDimensions = v.GetUint("embedding.dimensions") + cmder.kafkaBrokers = v.GetString("publisher.kafka.brokers") + cmder.kafkaClientID = v.GetString("publisher.kafka.client_id") + cmder.kafkaTopic = v.GetString("publisher.kafka.topic") + + if cmder.project == "" { + cmder.project = git.RepoName(cmd.Context()) + } + + return nil + }, + RunE: func(cmd *cobra.Command, _ []string) error { + var err error + cmder.debug, err = cmd.Flags().GetBool("debug") + if err != nil { + return fmt.Errorf("could not get debug flag: %w", err) + } + + telemetry.FromContext(cmd.Context()).CaptureServerStarted("ingest") + return cmder.run() + }, + } + + config.AddStringFlag(cmd, cmder.flags, config.FlagIngestListenStandalone, &cmder.listen) + config.AddStringFlag(cmd, cmder.flags, config.FlagSQLite, &cmder.sqlitePath) + config.AddStringFlag(cmd, cmder.flags, config.FlagPostgres, &cmder.postgresDSN) + config.AddStringFlag(cmd, cmder.flags, config.FlagProject, &cmder.project) + config.AddStringFlag(cmd, cmder.flags, config.FlagVectorStoreProv, &cmder.vectorStoreProvider) + config.AddStringFlag(cmd, cmder.flags, config.FlagVectorStoreTgt, &cmder.vectorStoreTarget) + config.AddStringFlag(cmd, cmder.flags, config.FlagEmbeddingProv, &cmder.embeddingProvider) + config.AddStringFlag(cmd, cmder.flags, config.FlagEmbeddingTgt, &cmder.embeddingTarget) + config.AddStringFlag(cmd, cmder.flags, config.FlagEmbeddingModel, &cmder.embeddingModel) + config.AddUintFlag(cmd, cmder.flags, config.FlagEmbeddingDims, &cmder.embeddingDimensions) + config.AddStringFlag(cmd, cmder.flags, config.FlagKafkaBrokers, &cmder.kafkaBrokers) + config.AddStringFlag(cmd, cmder.flags, config.FlagKafkaClientID, &cmder.kafkaClientID) + config.AddStringFlag(cmd, cmder.flags, config.FlagKafkaTopic, &cmder.kafkaTopic) + + return cmd +} + +func (c *ingestCommander) run() error { + c.logger = logger.New(logger.WithDebug(c.debug), logger.WithPretty(true)) + + if err := c.validatePublisherConfig(); err != nil { + return err + } + + pub, err := c.newPublisher() + if err != nil { + return fmt.Errorf("creating publisher: %w", err) + } + defer func() { + if pub != nil { + _ = pub.Close() + } + }() + + driver, err := c.newStorageDriver() + if err != nil { + return err + } + defer driver.Close() + + if err := driver.Migrate(context.Background()); err != nil { + return fmt.Errorf("running migrations: %w", err) + } + + cfg := ingest.Config{ + ListenAddr: c.listen, + Publisher: pub, + Project: c.project, + } + + if c.vectorStoreTarget != "" { + cfg.Embedder, err = embeddingutils.NewEmbedder(&embeddingutils.NewEmbedderOpts{ + ProviderType: c.embeddingProvider, + TargetURL: c.embeddingTarget, + Model: c.embeddingModel, + }) + if err != nil { + return fmt.Errorf("creating embedder: %w", err) + } + defer cfg.Embedder.Close() + + cfg.VectorDriver, err = vectorutils.NewVectorDriver(&vectorutils.NewVectorDriverOpts{ + ProviderType: c.vectorStoreProvider, + Target: c.vectorStoreTarget, + Logger: c.logger, + Dimensions: c.embeddingDimensions, + }) + if err != nil { + return fmt.Errorf("creating vector driver: %w", err) + } + defer cfg.VectorDriver.Close() + + c.logger.Info("vector storage enabled", + "vector_store_provider", c.vectorStoreProvider, + "vector_store_target", c.vectorStoreTarget, + "embedding_provider", c.embeddingProvider, + "embedding_target", c.embeddingTarget, + "embedding_model", c.embeddingModel, + ) + } + + s, err := ingest.New(cfg, driver, c.logger) + if err != nil { + return fmt.Errorf("creating ingest server: %w", err) + } + defer s.Close() + + c.logger.Info("starting ingest server", + "listen", c.listen, + ) + + return s.Run() +} + +func (c *ingestCommander) validatePublisherConfig() error { + kafkaBrokers := splitKafkaBrokers(c.kafkaBrokers) + kafkaTopic := strings.TrimSpace(c.kafkaTopic) + + if len(kafkaBrokers) == 0 && kafkaTopic == "" { + return nil + } + + if len(kafkaBrokers) == 0 { + return errors.New("kafka brokers are required when kafka topic is set") + } + + if kafkaTopic == "" { + return errors.New("kafka topic is required when kafka brokers are set") + } + + return nil +} + +func splitKafkaBrokers(raw string) []string { + parts := strings.Split(raw, ",") + brokers := make([]string, 0, len(parts)) + for _, part := range parts { + broker := strings.TrimSpace(part) + if broker != "" { + brokers = append(brokers, broker) + } + } + + return brokers +} + +func (c *ingestCommander) newPublisher() (publisher.Publisher, error) { + kafkaBrokers := splitKafkaBrokers(c.kafkaBrokers) + kafkaTopic := strings.TrimSpace(c.kafkaTopic) + if len(kafkaBrokers) == 0 && kafkaTopic == "" { + return nil, nil + } + + return kafkapublisher.NewPublisher(kafkapublisher.Config{ + Brokers: kafkaBrokers, + Topic: kafkaTopic, + ClientID: strings.TrimSpace(c.kafkaClientID), + }) +} + +func (c *ingestCommander) newStorageDriver() (storage.Driver, error) { + if c.postgresDSN != "" { + driver, err := postgres.NewDriver(context.Background(), c.postgresDSN) + if err != nil { + return nil, fmt.Errorf("failed to create PostgreSQL storer: %w", err) + } + c.logger.Info("using PostgreSQL storage") + return driver, nil + } + + if c.sqlitePath != "" { + driver, err := sqlite.NewDriver(context.Background(), c.sqlitePath) + if err != nil { + return nil, fmt.Errorf("failed to create SQLite storer: %w", err) + } + c.logger.Info("using SQLite storage", "path", c.sqlitePath) + return driver, nil + } + + c.logger.Info("using in-memory storage") + return inmemory.NewDriver(), nil +} diff --git a/cmd/tapes/serve/serve.go b/cmd/tapes/serve/serve.go index 1cdb1e4..d47facf 100644 --- a/cmd/tapes/serve/serve.go +++ b/cmd/tapes/serve/serve.go @@ -15,7 +15,9 @@ import ( "github.com/papercomputeco/tapes/api" apicmder "github.com/papercomputeco/tapes/cmd/tapes/serve/api" + ingestcmder "github.com/papercomputeco/tapes/cmd/tapes/serve/ingest" proxycmder "github.com/papercomputeco/tapes/cmd/tapes/serve/proxy" + "github.com/papercomputeco/tapes/ingest" "github.com/papercomputeco/tapes/pkg/config" "github.com/papercomputeco/tapes/pkg/dotdir" embeddingutils "github.com/papercomputeco/tapes/pkg/embeddings/utils" @@ -34,13 +36,14 @@ import ( type ServeCommander struct { flags config.FlagSet - proxyListen string - apiListen string - upstream string - debug bool - sqlitePath string - postgresDSN string - project string + proxyListen string + apiListen string + ingestListen string + upstream string + debug bool + sqlitePath string + postgresDSN string + project string providerType string @@ -59,6 +62,7 @@ type ServeCommander struct { var ServeFlags = config.FlagSet{ config.FlagProxyListen: {Name: "proxy-listen", Shorthand: "p", ViperKey: "proxy.listen", Description: "Address for proxy to listen on"}, config.FlagAPIListen: {Name: "api-listen", Shorthand: "a", ViperKey: "api.listen", Description: "Address for API server to listen on"}, + config.FlagIngestListen: {Name: "ingest-listen", Shorthand: "i", ViperKey: "ingest.listen", Description: "Address for ingest server to listen on (sidecar mode)"}, config.FlagUpstream: {Name: "upstream", Shorthand: "u", ViperKey: "proxy.upstream", Description: "Upstream LLM provider URL"}, config.FlagProvider: {Name: "provider", ViperKey: "proxy.provider", Description: "LLM provider type (anthropic, openai, ollama)"}, config.FlagSQLite: {Name: "sqlite", Shorthand: "s", ViperKey: "storage.sqlite_path", Description: "Path to SQLite database"}, @@ -75,9 +79,14 @@ var ServeFlags = config.FlagSet{ const serveLongDesc string = `Run Tapes services. Use subcommands to run individual services or all services together: - tapes serve Run both proxy and API server together - tapes serve api Run just the API server - tapes serve proxy Run just the proxy server + tapes serve Run proxy and API server together + tapes serve api Run just the API server + tapes serve proxy Run just the proxy server + tapes serve ingest Run just the ingest server (sidecar mode) + +Pass --ingest-listen/-i to also start the ingest server alongside the proxy and API. +The ingest server accepts completed LLM conversation turns from an external gateway +(e.g., Envoy AI Gateway) for storage in the Merkle DAG. Optionally configure vector storage and embeddings of text content for "tapes search" agentic functionality.` @@ -103,6 +112,7 @@ func NewServeCmd() *cobra.Command { config.BindRegisteredFlags(v, cmd, cmder.flags, []string{ config.FlagProxyListen, config.FlagAPIListen, + config.FlagIngestListen, config.FlagUpstream, config.FlagProvider, config.FlagSQLite, @@ -144,6 +154,7 @@ func NewServeCmd() *cobra.Command { cmder.postgresDSN = v.GetString("storage.postgres_dsn") cmder.proxyListen = v.GetString("proxy.listen") cmder.apiListen = v.GetString("api.listen") + cmder.ingestListen = v.GetString("ingest.listen") cmder.upstream = v.GetString("proxy.upstream") cmder.providerType = v.GetString("proxy.provider") cmder.sqlitePath = v.GetString("storage.sqlite_path") @@ -174,6 +185,7 @@ func NewServeCmd() *cobra.Command { config.AddStringFlag(cmd, cmder.flags, config.FlagProxyListen, &cmder.proxyListen) config.AddStringFlag(cmd, cmder.flags, config.FlagAPIListen, &cmder.apiListen) + config.AddStringFlag(cmd, cmder.flags, config.FlagIngestListen, &cmder.ingestListen) config.AddStringFlag(cmd, cmder.flags, config.FlagUpstream, &cmder.upstream) config.AddStringFlag(cmd, cmder.flags, config.FlagProvider, &cmder.providerType) config.AddStringFlag(cmd, cmder.flags, config.FlagSQLite, &cmder.sqlitePath) @@ -187,6 +199,7 @@ func NewServeCmd() *cobra.Command { config.AddStringFlag(cmd, cmder.flags, config.FlagPostgres, &cmder.postgresDSN) cmd.AddCommand(apicmder.NewAPICmd()) + cmd.AddCommand(ingestcmder.NewIngestCmd()) cmd.AddCommand(proxycmder.NewProxyCmd()) return cmd @@ -276,8 +289,32 @@ func (c *ServeCommander) run() error { "api_addr", c.apiListen, ) + // Optionally create ingest server for sidecar mode + var ingestServer *ingest.Server + if c.ingestListen != "" { + ingestConfig := ingest.Config{ + ListenAddr: c.ingestListen, + VectorDriver: proxyConfig.VectorDriver, + Embedder: proxyConfig.Embedder, + Project: c.project, + } + ingestServer, err = ingest.New(ingestConfig, driver, c.logger) + if err != nil { + return fmt.Errorf("creating ingest server: %w", err) + } + defer ingestServer.Close() + + c.logger.Info("starting ingest server", + "ingest_addr", c.ingestListen, + ) + } + // Channel to capture errors from goroutines - errChan := make(chan error, 2) + serverCount := 2 + if ingestServer != nil { + serverCount = 3 + } + errChan := make(chan error, serverCount) // Start proxy in goroutine go func() { @@ -293,6 +330,15 @@ func (c *ServeCommander) run() error { } }() + // Start ingest server in goroutine if configured + if ingestServer != nil { + go func() { + if err := ingestServer.Run(); err != nil { + errChan <- fmt.Errorf("ingest server error: %w", err) + } + }() + } + // Wait for interrupt signal or error sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) diff --git a/ingest/config.go b/ingest/config.go new file mode 100644 index 0000000..81a576c --- /dev/null +++ b/ingest/config.go @@ -0,0 +1,32 @@ +// Package ingest provides an HTTP server that accepts completed LLM conversation +// turns for storage in the Merkle DAG. This enables "sidecar mode" where an +// external gateway (e.g., Envoy AI Gateway) handles upstream LLM traffic and +// tapes only stores, embeds, and publishes the data. +package ingest + +import ( + "github.com/papercomputeco/tapes/pkg/embeddings" + "github.com/papercomputeco/tapes/pkg/publisher" + "github.com/papercomputeco/tapes/pkg/vector" +) + +// Config is the ingest server configuration. +type Config struct { + // ListenAddr is the address to listen on (e.g., ":8082") + ListenAddr string + + // VectorDriver is an optional vector store for storing embeddings. + // If nil, vector storage is disabled. + VectorDriver vector.Driver + + // Embedder is an optional embedder for generating embeddings. + // Required if VectorDriver is set. + Embedder embeddings.Embedder + + // Publisher is an optional event publisher for new DAG nodes. + // If nil, publishing is disabled. + Publisher publisher.Publisher + + // Project is the git repository or project name to tag on stored nodes. + Project string +} diff --git a/ingest/ingest.go b/ingest/ingest.go new file mode 100644 index 0000000..82a8122 --- /dev/null +++ b/ingest/ingest.go @@ -0,0 +1,193 @@ +package ingest + +import ( + "encoding/json" + "fmt" + "log/slog" + "net" + + "github.com/gofiber/fiber/v2" + + "github.com/papercomputeco/tapes/pkg/llm" + "github.com/papercomputeco/tapes/pkg/llm/provider" + "github.com/papercomputeco/tapes/pkg/storage" + "github.com/papercomputeco/tapes/proxy/worker" +) + +// TurnPayload is the ingest request body for a single completed conversation turn. +// It carries the raw provider request and response so tapes can parse, store, +// and embed them exactly as the transparent proxy would. +type TurnPayload struct { + // Provider type: "openai", "anthropic", "ollama" + Provider string `json:"provider"` + + // AgentName optionally tags the turn (same as X-Tapes-Agent-Name header) + AgentName string `json:"agent_name,omitempty"` + + // RawRequest is the original request body sent to the LLM provider + RawRequest json.RawMessage `json:"request"` + + // RawResponse is the complete response body from the LLM provider + RawResponse json.RawMessage `json:"response"` +} + +// BatchPayload is the ingest request body for multiple conversation turns. +type BatchPayload struct { + Turns []TurnPayload `json:"turns"` +} + +// BatchResult reports the outcome of a batch ingest. +type BatchResult struct { + Accepted int `json:"accepted"` + Rejected int `json:"rejected"` + Errors []string `json:"errors,omitempty"` +} + +// Server is an HTTP server that accepts completed LLM conversation turns +// for async storage in the Merkle DAG. +type Server struct { + config Config + driver storage.Driver + workerPool *worker.Pool + logger *slog.Logger + server *fiber.App + providers map[string]provider.Provider +} + +// New creates a new ingest Server. +func New(config Config, driver storage.Driver, log *slog.Logger) (*Server, error) { + providers := make(map[string]provider.Provider) + for _, name := range provider.SupportedProviders() { + prov, err := provider.New(name) + if err != nil { + return nil, fmt.Errorf("could not create provider %s: %w", name, err) + } + providers[name] = prov + } + + app := fiber.New(fiber.Config{ + DisableStartupMessage: true, + }) + + wp, err := worker.NewPool(&worker.Config{ + Driver: driver, + Publisher: config.Publisher, + VectorDriver: config.VectorDriver, + Embedder: config.Embedder, + Project: config.Project, + Logger: log, + }) + if err != nil { + return nil, fmt.Errorf("could not create worker pool: %w", err) + } + + s := &Server{ + config: config, + driver: driver, + workerPool: wp, + logger: log, + server: app, + providers: providers, + } + + app.Get("/ping", s.handlePing) + app.Post("/v1/ingest", s.handleIngest) + app.Post("/v1/ingest/batch", s.handleBatchIngest) + + return s, nil +} + +// Run starts the ingest server on the configured address. +func (s *Server) Run() error { + s.logger.Info("starting ingest server", + "listen", s.config.ListenAddr, + ) + return s.server.Listen(s.config.ListenAddr) +} + +// RunWithListener starts the ingest server using the provided listener. +func (s *Server) RunWithListener(listener net.Listener) error { + s.logger.Info("starting ingest server", + "listen", listener.Addr().String(), + ) + return s.server.Listener(listener) +} + +// Close gracefully shuts down the server and waits for the worker pool to drain. +func (s *Server) Close() error { + s.workerPool.Close() + return s.server.Shutdown() +} + +func (s *Server) handlePing(c *fiber.Ctx) error { + return c.JSON(fiber.Map{"status": "ok"}) +} + +func (s *Server) handleIngest(c *fiber.Ctx) error { + var payload TurnPayload + if err := c.BodyParser(&payload); err != nil { + return c.Status(fiber.StatusBadRequest).JSON(llm.ErrorResponse{Error: "invalid payload: " + err.Error()}) + } + + if err := s.processTurn(&payload); err != nil { + return c.Status(fiber.StatusUnprocessableEntity).JSON(llm.ErrorResponse{Error: err.Error()}) + } + + return c.Status(fiber.StatusAccepted).JSON(fiber.Map{"status": "accepted"}) +} + +func (s *Server) handleBatchIngest(c *fiber.Ctx) error { + var payload BatchPayload + if err := c.BodyParser(&payload); err != nil { + return c.Status(fiber.StatusBadRequest).JSON(llm.ErrorResponse{Error: "invalid payload: " + err.Error()}) + } + + if len(payload.Turns) == 0 { + return c.Status(fiber.StatusBadRequest).JSON(llm.ErrorResponse{Error: "empty batch"}) + } + + result := BatchResult{} + for i := range payload.Turns { + if err := s.processTurn(&payload.Turns[i]); err != nil { + result.Rejected++ + result.Errors = append(result.Errors, fmt.Sprintf("turn[%d]: %s", i, err.Error())) + } else { + result.Accepted++ + } + } + + return c.Status(fiber.StatusAccepted).JSON(result) +} + +// processTurn parses a raw turn payload and enqueues it for async DAG storage. +func (s *Server) processTurn(turn *TurnPayload) error { + prov, ok := s.providers[turn.Provider] + if !ok { + return fmt.Errorf("unsupported provider: %q (supported: %v)", turn.Provider, provider.SupportedProviders()) + } + + parsedReq, err := prov.ParseRequest(turn.RawRequest) + if err != nil { + return fmt.Errorf("cannot parse request: %w", err) + } + + parsedResp, err := prov.ParseResponse(turn.RawResponse) + if err != nil { + return fmt.Errorf("cannot parse response: %w", err) + } + + s.logger.Debug("ingesting turn", + "provider", prov.Name(), + "agent", turn.AgentName, + "model", parsedReq.Model, + ) + + s.workerPool.Enqueue(worker.Job{ + Provider: prov.Name(), + AgentName: turn.AgentName, + Req: parsedReq, + Resp: parsedResp, + }) + + return nil +} diff --git a/ingest/ingest_suite_test.go b/ingest/ingest_suite_test.go new file mode 100644 index 0000000..cbd356e --- /dev/null +++ b/ingest/ingest_suite_test.go @@ -0,0 +1,13 @@ +package ingest_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestIngest(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Ingest Suite") +} diff --git a/ingest/ingest_test.go b/ingest/ingest_test.go new file mode 100644 index 0000000..f68c534 --- /dev/null +++ b/ingest/ingest_test.go @@ -0,0 +1,343 @@ +package ingest_test + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net" + "net/http" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/papercomputeco/tapes/ingest" + tapeslogger "github.com/papercomputeco/tapes/pkg/logger" + "github.com/papercomputeco/tapes/pkg/storage/inmemory" +) + +// ollamaRequest is a minimal Ollama-format request for test fixtures. +type ollamaRequest struct { + Model string `json:"model"` + Messages []ollamaMessage `json:"messages"` + Stream *bool `json:"stream,omitempty"` +} + +type ollamaMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} + +// ollamaResponse is a minimal Ollama-format response for test fixtures. +type ollamaResponse struct { + Model string `json:"model"` + CreatedAt time.Time `json:"created_at"` + Message ollamaMessage `json:"message"` + Done bool `json:"done"` + DoneReason string `json:"done_reason,omitempty"` + PromptEvalCount int `json:"prompt_eval_count,omitempty"` + EvalCount int `json:"eval_count,omitempty"` +} + +// openaiRequest is a minimal OpenAI-format request for test fixtures. +type openaiRequest struct { + Model string `json:"model"` + Messages []openaiMessage `json:"messages"` +} + +type openaiMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} + +// openaiResponse is a minimal OpenAI-format response for test fixtures. +type openaiResponse struct { + ID string `json:"id"` + Object string `json:"object"` + Model string `json:"model"` + Choices []openaiChoice `json:"choices"` + Usage openaiUsage `json:"usage"` +} + +type openaiChoice struct { + Index int `json:"index"` + Message openaiMessage `json:"message"` + FinishReason string `json:"finish_reason"` +} + +type openaiUsage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + TotalTokens int `json:"total_tokens"` +} + +func mustJSON(v any) json.RawMessage { + b, err := json.Marshal(v) + Expect(err).NotTo(HaveOccurred()) + return b +} + +func newTestServer() (*ingest.Server, *inmemory.Driver, string) { + logger := tapeslogger.NewNoop() + driver := inmemory.NewDriver() + + s, err := ingest.New( + ingest.Config{ + ListenAddr: ":0", + Project: "test-project", + }, + driver, + logger, + ) + Expect(err).NotTo(HaveOccurred()) + + ln, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", "127.0.0.1:0") + Expect(err).NotTo(HaveOccurred()) + + go func() { + _ = s.RunWithListener(ln) + }() + + baseURL := "http://" + ln.Addr().String() + return s, driver, baseURL +} + +var _ = Describe("Ingest Server", func() { + var ( + server *ingest.Server + driver *inmemory.Driver + baseURL string + client *http.Client + ) + + BeforeEach(func() { + server, driver, baseURL = newTestServer() + client = &http.Client{Timeout: 5 * time.Second} + }) + + AfterEach(func() { + Expect(server.Close()).To(Succeed()) + }) + + Describe("GET /ping", func() { + It("returns ok", func() { + resp, err := client.Get(baseURL + "/ping") + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + body, _ := io.ReadAll(resp.Body) + Expect(string(body)).To(ContainSubstring("ok")) + }) + }) + + Describe("POST /v1/ingest", func() { + It("accepts a valid ollama turn and stores it in the DAG", func() { + payload := ingest.TurnPayload{ + Provider: "ollama", + AgentName: "test-agent", + RawRequest: mustJSON(ollamaRequest{ + Model: "llama3", + Messages: []ollamaMessage{ + {Role: "user", Content: "Hello"}, + }, + }), + RawResponse: mustJSON(ollamaResponse{ + Model: "llama3", + Message: ollamaMessage{Role: "assistant", Content: "Hi there!"}, + Done: true, + }), + } + + body, _ := json.Marshal(payload) + resp, err := client.Post(baseURL+"/v1/ingest", "application/json", bytes.NewReader(body)) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusAccepted)) + + respBody, _ := io.ReadAll(resp.Body) + Expect(string(respBody)).To(ContainSubstring("accepted")) + + // Give the worker pool time to process + Eventually(func() int { + nodes, _ := driver.List(context.Background()) + return len(nodes) + }).WithTimeout(2 * time.Second).WithPolling(50 * time.Millisecond).Should(BeNumerically(">=", 0)) + }) + + It("accepts a valid openai turn", func() { + payload := ingest.TurnPayload{ + Provider: "openai", + AgentName: "codex", + RawRequest: mustJSON(openaiRequest{ + Model: "gpt-4", + Messages: []openaiMessage{ + {Role: "user", Content: "Explain Go interfaces"}, + }, + }), + RawResponse: mustJSON(openaiResponse{ + ID: "chatcmpl-123", + Object: "chat.completion", + Model: "gpt-4", + Choices: []openaiChoice{ + { + Index: 0, + Message: openaiMessage{Role: "assistant", Content: "In Go, an interface..."}, + FinishReason: "stop", + }, + }, + Usage: openaiUsage{ + PromptTokens: 10, + CompletionTokens: 20, + TotalTokens: 30, + }, + }), + } + + body, _ := json.Marshal(payload) + resp, err := client.Post(baseURL+"/v1/ingest", "application/json", bytes.NewReader(body)) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusAccepted)) + }) + + It("rejects an unsupported provider", func() { + payload := ingest.TurnPayload{ + Provider: "unknown-provider", + RawRequest: json.RawMessage(`{}`), + RawResponse: json.RawMessage(`{}`), + } + + body, _ := json.Marshal(payload) + resp, err := client.Post(baseURL+"/v1/ingest", "application/json", bytes.NewReader(body)) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusUnprocessableEntity)) + respBody, _ := io.ReadAll(resp.Body) + Expect(string(respBody)).To(ContainSubstring("unsupported provider")) + }) + + It("rejects a payload with unparseable raw request JSON", func() { + // Manually construct a payload where "request" is not valid JSON. + // We build the outer envelope by hand to embed a broken inner value. + payload := `{"provider":"openai","request":"not-valid-json-object","response":{}}` + + resp, err := client.Post(baseURL+"/v1/ingest", "application/json", bytes.NewReader([]byte(payload))) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusUnprocessableEntity)) + respBody, _ := io.ReadAll(resp.Body) + Expect(string(respBody)).To(ContainSubstring("cannot parse request")) + }) + + It("rejects malformed JSON", func() { + resp, err := client.Post(baseURL+"/v1/ingest", "application/json", bytes.NewReader([]byte(`{bad`))) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusBadRequest)) + }) + }) + + Describe("POST /v1/ingest/batch", func() { + It("accepts multiple valid turns", func() { + payload := ingest.BatchPayload{ + Turns: []ingest.TurnPayload{ + { + Provider: "ollama", + AgentName: "agent-1", + RawRequest: mustJSON(ollamaRequest{ + Model: "llama3", + Messages: []ollamaMessage{{Role: "user", Content: "First"}}, + }), + RawResponse: mustJSON(ollamaResponse{ + Model: "llama3", + Message: ollamaMessage{Role: "assistant", Content: "Response 1"}, + Done: true, + }), + }, + { + Provider: "ollama", + AgentName: "agent-2", + RawRequest: mustJSON(ollamaRequest{ + Model: "llama3", + Messages: []ollamaMessage{{Role: "user", Content: "Second"}}, + }), + RawResponse: mustJSON(ollamaResponse{ + Model: "llama3", + Message: ollamaMessage{Role: "assistant", Content: "Response 2"}, + Done: true, + }), + }, + }, + } + + body, _ := json.Marshal(payload) + resp, err := client.Post(baseURL+"/v1/ingest/batch", "application/json", bytes.NewReader(body)) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusAccepted)) + + var result ingest.BatchResult + Expect(json.NewDecoder(resp.Body).Decode(&result)).To(Succeed()) + Expect(result.Accepted).To(Equal(2)) + Expect(result.Rejected).To(Equal(0)) + Expect(result.Errors).To(BeEmpty()) + }) + + It("reports partial failures in a batch", func() { + payload := ingest.BatchPayload{ + Turns: []ingest.TurnPayload{ + { + Provider: "ollama", + RawRequest: mustJSON(ollamaRequest{ + Model: "llama3", + Messages: []ollamaMessage{{Role: "user", Content: "Valid"}}, + }), + RawResponse: mustJSON(ollamaResponse{ + Model: "llama3", + Message: ollamaMessage{Role: "assistant", Content: "OK"}, + Done: true, + }), + }, + { + Provider: "bad-provider", + RawRequest: json.RawMessage(`{}`), + RawResponse: json.RawMessage(`{}`), + }, + }, + } + + body, _ := json.Marshal(payload) + resp, err := client.Post(baseURL+"/v1/ingest/batch", "application/json", bytes.NewReader(body)) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusAccepted)) + + var result ingest.BatchResult + Expect(json.NewDecoder(resp.Body).Decode(&result)).To(Succeed()) + Expect(result.Accepted).To(Equal(1)) + Expect(result.Rejected).To(Equal(1)) + Expect(result.Errors).To(HaveLen(1)) + Expect(result.Errors[0]).To(ContainSubstring("unsupported provider")) + }) + + It("rejects an empty batch", func() { + payload := ingest.BatchPayload{Turns: []ingest.TurnPayload{}} + + body, _ := json.Marshal(payload) + resp, err := client.Post(baseURL+"/v1/ingest/batch", "application/json", bytes.NewReader(body)) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusBadRequest)) + }) + }) +}) diff --git a/pkg/config/config.go b/pkg/config/config.go index 1e1300a..22e5236 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -67,6 +67,7 @@ func ValidConfigKeys() []string { "proxy.listen", "proxy.project", "api.listen", + "ingest.listen", "client.proxy_target", "client.api_target", "vector_store.provider", diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 72960bd..1c46655 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -1,10 +1,11 @@ package config const ( - defaultProvider = "ollama" - defaultUpstream = "http://localhost:11434" - defaultProxyListen = ":8080" - defaultAPIListen = ":8081" + defaultProvider = "ollama" + defaultUpstream = "http://localhost:11434" + defaultProxyListen = ":8080" + defaultAPIListen = ":8081" + defaultIngestListen = ":8082" defaultClientProxyTarget = "http://localhost:8080" defaultClientAPITarget = "http://localhost:8081" @@ -29,6 +30,9 @@ func NewDefaultConfig() *Config { API: APIConfig{ Listen: defaultAPIListen, }, + Ingest: IngestConfig{ + Listen: defaultIngestListen, + }, Client: ClientConfig{ ProxyTarget: defaultClientProxyTarget, APITarget: defaultClientAPITarget, diff --git a/pkg/config/flags.go b/pkg/config/flags.go index 75b41ca..493f06d 100644 --- a/pkg/config/flags.go +++ b/pkg/config/flags.go @@ -54,10 +54,13 @@ const ( FlagKafkaClientID = "kafka-client-id" FlagTelemetryDisabled = "telemetry-disabled" + FlagIngestListen = "ingest-listen" + // Standalone subcommand variants use "listen" as the flag name // but bind to different viper keys depending on the service. - FlagProxyListenStandalone = "proxy-listen-standalone" - FlagAPIListenStandalone = "api-listen-standalone" + FlagProxyListenStandalone = "proxy-listen-standalone" + FlagAPIListenStandalone = "api-listen-standalone" + FlagIngestListenStandalone = "ingest-listen-standalone" ) // AddStringFlag registers a string flag on cmd from the given FlagSet. diff --git a/pkg/config/types.go b/pkg/config/types.go index 1199eb5..4eb4b05 100644 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -7,6 +7,7 @@ type Config struct { Storage StorageConfig `toml:"storage" mapstructure:"storage"` Proxy ProxyConfig `toml:"proxy" mapstructure:"proxy"` API APIConfig `toml:"api" mapstructure:"api"` + Ingest IngestConfig `toml:"ingest" mapstructure:"ingest"` Client ClientConfig `toml:"client" mapstructure:"client"` VectorStore VectorStoreConfig `toml:"vector_store" mapstructure:"vector_store"` Embedding EmbeddingConfig `toml:"embedding" mapstructure:"embedding"` @@ -33,6 +34,11 @@ type APIConfig struct { Listen string `toml:"listen,omitempty" mapstructure:"listen"` } +// IngestConfig holds ingest server settings for sidecar mode. +type IngestConfig struct { + Listen string `toml:"listen,omitempty" mapstructure:"listen"` +} + // ClientConfig holds settings for CLI commands that connect to the running // proxy and API servers (e.g. tapes chat, tapes search, tapes checkout). // Values are full URLs (scheme + host + port). @@ -75,6 +81,7 @@ var configKeySet = map[string]bool{ "proxy.listen": true, "proxy.project": true, "api.listen": true, + "ingest.listen": true, "client.proxy_target": true, "client.api_target": true, "vector_store.provider": true,