From 297ee11be1737911a9ca799de393ef629ec29aa1 Mon Sep 17 00:00:00 2001 From: yusuftomilola Date: Fri, 19 Jun 2026 15:00:00 +0000 Subject: [PATCH] feat(analytics): add asset flow visualization engine Implement AssetFlowService under apps/backend/src/modules/analytics/flows/: - generateGraph(): builds FlowNode/FlowEdge graph from raw transfers - queryFlow(): filters transfers by address, chain, asset, date range - traceHistory(): returns address transfers sorted by timestamp Includes interfaces (AssetTransfer, FlowNode, FlowEdge, AssetFlowGraph) and unit tests covering graph generation, aggregation, filtering, and historical tracing. --- .../analytics/flows/asset-flow.module.ts | 8 ++ .../flows/asset-flow.service.spec.ts | 136 ++++++++++++++++++ .../analytics/flows/asset-flow.service.ts | 85 +++++++++++ .../flows/interfaces/asset-flow.interface.ts | 40 ++++++ 4 files changed, 269 insertions(+) create mode 100644 apps/backend/src/modules/analytics/flows/asset-flow.module.ts create mode 100644 apps/backend/src/modules/analytics/flows/asset-flow.service.spec.ts create mode 100644 apps/backend/src/modules/analytics/flows/asset-flow.service.ts create mode 100644 apps/backend/src/modules/analytics/flows/interfaces/asset-flow.interface.ts diff --git a/apps/backend/src/modules/analytics/flows/asset-flow.module.ts b/apps/backend/src/modules/analytics/flows/asset-flow.module.ts new file mode 100644 index 0000000..64c1d43 --- /dev/null +++ b/apps/backend/src/modules/analytics/flows/asset-flow.module.ts @@ -0,0 +1,8 @@ +import { Module } from '@nestjs/common'; +import { AssetFlowService } from './asset-flow.service'; + +@Module({ + providers: [AssetFlowService], + exports: [AssetFlowService], +}) +export class AssetFlowModule {} \ No newline at end of file diff --git a/apps/backend/src/modules/analytics/flows/asset-flow.service.spec.ts b/apps/backend/src/modules/analytics/flows/asset-flow.service.spec.ts new file mode 100644 index 0000000..f7145e8 --- /dev/null +++ b/apps/backend/src/modules/analytics/flows/asset-flow.service.spec.ts @@ -0,0 +1,136 @@ +import 'reflect-metadata'; +import { Test, TestingModule } from '@nestjs/testing'; +import { AssetFlowService } from './asset-flow.service'; +import { AssetTransfer } from './interfaces/asset-flow.interface'; + +const TRANSFERS: AssetTransfer[] = [ + { + txHash: 'tx1', + fromAddress: 'ADDR_A', + toAddress: 'ADDR_B', + asset: 'XLM', + amount: 100, + timestamp: '2026-01-01T00:00:00Z', + chain: 'Stellar', + }, + { + txHash: 'tx2', + fromAddress: 'ADDR_B', + toAddress: 'ADDR_C', + asset: 'XLM', + amount: 50, + timestamp: '2026-01-02T00:00:00Z', + chain: 'Stellar', + }, + { + txHash: 'tx3', + fromAddress: 'ADDR_A', + toAddress: 'ADDR_B', + asset: 'USDC', + amount: 200, + timestamp: '2026-01-03T00:00:00Z', + chain: 'Stellar', + }, +]; + +describe('AssetFlowService', () => { + let service: AssetFlowService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [AssetFlowService], + }).compile(); + + service = module.get(AssetFlowService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + describe('generateGraph', () => { + it('builds correct nodes from transfers', () => { + const graph = service.generateGraph(TRANSFERS); + const ids = graph.nodes.map(n => n.id).sort(); + expect(ids).toEqual(['ADDR_A', 'ADDR_B', 'ADDR_C'].sort()); + }); + + it('aggregates totalSent and totalReceived per node', () => { + const graph = service.generateGraph(TRANSFERS); + const a = graph.nodes.find(n => n.id === 'ADDR_A')!; + expect(a.totalSent).toBe(300); + expect(a.totalReceived).toBe(0); + const b = graph.nodes.find(n => n.id === 'ADDR_B')!; + expect(b.totalReceived).toBe(300); + expect(b.totalSent).toBe(50); + }); + + it('creates separate edges for different assets between same addresses', () => { + const graph = service.generateGraph(TRANSFERS); + const abEdges = graph.edges.filter(e => e.from === 'ADDR_A' && e.to === 'ADDR_B'); + expect(abEdges.length).toBe(2); + }); + + it('aggregates txCount and totalAmount for repeated asset+pair', () => { + const repeated: AssetTransfer[] = [ + { ...TRANSFERS[0] }, + { ...TRANSFERS[0], txHash: 'tx1b', amount: 40 }, + ]; + const graph = service.generateGraph(repeated); + const edge = graph.edges[0]; + expect(edge.txCount).toBe(2); + expect(edge.totalAmount).toBe(140); + }); + + it('returns a valid ISO timestamp in generatedAt', () => { + const graph = service.generateGraph(TRANSFERS); + expect(new Date(graph.generatedAt).toISOString()).toBe(graph.generatedAt); + }); + + it('returns empty graph for empty transfers', () => { + const graph = service.generateGraph([]); + expect(graph.nodes).toHaveLength(0); + expect(graph.edges).toHaveLength(0); + }); + }); + + describe('queryFlow', () => { + it('filters by address', () => { + const graph = service.queryFlow(TRANSFERS, { address: 'ADDR_C' }); + expect(graph.nodes.some(n => n.id === 'ADDR_C')).toBe(true); + expect(graph.nodes.some(n => n.id === 'ADDR_A')).toBe(false); + }); + + it('filters by asset', () => { + const graph = service.queryFlow(TRANSFERS, { address: 'ADDR_A', asset: 'USDC' }); + expect(graph.edges.every(e => e.asset === 'USDC')).toBe(true); + }); + + it('filters by fromTimestamp', () => { + const graph = service.queryFlow(TRANSFERS, { + address: 'ADDR_A', + fromTimestamp: '2026-01-02T00:00:00Z', + }); + expect(graph.edges.every(e => e.asset === 'USDC')).toBe(true); + }); + }); + + describe('traceHistory', () => { + it('returns only transfers involving the given address', () => { + const history = service.traceHistory(TRANSFERS, 'ADDR_B'); + expect(history.every(tx => tx.fromAddress === 'ADDR_B' || tx.toAddress === 'ADDR_B')).toBe( + true, + ); + }); + + it('returns transfers sorted by timestamp ascending', () => { + const history = service.traceHistory(TRANSFERS, 'ADDR_A'); + const timestamps = history.map(tx => tx.timestamp); + expect(timestamps).toEqual([...timestamps].sort()); + }); + + it('returns empty array when address has no transfers', () => { + expect(service.traceHistory(TRANSFERS, 'UNKNOWN')).toHaveLength(0); + }); + }); +}); \ No newline at end of file diff --git a/apps/backend/src/modules/analytics/flows/asset-flow.service.ts b/apps/backend/src/modules/analytics/flows/asset-flow.service.ts new file mode 100644 index 0000000..4b89596 --- /dev/null +++ b/apps/backend/src/modules/analytics/flows/asset-flow.service.ts @@ -0,0 +1,85 @@ +import { Injectable } from '@nestjs/common'; +import { + AssetFlowGraph, + AssetFlowQuery, + AssetTransfer, + FlowEdge, + FlowNode, +} from './interfaces/asset-flow.interface'; + +@Injectable() +export class AssetFlowService { + /** + * Generates an asset flow graph from a list of transfers. + * Builds nodes (unique addresses) and directed edges (address→address + * per asset), aggregating amounts and transaction counts. + */ + generateGraph(transfers: AssetTransfer[]): AssetFlowGraph { + const nodeMap = new Map(); + const edgeMap = new Map(); + + const upsertNode = (address: string, chain: string) => { + if (!nodeMap.has(address)) { + nodeMap.set(address, { id: address, address, chain, totalSent: 0, totalReceived: 0 }); + } + }; + + for (const tx of transfers) { + upsertNode(tx.fromAddress, tx.chain); + upsertNode(tx.toAddress, tx.chain); + + nodeMap.get(tx.fromAddress)!.totalSent += tx.amount; + nodeMap.get(tx.toAddress)!.totalReceived += tx.amount; + + const edgeKey = `${tx.fromAddress}:${tx.toAddress}:${tx.asset}`; + const existing = edgeMap.get(edgeKey); + if (existing) { + existing.totalAmount += tx.amount; + existing.txCount += 1; + } else { + edgeMap.set(edgeKey, { + from: tx.fromAddress, + to: tx.toAddress, + asset: tx.asset, + totalAmount: tx.amount, + txCount: 1, + }); + } + } + + return { + nodes: Array.from(nodeMap.values()), + edges: Array.from(edgeMap.values()), + generatedAt: new Date().toISOString(), + }; + } + + /** + * Filters transfers by the provided query parameters before graph generation. + */ + queryFlow(transfers: AssetTransfer[], query: AssetFlowQuery): AssetFlowGraph { + const { address, chain, asset, fromTimestamp, toTimestamp } = query; + + const filtered = transfers.filter(tx => { + const involvesAddress = tx.fromAddress === address || tx.toAddress === address; + if (!involvesAddress) return false; + if (chain && tx.chain !== chain) return false; + if (asset && tx.asset !== asset) return false; + if (fromTimestamp && tx.timestamp < fromTimestamp) return false; + if (toTimestamp && tx.timestamp > toTimestamp) return false; + return true; + }); + + return this.generateGraph(filtered); + } + + /** + * Returns the historical transfer chain for a given address, ordered by + * timestamp ascending. + */ + traceHistory(transfers: AssetTransfer[], address: string): AssetTransfer[] { + return transfers + .filter(tx => tx.fromAddress === address || tx.toAddress === address) + .sort((a, b) => (a.timestamp < b.timestamp ? -1 : a.timestamp > b.timestamp ? 1 : 0)); + } +} \ No newline at end of file diff --git a/apps/backend/src/modules/analytics/flows/interfaces/asset-flow.interface.ts b/apps/backend/src/modules/analytics/flows/interfaces/asset-flow.interface.ts new file mode 100644 index 0000000..8c6dd2b --- /dev/null +++ b/apps/backend/src/modules/analytics/flows/interfaces/asset-flow.interface.ts @@ -0,0 +1,40 @@ +export interface AssetTransfer { + txHash: string; + fromAddress: string; + toAddress: string; + asset: string; + amount: number; + timestamp: string; + chain: string; +} + +export interface FlowNode { + id: string; + address: string; + chain: string; + totalSent: number; + totalReceived: number; +} + +export interface FlowEdge { + from: string; + to: string; + asset: string; + totalAmount: number; + txCount: number; +} + +export interface AssetFlowGraph { + nodes: FlowNode[]; + edges: FlowEdge[]; + generatedAt: string; +} + +export interface AssetFlowQuery { + address: string; + chain?: string; + asset?: string; + fromTimestamp?: string; + toTimestamp?: string; + depth?: number; +} \ No newline at end of file