Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/backend/src/modules/analytics/flows/asset-flow.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { Module } from '@nestjs/common';
import { AssetFlowService } from './asset-flow.service';

@Module({
providers: [AssetFlowService],
exports: [AssetFlowService],
})
export class AssetFlowModule {}

Check failure on line 8 in apps/backend/src/modules/analytics/flows/asset-flow.module.ts

View workflow job for this annotation

GitHub Actions / Linting (22.x)

Insert `⏎`

Check failure on line 8 in apps/backend/src/modules/analytics/flows/asset-flow.module.ts

View workflow job for this annotation

GitHub Actions / Linting (20.x)

Insert `⏎`
136 changes: 136 additions & 0 deletions apps/backend/src/modules/analytics/flows/asset-flow.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import 'reflect-metadata';
import { Test, TestingModule } from '@nestjs/testing';
import { AssetFlowService } from './asset-flow.service';
import { AssetTransfer } from './interfaces/asset-flow.interface';

const TRANSFERS: AssetTransfer[] = [
{
txHash: 'tx1',
fromAddress: 'ADDR_A',
toAddress: 'ADDR_B',
asset: 'XLM',
amount: 100,
timestamp: '2026-01-01T00:00:00Z',
chain: 'Stellar',
},
{
txHash: 'tx2',
fromAddress: 'ADDR_B',
toAddress: 'ADDR_C',
asset: 'XLM',
amount: 50,
timestamp: '2026-01-02T00:00:00Z',
chain: 'Stellar',
},
{
txHash: 'tx3',
fromAddress: 'ADDR_A',
toAddress: 'ADDR_B',
asset: 'USDC',
amount: 200,
timestamp: '2026-01-03T00:00:00Z',
chain: 'Stellar',
},
];

describe('AssetFlowService', () => {
let service: AssetFlowService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [AssetFlowService],
}).compile();

service = module.get<AssetFlowService>(AssetFlowService);
});

it('should be defined', () => {
expect(service).toBeDefined();
});

describe('generateGraph', () => {
it('builds correct nodes from transfers', () => {
const graph = service.generateGraph(TRANSFERS);
const ids = graph.nodes.map(n => n.id).sort();
expect(ids).toEqual(['ADDR_A', 'ADDR_B', 'ADDR_C'].sort());
});

it('aggregates totalSent and totalReceived per node', () => {
const graph = service.generateGraph(TRANSFERS);
const a = graph.nodes.find(n => n.id === 'ADDR_A')!;
expect(a.totalSent).toBe(300);
expect(a.totalReceived).toBe(0);
const b = graph.nodes.find(n => n.id === 'ADDR_B')!;
expect(b.totalReceived).toBe(300);
expect(b.totalSent).toBe(50);
});

it('creates separate edges for different assets between same addresses', () => {
const graph = service.generateGraph(TRANSFERS);
const abEdges = graph.edges.filter(e => e.from === 'ADDR_A' && e.to === 'ADDR_B');
expect(abEdges.length).toBe(2);
});

it('aggregates txCount and totalAmount for repeated asset+pair', () => {
const repeated: AssetTransfer[] = [
{ ...TRANSFERS[0] },
{ ...TRANSFERS[0], txHash: 'tx1b', amount: 40 },
];
const graph = service.generateGraph(repeated);
const edge = graph.edges[0];
expect(edge.txCount).toBe(2);
expect(edge.totalAmount).toBe(140);
});

it('returns a valid ISO timestamp in generatedAt', () => {
const graph = service.generateGraph(TRANSFERS);
expect(new Date(graph.generatedAt).toISOString()).toBe(graph.generatedAt);
});

it('returns empty graph for empty transfers', () => {
const graph = service.generateGraph([]);
expect(graph.nodes).toHaveLength(0);
expect(graph.edges).toHaveLength(0);
});
});

describe('queryFlow', () => {
it('filters by address', () => {
const graph = service.queryFlow(TRANSFERS, { address: 'ADDR_C' });
expect(graph.nodes.some(n => n.id === 'ADDR_C')).toBe(true);
expect(graph.nodes.some(n => n.id === 'ADDR_A')).toBe(false);
});

it('filters by asset', () => {
const graph = service.queryFlow(TRANSFERS, { address: 'ADDR_A', asset: 'USDC' });
expect(graph.edges.every(e => e.asset === 'USDC')).toBe(true);
});

it('filters by fromTimestamp', () => {
const graph = service.queryFlow(TRANSFERS, {
address: 'ADDR_A',
fromTimestamp: '2026-01-02T00:00:00Z',
});
expect(graph.edges.every(e => e.asset === 'USDC')).toBe(true);
});
});

describe('traceHistory', () => {
it('returns only transfers involving the given address', () => {
const history = service.traceHistory(TRANSFERS, 'ADDR_B');
expect(history.every(tx => tx.fromAddress === 'ADDR_B' || tx.toAddress === 'ADDR_B')).toBe(
true,
);
});

it('returns transfers sorted by timestamp ascending', () => {
const history = service.traceHistory(TRANSFERS, 'ADDR_A');
const timestamps = history.map(tx => tx.timestamp);
expect(timestamps).toEqual([...timestamps].sort());
});

it('returns empty array when address has no transfers', () => {
expect(service.traceHistory(TRANSFERS, 'UNKNOWN')).toHaveLength(0);
});
});
});

