From 6232f6a0abae7b4134f70ffa56dcda49ee86e05e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Finn=20V=C3=B6lkel?= Date: Sat, 23 May 2026 13:34:06 +0200 Subject: [PATCH 01/10] Add core.async --- build.gradle.kts | 1 + 1 file changed, 1 insertion(+) diff --git a/build.gradle.kts b/build.gradle.kts index da28096..489491f 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -34,6 +34,7 @@ dependencies { implementation("org.clojure", "spec.alpha", "0.5.238") implementation("org.clojure", "core.match","1.1.0") implementation("org.clojure", "math.combinatorics", "0.3.0") + implementation("org.clojure", "core.async", "1.9.865") implementation("com.cognitect", "anomalies", "0.1.12") // logging From 6f6cf6549cba2859e95752cd96e57b1d45e360d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Finn=20V=C3=B6lkel?= Date: Sat, 23 May 2026 14:17:14 +0200 Subject: [PATCH 02/10] incremental design options --- src/main/clojure/hooray/core.clj | 35 ++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/main/clojure/hooray/core.clj b/src/main/clojure/hooray/core.clj index 31610b6..afc71e8 100644 --- a/src/main/clojure/hooray/core.clj +++ b/src/main/clojure/hooray/core.clj @@ -1,5 +1,6 @@ (ns hooray.core (:require [clojure.spec.alpha :as s] + [clojure.core.async :as async] [hooray.db :as db] [hooray.query :as query] [hooray.pull :as pull] @@ -89,3 +90,37 @@ (if (dbsp/dbsp-query? inc-q) (dbsp/pop-result! inc-q) (incremental/pop-result! inc-q))) + + +(defrecord IncrementalStream [conn inc-q] + Closeable + (close [_] (unregister-inc-q conn inc-q))) + +(defn open-deltas ^Closeable [conn query] + (->IncrementalStream conn (q-inc conn query))) + +(defn take! [{:keys [inc-q]}] + (consume-delta! inc-q)) + +(defn delta-chan [conn query] + (let [inc-q (q-inc conn query) + output-ch (async/chan 1024)] + (async/thread + (loop [] + ;; TODO This needs new deltas to arrive to close + (when-let [delta (consume-delta! inc-q)] + (when (async/>!! output-ch delta) + (recur))))) + output-ch)) + +(defrecord IncrementalSubscription [delta-ch] + Closeable + (close [_] (async/close! delta-ch))) + +(defn subscribe ^Closeable [conn query callback] + (let [delta-ch (delta-chan conn query)] + (async/go-loop [] + (when-let [delta (async/IncrementalSubscription delta-ch))) From 5c8b2db83121d381e46d7ff266b94a7c4de2bf66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Finn=20V=C3=B6lkel?= Date: Sat, 23 May 2026 14:37:38 +0200 Subject: [PATCH 03/10] fix: make incremental subscriptions event-driven Add focused coverage for open-deltas, delta-chan, and subscribe. Co-authored-by: Codex --- src/main/clojure/hooray/core.clj | 66 +++++++++++++------ src/main/clojure/hooray/incremental.clj | 3 +- src/test/clojure/hooray/subscription_test.clj | 43 ++++++++++++ 3 files changed, 90 insertions(+), 22 deletions(-) create mode 100644 src/test/clojure/hooray/subscription_test.clj diff --git a/src/main/clojure/hooray/core.clj b/src/main/clojure/hooray/core.clj index afc71e8..a8f2325 100644 --- a/src/main/clojure/hooray/core.clj +++ b/src/main/clojure/hooray/core.clj @@ -17,7 +17,7 @@ (s/def ::conn-opts (s/keys :req-un [::type ::storage ::algo])) -(defrecord Node [!dbs opts !inc-qs] +(defrecord Node [!dbs opts !inc-qs !delta-listeners] Closeable (close [_] nil)) @@ -26,7 +26,11 @@ (defn connect [opts] {:pre [(s/valid? ::conn-opts opts)]} - (->Node (atom [(db/->db opts)]) opts (atom {}))) + (->Node (atom [(db/->db opts)]) opts (atom {}) (atom {}))) + +(defn- notify-delta-listeners! [{:keys [!delta-listeners]} inc-q delta] + (doseq [listener (vals (get @!delta-listeners (:id inc-q)))] + (listener delta))) (defn transact [{:keys [!dbs !inc-qs] :as node} tx-data] {:pre [(node? node) (s/valid? ::t/tx-data tx-data)]} @@ -35,9 +39,11 @@ (conj dbs (db/transact db-before tx-data)))))] (when-let [inc-qs (seq @!inc-qs)] (doseq [inc-q (vals inc-qs)] - (if (dbsp/dbsp-query? inc-q) - (dbsp/compute-delta! inc-q db-before tx-data) - (incremental/compute-delta! inc-q db-before db-after tx-data)))))) + (let [delta (if (dbsp/dbsp-query? inc-q) + (dbsp/compute-delta! inc-q db-before tx-data) + (incremental/compute-delta! inc-q db-before db-after tx-data))] + (when (seq delta) + (notify-delta-listeners! node inc-q delta))))))) (defn db [{:keys [!dbs] :as node}] {:pre [(node? node)]} @@ -81,9 +87,26 @@ (swap! !inc-qs assoc (:id inc-q) inc-q) inc-q)) -(defn unregister-inc-q [{:keys [!inc-qs] :as node} {:keys [id] :as inc-q}] +(defn unregister-inc-q [{:keys [!inc-qs !delta-listeners] :as node} {:keys [id] :as inc-q}] {:pre [(node? node)]} (swap! !inc-qs dissoc id) + (swap! !delta-listeners dissoc id) + node) + +(defn- register-delta-listener! [{:keys [!delta-listeners] :as node} {:keys [id] :as _inc-q} listener] + {:pre [(node? node)]} + (let [listener-id (random-uuid)] + (swap! !delta-listeners update id assoc listener-id listener) + listener-id)) + +(defn- unregister-delta-listener! [{:keys [!delta-listeners] :as node} {:keys [id] :as _inc-q} listener-id] + {:pre [(node? node)]} + (swap! !delta-listeners + (fn [listeners] + (let [remaining (not-empty (dissoc (get listeners id) listener-id))] + (if remaining + (assoc listeners id remaining) + (dissoc listeners id))))) node) (defn consume-delta! [inc-q] @@ -104,23 +127,24 @@ (defn delta-chan [conn query] (let [inc-q (q-inc conn query) - output-ch (async/chan 1024)] - (async/thread - (loop [] - ;; TODO This needs new deltas to arrive to close - (when-let [delta (consume-delta! inc-q)] - (when (async/>!! output-ch delta) - (recur))))) + output-ch (async/chan 1024) + listener-id (atom nil)] + (reset! listener-id + (register-delta-listener! + conn inc-q + (fn [delta] + (when-not (async/>!! output-ch delta) + (unregister-delta-listener! conn inc-q @listener-id) + (unregister-inc-q conn inc-q))))) output-ch)) -(defrecord IncrementalSubscription [delta-ch] +(defrecord IncrementalSubscription [conn inc-q listener-id] Closeable - (close [_] (async/close! delta-ch))) + (close [_] + (unregister-delta-listener! conn inc-q listener-id) + (unregister-inc-q conn inc-q))) (defn subscribe ^Closeable [conn query callback] - (let [delta-ch (delta-chan conn query)] - (async/go-loop [] - (when-let [delta (async/IncrementalSubscription delta-ch))) + (let [inc-q (q-inc conn query) + listener-id (register-delta-listener! conn inc-q callback)] + (->IncrementalSubscription conn inc-q listener-id))) diff --git a/src/main/clojure/hooray/incremental.clj b/src/main/clojure/hooray/incremental.clj index 660c2ce..123210b 100644 --- a/src/main/clojure/hooray/incremental.clj +++ b/src/main/clojure/hooray/incremental.clj @@ -146,7 +146,8 @@ delta (-> (.step pipeline zset-indices) zset/zset->result-set)] (when (seq delta) - (swap! !queue conj delta)))) + (swap! !queue conj delta) + delta))) (defrecord IncrementalQuery [id query pipeline !queue]) diff --git a/src/test/clojure/hooray/subscription_test.clj b/src/test/clojure/hooray/subscription_test.clj new file mode 100644 index 0000000..1ec90e8 --- /dev/null +++ b/src/test/clojure/hooray/subscription_test.clj @@ -0,0 +1,43 @@ +(ns hooray.subscription-test + (:require [clojure.core.async :as async] + [clojure.test :as t :refer [deftest is testing]] + [hooray.core :as h] + [hooray.fixtures :as fix])) + +(t/use-fixtures :each fix/with-each-dbsp-version fix/with-node fix/with-people-schema) + +(def names-query + '{:find [name] + :where [[e :name name]]}) + +(defn- take-with-timeout [ch] + (let [timeout (async/timeout 1000) + [value port] (async/alts!! [ch timeout])] + (if (= port timeout) + ::timeout + value))) + +(defn- is-delta [expected actual] + (is (not= ::timeout actual)) + (when-not (= ::timeout actual) + (is (= expected (set actual))))) + +(deftest incremental-subscription-apis-produce-deltas + (testing "open-deltas" + (with-open [deltas (h/open-deltas fix/*node* names-query)] + (h/transact fix/*node* [{:db/id :ivan :name "Ivan"}]) + (is-delta #{[["Ivan"] 1]} (h/take! deltas)))) + + (testing "delta-chan" + (let [delta-ch (h/delta-chan fix/*node* names-query)] + (try + (h/transact fix/*node* [{:db/id :petr :name "Petr"}]) + (is-delta #{[["Petr"] 1]} (take-with-timeout delta-ch)) + (finally + (async/close! delta-ch))))) + + (testing "subscribe" + (let [delta (promise)] + (with-open [_subscription (h/subscribe fix/*node* names-query #(deliver delta %))] + (h/transact fix/*node* [{:db/id :anna :name "Anna"}]) + (is-delta #{[["Anna"] 1]} (deref delta 1000 ::timeout)))))) From a49c040b5ed7cb09c4b5cb43795fd89f10502d9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Finn=20V=C3=B6lkel?= Date: Sun, 24 May 2026 12:32:34 +0200 Subject: [PATCH 04/10] Use promise instead of atom --- src/main/clojure/hooray/core.clj | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/main/clojure/hooray/core.clj b/src/main/clojure/hooray/core.clj index a8f2325..f27a9b9 100644 --- a/src/main/clojure/hooray/core.clj +++ b/src/main/clojure/hooray/core.clj @@ -114,7 +114,6 @@ (dbsp/pop-result! inc-q) (incremental/pop-result! inc-q))) - (defrecord IncrementalStream [conn inc-q] Closeable (close [_] (unregister-inc-q conn inc-q))) @@ -128,14 +127,14 @@ (defn delta-chan [conn query] (let [inc-q (q-inc conn query) output-ch (async/chan 1024) - listener-id (atom nil)] - (reset! listener-id - (register-delta-listener! - conn inc-q - (fn [delta] - (when-not (async/>!! output-ch delta) - (unregister-delta-listener! conn inc-q @listener-id) - (unregister-inc-q conn inc-q))))) + listener-id (promise)] + (deliver listener-id + (register-delta-listener! + conn inc-q + (fn [delta] + (when-not (async/>!! output-ch delta) + (unregister-delta-listener! conn inc-q @listener-id) + (unregister-inc-q conn inc-q))))) output-ch)) (defrecord IncrementalSubscription [conn inc-q listener-id] From 8da69da4efa7e105eebe791982018ee8399d31c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Finn=20V=C3=B6lkel?= Date: Sun, 24 May 2026 12:44:59 +0200 Subject: [PATCH 05/10] Update README --- README.md | 42 ++++++++++++++++++ src/test/clojure/hooray/subscription_test.clj | 43 +++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/README.md b/README.md index 2cea066..b2787ab 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ a formal framwork for incremental computation. We also try to combine the two. ``` ### Incremental queries + ```clj (with-open [node (h/connect {:type :mem :storage :hash :algo :generic})] @@ -49,6 +50,47 @@ a formal framwork for incremental computation. We also try to combine the two. ;; => ([["Ada"] 1] [["Alan"] 1] [["Adam"] -1]) ``` +I am still exploring what the incremental API should look like Clojure-wise. Some options below +```clj +(def node (h/connect {:type :mem :storage :hash :algo :generic})) +(h/transact node [{:db/id -100 + :db/ident :name + :db/valueType :db.type/string + :db/cardinality :db.cardinality/one}]) + +(def names-query + '{:find [name] + :where [[e :name name]]}) + +;; Pull deltas explicitly. +(with-open [deltas (h/open-deltas node names-query)] + (h/transact node [{:db/id :ivan :name "Ivan"}]) + (h/take! deltas)) +;; => ([["Ivan"] 1]) + +;; Read deltas from a core.async channel. +(defn- take-with-timeout [ch] + (let [timeout (async/timeout 1000) + [value port] (async/alts!! [ch timeout])] + (if (= port timeout) + ::timeout + value))) + +(let [delta-ch (h/delta-chan node names-query)] + (try + (h/transact node [{:db/id :petr :name "Petr"}]) + (take-with-timeout delta-ch) + (finally + (async/close! delta-ch)))) +;; => ([["Petr"] 1]) + +;; Receive deltas in a callback. +(with-out-str + (with-open [_subscription (h/subscribe node names-query println)] + (h/transact node [{:db/id :anna :name "Anna"}]))) +;; => "([[Anna] 1])\n" +``` + There are currently two incremental join algorithms which I am exploring, `:wcoj` and `:standard`. `:standard` is a the more traditional approach to incremental queries as described in the DBSP paper. Binary joins and reindexing of join keys whenever two relations diff --git a/src/test/clojure/hooray/subscription_test.clj b/src/test/clojure/hooray/subscription_test.clj index 1ec90e8..60875b4 100644 --- a/src/test/clojure/hooray/subscription_test.clj +++ b/src/test/clojure/hooray/subscription_test.clj @@ -41,3 +41,46 @@ (with-open [_subscription (h/subscribe fix/*node* names-query #(deliver delta %))] (h/transact fix/*node* [{:db/id :anna :name "Anna"}]) (is-delta #{[["Anna"] 1]} (deref delta 1000 ::timeout)))))) + + +(comment + ;; README examples + + (def node (h/connect {:type :mem :storage :hash :algo :generic})) + (h/transact node [{:db/id -100 + :db/ident :name + :db/valueType :db.type/string + :db/cardinality :db.cardinality/one}]) + + (def names-query + '{:find [name] + :where [[e :name name]]}) + + ;; Pull deltas explicitly. + (with-open [deltas (h/open-deltas node names-query)] + (h/transact node [{:db/id :ivan :name "Ivan"}]) + (h/take! deltas)) + ;; => ([["Ivan"] 1]) + + ;; Read deltas from a core.async channel. + (defn- take-with-timeout [ch] + (let [timeout (async/timeout 1000) + [value port] (async/alts!! [ch timeout])] + (if (= port timeout) + ::timeout + value))) + + (let [delta-ch (h/delta-chan node names-query)] + (try + (h/transact node [{:db/id :petr :name "Petr"}]) + (take-with-timeout delta-ch) + (finally + (async/close! delta-ch)))) + ;; => ([["Petr"] 1]) + + ;; Receive deltas in a callback. + (with-out-str + (with-open [_subscription (h/subscribe node names-query println)] + (h/transact node [{:db/id :anna :name "Anna"}]))) + ;; => "([[Anna] 1])\n" + ) From cc3671d9e2747f446324609656616c439224771f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Finn=20V=C3=B6lkel?= Date: Sun, 24 May 2026 12:50:58 +0200 Subject: [PATCH 06/10] Add TODO --- src/main/clojure/hooray/core.clj | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/clojure/hooray/core.clj b/src/main/clojure/hooray/core.clj index f27a9b9..cd29c9b 100644 --- a/src/main/clojure/hooray/core.clj +++ b/src/main/clojure/hooray/core.clj @@ -132,6 +132,7 @@ (register-delta-listener! conn inc-q (fn [delta] + ;; TODO this only unregisters if a new delta arrives (when-not (async/>!! output-ch delta) (unregister-delta-listener! conn inc-q @listener-id) (unregister-inc-q conn inc-q))))) From 503804b6cca75e197e035c2d4fb8fc52d0c01187 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Finn=20V=C3=B6lkel?= Date: Sun, 24 May 2026 14:27:29 +0200 Subject: [PATCH 07/10] fix: isolate subscription callback failures Prevent a throwing incremental subscription listener from aborting transact or stopping later listeners. Co-authored-by: Codex --- src/main/clojure/hooray/core.clj | 6 +++++- src/test/clojure/hooray/subscription_test.clj | 10 ++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/main/clojure/hooray/core.clj b/src/main/clojure/hooray/core.clj index cd29c9b..94f06b9 100644 --- a/src/main/clojure/hooray/core.clj +++ b/src/main/clojure/hooray/core.clj @@ -1,6 +1,7 @@ (ns hooray.core (:require [clojure.spec.alpha :as s] [clojure.core.async :as async] + [clojure.tools.logging :as log] [hooray.db :as db] [hooray.query :as query] [hooray.pull :as pull] @@ -30,7 +31,10 @@ (defn- notify-delta-listeners! [{:keys [!delta-listeners]} inc-q delta] (doseq [listener (vals (get @!delta-listeners (:id inc-q)))] - (listener delta))) + (try + (listener delta) + (catch Throwable e + (log/warn e "Incremental delta listener failed"))))) (defn transact [{:keys [!dbs !inc-qs] :as node} tx-data] {:pre [(node? node) (s/valid? ::t/tx-data tx-data)]} diff --git a/src/test/clojure/hooray/subscription_test.clj b/src/test/clojure/hooray/subscription_test.clj index 60875b4..3201a74 100644 --- a/src/test/clojure/hooray/subscription_test.clj +++ b/src/test/clojure/hooray/subscription_test.clj @@ -42,6 +42,16 @@ (h/transact fix/*node* [{:db/id :anna :name "Anna"}]) (is-delta #{[["Anna"] 1]} (deref delta 1000 ::timeout)))))) +(deftest subscribe-callback-failures-do-not-break-transact + (let [delta (promise)] + (with-open [_failing-subscription (h/subscribe fix/*node* names-query + (fn [_delta] + (throw (ex-info "boom" {})))) + _working-subscription (h/subscribe fix/*node* names-query + #(deliver delta %))] + (is (nil? (h/transact fix/*node* [{:db/id :ivan :name "Ivan"}]))) + (is-delta #{[["Ivan"] 1]} (deref delta 1000 ::timeout))))) + (comment ;; README examples From 29bcdd5fb9a4f6e1af9c0972828320e3ca88ee1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Finn=20V=C3=B6lkel?= Date: Sun, 24 May 2026 14:28:25 +0200 Subject: [PATCH 08/10] fix: avoid blocking delta channel delivery Use non-blocking core.async delivery so full delta channels do not stall transact. Co-authored-by: Codex --- src/main/clojure/hooray/core.clj | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/clojure/hooray/core.clj b/src/main/clojure/hooray/core.clj index 94f06b9..0b79fae 100644 --- a/src/main/clojure/hooray/core.clj +++ b/src/main/clojure/hooray/core.clj @@ -137,9 +137,11 @@ conn inc-q (fn [delta] ;; TODO this only unregisters if a new delta arrives - (when-not (async/>!! output-ch delta) - (unregister-delta-listener! conn inc-q @listener-id) - (unregister-inc-q conn inc-q))))) + (async/put! output-ch delta + (fn [delivered?] + (when-not delivered? + (unregister-delta-listener! conn inc-q @listener-id) + (unregister-inc-q conn inc-q))))))) output-ch)) (defrecord IncrementalSubscription [conn inc-q listener-id] From c042722c3c8c0172411673acaf9b182fbaef143f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Finn=20V=C3=B6lkel?= Date: Sun, 24 May 2026 14:29:46 +0200 Subject: [PATCH 09/10] fix: drain listener-backed delta queues Keep pull streams queued while preventing subscription listeners from accumulating already-delivered deltas. Co-authored-by: Codex --- src/main/clojure/hooray/core.clj | 17 +++++++++++------ src/test/clojure/hooray/subscription_test.clj | 5 +++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/main/clojure/hooray/core.clj b/src/main/clojure/hooray/core.clj index 0b79fae..fb1eb24 100644 --- a/src/main/clojure/hooray/core.clj +++ b/src/main/clojure/hooray/core.clj @@ -30,11 +30,15 @@ (->Node (atom [(db/->db opts)]) opts (atom {}) (atom {}))) (defn- notify-delta-listeners! [{:keys [!delta-listeners]} inc-q delta] - (doseq [listener (vals (get @!delta-listeners (:id inc-q)))] - (try - (listener delta) - (catch Throwable e - (log/warn e "Incremental delta listener failed"))))) + (when-let [listeners (seq (vals (get @!delta-listeners (:id inc-q))))] + (doseq [listener listeners] + (try + (listener delta) + (catch Throwable e + (log/warn e "Incremental delta listener failed")))) + true)) + +(declare consume-delta!) (defn transact [{:keys [!dbs !inc-qs] :as node} tx-data] {:pre [(node? node) (s/valid? ::t/tx-data tx-data)]} @@ -47,7 +51,8 @@ (dbsp/compute-delta! inc-q db-before tx-data) (incremental/compute-delta! inc-q db-before db-after tx-data))] (when (seq delta) - (notify-delta-listeners! node inc-q delta))))))) + (when (notify-delta-listeners! node inc-q delta) + (consume-delta! inc-q)))))))) (defn db [{:keys [!dbs] :as node}] {:pre [(node? node)]} diff --git a/src/test/clojure/hooray/subscription_test.clj b/src/test/clojure/hooray/subscription_test.clj index 3201a74..50d8b3b 100644 --- a/src/test/clojure/hooray/subscription_test.clj +++ b/src/test/clojure/hooray/subscription_test.clj @@ -38,9 +38,10 @@ (testing "subscribe" (let [delta (promise)] - (with-open [_subscription (h/subscribe fix/*node* names-query #(deliver delta %))] + (with-open [subscription (h/subscribe fix/*node* names-query #(deliver delta %))] (h/transact fix/*node* [{:db/id :anna :name "Anna"}]) - (is-delta #{[["Anna"] 1]} (deref delta 1000 ::timeout)))))) + (is-delta #{[["Anna"] 1]} (deref delta 1000 ::timeout)) + (is (nil? (h/consume-delta! (:inc-q subscription)))))))) (deftest subscribe-callback-failures-do-not-break-transact (let [delta (promise)] From af3035e9abc6a02cc7adc94e8b32564bdb5c16b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Finn=20V=C3=B6lkel?= Date: Sun, 24 May 2026 14:33:33 +0200 Subject: [PATCH 10/10] Throwable -> Exception --- src/main/clojure/hooray/core.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/clojure/hooray/core.clj b/src/main/clojure/hooray/core.clj index fb1eb24..18a5dcf 100644 --- a/src/main/clojure/hooray/core.clj +++ b/src/main/clojure/hooray/core.clj @@ -34,7 +34,7 @@ (doseq [listener listeners] (try (listener delta) - (catch Throwable e + (catch Exception e (log/warn e "Incremental delta listener failed")))) true))