Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions jobs/journalApplier.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
const cron = require('node-cron');
const mongoose = require('mongoose');
const WriteJournal = require('../models/WriteJournal');
const consensusEngine = require('../services/consensusEngine');
const logger = require('../utils/structuredLogger');

/**
* Journal Applier Background Job
* Issue #769: Background worker finalizing journals into the main DB.
* Orchestrates the transition from "JOURNALED" to "APPLIED" state using consensus logic.
*/
class JournalApplier {
  constructor() {
    // Re-entrancy guard: prevents overlapping cron ticks from double-applying.
    this.isProcessing = false;
    // Maps WriteJournal.entityType enum values to registered Mongoose model names.
    this.modelMap = {
      'TRANSACTION': 'Transaction',
      'EXPENSE': 'Expense',
      'WORKSPACE': 'Workspace',
      'USER': 'User'
    };
  }

  /**
   * Schedule the applier to flush the journal buffer every 30 seconds.
   * A tick is skipped entirely while a previous tick is still running.
   */
  start() {
    // Six-field expression: the leading "*/30" is the seconds field.
    cron.schedule('*/30 * * * * *', async () => {
      if (this.isProcessing) return;
      this.isProcessing = true;

      try {
        await this.processBuffer();
      } catch (err) {
        logger.error('[JournalApplier] Buffer processing failed', { error: err.message });
      } finally {
        this.isProcessing = false;
      }
    });
    // Consistency fix: use the structured logger like the rest of this module,
    // instead of a bare console.log.
    logger.info('✓ Journal Applier scheduled');
  }

  /**
   * Apply up to 50 PENDING journal entries in creation order.
   * A failure on one entry marks it CONFLICT and bumps retryCount, but does
   * not stop the rest of the batch.
   */
  async processBuffer() {
    const pending = await WriteJournal.find({ status: 'PENDING' })
      .sort({ createdAt: 1 })
      .limit(50);

    if (pending.length === 0) return;

    logger.info(`[JournalApplier] Applying ${pending.length} pending operations`);

    for (const journal of pending) {
      try {
        await this.applyEntry(journal);
      } catch (err) {
        logger.error(`[JournalApplier] Failed to apply journal ${journal._id}`, { error: err.message });
        journal.status = 'CONFLICT';
        journal.retryCount += 1;
        await journal.save();
      }
    }
  }

  /**
   * Apply a single journal entry to the main database.
   *
   * Transitions the journal to APPLIED / STALE / CONFLICT and stamps
   * appliedAt. Bug fix: the original DELETE path returned early and never set
   * appliedAt; all outcomes now flow through the shared finalization below.
   *
   * @param {Object} journal - A WriteJournal document with status PENDING.
   * @throws {Error} If the journal's entityType has no model mapping.
   */
  async applyEntry(journal) {
    const modelName = this.modelMap[journal.entityType];
    if (!modelName) throw new Error(`Unknown entity type: ${journal.entityType}`);

    const Model = mongoose.model(modelName);
    const entity = await Model.findById(journal.entityId);

    if (journal.operation === 'CREATE') {
      if (entity) {
        // Entity already exists; the CREATE was superseded.
        journal.status = 'STALE';
      } else {
        await Model.create({ ...journal.payload, _id: journal.entityId, vectorClock: journal.vectorClock });
        journal.status = 'APPLIED';
      }
    } else if (!entity) {
      // UPDATE/DELETE against a missing entity cannot be applied.
      journal.status = 'STALE';
    } else {
      const result = await consensusEngine.reconcile(entity, journal);

      if (result.action === 'APPLY') {
        if (journal.operation === 'DELETE') {
          // NOTE(review): Document#remove() is deprecated in Mongoose 7+
          // (use deleteOne()) — confirm the Mongoose version in use.
          await entity.remove();
        } else {
          // UPDATE: merge the payload and adopt the journal's vector clock.
          Object.assign(entity, journal.payload);
          entity.vectorClock = journal.vectorClock;
          await entity.save();
        }
        journal.status = 'APPLIED';
      } else if (result.action === 'CONFLICT') {
        // Needs manual resolution via the /api/conflicts routes.
        journal.status = 'CONFLICT';
      } else {
        journal.status = 'STALE';
      }
    }

    // Unified finalization for every outcome, including DELETE.
    journal.appliedAt = new Date();
    await journal.save();
  }
}

module.exports = new JournalApplier();
44 changes: 44 additions & 0 deletions middleware/journalInterceptor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
const WriteJournal = require('../models/WriteJournal');
const logger = require('../utils/structuredLogger');

/**
* Journal Interceptor Middleware
* Issue #769: Intercepting mutations to redirect to the log.
* Prevents direct DB writes by converting specific POST/PUT requests into journals.
*/
const journalInterceptor = async (req, res, next) => {
  // Only intercept mutations aimed at collaborative entities.
  const mutableEntities = ['transactions', 'expenses', 'workspaces'];
  // NOTE(review): assumes routes are mounted as /api/<entity>/... — confirm
  // this holds for every mount point of this middleware.
  const entityType = req.path.split('/')[2];

  const isMutation = req.method === 'POST' || req.method === 'PUT';
  const wantsDeferred = req.headers['x-journal-deferred'] === 'true';

  if (isMutation && wantsDeferred && mutableEntities.includes(entityType)) {
    try {
      const mongoose = require('mongoose');
      // Bug fix: `new require('mongoose').Types.ObjectId()` bound `new` to
      // `require` itself (operator precedence), so ObjectId was invoked
      // without `new` and threw. Construct the ObjectId explicitly.
      const entityId = req.params.id || new mongoose.Types.ObjectId();

      const journal = await WriteJournal.create({
        entityId,
        // 'expenses' -> 'EXPENSE': uppercase, then strip the plural 's'.
        entityType: entityType.toUpperCase().slice(0, -1),
        operation: req.method === 'POST' ? 'CREATE' : 'UPDATE',
        payload: req.body,
        vectorClock: req.body.vectorClock || {},
        // Guarded access: if auth did not run, journaling fails validation
        // below and we fall back to the normal write path instead of throwing.
        workspaceId: req.headers['x-workspace-id'] || req.user?.activeWorkspace,
        userId: req.user?._id,
        status: 'PENDING'
      });

      // 202 Accepted: the write is journaled, not yet applied.
      return res.status(202).json({
        message: 'Update accepted for consensus processing',
        journalId: journal._id,
        status: 'JOURNALED'
      });
    } catch (err) {
      logger.error('Failed to intercept and journal request', { error: err.message });
      // Fallback to normal execution if journaling fails
    }
  }

  next();
};

module.exports = journalInterceptor;
3 changes: 3 additions & 0 deletions models/Workspace.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ const workspaceSchema = new mongoose.Schema({
// Issue #741: Atomic Invalidation Tracking
cacheEpoch: { type: Number, default: 0 },

// Issue #769: Distributed Write-Ahead Journaling
journalVersionEpoch: { type: Number, default: 0 },

// Hierarchy fields (#629)
parentWorkspace: {
type: mongoose.Schema.Types.ObjectId,
Expand Down
63 changes: 63 additions & 0 deletions models/WriteJournal.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
const mongoose = require('mongoose');

/**
* WriteJournal Model
* Issue #769: Persistent buffer for pending state changes.
* Part of the "Journal-First" architecture to handle distributed concurrency.
*/
/**
 * Schema for a single journaled mutation. Entries start PENDING and are
 * finalized by the background applier into APPLIED, STALE, or CONFLICT.
 */
const writeJournalSchema = new mongoose.Schema({
  // Target document's _id (pre-allocated client-side for CREATE operations).
  entityId: {
    type: mongoose.Schema.Types.ObjectId,
    required: true,
    index: true
  },
  // Uppercase singular entity name; must match the applier's model map.
  entityType: {
    type: String,
    required: true,
    enum: ['TRANSACTION', 'EXPENSE', 'WORKSPACE', 'USER']
  },
  operation: {
    type: String,
    required: true,
    enum: ['CREATE', 'UPDATE', 'DELETE']
  },
  // The raw mutation body to merge/create with; opaque to the journal itself.
  payload: {
    type: mongoose.Schema.Types.Mixed,
    required: true
  },
  // Per-node logical clock used by the consensus engine for ordering.
  vectorClock: {
    type: Map,
    of: Number,
    default: {}
  },
  status: {
    type: String,
    enum: ['PENDING', 'APPLIED', 'CONFLICT', 'STALE'],
    default: 'PENDING',
    index: true
  },
  workspaceId: {
    type: mongoose.Schema.Types.ObjectId,
    ref: 'Workspace',
    required: true,
    index: true
  },
  userId: {
    type: mongoose.Schema.Types.ObjectId,
    ref: 'User',
    required: true
  },
  // Incremented by the applier each time an apply attempt fails.
  retryCount: {
    type: Number,
    default: 0
  },
  // Set when the entry reaches a terminal status.
  appliedAt: Date
}, {
  timestamps: true
});

// Indexes for consensus retrieval
writeJournalSchema.index({ entityId: 1, status: 1 });
writeJournalSchema.index({ workspaceId: 1, createdAt: 1 });
// Supports the applier's poll: find({ status: 'PENDING' }).sort({ createdAt: 1 }).
writeJournalSchema.index({ status: 1, createdAt: 1 });

module.exports = mongoose.model('WriteJournal', writeJournalSchema);
55 changes: 47 additions & 8 deletions repositories/baseRepository.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ class BaseRepository {
}

/**
* Create a new document (with vault encryption interception)
* Create a new document (with vault encryption and journaling support)
*/
async create(data) {
async create(data, options = {}) {
if (options.deferred) {
return await this._journalMutation('CREATE', data, options);
}
let processedData = await this._encryptSensitiveFields(data);
const document = new this.model(processedData);
let saved = await document.save();
Expand Down Expand Up @@ -128,16 +131,24 @@ class BaseRepository {
}

/**
* Update a document by ID
* Update a document by ID (with journaling support)
*/
async updateById(id, data, options = { new: true, runValidators: true }) {
if (options.deferred) {
return await this._journalMutation('UPDATE', { ...data, _id: id }, options);
}
return await this.model.findByIdAndUpdate(id, data, options);
}

/**
* Update one document by filters
* Update one document by filters (with journaling support)
*/
async updateOne(filters, data, options = { new: true, runValidators: true }) {
if (options.deferred) {
const doc = await this.model.findOne(filters);
if (!doc) throw new Error('Document not found for deferred update');
return await this._journalMutation('UPDATE', { ...data, _id: doc._id }, options);
}
return await this.model.findOneAndUpdate(filters, data, options);
}

Expand All @@ -149,16 +160,24 @@ class BaseRepository {
}

/**
* Delete a document by ID
* Delete a document by ID (with journaling support)
*/
async deleteById(id) {
async deleteById(id, options = {}) {
if (options.deferred) {
return await this._journalMutation('DELETE', { _id: id }, options);
}
return await this.model.findByIdAndDelete(id);
}

/**
* Delete one document by filters
* Delete one document by filters (with journaling support)
*/
async deleteOne(filters) {
async deleteOne(filters, options = {}) {
if (options.deferred) {
const doc = await this.model.findOne(filters);
if (!doc) throw new Error('Document not found for deferred delete');
return await this._journalMutation('DELETE', { _id: doc._id }, options);
}
return await this.model.findOneAndDelete(filters);
}

Expand Down Expand Up @@ -243,6 +262,26 @@ class BaseRepository {
async executeQuery(queryFn) {
return await queryFn(this.model);
}

/**
* Helper: Record mutation in journal instead of direct DB write
*/
async _journalMutation(operation, data, options = {}) {
const WriteJournal = require('../models/WriteJournal');
const mongoose = require('mongoose');

const journal = await WriteJournal.create({
entityId: data._id || options.entityId || new mongoose.Types.ObjectId(),
entityType: this.model.modelName.toUpperCase(),
operation,
payload: data,
vectorClock: options.vectorClock || {},
workspaceId: options.workspaceId || data.workspace,
userId: options.userId,
status: 'PENDING'
});
return { journalId: journal._id, status: 'JOURNALED', deferred: true };
}
}

module.exports = BaseRepository;
72 changes: 72 additions & 0 deletions routes/conflicts.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
const express = require('express');
const router = express.Router();
const auth = require('../middleware/auth');
const WriteJournal = require('../models/WriteJournal');
const ResponseFactory = require('../utils/responseFactory');
const mongoose = require('mongoose');

/**
* Conflict Resolution Routes
* Issue #769: API for manual resolution of unresolvable writes.
*/

/**
* @route GET /api/conflicts
* @desc Get all unresolved conflicts for the current workspace
*/
router.get('/', auth, async (req, res) => {
  try {
    // Workspace scope comes from the header, falling back to the caller's
    // active workspace.
    const workspaceId = req.headers['x-workspace-id'] || req.user.activeWorkspace;
    const filter = { workspaceId, status: 'CONFLICT' };

    // Newest conflicts first.
    const conflicts = await WriteJournal
      .find(filter)
      .sort({ createdAt: -1 });

    return ResponseFactory.success(res, conflicts);
  } catch (error) {
    return ResponseFactory.error(res, 500, error.message);
  }
});

/**
* @route POST /api/conflicts/:id/resolve
* @desc Manually resolve a conflict by choosing a payload
*/
router.post('/:id/resolve', auth, async (req, res) => {
  try {
    const journal = await WriteJournal.findById(req.params.id);
    if (!journal) return ResponseFactory.error(res, 404, 'Conflict record not found');

    const { resolution, customPayload } = req.body; // 'USE_JOURNAL', 'USE_CURRENT', 'CUSTOM'

    // Bug fix: an unrecognized resolution previously fell through and still
    // marked the journal APPLIED. Reject it explicitly instead.
    if (!['USE_JOURNAL', 'USE_CURRENT', 'CUSTOM'].includes(resolution)) {
      return ResponseFactory.error(res, 400, 'Invalid resolution strategy');
    }

    // 'TRANSACTION' -> 'Transaction'. NOTE(review): assumes single-word model
    // names, mirroring the applier's mapping — confirm for any new entity types.
    const modelName = journal.entityType.charAt(0) + journal.entityType.slice(1).toLowerCase();
    const Model = mongoose.model(modelName);
    const entity = await Model.findById(journal.entityId);

    if (resolution === 'USE_JOURNAL') {
      if (journal.operation === 'DELETE') {
        // Bug fix: a DELETE conflict resolved in favor of the journal must
        // remove the entity; previously its payload was merged into it instead.
        if (entity) await entity.remove();
      } else if (entity) {
        Object.assign(entity, journal.payload);
        entity.vectorClock = journal.vectorClock;
        await entity.save();
      } else if (journal.operation === 'CREATE') {
        await Model.create({ ...journal.payload, _id: journal.entityId, vectorClock: journal.vectorClock });
      }
    } else if (resolution === 'CUSTOM') {
      if (entity) {
        Object.assign(entity, customPayload);
        // Merge clocks or reset? For custom, we usually bump the server clock
        await entity.save();
      }
    }
    // 'USE_CURRENT' intentionally leaves the persisted entity untouched.

    journal.status = 'APPLIED';
    journal.appliedAt = new Date();
    await journal.save();

    return ResponseFactory.success(res, { message: 'Conflict resolved successfully' });
  } catch (error) {
    return ResponseFactory.error(res, 500, error.message);
  }
});

module.exports = router;
Loading