Check failure on line 136 in apps/backend/src/modules/analytics/flows/asset-flow.service.spec.ts

View workflow job for this annotation

GitHub Actions / Linting (22.x)

Insert `⏎`

Check failure on line 136 in apps/backend/src/modules/analytics/flows/asset-flow.service.spec.ts

View workflow job for this annotation

GitHub Actions / Linting (20.x)

Insert `⏎`
85 changes: 85 additions & 0 deletions apps/backend/src/modules/analytics/flows/asset-flow.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { Injectable } from '@nestjs/common';
import {
AssetFlowGraph,
AssetFlowQuery,
AssetTransfer,
FlowEdge,
FlowNode,
} from './interfaces/asset-flow.interface';

@Injectable()
export class AssetFlowService {
/**
* Generates an asset flow graph from a list of transfers.
* Builds nodes (unique addresses) and directed edges (address→address
* per asset), aggregating amounts and transaction counts.
*/
generateGraph(transfers: AssetTransfer[]): AssetFlowGraph {
const nodeMap = new Map<string, FlowNode>();
const edgeMap = new Map<string, FlowEdge>();

const upsertNode = (address: string, chain: string) => {
if (!nodeMap.has(address)) {
nodeMap.set(address, { id: address, address, chain, totalSent: 0, totalReceived: 0 });
}
};

for (const tx of transfers) {
upsertNode(tx.fromAddress, tx.chain);
upsertNode(tx.toAddress, tx.chain);

nodeMap.get(tx.fromAddress)!.totalSent += tx.amount;
nodeMap.get(tx.toAddress)!.totalReceived += tx.amount;

const edgeKey = `${tx.fromAddress}:${tx.toAddress}:${tx.asset}`;
const existing = edgeMap.get(edgeKey);
if (existing) {
existing.totalAmount += tx.amount;
existing.txCount += 1;
} else {
edgeMap.set(edgeKey, {
from: tx.fromAddress,
to: tx.toAddress,
asset: tx.asset,
totalAmount: tx.amount,
txCount: 1,
});
}
}

return {
nodes: Array.from(nodeMap.values()),
edges: Array.from(edgeMap.values()),
generatedAt: new Date().toISOString(),
};
}

/**
* Filters transfers by the provided query parameters before graph generation.
*/
queryFlow(transfers: AssetTransfer[], query: AssetFlowQuery): AssetFlowGraph {
const { address, chain, asset, fromTimestamp, toTimestamp } = query;

const filtered = transfers.filter(tx => {
const involvesAddress = tx.fromAddress === address || tx.toAddress === address;
if (!involvesAddress) return false;
if (chain && tx.chain !== chain) return false;
if (asset && tx.asset !== asset) return false;
if (fromTimestamp && tx.timestamp < fromTimestamp) return false;
if (toTimestamp && tx.timestamp > toTimestamp) return false;
return true;
});

return this.generateGraph(filtered);
}

/**
* Returns the historical transfer chain for a given address, ordered by
* timestamp ascending.
*/
traceHistory(transfers: AssetTransfer[], address: string): AssetTransfer[] {
return transfers
.filter(tx => tx.fromAddress === address || tx.toAddress === address)
.sort((a, b) => (a.timestamp < b.timestamp ? -1 : a.timestamp > b.timestamp ? 1 : 0));
}
}

Check failure on line 85 in apps/backend/src/modules/analytics/flows/asset-flow.service.ts

View workflow job for this annotation

GitHub Actions / Linting (22.x)

Insert `⏎`

Check failure on line 85 in apps/backend/src/modules/analytics/flows/asset-flow.service.ts

View workflow job for this annotation

GitHub Actions / Linting (20.x)

Insert `⏎`
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
export interface AssetTransfer {
txHash: string;
fromAddress: string;
toAddress: string;
asset: string;
amount: number;
timestamp: string;
chain: string;
}

export interface FlowNode {
id: string;
address: string;
chain: string;
totalSent: number;
totalReceived: number;
}

export interface FlowEdge {
from: string;
to: string;
asset: string;
totalAmount: number;
txCount: number;
}

export interface AssetFlowGraph {
nodes: FlowNode[];
edges: FlowEdge[];
generatedAt: string;
}

export interface AssetFlowQuery {
address: string;
chain?: string;
asset?: string;
fromTimestamp?: string;
toTimestamp?: string;
depth?: number;
}

Check failure on line 40 in apps/backend/src/modules/analytics/flows/interfaces/asset-flow.interface.ts

View workflow job for this annotation

GitHub Actions / Linting (22.x)

Insert `⏎`

Check failure on line 40 in apps/backend/src/modules/analytics/flows/interfaces/asset-flow.interface.ts

View workflow job for this annotation

GitHub Actions / Linting (20.x)

Insert `⏎`
Loading