diff --git a/docs/overview.md b/docs/overview.md index ff6dc14..d3d31b4 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -270,9 +270,15 @@ Routes use path segments as NATS subject tokens: GET /sub/sensors/temp -> SSE subscription to sensors.temp GET /sub/sensors/%3E -> SSE subscription to sensors.> GET /sub/$LVC/sensors/temp -> SSE subscription to $LVC.sensors.temp +GET /latest/sensors/temp -> one-shot LVC lookup for sensors.temp POST /pub/sensors/temp -> client publish to sensors.temp ``` +`GET /latest/` returns the raw cached payload bytes for that exact, +unprefixed subject. It returns `404` when LVC has no value for the subject and +`409` when LVC is disabled. Use `/sub/$LVC/...` for wildcard replay or for a +stream that stays open for future updates. + `POST` requires `Content-Length` plus `Content-Type: text/plain` or `Content-Type: application/json`, and rejects NUL bytes. It is intentionally a text adapter, not a binary payload API. Successful POSTs fan out exactly like a @@ -347,17 +353,27 @@ server { proxy_set_header X-Forwarded-Proto $scheme; proxy_set_header Connection ""; } + + location /latest/ { + proxy_pass http://monoblok_http; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header Connection ""; + } } ``` Serve browser clients from the same nginx origin when possible. monoblok does -not add CORS headers itself, and same-origin `/sub/...` and `/pub/...` requests -avoid needing browser-specific CORS handling. If monoblok auth is enabled, make -sure nginx forwards the `Authorization` header; the default proxy behavior does -unless you override it. +not add CORS headers itself, and same-origin `/sub/...`, `/pub/...`, and +`/latest/...` requests avoid needing browser-specific CORS handling. If monoblok +auth is enabled, make sure nginx forwards the `Authorization` header; the +default proxy behavior does unless you override it. -Use the Node demo server to serve the page and proxy `/sub` and `/pub` from the -same origin, which avoids browser CORS. It can also start a local monoblok +Use the Node demo server to serve the page and proxy `/sub`, `/pub`, and +`/latest` from the same origin, which avoids browser CORS. It can also start a local monoblok process for the demo: ```sh diff --git a/examples/http-sse-client.js b/examples/http-sse-client.js index 310ac92..5553c93 100644 --- a/examples/http-sse-client.js +++ b/examples/http-sse-client.js @@ -105,6 +105,17 @@ class MonoblokHttpClient { } } + async latest(subject) { + const res = await this.fetch(`${this.baseUrl}/latest/${subjectPath(subject)}`, { + method: "GET", + headers: this.headers(), + }); + if (!res.ok) { + throw await responseError("monoblok latest failed", res); + } + return res.text(); + } + async *messages(subject, { signal } = {}) { const res = await this.fetch(`${this.baseUrl}/sub/${subjectPath(subject)}`, { method: "GET", diff --git a/examples/http-sse-demo.js b/examples/http-sse-demo.js index 8551ca7..2544b1b 100755 --- a/examples/http-sse-demo.js +++ b/examples/http-sse-demo.js @@ -25,7 +25,7 @@ function usage() { return [ "usage: node examples/http-sse-demo.js [--host HOST] [--port PORT] [--monoblok URL] [--open] [--start-monoblok]", "", - "Serves examples/http-sse-client.html and proxies /sub and /pub to monoblok.", + "Serves examples/http-sse-client.html and proxies /sub, /pub, and /latest to monoblok.", "Without --start-monoblok, monoblok must already be running with --http-port.", "", "Defaults:", @@ -298,6 +298,14 @@ function route(req, res, upstream) { proxyToMonoblok(req, res, upstream, reqUrl); return; } + if (pathname.startsWith("/latest/")) { + if (req.method !== "GET") { + sendText(res, 405, "use GET for /latest\n"); + return; + } + proxyToMonoblok(req, res, upstream, reqUrl); + return; + } sendText(res, 404, "not found\n"); } @@ -352,7 +360,7 @@ function main() { server.listen(opts.port, opts.host, async () => { const url = browserUrl(opts.host, opts.port); console.log(`monoblok HTTP/SSE demo: ${url}`); - console.log(`proxying /sub and /pub to ${opts.upstream.href.replace(/\/$/, "")}`); + console.log(`proxying /sub, /pub, and /latest to ${opts.upstream.href.replace(/\/$/, "")}`); if (opts.startMonoblok) { try { await waitForUpstream(opts.upstream, 4000); diff --git a/src/router.c b/src/router.c index c93ecf9..02761ca 100644 --- a/src/router.c +++ b/src/router.c @@ -578,6 +578,29 @@ bool mb_router_lvc_entry(const mb_router *router, size_t index, mb_slice *subjec return true; } +bool mb_router_lvc_latest(const mb_router *router, mb_slice subject, mb_slice *payload) { + if (payload == NULL || !mb_proto_subject_valid(subject, false) || !lvc_subject_enabled(router, subject)) { + return false; + } + const uint64_t hash = mb_slice_hash(subject); + const size_t found = lvc_index_find(router, subject, hash); + if (found != SIZE_MAX) { + const mb_lvc_entry *entry = &router->lvc[found]; + *payload = (mb_slice){.ptr = entry->payload.ptr, .len = entry->payload.len}; + return true; + } + if (router->lvc_index_cap == 0) { + for (size_t i = 0; i < router->lvc_len; i += 1) { + const mb_lvc_entry *entry = &router->lvc[i]; + if (mb_slice_eq((mb_slice){.ptr = entry->subject, .len = entry->subject_len}, subject)) { + *payload = (mb_slice){.ptr = entry->payload.ptr, .len = entry->payload.len}; + return true; + } + } + } + return false; +} + static bool emit_cached(mb_router *router, mb_router_conn *conn, mb_slice filter, mb_slice sid) { if (!router->lvc_enabled) { return false; diff --git a/src/router.h b/src/router.h index 9231494..c7972cc 100644 --- a/src/router.h +++ b/src/router.h @@ -167,5 +167,6 @@ bool mb_router_subject_matches(mb_slice filter, mb_slice subject); bool mb_router_store_lvc(mb_router *router, mb_slice subject, mb_slice payload); size_t mb_router_lvc_count(const mb_router *router); bool mb_router_lvc_entry(const mb_router *router, size_t index, mb_slice *subject, mb_slice *payload); +bool mb_router_lvc_latest(const mb_router *router, mb_slice subject, mb_slice *payload); #endif diff --git a/src/server/http.c b/src/server/http.c index 6b3c7f7..e2a012e 100644 --- a/src/server/http.c +++ b/src/server/http.c @@ -88,6 +88,11 @@ static bool slice_eq_lit(mb_slice s, const char *lit) { return s.len == n && memcmp(s.ptr, lit, n) == 0; } +static bool slice_starts_lit(mb_slice s, const char *lit) { + const size_t n = strlen(lit); + return s.len >= n && memcmp(s.ptr, lit, n) == 0; +} + static bool slice_eq_ci_lit(mb_slice s, const char *lit) { const size_t n = strlen(lit); if (s.len != n) { @@ -270,24 +275,32 @@ static void write_cb(uv_write_t *req, int status) { kick_write(conn); } -static bool write_response(mb_http_conn *conn, int code, const char *reason, - const char *extra_header, const char *body) { - const char *payload = body == NULL ? "" : body; - const size_t payload_len = strlen(payload); +static bool write_response_bytes(mb_http_conn *conn, int code, const char *reason, + const char *extra_header, const char *content_type, mb_slice body) { + const char *type = content_type == NULL ? "text/plain; charset=utf-8" : content_type; if (!append_lit(&conn->router_conn.out, "HTTP/1.1 ") || !append_usize(&conn->router_conn.out, (size_t)code) || !mb_buf_append_byte(&conn->router_conn.out, ' ') || !append_lit(&conn->router_conn.out, reason) || !append_lit(&conn->router_conn.out, "\r\nServer: monoblok\r\nConnection: close\r\nContent-Length: ") || - !append_usize(&conn->router_conn.out, payload_len) || - !append_lit(&conn->router_conn.out, "\r\nContent-Type: text/plain; charset=utf-8\r\n")) { + !append_usize(&conn->router_conn.out, body.len) || + !append_lit(&conn->router_conn.out, "\r\nContent-Type: ") || + !append_lit(&conn->router_conn.out, type) || + !append_lit(&conn->router_conn.out, "\r\n")) { return false; } if (extra_header != NULL && !append_lit(&conn->router_conn.out, extra_header)) { return false; } return append_lit(&conn->router_conn.out, "\r\n") && - mb_buf_append(&conn->router_conn.out, payload, payload_len); + mb_buf_append(&conn->router_conn.out, body.ptr, body.len); +} + +static bool write_response(mb_http_conn *conn, int code, const char *reason, + const char *extra_header, const char *body) { + const char *payload = body == NULL ? "" : body; + return write_response_bytes(conn, code, reason, extra_header, NULL, + (mb_slice){.ptr = (const uint8_t *)payload, .len = strlen(payload)}); } static void respond_and_close(mb_http_conn *conn, int code, const char *reason, @@ -300,6 +313,16 @@ static void respond_and_close(mb_http_conn *conn, int code, const char *reason, kick_write(conn); } +static void respond_bytes_and_close(mb_http_conn *conn, int code, const char *reason, + const char *extra_header, const char *content_type, mb_slice body) { + if (!write_response_bytes(conn, code, reason, extra_header, content_type, body)) { + http_conn_begin_close(conn); + return; + } + conn->close_after_write = true; + kick_write(conn); +} + static bool find_header_end(const uint8_t *buf, size_t len, size_t *out) { for (size_t i = 0; i + 3 < len; i += 1) { if (buf[i] == '\r' && buf[i + 1] == '\n' && buf[i + 2] == '\r' && buf[i + 3] == '\n') { @@ -662,6 +685,28 @@ static const char *auth_header(const mb_server *server) { return NULL; } +static void handle_latest(mb_http_conn *conn, const mb_http_request *req) { + if (!decode_subject_path(req->target, "/latest/", false, &conn->subject)) { + respond_and_close(conn, 400, "Bad Request", NULL, "invalid latest path\n"); + return; + } + const mb_slice subject = {.ptr = conn->subject.ptr, .len = conn->subject.len}; + if (mb_router_subject_has_lvc_prefix(subject)) { + respond_and_close(conn, 400, "Bad Request", NULL, "use the unprefixed subject\n"); + return; + } + if (!conn->server->lvc_enabled) { + respond_and_close(conn, 409, "Conflict", NULL, "$LVC is disabled\n"); + return; + } + mb_slice payload = {0}; + if (!mb_router_lvc_latest(&conn->server->router, subject, &payload)) { + respond_and_close(conn, 404, "Not Found", NULL, "not cached\n"); + return; + } + respond_bytes_and_close(conn, 200, "OK", "Cache-Control: no-cache\r\n", "application/octet-stream", payload); +} + static void handle_get(mb_http_conn *conn, const mb_http_request *req) { if (req->has_content_length && req->content_length != 0) { respond_and_close(conn, 400, "Bad Request", NULL, "GET body is not supported\n"); @@ -671,6 +716,14 @@ static void handle_get(mb_http_conn *conn, const mb_http_request *req) { respond_and_close(conn, 401, "Unauthorized", auth_header(conn->server), "unauthorized\n"); return; } + if (slice_starts_lit(req->target, "/latest/")) { + handle_latest(conn, req); + return; + } + if (!slice_starts_lit(req->target, "/sub/")) { + respond_and_close(conn, 400, "Bad Request", NULL, "invalid GET path\n"); + return; + } if (!decode_subject_path(req->target, "/sub/", true, &conn->subject)) { respond_and_close(conn, 400, "Bad Request", NULL, "invalid subscription path\n"); return; diff --git a/test/smoke.sh b/test/smoke.sh index 74da209..b766c41 100755 --- a/test/smoke.sh +++ b/test/smoke.sh @@ -71,6 +71,7 @@ grep '\$STATS is read-only' "$pub_out" >/dev/null grep 'info: loaded 1 patchbay form(s)' "$srv_out" >/dev/null python3 - "$http_port" <<'PY' +import http.client import socket import sys @@ -85,6 +86,18 @@ def recv_until(sock, needle): data += chunk return data +def latest_request(path): + conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5) + conn.request("GET", path) + res = conn.getresponse() + out = { + "status": res.status, + "content_type": res.getheader("Content-Type"), + "body": res.read(), + } + conn.close() + return out + sse = socket.create_connection(("127.0.0.1", port), timeout=5) sse.sendall(b"GET /sub/sensors/temp/seen HTTP/1.1\r\nHost: localhost\r\n\r\n") headers = recv_until(sse, b"\r\n\r\n") @@ -110,6 +123,14 @@ if b'event: msg\n' not in event or b'"subject":"sensors.temp.seen"' not in event raise RuntimeError(f"bad SSE event: {event!r}") sse.close() +latest = latest_request("/latest/sensors/temp") +if latest["status"] != 200 or latest["content_type"] != "application/octet-stream" or latest["body"] != b"44": + raise RuntimeError(f"bad latest response: {latest!r}") + +missing = latest_request("/latest/missing/value") +if missing["status"] != 404 or missing["body"] != b"not cached\n": + raise RuntimeError(f"bad missing latest response: {missing!r}") + bad = socket.create_connection(("127.0.0.1", port), timeout=5) bad.sendall( b"POST /pub/sensors/temp HTTP/1.1\r\n" diff --git a/test/unit.c b/test/unit.c index 74e551b..44106bc 100644 --- a/test/unit.c +++ b/test/unit.c @@ -412,6 +412,23 @@ static void test_router_lvc_replay(void) { mb_router_free(&router); } +static void test_router_lvc_latest(void) { + mb_router router; + mb_router_init(&router); + mb_slice filters[] = {lit("sensors.*")}; + CHECK(mb_router_configure_lvc(&router, filters, 1)); + CHECK(mb_router_publish(&router, lit("sensors.temp"), lit("31"))); + CHECK(mb_router_publish(&router, lit("other.temp"), lit("cold"))); + + mb_slice payload = {0}; + CHECK(mb_router_lvc_latest(&router, lit("sensors.temp"), &payload)); + CHECK(payload.len == 2); + CHECK(memcmp(payload.ptr, "31", 2) == 0); + CHECK(!mb_router_lvc_latest(&router, lit("other.temp"), &payload)); + CHECK(!mb_router_lvc_latest(&router, lit("sensors.*"), &payload)); + mb_router_free(&router); +} + static void test_router_lvc_live_wildcard(void) { mb_router router; mb_router_init(&router); @@ -638,6 +655,7 @@ TEST_MAIN(unit, test_router_non_match, test_router_wildcards, test_router_lvc_replay, + test_router_lvc_latest, test_router_lvc_live_wildcard, test_router_lvc_live_reply_to, test_router_lvc_rejects_writes,