Official Go SDK for Spooled Cloud — a modern, scalable job queue and task scheduler.
Live Demo (SpriteForge) • Documentation
- Idiomatic Go API with functional options pattern
- Full API Coverage — Access all Spooled API endpoints
- Type Safety — Strongly typed requests and responses
- Context Support — Full `context.Context` propagation
- Automatic Retries — Exponential backoff with jitter
- Circuit Breaker — Fault tolerance for unreliable networks
- Worker Runtime — Process jobs with concurrent workers
- Real-time Events — WebSocket and SSE support
- gRPC Support — High-performance streaming client
- Workflow DAGs — Complex job dependencies
- Automatic JWT Refresh — Single-flight token refresh
go get github.com/spooled-cloud/spooled-sdk-go
Requirements: Go 1.25 or later
package main
import (
"context"
"fmt"
"log"
"github.com/spooled-cloud/spooled-sdk-go/spooled"
"github.com/spooled-cloud/spooled-sdk-go/spooled/resources"
)
func main() {
// Create client
client, err := spooled.NewClient(
spooled.WithAPIKey("sp_live_your_api_key"),
// For self-hosted: spooled.WithBaseURL("https://your-server.com"),
)
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
// Create a job
priority := 5
maxRetries := 3
result, err := client.Jobs().Create(ctx, &resources.CreateJobRequest{
QueueName: "email-notifications",
Payload: map[string]any{
"to": "user@example.com",
"subject": "Welcome!",
"body": "Thanks for signing up.",
},
Priority: &priority,
MaxRetries: &maxRetries,
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Created job: %s\n", result.ID)
// Get job status
job, err := client.Jobs().Get(ctx, result.ID)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Status: %s\n", job.Status)
}
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/spooled-cloud/spooled-sdk-go/spooled"
"github.com/spooled-cloud/spooled-sdk-go/spooled/worker"
)
func main() {
client, err := spooled.NewClient(
spooled.WithAPIKey("sp_live_your_api_key"),
)
if err != nil {
log.Fatal(err)
}
// Create worker
w := worker.NewWorker(client.Jobs(), client.Workers(), worker.Options{
QueueName: "email-notifications",
Concurrency: 10,
})
// Register job handler
w.Process(func(ctx *worker.JobContext) (map[string]any, error) {
fmt.Printf("Processing job %s\n", ctx.JobID)
// Access payload
to := ctx.Payload["to"].(string)
subject := ctx.Payload["subject"].(string)
// Do your work here
fmt.Printf("Sending email to %s: %s\n", to, subject)
// Report progress
ctx.Progress(50, "Email sent")
return map[string]any{"sent": true}, nil
})
// Handle events
w.OnEvent(func(event worker.Event) {
fmt.Printf("Event: %s\n", event.Type)
})
// Start worker
ctx, cancel := context.WithCancel(context.Background())
if err := w.Start(ctx); err != nil {
log.Fatal(err)
}
// Graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
cancel()
w.Stop()
}
If you want 5 copy/paste “real life” setups (Stripe → jobs, GitHub Actions → jobs, cron schedules, CSV import, website signup), see:
https://github.com/spooled-cloud/spooled-backend/blob/main/docs/guides/real-world-examples.md
| Guide | Description |
|---|---|
| Getting Started | Installation, setup, and first job |
| Configuration | All configuration options |
| Workers | Job processing with Worker |
| Workflows | Building DAGs with job dependencies |
| gRPC | High-performance streaming |
| Real-time Events | WebSocket and SSE |
| Error Handling | Typed errors and retries |
See the examples/ directory for runnable code:
| Example | Description |
|---|---|
| `quick-start/` | Basic SDK usage |
| `worker/` | Processing jobs with Worker |
| `workflow/` | Complex workflows with dependencies |
| `grpc/` | High-performance gRPC streaming |
| `realtime/` | Real-time event streaming |
| `schedules/` | Cron schedules |
| `webhooks/` | Webhook configuration |
| `api-keys/` | API key management |
| `bulk-operations/` | Bulk job operations |
| `error-handling/` | Error handling patterns |
Run any example with:
API_KEY=sp_live_... go run examples/quick-start/main.go
client, err := spooled.NewClient(
// Authentication (required - one of these)
spooled.WithAPIKey("sp_live_..."),
spooled.WithAccessToken("eyJ..."),
// Endpoints (optional)
spooled.WithBaseURL("https://api.spooled.cloud"),
spooled.WithWSURL("wss://api.spooled.cloud/api/v1/ws"),
spooled.WithGRPCAddress("grpc.spooled.cloud:443"),
// Timeouts (optional)
spooled.WithTimeout(30 * time.Second),
// Retries (optional)
spooled.WithRetryConfig(spooled.RetryConfig{
MaxRetries: 3,
BaseDelay: 100 * time.Millisecond,
MaxDelay: 5 * time.Second,
}),
// Circuit breaker (optional)
spooled.WithCircuitBreaker(spooled.CircuitBreakerConfig{
Enabled: true,
FailureThreshold: 5,
Timeout: 30 * time.Second,
}),
// Debug logging (optional)
spooled.WithDebug(true),
)
SPOOLED_API_KEY=sp_live_...
SPOOLED_BASE_URL=https://api.spooled.cloud
SPOOLED_GRPC_ADDRESS=grpc.spooled.cloud:443
Jobs are units of work with payloads, priorities, and retry policies:
priority := 5
maxRetries := 3
timeout := 300
scheduledAt := time.Now().Add(time.Hour)
result, err := client.Jobs().Create(ctx, &resources.CreateJobRequest{
QueueName: "my-queue",
Payload: map[string]any{"data": "value"},
Priority: &priority, // -100 to 100
MaxRetries: &maxRetries,
TimeoutSeconds: &timeout,
ScheduledAt: &scheduledAt,
IdempotencyKey: ptr("unique-key"),
})
// List jobs
jobs, err := client.Jobs().List(ctx, &resources.ListJobsParams{
QueueName: ptr("my-queue"),
Status: ptr(resources.JobStatusPending),
Tag: ptr("billing"), // Optional: filter by a single tag
Limit: ptr(10),
})
// Cancel a job
err = client.Jobs().Cancel(ctx, jobID)
Process jobs with the built-in worker runtime:
w := worker.NewWorker(client.Jobs(), client.Workers(), worker.Options{
QueueName: "my-queue",
Concurrency: 10, // Max concurrent jobs
PollInterval: time.Second, // Polling frequency
LeaseDuration: 30, // Lease duration in seconds
HeartbeatFraction: 0.5, // Heartbeat at 50% of lease
ShutdownTimeout: 30 * time.Second,
})
w.Process(func(ctx *worker.JobContext) (map[string]any, error) {
// ctx.Context - Go context with cancellation
// ctx.JobID - Unique job ID
// ctx.QueueName - Queue name
// ctx.Payload - Job payload
// ctx.RetryCount - Current retry attempt
// ctx.MaxRetries - Maximum retries
// ctx.Progress(percent, message) - Report progress
// ctx.Log(level, message, meta) - Log messages
return map[string]any{"result": "success"}, nil
})
// Event handlers
w.OnEvent(func(event worker.Event) {
switch event.Type {
case worker.EventJobCompleted:
data := event.Data.(worker.JobCompletedData)
fmt.Printf("Job %s completed in %v\n", data.JobID, data.Duration)
case worker.EventJobFailed:
data := event.Data.(worker.JobFailedData)
fmt.Printf("Job %s failed: %v\n", data.JobID, data.Error)
}
})
ctx := context.Background()
w.Start(ctx)
// Graceful shutdown
w.Stop()
Orchestrate multiple jobs with dependencies:
workflow, err := client.Workflows().Create(ctx, &resources.CreateWorkflowRequest{
Name: "ETL Pipeline",
Jobs: []resources.WorkflowJob{
{
Key: "extract",
QueueName: "etl",
Payload: map[string]any{"step": "extract"},
},
{
Key: "transform",
QueueName: "etl",
Payload: map[string]any{"step": "transform"},
DependsOn: []string{"extract"},
},
{
Key: "load",
QueueName: "etl",
Payload: map[string]any{"step": "load"},
DependsOn: []string{"transform"},
},
},
})
// Get workflow status
wf, err := client.Workflows().Get(ctx, workflow.WorkflowID)
fmt.Printf("Status: %s\n", wf.Status)
Run jobs on a cron schedule:
schedule, err := client.Schedules().Create(ctx, &resources.CreateScheduleRequest{
Name: "Daily Report",
CronExpression: "0 9 * * *", // 9 AM daily (5-field cron)
Timezone: ptr("America/New_York"),
QueueName: "reports",
PayloadTemplate: map[string]any{"type": "daily"},
})
fmt.Printf("Next run: %v\n", schedule.NextRunAt)
// Pause/resume
client.Schedules().Pause(ctx, schedule.ID)
client.Schedules().Resume(ctx, schedule.ID)
// Trigger immediately
client.Schedules().Trigger(ctx, schedule.ID)
Manage queues and view statistics:
// List queues
queues, err := client.Queues().List(ctx)
// Get queue stats
stats, err := client.Queues().GetStats(ctx, "my-queue")
fmt.Printf("Pending: %d, Processing: %d\n", stats.Pending, stats.Processing)
// Pause/resume queue
client.Queues().Pause(ctx, "my-queue", nil)
client.Queues().Resume(ctx, "my-queue")
// Purge all pending jobs
client.Queues().Purge(ctx, "my-queue")
Subscribe to real-time job events via WebSocket or SSE:
// WebSocket client
ws := realtime.NewWebSocketClient(realtime.ConnectionOptions{
WSURL: "wss://api.spooled.cloud/api/v1/ws",
Token: "your-jwt-token",
APIKey: "sp_live_...",
})
ws.OnJobEvent(realtime.EventJobCompleted, func(event *realtime.JobEvent) {
fmt.Printf("Job completed: %s\n", event.JobID)
})
ws.OnStateChange(func(state realtime.ConnectionState) {
fmt.Printf("Connection state: %s\n", state)
})
ws.Connect()
ws.Subscribe(realtime.SubscriptionFilter{QueueName: "my-queue"})
// SSE client (unidirectional)
sse := realtime.NewSSEClient(realtime.ConnectionOptions{
BaseURL: "https://api.spooled.cloud",
APIKey: "sp_live_...",
})
sse.OnEvent(func(event *realtime.Event) {
fmt.Printf("Event: %s\n", event.Type)
})
sse.ConnectWithFilter(&realtime.SubscriptionFilter{
QueueName: "my-queue",
})
Use gRPC for high-throughput scenarios:
import "github.com/spooled-cloud/spooled-sdk-go/spooled/grpc"
client, err := grpc.NewClient(grpc.ClientOptions{
Address: "grpc.spooled.cloud:443",
APIKey: "sp_live_...",
})
defer client.Close()
// Enqueue a job
resp, err := client.Enqueue(ctx, &grpc.EnqueueRequest{
QueueName: "high-throughput",
Payload: map[string]any{"data": "value"},
Priority: 5,
})
fmt.Printf("Job ID: %s\n", resp.JobID)
// Dequeue jobs
jobs, err := client.Dequeue(ctx, &grpc.DequeueRequest{
QueueName: "high-throughput",
WorkerID: "worker-1",
BatchSize: 10,
})
// Stream jobs (server-side streaming)
stream, err := client.StreamJobs(ctx, "high-throughput", "worker-1")
for {
job, err := stream.Recv()
if err == io.EOF {
break
}
// Process job...
}
Configure outgoing webhooks for job events:
// Create webhook
webhook, err := client.Webhooks().Create(ctx, &resources.CreateWebhookRequest{
URL: "https://your-app.com/webhooks/spooled",
Events: []string{"job.completed", "job.failed"},
QueueName: ptr("my-queue"),
Secret: ptr("whsec_..."),
})
// Test webhook
client.Webhooks().Test(ctx, webhook.ID)
// List deliveries
deliveries, err := client.Webhooks().ListDeliveries(ctx, webhook.ID, nil)
Manage your organization and track usage:
// List organizations
orgs, err := client.Organizations().List(ctx)
// Get organization details
org, err := client.Organizations().Get(ctx, orgID)
// Get usage information
usage, err := client.Organizations().Usage(ctx, orgID)
fmt.Printf("Jobs today: %d/%d\n", usage.Usage.JobsToday.Current, *usage.Limits.MaxJobsPerDay)
Manage API keys:
// List API keys
keys, err := client.APIKeys().List(ctx)
// Create new API key
key, err := client.APIKeys().Create(ctx, &resources.CreateAPIKeyRequest{
Name: "Production Key",
})
fmt.Printf("Key: %s (save this, it won't be shown again)\n", key.Key)
// Revoke API key
client.APIKeys().Revoke(ctx, keyID)
The SDK provides typed errors for different failure scenarios:
import "github.com/spooled-cloud/spooled-sdk-go/spooled"
job, err := client.Jobs().Get(ctx, "invalid-id")
if err != nil {
switch e := err.(type) {
case *spooled.NotFoundError:
fmt.Printf("Job not found: %s\n", e.Message)
case *spooled.AuthenticationError:
fmt.Printf("Invalid API key: %s\n", e.Message)
case *spooled.RateLimitError:
fmt.Printf("Rate limited. Retry after: %v\n", e.Reset)
case *spooled.ValidationError:
fmt.Printf("Invalid request: %v\n", e.Errors)
default:
fmt.Printf("Error: %v\n", err)
}
}
// Check error types
if spooled.IsRetryableError(err) {
// Safe to retry
}
if spooled.IsNetworkError(err) {
// Network issue
}
if spooled.IsTimeoutError(err) {
// Request timed out
}
# Build, lint, and test
make
# Run specific targets
make build # Build the SDK
make test # Run unit tests
make test-coverage # Tests with coverage report
make lint # Run linter
make fmt # Format code
make examples # Build all examples
# Integration tests (requires API_KEY)
API_KEY=sp_live_... make integration-test
# See all commands
make help
# Unit tests
go test ./...
# With race detection
go test -race ./...
# With coverage
go test -cover ./...
# Integration tests (requires API key)
API_KEY=sp_live_... go run scripts/test-local/main.go
# Full production test suite (all features)
API_KEY=sp_live_... \
ADMIN_KEY=your_admin_key \
BASE_URL=https://api.spooled.cloud \
GRPC_ADDRESS=grpc.spooled.cloud:443 \
SKIP_GRPC=0 \
SKIP_STRESS=0 \
go run scripts/test-local/main.go
Contributions are welcome! Please see CONTRIBUTING.md.
Apache 2.0 - see LICENSE for details.
- Documentation
- Issue Tracker
- Email Support