A high-performance, in-memory publish/subscribe message broker over plain TCP, with an extremely simple JSON-based protocol.
Inspired by Redis Pub/Sub and NATS core, but focused on:
- In-memory only (no persistence)
- Single node, single binary
- Thousands of concurrent TCP connections
- Minimal NDJSON protocol
- Optional authentication and authorization
- NDJSON protocol: one JSON object per line, UTF-8.
- Operations: auth, sub, unsub, pub, ping.
- In-memory topics: no persistence, low-latency broadcast to subscribers.
- Optional security:
  - Username + password authentication.
  - Strong salted, iterative password hashing (HMAC-SHA256).
  - Constant-time password comparison.
  - Per-user permissions: publish:<topic>, subscribe:<topic>, admin.
- Config-driven:
  - Bind address and port.
  - Security on/off.
  - Initial admin and user accounts.
- Cross-platform:
  - Windows (x64)
  - Linux (x64)
  - macOS (x64 / ARM64)
- Server: a TCP listener (net.Listen) that accepts clients and spawns a goroutine per connection.
- Protocol: clients send newline-delimited JSON (NDJSON); server responds similarly.
- Topic registry: an in-memory map topic -> set of subscribers.
- Security manager: user registry, password hashing, permission checks.
- cmd/broker/main.go
  - Entry point, loads configuration, initializes security, starts server.
- internal/config
  - Loads and validates JSON config.
- internal/security
  - Password hashing and checking.
  - User registry with in-memory permissions.
- internal/protocol
  - Types and helpers for client requests and server responses.
- internal/server
  - Core TCP server and connection handling.
  - Topic subscriptions and message routing.
- Go 1.20+ (standard library only)
- Operating systems:
  - Windows (x64)
  - Linux (x64)
  - macOS (x64 / ARM64)
No OS-specific syscalls are used. The server relies solely on net and
other portable APIs.
Security is configurable and off by default.
When security is disabled:
- All clients are treated as anonymous.
- No authentication is required.
- All operations are allowed:
  - Any client can publish to any topic.
  - Any client can subscribe to any topic.
- Permission checks effectively become no-ops.
When security is enabled:
- Each client must authenticate before performing any operation: auth must be called first with user and pass.
- Passwords:
  - Never stored in plaintext in memory.
  - Hashed using random salts and iterative HMAC-SHA256.
  - Verified using constant-time comparison.
- Users have explicit permissions such as:
  - publish:chat.room1
  - subscribe:chat.room1
  - publish:chat.* (wildcard suffix)
  - subscribe:chat.* (wildcard suffix)
  - admin
- If a client tries to perform an operation without permission:
  - The operation is rejected.
  - The server returns {"ok":false,"error":"permission_denied"}.
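The password handling described above (random salt, iterative HMAC-SHA256, constant-time comparison) might look roughly like the following. This is a minimal sketch, not the broker's actual code: the function names, the 16-byte salt, the iteration count, and the exact chaining of rounds are all assumptions made for illustration.

```go
package main

import (
	"crypto/hmac"
	"crypto/rand"
	"crypto/sha256"
	"crypto/subtle"
	"fmt"
)

// iterations is an illustrative work factor, not a value taken from the broker.
const iterations = 100_000

// newSalt returns n random bytes from the system CSPRNG.
func newSalt(n int) ([]byte, error) {
	salt := make([]byte, n)
	_, err := rand.Read(salt)
	return salt, err
}

// hashPassword chains HMAC-SHA256 for the given number of rounds. The password
// is the HMAC key; the first round digests the salt and each later round
// digests the previous output. (Assumed construction, for illustration only.)
func hashPassword(password string, salt []byte, rounds int) []byte {
	mac := hmac.New(sha256.New, []byte(password))
	sum := salt
	for i := 0; i < rounds; i++ {
		mac.Reset()
		mac.Write(sum)
		sum = mac.Sum(nil)
	}
	return sum
}

// verifyPassword recomputes the hash and compares it in constant time.
func verifyPassword(password string, salt, want []byte, rounds int) bool {
	got := hashPassword(password, salt, rounds)
	return subtle.ConstantTimeCompare(got, want) == 1
}

func main() {
	salt, err := newSalt(16)
	if err != nil {
		panic(err)
	}
	stored := hashPassword("secret3", salt, iterations)
	fmt.Println(verifyPassword("secret3", salt, stored, iterations))
	fmt.Println(verifyPassword("wrong", salt, stored, iterations))
}
```

Only the salt and the derived hash need to be kept per user. PBKDF2 is the standardized construction of this kind and would be a reasonable substitute for the hand-rolled loop.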
- No fixed limit on the number of users.
- One built-in administrator user defined in configuration.
- Only the administrator (or any user with admin permission) can:
  - Create users.
  - Delete users.
  - Change user passwords.
  - Grant or revoke permissions.

Note: For simplicity, administrative operations (creating or deleting users, etc.) are currently implemented via the in-memory security manager API. Exposing them over the public network protocol (e.g. {"op":"admin.create_user", ...}) can be added on top, but must be done carefully to preserve strong security guarantees.
Configuration is provided as a JSON file.

Security disabled:

{
"network": {
"bind": "0.0.0.0",
"port": 8080
},
"security": {
"enabled": false,
"admin_user": {
"username": "",
"password": "",
"permissions": []
},
"users": []
}
}

Security enabled:

{
"network": {
"bind": "0.0.0.0",
"port": 8080
},
"security": {
"enabled": true,
"admin_user": {
"username": "admin",
"password": "strong-admin-password",
"permissions": ["admin"]
},
"users": [
{
"username": "publisher1",
"password": "secret1",
"permissions": ["publish:chat.room1"]
},
{
"username": "subscriber1",
"password": "secret2",
"permissions": ["subscribe:chat.room1"]
},
{
"username": "chatuser",
"password": "secret3",
"permissions": [
"publish:chat.*",
"subscribe:chat.*"
]
}
]
}
}

- network.bind – IP address to bind to, e.g. "0.0.0.0" or "127.0.0.1".
- network.port – TCP port number, e.g. 8080.
- security.enabled – boolean:
  - false – no authentication or authorization.
  - true – fully enforced authentication and permissions.
- security.admin_user:
  - username – admin username (must be non-empty if security enabled).
  - password – admin password in plaintext (hashed in memory).
  - permissions – typically includes "admin".
- security.users – array of standard users:
  - username – user name.
  - password – user password in plaintext (hashed in memory).
  - permissions – array of permission strings.

Important: When security is enabled, the config file contains plaintext passwords. Restrict access to it with OS-level file permissions, for example by making it readable only by the account that runs the broker.
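As an illustration of how the fields above could map onto Go types, here is a minimal sketch of loading and validating such a file. The type and function names are hypothetical, and the port range check is an assumption; only the non-empty admin username rule comes from the field descriptions above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// User mirrors one entry of security.users (and security.admin_user).
type User struct {
	Username    string   `json:"username"`
	Password    string   `json:"password"`
	Permissions []string `json:"permissions"`
}

// Config mirrors the JSON layout shown in the examples above.
type Config struct {
	Network struct {
		Bind string `json:"bind"`
		Port int    `json:"port"`
	} `json:"network"`
	Security struct {
		Enabled   bool   `json:"enabled"`
		AdminUser User   `json:"admin_user"`
		Users     []User `json:"users"`
	} `json:"security"`
}

// parseConfig decodes and validates a config document.
func parseConfig(data []byte) (*Config, error) {
	var cfg Config
	if err := json.Unmarshal(data, &cfg); err != nil {
		return nil, fmt.Errorf("parse config: %w", err)
	}
	// Assumed sanity check; the exact rules are up to the implementation.
	if cfg.Network.Port < 1 || cfg.Network.Port > 65535 {
		return nil, errors.New("network.port must be between 1 and 65535")
	}
	if cfg.Security.Enabled && cfg.Security.AdminUser.Username == "" {
		return nil, errors.New("security.admin_user.username must be non-empty when security is enabled")
	}
	return &cfg, nil
}

func main() {
	raw := []byte(`{"network":{"bind":"127.0.0.1","port":8080},"security":{"enabled":false}}`)
	cfg, err := parseConfig(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("listen on %s:%d, security enabled: %v\n",
		cfg.Network.Bind, cfg.Network.Port, cfg.Security.Enabled)
}
```

In a real loader the bytes would come from the file named by the -config flag, for example via os.ReadFile.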
All messages are UTF-8 JSON objects, one per line, delimited by \n.
No pretty-printing; no extra whitespace required.

{"op":"auth","user":"username","pass":"password"}
- Must be sent before any sub, unsub, or pub operations.
- When security is disabled:
  - This operation always succeeds and has no effect.

{"op":"sub","topic":"chat.room1"}
- Subscribes the current connection to a topic.
- The server will deliver messages published to this topic to this connection.

{"op":"unsub","topic":"chat.room1"}
- Cancels a subscription for this topic for the current connection.

{"op":"pub","topic":"chat.room1","data":"hello"}
- Publishes a message to a topic.
- The server broadcasts {"topic":"...","data":"..."} to all authorized subscribers of that topic.

{"op":"ping"}
- Health check / keep-alive.
- The server responds with {"ok":true}.

Generic success:
{"ok":true}

Error with reason:
{"ok":false,"error":"reason"}

Common error reasons:
- invalid_json
- unknown_op
- missing_credentials
- auth_failed
- missing_topic
- permission_denied
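To make the request and response shapes concrete, the sketch below decodes one protocol line and maps problems to the error reasons listed above. The struct and function names are hypothetical, data is assumed to be a JSON string as in the examples, and the sketch ignores whether security is enabled (with security off, auth always succeeds).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Request covers the fields used by auth, sub, unsub, pub and ping.
type Request struct {
	Op    string `json:"op"`
	User  string `json:"user,omitempty"`
	Pass  string `json:"pass,omitempty"`
	Topic string `json:"topic,omitempty"`
	Data  string `json:"data,omitempty"`
}

// Response is the generic success/error reply.
type Response struct {
	OK    bool   `json:"ok"`
	Error string `json:"error,omitempty"`
}

// parseRequest decodes one NDJSON line. The second return value is an
// error reason from the list above, or "" if the request is well-formed.
func parseRequest(line []byte) (Request, string) {
	var req Request
	if err := json.Unmarshal(line, &req); err != nil {
		return req, "invalid_json"
	}
	switch req.Op {
	case "auth":
		if req.User == "" || req.Pass == "" {
			return req, "missing_credentials"
		}
	case "sub", "unsub", "pub":
		if req.Topic == "" {
			return req, "missing_topic"
		}
	case "ping":
		// No fields required.
	default:
		return req, "unknown_op"
	}
	return req, ""
}

// encode renders a response as a single JSON line (without the newline).
func encode(resp Response) string {
	b, _ := json.Marshal(resp) // cannot fail for this plain struct
	return string(b)
}

func main() {
	for _, line := range []string{
		`{"op":"ping"}`,
		`{"op":"sub"}`,
		`{"op":"dance"}`,
		`{not json`,
	} {
		_, reason := parseRequest([]byte(line))
		fmt.Println(line, "->", encode(Response{OK: reason == "", Error: reason}))
	}
}
```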
When a message is published to topic "chat.room1":
{"topic":"chat.room1","data":"hello"}

This object is sent to every connection that:
- Is subscribed to "chat.room1", and
- Has permission to subscribe to that topic (if security is enabled).
With security disabled:
- Client connects via TCP.
- Client can immediately send sub, pub, ping messages.
- auth is optional and does nothing but return {"ok":true}.

With security enabled:
- Client connects via TCP.
- Client sends:
  {"op":"auth","user":"username","pass":"password"}
- Server:
  - Looks up the user.
  - Verifies password using:
    - Per-user random salt.
    - Iterative HMAC-SHA256.
    - Constant-time comparison.
- On success:
  - Server responds {"ok":true}.
  - Server associates the connection with that user.
- On failure:
  - Server responds {"ok":false,"error":"auth_failed"}.
  - Connection remains unauthenticated and may not perform other operations.

Permissions are strings:
- Operation-specific: publish:<topic>, subscribe:<topic>.
- Admin: admin.
- Topics can use wildcard suffix: chat.*.

Examples:
- Allow a user to publish to exactly chat.room1: publish:chat.room1
- Allow a user to subscribe to any chat room under chat.*: subscribe:chat.*
- Administrator user: admin

Checks:
- For an operation op (publish or subscribe) on topic topic, the server checks whether the user has:
  - op + ":" + topic (exact match), or
  - op + ":" + prefix + ".*" where topic equals prefix or begins with prefix + ".", or
  - admin permission.
- If a client attempts sub or pub without permission, the server responds: {"ok":false,"error":"permission_denied"}
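The matching rules above can be sketched as a single function. The name allowed and the slice-of-strings representation are assumptions for illustration; the broker's security manager may store permissions differently.

```go
package main

import (
	"fmt"
	"strings"
)

// allowed reports whether perms grant action ("publish" or "subscribe")
// on topic, using the exact, wildcard-suffix and admin rules above.
func allowed(perms []string, action, topic string) bool {
	prefix := action + ":"
	for _, p := range perms {
		// admin grants everything; an exact "action:topic" string grants this topic.
		if p == "admin" || p == prefix+topic {
			return true
		}
		// Wildcard suffix: "action:base.*" matches base itself and base.<anything>.
		if strings.HasPrefix(p, prefix) && strings.HasSuffix(p, ".*") {
			base := strings.TrimSuffix(strings.TrimPrefix(p, prefix), ".*")
			if topic == base || strings.HasPrefix(topic, base+".") {
				return true
			}
		}
	}
	return false
}

func main() {
	perms := []string{"publish:chat.*", "subscribe:chat.room1"}
	fmt.Println(allowed(perms, "publish", "chat.room1"))   // wildcard match
	fmt.Println(allowed(perms, "publish", "chatter"))      // no match: not under chat.
	fmt.Println(allowed(perms, "subscribe", "chat.room2")) // no match: exact permission only
}
```

Note that chat.* matches chat.room1 and chat itself, but not chatter, because the topic must equal the prefix or continue with a dot.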
- Go 1.20+ installed.
- A configuration file (e.g. config.json).

go build ./cmd/broker

This produces a binary (e.g. broker on Unix-like systems or broker.exe on Windows).

Using a config with "security": { "enabled": false, ... }:

./broker -config=config.json

Example config.json:
{
"network": {
"bind": "0.0.0.0",
"port": 8080
},
"security": {
"enabled": false,
"admin_user": {
"username": "",
"password": "",
"permissions": []
},
"users": []
}
}

With security enabled:

./broker -config=secure-config.json

The example secure-config.json is the security-enabled configuration shown above.

The examples below assume the server is running on localhost:8080.
You can use nc (netcat), telnet, or a small custom client.

- Start server with security off.
- In terminal A, connect with netcat:
  nc 127.0.0.1 8080
- In terminal A, subscribe:
  {"op":"sub","topic":"chat.room1"}
  Server responds:
  {"ok":true}
- In terminal B, connect and publish:
  nc 127.0.0.1 8080
  Then send:
  {"op":"pub","topic":"chat.room1","data":"hello world"}
  Server responds to publisher:
  {"ok":true}
  Subscriber (terminal A) receives:
  {"topic":"chat.room1","data":"hello world"}

Given secure-config.json with:
- User chatuser, password secret3, permissions:
  - publish:chat.*
  - subscribe:chat.*

- Start server:
  ./broker -config=secure-config.json
- Subscriber connection:
  nc 127.0.0.1 8080
  Authenticate:
  {"op":"auth","user":"chatuser","pass":"secret3"}
  Response:
  {"ok":true}
  Subscribe:
  {"op":"sub","topic":"chat.room1"}
  Response:
  {"ok":true}
- Publisher connection:
  nc 127.0.0.1 8080
  Authenticate:
  {"op":"auth","user":"chatuser","pass":"secret3"}
  Response:
  {"ok":true}
  Publish:
  {"op":"pub","topic":"chat.room1","data":"hello secure world"}
  Publisher gets:
  {"ok":true}
  Subscriber gets:
  {"topic":"chat.room1","data":"hello secure world"}

Administrative operations are currently designed to be invoked from within
the server process using the security.Manager API.
A typical future extension would be to add admin protocol operations such as:
- {"op":"admin.create_user", ...}
- {"op":"admin.delete_user", ...}
- {"op":"admin.set_password", ...}
- {"op":"admin.set_permissions", ...}

These would require that the calling connection be associated with a user
having admin permission. The internal security.Manager already supports:
- CreateUser(username, password, perms)
- DeleteUser(username)
- SetUserPassword(username, newPassword)
- SetUserPermissions(username, perms)

A fully wired admin protocol interface can be added on top of the existing security manager with minimal changes.

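A minimal Go subscriber client:

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:8080")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	w := bufio.NewWriter(conn)
	r := bufio.NewReader(conn)

	// send writes one NDJSON request and returns the server's one-line reply.
	send := func(req string) string {
		fmt.Fprintln(w, req)
		if err := w.Flush(); err != nil {
			log.Fatal(err)
		}
		line, err := r.ReadString('\n')
		if err != nil {
			log.Fatal(err)
		}
		return line
	}

	// Optional: auth (has no effect when security is disabled).
	fmt.Print("auth response: ", send(`{"op":"auth","user":"chatuser","pass":"secret3"}`))

	// Subscribe.
	fmt.Print("sub response: ", send(`{"op":"sub","topic":"chat.room1"}`))

	// Read published messages until the connection is closed.
	for {
		msg, err := r.ReadString('\n')
		if err != nil {
			break
		}
		fmt.Print("msg: ", msg)
	}
}

- Memory-only: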
  - No disk I/O or persistence paths.
  - Topics and user registry live fully in memory.
- Concurrency model:
  - One goroutine per client connection.
  - A single sync.RWMutex protects the topic registry:
    - Readers for publishing (copy subscriber set).
    - Writers for subscribe/unsubscribe operations.
  - Security manager uses its own sync.RWMutex for user data.
- Minimized allocations:
  - Permission checks operate on simple strings.
  - Topic subscriber sets are map[*Client]struct{} for efficient presence checks.
- Security checks:
  - Per-message permission check is just a few string comparisons.
  - Heavy cryptography only happens during authentication (password hashing).
  - Password verification uses constant-time comparison for hashes.
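The concurrency model above (one sync.RWMutex, map[*Client]struct{} subscriber sets, a copy of the set taken under the read lock for publishing) could be sketched as follows. The Registry type, its method names, and the stub Client are hypothetical and only illustrate the locking pattern.

```go
package main

import (
	"fmt"
	"sync"
)

// Client stands in for a connected client; the real type would wrap a net.Conn.
type Client struct{ Name string }

// Registry maps each topic to its set of subscribers.
type Registry struct {
	mu     sync.RWMutex
	topics map[string]map[*Client]struct{}
}

func NewRegistry() *Registry {
	return &Registry{topics: make(map[string]map[*Client]struct{})}
}

// Subscribe adds c to the topic's subscriber set (write lock).
func (r *Registry) Subscribe(topic string, c *Client) {
	r.mu.Lock()
	defer r.mu.Unlock()
	set, ok := r.topics[topic]
	if !ok {
		set = make(map[*Client]struct{})
		r.topics[topic] = set
	}
	set[c] = struct{}{}
}

// Unsubscribe removes c and drops the topic once it has no subscribers.
func (r *Registry) Unsubscribe(topic string, c *Client) {
	r.mu.Lock()
	defer r.mu.Unlock()
	set := r.topics[topic]
	delete(set, c) // deleting from a nil map is a no-op
	if len(set) == 0 {
		delete(r.topics, topic)
	}
}

// Subscribers returns a copy of the subscriber set (read lock), so the
// publisher can write to clients without holding the lock.
func (r *Registry) Subscribers(topic string) []*Client {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]*Client, 0, len(r.topics[topic]))
	for c := range r.topics[topic] {
		out = append(out, c)
	}
	return out
}

func main() {
	reg := NewRegistry()
	a, b := &Client{Name: "a"}, &Client{Name: "b"}
	reg.Subscribe("chat.room1", a)
	reg.Subscribe("chat.room1", b)
	reg.Unsubscribe("chat.room1", a)
	for _, c := range reg.Subscribers("chat.room1") {
		fmt.Println("deliver to", c.Name)
	}
}
```

Copying the set keeps the read lock short: network writes, which can be slow, happen after the lock is released.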
- Malformed JSON:
  - Server replies with {"ok":false,"error":"invalid_json"} and may close the connection.
- Unknown operations:
  - Server replies with {"ok":false,"error":"unknown_op"}.
- Unauthorized or forbidden actions:
  - Server replies with {"ok":false,"error":"permission_denied"}.
- Missing required fields (e.g. topic):
  - Server replies with {"ok":false,"error":"missing_topic"} or similar.

The server is designed so that malformed or malicious client input does not cause a crash. Each client is handled in isolation, and errors are logged and returned as structured responses.
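One way to keep bad input from taking down a connection handler is to turn every decoding problem into a structured reply and to cap the line length. The sketch below shows that idea for a single connection; the function names and the 64 KiB line limit are assumptions, only ping is handled, and unlike the note above it keeps the connection open after invalid JSON.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// reply builds the response line for one request line. Only ping is
// implemented; every other operation is reported as unknown in this sketch.
func reply(line []byte) string {
	var req struct {
		Op string `json:"op"`
	}
	if err := json.Unmarshal(line, &req); err != nil {
		return `{"ok":false,"error":"invalid_json"}`
	}
	if req.Op == "ping" {
		return `{"ok":true}`
	}
	return `{"ok":false,"error":"unknown_op"}`
}

// serve answers NDJSON lines from r on w until EOF or an I/O error.
// A net.Conn can be passed for both arguments.
func serve(r io.Reader, w io.Writer) error {
	sc := bufio.NewScanner(r)
	// Cap line length so one client cannot force unbounded buffering
	// (64 KiB is an assumed limit).
	sc.Buffer(make([]byte, 0, 4096), 64*1024)
	for sc.Scan() {
		if _, err := fmt.Fprintln(w, reply(sc.Bytes())); err != nil {
			return err
		}
	}
	return sc.Err()
}

func main() {
	in := strings.NewReader("{\"op\":\"ping\"}\nnot json\n{\"op\":\"nope\"}\n")
	var out strings.Builder
	if err := serve(in, &out); err != nil {
		panic(err)
	}
	fmt.Print(out.String())
}
```

In the broker, each accepted connection would run such a loop in its own goroutine, so an error on one connection ends only that loop.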
Without violating the current goals and non-goals, the broker can be extended with:
- Admin protocol:
  - Safe, authenticated operations to manage users and permissions.
- Channel-based per-client writer:
  - To guarantee ordered, non-interleaved message delivery even under extreme concurrency.
- Metrics / observability:
  - Simple counters for topics, connections, and error rates.
No changes to the core protocol are required for basic usage as described.