Skip to content

Spooled-Cloud/spooled-sdk-go

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

19 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Spooled Go SDK

Official Go SDK for Spooled Cloud β€” a modern, scalable job queue and task scheduler.

Live Demo (SpriteForge) β€’ Documentation

Go Reference Go Report Card License

Features

  • Idiomatic Go API with functional options pattern
  • Full API Coverage β€” Access all Spooled API endpoints
  • Type Safety β€” Strongly typed requests and responses
  • Context Support β€” Full context.Context propagation
  • Automatic Retries β€” Exponential backoff with jitter
  • Circuit Breaker β€” Fault tolerance for unreliable networks
  • Worker Runtime β€” Process jobs with concurrent workers
  • Real-time Events β€” WebSocket and SSE support
  • gRPC Support β€” High-performance streaming client
  • Workflow DAGs β€” Complex job dependencies
  • Automatic JWT Refresh β€” Single-flight token refresh

Installation

go get github.com/spooled-cloud/spooled-sdk-go

Requirements: Go 1.25 or later

Quick Start

Create a Job

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/spooled-cloud/spooled-sdk-go/spooled"
    "github.com/spooled-cloud/spooled-sdk-go/spooled/resources"
)

func main() {
    // Create client
    client, err := spooled.NewClient(
        spooled.WithAPIKey("sp_live_your_api_key"),
        // For self-hosted: spooled.WithBaseURL("https://your-server.com"),
    )
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()

    // Create a job
    priority := 5
    maxRetries := 3
    result, err := client.Jobs().Create(ctx, &resources.CreateJobRequest{
        QueueName: "email-notifications",
        Payload: map[string]any{
            "to":      "user@example.com",
            "subject": "Welcome!",
            "body":    "Thanks for signing up.",
        },
        Priority:   &priority,
        MaxRetries: &maxRetries,
    })
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Created job: %s\n", result.ID)

    // Get job status
    job, err := client.Jobs().Get(ctx, result.ID)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Status: %s\n", job.Status)
}

Process Jobs with a Worker

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"

    "github.com/spooled-cloud/spooled-sdk-go/spooled"
    "github.com/spooled-cloud/spooled-sdk-go/spooled/worker"
)

func main() {
    client, err := spooled.NewClient(
        spooled.WithAPIKey("sp_live_your_api_key"),
    )
    if err != nil {
        log.Fatal(err)
    }

    // Create worker
    w := worker.NewWorker(client.Jobs(), client.Workers(), worker.Options{
        QueueName:   "email-notifications",
        Concurrency: 10,
    })

    // Register job handler
    w.Process(func(ctx *worker.JobContext) (map[string]any, error) {
        fmt.Printf("Processing job %s\n", ctx.JobID)
        
        // Access payload
        to := ctx.Payload["to"].(string)
        subject := ctx.Payload["subject"].(string)
        
        // Do your work here
        fmt.Printf("Sending email to %s: %s\n", to, subject)
        
        // Report progress
        ctx.Progress(50, "Email sent")
        
        return map[string]any{"sent": true}, nil
    })

    // Handle events
    w.OnEvent(func(event worker.Event) {
        fmt.Printf("Event: %s\n", event.Type)
    })

    // Start worker
    ctx, cancel := context.WithCancel(context.Background())
    if err := w.Start(ctx); err != nil {
        log.Fatal(err)
    }

    // Graceful shutdown
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh

    cancel()
    w.Stop()
}

Real-world examples (beginner friendly)

If you want 5 copy/paste β€œreal life” setups (Stripe β†’ jobs, GitHub Actions β†’ jobs, cron schedules, CSV import, website signup), see:

  • https://github.com/spooled-cloud/spooled-backend/blob/main/docs/guides/real-world-examples.md

Documentation

Guide Description
Getting Started Installation, setup, and first job
Configuration All configuration options
Workers Job processing with Worker
Workflows Building DAGs with job dependencies
gRPC High-performance streaming
Real-time Events WebSocket and SSE
Error Handling Typed errors and retries

Examples

See the examples/ directory for runnable code:

