From d7729a00af1107eb916daac6bffda12980253970 Mon Sep 17 00:00:00 2001 From: Germano Eichenberg Date: Wed, 17 Jun 2026 19:09:02 -0300 Subject: [PATCH 1/4] Support abstract-level 3 test semantics --- guest.js | 33 ++-- package.json | 4 +- test/basic.js | 275 ++++++++++++-------------------- test/concurrent.js | 47 +++--- test/iterator-error.js | 5 +- test/retry.js | 345 +++++++++++++++++++++-------------------- test/streams.js | 28 ++-- test/sublevel.js | 171 ++++++++++---------- 8 files changed, 411 insertions(+), 497 deletions(-) diff --git a/guest.js b/guest.js index 4558018..434dd46 100644 --- a/guest.js +++ b/guest.js @@ -27,7 +27,8 @@ const kPending = Symbol('pending') const kCallback = Symbol('callback') const kSeq = Symbol('seq') const kErrored = Symbol('errored') -const noop = function () {} +const kAbortIterator = Symbol('abortIterator') +const kOnDbClosing = Symbol('onDbClosing') class ManyLevelGuest extends AbstractLevel { constructor (options) { @@ -204,17 +205,7 @@ class ManyLevelGuest extends AbstractLevel { } for (const req of this[kIterators].clear()) { - // Cancel in-flight operation if any - // TODO: does this need to be refactored to use AbortError to pass back up to the request initiator? - const callback = req.iterator[kCallback] - req.iterator[kCallback] = null - - if (callback) { - callback(new ModuleError(msg, { code })) - } - - // Note: an in-flight operation would block close() - req.iterator.close(noop) + req.iterator[kAbortIterator](code) } } @@ -377,13 +368,14 @@ class ManyLevelGuest extends AbstractLevel { // For tests only so does not need error handling this[kExplicitClose] = false const remote = this[kRemote]() + const local = this.createRpcStream() pipeline( remote, - this.createRpcStream(), + local, remote ).catch(err => { if (err.code === 'ABORT_ERR') { - return this.close() + if (!this[kExplicitClose] && this[kRpcStream] === local) return this.close() } }) } else if (this[kExplicitClose]) { @@ -422,6 +414,8 @@ class ManyLevelGuestIterator extends AbstractIterator { this[kPending] = [] this[kCallback] = null this[kSeq] = 0 + this[kOnDbClosing] = () => this[kAbortIterator]('LEVEL_ITERATOR_NOT_OPEN') + db.once('closing', this[kOnDbClosing]) const req = this[kRequest] = { tag: input.iterator, @@ -529,10 +523,21 @@ class ManyLevelGuestIterator extends AbstractIterator { } async _close () { + this.db.removeListener('closing', this[kOnDbClosing]) await this.db[kWrite]({ tag: input.iteratorClose, id: this[kRequest].id }) this.db[kIterators].remove(this[kRequest].id) this.db[kFlushed]() } + + [kAbortIterator] (code) { + this[kPending].push({ error: code }) + this[kEnded] = true + + const callback = this[kCallback] + this[kCallback] = null + + if (callback) callback(null) + } } function normalizeValue (value) { diff --git a/package.json b/package.json index f465642..42120e3 100644 --- a/package.json +++ b/package.json @@ -27,7 +27,7 @@ ], "dependencies": { "@vweevers/length-prefixed-stream": "^1.0.0", - "abstract-level": "^2.0.2", + "abstract-level": "^3.0.0", "module-error": "^1.0.2", "protocol-buffers-encodings": "^1.1.0", "readable-stream": "^4.0.0" @@ -40,7 +40,7 @@ "faucet": "^0.0.3", "hallmark": "^4.0.0", "level-read-stream": "^1.1.0", - "memory-level": "^2.0.0", + "memory-level": "^3.0.0", "nyc": "^15.1.0", "protocol-buffers": "^5.0.0", "standard": "^16.0.3", diff --git a/test/basic.js b/test/basic.js index 6b18457..fadd31a 100644 --- a/test/basic.js +++ b/test/basic.js @@ -7,8 +7,15 @@ const { pipeline } = require('readable-stream') const concat = require('concat-stream') const { ManyLevelHost, ManyLevelGuest } = require('..') -tape('get', function (t) { - t.plan(7) +function collect (stream) { + return new Promise((resolve, reject) => { + stream.on('error', reject) + stream.pipe(concat(resolve)) + }) +} + +tape('get', async function (t) { + t.plan(3) const db = new MemoryLevel() const host = new ManyLevelHost(db) @@ -17,27 +24,14 @@ tape('get', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - db.put('hello', 'world', function (err) { - t.error(err, 'no err') + await db.put('hello', 'world') - guest.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, 'world') - }) - - guest.get(Buffer.from('hello'), function (err, value) { - t.error(err, 'no err') - t.same(value, 'world') - }) - - guest.get('hello', { valueEncoding: 'buffer' }, function (err, value) { - t.error(err, 'no err') - t.same(value, Buffer.from('world')) - }) - }) + t.same(await guest.get('hello'), 'world') + t.same(await guest.get(Buffer.from('hello')), 'world') + t.same(await guest.get('hello', { valueEncoding: 'buffer' }), Buffer.from('world')) }) -tape('get with valueEncoding: json in constructor', function (t) { +tape('get with valueEncoding: json in constructor', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() @@ -45,18 +39,11 @@ tape('get with valueEncoding: json in constructor', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - db.put('hello', '{"foo":"world"}', function () { - guest.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, { foo: 'world' }) - t.end() - }) - }) + await db.put('hello', '{"foo":"world"}') + t.same(await guest.get('hello'), { foo: 'world' }) }) -tape('get with valueEncoding: json in get options', function (t) { - t.plan(5) - +tape('get with valueEncoding: json in get options', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() @@ -64,22 +51,13 @@ tape('get with valueEncoding: json in get options', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - db.put('hello', '{"foo":"world"}', function (err) { - t.error(err, 'no err') - - guest.get('hello', { valueEncoding: 'json' }, function (err, value) { - t.error(err, 'no err') - t.same(value, { foo: 'world' }) - }) + await db.put('hello', '{"foo":"world"}') - guest.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, '{"foo":"world"}') - }) - }) + t.same(await guest.get('hello', { valueEncoding: 'json' }), { foo: 'world' }) + t.same(await guest.get('hello'), '{"foo":"world"}') }) -tape('put', function (t) { +tape('put', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() @@ -87,19 +65,11 @@ tape('put', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - guest.put('hello', 'world', function (err) { - t.error(err, 'no err') - guest.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, 'world') - t.end() - }) - }) + await guest.put('hello', 'world') + t.same(await guest.get('hello'), 'world') }) -tape('put with valueEncoding: json in constructor', function (t) { - t.plan(5) - +tape('put with valueEncoding: json in constructor', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() @@ -107,24 +77,13 @@ tape('put with valueEncoding: json in constructor', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - guest.put('hello', { foo: 'world' }, function (err) { - t.error(err, 'no err') - - db.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, '{"foo":"world"}') - }) + await guest.put('hello', { foo: 'world' }) - guest.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, { foo: 'world' }) - }) - }) + t.same(await db.get('hello'), '{"foo":"world"}') + t.same(await guest.get('hello'), { foo: 'world' }) }) -tape('put with valueEncoding: json in put options', function (t) { - t.plan(5) - +tape('put with valueEncoding: json in put options', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() @@ -132,19 +91,10 @@ tape('put with valueEncoding: json in put options', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - guest.put('hello', { foo: 'world' }, { valueEncoding: 'json' }, function (err) { - t.error(err, 'no err') - - db.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, '{"foo":"world"}') - }) + await guest.put('hello', { foo: 'world' }, { valueEncoding: 'json' }) - guest.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, '{"foo":"world"}') - }) - }) + t.same(await db.get('hello'), '{"foo":"world"}') + t.same(await guest.get('hello'), '{"foo":"world"}') }) tape('readonly', async function (t) { @@ -168,7 +118,7 @@ tape('readonly', async function (t) { t.is(await guest.get('hello'), 'verden', 'old value') }) -tape('del', function (t) { +tape('del', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() @@ -176,19 +126,17 @@ tape('del', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - guest.put('hello', 'world', function (err) { - t.error(err, 'no err') - guest.del('hello', function (err) { - t.error(err, 'no err') - guest.get('hello', function (err) { - t.is(err && err.code, 'LEVEL_NOT_FOUND') - t.end() - }) - }) - }) + await guest.put('hello', 'world') + await guest.del('hello') + + try { + await guest.get('hello') + } catch (err) { + t.is(err && err.code, 'LEVEL_NOT_FOUND') + } }) -tape('batch', function (t) { +tape('batch', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() @@ -196,21 +144,13 @@ tape('batch', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - guest.batch([{ type: 'put', key: 'hello', value: 'world' }, { type: 'put', key: 'hej', value: 'verden' }], function (err) { - t.error(err, 'no err') - guest.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, 'world') - guest.get('hej', function (err, value) { - t.error(err, 'no err') - t.same(value, 'verden') - t.end() - }) - }) - }) + await guest.batch([{ type: 'put', key: 'hello', value: 'world' }, { type: 'put', key: 'hej', value: 'verden' }]) + + t.same(await guest.get('hello'), 'world') + t.same(await guest.get('hej'), 'verden') }) -tape('read stream', function (t) { +tape('read stream', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() @@ -218,19 +158,15 @@ tape('read stream', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - guest.batch([{ type: 'put', key: 'hello', value: 'world' }, { type: 'put', key: 'hej', value: 'verden' }], function (err) { - t.error(err, 'no err') - const rs = new EntryStream(guest) - rs.pipe(concat(function (entries) { - t.same(entries.length, 2) - t.same(entries[0], { key: 'hej', value: 'verden' }) - t.same(entries[1], { key: 'hello', value: 'world' }) - t.end() - })) - }) + await guest.batch([{ type: 'put', key: 'hello', value: 'world' }, { type: 'put', key: 'hej', value: 'verden' }]) + + const entries = await collect(new EntryStream(guest)) + t.same(entries.length, 2) + t.same(entries[0], { key: 'hej', value: 'verden' }) + t.same(entries[1], { key: 'hello', value: 'world' }) }) -tape('read stream (gt)', function (t) { +tape('read stream (gt)', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() @@ -238,15 +174,11 @@ tape('read stream (gt)', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - guest.batch([{ type: 'put', key: 'hello', value: 'world' }, { type: 'put', key: 'hej', value: 'verden' }], function (err) { - t.error(err, 'no err') - const rs = new EntryStream(guest, { gt: 'hej' }) - rs.pipe(concat(function (entries) { - t.same(entries.length, 1) - t.same(entries[0], { key: 'hello', value: 'world' }) - t.end() - })) - }) + await guest.batch([{ type: 'put', key: 'hello', value: 'world' }, { type: 'put', key: 'hej', value: 'verden' }]) + + const entries = await collect(new EntryStream(guest, { gt: 'hej' })) + t.same(entries.length, 1) + t.same(entries[0], { key: 'hello', value: 'world' }) }) tape('for await...of iterator', async function (t) { @@ -267,23 +199,26 @@ tape('for await...of iterator', async function (t) { t.same(entries, [['hej', 'verden'], ['hello', 'world']]) }) -tape('close with pending request', function (t) { +tape('close with pending request', async function (t) { t.plan(2) const guest = new ManyLevelGuest() - guest.put('hello', 'world', function (err) { + const pending = guest.put('hello', 'world').catch(async function (err) { t.is(err && err.code, 'LEVEL_DATABASE_NOT_OPEN') - guest.put('hello', 'world', function (err) { + try { + await guest.put('hello', 'world') + } catch (err) { t.is(err && err.code, 'LEVEL_DATABASE_NOT_OPEN') - }) + } }) - guest.close() + await guest.close() + await pending }) -tape('disconnect with pending request', function (t) { +tape('disconnect with pending request', async function (t) { t.plan(3) const db = new MemoryLevel() @@ -293,49 +228,41 @@ tape('disconnect with pending request', function (t) { pipeline(stream, guest.createRpcStream(), stream, () => {}) - db.open(function (err) { - t.ifError(err) - - guest.open(function (err) { - t.ifError(err) + await db.open() + t.pass('db opened') - guest.put('hello', 'world', function (err) { - t.is(err && err.code, 'LEVEL_CONNECTION_LOST') + await guest.open() + t.pass('guest opened') - // TODO: what are we expecting here? - // guest.put('hello', 'world', function (err) { - // t.is(err && err.code, ?) - // }) - }) - - stream.destroy() - }) + const pending = guest.put('hello', 'world').catch(function (err) { + t.is(err && err.code, 'LEVEL_CONNECTION_LOST') }) + + stream.destroy() + await pending }) -tape('close with pending iterator', function (t) { - t.plan(3) +tape('close with pending iterator', async function (t) { + t.plan(2) const guest = new ManyLevelGuest() - guest.open(function (err) { - t.ifError(err) - - const it = guest.iterator() + await guest.open() + t.pass('guest opened') - it.next(function (err) { - t.is(err && err.code, 'LEVEL_ITERATOR_NOT_OPEN') + const it = guest.iterator() - it.next(function (err) { - t.is(err && err.code, 'LEVEL_ITERATOR_NOT_OPEN') - }) - }) - - guest.close() + const pending = it.next().then(function (value) { + t.is(value, undefined) + }, function (err) { + t.is(err && err.code, 'LEVEL_ITERATOR_NOT_OPEN') }) + + await guest.close() + await pending }) -tape('disconnect with pending iterator', function (t) { +tape('disconnect with pending iterator', async function (t) { t.plan(3) const db = new MemoryLevel() @@ -345,22 +272,16 @@ tape('disconnect with pending iterator', function (t) { pipeline(stream, guest.createRpcStream(), stream, () => {}) - db.open(function (err) { - t.ifError(err) - - guest.open(function (err) { - t.ifError(err) + await db.open() + t.pass('db opened') - guest.iterator().next(function (err) { - t.is(err && err.code, 'LEVEL_CONNECTION_LOST') + await guest.open() + t.pass('guest opened') - // TODO: what are we expecting here? - // guest.iterator().next(function (err) { - // t.is(err && err.code, ?) - // }) - }) - - stream.destroy() - }) + const pending = guest.iterator().next().catch(function (err) { + t.is(err && err.code, 'LEVEL_CONNECTION_LOST') }) + + stream.destroy() + await pending }) diff --git a/test/concurrent.js b/test/concurrent.js index 5375ee1..d6a5b65 100644 --- a/test/concurrent.js +++ b/test/concurrent.js @@ -6,7 +6,14 @@ const { EntryStream } = require('level-read-stream') const concat = require('concat-stream') const { ManyLevelHost, ManyLevelGuest } = require('..') -tape('two concurrent iterators', function (t) { +function collect (stream) { + return new Promise((resolve, reject) => { + stream.on('error', reject) + stream.pipe(concat(resolve)) + }) +} + +tape('two concurrent iterators', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() @@ -20,23 +27,17 @@ tape('two concurrent iterators', function (t) { batch.push({ type: 'put', key: 'key-' + i, value: 'value-' + i }) } - guest.batch(batch, function (err) { - t.error(err) + await guest.batch(batch) - const rs1 = new EntryStream(guest) - const rs2 = new EntryStream(guest) + const rs1 = new EntryStream(guest) + const rs2 = new EntryStream(guest) + const [list1, list2] = await Promise.all([collect(rs1), collect(rs2)]) - rs1.pipe(concat(function (list1) { - t.same(list1.length, 100) - rs2.pipe(concat(function (list2) { - t.same(list2.length, 100) - t.end() - })) - })) - }) + t.same(list1.length, 100) + t.same(list2.length, 100) }) -tape('two concurrent guests', function (t) { +tape('two concurrent guests', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream1 = host.createRpcStream() @@ -50,18 +51,12 @@ tape('two concurrent guests', function (t) { const batch = [] for (let i = 0; i < 100; i++) batch.push({ type: 'put', key: 'key-' + i, value: 'value-' + i }) - guest1.batch(batch, function (err) { - t.error(err) + await guest1.batch(batch) - const rs1 = new EntryStream(guest1) - const rs2 = new EntryStream(guest2) + const rs1 = new EntryStream(guest1) + const rs2 = new EntryStream(guest2) + const [list1, list2] = await Promise.all([collect(rs1), collect(rs2)]) - rs1.pipe(concat(function (list1) { - t.same(list1.length, 100) - rs2.pipe(concat(function (list2) { - t.same(list2.length, 100) - t.end() - })) - })) - }) + t.same(list1.length, 100) + t.same(list2.length, 100) }) diff --git a/test/iterator-error.js b/test/iterator-error.js index f71f9d1..a1b1797 100644 --- a/test/iterator-error.js +++ b/test/iterator-error.js @@ -25,9 +25,10 @@ test('iterator.next() error', async function (t) { db._iterator = function (options) { const it = original.call(this, options) - it._nextv = function (size, options, cb) { - this.nextTick(cb, first ? new Error('foo') : new ModuleError('bar', { code: 'LEVEL_XYZ' })) + it._nextv = async function () { + const err = first ? new Error('foo') : new ModuleError('bar', { code: 'LEVEL_XYZ' }) first = false + throw err } return it diff --git a/test/retry.js b/test/retry.js index 077d2c3..e5a2088 100644 --- a/test/retry.js +++ b/test/retry.js @@ -6,72 +6,88 @@ const { EntryStream } = require('level-read-stream') const { pipeline } = require('readable-stream') const { ManyLevelHost, ManyLevelGuest } = require('..') -tape('retry get', function (t) { +function reconnect (host, guest, isDone, onconnect) { + if (isDone()) return + + let local + try { + local = guest.createRpcStream() + } catch (err) { + if (err.message === 'Only one rpc stream can be active') { + setImmediate(() => reconnect(host, guest, isDone, onconnect)) + return + } + + throw err + } + + const remote = host.createRpcStream() + onconnect(local) + pipeline(remote, local, remote, () => { + setImmediate(() => reconnect(host, guest, isDone, onconnect)) + }) +} + +tape('retry get', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() const guest = new ManyLevelGuest({ retry: true }) - db.put('hello', 'world', function () { - guest.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, 'world') - t.end() - }) + await db.put('hello', 'world') - stream.pipe(guest.createRpcStream()).pipe(stream) - }) + const pending = guest.get('hello') + stream.pipe(guest.createRpcStream()).pipe(stream) + + t.same(await pending, 'world') }) -tape('no retry get', function (t) { +tape('no retry get', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() const guest = new ManyLevelGuest({ retry: false }) - guest.open(function () { - db.put('hello', 'world', function () { - guest.get('hello', function (err, value) { - t.ok(err, 'had error') - t.end() - }) - - const rpc = guest.createRpcStream() - stream.pipe(rpc).pipe(stream) - rpc.destroy() - - setTimeout(function () { - const rpc = guest.createRpcStream() - stream.pipe(rpc).pipe(stream) - }, 100) - }) + await guest.open() + await db.put('hello', 'world') + + const pending = guest.get('hello').catch(function (err) { + t.ok(err, 'had error') }) + + const rpc = guest.createRpcStream() + stream.pipe(rpc).pipe(stream) + rpc.destroy() + + setTimeout(function () { + const rpc = guest.createRpcStream() + stream.pipe(rpc).pipe(stream) + }, 100) + + await pending }) -tape('retry get', function (t) { +tape('retry get', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream() const guest = new ManyLevelGuest({ retry: true }) - guest.open(function () { - db.put('hello', 'world', function () { - guest.get('hello', function (err, value) { - t.error(err, 'no err') - t.same(value, 'world') - t.end() - }) - - const rpc = guest.createRpcStream() - stream.pipe(rpc).pipe(stream) - rpc.destroy() - - setTimeout(function () { - const rpc = guest.createRpcStream() - stream.pipe(rpc).pipe(stream) - }, 100) - }) - }) + await guest.open() + await db.put('hello', 'world') + + const pending = guest.get('hello') + + const rpc = guest.createRpcStream() + stream.pipe(rpc).pipe(stream) + rpc.destroy() + + setTimeout(function () { + const rpc = guest.createRpcStream() + stream.pipe(rpc).pipe(stream) + }, 100) + + t.same(await pending, 'world') }) for (const reverse of [false, true]) { @@ -99,31 +115,22 @@ for (const reverse of [false, true]) { const it = original.call(this, options) const nextv = it._nextv - it._nextv = function (size, options, cb) { + it._nextv = async function (size, options) { if (n++ > entryCount * 10) throw new Error('Infinite loop') - nextv.call(this, size, options, (...args) => { - setTimeout(cb.bind(null, ...args), 10) - }) + const entries = await nextv.call(this, size, options) + await new Promise(resolve => setTimeout(resolve, 10)) + return entries } return it } // (Re)connect every 50ms - ;(function connect () { - if (done) return - + reconnect(host, guest, () => done, function (local) { attempts++ - - const remote = host.createRpcStream() - const local = guest.createRpcStream() - - // TODO: calls back too soon if you destroy remote instead of local, - // because duplexify does not satisfy node's willEmitClose() check - pipeline(remote, local, remote, connect) setTimeout(local.destroy.bind(local), 50) - })() + }) const entries = await guest.iterator({ gte: '1'.padStart(padding, '0'), reverse }).all() done = true @@ -169,13 +176,10 @@ for (const reverse of [false, true]) { // Don't test deferredOpen await guest.open() - ;(function connect () { - if (done) return + reconnect(host, guest, () => done, function (nextLocal) { attempts++ - const remote = host.createRpcStream() - local = guest.createRpcStream() - pipeline(remote, local, remote, connect) - })() + local = nextLocal + }) // Wait for first connection await guest.get(Buffer.alloc(0)) @@ -213,13 +217,10 @@ tape('retry value iterator', async function (t) { return { type: 'put', key, value } })) - ;(function connect () { - if (done) return + reconnect(host, guest, () => done, function (nextLocal) { attempts++ - const remote = host.createRpcStream() - local = guest.createRpcStream() - pipeline(remote, local, remote, connect) - })() + local = nextLocal + }) // Wait for first connection await guest.get('a') @@ -259,13 +260,10 @@ for (const reverse of [false, true]) { // Don't test deferredOpen await guest.open() - ;(function connect () { - if (done) return + reconnect(host, guest, () => done, function (nextLocal) { attempts++ - const remote = host.createRpcStream() - local = guest.createRpcStream() - pipeline(remote, local, remote, connect) - })() + local = nextLocal + }) const result = [] const it = guest.keys({ reverse }) @@ -306,13 +304,10 @@ for (const reverse of [false, true]) { // Don't test deferredOpen await guest.open() - ;(function connect () { - if (done) return + reconnect(host, guest, () => done, function (nextLocal) { attempts++ - const remote = host.createRpcStream() - local = guest.createRpcStream() - pipeline(remote, local, remote, connect) - })() + local = nextLocal + }) const result = [] const it = guest.keys({ reverse }) @@ -347,107 +342,115 @@ for (const reverse of [false, true]) { }) } -tape('retry read stream', function (t) { +tape('retry read stream', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const guest = new ManyLevelGuest({ retry: true }) - guest.open(function () { - db.batch([{ - type: 'put', - key: 'hej', - value: 'verden' - }, { - type: 'put', - key: 'hello', - value: 'world' - }, { - type: 'put', - key: 'hola', - value: 'mundo' - }], function () { - const rs = new EntryStream(guest) - const expected = [{ - key: 'hej', - value: 'verden' - }, { - key: 'hello', - value: 'world' - }, { - key: 'hola', - value: 'mundo' - }] - - rs.on('data', function (data) { - t.same(data, expected.shift(), 'stream continues over retry') - }) - - rs.on('end', function () { - t.same(expected.length, 0, 'no more data') - t.end() - }) - - let stream - let guestStream - - const connect = function () { - stream = host.createRpcStream() - guestStream = guest.createRpcStream() - stream.pipe(guestStream).pipe(stream) - } + await guest.open() + await db.batch([{ + type: 'put', + key: 'hej', + value: 'verden' + }, { + type: 'put', + key: 'hello', + value: 'world' + }, { + type: 'put', + key: 'hola', + value: 'mundo' + }]) + + const rs = new EntryStream(guest) + const expected = [{ + key: 'hej', + value: 'verden' + }, { + key: 'hello', + value: 'world' + }, { + key: 'hola', + value: 'mundo' + }] + + const ended = new Promise((resolve, reject) => { + rs.on('data', function (data) { + t.same(data, expected.shift(), 'stream continues over retry') + }) - connect() + rs.on('end', function () { + t.same(expected.length, 0, 'no more data') + resolve() }) + + rs.on('error', reject) }) + + let stream + let guestStream + + const connect = function () { + stream = host.createRpcStream() + guestStream = guest.createRpcStream() + stream.pipe(guestStream).pipe(stream) + } + + connect() + await ended }) -tape('retry read stream and limit', function (t) { +tape('retry read stream and limit', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const guest = new ManyLevelGuest({ retry: true }) - guest.open(function () { - db.batch([{ - type: 'put', - key: 'hej', - value: 'verden' - }, { - type: 'put', - key: 'hello', - value: 'world' - }, { - type: 'put', - key: 'hola', - value: 'mundo' - }], function () { - const rs = new EntryStream(guest, { limit: 2 }) - const expected = [{ - key: 'hej', - value: 'verden' - }, { - key: 'hello', - value: 'world' - }] - - rs.on('data', function (data) { - t.same(data, expected.shift(), 'stream continues over retry') - }) - - rs.on('end', function () { - t.same(expected.length, 0, 'no more data') - t.end() - }) - - let stream - let guestStream - - const connect = function () { - stream = host.createRpcStream() - guestStream = guest.createRpcStream() - stream.pipe(guestStream).pipe(stream) - } + await guest.open() + await db.batch([{ + type: 'put', + key: 'hej', + value: 'verden' + }, { + type: 'put', + key: 'hello', + value: 'world' + }, { + type: 'put', + key: 'hola', + value: 'mundo' + }]) + + const rs = new EntryStream(guest, { limit: 2 }) + const expected = [{ + key: 'hej', + value: 'verden' + }, { + key: 'hello', + value: 'world' + }] + + const ended = new Promise((resolve, reject) => { + rs.on('data', function (data) { + t.same(data, expected.shift(), 'stream continues over retry') + }) - connect() + rs.on('end', function () { + t.same(expected.length, 0, 'no more data') + resolve() }) + + rs.on('error', reject) }) + + let stream + let guestStream + + const connect = function () { + stream = host.createRpcStream() + guestStream = guest.createRpcStream() + stream.pipe(guestStream).pipe(stream) + } + + connect() + await ended }) diff --git a/test/streams.js b/test/streams.js index f89ffe3..f5f3573 100644 --- a/test/streams.js +++ b/test/streams.js @@ -6,7 +6,14 @@ const { EntryStream } = require('level-read-stream') const concat = require('concat-stream') const { ManyLevelHost, ManyLevelGuest } = require('..') -tape('two concurrent iterators', function (t) { +function collect (stream) { + return new Promise((resolve, reject) => { + stream.on('error', reject) + stream.pipe(concat(resolve)) + }) +} + +tape('two concurrent iterators', async function (t) { const db = new MemoryLevel() const host = new ManyLevelHost(db) const stream = host.createRpcStream(db) @@ -17,19 +24,12 @@ tape('two concurrent iterators', function (t) { const batch = [] for (let i = 0; i < 100; i++) batch.push({ type: 'put', key: 'key-' + i, value: 'value-' + i }) - guest.batch(batch, function (err) { - t.error(err) + await guest.batch(batch) - // TODO: use iterator.all() instead - const rs1 = new EntryStream(guest) - const rs2 = new EntryStream(guest) + const rs1 = new EntryStream(guest) + const rs2 = new EntryStream(guest) + const [list1, list2] = await Promise.all([collect(rs1), collect(rs2)]) - rs1.pipe(concat(function (list1) { - t.same(list1.length, 100) - rs2.pipe(concat(function (list2) { - t.same(list2.length, 100) - t.end() - })) - })) - }) + t.same(list1.length, 100) + t.same(list2.length, 100) }) diff --git a/test/sublevel.js b/test/sublevel.js index 9846eb7..c9dbd2a 100644 --- a/test/sublevel.js +++ b/test/sublevel.js @@ -6,7 +6,14 @@ const { EntryStream } = require('level-read-stream') const concat = require('concat-stream') const { ManyLevelHost, ManyLevelGuest } = require('..') -tape('sublevel on deferred many-level guest', function (t) { +function collect (stream) { + return new Promise((resolve, reject) => { + stream.on('error', reject) + stream.pipe(concat(resolve)) + }) +} + +tape('sublevel on deferred many-level guest', async function (t) { t.plan(5) const db = new MemoryLevel() @@ -19,25 +26,21 @@ tape('sublevel on deferred many-level guest', function (t) { t.is(guest.status, 'opening') stream.pipe(guest.createRpcStream()).pipe(stream) - sub1.put('hello', { test: 'world' }, function (err) { - t.error(err, 'no err') + await sub1.put('hello', { test: 'world' }) + t.pass('put succeeded') - // TODO: use iterator.all() instead - new EntryStream(sub1).pipe(concat(function (entries) { - t.same(entries, [{ key: 'hello', value: { test: 'world' } }]) - })) + const [sub1Entries, sub2Entries, dbEntries] = await Promise.all([ + collect(new EntryStream(sub1)), + collect(new EntryStream(sub2)), + collect(new EntryStream(db)) + ]) - new EntryStream(sub2).pipe(concat(function (entries) { - t.same(entries, [{ key: 'hello', value: '{"test":"world"}' }]) - })) - - new EntryStream(db).pipe(concat(function (entries) { - t.same(entries, [{ key: '!test!hello', value: '{"test":"world"}' }]) - })) - }) + t.same(sub1Entries, [{ key: 'hello', value: { test: 'world' } }]) + t.same(sub2Entries, [{ key: 'hello', value: '{"test":"world"}' }]) + t.same(dbEntries, [{ key: '!test!hello', value: '{"test":"world"}' }]) }) -tape('sublevel on non-deferred many-level guest', function (t) { +tape('sublevel on non-deferred many-level guest', async function (t) { t.plan(5) const db = new MemoryLevel() @@ -47,32 +50,27 @@ tape('sublevel on non-deferred many-level guest', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - guest.once('open', function () { - t.is(guest.status, 'open') + await guest.open() + t.is(guest.status, 'open') - const sub1 = guest.sublevel('test', { valueEncoding: 'json' }) - const sub2 = guest.sublevel('test') - - sub1.put('hello', { test: 'world' }, function (err) { - t.error(err, 'no err') + const sub1 = guest.sublevel('test', { valueEncoding: 'json' }) + const sub2 = guest.sublevel('test') - // TODO: use iterator.all() instead - new EntryStream(sub1).pipe(concat(function (entries) { - t.same(entries, [{ key: 'hello', value: { test: 'world' } }]) - })) + await sub1.put('hello', { test: 'world' }) + t.pass('put succeeded') - new EntryStream(sub2).pipe(concat(function (entries) { - t.same(entries, [{ key: 'hello', value: '{"test":"world"}' }]) - })) + const [sub1Entries, sub2Entries, dbEntries] = await Promise.all([ + collect(new EntryStream(sub1)), + collect(new EntryStream(sub2)), + collect(new EntryStream(db)) + ]) - new EntryStream(db).pipe(concat(function (entries) { - t.same(entries, [{ key: '!test!hello', value: '{"test":"world"}' }]) - })) - }) - }) + t.same(sub1Entries, [{ key: 'hello', value: { test: 'world' } }]) + t.same(sub2Entries, [{ key: 'hello', value: '{"test":"world"}' }]) + t.same(dbEntries, [{ key: '!test!hello', value: '{"test":"world"}' }]) }) -tape('many-level host on deferred sublevel', function (t) { +tape('many-level host on deferred sublevel', async function (t) { t.plan(4) const db = new MemoryLevel() @@ -83,63 +81,57 @@ tape('many-level host on deferred sublevel', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - guest.put('from', 'guest', function (err) { - t.error(err, 'no err') + await guest.put('from', 'guest') + t.pass('guest put succeeded') - sub2.put('from', 'host', function (err) { - t.error(err, 'no err') + await sub2.put('from', 'host') + t.pass('host put succeeded') - // TODO: use iterator.all() instead - new EntryStream(guest).pipe(concat(function (entries) { - t.same(entries, [{ key: 'from', value: 'guest' }]) - })) + const [guestEntries, dbEntries] = await Promise.all([ + collect(new EntryStream(guest)), + collect(new EntryStream(db)) + ]) - new EntryStream(db).pipe(concat(function (entries) { - t.same(entries, [ - { key: '!test1!from', value: 'guest' }, - { key: '!test2!from', value: 'host' } - ]) - })) - }) - }) + t.same(guestEntries, [{ key: 'from', value: 'guest' }]) + t.same(dbEntries, [ + { key: '!test1!from', value: 'guest' }, + { key: '!test2!from', value: 'host' } + ]) }) -tape('many-level host on non-deferred sublevel', function (t) { +tape('many-level host on non-deferred sublevel', async function (t) { t.plan(4) const db = new MemoryLevel() const sub1 = db.sublevel('test1') const sub2 = db.sublevel('test2') - sub1.once('open', function () { - const stream = new ManyLevelHost(sub1).createRpcStream() - const guest = new ManyLevelGuest() + await sub1.open() - stream.pipe(guest.createRpcStream()).pipe(stream) + const stream = new ManyLevelHost(sub1).createRpcStream() + const guest = new ManyLevelGuest() - guest.put('from', 'guest', function (err) { - t.error(err, 'no err') + stream.pipe(guest.createRpcStream()).pipe(stream) - sub2.put('from', 'host', function (err) { - t.error(err, 'no err') + await guest.put('from', 'guest') + t.pass('guest put succeeded') - // TODO: use iterator.all() instead - new EntryStream(guest).pipe(concat(function (entries) { - t.same(entries, [{ key: 'from', value: 'guest' }]) - })) + await sub2.put('from', 'host') + t.pass('host put succeeded') - new EntryStream(db).pipe(concat(function (entries) { - t.same(entries, [ - { key: '!test1!from', value: 'guest' }, - { key: '!test2!from', value: 'host' } - ]) - })) - }) - }) - }) + const [guestEntries, dbEntries] = await Promise.all([ + collect(new EntryStream(guest)), + collect(new EntryStream(db)) + ]) + + t.same(guestEntries, [{ key: 'from', value: 'guest' }]) + t.same(dbEntries, [ + { key: '!test1!from', value: 'guest' }, + { key: '!test2!from', value: 'host' } + ]) }) -tape('many-level host on nested sublevel', function (t) { +tape('many-level host on nested sublevel', async function (t) { t.plan(4) const db = new MemoryLevel() @@ -151,23 +143,20 @@ tape('many-level host on nested sublevel', function (t) { stream.pipe(guest.createRpcStream()).pipe(stream) - guest.put('from', 'guest', function (err) { - t.error(err, 'no err') + await guest.put('from', 'guest') + t.pass('guest put succeeded') - sub3.put('from', 'host', function (err) { - t.error(err, 'no err') + await sub3.put('from', 'host') + t.pass('host put succeeded') - // TODO: use iterator.all() instead - new EntryStream(guest).pipe(concat(function (entries) { - t.same(entries, [{ key: 'from', value: 'guest' }]) - })) + const [guestEntries, dbEntries] = await Promise.all([ + collect(new EntryStream(guest)), + collect(new EntryStream(db)) + ]) - new EntryStream(db).pipe(concat(function (entries) { - t.same(entries, [ - { key: '!test1!!test2!from', value: 'guest' }, - { key: '!test3!from', value: 'host' } - ]) - })) - }) - }) + t.same(guestEntries, [{ key: 'from', value: 'guest' }]) + t.same(dbEntries, [ + { key: '!test1!!test2!from', value: 'guest' }, + { key: '!test3!from', value: 'host' } + ]) }) From 8d4b3716d8a736e7468ff137cb17aff5ac6141a9 Mon Sep 17 00:00:00 2001 From: Germano Eichenberg Date: Wed, 17 Jun 2026 21:10:22 -0300 Subject: [PATCH 2/4] Support v3 has and snapshot APIs --- guest.js | 119 ++++- host.js | 98 +++- messages.js | 1368 +++++++++++++++++++++++++++++++++++--------------- schema.proto | 36 ++ tags.js | 16 +- test/v3.js | 114 +++++ 6 files changed, 1339 insertions(+), 412 deletions(-) create mode 100644 test/v3.js diff --git a/guest.js b/guest.js index 434dd46..4525dd8 100644 --- a/guest.js +++ b/guest.js @@ -1,6 +1,6 @@ 'use strict' -const { AbstractLevel, AbstractIterator } = require('abstract-level') +const { AbstractLevel, AbstractIterator, AbstractSnapshot } = require('abstract-level') const lpstream = require('@vweevers/length-prefixed-stream') const ModuleError = require('module-error') const { input, output } = require('./tags') @@ -18,6 +18,7 @@ const kRef = Symbol('ref') const kDb = Symbol('db') const kRequests = Symbol('requests') const kIterators = Symbol('iterators') +const kSnapshots = Symbol('snapshots') const kRetry = Symbol('retry') const kRpcStream = Symbol('rpcStream') const kFlushed = Symbol('flushed') @@ -29,6 +30,7 @@ const kSeq = Symbol('seq') const kErrored = Symbol('errored') const kAbortIterator = Symbol('abortIterator') const kOnDbClosing = Symbol('onDbClosing') +const kId = Symbol('id') class ManyLevelGuest extends AbstractLevel { constructor (options) { @@ -37,14 +39,18 @@ class ManyLevelGuest extends AbstractLevel { super({ encodings: { buffer: true }, snapshots: !retry, + implicitSnapshots: !retry, + explicitSnapshots: !retry, permanence: true, seek: true, + has: true, createIfMissing: false, errorIfExists: false }, forward) this[kIterators] = new IdMap() this[kRequests] = new IdMap() + this[kSnapshots] = new IdMap() this[kRetry] = !!retry this[kEncode] = lpstream.encode() this[kRemote] = _remote || null @@ -106,6 +112,14 @@ class ManyLevelGuest extends AbstractLevel { case output.getManyCallback: ongetmanycallback(res) break + + case output.hasCallback: + onhascallback(res) + break + + case output.hasManyCallback: + onhasmanycallback(res) + break } self[kFlushed]() @@ -168,6 +182,20 @@ class ManyLevelGuest extends AbstractLevel { if (res.error) req.callback(new ModuleError('Could not get values', { code: res.error })) else req.callback(null, res.values.map(v => normalizeValue(v.value))) } + + function onhascallback (res) { + const req = self[kRequests].remove(res.id) + if (!req || !req.callback) return + if (res.error) req.callback(new ModuleError('Could not check key', { code: res.error })) + else req.callback(null, res.value) + } + + function onhasmanycallback (res) { + const req = self[kRequests].remove(res.id) + if (!req || !req.callback) return + if (res.error) req.callback(new ModuleError('Could not check keys', { code: res.error })) + else req.callback(null, res.values) + } } // Alias for backwards compat with multileveldown @@ -218,6 +246,7 @@ class ManyLevelGuest extends AbstractLevel { tag: input.get, id: 0, key: key, + snapshot: snapshotId(opts.snapshot), // This will resolve or reject based on the Host's response callback: (err, value) => { if (err) reject(err) @@ -238,6 +267,7 @@ class ManyLevelGuest extends AbstractLevel { tag: input.getMany, id: 0, keys: keys, + snapshot: snapshotId(opts.snapshot), // This will resolve or reject based on the Host's response callback: (err, values) => { if (err) reject(err) @@ -250,6 +280,46 @@ class ManyLevelGuest extends AbstractLevel { }) } + async _has (key, opts) { + if (this[kDb]) return this[kDb]._has(key, opts) + + return new Promise((resolve, reject) => { + const req = { + tag: input.has, + id: 0, + key: key, + snapshot: snapshotId(opts.snapshot), + callback: (err, value) => { + if (err) reject(err) + else resolve(value) + } + } + + req.id = this[kRequests].add(req) + this[kWrite](req) + }) + } + + async _hasMany (keys, opts) { + if (this[kDb]) return this[kDb]._hasMany(keys, opts) + + return new Promise((resolve, reject) => { + const req = { + tag: input.hasMany, + id: 0, + keys: keys, + snapshot: snapshotId(opts.snapshot), + callback: (err, values) => { + if (err) reject(err) + else resolve(values) + } + } + + req.id = this[kRequests].add(req) + this[kWrite](req) + }) + } + async _put (key, value, opts) { if (this[kDb]) return this[kDb]._put(key, value, opts) @@ -314,6 +384,8 @@ class ManyLevelGuest extends AbstractLevel { async _clear (opts) { if (this[kDb]) return this[kDb]._clear(opts) + opts = encodeOptionsSnapshot(opts) + return new Promise((resolve, reject) => { const req = { tag: input.clear, @@ -331,6 +403,16 @@ class ManyLevelGuest extends AbstractLevel { }) } + _snapshot (options) { + if (this[kDb]) return this[kDb].snapshot(options) + + const snapshot = new ManyLevelGuestSnapshot(this, options) + const req = { tag: input.snapshot, id: snapshot[kId] } + + this[kWrite](req) + return snapshot + } + async [kWrite] (req) { if (this[kRequests].size + this[kIterators].size === 1) ref(this[kRef]) const enc = input.encoding(req.tag) @@ -414,7 +496,11 @@ class ManyLevelGuestIterator extends AbstractIterator { this[kPending] = [] this[kCallback] = null this[kSeq] = 0 - this[kOnDbClosing] = () => this[kAbortIterator]('LEVEL_ITERATOR_NOT_OPEN') + this[kOnDbClosing] = () => { + if (this.db[kRpcStream] === null && this.db[kDb] === null) { + this[kAbortIterator]('LEVEL_ITERATOR_NOT_OPEN') + } + } db.once('closing', this[kOnDbClosing]) const req = this[kRequest] = { @@ -422,10 +508,11 @@ class ManyLevelGuestIterator extends AbstractIterator { id: 0, seq: 0, iterator: this, - options, + options: encodeOptionsSnapshot(options), consumed: 0, bookmark: null, - seek: null + seek: null, + snapshot: snapshotId(options.snapshot) } const ack = this[kAckMessage] = { @@ -540,6 +627,30 @@ class ManyLevelGuestIterator extends AbstractIterator { } } +class ManyLevelGuestSnapshot extends AbstractSnapshot { + constructor (db, options) { + super(options) + this.db = db + this[kId] = db[kSnapshots].add(this) + } + + async _close () { + this.db[kSnapshots].remove(this[kId]) + await this.db[kWrite]({ tag: input.snapshotClose, id: this[kId] }) + this.db[kFlushed]() + } +} + +function snapshotId (snapshot) { + return snapshot instanceof ManyLevelGuestSnapshot ? snapshot[kId] : 0 +} + +function encodeOptionsSnapshot (options) { + if (!options || !options.snapshot) return options + const snapshot = snapshotId(options.snapshot) + return snapshot ? Object.assign({}, options, { snapshot }) : options +} + function normalizeValue (value) { return value === null ? undefined : value } diff --git a/host.js b/host.js index 74f10fd..d80c750 100644 --- a/host.js +++ b/host.js @@ -83,6 +83,7 @@ function createRpcStream (db, options, streamOptions) { if (err) return stream.destroy(err) const iterators = new Map() + const snapshots = new Map() const cleanup = async () => { await finished(stream).catch(() => null) @@ -90,7 +91,12 @@ function createRpcStream (db, options, streamOptions) { iterator.close() } + for (const snapshot of snapshots.values()) { + snapshot.close() + } + iterators.clear() + snapshots.clear() } // Don't await cleanup() @@ -121,6 +127,10 @@ function createRpcStream (db, options, streamOptions) { case input.iteratorSeek: return oniteratorseek(req) case input.clear: return readonly ? onreadonly(req) : onclear(req) case input.getMany: return ongetmany(req) + case input.has: return onhas(req) + case input.hasMany: return onhasmany(req) + case input.snapshot: return onsnapshot(req) + case input.snapshotClose: return onsnapshotclose(req) } }) @@ -134,6 +144,27 @@ function createRpcStream (db, options, streamOptions) { encode.write(encodeMessage(msg, output.getManyCallback)) } + function hasCallback (id, err, value) { + const msg = { id, error: errorCode(err), value } + encode.write(encodeMessage(msg, output.hasCallback)) + } + + function hasManyCallback (id, err, values) { + const msg = { id, error: errorCode(err), values } + encode.write(encodeMessage(msg, output.hasManyCallback)) + } + + function snapshot (id) { + if (!id) return undefined + + const snapshot = snapshots.get(id) + if (snapshot !== undefined) return snapshot + + throw new ModuleError('Snapshot is not open', { + code: 'LEVEL_SNAPSHOT_NOT_OPEN' + }) + } + async function onput (req) { preput(req.key, req.value, function (err) { return callback(req.id, err) }) try { @@ -145,7 +176,7 @@ function createRpcStream (db, options, streamOptions) { async function onget (req) { try { - const value = await db.get(req.key, encodingOptions) + const value = await db.get(req.key, withSnapshot(encodingOptions, snapshot(req.snapshot))) return callback(req.id, null, value) } catch (err) { return callback(req.id, err, null) @@ -154,13 +185,31 @@ function createRpcStream (db, options, streamOptions) { async function ongetmany (req) { try { - const values = await db.getMany(req.keys, encodingOptions) + const values = await db.getMany(req.keys, withSnapshot(encodingOptions, snapshot(req.snapshot))) return getManyCallback(req.id, null, values.map(value => ({ value }))) } catch (err) { return getManyCallback(req.id, err, []) } } + async function onhas (req) { + try { + const value = await db.has(req.key, withSnapshot(encodingOptions, snapshot(req.snapshot))) + return hasCallback(req.id, null, value) + } catch (err) { + return hasCallback(req.id, err, false) + } + } + + async function onhasmany (req) { + try { + const values = await db.hasMany(req.keys, withSnapshot(encodingOptions, snapshot(req.snapshot))) + return hasManyCallback(req.id, null, values) + } catch (err) { + return hasManyCallback(req.id, err, []) + } + } + async function ondel (req) { predel(req.key, function (err) { return callback(req.id, err) }) try { @@ -185,9 +234,17 @@ function createRpcStream (db, options, streamOptions) { } } - function oniterator ({ id, seq, options, consumed, bookmark, seek }) { + function oniterator ({ id, seq, options, consumed, bookmark, seek, snapshot: snapshotId }) { if (iterators.has(id)) return + try { + options = withSnapshot(options, snapshot(snapshotId)) + } catch (err) { + const data = { id, error: errorCode(err), seq } + encode.write(encodeMessage(data, output.iteratorError)) + return + } + const it = new ManyLevelHostIterator(db, id, seq, options, consumed, encode) iterators.set(id, it) @@ -223,12 +280,32 @@ function createRpcStream (db, options, streamOptions) { async function onclear (req) { try { - await db.clear(cleanRangeOptions(req.options)) + const options = cleanRangeOptions(req.options) + await db.clear(withSnapshot(options, snapshot(options.snapshot))) return callback(req.id, null) } catch (err) { return callback(req.id, err) } } + + function onsnapshot (req) { + if (snapshots.has(req.id)) return + + try { + snapshots.set(req.id, db.snapshot()) + } catch (err) { + snapshots.set(req.id, { + close () {}, + error: err + }) + } + } + + async function onsnapshotclose (req) { + const snapshot = snapshots.get(req.id) + snapshots.delete(req.id) + if (snapshot !== undefined) await snapshot.close() + } } } @@ -417,3 +494,16 @@ function cleanRangeOptions (options) { return result } + +function withSnapshot (options, snapshot) { + if (snapshot === undefined) { + if (!options || !hasOwnProperty.call(options, 'snapshot')) return options + + const result = { ...options } + delete result.snapshot + return result + } + + if (snapshot.error) throw snapshot.error + return { ...options, snapshot } +} diff --git a/messages.js b/messages.js index 61d33c7..9661abe 100644 --- a/messages.js +++ b/messages.js @@ -1,141 +1,189 @@ -// This file is auto generated by the protocol-buffers compiler - -/* eslint-disable quotes */ -/* eslint-disable indent */ -/* eslint-disable no-redeclare */ -/* eslint-disable camelcase */ - -// Remember to `npm install --save protocol-buffers-encodings` -var encodings = require('protocol-buffers-encodings') -var varint = encodings.varint -var skip = encodings.skip - -var Get = exports.Get = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var Put = exports.Put = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var Delete = exports.Delete = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var Batch = exports.Batch = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var Clear = exports.Clear = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var Iterator = exports.Iterator = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var Callback = exports.Callback = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var IteratorData = exports.IteratorData = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var IteratorAck = exports.IteratorAck = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var IteratorSeek = exports.IteratorSeek = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var IteratorEnd = exports.IteratorEnd = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var IteratorError = exports.IteratorError = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var IteratorClose = exports.IteratorClose = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var GetMany = exports.GetMany = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -var GetManyCallback = exports.GetManyCallback = { - buffer: true, - encodingLength: null, - encode: null, - decode: null -} - -defineGet() -definePut() -defineDelete() -defineBatch() -defineClear() -defineIterator() -defineCallback() -defineIteratorData() -defineIteratorAck() -defineIteratorSeek() -defineIteratorEnd() -defineIteratorError() -defineIteratorClose() -defineGetMany() -defineGetManyCallback() - -function defineGet () { - Get.encodingLength = encodingLength - Get.encode = encode - Get.decode = decode - +// This file is auto generated by the protocol-buffers compiler + +/* eslint-disable quotes */ +/* eslint-disable indent */ +/* eslint-disable no-redeclare */ +/* eslint-disable camelcase */ + +// Remember to `npm install --save protocol-buffers-encodings` +var encodings = require('protocol-buffers-encodings') +var varint = encodings.varint +var skip = encodings.skip + +var Get = exports.Get = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var Put = exports.Put = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var Delete = exports.Delete = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var Batch = exports.Batch = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var Clear = exports.Clear = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var Iterator = exports.Iterator = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var Callback = exports.Callback = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var IteratorData = exports.IteratorData = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var IteratorAck = exports.IteratorAck = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var IteratorSeek = exports.IteratorSeek = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var IteratorEnd = exports.IteratorEnd = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var IteratorError = exports.IteratorError = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var IteratorClose = exports.IteratorClose = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var GetMany = exports.GetMany = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var GetManyCallback = exports.GetManyCallback = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var Has = exports.Has = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var HasMany = exports.HasMany = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var HasCallback = exports.HasCallback = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var HasManyCallback = exports.HasManyCallback = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var Snapshot = exports.Snapshot = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +var SnapshotClose = exports.SnapshotClose = { + buffer: true, + encodingLength: null, + encode: null, + decode: null +} + +defineGet() +definePut() +defineDelete() +defineBatch() +defineClear() +defineIterator() +defineCallback() +defineIteratorData() +defineIteratorAck() +defineIteratorSeek() +defineIteratorEnd() +defineIteratorError() +defineIteratorClose() +defineGetMany() +defineGetManyCallback() +defineHas() +defineHasMany() +defineHasCallback() +defineHasManyCallback() +defineSnapshot() +defineSnapshotClose() + +function defineGet () { + Get.encodingLength = encodingLength + Get.encode = encode + Get.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -144,9 +192,13 @@ function defineGet () { if (!defined(obj.key)) throw new Error("key is required") var len = encodings.bytes.encodingLength(obj.key) length += 1 + len + if (defined(obj.snapshot)) { + var len = encodings.varint.encodingLength(obj.snapshot) + length += 1 + len + } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -159,10 +211,15 @@ function defineGet () { buf[offset++] = 18 encodings.bytes.encode(obj.key, buf, offset) offset += encodings.bytes.encode.bytes + if (defined(obj.snapshot)) { + buf[offset++] = 24 + encodings.varint.encode(obj.snapshot, buf, offset) + offset += encodings.varint.encode.bytes + } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -170,7 +227,8 @@ function defineGet () { var oldOffset = offset var obj = { id: 0, - key: null + key: null, + snapshot: 0 } var found0 = false var found1 = false @@ -194,18 +252,22 @@ function defineGet () { offset += encodings.bytes.decode.bytes found1 = true break + case 3: + obj.snapshot = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + break default: offset = skip(prefix & 7, buf, offset) } } - } -} - -function definePut () { - Put.encodingLength = encodingLength - Put.encode = encode - Put.decode = decode - + } +} + +function definePut () { + Put.encodingLength = encodingLength + Put.encode = encode + Put.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -219,8 +281,8 @@ function definePut () { length += 1 + len } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -240,8 +302,8 @@ function definePut () { } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -282,14 +344,14 @@ function definePut () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineDelete () { - Delete.encodingLength = encodingLength - Delete.encode = encode - Delete.decode = decode - + } +} + +function defineDelete () { + Delete.encodingLength = encodingLength + Delete.encode = encode + Delete.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -299,8 +361,8 @@ function defineDelete () { var len = encodings.bytes.encodingLength(obj.key) length += 1 + len return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -315,8 +377,8 @@ function defineDelete () { offset += encodings.bytes.encode.bytes encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -352,24 +414,24 @@ function defineDelete () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineBatch () { - var Operation = Batch.Operation = { - buffer: true, - encodingLength: null, - encode: null, - decode: null - } - - defineOperation() - - function defineOperation () { - Operation.encodingLength = encodingLength - Operation.encode = encode - Operation.decode = decode - + } +} + +function defineBatch () { + var Operation = Batch.Operation = { + buffer: true, + encodingLength: null, + encode: null, + decode: null + } + + defineOperation() + + function defineOperation () { + Operation.encodingLength = encodingLength + Operation.encode = encode + Operation.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.type)) throw new Error("type is required") @@ -383,8 +445,8 @@ function defineBatch () { length += 1 + len } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -404,8 +466,8 @@ function defineBatch () { } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -446,13 +508,13 @@ function defineBatch () { offset = skip(prefix & 7, buf, offset) } } - } - } - - Batch.encodingLength = encodingLength - Batch.encode = encode - Batch.decode = decode - + } + } + + Batch.encodingLength = encodingLength + Batch.encode = encode + Batch.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -467,8 +529,8 @@ function defineBatch () { } } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -489,8 +551,8 @@ function defineBatch () { } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -526,24 +588,24 @@ function defineBatch () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineClear () { - var ClearOptions = Clear.ClearOptions = { - buffer: true, - encodingLength: null, - encode: null, - decode: null - } - - defineClearOptions() - - function defineClearOptions () { - ClearOptions.encodingLength = encodingLength - ClearOptions.encode = encode - ClearOptions.decode = decode - + } +} + +function defineClear () { + var ClearOptions = Clear.ClearOptions = { + buffer: true, + encodingLength: null, + encode: null, + decode: null + } + + defineClearOptions() + + function defineClearOptions () { + ClearOptions.encodingLength = encodingLength + ClearOptions.encode = encode + ClearOptions.decode = decode + function encodingLength (obj) { var length = 0 if (defined(obj.gt)) { @@ -570,9 +632,13 @@ function defineClear () { var len = encodings.bool.encodingLength(obj.reverse) length += 1 + len } + if (defined(obj.snapshot)) { + var len = encodings.varint.encodingLength(obj.snapshot) + length += 1 + len + } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -607,10 +673,15 @@ function defineClear () { encodings.bool.encode(obj.reverse, buf, offset) offset += encodings.bool.encode.bytes } + if (defined(obj.snapshot)) { + buf[offset++] = 72 + encodings.varint.encode(obj.snapshot, buf, offset) + offset += encodings.varint.encode.bytes + } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -622,7 +693,8 @@ function defineClear () { lt: null, lte: null, limit: 0, - reverse: false + reverse: false, + snapshot: 0 } while (true) { if (end <= offset) { @@ -657,17 +729,21 @@ function defineClear () { obj.reverse = encodings.bool.decode(buf, offset) offset += encodings.bool.decode.bytes break + case 9: + obj.snapshot = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + break default: offset = skip(prefix & 7, buf, offset) } } - } - } - - Clear.encodingLength = encodingLength - Clear.encode = encode - Clear.decode = decode - + } + } + + Clear.encodingLength = encodingLength + Clear.encode = encode + Clear.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -679,8 +755,8 @@ function defineClear () { length += 1 + len } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -698,8 +774,8 @@ function defineClear () { } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -735,24 +811,24 @@ function defineClear () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineIterator () { - var Options = Iterator.Options = { - buffer: true, - encodingLength: null, - encode: null, - decode: null - } - - defineOptions() - - function defineOptions () { - Options.encodingLength = encodingLength - Options.encode = encode - Options.decode = decode - + } +} + +function defineIterator () { + var Options = Iterator.Options = { + buffer: true, + encodingLength: null, + encode: null, + decode: null + } + + defineOptions() + + function defineOptions () { + Options.encodingLength = encodingLength + Options.encode = encode + Options.decode = decode + function encodingLength (obj) { var length = 0 if (defined(obj.keys)) { @@ -788,8 +864,8 @@ function defineIterator () { length += 1 + len } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -836,8 +912,8 @@ function defineIterator () { } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -898,13 +974,13 @@ function defineIterator () { offset = skip(prefix & 7, buf, offset) } } - } - } - - Iterator.encodingLength = encodingLength - Iterator.encode = encode - Iterator.decode = decode - + } + } + + Iterator.encodingLength = encodingLength + Iterator.encode = encode + Iterator.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -928,9 +1004,13 @@ function defineIterator () { var len = encodings.bytes.encodingLength(obj.seek) length += 1 + len } + if (defined(obj.snapshot)) { + var len = encodings.varint.encodingLength(obj.snapshot) + length += 1 + len + } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -963,10 +1043,15 @@ function defineIterator () { encodings.bytes.encode(obj.seek, buf, offset) offset += encodings.bytes.encode.bytes } + if (defined(obj.snapshot)) { + buf[offset++] = 56 + encodings.varint.encode(obj.snapshot, buf, offset) + offset += encodings.varint.encode.bytes + } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -978,7 +1063,8 @@ function defineIterator () { options: null, consumed: 0, bookmark: null, - seek: null + seek: null, + snapshot: 0 } var found0 = false var found1 = false @@ -1024,18 +1110,22 @@ function defineIterator () { obj.seek = encodings.bytes.decode(buf, offset) offset += encodings.bytes.decode.bytes break + case 7: + obj.snapshot = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + break default: offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineCallback () { - Callback.encodingLength = encodingLength - Callback.encode = encode - Callback.decode = decode - + } +} + +function defineCallback () { + Callback.encodingLength = encodingLength + Callback.encode = encode + Callback.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -1050,8 +1140,8 @@ function defineCallback () { length += 1 + len } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -1072,8 +1162,8 @@ function defineCallback () { } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -1112,14 +1202,14 @@ function defineCallback () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineIteratorData () { - IteratorData.encodingLength = encodingLength - IteratorData.encode = encode - IteratorData.decode = decode - + } +} + +function defineIteratorData () { + IteratorData.encodingLength = encodingLength + IteratorData.encode = encode + IteratorData.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -1136,8 +1226,8 @@ function defineIteratorData () { } } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -1160,8 +1250,8 @@ function defineIteratorData () { } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -1202,14 +1292,14 @@ function defineIteratorData () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineIteratorAck () { - IteratorAck.encodingLength = encodingLength - IteratorAck.encode = encode - IteratorAck.decode = decode - + } +} + +function defineIteratorAck () { + IteratorAck.encodingLength = encodingLength + IteratorAck.encode = encode + IteratorAck.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -1222,8 +1312,8 @@ function defineIteratorAck () { var len = encodings.varint.encodingLength(obj.consumed) length += 1 + len return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -1242,8 +1332,8 @@ function defineIteratorAck () { offset += encodings.varint.encode.bytes encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -1286,14 +1376,14 @@ function defineIteratorAck () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineIteratorSeek () { - IteratorSeek.encodingLength = encodingLength - IteratorSeek.encode = encode - IteratorSeek.decode = decode - + } +} + +function defineIteratorSeek () { + IteratorSeek.encodingLength = encodingLength + IteratorSeek.encode = encode + IteratorSeek.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -1306,8 +1396,8 @@ function defineIteratorSeek () { var len = encodings.bytes.encodingLength(obj.target) length += 1 + len return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -1326,8 +1416,8 @@ function defineIteratorSeek () { offset += encodings.bytes.encode.bytes encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -1370,14 +1460,14 @@ function defineIteratorSeek () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineIteratorEnd () { - IteratorEnd.encodingLength = encodingLength - IteratorEnd.encode = encode - IteratorEnd.decode = decode - + } +} + +function defineIteratorEnd () { + IteratorEnd.encodingLength = encodingLength + IteratorEnd.encode = encode + IteratorEnd.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -1387,8 +1477,8 @@ function defineIteratorEnd () { var len = encodings.varint.encodingLength(obj.seq) length += 1 + len return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -1403,8 +1493,8 @@ function defineIteratorEnd () { offset += encodings.varint.encode.bytes encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -1440,14 +1530,14 @@ function defineIteratorEnd () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineIteratorError () { - IteratorError.encodingLength = encodingLength - IteratorError.encode = encode - IteratorError.decode = decode - + } +} + +function defineIteratorError () { + IteratorError.encodingLength = encodingLength + IteratorError.encode = encode + IteratorError.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -1460,8 +1550,8 @@ function defineIteratorError () { var len = encodings.string.encodingLength(obj.error) length += 1 + len return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -1480,8 +1570,8 @@ function defineIteratorError () { offset += encodings.string.encode.bytes encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -1524,22 +1614,22 @@ function defineIteratorError () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineIteratorClose () { - IteratorClose.encodingLength = encodingLength - IteratorClose.encode = encode - IteratorClose.decode = decode - + } +} + +function defineIteratorClose () { + IteratorClose.encodingLength = encodingLength + IteratorClose.encode = encode + IteratorClose.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") var len = encodings.varint.encodingLength(obj.id) length += 1 + len return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -1550,8 +1640,8 @@ function defineIteratorClose () { offset += encodings.varint.encode.bytes encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -1580,14 +1670,14 @@ function defineIteratorClose () { offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineGetMany () { - GetMany.encodingLength = encodingLength - GetMany.encode = encode - GetMany.decode = decode - + } +} + +function defineGetMany () { + GetMany.encodingLength = encodingLength + GetMany.encode = encode + GetMany.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -1600,9 +1690,13 @@ function defineGetMany () { length += 1 + len } } + if (defined(obj.snapshot)) { + var len = encodings.varint.encodingLength(obj.snapshot) + length += 1 + len + } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -1619,10 +1713,15 @@ function defineGetMany () { offset += encodings.bytes.encode.bytes } } + if (defined(obj.snapshot)) { + buf[offset++] = 24 + encodings.varint.encode(obj.snapshot, buf, offset) + offset += encodings.varint.encode.bytes + } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -1630,7 +1729,8 @@ function defineGetMany () { var oldOffset = offset var obj = { id: 0, - keys: [] + keys: [], + snapshot: 0 } var found0 = false while (true) { @@ -1652,28 +1752,32 @@ function defineGetMany () { obj.keys.push(encodings.bytes.decode(buf, offset)) offset += encodings.bytes.decode.bytes break + case 3: + obj.snapshot = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + break default: offset = skip(prefix & 7, buf, offset) } } - } -} - -function defineGetManyCallback () { - var Value = GetManyCallback.Value = { - buffer: true, - encodingLength: null, - encode: null, - decode: null - } - - defineValue() - - function defineValue () { - Value.encodingLength = encodingLength - Value.encode = encode - Value.decode = decode - + } +} + +function defineGetManyCallback () { + var Value = GetManyCallback.Value = { + buffer: true, + encodingLength: null, + encode: null, + decode: null + } + + defineValue() + + function defineValue () { + Value.encodingLength = encodingLength + Value.encode = encode + Value.decode = decode + function encodingLength (obj) { var length = 0 if (defined(obj.value)) { @@ -1681,8 +1785,8 @@ function defineGetManyCallback () { length += 1 + len } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -1694,8 +1798,8 @@ function defineGetManyCallback () { } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -1721,13 +1825,13 @@ function defineGetManyCallback () { offset = skip(prefix & 7, buf, offset) } } - } - } - - GetManyCallback.encodingLength = encodingLength - GetManyCallback.encode = encode - GetManyCallback.decode = decode - + } + } + + GetManyCallback.encodingLength = encodingLength + GetManyCallback.encode = encode + GetManyCallback.decode = decode + function encodingLength (obj) { var length = 0 if (!defined(obj.id)) throw new Error("id is required") @@ -1746,8 +1850,8 @@ function defineGetManyCallback () { } } return length - } - + } + function encode (obj, buf, offset) { if (!offset) offset = 0 if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) @@ -1773,8 +1877,8 @@ function defineGetManyCallback () { } encode.bytes = offset - oldOffset return buf - } - + } + function decode (buf, offset, end) { if (!offset) offset = 0 if (!end) end = buf.length @@ -1815,9 +1919,469 @@ function defineGetManyCallback () { offset = skip(prefix & 7, buf, offset) } } - } -} - + } +} + +function defineHas () { + Has.encodingLength = encodingLength + Has.encode = encode + Has.decode = decode + + function encodingLength (obj) { + var length = 0 + if (!defined(obj.id)) throw new Error("id is required") + var len = encodings.varint.encodingLength(obj.id) + length += 1 + len + if (!defined(obj.key)) throw new Error("key is required") + var len = encodings.bytes.encodingLength(obj.key) + length += 1 + len + if (defined(obj.snapshot)) { + var len = encodings.varint.encodingLength(obj.snapshot) + length += 1 + len + } + return length + } + + function encode (obj, buf, offset) { + if (!offset) offset = 0 + if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) + var oldOffset = offset + if (!defined(obj.id)) throw new Error("id is required") + buf[offset++] = 8 + encodings.varint.encode(obj.id, buf, offset) + offset += encodings.varint.encode.bytes + if (!defined(obj.key)) throw new Error("key is required") + buf[offset++] = 18 + encodings.bytes.encode(obj.key, buf, offset) + offset += encodings.bytes.encode.bytes + if (defined(obj.snapshot)) { + buf[offset++] = 24 + encodings.varint.encode(obj.snapshot, buf, offset) + offset += encodings.varint.encode.bytes + } + encode.bytes = offset - oldOffset + return buf + } + + function decode (buf, offset, end) { + if (!offset) offset = 0 + if (!end) end = buf.length + if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid") + var oldOffset = offset + var obj = { + id: 0, + key: null, + snapshot: 0 + } + var found0 = false + var found1 = false + while (true) { + if (end <= offset) { + if (!found0 || !found1) throw new Error("Decoded message is not valid") + decode.bytes = offset - oldOffset + return obj + } + var prefix = varint.decode(buf, offset) + offset += varint.decode.bytes + var tag = prefix >> 3 + switch (tag) { + case 1: + obj.id = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + found0 = true + break + case 2: + obj.key = encodings.bytes.decode(buf, offset) + offset += encodings.bytes.decode.bytes + found1 = true + break + case 3: + obj.snapshot = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + break + default: + offset = skip(prefix & 7, buf, offset) + } + } + } +} + +function defineHasMany () { + HasMany.encodingLength = encodingLength + HasMany.encode = encode + HasMany.decode = decode + + function encodingLength (obj) { + var length = 0 + if (!defined(obj.id)) throw new Error("id is required") + var len = encodings.varint.encodingLength(obj.id) + length += 1 + len + if (defined(obj.keys)) { + for (var i = 0; i < obj.keys.length; i++) { + if (!defined(obj.keys[i])) continue + var len = encodings.bytes.encodingLength(obj.keys[i]) + length += 1 + len + } + } + if (defined(obj.snapshot)) { + var len = encodings.varint.encodingLength(obj.snapshot) + length += 1 + len + } + return length + } + + function encode (obj, buf, offset) { + if (!offset) offset = 0 + if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) + var oldOffset = offset + if (!defined(obj.id)) throw new Error("id is required") + buf[offset++] = 8 + encodings.varint.encode(obj.id, buf, offset) + offset += encodings.varint.encode.bytes + if (defined(obj.keys)) { + for (var i = 0; i < obj.keys.length; i++) { + if (!defined(obj.keys[i])) continue + buf[offset++] = 18 + encodings.bytes.encode(obj.keys[i], buf, offset) + offset += encodings.bytes.encode.bytes + } + } + if (defined(obj.snapshot)) { + buf[offset++] = 24 + encodings.varint.encode(obj.snapshot, buf, offset) + offset += encodings.varint.encode.bytes + } + encode.bytes = offset - oldOffset + return buf + } + + function decode (buf, offset, end) { + if (!offset) offset = 0 + if (!end) end = buf.length + if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid") + var oldOffset = offset + var obj = { + id: 0, + keys: [], + snapshot: 0 + } + var found0 = false + while (true) { + if (end <= offset) { + if (!found0) throw new Error("Decoded message is not valid") + decode.bytes = offset - oldOffset + return obj + } + var prefix = varint.decode(buf, offset) + offset += varint.decode.bytes + var tag = prefix >> 3 + switch (tag) { + case 1: + obj.id = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + found0 = true + break + case 2: + obj.keys.push(encodings.bytes.decode(buf, offset)) + offset += encodings.bytes.decode.bytes + break + case 3: + obj.snapshot = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + break + default: + offset = skip(prefix & 7, buf, offset) + } + } + } +} + +function defineHasCallback () { + HasCallback.encodingLength = encodingLength + HasCallback.encode = encode + HasCallback.decode = decode + + function encodingLength (obj) { + var length = 0 + if (!defined(obj.id)) throw new Error("id is required") + var len = encodings.varint.encodingLength(obj.id) + length += 1 + len + if (defined(obj.error)) { + var len = encodings.string.encodingLength(obj.error) + length += 1 + len + } + if (defined(obj.value)) { + var len = encodings.bool.encodingLength(obj.value) + length += 1 + len + } + return length + } + + function encode (obj, buf, offset) { + if (!offset) offset = 0 + if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) + var oldOffset = offset + if (!defined(obj.id)) throw new Error("id is required") + buf[offset++] = 8 + encodings.varint.encode(obj.id, buf, offset) + offset += encodings.varint.encode.bytes + if (defined(obj.error)) { + buf[offset++] = 18 + encodings.string.encode(obj.error, buf, offset) + offset += encodings.string.encode.bytes + } + if (defined(obj.value)) { + buf[offset++] = 24 + encodings.bool.encode(obj.value, buf, offset) + offset += encodings.bool.encode.bytes + } + encode.bytes = offset - oldOffset + return buf + } + + function decode (buf, offset, end) { + if (!offset) offset = 0 + if (!end) end = buf.length + if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid") + var oldOffset = offset + var obj = { + id: 0, + error: "", + value: false + } + var found0 = false + while (true) { + if (end <= offset) { + if (!found0) throw new Error("Decoded message is not valid") + decode.bytes = offset - oldOffset + return obj + } + var prefix = varint.decode(buf, offset) + offset += varint.decode.bytes + var tag = prefix >> 3 + switch (tag) { + case 1: + obj.id = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + found0 = true + break + case 2: + obj.error = encodings.string.decode(buf, offset) + offset += encodings.string.decode.bytes + break + case 3: + obj.value = encodings.bool.decode(buf, offset) + offset += encodings.bool.decode.bytes + break + default: + offset = skip(prefix & 7, buf, offset) + } + } + } +} + +function defineHasManyCallback () { + HasManyCallback.encodingLength = encodingLength + HasManyCallback.encode = encode + HasManyCallback.decode = decode + + function encodingLength (obj) { + var length = 0 + if (!defined(obj.id)) throw new Error("id is required") + var len = encodings.varint.encodingLength(obj.id) + length += 1 + len + if (defined(obj.error)) { + var len = encodings.string.encodingLength(obj.error) + length += 1 + len + } + if (defined(obj.values)) { + for (var i = 0; i < obj.values.length; i++) { + if (!defined(obj.values[i])) continue + var len = encodings.bool.encodingLength(obj.values[i]) + length += 1 + len + } + } + return length + } + + function encode (obj, buf, offset) { + if (!offset) offset = 0 + if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) + var oldOffset = offset + if (!defined(obj.id)) throw new Error("id is required") + buf[offset++] = 8 + encodings.varint.encode(obj.id, buf, offset) + offset += encodings.varint.encode.bytes + if (defined(obj.error)) { + buf[offset++] = 18 + encodings.string.encode(obj.error, buf, offset) + offset += encodings.string.encode.bytes + } + if (defined(obj.values)) { + for (var i = 0; i < obj.values.length; i++) { + if (!defined(obj.values[i])) continue + buf[offset++] = 24 + encodings.bool.encode(obj.values[i], buf, offset) + offset += encodings.bool.encode.bytes + } + } + encode.bytes = offset - oldOffset + return buf + } + + function decode (buf, offset, end) { + if (!offset) offset = 0 + if (!end) end = buf.length + if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid") + var oldOffset = offset + var obj = { + id: 0, + error: "", + values: [] + } + var found0 = false + while (true) { + if (end <= offset) { + if (!found0) throw new Error("Decoded message is not valid") + decode.bytes = offset - oldOffset + return obj + } + var prefix = varint.decode(buf, offset) + offset += varint.decode.bytes + var tag = prefix >> 3 + switch (tag) { + case 1: + obj.id = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + found0 = true + break + case 2: + obj.error = encodings.string.decode(buf, offset) + offset += encodings.string.decode.bytes + break + case 3: + obj.values.push(encodings.bool.decode(buf, offset)) + offset += encodings.bool.decode.bytes + break + default: + offset = skip(prefix & 7, buf, offset) + } + } + } +} + +function defineSnapshot () { + Snapshot.encodingLength = encodingLength + Snapshot.encode = encode + Snapshot.decode = decode + + function encodingLength (obj) { + var length = 0 + if (!defined(obj.id)) throw new Error("id is required") + var len = encodings.varint.encodingLength(obj.id) + length += 1 + len + return length + } + + function encode (obj, buf, offset) { + if (!offset) offset = 0 + if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) + var oldOffset = offset + if (!defined(obj.id)) throw new Error("id is required") + buf[offset++] = 8 + encodings.varint.encode(obj.id, buf, offset) + offset += encodings.varint.encode.bytes + encode.bytes = offset - oldOffset + return buf + } + + function decode (buf, offset, end) { + if (!offset) offset = 0 + if (!end) end = buf.length + if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid") + var oldOffset = offset + var obj = { + id: 0 + } + var found0 = false + while (true) { + if (end <= offset) { + if (!found0) throw new Error("Decoded message is not valid") + decode.bytes = offset - oldOffset + return obj + } + var prefix = varint.decode(buf, offset) + offset += varint.decode.bytes + var tag = prefix >> 3 + switch (tag) { + case 1: + obj.id = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + found0 = true + break + default: + offset = skip(prefix & 7, buf, offset) + } + } + } +} + +function defineSnapshotClose () { + SnapshotClose.encodingLength = encodingLength + SnapshotClose.encode = encode + SnapshotClose.decode = decode + + function encodingLength (obj) { + var length = 0 + if (!defined(obj.id)) throw new Error("id is required") + var len = encodings.varint.encodingLength(obj.id) + length += 1 + len + return length + } + + function encode (obj, buf, offset) { + if (!offset) offset = 0 + if (!buf) buf = Buffer.allocUnsafe(encodingLength(obj)) + var oldOffset = offset + if (!defined(obj.id)) throw new Error("id is required") + buf[offset++] = 8 + encodings.varint.encode(obj.id, buf, offset) + offset += encodings.varint.encode.bytes + encode.bytes = offset - oldOffset + return buf + } + + function decode (buf, offset, end) { + if (!offset) offset = 0 + if (!end) end = buf.length + if (!(end <= buf.length && offset <= buf.length)) throw new Error("Decoded message is not valid") + var oldOffset = offset + var obj = { + id: 0 + } + var found0 = false + while (true) { + if (end <= offset) { + if (!found0) throw new Error("Decoded message is not valid") + decode.bytes = offset - oldOffset + return obj + } + var prefix = varint.decode(buf, offset) + offset += varint.decode.bytes + var tag = prefix >> 3 + switch (tag) { + case 1: + obj.id = encodings.varint.decode(buf, offset) + offset += encodings.varint.decode.bytes + found0 = true + break + default: + offset = skip(prefix & 7, buf, offset) + } + } + } +} + function defined (val) { return val !== null && val !== undefined && (typeof val !== 'number' || !isNaN(val)) -} +} diff --git a/schema.proto b/schema.proto index c21211d..3acc1ba 100644 --- a/schema.proto +++ b/schema.proto @@ -1,6 +1,7 @@ message Get { required uint32 id = 1; required bytes key = 2; + optional uint32 snapshot = 3; } message Put { @@ -36,6 +37,7 @@ message Clear { optional bytes lte = 6; optional sint32 limit = 7; optional bool reverse = 8; + optional uint32 snapshot = 9; } } @@ -46,6 +48,7 @@ message Iterator { required uint64 consumed = 4; optional bytes bookmark = 5; optional bytes seek = 6; + optional uint32 snapshot = 7; message Options { optional bool keys = 1; @@ -101,6 +104,7 @@ message IteratorClose { message GetMany { required uint32 id = 1; repeated bytes keys = 2; + optional uint32 snapshot = 3; } message GetManyCallback { @@ -113,3 +117,35 @@ message GetManyCallback { optional bytes value = 1; } } + +message Has { + required uint32 id = 1; + required bytes key = 2; + optional uint32 snapshot = 3; +} + +message HasMany { + required uint32 id = 1; + repeated bytes keys = 2; + optional uint32 snapshot = 3; +} + +message HasCallback { + required uint32 id = 1; + optional string error = 2; + optional bool value = 3; +} + +message HasManyCallback { + required uint32 id = 1; + optional string error = 2; + repeated bool values = 3; +} + +message Snapshot { + required uint32 id = 1; +} + +message SnapshotClose { + required uint32 id = 1; +} diff --git a/tags.js b/tags.js index abc7544..d68190b 100644 --- a/tags.js +++ b/tags.js @@ -12,7 +12,11 @@ const INPUT = [ messages.GetMany, messages.IteratorClose, messages.IteratorAck, - messages.IteratorSeek + messages.IteratorSeek, + messages.Has, + messages.HasMany, + messages.Snapshot, + messages.SnapshotClose ] const OUTPUT = [ @@ -20,7 +24,9 @@ const OUTPUT = [ messages.IteratorData, messages.GetManyCallback, messages.IteratorError, - messages.IteratorEnd + messages.IteratorEnd, + messages.HasCallback, + messages.HasManyCallback ] exports.input = { @@ -34,6 +40,10 @@ exports.input = { iteratorClose: 7, iteratorAck: 8, iteratorSeek: 9, + has: 10, + hasMany: 11, + snapshot: 12, + snapshotClose: 13, encoding (tag) { return INPUT[tag] @@ -46,6 +56,8 @@ exports.output = { getManyCallback: 2, iteratorError: 3, iteratorEnd: 4, + hasCallback: 5, + hasManyCallback: 6, encoding (tag) { return OUTPUT[tag] diff --git a/test/v3.js b/test/v3.js new file mode 100644 index 0000000..e40fad5 --- /dev/null +++ b/test/v3.js @@ -0,0 +1,114 @@ +'use strict' + +const test = require('tape') +const { MemoryLevel } = require('memory-level') +const { ManyLevelHost, ManyLevelGuest } = require('..') + +function pair (options) { + const db = new MemoryLevel() + const host = new ManyLevelHost(db) + const guest = new ManyLevelGuest({ + ...options, + _remote: () => host.createRpcStream() + }) + + return { db, guest } +} + +test('has and hasMany over rpc', async function (t) { + const { guest } = pair() + + await guest.open() + await guest.put('hello', 'world') + + t.is(await guest.has('hello'), true) + t.is(await guest.has('missing'), false) + t.same(await guest.hasMany(['hello', 'missing']), [true, false]) + + await guest.close() +}) + +test('explicit snapshot over rpc', async function (t) { + const { guest } = pair() + + await guest.open() + await guest.batch([ + { type: 'put', key: 'a', value: '1' }, + { type: 'put', key: 'b', value: '2' } + ]) + + const snapshot = guest.snapshot() + await guest.batch([ + { type: 'put', key: 'a', value: 'changed' }, + { type: 'put', key: 'c', value: '3' }, + { type: 'del', key: 'b' } + ]) + + t.is(await guest.get('a'), 'changed') + t.is(await guest.get('a', { snapshot }), '1') + t.same(await guest.getMany(['a', 'b', 'c'], { snapshot }), ['1', '2', undefined]) + t.is(await guest.has('b'), false) + t.is(await guest.has('b', { snapshot }), true) + t.same(await guest.hasMany(['a', 'b', 'c'], { snapshot }), [true, true, false]) + t.same(await guest.iterator({ snapshot }).all(), [['a', '1'], ['b', '2']]) + + await snapshot.close() + await guest.close() +}) + +test('clear uses explicit snapshot over rpc', async function (t) { + const { guest } = pair() + + await guest.open() + await guest.put('a', '1') + + const snapshot = guest.snapshot() + await guest.put('b', '2') + await guest.clear({ snapshot }) + + t.same(await guest.keys().all(), ['b']) + t.same(await guest.keys({ snapshot }).all(), ['a']) + + await snapshot.close() + await guest.close() +}) + +test('closed snapshot rejects has and hasMany over rpc', async function (t) { + const { guest } = pair() + + await guest.open() + await guest.put('hello', 'world') + + const snapshot = guest.snapshot() + await snapshot.close() + + try { + await guest.has('hello', { snapshot }) + } catch (err) { + t.is(err.code, 'LEVEL_SNAPSHOT_NOT_OPEN') + } + + try { + await guest.hasMany(['hello'], { snapshot }) + } catch (err) { + t.is(err.code, 'LEVEL_SNAPSHOT_NOT_OPEN') + } + + await guest.close() +}) + +test('retry mode supports has but not snapshots', async function (t) { + const { guest } = pair({ retry: true }) + + await guest.open() + await guest.put('hello', 'world') + + t.is(guest.supports.has, true) + t.is(guest.supports.snapshots, false) + t.is(guest.supports.implicitSnapshots, false) + t.is(guest.supports.explicitSnapshots, false) + t.is(await guest.has('hello'), true) + t.same(await guest.hasMany(['hello', 'missing']), [true, false]) + + await guest.close() +}) From 1295525a43e1e0ad3df7d2c563f17af1ffb222be Mon Sep 17 00:00:00 2001 From: Germano Eichenberg Date: Wed, 17 Jun 2026 21:14:24 -0300 Subject: [PATCH 3/4] Expand v3 API coverage --- guest.js | 6 ++ test/v3.js | 185 ++++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 175 insertions(+), 16 deletions(-) diff --git a/guest.js b/guest.js index 4525dd8..1ef9098 100644 --- a/guest.js +++ b/guest.js @@ -404,6 +404,12 @@ class ManyLevelGuest extends AbstractLevel { } _snapshot (options) { + if (this[kRetry]) { + throw new ModuleError('Database does not support explicit snapshots', { + code: 'LEVEL_NOT_SUPPORTED' + }) + } + if (this[kDb]) return this[kDb].snapshot(options) const snapshot = new ManyLevelGuestSnapshot(this, options) diff --git a/test/v3.js b/test/v3.js index e40fad5..a45d1de 100644 --- a/test/v3.js +++ b/test/v3.js @@ -5,25 +5,56 @@ const { MemoryLevel } = require('memory-level') const { ManyLevelHost, ManyLevelGuest } = require('..') function pair (options) { - const db = new MemoryLevel() - const host = new ManyLevelHost(db) + const db = options?.db || new MemoryLevel() + const hostDb = options?.hostDb || db + const host = new ManyLevelHost(hostDb, options?.hostOptions) const guest = new ManyLevelGuest({ - ...options, + ...options?.guestOptions, _remote: () => host.createRpcStream() }) - return { db, guest } + return { db, guest, hostDb } +} + +async function rejectsCode (t, promise, code) { + try { + await promise + t.fail('operation should reject') + } catch (err) { + t.is(err && err.code, code) + } } test('has and hasMany over rpc', async function (t) { const { guest } = pair() await guest.open() - await guest.put('hello', 'world') + await guest.batch([ + { type: 'put', key: 'a', value: '1' }, + { type: 'put', key: 'b', value: '2' } + ]) - t.is(await guest.has('hello'), true) + t.is(await guest.has('a'), true) t.is(await guest.has('missing'), false) - t.same(await guest.hasMany(['hello', 'missing']), [true, false]) + t.same(await guest.hasMany(['b', 'missing', 'a']), [true, false, true]) + t.same(await guest.hasMany([]), []) + + await guest.close() +}) + +test('has and hasMany honor key encoding over rpc', async function (t) { + const { db, guest } = pair() + + await db.put(JSON.stringify(['a', 1]), 'one') + await db.put(Buffer.from([0, 1]), 'buffer') + await guest.open() + + t.is(await guest.has(['a', 1], { keyEncoding: 'json' }), true) + t.is(await guest.has(['a', 2], { keyEncoding: 'json' }), false) + t.same(await guest.hasMany([ + Buffer.from([0, 1]), + Buffer.from([0, 2]) + ], { keyEncoding: 'buffer' }), [true, false]) await guest.close() }) @@ -51,11 +82,37 @@ test('explicit snapshot over rpc', async function (t) { t.is(await guest.has('b', { snapshot }), true) t.same(await guest.hasMany(['a', 'b', 'c'], { snapshot }), [true, true, false]) t.same(await guest.iterator({ snapshot }).all(), [['a', '1'], ['b', '2']]) + t.same(await guest.keys({ snapshot }).all(), ['a', 'b']) + t.same(await guest.values({ snapshot }).all(), ['1', '2']) await snapshot.close() await guest.close() }) +test('multiple explicit snapshots over rpc are independent', async function (t) { + const { guest } = pair() + + await guest.open() + await guest.put('number', '0') + + const before = guest.snapshot() + await guest.put('number', '1') + + const middle = guest.snapshot() + await guest.put('number', '2') + + t.is(await guest.get('number'), '2') + t.is(await guest.get('number', { snapshot: before }), '0') + t.is(await guest.get('number', { snapshot: middle }), '1') + + await before.close() + + t.is(await guest.get('number', { snapshot: middle }), '1') + + await middle.close() + await guest.close() +}) + test('clear uses explicit snapshot over rpc', async function (t) { const { guest } = pair() @@ -73,7 +130,7 @@ test('clear uses explicit snapshot over rpc', async function (t) { await guest.close() }) -test('closed snapshot rejects has and hasMany over rpc', async function (t) { +test('closed snapshot rejects snapshot-aware operations over rpc', async function (t) { const { guest } = pair() await guest.open() @@ -82,23 +139,112 @@ test('closed snapshot rejects has and hasMany over rpc', async function (t) { const snapshot = guest.snapshot() await snapshot.close() - try { - await guest.has('hello', { snapshot }) - } catch (err) { - t.is(err.code, 'LEVEL_SNAPSHOT_NOT_OPEN') - } + await rejectsCode(t, guest.get('hello', { snapshot }), 'LEVEL_SNAPSHOT_NOT_OPEN') + await rejectsCode(t, guest.getMany(['hello'], { snapshot }), 'LEVEL_SNAPSHOT_NOT_OPEN') + await rejectsCode(t, guest.has('hello', { snapshot }), 'LEVEL_SNAPSHOT_NOT_OPEN') + await rejectsCode(t, guest.hasMany(['hello'], { snapshot }), 'LEVEL_SNAPSHOT_NOT_OPEN') + await rejectsCode(t, guest.iterator({ snapshot }).all(), 'LEVEL_SNAPSHOT_NOT_OPEN') + await rejectsCode(t, guest.keys({ snapshot }).all(), 'LEVEL_SNAPSHOT_NOT_OPEN') + await rejectsCode(t, guest.values({ snapshot }).all(), 'LEVEL_SNAPSHOT_NOT_OPEN') + await rejectsCode(t, guest.clear({ snapshot }), 'LEVEL_SNAPSHOT_NOT_OPEN') + + await guest.close() +}) + +test('database close closes explicit snapshots over rpc', async function (t) { + const { guest } = pair() + + await guest.open() + const snapshot = guest.snapshot() + await guest.close() try { - await guest.hasMany(['hello'], { snapshot }) + snapshot.ref() + t.fail('snapshot should be closed') } catch (err) { - t.is(err.code, 'LEVEL_SNAPSHOT_NOT_OPEN') + t.is(err && err.code, 'LEVEL_SNAPSHOT_NOT_OPEN') } +}) + +test('guest sublevels use explicit snapshots over rpc', async function (t) { + const { guest } = pair() + + await guest.open() + + const sub = guest.sublevel('sub') + const other = guest.sublevel('other') + + await sub.put('a', '1') + await other.put('a', 'other') + + const snapshot = sub.snapshot() + + await sub.put('a', '2') + await sub.put('b', '3') + await other.put('b', 'ignored') + + t.is(await sub.get('a'), '2') + t.is(await sub.get('a', { snapshot }), '1') + t.same(await sub.hasMany(['a', 'b'], { snapshot }), [true, false]) + t.same(await sub.keys({ snapshot }).all(), ['a']) + t.same(await other.keys().all(), ['a', 'b']) + + await snapshot.close() + await guest.close() +}) + +test('host sublevels use explicit snapshots over rpc', async function (t) { + const db = new MemoryLevel() + const hostDb = db.sublevel('hosted') + const other = db.sublevel('other') + const { guest } = pair({ db, hostDb }) + + await guest.open() + await guest.put('a', '1') + await other.put('a', 'other') + + const snapshot = guest.snapshot() + + await guest.put('a', '2') + await guest.put('b', '3') + await other.put('b', 'ignored') + + t.same(await guest.iterator().all(), [['a', '2'], ['b', '3']]) + t.same(await guest.iterator({ snapshot }).all(), [['a', '1']]) + t.same(await guest.hasMany(['a', 'b'], { snapshot }), [true, false]) + t.same(await db.keys().all(), [ + '!hosted!a', + '!hosted!b', + '!other!a', + '!other!b' + ]) + await snapshot.close() + await guest.close() +}) + +test('readonly host allows v3 reads and rejects clear', async function (t) { + const db = new MemoryLevel() + const { guest } = pair({ db, hostOptions: { readonly: true } }) + + await db.put('a', '1') + await db.put('b', '2') + await guest.open() + + const snapshot = guest.snapshot() + + t.is(await guest.has('a'), true) + t.same(await guest.hasMany(['a', 'missing']), [true, false]) + t.same(await guest.keys({ snapshot }).all(), ['a', 'b']) + + await rejectsCode(t, guest.clear({ snapshot }), 'LEVEL_READONLY') + + await snapshot.close() await guest.close() }) test('retry mode supports has but not snapshots', async function (t) { - const { guest } = pair({ retry: true }) + const { guest } = pair({ guestOptions: { retry: true } }) await guest.open() await guest.put('hello', 'world') @@ -110,5 +256,12 @@ test('retry mode supports has but not snapshots', async function (t) { t.is(await guest.has('hello'), true) t.same(await guest.hasMany(['hello', 'missing']), [true, false]) + try { + guest.snapshot() + t.fail('snapshot should not be supported') + } catch (err) { + t.is(err && err.code, 'LEVEL_NOT_SUPPORTED') + } + await guest.close() }) From fca50cd0b22f53d454b329f71dbbcfa73c0b6348 Mon Sep 17 00:00:00 2001 From: Germano Eichenberg Date: Wed, 17 Jun 2026 21:25:13 -0300 Subject: [PATCH 4/4] Add opt-in getSync support --- guest.js | 18 ++++++++++++- index.d.ts | 14 +++++++++- test/v3.js | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 2 deletions(-) diff --git a/guest.js b/guest.js index 1ef9098..4bf24de 100644 --- a/guest.js +++ b/guest.js @@ -23,6 +23,7 @@ const kRetry = Symbol('retry') const kRpcStream = Symbol('rpcStream') const kFlushed = Symbol('flushed') const kWrite = Symbol('write') +const kGetSync = Symbol('getSync') const kRequest = Symbol('request') const kPending = Symbol('pending') const kCallback = Symbol('callback') @@ -34,7 +35,8 @@ const kId = Symbol('id') class ManyLevelGuest extends AbstractLevel { constructor (options) { - const { retry, _remote, ...forward } = options || {} + const { retry, _remote, getSync, ...forward } = options || {} + const hasGetSync = getSync === true || typeof getSync === 'function' super({ encodings: { buffer: true }, @@ -44,6 +46,7 @@ class ManyLevelGuest extends AbstractLevel { permanence: true, seek: true, has: true, + getSync: hasGetSync, createIfMissing: false, errorIfExists: false }, forward) @@ -54,6 +57,7 @@ class ManyLevelGuest extends AbstractLevel { this[kRetry] = !!retry this[kEncode] = lpstream.encode() this[kRemote] = _remote || null + this[kGetSync] = typeof getSync === 'function' ? getSync : null this[kCleanup] = null this[kRpcStream] = null this[kRef] = null @@ -259,6 +263,18 @@ class ManyLevelGuest extends AbstractLevel { }) } + _getSync (key, opts) { + if (this[kDb]) return this[kDb]._getSync(key, opts) + + if (this[kGetSync]) { + return normalizeValue(this[kGetSync](key, encodeOptionsSnapshot(opts))) + } + + throw new ModuleError('Database does not support getSync()', { + code: 'LEVEL_NOT_SUPPORTED' + }) + } + async _getMany (keys, opts) { if (this[kDb]) return this[kDb]._getMany(keys, opts) diff --git a/index.d.ts b/index.d.ts index e821809..6ca742f 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,7 +1,8 @@ import { AbstractLevel, AbstractDatabaseOptions, - AbstractOpenOptions + AbstractOpenOptions, + AbstractGetOptions } from 'abstract-level' // Requires `npm install @types/readable-stream`. @@ -88,6 +89,12 @@ declare interface GuestDatabaseOptions extends AbstractDatabaseOptions void } +declare type GuestGetSync = ( + key: Buffer, + options: AbstractGetOptions +) => Buffer | Uint8Array | undefined + /** * Options for the {@link ManyLevelHost} constructor. */ diff --git a/test/v3.js b/test/v3.js index a45d1de..3b05289 100644 --- a/test/v3.js +++ b/test/v3.js @@ -42,6 +42,84 @@ test('has and hasMany over rpc', async function (t) { await guest.close() }) +test('forwarded database supports getSync', async function (t) { + const hostDb = new MemoryLevel() + const guest = new ManyLevelGuest({ getSync: true }) + + await hostDb.open() + guest.forward(hostDb) + await guest.open() + + await guest.put('a', '1') + await guest.put(['json'], { ok: true }, { + keyEncoding: 'json', + valueEncoding: 'json' + }) + + t.is(guest.supports.getSync, true) + t.is(guest.getSync('a'), '1') + t.same(guest.getSync(['json'], { + keyEncoding: 'json', + valueEncoding: 'json' + }), { ok: true }) + t.is(guest.getSync('missing'), undefined) + + await guest.close() +}) + +test('forwarded database getSync honors snapshots', async function (t) { + const hostDb = new MemoryLevel() + const guest = new ManyLevelGuest({ getSync: true }) + + await hostDb.open() + guest.forward(hostDb) + await guest.open() + + await guest.put('a', '1') + const snapshot = guest.snapshot() + await guest.put('a', '2') + + t.is(guest.getSync('a'), '2') + t.is(guest.getSync('a', { snapshot }), '1') + + await snapshot.close() + await guest.close() +}) + +test('custom getSync handler supports remote reads', async function (t) { + const { guest } = pair({ + guestOptions: { + getSync (key) { + if (key.toString() === 'missing') return undefined + return Buffer.from('sync:' + key.toString()) + } + } + }) + + await guest.open() + + t.is(guest.supports.getSync, true) + t.is(guest.getSync('a'), 'sync:a') + t.is(guest.getSync('missing'), undefined) + + await guest.close() +}) + +test('remote getSync rejects without handler', async function (t) { + const { guest } = pair() + + await guest.open() + + try { + guest.getSync('a') + t.fail('getSync should not be supported') + } catch (err) { + t.is(err && err.code, 'LEVEL_NOT_SUPPORTED') + } + + await guest.close() +}) + test('has and hasMany honor key encoding over rpc', async function (t) { const { db, guest } = pair()