diff --git a/package-lock.json b/package-lock.json index 98585e4b..1e27f579 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@cap-js-community/event-queue", - "version": "2.1.0-beta.0", + "version": "2.1.0-beta.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@cap-js-community/event-queue", - "version": "2.1.0-beta.0", + "version": "2.1.0-beta.1", "license": "Apache-2.0", "dependencies": { "@cap-js-community/common": "^0.3.4", @@ -32,7 +32,7 @@ "prettier": "^2.8.8" }, "engines": { - "node": ">=18" + "node": ">=20" } }, "node_modules/@actions/core": { diff --git a/package.json b/package.json index 2ff9cd05..d5369d4a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@cap-js-community/event-queue", - "version": "2.1.0-beta.0", + "version": "2.1.0-beta.1", "description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", "main": "src/index.js", "types": "src/index.d.ts", @@ -22,8 +22,6 @@ "multi-tenancy" ], "scripts": { - "start": "PORT=4005 cds-serve", - "watch": "PORT=4005 cds watch", "test:unit": "jest --selectProjects unit", "test:integration": "jest --selectProjects integration --runInBand", "voter:test:integration": "jest --selectProjects integration", @@ -38,13 +36,12 @@ "eslint:ci": "eslint .", "prettier": "prettier --write --loglevel error .", "prettier:ci": "prettier --check .", - "prepareRelease": "npm prune --production", "docs": "cd docs && bundle exec jekyll serve", "docs:install": "cd docs && npx shx rm -rf vendor Gemfile.lock && bundle install", "upgrade-lock": "npx shx rm -rf package-lock.json node_modules && npm i --package-lock" }, "engines": { - "node": ">=18" + "node": ">=20" }, "dependencies": { "@cap-js-community/common": "^0.3.4", diff --git a/src/EventQueueProcessorBase.js b/src/EventQueueProcessorBase.js index 2eed16d2..1ee4d39b 100644 --- a/src/EventQueueProcessorBase.js +++ b/src/EventQueueProcessorBase.js @@ -408,7 +408,7 @@ class EventQueueProcessorBase { UPDATE.entity(this.#config.tableNameEventQueue) .set({ status: status, - ...(error && { error: this.#error2String(error) }), + ...(error && { error: this._error2String(error) }), }) .where({ ID: queueEntryIds, @@ -498,7 +498,7 @@ class EventQueueProcessorBase { } if (data.error) { - data.error = this.#error2String(data.error); + data.error = this._error2String(data.error); } if (!data.startAfter && [EventProcessingStatus.Error, EventProcessingStatus.Open].includes(data.status)) { @@ -535,7 +535,7 @@ class EventQueueProcessorBase { }); } - #error2String(error) { + _error2String(error) { return JSON.stringify(error, (_, value) => this.#errorReplacer(value)); } diff --git a/src/outbox/EventQueueGenericOutboxHandler.js b/src/outbox/EventQueueGenericOutboxHandler.js index c77d0351..a76895af 100644 --- a/src/outbox/EventQueueGenericOutboxHandler.js +++ b/src/outbox/EventQueueGenericOutboxHandler.js @@ -312,6 +312,10 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { return genericHandler ?? null; } + if (event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS)) { + [event] = event.split("/"); + } + const specificHandler = this.__onHandlers[[event, saga].join("/")]; if (specificHandler) { return specificHandler; @@ -351,18 +355,17 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { } async processEvent(processContext, key, queueEntries, payload) { + let statusTuple; + const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries }); try { - const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries }); await this.#setContextUser(processContext, userId, reg); const result = await this.__srvUnboxed.tx(processContext)[invocationFn](reg); - const statusTuple = this.#determineResultStatus(result, queueEntries); - await this.#publishFollowupEvents(processContext, reg, statusTuple); - return statusTuple; + statusTuple = this.#determineResultStatus(result, queueEntries); } catch (err) { this.logger.error("error processing outboxed service call", err, { serviceName: this.eventSubType, }); - return queueEntries.map((queueEntry) => [ + statusTuple = queueEntries.map((queueEntry) => [ queueEntry.ID, { status: EventProcessingStatus.Error, @@ -370,6 +373,9 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { }, ]); } + + await this.#publishFollowupEvents(processContext, reg, statusTuple); + return statusTuple; } async #publishFollowupEvents(processContext, req, statusTuple) { @@ -380,7 +386,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { return; } - if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS) || req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) { + if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) { return; } @@ -393,23 +399,32 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass { } for (const [, result] of statusTuple) { - if (succeeded && result.status === EventProcessingStatus.Done) { - await this.__srv.tx(processContext).send(succeeded, result.nextData ?? req.data); + const data = result.nextData ?? req.data; + if ( + succeeded && + result.status === EventProcessingStatus.Done && + !req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS) + ) { + await this.__srv.tx(processContext).send(succeeded, data); } if (failed && result.status === EventProcessingStatus.Error) { - await this.__srv.tx(processContext).send(failed, result.nextData ?? req.data); + result.error && (data.error = this._error2String(result.error)); + await this.__srv.tx(processContext).send(failed, data); } delete result.nextData; } if (config.insertEventsBeforeCommit) { - this.nextSagaEvents = tx._eventQueue.events; + this.nextSagaEvents = tx._eventQueue?.events; } else { - this.nextSagaEvents = tx._eventQueue.events.filter((event) => JSON.parse(event.payload).event === failed); + this.nextSagaEvents = tx._eventQueue?.events.filter((event) => JSON.parse(event.payload).event === failed); + } + + if (tx._eventQueue) { + tx._eventQueue.events = nextEvents ?? []; } - tx._eventQueue.events = nextEvents ?? []; } #determineResultStatus(result, queueEntries) { diff --git a/test/asset/outboxProject/srv/service/saga-service.js b/test/asset/outboxProject/srv/service/saga-service.js index 09418b95..5bb4f171 100644 --- a/test/asset/outboxProject/srv/service/saga-service.js +++ b/test/asset/outboxProject/srv/service/saga-service.js @@ -12,6 +12,10 @@ class StandardService extends cds.Service { error: req.data.error, }); + if (req.data.throw) { + throw new Error(req.data.throw); + } + return { status: req.data.status ?? 2, ...(req.data.nextData && { nextData: req.data.nextData }), @@ -94,7 +98,7 @@ class StandardService extends cds.Service { }); return { - status: 2, + status: req.data.status ?? 2, ...(req.data.errorMessage && { error: new Error(req.data.errorMessage) }), }; }); diff --git a/test/eventQueueOutbox.test.js b/test/eventQueueOutbox.test.js index e6892a92..ecfa80d0 100644 --- a/test/eventQueueOutbox.test.js +++ b/test/eventQueueOutbox.test.js @@ -2508,7 +2508,10 @@ describe("event-queue outbox", () => { it("if failed handler exists and event is red, trigger next event", async () => { const service = await cds.connect.to("Saga"); - await service.send("saga", { status: EventProcessingStatus.Error }); + await service.send("saga", { + status: EventProcessingStatus.Error, + nextData: { status: EventProcessingStatus.Done }, + }); await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", service.name); const [done, next] = await testHelper.selectEventQueueAndReturn(tx, { @@ -2541,12 +2544,58 @@ describe("event-queue outbox", () => { expect(loggerMock.callsLengths().error).toEqual(0); config.insertEventsBeforeCommit = true; }); + + it("exception should trigger failed and pass error", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", { throw: "error", nextData: { status: EventProcessingStatus.Done } }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + const [done, next] = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 2, + additionalColumns: ["payload", "lastAttemptTimestamp"], + }); + expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" }); + expect(done.status).toEqual(EventProcessingStatus.Error); + + expect(JSON.parse(next.payload)).toMatchObject({ + event: "saga/#failed", + data: { error: expect.stringContaining("error") }, + }); + expect(next.status).toEqual(EventProcessingStatus.Done); + expect(loggerMock.callsLengths().error).toEqual(1); + }); + + it("error in succeeded should trigger failed", async () => { + const service = await cds.connect.to("Saga"); + await service.send("saga", { + status: EventProcessingStatus.Done, + nextData: { status: EventProcessingStatus.Error }, + }); + await commitAndOpenNew(); + await processEventQueue(tx.context, "CAP_OUTBOX", service.name); + const [done, next, nextFailed] = await testHelper.selectEventQueueAndReturn(tx, { + expectedLength: 3, + additionalColumns: ["payload", "lastAttemptTimestamp"], + }); + expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" }); + expect(done.status).toEqual(EventProcessingStatus.Done); + + expect(JSON.parse(next.payload)).toMatchObject({ event: "saga/#succeeded" }); + expect(next.status).toEqual(EventProcessingStatus.Error); + + expect(JSON.parse(nextFailed.payload)).toMatchObject({ event: "saga/#failed" }); + expect(nextFailed.status).toEqual(EventProcessingStatus.Error); + expect(loggerMock.callsLengths().error).toEqual(0); + }); }); describe("provide next data", () => { it("failed handler with next data", async () => { const service = await cds.connect.to("Saga"); - await service.send("saga", { status: EventProcessingStatus.Error, nextData: { newData: "dummyData" } }); + await service.send("saga", { + status: EventProcessingStatus.Error, + nextData: { newData: "dummyData", status: EventProcessingStatus.Done }, + }); await commitAndOpenNew(); await processEventQueue(tx.context, "CAP_OUTBOX", service.name); const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {