Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions src/chains/stellar/EVENT_FETCHING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Stellar announcement event fetching

This document describes how `@wraith-protocol/sdk/chains/stellar` ingests Soroban
announcer events and the privacy trade-offs of RPC topic filtering.

## Announcer versions

| Version | Contract field | Topic layout | RPC bucket filter |
| ------- | -------------- | ------------------------------------------------- | ----------------- |
| v1 | `announcer` | `("announce", scheme_id, stealth_address)` | No |
| v2 | `announcerV2` | `("announce", 2, view_tag_bucket, metadata_kind)` | Yes |

During the v1 → v2 migration window, `fetchAnnouncements()` reads from **both**
contracts when `announcerV2` is configured in deployments.

## Bucketed `getEvents` queries (v2)

For v2 announcements, the SDK builds Soroban RPC filters:

```
("announce", 2, view_tag_bucket, *)
```

Pass explicit buckets to avoid downloading the full v2 stream:

```typescript
import { fetchAnnouncements, viewTagToBucket } from '@wraith-protocol/sdk/chains/stellar';

const bucket = viewTagToBucket(0x2a);
const announcements = await fetchAnnouncements('stellar', {
viewTagBuckets: [bucket],
});
```

When `viewTagBuckets` is omitted, the SDK uses `("announce", 2, *, *)` on the v2
contract. v1 events are always fetched without bucket filters because
`stealth_address` occupies topic slot 2 in the legacy layout.

## Client-side validation

RPC topic filters reduce bandwidth only. Recipients must still run
`scanAnnouncements()` to cryptographically verify each candidate event against
their viewing key. Never treat RPC-filtered events as trusted payments.

## Privacy trade-off

Indexing `view_tag_bucket` in a public Soroban topic leaks one byte of
correlation per payment. With 256 buckets, observers (including the RPC provider
when you pass bucket filters) can group announcements by approximate recipient
identity.

| Strategy | Bandwidth | Query privacy |
| --------------------------------- | --------- | ------------- |
| v1 full stream | Highest | Lowest |
| v2 single-bucket filter | Lowest | Lower |
| v2 all-bucket wildcard | Medium | Medium |
| Private indexer / self-hosted RPC | Varies | Highest |

For most users, 256 buckets is a reasonable balance. Choose single-bucket
filters when RPC egress cost dominates; prefer broader queries or a private
indexer when query pattern privacy dominates.

## Filter batching

Soroban RPC accepts at most five event filters per `getEvents` request. When
more than five buckets are requested, the SDK splits them into sequential
batches via `buildV2BucketEventFilterBatches()`.

## Related issues

