Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion workers/grouper/src/data-filter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import type { EventAddons, EventData } from '@hawk.so/types';
import { unsafeFields } from '../../../lib/utils/unsafeFields';

/**
* Maximum number of keys kept in the path that is tracked while traversing an object.
* Objects nested deeper than this are still visited, but the path passed along stops growing,
* which prevents excessive memory allocations on deeply nested input
*/
const MAX_TRAVERSAL_DEPTH = 20;

/**
* Recursively iterate through object and call function on each key
*
Expand All @@ -18,7 +23,12 @@ function forAll(obj: Record<string, unknown>, callback: (path: string[], key: st
if (!(typeof value === 'object' && !Array.isArray(value))) {
callback(path, key, current);
} else {
visit(value, [...path, key]);
/**
* Stop extending the path once it already holds MAX_TRAVERSAL_DEPTH keys;
* nested values are still visited, but with the capped path.
* This reduces GC pressure and memory usage for deeply nested objects
*/
const newPath = path.length < MAX_TRAVERSAL_DEPTH ? path.concat(key) : path;
visit(value, newPath);
}
}
};
Expand Down
42 changes: 40 additions & 2 deletions workers/grouper/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@ import type { RepetitionDBScheme } from '../types/repetition';
import { DatabaseReadWriteError, DiffCalculationError, ValidationError } from '../../../lib/workerErrors';
import { decodeUnsafeFields, encodeUnsafeFields } from '../../../lib/utils/unsafeFields';
import { MS_IN_SEC } from '../../../lib/utils/consts';
import TimeMs from '../../../lib/utils/time';
import DataFilter from './data-filter';
import RedisHelper from './redisHelper';
import { computeDelta } from './utils/repetitionDiff';
import { rightTrim } from '../../../lib/utils/string';
import { hasValue } from '../../../lib/utils/hasValue';

/**
* eslint does not count decorators as a variable usage
*/
/* eslint-disable-next-line no-unused-vars */
import { memoize } from '../../../lib/memoize';

/**
* eslint does not count decorators as a variable usage
*/
/* eslint-disable-next-line no-unused-vars */
const MEMOIZATION_TTL = Number(process.env.MEMOIZATION_TTL ?? 0);
const MEMOIZATION_TTL = 600_000;

/**
* Cache cleanup interval in minutes
* (multiplied by TimeMs.MINUTE when the periodic cleanup timer is scheduled)
*/
const CACHE_CLEANUP_INTERVAL_MINUTES = 5;

/**
* Error code of MongoDB key duplication error
Expand Down Expand Up @@ -72,6 +82,11 @@ export default class GrouperWorker extends Worker {
*/
private redis = new RedisHelper();

/**
* Interval for periodic cache cleanup to prevent memory leaks from unbounded cache growth
*/
private cacheCleanupInterval: NodeJS.Timeout | null = null;

/**
* Start consuming messages
*/
Expand All @@ -85,13 +100,30 @@ export default class GrouperWorker extends Worker {

await this.redis.initialize();
console.log('redis initialized');

/**
* Start periodic cache cleanup to prevent memory leaks from unbounded cache growth
* Calls clearCache() every CACHE_CLEANUP_INTERVAL_MINUTES minutes
*/
this.cacheCleanupInterval = setInterval(() => {
this.clearCache();
}, CACHE_CLEANUP_INTERVAL_MINUTES * TimeMs.MINUTE);

await super.start();
}

/**
* Finish everything
*/
public async finish(): Promise<void> {
/**
* Clear cache cleanup interval to prevent resource leaks
*/
if (this.cacheCleanupInterval) {
clearInterval(this.cacheCleanupInterval);
this.cacheCleanupInterval = null;
}

await super.finish();
this.prepareCache();
await this.eventsDb.close();
Expand Down Expand Up @@ -237,6 +269,12 @@ export default class GrouperWorker extends Worker {
} as RepetitionDBScheme;

repetitionId = await this.saveRepetition(task.projectId, newRepetition);

/**
* Clear the large event payload references to allow garbage collection
* This prevents memory leaks from retaining full event objects after delta is computed
*/
delta = undefined;
}

/**
Expand Down Expand Up @@ -334,7 +372,7 @@ export default class GrouperWorker extends Worker {
* @param projectId - where to find
* @param title - title of the event to find similar one
*/
@memoize({ max: 200, ttl: MEMOIZATION_TTL, strategy: 'hash', skipCache: [undefined] })
@memoize({ max: 50, ttl: MEMOIZATION_TTL, strategy: 'hash', skipCache: [undefined] })
private async findSimilarEvent(projectId: string, title: string): Promise<GroupedEventDBScheme | undefined> {
/**
* If no match by Levenshtein, try matching by patterns
Expand Down
34 changes: 34 additions & 0 deletions workers/grouper/tests/data-filter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,5 +327,39 @@ describe('GrouperWorker', () => {
expect(event.context['secret']).toBe('[filtered]');
expect(event.context['auth']).toBe('[filtered]');
});

/**
* Checks that sensitive keys are still filtered when the event context is nested
* deeper than 20 levels.
* NOTE(review): despite the title, this test only asserts filtering results and that
* processEvent() completes; it does not measure memory allocations.
*/
test('should handle deeply nested objects (>20 levels) without excessive memory allocations', () => {
// Start from the innermost object; it carries a sensitive `secret` key
let deeplyNested: any = { value: 'leaf', secret: 'should-be-filtered' };

// Wrap the leaf 25 times: each wrapper holds the previous object under `level<i>`
// plus its own sensitive `password` key, so `level24` ends up outermost
for (let i = 0; i < 25; i++) {
deeplyNested = { [`level${i}`]: deeplyNested, password: `sensitive${i}` };
}

const event = generateEvent({
context: deeplyNested,
});

// Must complete without throwing; the event is modified in place
dataFilter.processEvent(event);

// The outermost wrapper's password must be filtered
expect(event.context['password']).toBe('[filtered]');

// Walk nine levels down starting at `level24`, checking `password` at each one
let current = event.context['level24'] as any;
for (let i = 24; i > 15; i--) {
expect(current['password']).toBe('[filtered]');
current = current[`level${i - 1}`];
}

// Descend through all 25 wrappers (level24 ... level0) to reach the leaf object;
// its `secret` must be filtered as well, at a nesting depth greater than 20
let leaf = event.context;
for (let i = 24; i >= 0; i--) {
leaf = leaf[`level${i}`] as any;
}
expect(leaf['secret']).toBe('[filtered]');
});
});
});
Loading