diff --git a/get-sync.js b/get-sync.js new file mode 100644 index 0000000..c86270b --- /dev/null +++ b/get-sync.js @@ -0,0 +1,136 @@ +'use strict' + +const ModuleError = require('module-error') +const { Worker, MessageChannel, receiveMessageOnPort } = require('worker_threads') + +const leaders = new Map() + +// HERE BE DRAGONS +// +// getSync() must be synchronous from abstract-level's perspective, but a follower can only reach the leader through async socket IO. +// We do that async work in a worker thread, then block this thread with Atomics.wait() until the worker replies. +// This exists because real LevelDB getSync() blocks too, and some callers may require the sync API surface. + +// If you are wondering why God has forsaken us and given us JavaScript: same. +// This is the pinnacle of cursed code and war crimes. A monstrocity that should never have been born, +// let alone be considered "useful" for any situation other than inflicting psychological damage. +exports.createGetSync = function createGetSync (socketPath) { + let syncRead + + return function getSync (key, options) { + if (options.snapshot !== undefined) { + throw new ModuleError('Snapshots are not supported by rave-level getSync() followers', { + code: 'LEVEL_NOT_SUPPORTED' + }) + } + + const leader = leaders.get(socketPath) + + if (leader !== undefined) { + return leader._getSync(key, options) + } + + if (syncRead === undefined) { + syncRead = createSyncReadWorker() + } + + return syncRead({ socketPath, key }) + } +} + +exports.registerLeader = function registerLeader (socketPath, db) { + leaders.set(socketPath, db) + + return function unregisterLeader () { + if (leaders.get(socketPath) === db) { + leaders.delete(socketPath) + } + } +} + +function createSyncReadWorker () { + const { port1, port2 } = new MessageChannel() + const worker = new Worker(syncReadWorkerCode(), { + eval: true, + workerData: { port: port2 }, + transferList: [port2] + }) + + worker.unref() + port1.unref() + + return function syncRead (payload) { + const semaphore = new Int32Array(new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT)) + + worker.postMessage({ payload, semaphore }) + + Atomics.wait(semaphore, 0, 0) + + const message = receiveMessageOnPort(port1).message + + if (message.error) { + throw remoteError(message.error) + } + + return message.value === undefined ? undefined : Buffer.from(message.value) + } +} + +function remoteError (error) { + const err = new ModuleError(error.message || 'Could not get value', { + code: error.code || 'LEVEL_REMOTE_ERROR' + }) + + if (error.stack) err.stack = error.stack + return err +} + +function syncReadWorkerCode () { + return ` + 'use strict' + + const { workerData, parentPort } = require('worker_threads') + const net = require('net') + const { ManyLevelGuest } = require('many-level') + + const port = workerData.port + + parentPort.on('message', async function ({ payload, semaphore }) { + try { + const value = await get(payload.socketPath, payload.key) + port.postMessage({ value }) + } catch (err) { + port.postMessage({ + error: { + code: err && err.code, + message: err && err.message, + stack: err && err.stack + } + }) + } finally { + Atomics.store(semaphore, 0, 1) + Atomics.notify(semaphore, 0, 1) + } + }) + + async function get (socketPath, key) { + const db = new ManyLevelGuest({ + keyEncoding: 'buffer', + valueEncoding: 'buffer', + retry: false, + _remote: () => net.connect(socketPath) + }) + + await db.open() + + try { + return await db.get(Buffer.from(key), { + keyEncoding: 'buffer', + valueEncoding: 'buffer' + }) + } finally { + await db.close() + } + } + ` +} diff --git a/index.js b/index.js index bc38f17..4d7d2df 100644 --- a/index.js +++ b/index.js @@ -8,41 +8,7 @@ const ModuleError = require('module-error') const fs = require('fs').promises const net = require('net') const path = require('path') - -/** - * Symbol for storing the database location path. - * @private - * @type {symbol} - */ -const kLocation = Symbol('location') - -/** - * Symbol for storing the Unix socket path or Windows named pipe path. - * @private - * @type {symbol} - */ -const kSocketPath = Symbol('socketPath') - -/** - * Symbol for storing database options like encoding settings. - * @private - * @type {symbol} - */ -const kOptions = Symbol('options') - -/** - * Symbol for the internal connect method. - * @private - * @type {symbol} - */ -const kConnect = Symbol('connect') - -/** - * Symbol for the internal destroy method. - * @private - * @type {symbol} - */ -const kDestroy = Symbol('destroy') +const { createGetSync, registerLeader } = require('./get-sync') /** * Maximum time (in milliseconds) to retry connecting before giving up. @@ -50,6 +16,7 @@ const kDestroy = Symbol('destroy') * @default */ const MAX_CONNECT_RETRY_TIME = 10000 // 10 seconds +const CONNECT_RETRY_DELAY = 100 /** * A distributed LevelDB implementation that allows multiple processes to access @@ -83,25 +50,20 @@ exports.RaveLevel = class RaveLevel extends ManyLevelGuest { */ constructor (location, options = {}) { const { keyEncoding, valueEncoding, retry } = options + const resolvedLocation = path.resolve(location) + const raveSocketPath = options.raveSocketPath || socketPath(resolvedLocation) super({ keyEncoding, valueEncoding, - retry: retry !== false + retry: retry !== false, + getSync: createGetSync(raveSocketPath) }) - this[kLocation] = path.resolve(location) - this[kSocketPath] = options.raveSocketPath || socketPath(this[kLocation]) - this[kOptions] = { keyEncoding, valueEncoding } - this[kConnect] = this[kConnect].bind(this) - this[kDestroy] = this[kDestroy].bind(this) - - /** - * Timestamp when the current connection attempt started. - * Used to track retry timeouts. - * @type {number|null} - */ - this.connectAttemptStartTime = null + this._location = resolvedLocation + this._socketPath = raveSocketPath + this._options = { keyEncoding, valueEncoding } + this._connectAttemptStartTime = null /** * Whether this instance is the leader (has the database lock). @@ -121,10 +83,7 @@ exports.RaveLevel = class RaveLevel extends ManyLevelGuest { */ async _open (options) { await super._open(options) - return new Promise((resolve, reject) => { - // Pass resolve & reject to kConnect so that it can let _open finish when needed - this[kConnect](resolve, reject).then(resolve) - }) + await this._connect() } /** @@ -133,66 +92,89 @@ exports.RaveLevel = class RaveLevel extends ManyLevelGuest { * process that is still starting up. * * @private - * @param {Function} [resolve] - Promise resolve function from _open - * @param {Function} [reject] - Promise reject function from _open * @returns {Promise} */ - async [kConnect] (resolve, reject) { - if (!this.connectAttemptStartTime) this.connectAttemptStartTime = Date.now() + async _connect () { + if (!this._connectAttemptStartTime) this._connectAttemptStartTime = Date.now() - // Monitor database state and do not proceed to open if in a non-opening state - if (!['open', 'opening'].includes(this.status)) { - return + while (this._canConnect()) { + if (await this._connectToLeader()) return + + const { db, retry } = await this._tryOpenLeaderDatabase() + + if (db) { + await this._becomeLeader(db) + return + } + + if (!retry) return + + await new Promise(resolve => setTimeout(resolve, CONNECT_RETRY_DELAY)) } + } - // Attempt to connect to leader as follower - const socket = net.connect(this[kSocketPath]) + _canConnect () { + return this.status === 'open' || this.status === 'opening' + } - // Track whether we succeeded to connect + async _connectToLeader () { + const socket = net.connect(this._socketPath) + const onerror = () => {} + socket.on('error', onerror) + + const stream = this.createRpcStream({ ref: socket }) let connected = false + let settled = false + let settle + + const connectedPromise = new Promise(resolve => { + settle = (value) => { + if (settled) return + settled = true + resolve(value) + } + }) + + const cleanup = () => { + socket.removeListener('connect', onconnect) + socket.removeListener('close', onclose) + socket.removeListener('error', onerror) + } - /** - * Callback fired when socket successfully connects to a leader. - * Resolves the _open promise to allow the database to finish opening. - * - * @private - * @function onconnect - * @returns {void} - */ const onconnect = () => { connected = true - // If we manage to connect to an existing host, [kConnect] will be waiting in the pipeline - // call below. We need to resolve the promise here, so that _open can finish. - if (resolve) resolve() - resolve = reject = null + this._connectAttemptStartTime = null + settle(true) } + const onclose = () => { - connected = false - this.connectAttemptStartTime = null - // Disconnected. Cleanup events. - socket.removeListener('connect', onconnect) - socket.removeListener('close', onclose) + if (!connected) settle(false) } + socket.once('connect', onconnect) socket.once('close', onclose) - // Pass socket as the ref option so we don't hang the event loop. - await pipeline(socket, this.createRpcStream({ ref: socket }), socket).catch(() => null) - // Disconnected. Cleanup events. - socket.removeListener('connect', onconnect) - socket.removeListener('close', onclose) + pipeline(socket, stream, socket).catch(() => null).then(() => { + cleanup() + if (!connected) settle(false) + if (connected && this._canConnect()) { + setImmediate(() => { + if (this._canConnect()) { + this._connect().catch(err => this._destroy(err)) + } + }) + } + }) - // Monitor database state and do not proceed to open if in a non-opening state - if (!['open', 'opening'].includes(this.status)) { - return - } + return connectedPromise + } - // We are still trying to open the db the first time and there is no leader yet to connect to. - // Attempt to open db as leader - const db = new ClassicLevel(this[kLocation], this[kOptions]) + async _tryOpenLeaderDatabase () { + const db = new ClassicLevel(this._location, this._options) // When guest db is closed, close db this.attachResource(db) + try { await db.open() } catch (err) { @@ -201,42 +183,61 @@ exports.RaveLevel = class RaveLevel extends ManyLevelGuest { // If already locked, another process became the leader if (err.cause && err.cause.code === 'LEVEL_LOCKED') { - // If we've been retrying for too long, abort. - if (this.connectAttemptStartTime && (Date.now() - this.connectAttemptStartTime > MAX_CONNECT_RETRY_TIME)) { - return this[kDestroy](err) + if (this._connectAttemptStartTime && (Date.now() - this._connectAttemptStartTime > MAX_CONNECT_RETRY_TIME)) { + this._destroy(err) + return { retry: false } } - if (connected) { - return this[kConnect](resolve, reject) - } else { - // Wait for a short delay - await new Promise((resolve) => setTimeout(resolve, 100)) - // Call connect again - return this[kConnect](resolve, reject) - } - } else { - return this[kDestroy](err) + + return { retry: true } } + + this._destroy(err) + return { retry: false } } - if (!['open', 'opening'].includes(this.status)) { - return + return { db, retry: false } + } + + async _becomeLeader (db) { + if (!this._canConnect()) return + + if (!await this._removeStaleSocket()) return + + const host = new ManyLevelHost(db) + const { server, close } = this._createServer(host) + const unregisterLeader = registerLeader(this._socketPath, db) + const closeLeader = async () => { + unregisterLeader() + return close() } - // We're the leader now + this.attachResource({ close: closeLeader }) + + // Bypass socket, so that e.g. this.put() goes directly to db.put() + // Note: changes order of operations, because we only later flush previous operations (below) + this.forward(db) + + server.listen(this._socketPath, () => this._onLeaderListening(server)) + } + + async _removeStaleSocket () { try { - await fs.unlink(this[kSocketPath]) + await fs.unlink(this._socketPath) } catch (err) { - if (!['open', 'opening'].includes(this.status)) { - return + if (!this._canConnect()) { + return false } if (err && err.code !== 'ENOENT') { - return this[kDestroy](err) + this._destroy(err) + return false } } - // Create host to expose db - const host = new ManyLevelHost(db) + return true + } + + _createServer (host) { const sockets = new Set() /** @@ -254,7 +255,8 @@ exports.RaveLevel = class RaveLevel extends ManyLevelGuest { sockets.delete(sock) }) - server.on('error', this[kDestroy]) + const onerror = err => this._destroy(err) + server.on('error', onerror) /** * Cleanup function that closes all follower connections and shuts down @@ -269,66 +271,56 @@ exports.RaveLevel = class RaveLevel extends ManyLevelGuest { sock.destroy() } - server.removeListener('error', this[kDestroy]) + server.removeListener('error', onerror) return server.close() } - // When guest db is closed, close server - this.attachResource({ close }) + return { server, close } + } - // Bypass socket, so that e.g. this.put() goes directly to db.put() - // Note: changes order of operations, because we only later flush previous operations (below) - this.forward(db) + async _onLeaderListening (server) { + server.unref() - server.listen(this[kSocketPath], async () => { - server.unref() + if (this.status !== 'open') { + return + } - if (this.status !== 'open') { - return - } + this.isLeader = true - this.isLeader = true + /** + * Leader event. + * Fired when this instance successfully becomes the database leader. + * + * @event RaveLevel#leader + */ + this.emit('leader') - /** - * Leader event. - * Fired when this instance successfully becomes the database leader. - * - * @event RaveLevel#leader - */ - this.emit('leader') + if (this.status !== 'open' || this.isFlushed()) { + return + } - if (this.status !== 'open' || this.isFlushed()) { - return - } + await this._flushPendingRequests() + } - // Connect to ourselves to flush pending requests - const sock = net.connect(this[kSocketPath]) + async _flushPendingRequests () { + const sock = net.connect(this._socketPath) - /** - * Callback that destroys the flush socket when all pending - * operations have been processed. - * - * @private - * @function onflush - * @returns {void} - */ - const onflush = () => { sock.destroy() } + const onflush = () => { sock.destroy() } - this.once('flush', onflush) + this.once('flush', onflush) - let cause - try { - await pipeline(sock, this.createRpcStream(), sock) - } catch (err) { - cause = err - } - this.removeListener('flush', onflush) + let cause + try { + await pipeline(sock, this.createRpcStream(), sock) + } catch (err) { + cause = err + } + this.removeListener('flush', onflush) - // Socket should only close because of a this.close() - if (!this.isFlushed() && this.status === 'open') { - this[kDestroy](new ModuleError('Did not flush', { cause })) - } - }) + // Socket should only close because of a this.close() + if (!this.isFlushed() && this.status === 'open') { + this._destroy(new ModuleError('Did not flush', { cause })) + } } /** @@ -340,7 +332,7 @@ exports.RaveLevel = class RaveLevel extends ManyLevelGuest { * @returns {void} * @fires RaveLevel#error */ - [kDestroy] (err) { + _destroy (err) { if (this.status === 'open') { /** * Error event. diff --git a/package-lock.json b/package-lock.json index 0cbbd00..863b589 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,7 @@ "version": "1.0.0", "license": "MIT", "dependencies": { - "classic-level": "^2.0.0", + "classic-level": "^3.0.0", "many-level": "git+ssh://git@github.com/ForgeVTT/many-level.git#7adbb795f0114ca3933fe24bee2145ea3b0aeb51", "module-error": "^1.0.2", "readable-stream": "^4.0.0" @@ -1259,20 +1259,20 @@ } }, "node_modules/abstract-level": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/abstract-level/-/abstract-level-2.0.2.tgz", - "integrity": "sha512-pPJixmXk/kTKLB2sSue7o4Uj6TlLD2XfaP2gWZomHVCC6cuUGX/VslQqKG1yZHfXwBb/3lS6oSTMPGzh1P1iig==", + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/abstract-level/-/abstract-level-3.1.1.tgz", + "integrity": "sha512-CW2gKbJFTuX1feMvOrvsVMmijAOgI9kg2Ie9Dq3gOcMt/dVVoVmqNlLcEUCT13NxHFMEajcUcVBIplbyDroDiw==", "license": "MIT", "dependencies": { "buffer": "^6.0.3", "is-buffer": "^2.0.5", - "level-supports": "^6.0.0", + "level-supports": "^6.2.0", "level-transcoder": "^1.0.1", "maybe-combine-errors": "^1.0.0", "module-error": "^1.0.1" }, "engines": { - "node": ">=16" + "node": ">=18" } }, "node_modules/acorn": { @@ -2089,13 +2089,13 @@ } }, "node_modules/classic-level": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/classic-level/-/classic-level-2.0.0.tgz", - "integrity": "sha512-ftiMvKgCQK+OppXcvMieDoYlYLYWhScK6yZRFBrrlHQRbm4k6Gr+yDgu/wt3V0k1/jtNbuiXAsRmuAFcD0Tx5Q==", + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/classic-level/-/classic-level-3.0.0.tgz", + "integrity": "sha512-yGy8j8LjPbN0Bh3+ygmyYvrmskVita92pD/zCoalfcC9XxZj6iDtZTAnz+ot7GG8p9KLTG+MZ84tSA4AhkgVZQ==", "hasInstallScript": true, "license": "MIT", "dependencies": { - "abstract-level": "^2.0.0", + "abstract-level": "^3.1.0", "module-error": "^1.0.1", "napi-macros": "^2.2.2", "node-gyp-build": "^4.3.0" diff --git a/package.json b/package.json index 88e290b..8883ce7 100644 --- a/package.json +++ b/package.json @@ -19,12 +19,13 @@ }, "files": [ "index.js", + "get-sync.js", "index.d.ts", "CHANGELOG.md", "UPGRADING.md" ], "dependencies": { - "classic-level": "^2.0.0", + "classic-level": "^3.0.0", "many-level": "git+ssh://git@github.com/ForgeVTT/many-level.git#e3e77e4ab35f1a39e4c134228b061b70043da442", "module-error": "^1.0.2", "readable-stream": "^4.0.0" diff --git a/test/basic.js b/test/basic.js index c84fa12..cf2d378 100644 --- a/test/basic.js +++ b/test/basic.js @@ -20,6 +20,22 @@ test('single database', async function (t) { await db.close() }) +test('single database getSync', async function (t) { + t.plan(3) + + const location = tempy.directory() + const db = new RaveLevel(location, { valueEncoding: 'json' }) + const value = { number: Math.floor(Math.random() * 100000) } + + await db.put('a', value) + + t.is(db.supports.getSync, true) + t.same(db.getSync('a'), value) + t.is(db.getSync('missing'), undefined) + + await db.close() +}) + test('two databases', async function (t) { t.plan(3) @@ -39,6 +55,26 @@ test('two databases', async function (t) { t.is(db2.status, 'closed') }) +test('follower database getSync', async function (t) { + t.plan(4) + + const location = tempy.directory() + const db1 = new RaveLevel(location, { valueEncoding: 'json' }) + const db2 = new RaveLevel(location, { valueEncoding: 'json' }) + const value = { number: Math.floor(Math.random() * 100000) } + + await db1.put('a', value) + await db2.open() + + t.is(db2.isLeader, false) + t.is(db2.supports.getSync, true) + t.same(db2.getSync('a'), value) + t.is(db2.getSync('missing'), undefined) + + await db1.close() + await db2.close() +}) + test('three databases', async function (t) { t.plan(5) diff --git a/test/get-sync.js b/test/get-sync.js new file mode 100644 index 0000000..2e8d986 --- /dev/null +++ b/test/get-sync.js @@ -0,0 +1,67 @@ +'use strict' + +const test = require('tape') +const tempy = require('./util/tempy') +const { fork } = require('child_process') +const { once } = require('events') +const { RaveLevel } = require('..') + +if (process.argv[2] === 'child') { + (async () => { + const [location] = process.argv.slice(3) + const db = new RaveLevel(location, { valueEncoding: 'json' }) + + try { + await db.open() + + process.send({ + isLeader: db.isLeader, + value: db.getSync('a'), + missing: db.getSync('missing') + }) + + await db.close() + process.exit(0) + } catch (err) { + process.send({ + error: err.message, + code: err.code, + stack: err.stack + }) + process.exit(1) + } + })() +} else { + test('getSync from follower process', async function (t) { + const location = tempy.directory() + const leader = new RaveLevel(location, { valueEncoding: 'json' }) + const value = { number: Math.floor(Math.random() * 100000) } + + await once(leader, 'leader') + await leader.put('a', value) + + const result = await new Promise((resolve, reject) => { + const child = fork(__filename, ['child', location], { timeout: 30e3 }) + let message = null + + child.on('message', msg => { + message = msg + }) + + child.on('error', reject) + + child.on('exit', (code, signal) => { + resolve({ code, signal, message }) + }) + }) + + t.is(result.code, 0) + t.is(result.signal, null) + t.is(result.message && result.message.error, undefined) + t.is(result.message && result.message.isLeader, false) + t.same(result.message && result.message.value, value) + t.is(result.message && result.message.missing, undefined) + + await leader.close() + }) +}