diff --git a/jobs/journalApplier.js b/jobs/journalApplier.js new file mode 100644 index 00000000..3fb614f6 --- /dev/null +++ b/jobs/journalApplier.js @@ -0,0 +1,109 @@ +const cron = require('node-cron'); +const mongoose = require('mongoose'); +const WriteJournal = require('../models/WriteJournal'); +const consensusEngine = require('../services/consensusEngine'); +const logger = require('../utils/structuredLogger'); + +/** + * Journal Applier Background Job + * Issue #769: Background worker finalizing journals into the main DB. + * Orchestrates the transition from "JOURNALED" to "APPLIED" state using consensus logic. + */ +class JournalApplier { + constructor() { + this.isProcessing = false; + this.modelMap = { + 'TRANSACTION': 'Transaction', + 'EXPENSE': 'Expense', + 'WORKSPACE': 'Workspace', + 'USER': 'User' + }; + } + + start() { + // Run every 30 seconds to flush the journal buffer + cron.schedule('*/30 * * * * *', async () => { + if (this.isProcessing) return; + this.isProcessing = true; + + try { + await this.processBuffer(); + } catch (err) { + logger.error('[JournalApplier] Buffer processing failed', { error: err.message }); + } finally { + this.isProcessing = false; + } + }); + console.log('✓ Journal Applier scheduled'); + } + + async processBuffer() { + const pending = await WriteJournal.find({ status: 'PENDING' }) + .sort({ createdAt: 1 }) + .limit(50); + + if (pending.length === 0) return; + + logger.info(`[JournalApplier] Applying ${pending.length} pending operations`); + + for (const journal of pending) { + try { + await this.applyEntry(journal); + } catch (err) { + logger.error(`[JournalApplier] Failed to apply journal ${journal._id}`, { error: err.message }); + journal.status = 'CONFLICT'; + journal.retryCount += 1; + await journal.save(); + } + } + } + + async applyEntry(journal) { + const modelName = this.modelMap[journal.entityType]; + if (!modelName) throw new Error(`Unknown entity type: ${journal.entityType}`); + + const Model = mongoose.model(modelName); + let entity = await Model.findById(journal.entityId); + + // Special case for CREATE: entity won't exist yet + if (journal.operation === 'CREATE') { + if (entity) { + journal.status = 'STALE'; // Already exists + } else { + await Model.create({ ...journal.payload, _id: journal.entityId, vectorClock: journal.vectorClock }); + journal.status = 'APPLIED'; + } + } else { + if (!entity) { + journal.status = 'STALE'; + } else { + const result = await consensusEngine.reconcile(entity, journal); + + if (result.action === 'APPLY') { + if (journal.operation === 'UPDATE') { + Object.assign(entity, journal.payload); + } else if (journal.operation === 'DELETE') { + await entity.remove(); + journal.status = 'APPLIED'; + await journal.save(); + return; + } + + // Unified clock update logic + entity.vectorClock = journal.vectorClock; + await entity.save(); + journal.status = 'APPLIED'; + } else if (result.action === 'CONFLICT') { + journal.status = 'CONFLICT'; + } else { + journal.status = 'STALE'; + } + } + } + + journal.appliedAt = new Date(); + await journal.save(); + } +} + +module.exports = new JournalApplier(); diff --git a/middleware/journalInterceptor.js b/middleware/journalInterceptor.js new file mode 100644 index 00000000..48d5be84 --- /dev/null +++ b/middleware/journalInterceptor.js @@ -0,0 +1,44 @@ +const WriteJournal = require('../models/WriteJournal'); +const logger = require('../utils/structuredLogger'); + +/** + * Journal Interceptor Middleware + * Issue #769: Intercepting mutations to redirect to the log. + * Prevents direct DB writes by converting specific POST/PUT requests into journals. + */ +const journalInterceptor = async (req, res, next) => { + // Only intercept mutations aimed at collaborative entities + const mutableEntities = ['transactions', 'expenses', 'workspaces']; + const entityType = req.path.split('/')[2]; // Expected structure: /api/expenses/... + + if ((req.method === 'POST' || req.method === 'PUT') && mutableEntities.includes(entityType)) { + // Check if the request should be journaled (e.g., has collaborative header) + if (req.headers['x-journal-deferred'] === 'true') { + try { + const journal = await WriteJournal.create({ + entityId: req.params.id || new require('mongoose').Types.ObjectId(), + entityType: entityType.toUpperCase().slice(0, -1), + operation: req.method === 'POST' ? 'CREATE' : 'UPDATE', + payload: req.body, + vectorClock: req.body.vectorClock || {}, + workspaceId: req.headers['x-workspace-id'] || req.user.activeWorkspace, + userId: req.user._id, + status: 'PENDING' + }); + + return res.status(202).json({ + message: 'Update accepted for consensus processing', + journalId: journal._id, + status: 'JOURNALED' + }); + } catch (err) { + logger.error('Failed to intercept and journal request', { error: err.message }); + // Fallback to normal execution if journaling fails + } + } + } + + next(); +}; + +module.exports = journalInterceptor; diff --git a/models/Workspace.js b/models/Workspace.js index 1442a615..b9bf3aa2 100644 --- a/models/Workspace.js +++ b/models/Workspace.js @@ -143,6 +143,9 @@ const workspaceSchema = new mongoose.Schema({ // Issue #741: Atomic Invalidation Tracking cacheEpoch: { type: Number, default: 0 }, + // Issue #769: Distributed Write-Ahead Journaling + journalVersionEpoch: { type: Number, default: 0 }, + // Hierarchy fields (#629) parentWorkspace: { type: mongoose.Schema.Types.ObjectId, diff --git a/models/WriteJournal.js b/models/WriteJournal.js new file mode 100644 index 00000000..409dda86 --- /dev/null +++ b/models/WriteJournal.js @@ -0,0 +1,63 @@ +const mongoose = require('mongoose'); + +/** + * WriteJournal Model + * Issue #769: Persistent buffer for pending state changes. + * Part of the "Journal-First" architecture to handle distributed concurrency. + */ +const writeJournalSchema = new mongoose.Schema({ + entityId: { + type: mongoose.Schema.Types.ObjectId, + required: true, + index: true + }, + entityType: { + type: String, + required: true, + enum: ['TRANSACTION', 'EXPENSE', 'WORKSPACE', 'USER'] + }, + operation: { + type: String, + required: true, + enum: ['CREATE', 'UPDATE', 'DELETE'] + }, + payload: { + type: mongoose.Schema.Types.Mixed, + required: true + }, + vectorClock: { + type: Map, + of: Number, + default: {} + }, + status: { + type: String, + enum: ['PENDING', 'APPLIED', 'CONFLICT', 'STALE'], + default: 'PENDING', + index: true + }, + workspaceId: { + type: mongoose.Schema.Types.ObjectId, + ref: 'Workspace', + required: true, + index: true + }, + userId: { + type: mongoose.Schema.Types.ObjectId, + ref: 'User', + required: true + }, + retryCount: { + type: Number, + default: 0 + }, + appliedAt: Date +}, { + timestamps: true +}); + +// Indexes for consensus retrieval +writeJournalSchema.index({ entityId: 1, status: 1 }); +writeJournalSchema.index({ workspaceId: 1, createdAt: 1 }); + +module.exports = mongoose.model('WriteJournal', writeJournalSchema); diff --git a/repositories/baseRepository.js b/repositories/baseRepository.js index 3448a4be..77073e70 100644 --- a/repositories/baseRepository.js +++ b/repositories/baseRepository.js @@ -62,9 +62,12 @@ class BaseRepository { } /** - * Create a new document (with vault encryption interception) + * Create a new document (with vault encryption and journaling support) */ - async create(data) { + async create(data, options = {}) { + if (options.deferred) { + return await this._journalMutation('CREATE', data, options); + } let processedData = await this._encryptSensitiveFields(data); const document = new this.model(processedData); let saved = await document.save(); @@ -128,16 +131,24 @@ class BaseRepository { } /** - * Update a document by ID + * Update a document by ID (with journaling support) */ async updateById(id, data, options = { new: true, runValidators: true }) { + if (options.deferred) { + return await this._journalMutation('UPDATE', { ...data, _id: id }, options); + } return await this.model.findByIdAndUpdate(id, data, options); } /** - * Update one document by filters + * Update one document by filters (with journaling support) */ async updateOne(filters, data, options = { new: true, runValidators: true }) { + if (options.deferred) { + const doc = await this.model.findOne(filters); + if (!doc) throw new Error('Document not found for deferred update'); + return await this._journalMutation('UPDATE', { ...data, _id: doc._id }, options); + } return await this.model.findOneAndUpdate(filters, data, options); } @@ -149,16 +160,24 @@ class BaseRepository { } /** - * Delete a document by ID + * Delete a document by ID (with journaling support) */ - async deleteById(id) { + async deleteById(id, options = {}) { + if (options.deferred) { + return await this._journalMutation('DELETE', { _id: id }, options); + } return await this.model.findByIdAndDelete(id); } /** - * Delete one document by filters + * Delete one document by filters (with journaling support) */ - async deleteOne(filters) { + async deleteOne(filters, options = {}) { + if (options.deferred) { + const doc = await this.model.findOne(filters); + if (!doc) throw new Error('Document not found for deferred delete'); + return await this._journalMutation('DELETE', { _id: doc._id }, options); + } return await this.model.findOneAndDelete(filters); } @@ -243,6 +262,26 @@ class BaseRepository { async executeQuery(queryFn) { return await queryFn(this.model); } + + /** + * Helper: Record mutation in journal instead of direct DB write + */ + async _journalMutation(operation, data, options = {}) { + const WriteJournal = require('../models/WriteJournal'); + const mongoose = require('mongoose'); + + const journal = await WriteJournal.create({ + entityId: data._id || options.entityId || new mongoose.Types.ObjectId(), + entityType: this.model.modelName.toUpperCase(), + operation, + payload: data, + vectorClock: options.vectorClock || {}, + workspaceId: options.workspaceId || data.workspace, + userId: options.userId, + status: 'PENDING' + }); + return { journalId: journal._id, status: 'JOURNALED', deferred: true }; + } } module.exports = BaseRepository; diff --git a/routes/conflicts.js b/routes/conflicts.js new file mode 100644 index 00000000..8cfe5b40 --- /dev/null +++ b/routes/conflicts.js @@ -0,0 +1,72 @@ +const express = require('express'); +const router = express.Router(); +const auth = require('../middleware/auth'); +const WriteJournal = require('../models/WriteJournal'); +const ResponseFactory = require('../utils/responseFactory'); +const mongoose = require('mongoose'); + +/** + * Conflict Resolution Routes + * Issue #769: API for manual resolution of unresolvable writes. + */ + +/** + * @route GET /api/conflicts + * @desc Get all unresolved conflicts for the current workspace + */ +router.get('/', auth, async (req, res) => { + try { + const workspaceId = req.headers['x-workspace-id'] || req.user.activeWorkspace; + const conflicts = await WriteJournal.find({ + workspaceId, + status: 'CONFLICT' + }).sort({ createdAt: -1 }); + + return ResponseFactory.success(res, conflicts); + } catch (error) { + return ResponseFactory.error(res, 500, error.message); + } +}); + +/** + * @route POST /api/conflicts/:id/resolve + * @desc Manually resolve a conflict by choosing a payload + */ +router.post('/:id/resolve', auth, async (req, res) => { + try { + const journal = await WriteJournal.findById(req.params.id); + if (!journal) return ResponseFactory.error(res, 404, 'Conflict record not found'); + + const { resolution, customPayload } = req.body; // 'USE_JOURNAL', 'USE_CURRENT', 'CUSTOM' + + const modelName = journal.entityType.charAt(0) + journal.entityType.slice(1).toLowerCase(); + const Model = mongoose.model(modelName); + const entity = await Model.findById(journal.entityId); + + if (resolution === 'USE_JOURNAL') { + if (entity) { + Object.assign(entity, journal.payload); + entity.vectorClock = journal.vectorClock; + await entity.save(); + } else if (journal.operation === 'CREATE') { + await Model.create({ ...journal.payload, _id: journal.entityId, vectorClock: journal.vectorClock }); + } + } else if (resolution === 'CUSTOM') { + if (entity) { + Object.assign(entity, customPayload); + // Merge clocks or reset? For custom, we usually bump the server clock + await entity.save(); + } + } + + journal.status = 'APPLIED'; + journal.appliedAt = new Date(); + await journal.save(); + + return ResponseFactory.success(res, { message: 'Conflict resolved successfully' }); + } catch (error) { + return ResponseFactory.error(res, 500, error.message); + } +}); + +module.exports = router; diff --git a/server.js b/server.js index 0b46c2da..b87fab74 100644 --- a/server.js +++ b/server.js @@ -56,6 +56,7 @@ app.use(require('./middleware/tenantResolver')); app.use(require('./middleware/leakageGuard')); app.use(require('./middleware/liquidityGuard')); app.use(require('./middleware/governanceGuard')); +app.use(require('./middleware/journalInterceptor')); app.use(require('./middleware/fieldMasker')); app.use(require('./middleware/performanceInterceptor')); app.use(require('./middleware/leakageMonitor')); @@ -92,6 +93,7 @@ async function connectDatabase() { require('./jobs/conflictPruner').start(); require('./jobs/liquidityAnalyzer').start(); require('./jobs/policyAuditor').start(); + require('./jobs/journalApplier').start(); require('./jobs/metricFlusher').start(); require('./jobs/keyRotator').start(); @@ -132,6 +134,7 @@ app.use('/api/sync', require('./routes/sync')); app.use('/api/admin', require('./routes/admin')); app.use('/api/ledger', require('./routes/ledger')); app.use('/api/search', require('./routes/search')); +app.use('/api/conflicts', require('./routes/conflicts')); app.use('/api/telemetry', require('./routes/telemetry')); app.use('/api/vault', require('./routes/vault')); diff --git a/services/consensusEngine.js b/services/consensusEngine.js index caf4f58d..cc129039 100644 --- a/services/consensusEngine.js +++ b/services/consensusEngine.js @@ -1,130 +1,63 @@ -const vectorClockUtils = require('../utils/vectorClockUtils'); -const hashGenerator = require('../utils/hashGenerator'); -const SyncConflict = require('../models/SyncConflict'); const logger = require('../utils/structuredLogger'); /** - * Consensus Engine - * Issue #730: Core logic for resolving state conflicts in a distributed environment. - * Implements Vector Clock comparisons and Conflict Graveyard management. + * Consensus Engine Service + * Issue #769: Resolving concurrent edit conflicts using vector clocks. + * Implements causal ordering to ensure consistency in co-edited workspaces. */ class ConsensusEngine { /** - * Attempts to merge client state with server state - * @param {Object} transaction - The existing server document - * @param {Object} clientUpdate - The incoming update from client - * @param {Object} clientClock - The vector clock from the client - * @returns {Object} Result { action: 'update'|'ignore'|'conflict', data: Object } + * Compare two vector clocks + * Returns: 'greater', 'smaller', 'equal', or 'concurrent' */ - async reconcile(transaction, clientUpdate, clientClock, deviceId) { - const serverClock = transaction.vectorClock.toJSON(); + compareClocks(clockA, clockB) { + let aGreater = false; + let bGreater = false; - // 1. Determine causal relationship - const relation = vectorClockUtils.compare(clientClock, serverClock); + const keys = new Set([...Object.keys(clockA), ...Object.keys(clockB)]); - logger.debug('[ConsensusEngine] Comparing clocks', { - transactionId: transaction._id, - deviceId, - relation - }); + for (const key of keys) { + const valA = clockA[key] || 0; + const valB = clockB[key] || 0; - // 2. Scenario A: Client is behind (Older data) -> Ignore - if (relation === 'smaller') { - return { action: 'ignore', reason: 'stale_update' }; + if (valA > valB) aGreater = true; + if (valB > valA) bGreater = true; } - // 3. Scenario B: Client is strictly ahead (Causal update) -> Apply - if (relation === 'greater') { - const mergedClock = vectorClockUtils.increment( - vectorClockUtils.merge(serverClock, clientClock), - 'server' // Update server's view - ); - - return { - action: 'update', - data: { - ...clientUpdate, - vectorClock: mergedClock, - 'syncMetadata.checksum': hashGenerator.generateTransactionHash(clientUpdate) - } - }; - } - - // 4. Scenario C: Conflict (Concurrent updates) -> Move to Graveyard - if (relation === 'concurrent' || relation === 'equal') { - // Even if clocks are "equal", if data is different, it's a conflict - const clientHash = hashGenerator.generateTransactionHash(clientUpdate); - if (relation === 'equal' && transaction.syncMetadata.checksum === clientHash) { - return { action: 'ignore', reason: 'redundant_update' }; - } - - logger.warn('[ConsensusEngine] Conflict detected', { transactionId: transaction._id }); - - // Create a conflict record for manual resolution - await SyncConflict.create({ - transactionId: transaction._id, - userId: transaction.user, - serverState: transaction.toObject(), - clientState: clientUpdate, - vectorClocks: { - server: serverClock, - client: clientClock - }, - checksum: clientHash - }); - - return { - action: 'conflict', - data: { - 'syncMetadata.syncStatus': 'conflict', - 'syncMetadata.conflictsCount': (transaction.syncMetadata.conflictsCount || 0) + 1 - } - }; - } - - return { action: 'ignore', reason: 'unknown_relation' }; + if (aGreater && bGreater) return 'concurrent'; + if (aGreater) return 'greater'; + if (bGreater) return 'smaller'; + return 'equal'; } /** - * Manually resolve a conflict + * Reconcile a pending journal entry against current entity state */ - async resolveConflict(conflictId, strategy, resolvedData) { - const conflict = await SyncConflict.findById(conflictId).populate('transactionId'); - if (!conflict) throw new Error('Conflict record not found'); + async reconcile(currentEntity, journalEntry) { + const currentClock = currentEntity.vectorClock || {}; + const journalClock = journalEntry.vectorClock || {}; - const tx = conflict.transactionId; - let finalData; + const relationship = this.compareClocks(journalClock, currentClock); - switch (strategy) { - case 'client_wins': - finalData = conflict.clientState; - break; - case 'server_wins': - finalData = conflict.serverState; - break; - case 'merge': - finalData = { ...conflict.serverState, ...resolvedData }; - break; - default: - throw new Error('Invalid resolution strategy'); + if (relationship === 'greater') { + // Success: Journal entry is a direct causal successor + return { action: 'APPLY', mergedPayload: journalEntry.payload }; } - // Update transaction and mark conflict as resolved - tx.set(finalData); - tx.syncMetadata.syncStatus = 'synced'; - tx.syncMetadata.conflictsCount = Math.max(0, tx.syncMetadata.conflictsCount - 1); - - // Bump clock on resolution - tx.vectorClock.set('server', (tx.vectorClock.get('server') || 0) + 1); - - await tx.save(); - - conflict.status = 'resolved'; - conflict.resolutionStrategy = strategy; - conflict.resolvedAt = new Date(); - await conflict.save(); + if (relationship === 'smaller' || relationship === 'equal') { + // Stale: Journal entry is older or identical to current state + return { action: 'DISCARD', reason: 'STALE_UPDATE' }; + } - return { success: true, transaction: tx }; + // Concurrent: Both states have diverged. Conflict resolution needed. + return { + action: 'CONFLICT', + reason: 'CONCURRENT_EDIT', + metadata: { + currentClock, + journalClock + } + }; } } diff --git a/services/expenseService.js b/services/expenseService.js index b609825d..01d9f67c 100644 --- a/services/expenseService.js +++ b/services/expenseService.js @@ -97,8 +97,30 @@ class ExpenseService { } } - // 5. Save Expense - const expense = await expenseRepository.create(finalData); + // 5. Save Expense (with Journaling support for collaborative workspaces) + const isDeferred = !!finalData.workspace; + const expense = await expenseRepository.create(finalData, { + deferred: isDeferred, + workspaceId: finalData.workspace, + userId + }); + + // 6. Handle deferred result + if (expense.deferred) { + // Optimistic response for collaborative environments + if (io) { + io.to(`user_${userId}`).emit('expense_journaled', { + entityId: expense.journalId, + status: 'optimistic_pending' + }); + } + return { + ...finalData, + _id: expense.journalId, + status: 'journaled', + optimistic: true + }; + } // Issue #738: Immutable Ledger Event const event = await ledgerService.recordEvent( diff --git a/services/transactionService.js b/services/transactionService.js index 037fa6d0..0e61e1ae 100644 --- a/services/transactionService.js +++ b/services/transactionService.js @@ -53,6 +53,12 @@ class TransactionService { // Stage 1: Pre-processing & Persistence const transaction = await this._persistTransaction(rawData, userId); + if (transaction.deferred) { + // Processing is handed off to JournalApplier background job + if (io) io.to(`user_${userId}`).emit('transaction_journaled', { id: transaction._id }); + return transaction; + } + // Stage 2: Asynchronous Multi-Stage Pipeline this._runProcessingPipeline(transaction, userId, io).catch(err => { console.error(`[TransactionService] Critical failure in pipeline for ${transaction._id}:`, err); @@ -79,6 +85,29 @@ class TransactionService { }; const transaction = new Transaction(finalData); + + // Issue #769: Distributed Journaling Interception + if (finalData.workspace) { + const WriteJournal = require('../models/WriteJournal'); + const journal = await WriteJournal.create({ + entityId: transaction._id, + entityType: 'TRANSACTION', + operation: 'CREATE', + payload: finalData, + vectorClock: finalData.vectorClock || {}, + workspaceId: finalData.workspace, + userId: userId, + status: 'PENDING' + }); + + return { + ...transaction.toObject(), + status: 'journaled', + journalId: journal._id, + deferred: true + }; + } + await transaction.save(); // Issue #738: Record Event in Ledger diff --git a/tests/consensusLogic.test.js b/tests/consensusLogic.test.js new file mode 100644 index 00000000..655c3268 --- /dev/null +++ b/tests/consensusLogic.test.js @@ -0,0 +1,53 @@ +const assert = require('assert'); +const consensusEngine = require('../services/consensusEngine'); + +/** + * Consensus Logic Integrity Tests + * Issue #769: Verifying causal ordering and concurrent conflict detection. + */ +describe('Distributed Consensus Engine (Unit)', () => { + + describe('Vector Clock Comparison', () => { + it('should detect causal successor (greater)', () => { + const current = { v1: 1, v2: 1 }; + const incoming = { v1: 1, v2: 2 }; + assert.strictEqual(consensusEngine.compareClocks(incoming, current), 'greater'); + }); + + it('should detect stale updates (smaller)', () => { + const current = { v1: 5, v2: 2 }; + const incoming = { v1: 4, v2: 2 }; + assert.strictEqual(consensusEngine.compareClocks(incoming, current), 'smaller'); + }); + + it('should detect concurrent conflicts', () => { + const current = { v1: 10, v2: 5 }; + const incoming = { v1: 9, v2: 6 }; // Diverged + assert.strictEqual(consensusEngine.compareClocks(incoming, current), 'concurrent'); + }); + + it('should identify identical states (equal)', () => { + const current = { v1: 10 }; + const incoming = { v1: 10 }; + assert.strictEqual(consensusEngine.compareClocks(incoming, current), 'equal'); + }); + }); + + describe('Reconcile Decisions', () => { + it('should APPLY a greater clock update', async () => { + const entity = { vectorClock: { s1: 1 } }; + const journal = { vectorClock: { s1: 2 }, payload: { amount: 100 } }; + + const result = await consensusEngine.reconcile(entity, journal); + assert.strictEqual(result.action, 'APPLY'); + }); + + it('should CONFLICT on concurrent updates', async () => { + const entity = { vectorClock: { s1: 1, s2: 1 } }; + const journal = { vectorClock: { s1: 2, s2: 0 }, payload: { amount: 200 } }; + + const result = await consensusEngine.reconcile(entity, journal); + assert.strictEqual(result.action, 'CONFLICT'); + }); + }); +}); diff --git a/utils/diffEngine.js b/utils/diffEngine.js index a30046e0..90f6f976 100644 --- a/utils/diffEngine.js +++ b/utils/diffEngine.js @@ -1,80 +1,42 @@ /** * Diff Engine Utility - * Issue #731: Logic to compare two JSON objects and extract modified fields. - * Essential for forensic audit trails and "Time Travel" logic. + * Issue #769: Calculating patch deltas between pending and current states. */ - class DiffEngine { /** - * Compares two objects and returns the delta - * @param {Object} before - Original state - * @param {Object} after - New state - * @returns {Object} An object containing changed fields with old and new values + * Compare two objects and return only the changes */ - compare(before, after) { - const diff = {}; - - // Normalize objects to plain JSON - const b = JSON.parse(JSON.stringify(before || {})); - const a = JSON.parse(JSON.stringify(after || {})); - - // Get all unique keys - const keys = new Set([...Object.keys(b), ...Object.keys(a)]); + static calculateDelta(current, pending) { + const delta = {}; + const keys = new Set([...Object.keys(current), ...Object.keys(pending)]); for (const key of keys) { - // Ignore internal Mongoose fields - if (key === '__v' || key === 'updatedAt' || key === 'createdAt') continue; + // Ignore internal mongoose fields + if (key.startsWith('_') || key === 'createdAt' || key === 'updatedAt') continue; - const valB = b[key]; - const valA = a[key]; + const val1 = current[key]; + const val2 = pending[key]; - if (this._isDifferent(valB, valA)) { - diff[key] = { - old: valB, - new: valA + if (JSON.stringify(val1) !== JSON.stringify(val2)) { + delta[key] = { + old: val1, + new: val2 }; } } - - return Object.keys(diff).length > 0 ? diff : null; + return delta; } /** - * Deep equality check for primitives and simple objects/arrays + * Apply a delta patch to an object */ - _isDifferent(a, b) { - if (a === b) return false; - - // Handle null/undefined cases - if (a == null || b == null) return a !== b; - - // Handle Dates - if (a instanceof Date && b instanceof Date) { - return a.getTime() !== b.getTime(); - } - - // Handle Arrays and Objects (simplified for this engine) - if (typeof a === 'object' && typeof b === 'object') { - return JSON.stringify(a) !== JSON.stringify(b); + static applyPatch(base, patch) { + const result = { ...base }; + for (const [key, change] of Object.entries(patch)) { + result[key] = change.new; } - - return a !== b; - } - - /** - * Reconstructs an object from a history of diffs (Time Travel) - */ - reconstruct(base, diffs) { - let state = JSON.parse(JSON.stringify(base)); - - for (const diff of diffs) { - for (const [key, delta] of Object.entries(diff)) { - state[key] = delta.new; - } - } - - return state; + return result; } } -module.exports = new DiffEngine(); +module.exports = DiffEngine;