jerry7991/nebula_queue

NebulaQueue 🌌

A Redis-backed background job processor for Ruby — built from scratch to understand what Sidekiq does under the hood.

Status: 🚧 Active development — core Worker/Dispatcher/Registry architecture is working. Expanding features iteratively.


Why?

Sidekiq is great. But using a library without understanding its internals is a ceiling.

NebulaQueue is a ground-up implementation of Redis-backed job processing, written to understand in depth:

  • How jobs are serialised, enqueued, and popped atomically
  • How worker registration and dispatch work without const_get hacks
  • How connection pooling prevents Redis connection exhaustion under concurrency
  • How retry logic and dead-letter queues should actually be designed

Architecture

┌─────────────────────────────────────────────────────────┐
│  Client App                                             │
│                                                         │
│  NebulaQueue::Dispatcher.enqueue(MyWorker, args)        │
│          │                                              │
│          ▼                                              │
│  ┌───────────────┐    LPUSH    ┌──────────────────┐    │
│  │  Dispatcher   │ ──────────► │  Redis List      │    │
│  └───────────────┘             │  nebula_queue:   │    │
│                                │  queue:<name>    │    │
│  Worker Process                └──────┬───────────┘    │
│                                       │ BRPOP           │
│  ┌───────────────┐    lookup   ┌──────▼───────────┐    │
│  │HandlerRegistry│ ◄────────── │  Worker Process  │    │
│  └───────┬───────┘             └──────────────────┘    │
│          │                                              │
│          ▼                                              │
│  ┌───────────────┐                                      │
│  │  Worker#perform│                                     │
│  └───────────────┘                                      │
└─────────────────────────────────────────────────────────┘

Key design decisions:

  • Jobs pushed via LPUSH, popped via BRPOP — FIFO, blocking pop avoids polling
  • Workers self-register with HandlerRegistry on include — no magic string dispatch
  • Redis connection pool via connection_pool gem — safe under concurrent workers
  • Config via YAML + ERB — 12-factor friendly, reads from ENV variables

Quick Start

1. Configure

# config/nebula_queue.rb
NebulaQueue.configure do |config|
  config.redis_url     = ENV.fetch("REDIS_URL", "redis://localhost:6379/0")
  config.pool_size     = 5
  config.max_retries   = 3
  config.default_queue = "default"
  config.worker_paths  = ["app/workers"]
end

Or drop a nebula_queue.yml in your app root:

redis_url: <%= ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0") %>
pool_size: 5
pool_timeout: 5
default_queue: default
max_retries: 3
worker_paths:
  - app/workers
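
A minimal sketch of how a YAML + ERB file like the one above could be loaded, using only the Ruby standard library. The method name `load_nebula_config` is hypothetical and is not taken from the NebulaQueue source; the point is only the order of operations: render ERB first, then parse the result as YAML.

```ruby
require "erb"
require "yaml"

# Hypothetical loader (not NebulaQueue's actual API): evaluate the ERB tags,
# then parse the rendered text as YAML. Keys come back as strings.
def load_nebula_config(yaml_text)
  rendered = ERB.new(yaml_text).result
  YAML.safe_load(rendered)
end

SAMPLE_CONFIG = <<~'YAML'
  redis_url: <%= ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0") %>
  pool_size: 5
  worker_paths:
    - app/workers
YAML

config = load_nebula_config(SAMPLE_CONFIG)
puts config["redis_url"]    # the value of REDIS_URL, or the fallback URL
puts config["pool_size"]    # parsed as an Integer, not a String
```

Because the ERB step runs arbitrary Ruby, a file like this should only ever come from the application itself, never from user input.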

2. Define a Worker

class SendEmailWorker
  include NebulaQueue::Worker

  queue_as :mailer          # optional — defaults to config.default_queue
  job_name "send_email"     # optional — defaults to class name

  def perform(user_id:, template:)
    user = User.find(user_id)
    Mailer.send(user, template)
  end
end
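
For readers curious how class-level macros such as `queue_as` and `job_name` can be built, here is one possible sketch. `SketchWorker` and its `DEFAULT_QUEUE` constant are illustrative names only; the real `NebulaQueue::Worker` may be implemented differently (for example, it reads the default queue from config).

```ruby
# Illustrative module, not the real NebulaQueue::Worker.
module SketchWorker
  DEFAULT_QUEUE = "default" # assumption: stands in for config.default_queue

  # Including the module adds the macros below as class methods.
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # With an argument: set the queue. Without: read it, falling back to the default.
    def queue_as(value = nil)
      @queue_name = value.to_s if value
      @queue_name || DEFAULT_QUEUE
    end

    # With an argument: set the job name. Without: read it, falling back to the class name.
    def job_name(value = nil)
      @job_name = value.to_s if value
      @job_name || name
    end
  end
end

class SendEmailSketch
  include SketchWorker
  queue_as :mailer
  job_name "send_email"
end

class PlainSketch
  include SketchWorker
end

puts SendEmailSketch.queue_as  # mailer
puts PlainSketch.job_name      # PlainSketch
```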

3. Enqueue a Job

# Enqueue with keyword args — serialised to JSON in Redis
NebulaQueue::Dispatcher.enqueue(SendEmailWorker, user_id: 42, template: "welcome")

# Enqueue onto a specific queue (overrides worker's declared queue)
NebulaQueue::Dispatcher.enqueue(SendEmailWorker, { user_id: 42, template: "welcome" }, queue: "critical")
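
To make "serialised to JSON in Redis" concrete, the sketch below builds a payload and the list key it would be pushed to. The key format follows the architecture diagram (`nebula_queue:queue:<name>`); the payload fields (`jid`, `job`, `args`, `enqueued_at`) and the helper names are assumptions for illustration, not the project's actual wire format.

```ruby
require "json"
require "securerandom"

# Key format as shown in the architecture diagram.
def queue_key(queue_name)
  "nebula_queue:queue:#{queue_name}"
end

# Hypothetical payload shape; the real field names may differ.
def build_payload(job_name, args)
  body = {
    "jid" => SecureRandom.hex(12),   # random job id
    "job" => job_name,               # name the registry will look up
    "args" => args,
    "enqueued_at" => Time.now.to_f
  }
  JSON.generate(body)
end

payload = build_payload("send_email", { user_id: 42, template: "welcome" })

# JSON has no symbols, so keys must be re-symbolised before they can be
# splatted into a method such as perform(user_id:, template:).
decoded = JSON.parse(payload, symbolize_names: true)
puts queue_key("mailer")      # nebula_queue:queue:mailer
puts decoded[:args].inspect
```

One consequence of the JSON round trip is that only JSON-representable arguments (strings, numbers, booleans, nil, arrays, hashes) survive; passing record ids rather than objects, as the worker above does, keeps payloads safe to serialise.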

4. Run the Worker Process

bundle exec nebula_queue_worker

How the Registry Works

Workers register themselves automatically when the module is included:

# This happens at class load time — no manual registration needed
class MyWorker
  include NebulaQueue::Worker   # ← triggers HandlerRegistry.register("MyWorker", self)
  # ...
end

# Dispatcher uses the registry at execution time — no unsafe const_get
NebulaQueue::HandlerRegistry.lookup!("MyWorker")  # => MyWorker

This means:

  • No dynamic const_get with un-validated strings
  • Duplicate registration raises ArgumentError immediately
  • Registry is clearable in test suites via HandlerRegistry.clear!
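
The behaviour listed above can be sketched in a few lines of plain Ruby. `SketchRegistry` and `RegisteredWorker` are stand-in names, and raising `KeyError` from `lookup!` is an assumption; the README only states that duplicate registration raises `ArgumentError`.

```ruby
# Stand-in for HandlerRegistry: an explicit name => class allowlist.
module SketchRegistry
  @handlers = {}

  class << self
    def register(name, klass)
      raise ArgumentError, "handler already registered: #{name}" if @handlers.key?(name)

      @handlers[name] = klass
    end

    # Assumption: unknown names raise KeyError rather than returning nil.
    def lookup!(name)
      @handlers.fetch(name) { raise KeyError, "no handler registered for #{name.inspect}" }
    end

    def clear!
      @handlers.clear
    end
  end
end

# Stand-in for the Worker module: including it registers the class.
module RegisteredWorker
  def self.included(base)
    SketchRegistry.register(base.name, base)
  end
end

class PingWorker
  include RegisteredWorker

  def perform(**args)
    args
  end
end

handler = SketchRegistry.lookup!("PingWorker")
puts handler.new.perform(ok: true).inspect
```

A name that was never registered cannot be dispatched, whatever string appears in the payload, which is the property the allowlist is meant to provide.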

Current Status

Feature                                Status
Worker DSL (queue_as, job_name)        ✅ Done
Dispatcher (enqueue, queue_key)        ✅ Done
HandlerRegistry (self-registration)    ✅ Done
Redis connection pool                  ✅ Done
YAML + ERB config loader               ✅ Done
Worker process (BRPOP loop)            🚧 In progress
Retry logic with backoff               🚧 In progress
Dead-letter queue                      📋 Planned
Scheduled jobs (perform_in)            📋 Planned
Web UI (queue dashboard)               📋 Planned
RSpec helpers for testing workers      📋 Planned

Running Tests

bundle install
bundle exec rspec

Design Notes

Why LPUSH + BRPOP?

LPUSH + BRPOP gives FIFO ordering with a blocking pop — the worker process sleeps at the OS level when the queue is empty instead of spinning. No polling overhead, no wasted Redis round-trips.
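
The FIFO claim is easy to check with an in-memory stand-in for a Redis list, so the sketch runs without a server. `FakeList` is purely illustrative: it models only which end of the list each command touches, and its `rpop` returns immediately where a real BRPOP would block.

```ruby
# In-memory stand-in for one Redis list.
# Index 0 is the head (left); the last index is the tail (right).
class FakeList
  def initialize
    @items = []
  end

  # LPUSH: insert at the head and return the new length.
  def lpush(value)
    @items.unshift(value)
    @items.length
  end

  # RPOP: remove from the tail. A real BRPOP would block here until an item
  # arrives or the timeout expires; this stand-in returns nil when empty.
  def rpop
    @items.pop
  end
end

list = FakeList.new
%w[job-1 job-2 job-3].each { |job| list.lpush(job) }

processed = []
while (job = list.rpop)
  processed << job
end

p processed # => ["job-1", "job-2", "job-3"]
```

Against a real server with the redis-rb gem, the equivalent calls would be `redis.lpush(key, payload)` and `redis.brpop(key, timeout: 5)`; the latter returns a `[key, value]` pair, or `nil` if the timeout expires first.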

Why a HandlerRegistry instead of const_get?

Calling const_get(job_class_name) on a string read from a Redis payload lets anyone who can write to Redis choose which class gets loaded and instantiated, which can escalate to remote code execution if the payload is ever tampered with. The HandlerRegistry is an explicit allowlist: only classes that include NebulaQueue::Worker can ever be dispatched to.

Why connection_pool?

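A blocking BRPOP occupies its connection until it returns, so threads in the worker process should not share a single Redis connection. The connection_pool gem manages a fixed-size pool and blocks (with a timeout) when all connections are checked out, which caps the total number of connections and prevents connection storms under load.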
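
To show what "fixed-size pool that blocks with a timeout" means mechanically, here is a deliberately simplified pool built on Mutex and ConditionVariable. `TinyPool` is a teaching stand-in written for this README, not the connection_pool gem's implementation; with the real gem the equivalent setup is along the lines of `ConnectionPool.new(size: 5, timeout: 5) { Redis.new(url: ...) }` followed by `pool.with { |conn| ... }`.

```ruby
# Simplified stand-in for a connection pool (not the gem's implementation).
class TinyPool
  class TimeoutError < StandardError; end

  # The block is called `size` times to build the pooled objects up front.
  def initialize(size:, timeout:)
    @timeout = timeout
    @available = Array.new(size) { yield }
    @mutex = Mutex.new
    @returned = ConditionVariable.new
  end

  # Check a connection out, yield it, and always return it to the pool.
  def with
    conn = checkout
    yield conn
  ensure
    checkin(conn) if conn
  end

  private

  # Wait until a connection is free or the deadline passes.
  def checkout
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    @mutex.synchronize do
      while @available.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError, "no connection free within #{@timeout}s" if remaining <= 0

        @returned.wait(@mutex, remaining)
      end
      @available.pop
    end
  end

  def checkin(conn)
    @mutex.synchronize do
      @available.push(conn)
      @returned.signal
    end
  end
end

# Object.new stands in for a Redis connection so the sketch runs offline.
pool = TinyPool.new(size: 2, timeout: 1) { Object.new }
threads = 4.times.map do
  Thread.new { pool.with { |conn| conn.object_id } }
end
used = threads.map(&:value).uniq
puts "4 threads shared #{used.size} connection(s)"
```

However many threads run, at most `size` connections ever exist; threads beyond that wait their turn or fail fast with a timeout instead of opening more sockets.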


Contributing

This is a learning project — issues and PRs welcome, especially around the worker process loop and retry design.


License

MIT