- contracts [#23](https://github.com/wraith-protocol/contracts/issues/23) — topic design
- contracts [#24](https://github.com/wraith-protocol/contracts/issues/24) — v2 announcer contract
- contracts [#25](https://github.com/wraith-protocol/contracts/issues/25) — SDK bucketed fetch (this module)
250 changes: 176 additions & 74 deletions src/chains/stellar/announcements.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import type { Announcement } from './types';
import { bytesToHex } from './utils';
import { getDeployment } from './deployments';
import type { StellarChainDeployment } from './deployments';
import {
buildV1AnnouncerEventFilter,
buildV2AllBucketsEventFilter,
buildV2BucketEventFilterBatches,
type SorobanEventFilter,
} from './event-filters';
import { Address, xdr } from '@stellar/stellar-sdk';

let stellarSdkPromise: Promise<typeof import('@stellar/stellar-sdk')> | undefined;

function loadStellarSdk(): Promise<typeof import('@stellar/stellar-sdk')> {
stellarSdkPromise ??= import('@stellar/stellar-sdk');
return stellarSdkPromise;
}

export interface FetchAnnouncementsOptions {
/** Earliest ledger to include, inclusive. Ignored when cursor is provided. */
fromLedger?: number;
Expand All @@ -21,6 +21,15 @@ export interface FetchAnnouncementsOptions {
toTimestamp?: Date;
/** Soroban RPC pagination cursor returned by a previous scan. */
cursor?: string;
/**
* View-tag buckets (0–255) to query on the v2 announcer via RPC topic filters.
* When omitted, all v2 buckets are fetched with `("announce", 2, *, *)`.
*/
viewTagBuckets?: number[];
/** Fetch the legacy v1 announcer stream (default: `true`). */
includeV1?: boolean;
/** Fetch the v2 announcer when `announcerV2` is configured (default: `true`). */
includeV2?: boolean;
}

export interface FetchAnnouncementsResult {
Expand Down Expand Up @@ -50,6 +59,10 @@ export class RetentionExceededError extends Error {
* `getEvents`, handles pagination, and parses event XDR into SDK announcement
* objects.
*
* During the v1 → v2 transition window this function reads from **both** announcer
* deployments when configured. v2 queries use Soroban RPC topic filters; v1 always
* downloads the full announcer stream. See `EVENT_FETCHING.md` for privacy trade-offs.
*
* @param chain - Deployment key from {@link DEPLOYMENTS}; defaults to `stellar`.
* @param sorobanUrl - Optional Soroban RPC URL override.
* @returns Parsed announcements from the selected announcer contract.
Expand Down Expand Up @@ -96,8 +109,13 @@ export async function fetchAnnouncements(
const sorobanUrl = typeof sorobanUrlOrOpts === 'string' ? sorobanUrlOrOpts : undefined;
const url = sorobanUrl || deployment.sorobanUrl;
const announcerContract = deployment.contracts.announcer;
const filterGroups = buildFilterGroups(deployment, opts);
const all: Announcement[] = [];

if (filterGroups.length === 0) {
return returnsCursor ? { announcements: [], nextCursor: undefined } : [];
}

if (opts?.fromLedger !== undefined && opts.fromTimestamp !== undefined) {
throw new Error('fromLedger and fromTimestamp are mutually exclusive');
}
Expand All @@ -124,56 +142,70 @@ export async function fetchAnnouncements(

let cursor = opts?.cursor;
let nextCursor: string | undefined = cursor;
let hasMore = true;
const seen = new Set<string>();
const singleFilterGroup = filterGroups.length === 1;

while (hasMore) {
const params: Record<string, unknown> = {
filters: [{ type: 'contract', contractIds: [announcerContract] }],
pagination: cursor ? { limit: 1000, cursor } : { limit: 1000 },
};
for (const filters of filterGroups) {
let hasMore = true;
let groupCursor = singleFilterGroup ? cursor : undefined;

if (!cursor) {
params.startLedger = startLedger;
}
while (hasMore) {
const params: Record<string, unknown> = {
filters,
pagination: groupCursor ? { limit: 1000, cursor: groupCursor } : { limit: 1000 },
};

const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
jsonrpc: '2.0',
id: 2,
method: 'getEvents',
params,
}),
});
if (!groupCursor) {
params.startLedger = startLedger;
}

const data = await res.json();
if (data.error?.message) {
const range = parseLedgerRange(data.error.message);
if (range && !opts?.cursor && startLedger < range.oldest) {
throw new RetentionExceededError(startLedger, range.oldest);
const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
jsonrpc: '2.0',
id: 2,
method: 'getEvents',
params,
}),
});

const data = await res.json();
if (data.error?.message) {
const range = parseLedgerRange(data.error.message);
if (range && !groupCursor && startLedger < range.oldest) {
throw new RetentionExceededError(startLedger, range.oldest);
}
break;
}
break;
}

const events = data.result?.events ?? [];
const events = data.result?.events ?? [];

for (const event of events) {
const ledger = eventLedger(event);
if (toLedger !== undefined && ledger !== undefined && ledger >= toLedger) {
hasMore = false;
continue;
for (const event of events) {
const ledger = eventLedger(event);
if (toLedger !== undefined && ledger !== undefined && ledger >= toLedger) {
hasMore = false;
continue;
}

const dedupeKey = String(event.id ?? `${event.txHash}:${JSON.stringify(event.topic)}`);
if (seen.has(dedupeKey)) continue;
seen.add(dedupeKey);

const ann = parseAnnouncementEvent(event);
if (ann) all.push(ann);
}
const ann = await parseAnnouncementEvent(event);
if (ann) all.push(ann);
}

nextCursor = data.result?.cursor ?? cursor;
if (!hasMore || events.length < 1000) {
hasMore = false;
} else {
cursor = data.result?.cursor;
if (!cursor) hasMore = false;
if (singleFilterGroup) {
nextCursor = data.result?.cursor ?? groupCursor;
}

if (!hasMore || events.length < 1000) {
hasMore = false;
} else {
groupCursor = data.result?.cursor;
if (!groupCursor) hasMore = false;
}
}
}

Expand Down Expand Up @@ -334,6 +366,30 @@ async function horizonLedger(
return data;
}

function buildFilterGroups(
deployment: StellarChainDeployment,
opts?: FetchAnnouncementsOptions,
): SorobanEventFilter[][] {
const includeV1 = opts?.includeV1 ?? true;
const includeV2 = opts?.includeV2 ?? true;
const announcerV2 = deployment.contracts.announcerV2;
const groups: SorobanEventFilter[][] = [];

if (includeV1) {
groups.push([buildV1AnnouncerEventFilter(deployment.contracts.announcer)]);
}

if (includeV2 && announcerV2) {
if (opts?.viewTagBuckets && opts.viewTagBuckets.length > 0) {
groups.push(...buildV2BucketEventFilterBatches(announcerV2, opts.viewTagBuckets));
} else {
groups.push([buildV2AllBucketsEventFilter(announcerV2)]);
}
}

return groups;
}

function parseLedgerRange(message: string): { oldest: number; latest: number } | undefined {
const match = message.match(/range:\s*(\d+)\s*-\s*(\d+)/);
if (!match) return undefined;
Expand All @@ -343,40 +399,86 @@ function parseLedgerRange(message: string): { oldest: number; latest: number } |
};
}

async function parseAnnouncementEvent(
event: Record<string, unknown>,
): Promise<Announcement | null> {
/** @internal Exported for unit tests. */
export function parseAnnouncementEvent(event: Record<string, unknown>): Announcement | null {
try {
const { xdr, Address } = await loadStellarSdk();

const topics = event.topic as string[];
const topics = event.topic as string[] | undefined;
if (!topics || topics.length < 3) return null;

const schemeIdScVal = xdr.ScVal.fromXDR(topics[1], 'base64');
const stealthScVal = xdr.ScVal.fromXDR(topics[2], 'base64');
const stealthAddress = Address.fromScAddress(stealthScVal.address()).toString();

const valueScVal = xdr.ScVal.fromXDR(event.value as string, 'base64');
const valueVec = valueScVal.vec();
if (!valueVec || valueVec.length < 3) return null;

const caller = Address.fromScAddress(valueVec[0].address()).toString();
const ephPubKeyBytes = valueVec[1].bytes();
const viewTagBytes = valueVec[2].bytes();
if (!ephPubKeyBytes || !viewTagBytes) return null;

return {
schemeId: schemeIdScVal.u32(),
stealthAddress,
caller,
ephemeralPubKey: bytesToHex(new Uint8Array(ephPubKeyBytes)),
metadata: bytesToHex(new Uint8Array(viewTagBytes)),
};
if (topics.length === 3) {
return parseV1AnnouncementEvent(event, topics);
}

if (topics.length === 4) {
return parseV2AnnouncementEvent(event, topics);
}

return null;
} catch {
return null;
}
}

function parseV1AnnouncementEvent(
event: Record<string, unknown>,
topics: string[],
): Announcement | null {
const schemeIdScVal = xdr.ScVal.fromXDR(topics[1], 'base64');
const stealthScVal = xdr.ScVal.fromXDR(topics[2], 'base64');
const stealthAddress = Address.fromScAddress(stealthScVal.address()).toString();

const valueScVal = xdr.ScVal.fromXDR(event.value as string, 'base64');
const valueVec = valueScVal.vec();
if (!valueVec || valueVec.length < 3) return null;

const caller = Address.fromScAddress(valueVec[0].address()).toString();
const ephPubKeyBytes = valueVec[1].bytes();
const metadataBytes = valueVec[2].bytes();
if (!ephPubKeyBytes || !metadataBytes) return null;

return {
schemeId: schemeIdScVal.u32(),
stealthAddress,
caller,
ephemeralPubKey: bytesToHex(new Uint8Array(ephPubKeyBytes)),
metadata: bytesToHex(new Uint8Array(metadataBytes)),
viewTagBucket: undefined,
};
}

function parseV2AnnouncementEvent(
event: Record<string, unknown>,
topics: string[],
): Announcement | null {
const schemeIdScVal = xdr.ScVal.fromXDR(topics[1], 'base64');
const bucketScVal = xdr.ScVal.fromXDR(topics[2], 'base64');

const valueScVal = xdr.ScVal.fromXDR(event.value as string, 'base64');
const valueVec = valueScVal.vec();
if (!valueVec || valueVec.length < 3) return null;

const stealthAddress = Address.fromScAddress(valueVec[0].address()).toString();
const ephPubKeyBytes = valueVec[1].bytes();
const metadataBytes = valueVec[2].bytes();
if (!ephPubKeyBytes || !metadataBytes) return null;

const caller =
typeof event.contractId === 'string'
? event.contractId
: typeof event.contract_id === 'string'
? event.contract_id
: '';

return {
schemeId: schemeIdScVal.u32(),
stealthAddress,
caller,
ephemeralPubKey: bytesToHex(new Uint8Array(ephPubKeyBytes)),
metadata: bytesToHex(new Uint8Array(metadataBytes)),
viewTagBucket: bucketScVal.u32(),
};
}

function eventLedger(event: Record<string, unknown>): number | undefined {
const ledger = event.ledger;
return typeof ledger === 'number' ? ledger : undefined;
Expand Down
Loading