Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ a formal framwork for incremental computation. We also try to combine the two.
```

### Incremental queries

```clj
(with-open [node (h/connect {:type :mem :storage :hash :algo :generic})]

Expand All @@ -49,6 +50,47 @@ a formal framwork for incremental computation. We also try to combine the two.
;; => ([["Ada"] 1] [["Alan"] 1] [["Adam"] -1])
```

I am still exploring what the incremental API should look like Clojure-wise. Some options below
```clj
(def node (h/connect {:type :mem :storage :hash :algo :generic}))
(h/transact node [{:db/id -100
:db/ident :name
:db/valueType :db.type/string
:db/cardinality :db.cardinality/one}])

(def names-query
'{:find [name]
:where [[e :name name]]})

;; Pull deltas explicitly.
(with-open [deltas (h/open-deltas node names-query)]
(h/transact node [{:db/id :ivan :name "Ivan"}])
(h/take! deltas))
;; => ([["Ivan"] 1])

;; Read deltas from a core.async channel.
(defn- take-with-timeout [ch]
(let [timeout (async/timeout 1000)
[value port] (async/alts!! [ch timeout])]
(if (= port timeout)
::timeout
value)))

(let [delta-ch (h/delta-chan node names-query)]
(try
(h/transact node [{:db/id :petr :name "Petr"}])
(take-with-timeout delta-ch)
(finally
(async/close! delta-ch))))
;; => ([["Petr"] 1])