Example Description
quick-start/ Basic SDK usage
worker/ Processing jobs with Worker
workflow/ Complex workflows with dependencies
grpc/ High-performance gRPC streaming
realtime/ Real-time event streaming
schedules/ Cron schedules
webhooks/ Webhook configuration
api-keys/ API key management
bulk-operations/ Bulk job operations
error-handling/ Error handling patterns

Run any example with:

API_KEY=sp_live_... go run examples/quick-start/main.go

Configuration

Client Options

client, err := spooled.NewClient(
    // Authentication (required - one of these)
    spooled.WithAPIKey("sp_live_..."),
    spooled.WithAccessToken("eyJ..."),
    
    // Endpoints (optional)
    spooled.WithBaseURL("https://api.spooled.cloud"),
    spooled.WithWSURL("wss://api.spooled.cloud/api/v1/ws"),
    spooled.WithGRPCAddress("grpc.spooled.cloud:443"),
    
    // Timeouts (optional)
    spooled.WithTimeout(30 * time.Second),
    
    // Retries (optional)
    spooled.WithRetryConfig(spooled.RetryConfig{
        MaxRetries: 3,
        BaseDelay:  100 * time.Millisecond,
        MaxDelay:   5 * time.Second,
    }),
    
    // Circuit breaker (optional)
    spooled.WithCircuitBreaker(spooled.CircuitBreakerConfig{
        Enabled:          true,
        FailureThreshold: 5,
        Timeout:          30 * time.Second,
    }),
    
    // Debug logging (optional)
    spooled.WithDebug(true),
)

Environment Variables

SPOOLED_API_KEY=sp_live_...
SPOOLED_BASE_URL=https://api.spooled.cloud
SPOOLED_GRPC_ADDRESS=grpc.spooled.cloud:443

Core Concepts

Jobs

Jobs are units of work with payloads, priorities, and retry policies:

priority := 5
maxRetries := 3
timeout := 300
scheduledAt := time.Now().Add(time.Hour)

result, err := client.Jobs().Create(ctx, &resources.CreateJobRequest{
    QueueName:      "my-queue",
    Payload:        map[string]any{"data": "value"},
    Priority:       &priority,        // -100 to 100
    MaxRetries:     &maxRetries,
    TimeoutSeconds: &timeout,
    ScheduledAt:    &scheduledAt,
    IdempotencyKey: ptr("unique-key"),
})

// List jobs
jobs, err := client.Jobs().List(ctx, &resources.ListJobsParams{
    QueueName: ptr("my-queue"),
    Status:    ptr(resources.JobStatusPending),
    Tag:       ptr("billing"), // Optional: filter by a single tag
    Limit:     ptr(10),
})

// Cancel a job
err = client.Jobs().Cancel(ctx, jobID)

Workers

Process jobs with the built-in worker runtime:

w := worker.NewWorker(client.Jobs(), client.Workers(), worker.Options{
    QueueName:         "my-queue",
    Concurrency:       10,              // Max concurrent jobs
    PollInterval:      time.Second,     // Polling frequency
    LeaseDuration:     30,              // Lease duration in seconds
    HeartbeatFraction: 0.5,             // Heartbeat at 50% of lease
    ShutdownTimeout:   30 * time.Second,
})

w.Process(func(ctx *worker.JobContext) (map[string]any, error) {
    // ctx.Context - Go context with cancellation
    // ctx.JobID - Unique job ID
    // ctx.QueueName - Queue name
    // ctx.Payload - Job payload
    // ctx.RetryCount - Current retry attempt
    // ctx.MaxRetries - Maximum retries
    // ctx.Progress(percent, message) - Report progress
    // ctx.Log(level, message, meta) - Log messages
    
    return map[string]any{"result": "success"}, nil
})

// Event handlers
w.OnEvent(func(event worker.Event) {
    switch event.Type {
    case worker.EventJobCompleted:
        data := event.Data.(worker.JobCompletedData)
        fmt.Printf("Job %s completed in %v\n", data.JobID, data.Duration)
    case worker.EventJobFailed:
        data := event.Data.(worker.JobFailedData)
        fmt.Printf("Job %s failed: %v\n", data.JobID, data.Error)
    }
})

