Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,15 @@ Routes use path segments as NATS subject tokens:
GET /sub/sensors/temp -> SSE subscription to sensors.temp
GET /sub/sensors/%3E -> SSE subscription to sensors.>
GET /sub/$LVC/sensors/temp -> SSE subscription to $LVC.sensors.temp
GET /latest/sensors/temp -> one-shot LVC lookup for sensors.temp
POST /pub/sensors/temp -> client publish to sensors.temp
```

`GET /latest/<subject>` returns the raw cached payload bytes for that exact,
unprefixed subject. It returns `404` when LVC has no value for the subject and
`409` when LVC is disabled. Use `/sub/$LVC/...` for wildcard replay or for a
stream that stays open for future updates.

`POST` requires `Content-Length` plus `Content-Type: text/plain` or
`Content-Type: application/json`, and rejects NUL bytes. It is intentionally a
text adapter, not a binary payload API. Successful POSTs fan out exactly like a
Expand Down Expand Up @@ -347,17 +353,27 @@ server {
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Connection "";
}

location /latest/ {
proxy_pass http://monoblok_http;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Connection "";
}
}
```

Serve browser clients from the same nginx origin when possible. monoblok does
not add CORS headers itself, and same-origin `/sub/...` and `/pub/...` requests
avoid needing browser-specific CORS handling. If monoblok auth is enabled, make
sure nginx forwards the `Authorization` header; the default proxy behavior does
unless you override it.
not add CORS headers itself, and same-origin `/sub/...`, `/pub/...`, and
`/latest/...` requests avoid needing browser-specific CORS handling. If monoblok
auth is enabled, make sure nginx forwards the `Authorization` header; the
default proxy behavior does unless you override it.

Use the Node demo server to serve the page and proxy `/sub` and `/pub` from the
same origin, which avoids browser CORS. It can also start a local monoblok
Use the Node demo server to serve the page and proxy `/sub`, `/pub`, and
`/latest` from the same origin, which avoids browser CORS. It can also start a local monoblok
process for the demo:

