Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cap-js-community/event-queue",
"version": "2.1.0-beta.0",
"version": "2.1.0-beta.1",
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",
"main": "src/index.js",
"types": "src/index.d.ts",
Expand All @@ -22,8 +22,6 @@
"multi-tenancy"
],
"scripts": {
"start": "PORT=4005 cds-serve",
"watch": "PORT=4005 cds watch",
"test:unit": "jest --selectProjects unit",
"test:integration": "jest --selectProjects integration --runInBand",
"voter:test:integration": "jest --selectProjects integration",
Expand All @@ -38,13 +36,12 @@
"eslint:ci": "eslint .",
"prettier": "prettier --write --loglevel error .",
"prettier:ci": "prettier --check .",
"prepareRelease": "npm prune --production",
"docs": "cd docs && bundle exec jekyll serve",
"docs:install": "cd docs && npx shx rm -rf vendor Gemfile.lock && bundle install",
"upgrade-lock": "npx shx rm -rf package-lock.json node_modules && npm i --package-lock"
},
"engines": {
"node": ">=18"
"node": ">=20"
},
"dependencies": {
"@cap-js-community/common": "^0.3.4",
Expand Down
6 changes: 3 additions & 3 deletions src/EventQueueProcessorBase.js
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ class EventQueueProcessorBase {
UPDATE.entity(this.#config.tableNameEventQueue)
.set({
status: status,
...(error && { error: this.#error2String(error) }),
...(error && { error: this._error2String(error) }),
})
.where({
ID: queueEntryIds,
Expand Down Expand Up @@ -498,7 +498,7 @@ class EventQueueProcessorBase {
}

if (data.error) {
data.error = this.#error2String(data.error);
data.error = this._error2String(data.error);
}

if (!data.startAfter && [EventProcessingStatus.Error, EventProcessingStatus.Open].includes(data.status)) {
Expand Down Expand Up @@ -535,7 +535,7 @@ class EventQueueProcessorBase {
});
}

#error2String(error) {
_error2String(error) {
return JSON.stringify(error, (_, value) => this.#errorReplacer(value));
}

Expand Down
39 changes: 27 additions & 12 deletions src/outbox/EventQueueGenericOutboxHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
return genericHandler ?? null;
}

if (event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS)) {
[event] = event.split("/");
}

const specificHandler = this.__onHandlers[[event, saga].join("/")];
if (specificHandler) {
return specificHandler;
Expand Down Expand Up @@ -351,25 +355,27 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
}

async processEvent(processContext, key, queueEntries, payload) {
let statusTuple;
const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries });
try {
const { userId, invocationFn, reg } = this.#buildDispatchData(processContext, payload, { key, queueEntries });
await this.#setContextUser(processContext, userId, reg);
const result = await this.__srvUnboxed.tx(processContext)[invocationFn](reg);
const statusTuple = this.#determineResultStatus(result, queueEntries);
await this.#publishFollowupEvents(processContext, reg, statusTuple);
return statusTuple;
statusTuple = this.#determineResultStatus(result, queueEntries);
} catch (err) {
this.logger.error("error processing outboxed service call", err, {
serviceName: this.eventSubType,
});
return queueEntries.map((queueEntry) => [
statusTuple = queueEntries.map((queueEntry) => [
queueEntry.ID,
{
status: EventProcessingStatus.Error,
error: err,
},
]);
}

await this.#publishFollowupEvents(processContext, reg, statusTuple);
return statusTuple;
}

async #publishFollowupEvents(processContext, req, statusTuple) {
Expand All @@ -380,7 +386,7 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
return;
}

if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS) || req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) {
if (req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_FAILED)) {
return;
}

Expand All @@ -393,23 +399,32 @@ class EventQueueGenericOutboxHandler extends EventQueueBaseClass {
}

for (const [, result] of statusTuple) {
if (succeeded && result.status === EventProcessingStatus.Done) {
await this.__srv.tx(processContext).send(succeeded, result.nextData ?? req.data);
const data = result.nextData ?? req.data;
if (
succeeded &&
result.status === EventProcessingStatus.Done &&
!req.event.endsWith(EVENT_QUEUE_ACTIONS.SAGA_SUCCESS)
) {
await this.__srv.tx(processContext).send(succeeded, data);
}

if (failed && result.status === EventProcessingStatus.Error) {
await this.__srv.tx(processContext).send(failed, result.nextData ?? req.data);
result.error && (data.error = this._error2String(result.error));
await this.__srv.tx(processContext).send(failed, data);
}

delete result.nextData;
}

if (config.insertEventsBeforeCommit) {
this.nextSagaEvents = tx._eventQueue.events;
this.nextSagaEvents = tx._eventQueue?.events;
} else {
this.nextSagaEvents = tx._eventQueue.events.filter((event) => JSON.parse(event.payload).event === failed);
this.nextSagaEvents = tx._eventQueue?.events.filter((event) => JSON.parse(event.payload).event === failed);
}

if (tx._eventQueue) {
tx._eventQueue.events = nextEvents ?? [];
}
tx._eventQueue.events = nextEvents ?? [];
}

#determineResultStatus(result, queueEntries) {
Expand Down
6 changes: 5 additions & 1 deletion test/asset/outboxProject/srv/service/saga-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class StandardService extends cds.Service {
error: req.data.error,
});

if (req.data.throw) {
throw new Error(req.data.throw);
}

return {
status: req.data.status ?? 2,
...(req.data.nextData && { nextData: req.data.nextData }),
Expand Down Expand Up @@ -94,7 +98,7 @@ class StandardService extends cds.Service {
});

return {
status: 2,
status: req.data.status ?? 2,
...(req.data.errorMessage && { error: new Error(req.data.errorMessage) }),
};
});
Expand Down
53 changes: 51 additions & 2 deletions test/eventQueueOutbox.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2508,7 +2508,10 @@ describe("event-queue outbox", () => {

it("if failed handler exists and event is red, trigger next event", async () => {
const service = await cds.connect.to("Saga");
await service.send("saga", { status: EventProcessingStatus.Error });
await service.send("saga", {
status: EventProcessingStatus.Error,
nextData: { status: EventProcessingStatus.Done },
});
await commitAndOpenNew();
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {
Expand Down Expand Up @@ -2541,12 +2544,58 @@ describe("event-queue outbox", () => {
expect(loggerMock.callsLengths().error).toEqual(0);
config.insertEventsBeforeCommit = true;
});

it("exception should trigger failed and pass error", async () => {
const service = await cds.connect.to("Saga");
await service.send("saga", { throw: "error", nextData: { status: EventProcessingStatus.Done } });
await commitAndOpenNew();
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {
expectedLength: 2,
additionalColumns: ["payload", "lastAttemptTimestamp"],
});
expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" });
expect(done.status).toEqual(EventProcessingStatus.Error);

expect(JSON.parse(next.payload)).toMatchObject({
event: "saga/#failed",
data: { error: expect.stringContaining("error") },
});
expect(next.status).toEqual(EventProcessingStatus.Done);
expect(loggerMock.callsLengths().error).toEqual(1);
});

it("error in succeeded should trigger failed", async () => {
const service = await cds.connect.to("Saga");
await service.send("saga", {
status: EventProcessingStatus.Done,
nextData: { status: EventProcessingStatus.Error },
});
await commitAndOpenNew();
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
const [done, next, nextFailed] = await testHelper.selectEventQueueAndReturn(tx, {
expectedLength: 3,
additionalColumns: ["payload", "lastAttemptTimestamp"],
});
expect(JSON.parse(done.payload)).toMatchObject({ event: "saga" });
expect(done.status).toEqual(EventProcessingStatus.Done);

expect(JSON.parse(next.payload)).toMatchObject({ event: "saga/#succeeded" });
expect(next.status).toEqual(EventProcessingStatus.Error);

expect(JSON.parse(nextFailed.payload)).toMatchObject({ event: "saga/#failed" });
expect(nextFailed.status).toEqual(EventProcessingStatus.Error);
expect(loggerMock.callsLengths().error).toEqual(0);
});
});

describe("provide next data", () => {
it("failed handler with next data", async () => {
const service = await cds.connect.to("Saga");
await service.send("saga", { status: EventProcessingStatus.Error, nextData: { newData: "dummyData" } });
await service.send("saga", {
status: EventProcessingStatus.Error,
nextData: { newData: "dummyData", status: EventProcessingStatus.Done },
});
await commitAndOpenNew();
await processEventQueue(tx.context, "CAP_OUTBOX", service.name);
const [done, next] = await testHelper.selectEventQueueAndReturn(tx, {
Expand Down