Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
441 changes: 441 additions & 0 deletions IMPLEMENTATION_ISSUE_623.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions backend/src/dal/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { assertApiKeyRepository } from './apiKeyRepository.js';
import { createSqliteApiKeyRepository } from './sqliteApiKeyRepository.js';
import { createSqliteFailedJobRepository } from './sqliteFailedJobRepository.js';
import { createSqliteVariantRepository } from './sqliteVariantRepository.js';
import { createSqliteCohortRepository } from './sqliteCohortRepository.js';
import { createPool, isPostgresUrl } from './pg/pgClient.js';
import { createSqliteAllowlistRepository } from './sqliteAllowlistRepository.js';

Expand Down Expand Up @@ -79,6 +80,7 @@ export async function createDal({
webhooks: webhookRepository ?? new WebhookRepository(db),
referrals: createSqliteReferralRepository({ db }),
variants: createSqliteVariantRepository({ db }),
cohorts: createSqliteCohortRepository({ db }),
apiKeys: assertApiKeyRepository(apiKeyRepository ?? createSqliteApiKeyRepository({ db })),
failedJobs: failedJobRepository ?? createSqliteFailedJobRepository({ db }),
allowlists: allowlistRepository ?? createSqliteAllowlistRepository({ db }),
Expand Down
258 changes: 258 additions & 0 deletions backend/src/dal/sqliteCohortRepository.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
// @ts-check

/**
 * Build the SQLite-backed repository for cohort and retention data.
 *
 * Every method prepares its statement on the supplied `db` handle at call
 * time (not at factory-creation time), so the repository can be constructed
 * before the underlying tables exist.
 *
 * @param {{db: any}} params - `db` must expose `prepare(sql)` returning a
 *   statement with `run`, `get` and `all` methods.
 */
export function createSqliteCohortRepository({ db }) {
  /**
   * Record a user activity event.
   *
   * @param {object} params
   * @param {number} params.campaignId - coerced with `Number()` before binding
   * @param {string} params.userAddress
   * @param {string} params.activityType - 'registered', 'claimed', 'active'
   * @param {string} params.occurredAt - ISO 8601 timestamp
   * @param {number} [params.ledger] - stored as NULL only when null/undefined
   * @param {string} [params.txHash] - empty/missing values are stored as NULL
   * @param {object} [params.metadata] - serialized with `JSON.stringify`
   * @returns {any} whatever the statement's `run()` returns
   */
  function recordActivity({
    campaignId,
    userAddress,
    activityType,
    occurredAt,
    ledger,
    txHash,
    metadata = {},
  }) {
    const stmt = db.prepare(`
      INSERT INTO user_activities
        (campaign_id, user_address, activity_type, occurred_at, ledger, tx_hash, metadata)
      VALUES (?, ?, ?, ?, ?, ?, ?)
    `);

    return stmt.run(
      Number(campaignId),
      userAddress,
      activityType,
      occurredAt,
      // `??` instead of `||`: a ledger value of 0 is a number, not "missing".
      ledger ?? null,
      txHash || null,
      // The default parameter only covers `undefined`; an explicit `null`
      // would otherwise be persisted as the string "null".
      JSON.stringify(metadata ?? {}),
    );
  }

  /**
   * Get cohort size for a specific period.
   *
   * @param {number} campaignId
   * @param {string} cohortPeriod - e.g., '2024-W01', '2024-01', '2024-01-01'
   * @param {string} granularity - 'day', 'week', 'month'
   * @returns {number} the stored size, or 0 when no matching row exists
   */
  function getCohortSize(campaignId, cohortPeriod, granularity) {
    const stmt = db.prepare(`
      SELECT cohort_size
      FROM cohort_stats
      WHERE campaign_id = ? AND cohort_period = ? AND granularity = ?
    `);

    const row = stmt.get(Number(campaignId), cohortPeriod, granularity);
    return row?.cohort_size || 0;
  }

  /**
   * Save precomputed cohort statistics.
   *
   * Upserts on (campaign_id, cohort_period, granularity): an existing row has
   * its size and period bounds overwritten and `computed_at` refreshed.
   *
   * @param {object} params
   * @param {number} params.campaignId
   * @param {string} params.cohortPeriod
   * @param {number} params.cohortSize
   * @param {string} params.granularity
   * @param {string} params.periodStart
   * @param {string} params.periodEnd
   * @returns {any} whatever the statement's `run()` returns
   */
  function saveCohortStats({
    campaignId,
    cohortPeriod,
    cohortSize,
    granularity,
    periodStart,
    periodEnd,
  }) {
    const stmt = db.prepare(`
      INSERT INTO cohort_stats
        (campaign_id, cohort_period, cohort_size, granularity, period_start, period_end)
      VALUES (?, ?, ?, ?, ?, ?)
      ON CONFLICT(campaign_id, cohort_period, granularity)
      DO UPDATE SET
        cohort_size = excluded.cohort_size,
        period_start = excluded.period_start,
        period_end = excluded.period_end,
        computed_at = datetime('now')
    `);

    return stmt.run(
      Number(campaignId),
      cohortPeriod,
      cohortSize,
      granularity,
      periodStart,
      periodEnd,
    );
  }

  /**
   * Save precomputed retention data.
   *
   * Upserts on (campaign_id, cohort_period, offset_period, metric_type,
   * granularity): an existing row has `user_count` overwritten and
   * `computed_at` refreshed.
   *
   * @param {object} params
   * @param {number} params.campaignId
   * @param {string} params.cohortPeriod
   * @param {number} params.offsetPeriod
   * @param {string} params.metricType - 'claimed', 'active'
   * @param {number} params.userCount
   * @param {string} params.granularity
   * @returns {any} whatever the statement's `run()` returns
   */
  function saveRetentionData({
    campaignId,
    cohortPeriod,
    offsetPeriod,
    metricType,
    userCount,
    granularity,
  }) {
    const stmt = db.prepare(`
      INSERT INTO retention_data
        (campaign_id, cohort_period, offset_period, metric_type, user_count, granularity)
      VALUES (?, ?, ?, ?, ?, ?)
      ON CONFLICT(campaign_id, cohort_period, offset_period, metric_type, granularity)
      DO UPDATE SET
        user_count = excluded.user_count,
        computed_at = datetime('now')
    `);

    return stmt.run(
      Number(campaignId),
      cohortPeriod,
      offsetPeriod,
      metricType,
      userCount,
      granularity,
    );
  }

  /**
   * Get retention data for a cohort, ordered by ascending offset.
   *
   * Rows come from an inner JOIN of `retention_data` with `cohort_stats`, so
   * retention rows without a matching cohort_stats row are not returned.
   * `retentionRate` is `userCount / cohortSize`, or 0 when the cohort size is
   * not positive.
   *
   * @param {number} campaignId
   * @param {string} cohortPeriod
   * @param {string} metricType
   * @param {string} granularity
   * @returns {Array<{offsetPeriod: number, userCount: number, cohortSize: number, retentionRate: number}>}
   */
  function getRetentionData(campaignId, cohortPeriod, metricType, granularity) {
    const stmt = db.prepare(`
      SELECT
        r.offset_period as offsetPeriod,
        r.user_count as userCount,
        c.cohort_size as cohortSize,
        CASE
          WHEN c.cohort_size > 0
          THEN CAST(r.user_count AS REAL) / c.cohort_size
          ELSE 0
        END as retentionRate
      FROM retention_data r
      JOIN cohort_stats c
        ON r.campaign_id = c.campaign_id
        AND r.cohort_period = c.cohort_period
        AND r.granularity = c.granularity
      WHERE r.campaign_id = ?
        AND r.cohort_period = ?
        AND r.metric_type = ?
        AND r.granularity = ?
      ORDER BY r.offset_period ASC
    `);

    return stmt.all(Number(campaignId), cohortPeriod, metricType, granularity);
  }

  /**
   * Get all cohort periods for a campaign, ordered by ascending period start.
   *
   * @param {number} campaignId
   * @param {string} granularity
   * @returns {Array<{cohortPeriod: string, cohortSize: number, periodStart: string, periodEnd: string}>}
   */
  function getCohorts(campaignId, granularity) {
    const stmt = db.prepare(`
      SELECT
        cohort_period as cohortPeriod,
        cohort_size as cohortSize,
        period_start as periodStart,
        period_end as periodEnd
      FROM cohort_stats
      WHERE campaign_id = ? AND granularity = ?
      ORDER BY period_start ASC
    `);

    return stmt.all(Number(campaignId), granularity);
  }

  /**
   * Get user activities for analysis, ordered by ascending `occurred_at`.
   *
   * Optional filters are appended only when truthy. The date range is
   * half-open: `startDate` is inclusive, `endDate` is exclusive. All values
   * are bound as parameters; only fixed SQL fragments are concatenated.
   *
   * @param {object} params
   * @param {number} params.campaignId
   * @param {string} [params.startDate]
   * @param {string} [params.endDate]
   * @param {string} [params.activityType]
   * @returns {Array<{userAddress: string, activityType: string, occurredAt: string}>}
   */
  function getUserActivities({ campaignId, startDate, endDate, activityType }) {
    let sql = `
      SELECT
        user_address as userAddress,
        activity_type as activityType,
        occurred_at as occurredAt
      FROM user_activities
      WHERE campaign_id = ?
    `;

    /** @type {Array<number | string>} */
    const params = [Number(campaignId)];

    if (startDate) {
      sql += ` AND occurred_at >= ?`;
      params.push(startDate);
    }

    if (endDate) {
      sql += ` AND occurred_at < ?`;
      params.push(endDate);
    }

    if (activityType) {
      sql += ` AND activity_type = ?`;
      params.push(activityType);
    }

    sql += ` ORDER BY occurred_at ASC`;

    const stmt = db.prepare(sql);
    return stmt.all(...params);
  }

  /**
   * Clear cached cohort and retention data for recomputation.
   *
   * NOTE(review): the two deletes run as separate statements and are not
   * wrapped in a transaction here; if atomicity matters, the caller should
   * provide it — confirm against the db driver in use.
   *
   * @param {number} campaignId
   */
  function clearCache(campaignId) {
    const id = Number(campaignId);
    db.prepare(`DELETE FROM cohort_stats WHERE campaign_id = ?`).run(id);
    db.prepare(`DELETE FROM retention_data WHERE campaign_id = ?`).run(id);
  }

  return {
    recordActivity,
    getCohortSize,
    saveCohortStats,
    saveRetentionData,
    getRetentionData,
    getCohorts,
    getUserActivities,
    clearCache,
  };
}
92 changes: 92 additions & 0 deletions backend/src/db/migrations/011_cohort_retention_tables.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Numeric identifier of this migration; presumably consumed by the migration
// runner to order/track applied migrations — confirm against the runner.
export const version = 11;
// Human-readable summary of what this migration adds.
export const description = 'Add event tables for cohort and retention analysis';

/**
 * Apply the migration: create the `user_activities`, `cohort_stats` and
 * `retention_data` tables together with their indexes.
 *
 * Every statement uses IF NOT EXISTS and the whole script is sent to the
 * database in a single `exec` call.
 *
 * @param {{exec: (sql: string) => unknown}} db
 */
export function up(db) {
  const schemaSql = `
-- User registration/activity tracking
CREATE TABLE IF NOT EXISTS user_activities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_id INTEGER NOT NULL,
user_address TEXT NOT NULL,
activity_type TEXT NOT NULL, -- 'registered', 'claimed', 'active'
occurred_at TEXT NOT NULL, -- ISO 8601 timestamp
ledger INTEGER,
tx_hash TEXT,
metadata TEXT, -- JSON for additional data
created_at TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY (campaign_id) REFERENCES campaigns(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_user_activities_campaign
ON user_activities(campaign_id);
CREATE INDEX IF NOT EXISTS idx_user_activities_user
ON user_activities(campaign_id, user_address);
CREATE INDEX IF NOT EXISTS idx_user_activities_type
ON user_activities(campaign_id, activity_type);
CREATE INDEX IF NOT EXISTS idx_user_activities_occurred
ON user_activities(campaign_id, occurred_at);
CREATE INDEX IF NOT EXISTS idx_user_activities_user_type
ON user_activities(campaign_id, user_address, activity_type);

-- Precomputed cohort statistics for performance
CREATE TABLE IF NOT EXISTS cohort_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_id INTEGER NOT NULL,
cohort_period TEXT NOT NULL, -- e.g., '2024-W01', '2024-01', '2024-01-01'
cohort_size INTEGER NOT NULL DEFAULT 0,
granularity TEXT NOT NULL, -- 'day', 'week', 'month'
period_start TEXT NOT NULL, -- ISO 8601 timestamp
period_end TEXT NOT NULL, -- ISO 8601 timestamp
computed_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE(campaign_id, cohort_period, granularity),
FOREIGN KEY (campaign_id) REFERENCES campaigns(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_cohort_stats_campaign
ON cohort_stats(campaign_id);
CREATE INDEX IF NOT EXISTS idx_cohort_stats_period
ON cohort_stats(campaign_id, cohort_period);

-- Precomputed retention data
CREATE TABLE IF NOT EXISTS retention_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
campaign_id INTEGER NOT NULL,
cohort_period TEXT NOT NULL,
offset_period INTEGER NOT NULL, -- 0, 1, 2, 3... (weeks/days/months after cohort)
metric_type TEXT NOT NULL, -- 'claimed', 'active'
user_count INTEGER NOT NULL DEFAULT 0,
granularity TEXT NOT NULL, -- 'day', 'week', 'month'
computed_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE(campaign_id, cohort_period, offset_period, metric_type, granularity),
FOREIGN KEY (campaign_id) REFERENCES campaigns(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_retention_data_campaign
ON retention_data(campaign_id);
CREATE INDEX IF NOT EXISTS idx_retention_data_cohort
ON retention_data(campaign_id, cohort_period);
CREATE INDEX IF NOT EXISTS idx_retention_data_metric
ON retention_data(campaign_id, metric_type);
`;

  // One exec call for the whole schema script.
  db.exec(schemaSql);
}

/**
 * Revert the migration: drop the retention, cohort and activity tables and
 * their indexes, in the reverse order of creation.
 *
 * Every statement uses IF EXISTS so the rollback can be re-run safely. The
 * whole script is sent to the database in a single `exec` call.
 *
 * @param {{exec: (sql: string) => unknown}} db
 */
export function down(db) {
  db.exec(`
    DROP INDEX IF EXISTS idx_retention_data_metric;
    DROP INDEX IF EXISTS idx_retention_data_cohort;
    DROP INDEX IF EXISTS idx_retention_data_campaign;
    DROP TABLE IF EXISTS retention_data;

    DROP INDEX IF EXISTS idx_cohort_stats_period;
    DROP INDEX IF EXISTS idx_cohort_stats_campaign;
    DROP TABLE IF EXISTS cohort_stats;

    DROP INDEX IF EXISTS idx_user_activities_user_type;
    DROP INDEX IF EXISTS idx_user_activities_occurred;
    DROP INDEX IF EXISTS idx_user_activities_type;
    DROP INDEX IF EXISTS idx_user_activities_user;
    DROP INDEX IF EXISTS idx_user_activities_campaign;
    DROP TABLE IF EXISTS user_activities;
  `);
}
Loading
Loading