```sh
Expand Down
11 changes: 11 additions & 0 deletions examples/http-sse-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ class MonoblokHttpClient {
}
}

async latest(subject) {
const res = await this.fetch(`${this.baseUrl}/latest/${subjectPath(subject)}`, {
method: "GET",
headers: this.headers(),
});
if (!res.ok) {
throw await responseError("monoblok latest failed", res);
}
return res.text();
}

async *messages(subject, { signal } = {}) {
const res = await this.fetch(`${this.baseUrl}/sub/${subjectPath(subject)}`, {
method: "GET",
Expand Down
12 changes: 10 additions & 2 deletions examples/http-sse-demo.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function usage() {
return [
"usage: node examples/http-sse-demo.js [--host HOST] [--port PORT] [--monoblok URL] [--open] [--start-monoblok]",
"",
"Serves examples/http-sse-client.html and proxies /sub and /pub to monoblok.",
"Serves examples/http-sse-client.html and proxies /sub, /pub, and /latest to monoblok.",
"Without --start-monoblok, monoblok must already be running with --http-port.",
"",
"Defaults:",
Expand Down Expand Up @@ -298,6 +298,14 @@ function route(req, res, upstream) {
proxyToMonoblok(req, res, upstream, reqUrl);
return;
}
if (pathname.startsWith("/latest/")) {
if (req.method !== "GET") {
sendText(res, 405, "use GET for /latest\n");
return;
}
proxyToMonoblok(req, res, upstream, reqUrl);
return;
}

sendText(res, 404, "not found\n");
}
Expand Down Expand Up @@ -352,7 +360,7 @@ function main() {
server.listen(opts.port, opts.host, async () => {
const url = browserUrl(opts.host, opts.port);
console.log(`monoblok HTTP/SSE demo: ${url}`);
console.log(`proxying /sub and /pub to ${opts.upstream.href.replace(/\/$/, "")}`);
console.log(`proxying /sub, /pub, and /latest to ${opts.upstream.href.replace(/\/$/, "")}`);
if (opts.startMonoblok) {
try {
await waitForUpstream(opts.upstream, 4000);
Expand Down
23 changes: 23 additions & 0 deletions src/router.c
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,29 @@ bool mb_router_lvc_entry(const mb_router *router, size_t index, mb_slice *subjec
return true;
}

bool mb_router_lvc_latest(const mb_router *router, mb_slice subject, mb_slice *payload) {
if (payload == NULL || !mb_proto_subject_valid(subject, false) || !lvc_subject_enabled(router, subject)) {
return false;
}
const uint64_t hash = mb_slice_hash(subject);
const size_t found = lvc_index_find(router, subject, hash);
if (found != SIZE_MAX) {
const mb_lvc_entry *entry = &router->lvc[found];
*payload = (mb_slice){.ptr = entry->payload.ptr, .len = entry->payload.len};
return true;
}
if (router->lvc_index_cap == 0) {
for (size_t i = 0; i < router->lvc_len; i += 1) {
const mb_lvc_entry *entry = &router->lvc[i];
if (mb_slice_eq((mb_slice){.ptr = entry->subject, .len = entry->subject_len}, subject)) {
*payload = (mb_slice){.ptr = entry->payload.ptr, .len = entry->payload.len};
return true;
}
}
}
return false;
}

static bool emit_cached(mb_router *router, mb_router_conn *conn, mb_slice filter, mb_slice sid) {
if (!router->lvc_enabled) {
return false;
Expand Down
1 change: 1 addition & 0 deletions src/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,6 @@ bool mb_router_subject_matches(mb_slice filter, mb_slice subject);
bool mb_router_store_lvc(mb_router *router, mb_slice subject, mb_slice payload);
size_t mb_router_lvc_count(const mb_router *router);
bool mb_router_lvc_entry(const mb_router *router, size_t index, mb_slice *subject, mb_slice *payload);
bool mb_router_lvc_latest(const mb_router *router, mb_slice subject, mb_slice *payload);

#endif
67 changes: 60 additions & 7 deletions src/server/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ static bool slice_eq_lit(mb_slice s, const char *lit) {
return s.len == n && memcmp(s.ptr, lit, n) == 0;
}

static bool slice_starts_lit(mb_slice s, const char *lit) {
const size_t n = strlen(lit);
return s.len >= n && memcmp(s.ptr, lit, n) == 0;
}

static bool slice_eq_ci_lit(mb_slice s, const char *lit) {
const size_t n = strlen(lit);
if (s.len != n) {
Expand Down Expand Up @@ -270,24 +275,32 @@ static void write_cb(uv_write_t *req, int status) {
kick_write(conn);
}

static bool write_response(mb_http_conn *conn, int code, const char *reason,
const char *extra_header, const char *body) {
const char *payload = body == NULL ? "" : body;
const size_t payload_len = strlen(payload);
static bool write_response_bytes(mb_http_conn *conn, int code, const char *reason,
const char *extra_header, const char *content_type, mb_slice body) {
const char *type = content_type == NULL ? "text/plain; charset=utf-8" : content_type;
if (!append_lit(&conn->router_conn.out, "HTTP/1.1 ") ||
!append_usize(&conn->router_conn.out, (size_t)code) ||
!mb_buf_append_byte(&conn->router_conn.out, ' ') ||
!append_lit(&conn->router_conn.out, reason) ||
!append_lit(&conn->router_conn.out, "\r\nServer: monoblok\r\nConnection: close\r\nContent-Length: ") ||
!append_usize(&conn->router_conn.out, payload_len) ||
!append_lit(&conn->router_conn.out, "\r\nContent-Type: text/plain; charset=utf-8\r\n")) {
!append_usize(&conn->router_conn.out, body.len) ||
!append_lit(&conn->router_conn.out, "\r\nContent-Type: ") ||
!append_lit(&conn->router_conn.out, type) ||
!append_lit(&conn->router_conn.out, "\r\n")) {
return false;
}
if (extra_header != NULL && !append_lit(&conn->router_conn.out, extra_header)) {
return false;
}
return append_lit(&conn->router_conn.out, "\r\n") &&
mb_buf_append(&conn->router_conn.out, payload, payload_len);
mb_buf_append(&conn->router_conn.out, body.ptr, body.len);
}

static bool write_response(mb_http_conn *conn, int code, const char *reason,
const char *extra_header, const char *body) {
const char *payload = body == NULL ? "" : body;
return write_response_bytes(conn, code, reason, extra_header, NULL,
(mb_slice){.ptr = (const uint8_t *)payload, .len = strlen(payload)});
}

static void respond_and_close(mb_http_conn *conn, int code, const char *reason,
Expand All @@ -300,6 +313,16 @@ static void respond_and_close(mb_http_conn *conn, int code, const char *reason,
kick_write(conn);
}

static void respond_bytes_and_close(mb_http_conn *conn, int code, const char *reason,
const char *extra_header, const char *content_type, mb_slice body) {
if (!write_response_bytes(conn, code, reason, extra_header, content_type, body)) {
http_conn_begin_close(conn);
return;
}
conn->close_after_write = true;
kick_write(conn);
}

static bool find_header_end(const uint8_t *buf, size_t len, size_t *out) {
for (size_t i = 0; i + 3 < len; i += 1) {
if (buf[i] == '\r' && buf[i + 1] == '\n' && buf[i + 2] == '\r' && buf[i + 3] == '\n') {
Expand Down Expand Up @@ -662,6 +685,28 @@ static const char *auth_header(const mb_server *server) {
return NULL;
}

static void handle_latest(mb_http_conn *conn, const mb_http_request *req) {
if (!decode_subject_path(req->target, "/latest/", false, &conn->subject)) {
respond_and_close(conn, 400, "Bad Request", NULL, "invalid latest path\n");
return;
}
const mb_slice subject = {.ptr = conn->subject.ptr, .len = conn->subject.len};
if (mb_router_subject_has_lvc_prefix(subject)) {
respond_and_close(conn, 400, "Bad Request", NULL, "use the unprefixed subject\n");
return;
}
if (!conn->server->lvc_enabled) {
respond_and_close(conn, 409, "Conflict", NULL, "$LVC is disabled\n");
return;
}
mb_slice payload = {0};
if (!mb_router_lvc_latest(&conn->server->router, subject, &payload)) {
respond_and_close(conn, 404, "Not Found", NULL, "not cached\n");
return;
}
respond_bytes_and_close(conn, 200, "OK", "Cache-Control: no-cache\r\n", "application/octet-stream", payload);
}

static void handle_get(mb_http_conn *conn, const mb_http_request *req) {
if (req->has_content_length && req->content_length != 0) {
respond_and_close(conn, 400, "Bad Request", NULL, "GET body is not supported\n");
Expand All @@ -671,6 +716,14 @@ static void handle_get(mb_http_conn *conn, const mb_http_request *req) {
respond_and_close(conn, 401, "Unauthorized", auth_header(conn->server), "unauthorized\n");
return;
}
if (slice_starts_lit(req->target, "/latest/")) {
handle_latest(conn, req);
return;
}
if (!slice_starts_lit(req->target, "/sub/")) {
respond_and_close(conn, 400, "Bad Request", NULL, "invalid GET path\n");
return;
}
if (!decode_subject_path(req->target, "/sub/", true, &conn->subject)) {
respond_and_close(conn, 400, "Bad Request", NULL, "invalid subscription path\n");
return;
Expand Down
21 changes: 21 additions & 0 deletions test/smoke.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ grep '\$STATS is read-only' "$pub_out" >/dev/null
grep 'info: loaded 1 patchbay form(s)' "$srv_out" >/dev/null

python3 - "$http_port" <<'PY'
import http.client
import socket
import sys

Expand All @@ -85,6 +86,18 @@ def recv_until(sock, needle):
data += chunk
return data

def latest_request(path):
conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
conn.request("GET", path)
res = conn.getresponse()
out = {
"status": res.status,
"content_type": res.getheader("Content-Type"),
"body": res.read(),
}
conn.close()
return out

sse = socket.create_connection(("127.0.0.1", port), timeout=5)
sse.sendall(b"GET /sub/sensors/temp/seen HTTP/1.1\r\nHost: localhost\r\n\r\n")
headers = recv_until(sse, b"\r\n\r\n")
Expand All @@ -110,6 +123,14 @@ if b'event: msg\n' not in event or b'"subject":"sensors.temp.seen"' not in event
raise RuntimeError(f"bad SSE event: {event!r}")
sse.close()

latest = latest_request("/latest/sensors/temp")
if latest["status"] != 200 or latest["content_type"] != "application/octet-stream" or latest["body"] != b"44":
raise RuntimeError(f"bad latest response: {latest!r}")

missing = latest_request("/latest/missing/value")
if missing["status"] != 404 or missing["body"] != b"not cached\n":
raise RuntimeError(f"bad missing latest response: {missing!r}")

bad = socket.create_connection(("127.0.0.1", port), timeout=5)
bad.sendall(
b"POST /pub/sensors/temp HTTP/1.1\r\n"
Expand Down
18 changes: 18 additions & 0 deletions test/unit.c
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,23 @@ static void test_router_lvc_replay(void) {
mb_router_free(&router);
}

static void test_router_lvc_latest(void) {
mb_router router;
mb_router_init(&router);
mb_slice filters[] = {lit("sensors.*")};
CHECK(mb_router_configure_lvc(&router, filters, 1));
CHECK(mb_router_publish(&router, lit("sensors.temp"), lit("31")));
CHECK(mb_router_publish(&router, lit("other.temp"), lit("cold")));

mb_slice payload = {0};
CHECK(mb_router_lvc_latest(&router, lit("sensors.temp"), &payload));
CHECK(payload.len == 2);
CHECK(memcmp(payload.ptr, "31", 2) == 0);
CHECK(!mb_router_lvc_latest(&router, lit("other.temp"), &payload));
CHECK(!mb_router_lvc_latest(&router, lit("sensors.*"), &payload));
mb_router_free(&router);
}

static void test_router_lvc_live_wildcard(void) {
mb_router router;
mb_router_init(&router);
Expand Down Expand Up @@ -638,6 +655,7 @@ TEST_MAIN(unit,
test_router_non_match,
test_router_wildcards,
test_router_lvc_replay,
test_router_lvc_latest,
test_router_lvc_live_wildcard,
test_router_lvc_live_reply_to,
test_router_lvc_rejects_writes,
Expand Down
Loading