ctx := context.Background()
w.Start(ctx)

// Graceful shutdown
w.Stop()

Workflows (DAGs)

Orchestrate multiple jobs with dependencies:

workflow, err := client.Workflows().Create(ctx, &resources.CreateWorkflowRequest{
    Name: "ETL Pipeline",
    Jobs: []resources.WorkflowJob{
        {
            Key:       "extract",
            QueueName: "etl",
            Payload:   map[string]any{"step": "extract"},
        },
        {
            Key:       "transform",
            QueueName: "etl",
            Payload:   map[string]any{"step": "transform"},
            DependsOn: []string{"extract"},
        },
        {
            Key:       "load",
            QueueName: "etl",
            Payload:   map[string]any{"step": "load"},
            DependsOn: []string{"transform"},
        },
    },
})

// Get workflow status
wf, err := client.Workflows().Get(ctx, workflow.WorkflowID)
fmt.Printf("Status: %s\n", wf.Status)

Schedules

Run jobs on a cron schedule:

schedule, err := client.Schedules().Create(ctx, &resources.CreateScheduleRequest{
    Name:           "Daily Report",
    CronExpression: "0 9 * * *",       // 9 AM daily (5-field cron)
    Timezone:       ptr("America/New_York"),
    QueueName:      "reports",
    PayloadTemplate: map[string]any{"type": "daily"},
})

fmt.Printf("Next run: %v\n", schedule.NextRunAt)

// Pause/resume
client.Schedules().Pause(ctx, schedule.ID)
client.Schedules().Resume(ctx, schedule.ID)

// Trigger immediately
client.Schedules().Trigger(ctx, schedule.ID)

Queues

Manage queues and view statistics:

// List queues
queues, err := client.Queues().List(ctx)

// Get queue stats
stats, err := client.Queues().GetStats(ctx, "my-queue")
fmt.Printf("Pending: %d, Processing: %d\n", stats.Pending, stats.Processing)

// Pause/resume queue
client.Queues().Pause(ctx, "my-queue", nil)
client.Queues().Resume(ctx, "my-queue")

// Purge all pending jobs
client.Queues().Purge(ctx, "my-queue")

Real-time Events

Subscribe to real-time job events via WebSocket or SSE:

// WebSocket client
ws := realtime.NewWebSocketClient(realtime.ConnectionOptions{
    WSURL:  "wss://api.spooled.cloud/api/v1/ws",
    Token:  "your-jwt-token",
    APIKey: "sp_live_...",
})

ws.OnJobEvent(realtime.EventJobCompleted, func(event *realtime.JobEvent) {
    fmt.Printf("Job completed: %s\n", event.JobID)
})

ws.OnStateChange(func(state realtime.ConnectionState) {
    fmt.Printf("Connection state: %s\n", state)
})

ws.Connect()
ws.Subscribe(realtime.SubscriptionFilter{QueueName: "my-queue"})

// SSE client (unidirectional)
sse := realtime.NewSSEClient(realtime.ConnectionOptions{
    BaseURL: "https://api.spooled.cloud",
    APIKey:  "sp_live_...",
})

sse.OnEvent(func(event *realtime.Event) {
    fmt.Printf("Event: %s\n", event.Type)
})

sse.ConnectWithFilter(&realtime.SubscriptionFilter{
    QueueName: "my-queue",
})

gRPC (High Performance)

Use gRPC for high-throughput scenarios:

import "github.com/spooled-cloud/spooled-sdk-go/spooled/grpc"

client, err := grpc.NewClient(grpc.ClientOptions{
    Address: "grpc.spooled.cloud:443",
    APIKey:  "sp_live_...",
})
defer client.Close()

// Enqueue a job
resp, err := client.Enqueue(ctx, &grpc.EnqueueRequest{
    QueueName: "high-throughput",
    Payload:   map[string]any{"data": "value"},
    Priority:  5,
})
fmt.Printf("Job ID: %s\n", resp.JobID)

