WARNING: This project is under active development.
The high-performance, zero-copy distributed runtime for the AI era.
Pepper is a polyglot orchestrator that bridges the reliability and concurrency of Go with the machine learning ecosystem of Python. It lets you treat Python scripts, local CLI binaries, external LLM APIs (OpenAI, Anthropic), and MCP servers as unified, strongly-typed Capabilities on a fast, zero-copy message bus.
Stop writing FastAPI wrappers around your PyTorch models. Stop base64-encoding 50MB images over HTTP. Use Pepper.
- Zero-Copy Binary Passing. Pass tensors, video frames, and audio buffers between Go and Python using
/dev/shmshared memory, with graceful fallback to HTTP/S3 for multi-node clusters. - First-Class Python DX. Write ML workers with
@capabilityand@resourcedecorators. No boilerplate server code. - Everything is a Capability. A Python script, a Go function, a shell command, and an OpenAI API call all share the same MsgPack routing envelope.
- Declarative DAG Pipelines. Compose multi-step, conditional, and scatter-gather pipelines in Go. The router evaluates logic dynamically to save network hops.
- Production Resilience. Poison Pill detection, Dead Letter Queues, automated worker respawns, backpressure, and end-to-end deadline propagation.
- Native LLM Tooling. Export your entire backend as OpenAI or Anthropic function-calling tools with one line.
# caps/vision.py
from runtime import Input, Output, capability, read_blob
@capability(name="face.embed", groups=["gpu"], max_concurrent=2)
class FaceEmbedder:
def setup(self):
import torch
self.model = torch.load("model.pt")
def run(self, image: Input[dict]) -> Output[dict]:
img_matrix = read_blob(image).as_numpy()
embedding = self.model(img_matrix).tolist()
return {"embedding": embedding}package main
import (
"context"
"log"
"github.com/agberohq/pepper"
)
func main() {
pp, err := pepper.New(
pepper.WithWorkers(pepper.NewWorker("gpu-node").Groups("gpu")),
)
if err != nil {
log.Fatal(err)
}
defer pp.Stop()
if err := pp.Register(pepper.Script("face.embed", "./caps/vision.py")); err != nil {
log.Fatal(err)
}
ctx := context.Background()
if err := pp.Start(ctx); err != nil {
log.Fatal(err)
}
blob, err := pp.NewBlobFromFile("/path/to/image.jpg")
if err != nil {
log.Fatal(err)
}
defer blob.Close()
type EmbedOut struct {
Embedding []float32 `json:"embedding"`
}
result, err := pepper.Call[EmbedOut]{
Cap: "face.embed",
Input: pepper.In{"image": blob.Ref()},
}.Bind(pp).Do(ctx)
if err != nil {
log.Fatal(err)
}
log.Printf("embedding: %v", result.Embedding)
}Pepper unifies different runtimes behind a single Register call.
Python script — a single .py file with a @capability class:
pp.Register(pepper.Script("speech.transcribe", "./caps/transcribe.py"))Python directory — every .py file under a directory becomes a capability:
pp.Register(pepper.Dir("./caps").Groups("gpu"))Go function — inline typed handler, no subprocess:
type SumIn struct{ Values []float64 `json:"values"` }
type SumOut struct{ Total float64 `json:"total"` }
pp.Register(pepper.Func("math.sum", func(ctx context.Context, in SumIn) (SumOut, error) {
var total float64
for _, v := range in.Values {
total += v
}
return SumOut{Total: total}, nil
}))CLI tool — wrap any binary as a capability:
pp.Register(pepper.CLI("video.probe", "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format"))HTTP adapter — wrap any REST API:
pp.Register(pepper.HTTP("sentiment.analyze", "https://api.example.com/sentiment"))Built-in LLM adapters:
pp.Register(pepper.HTTP("llm.chat", "").With(pepper.OpenAI))
pp.Register(pepper.HTTP("llm.chat", "").With(pepper.AnthropicAdapter))
pp.Register(pepper.HTTP("llm.chat", "http://localhost:11434").With(pepper.Ollama))MCP server — expose all tools from an MCP server as capabilities:
pp.Register(pepper.MCP("tools.browser", "https://mcp.example.com/sse"))
// or auto-name from URL:
pp.AdaptMCP("https://mcp.example.com/sse")Typed single call:
type AnalysisOut struct {
Sentiment string `json:"sentiment"`
Score float64 `json:"score"`
}
result, err := pepper.Call[AnalysisOut]{
Cap: "sentiment.analyze",
Input: pepper.In{"text": "Pepper is fast"},
Timeout: 10 * time.Second,
}.Bind(pp).Do(ctx)Fire-and-forget:
err := pepper.Exec{
Cap: "audit.log",
Input: pepper.In{"event": "login", "user": "alice"},
}.Bind(pp).Do(ctx)Parallel fan-out:
results, err := pepper.All[AnalysisOut](ctx, pp,
pepper.MakeCall("sentiment.analyze", pepper.In{"text": "first"}),
pepper.MakeCall("sentiment.analyze", pepper.In{"text": "second"}),
)Streaming output:
ch, err := pepper.Call[TranscriptChunk]{
Cap: "speech.transcribe",
Input: pepper.In{"audio_path": "/tmp/audio.wav"},
}.Bind(pp).Stream(ctx)
if err != nil {
log.Fatal(err)
}
for chunk := range ch {
fmt.Print(chunk.Text)
}Bidirectional streaming:
type AudioChunk struct{ Data []byte `json:"data"` }
type TranscriptChunk struct{ Text string `json:"text"` }
stream, err := pepper.OpenStream[AudioChunk, TranscriptChunk](
ctx, pp, "speech.transcribe", pepper.In{"language": "en"},
)
if err != nil {
log.Fatal(err)
}
go func() {
for _, chunk := range audioChunks {
stream.Write(AudioChunk{Data: chunk})
}
stream.CloseInput()
}()
for chunk := range stream.Chunks(ctx) {
fmt.Print(chunk.Text)
}Compose multi-step pipelines in Go. The router evaluates routing logic without extra network hops.
pp.Compose("audio.pipeline",
pepper.Pipe("audio.convert").WithGroup("cpu"),
pepper.Pipe("speech.transcribe").WithGroup("asr"),
pepper.PipeTransform(func(in map[string]any) (map[string]any, error) {
transcript, _ := in["text"].(string)
return pepper.In{"prompt": "Summarise: " + transcript}, nil
}),
pepper.Pipe("llm.chat"),
)
result, err := pp.Do(ctx, "audio.pipeline", pepper.In{"audio_path": "/tmp/speech.wav"})Async pipeline with stage events:
proc, err := pepper.Call[SummaryOut]{
Cap: "audio.pipeline",
Input: pepper.In{"audio_path": "/tmp/speech.wav"},
}.Bind(pp).Execute(ctx)
if err != nil {
log.Fatal(err)
}
go func() {
for event := range proc.Events() {
log.Printf("stage %s: %s (%dms)", event.Stage, event.Status, event.DurationMs)
}
}()
result, err := proc.Wait(ctx)Because Pepper knows the schema of every registered capability, you can expose your backend to an LLM agent in one call.
schemas := pp.Capabilities(ctx, pepper.FilterByGroup("tools"))
// OpenAI
openAITools := pepper.Tools(schemas, pepper.FormatOpenAI)
// Anthropic
anthropicTools := pepper.Tools(schemas, pepper.FormatAnthropic)Passing large binary data between Go and Python without serialisation:
// From raw bytes
blob, err := pp.NewBlob(imageBytes)
defer blob.Close()
// From a file
blob, err := pp.NewBlobFromFile("/tmp/frame.jpg")
defer blob.Close()
// Pass the ref in any capability input
result, err := pp.Do(ctx, "face.embed", pepper.In{"image": blob.Ref()})In Python, read_blob(image).as_numpy() memory-maps the exact same shared memory page. On multi-node deployments, Pepper falls back to treating the path as a URI and caches locally on first access.
- Poison Pill detection. If a payload crashes a Python worker more than
WithPoisonPillThreshold(n)times, Pepper blacklists theorigin_id, stops retrying, and writes the payload to the DLQ. - Dead Letter Queue. Persist poison pills to disk for debugging and replay:
pepper.WithDLQ(pepper.FileDLQ("./dlq")). - Deadline propagation. A
context.WithTimeoutin Go propagates across the entire message bus. When the deadline passes, the router drops the pending request and broadcasts a cancel signal to interrupt running workers. - Load-aware routing. Requests route to the worker with the lowest active load by default. Override with
WithStrategy(pepper.StrategyRoundRobin)orWithStrategy(pepper.StrategyLeastLoaded). - Worker recycling. Automatically restart workers after N requests or a maximum uptime:
pepper.NewWorker("w-1").MaxRequests(10000).MaxUptime(24 * time.Hour).
Maintain state across multiple capability calls:
sess := pp.NewSession()
result, err := pepper.Call[ReplyOut]{
Cap: "chat.reply",
Input: pepper.In{"message": "Hello"},
}.Session(sess).Do(ctx)
// Later, with the same session:
sess2 := pp.Session(sess.ID()) // resume from cookie, JWT, etc.| Transport | Default | Notes |
|---|---|---|
mula:// |
Yes | Zero-dependency custom TCP framing |
nats:// |
No | Coming soon — NATS JetStream |
redis:// |
No | Coming soon — Redis Pub/Sub |
Override with pepper.WithTransport(pepper.TransportTCPLoopback) or pepper.WithTransportURL("tcp://0.0.0.0:7731").
MIT License. See LICENSE for more information.