;; Receive deltas in a callback.
(with-out-str
(with-open [_subscription (h/subscribe node names-query println)]
(h/transact node [{:db/id :anna :name "Anna"}])))
;; => "([[Anna] 1])\n"
```

There are currently two incremental join algorithms which I am exploring,
`:wcoj` and `:standard`. `:standard` is a the more traditional approach to incremental queries
as described in the DBSP paper. Binary joins and reindexing of join keys whenever two relations
Expand Down
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation("org.clojure", "spec.alpha", "0.5.238")
implementation("org.clojure", "core.match","1.1.0")
implementation("org.clojure", "math.combinatorics", "0.3.0")
implementation("org.clojure", "core.async", "1.9.865")
implementation("com.cognitect", "anomalies", "0.1.12")

// logging
Expand Down
82 changes: 76 additions & 6 deletions src/main/clojure/hooray/core.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
(ns hooray.core
(:require [clojure.spec.alpha :as s]
[clojure.core.async :as async]
[clojure.tools.logging :as log]
[hooray.db :as db]
[hooray.query :as query]
[hooray.pull :as pull]
Expand All @@ -16,7 +18,7 @@

(s/def ::conn-opts (s/keys :req-un [::type ::storage ::algo]))

(defrecord Node [!dbs opts !inc-qs]
(defrecord Node [!dbs opts !inc-qs !delta-listeners]
Closeable
(close [_] nil))

Expand All @@ -25,7 +27,18 @@

(defn connect [opts]
{:pre [(s/valid? ::conn-opts opts)]}
(->Node (atom [(db/->db opts)]) opts (atom {})))
(->Node (atom [(db/->db opts)]) opts (atom {}) (atom {})))

(defn- notify-delta-listeners! [{:keys [!delta-listeners]} inc-q delta]
(when-let [listeners (seq (vals (get @!delta-listeners (:id inc-q))))]
(doseq [listener listeners]
(try
(listener delta)
(catch Exception e
(log/warn e "Incremental delta listener failed"))))
true))

(declare consume-delta!)

(defn transact [{:keys [!dbs !inc-qs] :as node} tx-data]
{:pre [(node? node) (s/valid? ::t/tx-data tx-data)]}
Expand All @@ -34,9 +47,12 @@
(conj dbs (db/transact db-before tx-data)))))]
(when-let [inc-qs (seq @!inc-qs)]
(doseq [inc-q (vals inc-qs)]
(if (dbsp/dbsp-query? inc-q)
(dbsp/compute-delta! inc-q db-before tx-data)
(incremental/compute-delta! inc-q db-before db-after tx-data))))))
(let [delta (if (dbsp/dbsp-query? inc-q)
(dbsp/compute-delta! inc-q db-before tx-data)
(incremental/compute-delta! inc-q db-before db-after tx-data))]
(when (seq delta)
(when (notify-delta-listeners! node inc-q delta)
(consume-delta! inc-q))))))))

(defn db [{:keys [!dbs] :as node}]
{:pre [(node? node)]}
Expand Down Expand Up @@ -80,12 +96,66 @@
(swap! !inc-qs assoc (:id inc-q) inc-q)
inc-q))

(defn unregister-inc-q [{:keys [!inc-qs] :as node} {:keys [id] :as inc-q}]
(defn unregister-inc-q [{:keys [!inc-qs !delta-listeners] :as node} {:keys [id] :as inc-q}]
{:pre [(node? node)]}
(swap! !inc-qs dissoc id)
(swap! !delta-listeners dissoc id)
node)

(defn- register-delta-listener! [{:keys [!delta-listeners] :as node} {:keys [id] :as _inc-q} listener]
{:pre [(node? node)]}
(let [listener-id (random-uuid)]
(swap! !delta-listeners update id assoc listener-id listener)
listener-id))

(defn- unregister-delta-listener! [{:keys [!delta-listeners] :as node} {:keys [id] :as _inc-q} listener-id]
{:pre [(node? node)]}
(swap! !delta-listeners
(fn [listeners]
(let [remaining (not-empty (dissoc (get listeners id) listener-id))]
(if remaining
(assoc listeners id remaining)
(dissoc listeners id)))))
node)

(defn consume-delta! [inc-q]
(if (dbsp/dbsp-query? inc-q)
(dbsp/pop-result! inc-q)
(incremental/pop-result! inc-q)))

(defrecord IncrementalStream [conn inc-q]
Closeable
(close [_] (unregister-inc-q conn inc-q)))

(defn open-deltas ^Closeable [conn query]
(->IncrementalStream conn (q-inc conn query)))

(defn take! [{:keys [inc-q]}]
(consume-delta! inc-q))

(defn delta-chan [conn query]
(let [inc-q (q-inc conn query)
output-ch (async/chan 1024)
listener-id (promise)]
(deliver listener-id
(register-delta-listener!
conn inc-q
(fn [delta]
;; TODO this only unregisters if a new delta arrives
(async/put! output-ch delta
(fn [delivered?]
(when-not delivered?
(unregister-delta-listener! conn inc-q @listener-id)
(unregister-inc-q conn inc-q)))))))
output-ch))

(defrecord IncrementalSubscription [conn inc-q listener-id]
Closeable
(close [_]
(unregister-delta-listener! conn inc-q listener-id)
(unregister-inc-q conn inc-q)))

(defn subscribe ^Closeable [conn query callback]
(let [inc-q (q-inc conn query)
listener-id (register-delta-listener! conn inc-q callback)]
(->IncrementalSubscription conn inc-q listener-id)))
3 changes: 2 additions & 1 deletion src/main/clojure/hooray/incremental.clj
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@
delta (-> (.step pipeline zset-indices)
zset/zset->result-set)]
(when (seq delta)
(swap! !queue conj delta))))
(swap! !queue conj delta)
delta)))

(defrecord IncrementalQuery [id query pipeline !queue])

Expand Down
97 changes: 97 additions & 0 deletions src/test/clojure/hooray/subscription_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
(ns hooray.subscription-test
(:require [clojure.core.async :as async]
[clojure.test :as t :refer [deftest is testing]]
[hooray.core :as h]
[hooray.fixtures :as fix]))

(t/use-fixtures :each fix/with-each-dbsp-version fix/with-node fix/with-people-schema)

(def names-query
'{:find [name]
:where [[e :name name]]})

(defn- take-with-timeout [ch]
(let [timeout (async/timeout 1000)
[value port] (async/alts!! [ch timeout])]
(if (= port timeout)
::timeout
value)))

(defn- is-delta [expected actual]
(is (not= ::timeout actual))
(when-not (= ::timeout actual)
(is (= expected (set actual)))))

(deftest incremental-subscription-apis-produce-deltas
(testing "open-deltas"
(with-open [deltas (h/open-deltas fix/*node* names-query)]
(h/transact fix/*node* [{:db/id :ivan :name "Ivan"}])
(is-delta #{[["Ivan"] 1]} (h/take! deltas))))

(testing "delta-chan"
(let [delta-ch (h/delta-chan fix/*node* names-query)]
(try
(h/transact fix/*node* [{:db/id :petr :name "Petr"}])
(is-delta #{[["Petr"] 1]} (take-with-timeout delta-ch))
(finally
(async/close! delta-ch)))))

(testing "subscribe"
(let [delta (promise)]
(with-open [subscription (h/subscribe fix/*node* names-query #(deliver delta %))]
(h/transact fix/*node* [{:db/id :anna :name "Anna"}])
(is-delta #{[["Anna"] 1]} (deref delta 1000 ::timeout))
(is (nil? (h/consume-delta! (:inc-q subscription))))))))

(deftest subscribe-callback-failures-do-not-break-transact
(let [delta (promise)]
(with-open [_failing-subscription (h/subscribe fix/*node* names-query
(fn [_delta]
(throw (ex-info "boom" {}))))
_working-subscription (h/subscribe fix/*node* names-query
#(deliver delta %))]
(is (nil? (h/transact fix/*node* [{:db/id :ivan :name "Ivan"}])))
(is-delta #{[["Ivan"] 1]} (deref delta 1000 ::timeout)))))


(comment
;; README examples

(def node (h/connect {:type :mem :storage :hash :algo :generic}))
(h/transact node [{:db/id -100
:db/ident :name
:db/valueType :db.type/string
:db/cardinality :db.cardinality/one}])

(def names-query
'{:find [name]
:where [[e :name name]]})

;; Pull deltas explicitly.
(with-open [deltas (h/open-deltas node names-query)]
(h/transact node [{:db/id :ivan :name "Ivan"}])
(h/take! deltas))
;; => ([["Ivan"] 1])

;; Read deltas from a core.async channel.
(defn- take-with-timeout [ch]
(let [timeout (async/timeout 1000)
[value port] (async/alts!! [ch timeout])]
(if (= port timeout)
::timeout
value)))

(let [delta-ch (h/delta-chan node names-query)]
(try
(h/transact node [{:db/id :petr :name "Petr"}])
(take-with-timeout delta-ch)
(finally
(async/close! delta-ch))))
;; => ([["Petr"] 1])

;; Receive deltas in a callback.
(with-out-str
(with-open [_subscription (h/subscribe node names-query println)]
(h/transact node [{:db/id :anna :name "Anna"}])))
;; => "([[Anna] 1])\n"
)