From 153e487a94d9e191e2c994b307964ea44e721794 Mon Sep 17 00:00:00 2001 From: Peter Savchenko Date: Tue, 23 Dec 2025 22:19:50 +0300 Subject: [PATCH 01/11] Add repetitionId to event notification flow Introduces the repetitionId field to event notification data structures and templates, allowing emails and notifications to reference specific event repetitions. Updates TypeScript interfaces, worker logic, and email templates to support and display repetitionId where applicable. --- workers/email/src/templates/emails/event/html.twig | 7 ++++++- workers/email/src/templates/emails/event/text.twig | 7 ++++++- workers/grouper/src/index.ts | 1 + workers/notifier/src/index.ts | 1 + workers/notifier/types/channel.ts | 6 ++++++ workers/notifier/types/notifier-task.ts | 6 ++++++ workers/sender/src/index.ts | 3 ++- workers/sender/types/template-variables/event.ts | 5 +++++ 8 files changed, 33 insertions(+), 3 deletions(-) diff --git a/workers/email/src/templates/emails/event/html.twig b/workers/email/src/templates/emails/event/html.twig index 35be79c6c..535afe3a7 100644 --- a/workers/email/src/templates/emails/event/html.twig +++ b/workers/email/src/templates/emails/event/html.twig @@ -8,6 +8,7 @@ {% set utmParams = 'utm_source=email&utm_medium=transactional&utm_campaign=event' %} {% set event = events[0].event %} + {% set repetitionId = events[0].repetitionId %} {% set daysRepeated = events[0].daysRepeated %} {% set newCount = events[0].newCount %} {% set usersAffected = events[0].usersAffected %} @@ -56,7 +57,11 @@ - {% set eventURL = host ~ '/project/' ~ project._id ~ '/event/' ~ event._id ~ '?' ~ utmParams %} + {% if repetitionId %} + {% set eventURL = host ~ '/project/' ~ project._id ~ '/event/' ~ event._id ~ '/' ~ repetitionId ~ '/overview?' ~ utmParams %} + {% else %} + {% set eventURL = host ~ '/project/' ~ project._id ~ '/event/' ~ event._id ~ '?' ~ utmParams %} + {% endif %} {% include '../../components/button.twig' with {href: eventURL, label: 'Смотреть детали'} %} diff --git a/workers/email/src/templates/emails/event/text.twig b/workers/email/src/templates/emails/event/text.twig index d12c7f00d..f078d7d10 100644 --- a/workers/email/src/templates/emails/event/text.twig +++ b/workers/email/src/templates/emails/event/text.twig @@ -1,4 +1,5 @@ {% set event = events[0].event %} +{% set repetitionId = events[0].repetitionId %} {% set daysRepeated = events[0].daysRepeated %} {% set newCount = events[0].newCount %} {% set usersAffected = events[0].usersAffected %} @@ -25,7 +26,11 @@ Это событие произошло {{ event.totalCount }} {{ pluralize_ru(event.totalCount, ['раз', 'раза', 'раз']) }} за {{ daysRepeated }} {{ pluralize_ru(daysRepeated, ['день', 'дня', 'дней']) }}. -Смотреть детали: {{ host }}/project/{{ project._id }}/event/{{ event._id }}?{{ utmParams }} +{% if repetitionId %} +Смотреть детали: {{ host }}/project/{{ project._id }}/event/{{ event._id }}/{{ repetitionId }}/overview?{{ utmParams }} +{% else %} +Смотреть детали: {{ host }}/project/{{ project._id }}/event/{{ event._id }}/overview?{{ utmParams }} +{% endif %} *** diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 8eef04fce..8d20daf56 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -265,6 +265,7 @@ export default class GrouperWorker extends Worker { title: task.payload.title, groupHash: uniqueEventHash, isNew: isFirstOccurrence, + repetitionId: repetitionId ? repetitionId.toString() : null, }, }); } diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index a05101c32..d2d17966e 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -160,6 +160,7 @@ export default class NotifierWorker extends Worker { await this.sendToSenderWorker(channelKey, [ { key: event.groupHash, count: 1, + repetitionId: event.repetitionId, } ]); } } diff --git a/workers/notifier/types/channel.ts b/workers/notifier/types/channel.ts index 3a195c3d7..001a1f273 100644 --- a/workers/notifier/types/channel.ts +++ b/workers/notifier/types/channel.ts @@ -35,6 +35,12 @@ export interface SenderData { * Number of events received */ count: number; + + /** + * ID of the repetition that triggered this notification + * null for first occurrence, ObjectId string for repetitions + */ + repetitionId: string | null; } /** diff --git a/workers/notifier/types/notifier-task.ts b/workers/notifier/types/notifier-task.ts index f773660c9..04bf3abb6 100644 --- a/workers/notifier/types/notifier-task.ts +++ b/workers/notifier/types/notifier-task.ts @@ -14,6 +14,12 @@ export type NotifierEvent = Pick, 'title'> & { * Flag to show if event is received first time */ isNew: boolean; + + /** + * ID of the repetition that triggered this notification + * null for first occurrence, string for repetitions + */ + repetitionId: string | null; }; /** diff --git a/workers/sender/src/index.ts b/workers/sender/src/index.ts index 7f24c1dc3..23fdeb63c 100644 --- a/workers/sender/src/index.ts +++ b/workers/sender/src/index.ts @@ -171,13 +171,14 @@ export default abstract class SenderWorker extends Worker { const eventsData = await Promise.all( events.map( - async ({ key: groupHash, count }: { key: string; count: number }): Promise => { + async ({ key: groupHash, count, repetitionId }: { key: string; count: number; repetitionId?: string | null }): Promise => { const [event, daysRepeated] = await this.getEventDataByGroupHash(projectId, groupHash); return { event, newCount: count, daysRepeated, + repetitionId: repetitionId ?? null, }; } ) diff --git a/workers/sender/types/template-variables/event.ts b/workers/sender/types/template-variables/event.ts index 701444ece..69e7fe3d3 100644 --- a/workers/sender/types/template-variables/event.ts +++ b/workers/sender/types/template-variables/event.ts @@ -25,6 +25,11 @@ export interface TemplateEventData { * Number of affected users for this event */ usersAffected?: number; + + /** + * ID of the particular repetition of occurred event + */ + repetitionId?: string | null; } /** From fe1554fefc235fa937f899b196dd8813c283eb8e Mon Sep 17 00:00:00 2001 From: Peter Savchenko Date: Tue, 23 Dec 2025 23:40:34 +0300 Subject: [PATCH 02/11] add logs to sentry worker --- workers/sentry/src/index.ts | 54 ++++++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/workers/sentry/src/index.ts b/workers/sentry/src/index.ts index 42c5bdfa5..0af9bef5f 100644 --- a/workers/sentry/src/index.ts +++ b/workers/sentry/src/index.ts @@ -39,9 +39,24 @@ export default class SentryEventWorker extends Worker { const [headers, items] = envelope; + if (items.length === 0) { + this.logger.warn('Received envelope with no items'); + return; + } + + let processedCount = 0; + let skippedCount = 0; + for (const item of items) { - await this.handleEnvelopeItem(headers, item, event.projectId); + const result = await this.handleEnvelopeItem(headers, item, event.projectId); + if (result === 'processed') { + processedCount++; + } else if (result === 'skipped') { + skippedCount++; + } } + + this.logger.verbose(`Processed ${processedCount} events, skipped ${skippedCount} non-event items from envelope`); } catch (error) { this.logger.error(`Error handling Sentry event task:`, error); this.logger.info('👇 Here is the problematic event:'); @@ -99,8 +114,9 @@ export default class SentryEventWorker extends Worker { * @param envelopeHeaders - The whole envelope headers * @param item - Sentry item * @param projectId - Sentry project ID + * @returns 'processed' if event was sent, 'skipped' if non-event item, throws error on failure */ - private async handleEnvelopeItem(envelopeHeaders: Envelope[0], item: EnvelopeItem, projectId: string): Promise { + private async handleEnvelopeItem(envelopeHeaders: Envelope[0], item: EnvelopeItem, projectId: string): Promise<'processed' | 'skipped'> { try { const [itemHeader, itemPayload] = item; @@ -112,7 +128,8 @@ export default class SentryEventWorker extends Worker { * Skip non-event items */ if (itemHeader.type !== 'event') { - return; + this.logger.verbose(`Skipping non-event item of type: ${itemHeader.type}`); + return 'skipped'; } const payloadHasSDK = typeof itemPayload === 'object' && 'sdk' in itemPayload; @@ -121,18 +138,37 @@ export default class SentryEventWorker extends Worker { */ const sentryJsSDK = ['browser', 'react', 'vue', 'angular', 'capacirtor', 'electron']; - const isJsSDK = payloadHasSDK && sentryJsSDK.includes(itemPayload.sdk.name); + /** + * Safely check if SDK name exists and is in the list + */ + const sdkName = payloadHasSDK && itemPayload.sdk && typeof itemPayload.sdk === 'object' && 'name' in itemPayload.sdk + ? itemPayload.sdk.name + : undefined; + + const isJsSDK = sdkName !== undefined && sentryJsSDK.includes(sdkName); const hawkEvent = this.transformToHawkFormat(envelopeHeaders as EventEnvelope[0], item as EventItem, projectId, isJsSDK); /** - * If we have release attached to the event + * Send task to appropriate worker and check if it was successfully queued */ - if (isJsSDK) { - await this.addTask(WorkerNames.JAVASCRIPT, hawkEvent as JavaScriptEventWorkerTask); - } else { - await this.addTask(WorkerNames.DEFAULT, hawkEvent as DefaultEventWorkerTask); + const workerName = isJsSDK ? WorkerNames.JAVASCRIPT : WorkerNames.DEFAULT; + const taskSent = await this.addTask(workerName, hawkEvent as JavaScriptEventWorkerTask | DefaultEventWorkerTask); + + if (!taskSent) { + /** + * If addTask returns false, the message was not queued (queue full or channel closed) + * This is a critical error that should be logged and thrown + */ + const error = new Error(`Failed to queue event to ${workerName} worker. Queue may be full or channel closed.`); + this.logger.error(error.message); + this.logger.info('👇 Here is the event that failed to queue:'); + this.logger.json(hawkEvent); + throw error; } + + this.logger.verbose(`Successfully queued event to ${workerName} worker`); + return 'processed'; } catch (error) { this.logger.error('Error handling envelope item:', error); this.logger.info('👇 Here is the problematic item:'); From 8777e77c8706215d5fa0f890922d237cbed18417 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 31 Jan 2026 02:20:15 +0500 Subject: [PATCH 03/11] chore(grouper): add counters to the grouper worker --- workers/grouper/src/index.ts | 181 +++++++++++++++++ workers/grouper/src/redisHelper.ts | 213 ++++++++++++++++++- workers/grouper/tests/index.test.ts | 304 ++++++++++++++++++++++++++++ 3 files changed, 697 insertions(+), 1 deletion(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 73f16fc77..8a10b369c 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -26,6 +26,7 @@ import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; /* eslint-disable-next-line no-unused-vars */ import { memoize } from '../../../lib/memoize'; +import TimeMs from '../../../lib/utils/time'; /** * eslint does not count decorators as a variable usage @@ -268,6 +269,186 @@ export default class GrouperWorker extends Worker { }); } } + + await this.incrementRateLimitCounter(task.projectId); + await this.recordProjectMetrics(task.projectId, 'events-stored'); + } + + /** + * Build RedisTimeSeries key for project metrics. + * + * @param projectId - id of the project + * @param metricType - metric type identifier + * @param granularity - time granularity + */ + private getTimeSeriesKey( + projectId: string, + metricType: string, + granularity: 'minutely' | 'hourly' | 'daily' + ): string { + return `ts:project-${metricType}:${projectId}:${granularity}`; + } + + /** + * Record project metrics to Redis TimeSeries. + * + * @param projectId - id of the project + * @param metricType - metric type identifier + */ + private async recordProjectMetrics(projectId: string, metricType: string): Promise { + const minutelyKey = this.getTimeSeriesKey(projectId, metricType, 'minutely'); + const hourlyKey = this.getTimeSeriesKey(projectId, metricType, 'hourly'); + const dailyKey = this.getTimeSeriesKey(projectId, metricType, 'daily'); + + const labels: Record = { + type: 'error', + status: metricType, + project: projectId, + }; + + const series = [ + { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY }, + { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK }, + { key: dailyKey, label: 'daily', retentionMs: 90 * TimeMs.DAY }, + ]; + + for (const { key, label, retentionMs } of series) { + try { + await this.redis.safeTsAdd(key, 1, labels, retentionMs); + } catch (error) { + this.logger.error(`Failed to add ${label} TS for ${metricType}`, error); + } + } + } + + /** + * Increment rate limit counters for the project. + * + * @param projectId - id of the project + */ + private async incrementRateLimitCounter(projectId: string): Promise { + try { + const settings = await this.getProjectRateLimitSettings(projectId); + + if (!settings) { + return; + } + + await this.redis.incrementRateLimitCounterForCurrentEvent( + projectId, + settings.eventsPeriod, + settings.eventsLimit + ); + } catch (error) { + this.logger.error(`Failed to increment rate limit counter for project ${projectId}`, error); + } + } + + /** + * Fetch and normalize rate limit settings + * Rate limit settings could appear in tarifPlan, workspace and project. + * All rateLimits have different priority. + * + * @param projectId - id of the project + */ + @memoize({ max: 200, ttl: MEMOIZATION_TTL, strategy: 'concat', skipCache: [null] }) + private async getProjectRateLimitSettings(projectId: string): Promise<{ eventsLimit: number; eventsPeriod: number } | null> { + if (!projectId || !mongodb.ObjectID.isValid(projectId)) { + return null; + } + + const accountsDb = this.accountsDb.getConnection(); + + /** + * Fetch project from the db + */ + const project = await accountsDb + .collection('projects') + .findOne( + { _id: new mongodb.ObjectId(projectId) }, + { projection: { rateLimitSettings: 1, workspaceId: 1 } } + ); + + if (!project) { + return null; + } + + const projectRateLimitSettings = project.rateLimitSettings as { N: number, T: number }; + const workspaceId = new mongodb.ObjectID(project.workspaceId); + + let planRateLimitSettings: { N: number, T: number}; + let workspaceRateLimitSettings: { N: number, T: number}; + + /** + * Fetch workspace from the db + */ + if (workspaceId) { + const workspace = await accountsDb + .collection('workspaces') + .findOne( + { _id: workspaceId }, + { projection: { rateLimitSettings: 1, tariffPlanId: 1 } } + ); + + workspaceRateLimitSettings = workspace?.rateLimitSettings as { N: number, T: number }; + + const planId = new mongodb.ObjectId(workspace?.tariffPlanId); + + /** + * Tarif plan from the db + */ + if (planId) { + const plan = await accountsDb + .collection('plans') + .findOne( + { _id: planId }, + { projection: { rateLimitSettings: 1 } } + ); + + planRateLimitSettings = plan?.rateLimitSettings; + } + } + + return this.normalizeRateLimitSettings( + planRateLimitSettings, + workspaceRateLimitSettings, + projectRateLimitSettings + ); + } + + /** + * Normalize rate limit settings shape from database. + * + * @param rateLimitLayers - raw settings documents in priority order + */ + private normalizeRateLimitSettings( + ...rateLimitLayers: { N: number, T: number }[] + ): { eventsLimit: number; eventsPeriod: number } | null { + let eventsLimit = 0; + let eventsPeriod = 0; + + for (const layer of rateLimitLayers) { + if (!layer) { + continue; + } + + const limit = layer.N as number; + const period = layer.T as number; + + if (limit !== undefined && limit > 0) { + eventsLimit = limit; + } + + if (period !== undefined && period > 0) { + eventsPeriod = period; + } + } + + if (eventsLimit <= 0 || eventsPeriod <= 0) { + return null; + } + + return { eventsLimit, eventsPeriod }; } /** diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index a655c24bf..f0f45a7eb 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -110,6 +110,217 @@ export default class RedisHelper { return result === null; } + /** + * Increments redis counter used for rate limiting + * + * @param projectId - id of the project which event belongs to + * @param eventsPeriod - rate limit period configured for the project + */ + public async incrementRateLimitCounterForCurrentEvent(projectId: string, eventsPeriod: number, limit: number): Promise { + const script = ` + local key = KEYS[1] + local field = ARGV[1] + local now = tonumber(ARGV[2]) + local period = tonumber(ARGV[3]) + local limit = tonumber(ARGV[4]) + + local current = redis.call('HGET', key, field) + + -- If no record yet, start a new window with count = 1 + if not current then + redis.call('HSET', key, field, now .. ':1') + return + end + + local timestamp, count = string.match(current, '(%d+):(%d+)') + timestamp = tonumber(timestamp) + count = tonumber(count) + + -- Check if we're in a new time window + if now - timestamp >= period then + redis.call('HSET', key, field, now .. ':1') + return + end + + -- Check if incrementing would exceed limit + if count + 1 > limit then + return + end + + -- Increment counter + redis.call('HSET', key, field, timestamp .. ':' .. (count + 1)) + ` + + const key = 'rate_limits'; + const now = Math.floor(Date.now() / 1000); + + await this.redisClient.eval(script, { + keys: [key], + arguments: [projectId, now.toString(), eventsPeriod.toString(), limit.toString()], + }); + } + + /** + * Build label arguments for RedisTimeSeries commands + * + * @param labels - labels to attach to the time series + */ + private buildLabelArguments(labels: Record): string[] { + const labelArgs: string[] = ['LABELS']; + + for (const [labelKey, labelValue] of Object.entries(labels)) { + labelArgs.push(labelKey, labelValue); + } + + return labelArgs; + } + + /** + * Creates a RedisTimeSeries key if it doesn't exist. + * + * @param key - time series key + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async tsCreateIfNotExists( + key: string, + labels: Record, + retentionMs = 0 + ): Promise { + const exists = await this.redisClient.exists(key); + + if (exists > 0) { + return; + } + + const args: string[] = ['TS.CREATE', key]; + + if (retentionMs > 0) { + args.push('RETENTION', Math.floor(retentionMs).toString()); + } + + args.push(...this.buildLabelArguments(labels)); + + await this.redisClient.sendCommand(args); + } + + /** + * Increments a RedisTimeSeries key with labels and timestamp. + * + * @param key - time series key + * @param value - value to increment by + * @param timestampMs - timestamp in milliseconds, defaults to current time + * @param labels - labels to attach to the sample + */ + public async tsIncrBy( + key: string, + value: number, + timestampMs = 0, + labels: Record = {} + ): Promise { + const labelArgs = this.buildLabelArguments(labels); + const timestamp = timestampMs === 0 ? Date.now() : timestampMs; + + const args: string[] = [ + 'TS.INCRBY', + key, + value.toString(), + 'TIMESTAMP', + Math.floor(timestamp).toString(), + ...labelArgs, + ]; + + await this.redisClient.sendCommand(args); + } + + /** + * Ensures that a RedisTimeSeries key exists and increments it safely. + * + * @param key - time series key + * @param value - value to increment by + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async safeTsIncrBy( + key: string, + value: number, + labels: Record, + retentionMs = 0 + ): Promise { + const timestamp = Date.now(); + + try { + await this.tsIncrBy(key, value, timestamp, labels); + } catch (error) { + if (error instanceof Error && error.message.includes('TSDB: key does not exist')) { + this.logger.warn(`TS key ${key} does not exist, creating it...`); + await this.tsCreateIfNotExists(key, labels, retentionMs); + await this.tsIncrBy(key, value, timestamp, labels); + } else { + throw error; + } + } + } + + /** + * Adds a sample to a RedisTimeSeries key. + * + * @param key - time series key + * @param value - value to add + * @param timestampMs - timestamp in milliseconds, defaults to current time + * @param labels - labels to attach to the sample + */ + public async tsAdd( + key: string, + value: number, + timestampMs = 0, + labels: Record = {} + ): Promise { + const labelArgs = this.buildLabelArguments(labels); + const timestamp = timestampMs === 0 ? Date.now() : timestampMs; + + const args: string[] = [ + 'TS.ADD', + key, + Math.floor(timestamp).toString(), + value.toString(), + 'ON_DUPLICATE', + 'SUM', + ...labelArgs, + ]; + + await this.redisClient.sendCommand(args); + } + + /** + * Ensures that a RedisTimeSeries key exists and adds a sample safely. + * + * @param key - time series key + * @param value - value to add + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async safeTsAdd( + key: string, + value: number, + labels: Record, + retentionMs = 0 + ): Promise { + const timestamp = Date.now(); + + try { + await this.tsAdd(key, value, timestamp, labels); + } catch (error) { + if (error instanceof Error && error.message.includes('TSDB: key does not exist')) { + this.logger.warn(`TS key ${key} does not exist, creating it...`); + await this.tsCreateIfNotExists(key, labels, retentionMs); + await this.tsAdd(key, value, timestamp, labels); + } else { + throw error; + } + } + } + /** * Creates callback function for Redis operations * @@ -130,4 +341,4 @@ export default class RedisHelper { resolve(resp !== 'OK'); }; } -} +} \ No newline at end of file diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index ee781e98a..2ae618101 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -7,6 +7,7 @@ import type { Collection } from 'mongodb'; import { MongoClient } from 'mongodb'; import type { ErrorsCatcherType, EventAddons, EventData } from '@hawk.so/types'; import { MS_IN_SEC } from '../../../lib/utils/consts'; +import TimeMs from '../../../lib/utils/time'; import * as mongodb from 'mongodb'; import { patch } from '@n1ru4l/json-patch-plus'; @@ -57,16 +58,41 @@ const projectIdMock = '5d206f7f9aaf7c0071d64596'; /** * Mock project data */ +const planIdMock = new mongodb.ObjectId(); +const workspaceIdMock = new mongodb.ObjectId(); + +const planMock = { + _id: planIdMock, + rateLimitSettings: { + N: 0, + T: 0, + }, +}; + +const workspaceMock = { + _id: workspaceIdMock, + tariffPlanId: planIdMock, + rateLimitSettings: { + N: 0, + T: 0, + }, +}; + const projectMock = { _id: new mongodb.ObjectId(projectIdMock), id: projectIdMock, name: 'Test Project', token: 'test-token', + workspaceId: workspaceIdMock, uidAdded: { id: 'test-user-id', }, unreadCount: 0, description: 'Test project for grouper worker tests', + rateLimitSettings: { + N: 0, + T: 0, + }, eventGroupingPatterns: [ { _id: mongodb.ObjectId(), pattern: 'New error .*', @@ -113,8 +139,30 @@ describe('GrouperWorker', () => { let dailyEventsCollection: Collection; let repetitionsCollection: Collection; let projectsCollection: Collection; + let workspacesCollection: Collection; + let plansCollection: Collection; let redisClient: RedisClientType; let worker: GrouperWorker; + const setPlanRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { + await plansCollection.updateOne( + { _id: planIdMock }, + { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { upsert: true } + ); + }; + const setWorkspaceRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { + await workspacesCollection.updateOne( + { _id: workspaceIdMock }, + { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { upsert: true } + ); + }; + const setProjectRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { + await projectsCollection.updateOne( + { _id: new mongodb.ObjectId(projectIdMock) }, + { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + ); + }; beforeAll(async () => { worker = new GrouperWorker(); @@ -133,12 +181,17 @@ describe('GrouperWorker', () => { dailyEventsCollection = connection.db().collection('dailyEvents:' + projectIdMock); repetitionsCollection = connection.db().collection('repetitions:' + projectIdMock); projectsCollection = accountsConnection.db().collection('projects'); + workspacesCollection = accountsConnection.db().collection('workspaces'); + plansCollection = accountsConnection.db().collection('plans'); /** * Create unique index for groupHash */ await eventsCollection.createIndex({ groupHash: 1 }, { unique: true }); + await plansCollection.insertOne(planMock); + await workspacesCollection.insertOne(workspaceMock); + /** * Insert mock project into accounts database */ @@ -155,9 +208,13 @@ describe('GrouperWorker', () => { */ beforeEach(async () => { worker.clearCache(); + delete (worker as any)['memoizeCache:getProjectRateLimitSettings']; await eventsCollection.deleteMany({}); await dailyEventsCollection.deleteMany({}); await repetitionsCollection.deleteMany({}); + await setPlanRateLimit(0, 0); + await setWorkspaceRateLimit(0, 0); + await setProjectRateLimit(0, 0); }); afterEach(async () => { @@ -743,10 +800,257 @@ describe('GrouperWorker', () => { }); }); + describe('Rate limits counter increment', () => { + const rateLimitsKey = 'rate_limits'; + + test('increments counter when handling an event', async () => { + await setProjectRateLimit(5, 60); + + let currentTime = 1_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + await worker.handle(generateTask()); + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).toBe(`${Math.floor(currentTime / 1000)}:1`); + } finally { + nowSpy.mockRestore(); + } + }); + + test('reuses window and increments while within limit', async () => { + await setProjectRateLimit(5, 60); + + let currentTime = 2_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + await worker.handle(generateTask()); + await worker.handle(generateTask()); + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(2); + } finally { + nowSpy.mockRestore(); + } + }); + + test('does not exceed configured limit within same window', async () => { + const eventsLimit = 3; + + await setProjectRateLimit(eventsLimit, 60); + + let currentTime = 3_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < eventsLimit + 2; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(eventsLimit); + } finally { + nowSpy.mockRestore(); + } + }); + + test('resets window after period elapses', async () => { + await setProjectRateLimit(5, 2); + + let currentTime = 4_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + await worker.handle(generateTask()); + + currentTime += 3_000; // advance by 3 seconds + + await worker.handle(generateTask()); + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [timestamp, count] = (storedValue as string).split(':'); + + expect(Number(timestamp)).toBe(Math.floor(currentTime / 1000)); + expect(Number(count)).toBe(1); + } finally { + nowSpy.mockRestore(); + } + }); + + test('uses workspace limits when project overrides are absent', async () => { + await setPlanRateLimit(10, 60); + await setWorkspaceRateLimit(3, 60); + await setProjectRateLimit(0, 0); + + let currentTime = 5_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < 5; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(3); + } finally { + nowSpy.mockRestore(); + } + }); + + test('falls back to plan limits when workspace settings are empty', async () => { + await setPlanRateLimit(4, 60); + await setWorkspaceRateLimit(0, 0); + await setProjectRateLimit(0, 0); + + let currentTime = 6_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < 6; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(4); + } finally { + nowSpy.mockRestore(); + } + }); + + test('prefers project limits over workspace and plan', async () => { + await setPlanRateLimit(4, 60); + await setWorkspaceRateLimit(6, 60); + await setProjectRateLimit(8, 60); + + let currentTime = 7_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < 10; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(8); + } finally { + nowSpy.mockRestore(); + } + }); + }); + + describe('Events-stored metrics', () => { + test('writes minutely, hourly, and daily samples after handling an event', async () => { + const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); + + try { + await worker.handle(generateTask()); + + expect(safeTsAddSpy).toHaveBeenCalledTimes(3); + + const expectedLabels = { + type: 'error', + status: 'events-stored', + project: projectIdMock, + }; + + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 1, + `ts:project-events-stored:${projectIdMock}:minutely`, + 1, + expectedLabels, + TimeMs.DAY, + ); + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 2, + `ts:project-events-stored:${projectIdMock}:hourly`, + 1, + expectedLabels, + TimeMs.WEEK, + ); + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 3, + `ts:project-events-stored:${projectIdMock}:daily`, + 1, + expectedLabels, + 90 * TimeMs.DAY, + ); + } finally { + safeTsAddSpy.mockRestore(); + } + }); + + test('logs when a time-series write fails but continues processing', async () => { + const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); + const loggerErrorSpy = jest.spyOn((worker as any).logger, 'error').mockImplementation(() => undefined); + const failure = new Error('TS failure'); + + safeTsAddSpy + .mockImplementationOnce(() => Promise.resolve()) + .mockImplementationOnce(async () => { throw failure; }) + .mockImplementationOnce(() => Promise.resolve()); + + try { + await worker.handle(generateTask()); + + expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-stored', failure); + expect(await eventsCollection.find().count()).toBe(1); + } finally { + safeTsAddSpy.mockRestore(); + loggerErrorSpy.mockRestore(); + } + }); + + test('records metrics exactly once per handled event', async () => { + const recordMetricsSpy = jest.spyOn(worker as any, 'recordProjectMetrics'); + + try { + await worker.handle(generateTask()); + + expect(recordMetricsSpy).toHaveBeenCalledTimes(1); + expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-stored'); + } finally { + recordMetricsSpy.mockRestore(); + } + }); + }); + afterAll(async () => { await redisClient.quit(); await worker.finish(); await projectsCollection.deleteMany({}); + await workspacesCollection.deleteMany({}); + await plansCollection.deleteMany({}); await accountsConnection.close(); await connection.close(); }); From 3badfa93c4ced6f085356c2d114fbef707fecee7 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 7 Feb 2026 17:49:23 +0300 Subject: [PATCH 04/11] chore(): eslint fix --- workers/grouper/src/index.ts | 1 + workers/grouper/src/redisHelper.ts | 33 +++++++++++++++--------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 8a10b369c..f42789ed7 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -309,6 +309,7 @@ export default class GrouperWorker extends Worker { const series = [ { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY }, { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK }, + /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ { key: dailyKey, label: 'daily', retentionMs: 90 * TimeMs.DAY }, ]; diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index f0f45a7eb..0d2072087 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -2,6 +2,7 @@ import HawkCatcher from '@hawk.so/nodejs'; import type { RedisClientType } from 'redis'; import { createClient } from 'redis'; import createLogger from '../../../lib/logger'; +import { MS_IN_SEC } from '../../../lib/utils/consts'; /** * Class with helper functions for working with Redis @@ -152,7 +153,7 @@ export default class RedisHelper { ` const key = 'rate_limits'; - const now = Math.floor(Date.now() / 1000); + const now = Math.floor(Date.now() / MS_IN_SEC); await this.redisClient.eval(script, { keys: [key], @@ -160,21 +161,6 @@ export default class RedisHelper { }); } - /** - * Build label arguments for RedisTimeSeries commands - * - * @param labels - labels to attach to the time series - */ - private buildLabelArguments(labels: Record): string[] { - const labelArgs: string[] = ['LABELS']; - - for (const [labelKey, labelValue] of Object.entries(labels)) { - labelArgs.push(labelKey, labelValue); - } - - return labelArgs; - } - /** * Creates a RedisTimeSeries key if it doesn't exist. * @@ -321,6 +307,21 @@ export default class RedisHelper { } } + /** + * Build label arguments for RedisTimeSeries commands + * + * @param labels - labels to attach to the time series + */ + private buildLabelArguments(labels: Record): string[] { + const labelArgs: string[] = ['LABELS']; + + for (const [labelKey, labelValue] of Object.entries(labels)) { + labelArgs.push(labelKey, labelValue); + } + + return labelArgs; + } + /** * Creates callback function for Redis operations * From 0d00df96b33ac3ac2a5eb62a65575861f832ff86 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 7 Feb 2026 17:59:33 +0300 Subject: [PATCH 05/11] chore(): clean up --- workers/grouper/src/index.ts | 39 ++++++++++++++++++----- workers/grouper/src/redisHelper.ts | 7 +++-- workers/grouper/tests/index.test.ts | 49 +++++++++++++++++++++-------- 3 files changed, 72 insertions(+), 23 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index f42789ed7..52b9c705c 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -307,10 +307,22 @@ export default class GrouperWorker extends Worker { }; const series = [ - { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY }, - { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK }, - /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ - { key: dailyKey, label: 'daily', retentionMs: 90 * TimeMs.DAY }, + { + key: minutelyKey, + label: 'minutely', + retentionMs: TimeMs.DAY, + }, + { + key: hourlyKey, + label: 'hourly', + retentionMs: TimeMs.WEEK, + }, + { + key: dailyKey, + label: 'daily', + // eslint-disable-next-line @typescript-eslint/no-magic-numbers + retentionMs: 90 * TimeMs.DAY, + }, ]; for (const { key, label, retentionMs } of series) { @@ -367,7 +379,12 @@ export default class GrouperWorker extends Worker { .collection('projects') .findOne( { _id: new mongodb.ObjectId(projectId) }, - { projection: { rateLimitSettings: 1, workspaceId: 1 } } + { + projection: { + rateLimitSettings: 1, + workspaceId: 1, + }, + } ); if (!project) { @@ -388,7 +405,12 @@ export default class GrouperWorker extends Worker { .collection('workspaces') .findOne( { _id: workspaceId }, - { projection: { rateLimitSettings: 1, tariffPlanId: 1 } } + { + projection: { + rateLimitSettings: 1, + tariffPlanId: 1, + }, + } ); workspaceRateLimitSettings = workspace?.rateLimitSettings as { N: number, T: number }; @@ -449,7 +471,10 @@ export default class GrouperWorker extends Worker { return null; } - return { eventsLimit, eventsPeriod }; + return { + eventsLimit, + eventsPeriod, + }; } /** diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index 0d2072087..bed08418e 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -116,6 +116,7 @@ export default class RedisHelper { * * @param projectId - id of the project which event belongs to * @param eventsPeriod - rate limit period configured for the project + * @param limit - current event count limit (from project / workspace / plan) */ public async incrementRateLimitCounterForCurrentEvent(projectId: string, eventsPeriod: number, limit: number): Promise { const script = ` @@ -150,13 +151,13 @@ export default class RedisHelper { -- Increment counter redis.call('HSET', key, field, timestamp .. ':' .. (count + 1)) - ` + `; const key = 'rate_limits'; const now = Math.floor(Date.now() / MS_IN_SEC); await this.redisClient.eval(script, { - keys: [key], + keys: [ key ], arguments: [projectId, now.toString(), eventsPeriod.toString(), limit.toString()], }); } @@ -313,7 +314,7 @@ export default class RedisHelper { * @param labels - labels to attach to the time series */ private buildLabelArguments(labels: Record): string[] { - const labelArgs: string[] = ['LABELS']; + const labelArgs: string[] = [ 'LABELS' ]; for (const [labelKey, labelValue] of Object.entries(labels)) { labelArgs.push(labelKey, labelValue); diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index 2ae618101..34bad1ccf 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -146,21 +146,42 @@ describe('GrouperWorker', () => { const setPlanRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { await plansCollection.updateOne( { _id: planIdMock }, - { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { + $set: { + rateLimitSettings: { + N: eventsLimit, + T: eventsPeriod, + }, + }, + }, { upsert: true } ); }; const setWorkspaceRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { await workspacesCollection.updateOne( { _id: workspaceIdMock }, - { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { + $set: { + rateLimitSettings: { + N: eventsLimit, + T: eventsPeriod, + }, + }, + }, { upsert: true } ); }; const setProjectRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { await projectsCollection.updateOne( { _id: new mongodb.ObjectId(projectIdMock) }, - { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { + $set: { + rateLimitSettings: { + N: eventsLimit, + T: eventsPeriod, + }, + }, + } ); }; @@ -806,7 +827,7 @@ describe('GrouperWorker', () => { test('increments counter when handling an event', async () => { await setProjectRateLimit(5, 60); - let currentTime = 1_000_000; + const currentTime = 1_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -823,7 +844,7 @@ describe('GrouperWorker', () => { test('reuses window and increments while within limit', async () => { await setProjectRateLimit(5, 60); - let currentTime = 2_000_000; + const currentTime = 2_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -847,7 +868,7 @@ describe('GrouperWorker', () => { await setProjectRateLimit(eventsLimit, 60); - let currentTime = 3_000_000; + const currentTime = 3_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -898,7 +919,7 @@ describe('GrouperWorker', () => { await setWorkspaceRateLimit(3, 60); await setProjectRateLimit(0, 0); - let currentTime = 5_000_000; + const currentTime = 5_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -923,7 +944,7 @@ describe('GrouperWorker', () => { await setWorkspaceRateLimit(0, 0); await setProjectRateLimit(0, 0); - let currentTime = 6_000_000; + const currentTime = 6_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -948,7 +969,7 @@ describe('GrouperWorker', () => { await setWorkspaceRateLimit(6, 60); await setProjectRateLimit(8, 60); - let currentTime = 7_000_000; + const currentTime = 7_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -989,21 +1010,21 @@ describe('GrouperWorker', () => { `ts:project-events-stored:${projectIdMock}:minutely`, 1, expectedLabels, - TimeMs.DAY, + TimeMs.DAY ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 2, `ts:project-events-stored:${projectIdMock}:hourly`, 1, expectedLabels, - TimeMs.WEEK, + TimeMs.WEEK ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 3, `ts:project-events-stored:${projectIdMock}:daily`, 1, expectedLabels, - 90 * TimeMs.DAY, + 90 * TimeMs.DAY ); } finally { safeTsAddSpy.mockRestore(); @@ -1017,7 +1038,9 @@ describe('GrouperWorker', () => { safeTsAddSpy .mockImplementationOnce(() => Promise.resolve()) - .mockImplementationOnce(async () => { throw failure; }) + .mockImplementationOnce(async () => { + throw failure; + }) .mockImplementationOnce(() => Promise.resolve()); try { From 20990d34f56a3846ff0a22f0c48845484aaea605 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 20:05:48 +0300 Subject: [PATCH 06/11] chore(grouper): remove redundant rate-limit increment logic --- workers/grouper/src/index.ts | 144 ------------------- workers/grouper/src/redisHelper.ts | 51 ------- workers/grouper/tests/index.test.ts | 215 ---------------------------- 3 files changed, 410 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 52b9c705c..05aff09b0 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -270,7 +270,6 @@ export default class GrouperWorker extends Worker { } } - await this.incrementRateLimitCounter(task.projectId); await this.recordProjectMetrics(task.projectId, 'events-stored'); } @@ -334,149 +333,6 @@ export default class GrouperWorker extends Worker { } } - /** - * Increment rate limit counters for the project. - * - * @param projectId - id of the project - */ - private async incrementRateLimitCounter(projectId: string): Promise { - try { - const settings = await this.getProjectRateLimitSettings(projectId); - - if (!settings) { - return; - } - - await this.redis.incrementRateLimitCounterForCurrentEvent( - projectId, - settings.eventsPeriod, - settings.eventsLimit - ); - } catch (error) { - this.logger.error(`Failed to increment rate limit counter for project ${projectId}`, error); - } - } - - /** - * Fetch and normalize rate limit settings - * Rate limit settings could appear in tarifPlan, workspace and project. - * All rateLimits have different priority. - * - * @param projectId - id of the project - */ - @memoize({ max: 200, ttl: MEMOIZATION_TTL, strategy: 'concat', skipCache: [null] }) - private async getProjectRateLimitSettings(projectId: string): Promise<{ eventsLimit: number; eventsPeriod: number } | null> { - if (!projectId || !mongodb.ObjectID.isValid(projectId)) { - return null; - } - - const accountsDb = this.accountsDb.getConnection(); - - /** - * Fetch project from the db - */ - const project = await accountsDb - .collection('projects') - .findOne( - { _id: new mongodb.ObjectId(projectId) }, - { - projection: { - rateLimitSettings: 1, - workspaceId: 1, - }, - } - ); - - if (!project) { - return null; - } - - const projectRateLimitSettings = project.rateLimitSettings as { N: number, T: number }; - const workspaceId = new mongodb.ObjectID(project.workspaceId); - - let planRateLimitSettings: { N: number, T: number}; - let workspaceRateLimitSettings: { N: number, T: number}; - - /** - * Fetch workspace from the db - */ - if (workspaceId) { - const workspace = await accountsDb - .collection('workspaces') - .findOne( - { _id: workspaceId }, - { - projection: { - rateLimitSettings: 1, - tariffPlanId: 1, - }, - } - ); - - workspaceRateLimitSettings = workspace?.rateLimitSettings as { N: number, T: number }; - - const planId = new mongodb.ObjectId(workspace?.tariffPlanId); - - /** - * Tarif plan from the db - */ - if (planId) { - const plan = await accountsDb - .collection('plans') - .findOne( - { _id: planId }, - { projection: { rateLimitSettings: 1 } } - ); - - planRateLimitSettings = plan?.rateLimitSettings; - } - } - - return this.normalizeRateLimitSettings( - planRateLimitSettings, - workspaceRateLimitSettings, - projectRateLimitSettings - ); - } - - /** - * Normalize rate limit settings shape from database. - * - * @param rateLimitLayers - raw settings documents in priority order - */ - private normalizeRateLimitSettings( - ...rateLimitLayers: { N: number, T: number }[] - ): { eventsLimit: number; eventsPeriod: number } | null { - let eventsLimit = 0; - let eventsPeriod = 0; - - for (const layer of rateLimitLayers) { - if (!layer) { - continue; - } - - const limit = layer.N as number; - const period = layer.T as number; - - if (limit !== undefined && limit > 0) { - eventsLimit = limit; - } - - if (period !== undefined && period > 0) { - eventsPeriod = period; - } - } - - if (eventsLimit <= 0 || eventsPeriod <= 0) { - return null; - } - - return { - eventsLimit, - eventsPeriod, - }; - } - /** * Trims source code lines in event's backtrace to prevent memory leaks * diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index bed08418e..cc009cd59 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -111,57 +111,6 @@ export default class RedisHelper { return result === null; } - /** - * Increments redis counter used for rate limiting - * - * @param projectId - id of the project which event belongs to - * @param eventsPeriod - rate limit period configured for the project - * @param limit - current event count limit (from project / workspace / plan) - */ - public async incrementRateLimitCounterForCurrentEvent(projectId: string, eventsPeriod: number, limit: number): Promise { - const script = ` - local key = KEYS[1] - local field = ARGV[1] - local now = tonumber(ARGV[2]) - local period = tonumber(ARGV[3]) - local limit = tonumber(ARGV[4]) - - local current = redis.call('HGET', key, field) - - -- If no record yet, start a new window with count = 1 - if not current then - redis.call('HSET', key, field, now .. ':1') - return - end - - local timestamp, count = string.match(current, '(%d+):(%d+)') - timestamp = tonumber(timestamp) - count = tonumber(count) - - -- Check if we're in a new time window - if now - timestamp >= period then - redis.call('HSET', key, field, now .. ':1') - return - end - - -- Check if incrementing would exceed limit - if count + 1 > limit then - return - end - - -- Increment counter - redis.call('HSET', key, field, timestamp .. ':' .. (count + 1)) - `; - - const key = 'rate_limits'; - const now = Math.floor(Date.now() / MS_IN_SEC); - - await this.redisClient.eval(script, { - keys: [ key ], - arguments: [projectId, now.toString(), eventsPeriod.toString(), limit.toString()], - }); - } - /** * Creates a RedisTimeSeries key if it doesn't exist. * diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index 34bad1ccf..2986bf7fd 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -143,48 +143,6 @@ describe('GrouperWorker', () => { let plansCollection: Collection; let redisClient: RedisClientType; let worker: GrouperWorker; - const setPlanRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { - await plansCollection.updateOne( - { _id: planIdMock }, - { - $set: { - rateLimitSettings: { - N: eventsLimit, - T: eventsPeriod, - }, - }, - }, - { upsert: true } - ); - }; - const setWorkspaceRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { - await workspacesCollection.updateOne( - { _id: workspaceIdMock }, - { - $set: { - rateLimitSettings: { - N: eventsLimit, - T: eventsPeriod, - }, - }, - }, - { upsert: true } - ); - }; - const setProjectRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { - await projectsCollection.updateOne( - { _id: new mongodb.ObjectId(projectIdMock) }, - { - $set: { - rateLimitSettings: { - N: eventsLimit, - T: eventsPeriod, - }, - }, - } - ); - }; - beforeAll(async () => { worker = new GrouperWorker(); @@ -229,13 +187,9 @@ describe('GrouperWorker', () => { */ beforeEach(async () => { worker.clearCache(); - delete (worker as any)['memoizeCache:getProjectRateLimitSettings']; await eventsCollection.deleteMany({}); await dailyEventsCollection.deleteMany({}); await repetitionsCollection.deleteMany({}); - await setPlanRateLimit(0, 0); - await setWorkspaceRateLimit(0, 0); - await setProjectRateLimit(0, 0); }); afterEach(async () => { @@ -821,175 +775,6 @@ describe('GrouperWorker', () => { }); }); - describe('Rate limits counter increment', () => { - const rateLimitsKey = 'rate_limits'; - - test('increments counter when handling an event', async () => { - await setProjectRateLimit(5, 60); - - const currentTime = 1_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - await worker.handle(generateTask()); - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).toBe(`${Math.floor(currentTime / 1000)}:1`); - } finally { - nowSpy.mockRestore(); - } - }); - - test('reuses window and increments while within limit', async () => { - await setProjectRateLimit(5, 60); - - const currentTime = 2_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - await worker.handle(generateTask()); - await worker.handle(generateTask()); - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(2); - } finally { - nowSpy.mockRestore(); - } - }); - - test('does not exceed configured limit within same window', async () => { - const eventsLimit = 3; - - await setProjectRateLimit(eventsLimit, 60); - - const currentTime = 3_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < eventsLimit + 2; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(eventsLimit); - } finally { - nowSpy.mockRestore(); - } - }); - - test('resets window after period elapses', async () => { - await setProjectRateLimit(5, 2); - - let currentTime = 4_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - await worker.handle(generateTask()); - - currentTime += 3_000; // advance by 3 seconds - - await worker.handle(generateTask()); - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [timestamp, count] = (storedValue as string).split(':'); - - expect(Number(timestamp)).toBe(Math.floor(currentTime / 1000)); - expect(Number(count)).toBe(1); - } finally { - nowSpy.mockRestore(); - } - }); - - test('uses workspace limits when project overrides are absent', async () => { - await setPlanRateLimit(10, 60); - await setWorkspaceRateLimit(3, 60); - await setProjectRateLimit(0, 0); - - const currentTime = 5_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < 5; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(3); - } finally { - nowSpy.mockRestore(); - } - }); - - test('falls back to plan limits when workspace settings are empty', async () => { - await setPlanRateLimit(4, 60); - await setWorkspaceRateLimit(0, 0); - await setProjectRateLimit(0, 0); - - const currentTime = 6_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < 6; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(4); - } finally { - nowSpy.mockRestore(); - } - }); - - test('prefers project limits over workspace and plan', async () => { - await setPlanRateLimit(4, 60); - await setWorkspaceRateLimit(6, 60); - await setProjectRateLimit(8, 60); - - const currentTime = 7_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < 10; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(8); - } finally { - nowSpy.mockRestore(); - } - }); - }); - describe('Events-stored metrics', () => { test('writes minutely, hourly, and daily samples after handling an event', async () => { const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); From a8a31c850928eef18b43a35b1ea21a1dc0d9273b Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 20:08:15 +0300 Subject: [PATCH 07/11] chore(grouper): remove redundant mocks --- workers/grouper/tests/index.test.ts | 34 ----------------------------- 1 file changed, 34 deletions(-) diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index 2986bf7fd..af1c2cf24 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -58,41 +58,16 @@ const projectIdMock = '5d206f7f9aaf7c0071d64596'; /** * Mock project data */ -const planIdMock = new mongodb.ObjectId(); -const workspaceIdMock = new mongodb.ObjectId(); - -const planMock = { - _id: planIdMock, - rateLimitSettings: { - N: 0, - T: 0, - }, -}; - -const workspaceMock = { - _id: workspaceIdMock, - tariffPlanId: planIdMock, - rateLimitSettings: { - N: 0, - T: 0, - }, -}; - const projectMock = { _id: new mongodb.ObjectId(projectIdMock), id: projectIdMock, name: 'Test Project', token: 'test-token', - workspaceId: workspaceIdMock, uidAdded: { id: 'test-user-id', }, unreadCount: 0, description: 'Test project for grouper worker tests', - rateLimitSettings: { - N: 0, - T: 0, - }, eventGroupingPatterns: [ { _id: mongodb.ObjectId(), pattern: 'New error .*', @@ -139,8 +114,6 @@ describe('GrouperWorker', () => { let dailyEventsCollection: Collection; let repetitionsCollection: Collection; let projectsCollection: Collection; - let workspacesCollection: Collection; - let plansCollection: Collection; let redisClient: RedisClientType; let worker: GrouperWorker; beforeAll(async () => { @@ -160,17 +133,12 @@ describe('GrouperWorker', () => { dailyEventsCollection = connection.db().collection('dailyEvents:' + projectIdMock); repetitionsCollection = connection.db().collection('repetitions:' + projectIdMock); projectsCollection = accountsConnection.db().collection('projects'); - workspacesCollection = accountsConnection.db().collection('workspaces'); - plansCollection = accountsConnection.db().collection('plans'); /** * Create unique index for groupHash */ await eventsCollection.createIndex({ groupHash: 1 }, { unique: true }); - await plansCollection.insertOne(planMock); - await workspacesCollection.insertOne(workspaceMock); - /** * Insert mock project into accounts database */ @@ -857,8 +825,6 @@ describe('GrouperWorker', () => { await redisClient.quit(); await worker.finish(); await projectsCollection.deleteMany({}); - await workspacesCollection.deleteMany({}); - await plansCollection.deleteMany({}); await accountsConnection.close(); await connection.close(); }); From f9b181d2bffb4414b3c55b66c64c90b62f23af74 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 20:09:47 +0300 Subject: [PATCH 08/11] chore(): eslint fix --- workers/grouper/src/redisHelper.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index cc009cd59..d15df9403 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -2,7 +2,6 @@ import HawkCatcher from '@hawk.so/nodejs'; import type { RedisClientType } from 'redis'; import { createClient } from 'redis'; import createLogger from '../../../lib/logger'; -import { MS_IN_SEC } from '../../../lib/utils/consts'; /** * Class with helper functions for working with Redis From a583997f84be9436e0e1ef82e18073365a1cb6e5 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 21:12:44 +0300 Subject: [PATCH 09/11] chore(): change metric type --- workers/grouper/src/index.ts | 2 +- workers/grouper/tests/index.test.ts | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 05aff09b0..a0611383e 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -270,7 +270,7 @@ export default class GrouperWorker extends Worker { } } - await this.recordProjectMetrics(task.projectId, 'events-stored'); + await this.recordProjectMetrics(task.projectId, 'events-accepted'); } /** diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index af1c2cf24..6ad01812d 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -743,7 +743,7 @@ describe('GrouperWorker', () => { }); }); - describe('Events-stored metrics', () => { + describe('Events-accepted metrics', () => { test('writes minutely, hourly, and daily samples after handling an event', async () => { const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); @@ -754,27 +754,27 @@ describe('GrouperWorker', () => { const expectedLabels = { type: 'error', - status: 'events-stored', + status: 'events-accepted', project: projectIdMock, }; expect(safeTsAddSpy).toHaveBeenNthCalledWith( 1, - `ts:project-events-stored:${projectIdMock}:minutely`, + `ts:project-events-accepted:${projectIdMock}:minutely`, 1, expectedLabels, TimeMs.DAY ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 2, - `ts:project-events-stored:${projectIdMock}:hourly`, + `ts:project-events-accepted:${projectIdMock}:hourly`, 1, expectedLabels, TimeMs.WEEK ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 3, - `ts:project-events-stored:${projectIdMock}:daily`, + `ts:project-events-accepted:${projectIdMock}:daily`, 1, expectedLabels, 90 * TimeMs.DAY @@ -799,7 +799,7 @@ describe('GrouperWorker', () => { try { await worker.handle(generateTask()); - expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-stored', failure); + expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-accepted', failure); expect(await eventsCollection.find().count()).toBe(1); } finally { safeTsAddSpy.mockRestore(); @@ -814,7 +814,7 @@ describe('GrouperWorker', () => { await worker.handle(generateTask()); expect(recordMetricsSpy).toHaveBeenCalledTimes(1); - expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-stored'); + expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-accepted'); } finally { recordMetricsSpy.mockRestore(); } From 8ed970c057e680650a0a8ed52c63df64273a0f9f Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 21:44:50 +0300 Subject: [PATCH 10/11] Update workers/grouper/src/index.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- workers/grouper/src/index.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index a0611383e..d0bdb4e65 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -324,13 +324,19 @@ export default class GrouperWorker extends Worker { }, ]; - for (const { key, label, retentionMs } of series) { - try { - await this.redis.safeTsAdd(key, 1, labels, retentionMs); - } catch (error) { - this.logger.error(`Failed to add ${label} TS for ${metricType}`, error); + const operations = series.map(({ key, label, retentionMs }) => ({ + label, + promise: this.redis.safeTsAdd(key, 1, labels, retentionMs), + })); + + const results = await Promise.allSettled(operations.map((op) => op.promise)); + + results.forEach((result, index) => { + if (result.status === 'rejected') { + const { label } = operations[index]; + this.logger.error(`Failed to add ${label} TS for ${metricType}`, result.reason); } - } + }); } /** From 45e527abb003c8d8ef5da53f044b9ffb84ed9089 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 22:06:29 +0300 Subject: [PATCH 11/11] imp(): use lua for create if not exists, to avoid race-cond --- workers/grouper/src/redisHelper.ts | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index d15df9403..ec486ddb0 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -122,13 +122,16 @@ export default class RedisHelper { labels: Record, retentionMs = 0 ): Promise { - const exists = await this.redisClient.exists(key); + const script = ` + if redis.call('EXISTS', KEYS[1]) == 1 then + return 0 + end - if (exists > 0) { - return; - } + redis.call('TS.CREATE', KEYS[1], unpack(ARGV)) + return 1 + `; - const args: string[] = ['TS.CREATE', key]; + const args: string[] = []; if (retentionMs > 0) { args.push('RETENTION', Math.floor(retentionMs).toString()); @@ -136,7 +139,13 @@ export default class RedisHelper { args.push(...this.buildLabelArguments(labels)); - await this.redisClient.sendCommand(args); + await this.redisClient.eval( + script, + { + keys: [key], + arguments: args, + } + ); } /**