// Dequeue jobs
jobs, err := client.Dequeue(ctx, &grpc.DequeueRequest{
    QueueName: "high-throughput",
    WorkerID:  "worker-1",
    BatchSize: 10,
})

// Stream jobs (server-side streaming)
stream, err := client.StreamJobs(ctx, "high-throughput", "worker-1")
for {
    job, err := stream.Recv()
    if err == io.EOF {
        break
    }
    // Process job...
}

Webhooks

Configure outgoing webhooks for job events:

// Create webhook
webhook, err := client.Webhooks().Create(ctx, &resources.CreateWebhookRequest{
    URL:       "https://your-app.com/webhooks/spooled",
    Events:    []string{"job.completed", "job.failed"},
    QueueName: ptr("my-queue"),
    Secret:    ptr("whsec_..."),
})

// Test webhook
client.Webhooks().Test(ctx, webhook.ID)

// List deliveries
deliveries, err := client.Webhooks().ListDeliveries(ctx, webhook.ID, nil)

Organizations

Manage your organization and track usage:

// List organizations
orgs, err := client.Organizations().List(ctx)

// Get organization details
org, err := client.Organizations().Get(ctx, orgID)

// Get usage information
usage, err := client.Organizations().Usage(ctx, orgID)
fmt.Printf("Jobs today: %d/%d\n", usage.Usage.JobsToday.Current, *usage.Limits.MaxJobsPerDay)

API Keys

Manage API keys:

// List API keys
keys, err := client.APIKeys().List(ctx)

// Create new API key
key, err := client.APIKeys().Create(ctx, &resources.CreateAPIKeyRequest{
    Name: "Production Key",
})
fmt.Printf("Key: %s (save this, it won't be shown again)\n", key.Key)

// Revoke API key
client.APIKeys().Revoke(ctx, keyID)

Error Handling

The SDK provides typed errors for different failure scenarios:

import "github.com/spooled-cloud/spooled-sdk-go/spooled"

job, err := client.Jobs().Get(ctx, "invalid-id")
if err != nil {
    switch e := err.(type) {
    case *spooled.NotFoundError:
        fmt.Printf("Job not found: %s\n", e.Message)
    case *spooled.AuthenticationError:
        fmt.Printf("Invalid API key: %s\n", e.Message)
    case *spooled.RateLimitError:
        fmt.Printf("Rate limited. Retry after: %v\n", e.Reset)
    case *spooled.ValidationError:
        fmt.Printf("Invalid request: %v\n", e.Errors)
    default:
        fmt.Printf("Error: %v\n", err)
    }
}

// Check error types
if spooled.IsRetryableError(err) {
    // Safe to retry
}
if spooled.IsNetworkError(err) {
    // Network issue
}
if spooled.IsTimeoutError(err) {
    // Request timed out
}

Development

Using Make

# Build, lint, and test
make

# Run specific targets
make build          # Build the SDK
make test           # Run unit tests
make test-coverage  # Tests with coverage report
make lint           # Run linter
make fmt            # Format code
make examples       # Build all examples

# Integration tests (requires API_KEY)
API_KEY=sp_live_... make integration-test

# See all commands
make help

Testing

# Unit tests
go test ./...

# With race detection
go test -race ./...

# With coverage
go test -cover ./...

# Integration tests (requires API key)
API_KEY=sp_live_... go run scripts/test-local/main.go

# Full production test suite (all features)
API_KEY=sp_live_... \
ADMIN_KEY=your_admin_key \
BASE_URL=https://api.spooled.cloud \
GRPC_ADDRESS=grpc.spooled.cloud:443 \
SKIP_GRPC=0 \
SKIP_STRESS=0 \
go run scripts/test-local/main.go

Contributing

Contributions are welcome! Please see CONTRIBUTING.md.

License

Apache 2.0 - see LICENSE for details.

Support

About

Official Go SDK for Spooled. Idiomatic Go with interfaces, generics, context propagation, connection pooling, benchmarks included.

Topics

Resources

License

Contributing

Security policy

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages