diff --git a/.github/knip.jsonc b/.github/knip.jsonc index ae73b60c..5165ec4e 100644 --- a/.github/knip.jsonc +++ b/.github/knip.jsonc @@ -45,7 +45,7 @@ "src/index.ts", "src/sp/index.ts", "src/chains.ts", - "src/piece.ts", + "src/piece/index.ts", "src/usdfc.ts", "src/abis/index.ts", "src/auction/index.ts", diff --git a/examples/cli/src/commands/upload-dataset.ts b/examples/cli/src/commands/upload-dataset.ts index 82f4f879..560e70e2 100644 --- a/examples/cli/src/commands/upload-dataset.ts +++ b/examples/cli/src/commands/upload-dataset.ts @@ -55,6 +55,7 @@ export const uploadDataset: Command = command( await SP.findPiece({ pieceCid, serviceURL: provider.pdp.serviceURL, + retry: true, }) const rsp = await SP.createDataSetAndAddPieces(client, { diff --git a/examples/cli/src/utils.ts b/examples/cli/src/utils.ts index f935433e..095ec131 100644 --- a/examples/cli/src/utils.ts +++ b/examples/cli/src/utils.ts @@ -1,7 +1,7 @@ import * as p from '@clack/prompts' import type { Chain } from '@filoz/synapse-core/chains' import { getPieces } from '@filoz/synapse-core/pdp-verifier' -import { getPDPProviders } from '@filoz/synapse-core/sp-registry' +import { getApprovedPDPProviders } from '@filoz/synapse-core/sp-registry' import { getPdpDataSets, type PdpDataSet, @@ -111,7 +111,7 @@ export async function selectProvider( spinner.start(`Fetching providers...`) try { - const { providers } = await getPDPProviders(client) + const providers = await getApprovedPDPProviders(client) spinner.stop(`Providers fetched.`) if (providers.length === 0) { diff --git a/packages/synapse-core/package.json b/packages/synapse-core/package.json index 3808cae2..83849848 100644 --- a/packages/synapse-core/package.json +++ b/packages/synapse-core/package.json @@ -82,8 +82,8 @@ "default": "./dist/src/errors/index.js" }, "./piece": { - "types": "./dist/src/piece.d.ts", - "default": "./dist/src/piece.js" + "types": "./dist/src/piece/index.d.ts", + "default": "./dist/src/piece/index.js" }, "./utils": { "types": "./dist/src/utils/index.d.ts", @@ -141,7 +141,7 @@ "./dist/src/errors/index" ], "piece": [ - "./dist/src/piece" + "./dist/src/piece/index" ], "utils": [ "./dist/src/utils/index" @@ -235,7 +235,9 @@ "iso-web": "^2.1.0", "multiformats": "^13.4.1", "ox": "catalog:", + "p-locate": "^7.0.0", "p-retry": "^7.1.0", + "p-some": "^7.0.0", "zod": "catalog:" }, "devDependencies": { diff --git a/packages/synapse-core/src/chains.ts b/packages/synapse-core/src/chains.ts index a52fd6c8..c1f9862f 100644 --- a/packages/synapse-core/src/chains.ts +++ b/packages/synapse-core/src/chains.ts @@ -332,6 +332,7 @@ export function getChain(id?: number): Chain { /** * Convert a viem chain to a filecoin chain. + * * @param chain - The viem chain. * @returns The filecoin chain. * @throws Errors {@link asChain.ErrorType} diff --git a/packages/synapse-core/src/errors/index.ts b/packages/synapse-core/src/errors/index.ts index 0fc26f89..4655fd28 100644 --- a/packages/synapse-core/src/errors/index.ts +++ b/packages/synapse-core/src/errors/index.ts @@ -13,3 +13,4 @@ export * from './chains.ts' export * from './erc20.ts' export * from './pay.ts' export * from './pdp.ts' +export * from './piece.ts' diff --git a/packages/synapse-core/src/errors/piece.ts b/packages/synapse-core/src/errors/piece.ts new file mode 100644 index 00000000..ad543f31 --- /dev/null +++ b/packages/synapse-core/src/errors/piece.ts @@ -0,0 +1,19 @@ +import { isSynapseError, SynapseError, type SynapseErrorOptions } from './base.ts' + +export class InvalidPieceCIDError extends SynapseError { + override name: 'InvalidPieceCIDError' = 'InvalidPieceCIDError' + + constructor(input: unknown, options?: SynapseErrorOptions) { + let msg = 'Invalid piece CID' + if (typeof input === 'object' && input != null && 'toString' in input && typeof input.toString === 'function') { + msg = `Invalid piece CID: ${input.toString()}` + } else if (typeof input === 'string') { + msg = `Invalid piece CID: ${input}` + } + super(msg, options) + } + + static override is(value: unknown): value is InvalidPieceCIDError { + return isSynapseError(value) && value.name === 'InvalidPieceCIDError' + } +} diff --git a/packages/synapse-core/src/index.ts b/packages/synapse-core/src/index.ts index 96ee24e4..2ede6d2c 100644 --- a/packages/synapse-core/src/index.ts +++ b/packages/synapse-core/src/index.ts @@ -17,7 +17,7 @@ export * as erc20 from './erc20/index.ts' export * as errors from './errors/index.ts' export * as pay from './pay/index.ts' -export * as piece from './piece.ts' +export * as piece from './piece/index.ts' export * as sessionKey from './session-key/index.ts' export * as sp from './sp/index.ts' export * as spRegistry from './sp-registry/index.ts' diff --git a/packages/synapse-core/src/mocks/jsonrpc/index.ts b/packages/synapse-core/src/mocks/jsonrpc/index.ts index df6243ef..feae8845 100644 --- a/packages/synapse-core/src/mocks/jsonrpc/index.ts +++ b/packages/synapse-core/src/mocks/jsonrpc/index.ts @@ -16,7 +16,7 @@ import { stringToHex, toHex, } from 'viem' -import * as Piece from '../../piece.ts' +import * as Piece from '../../piece/piece.ts' import { TIME_CONSTANTS } from '../../utils/constants.ts' import { ADDRESSES } from './constants.ts' import { endorsementsCallHandler } from './endorsements.ts' diff --git a/packages/synapse-core/src/pdp-verifier/get-active-pieces.ts b/packages/synapse-core/src/pdp-verifier/get-active-pieces.ts index 73ea6f0d..00fb1176 100644 --- a/packages/synapse-core/src/pdp-verifier/get-active-pieces.ts +++ b/packages/synapse-core/src/pdp-verifier/get-active-pieces.ts @@ -11,7 +11,7 @@ import type { import { readContract } from 'viem/actions' import type { pdpVerifierAbi } from '../abis/generated.ts' import { asChain } from '../chains.ts' -import { hexToPieceCID, type PieceCID } from '../piece.ts' +import { hexToPieceCID, type PieceCID } from '../piece/piece.ts' import type { ActionCallChain } from '../types.ts' export namespace getActivePieces { diff --git a/packages/synapse-core/src/pdp-verifier/get-pieces.ts b/packages/synapse-core/src/pdp-verifier/get-pieces.ts index cd4089fd..acfc94b6 100644 --- a/packages/synapse-core/src/pdp-verifier/get-pieces.ts +++ b/packages/synapse-core/src/pdp-verifier/get-pieces.ts @@ -2,7 +2,7 @@ import type { Simplify } from 'type-fest' import type { Address, Chain, Client, ReadContractErrorType, Transport } from 'viem' import { multicall } from 'viem/actions' import { asChain } from '../chains.ts' -import { hexToPieceCID } from '../piece.ts' +import { hexToPieceCID } from '../piece/piece.ts' import { metadataArrayToObject } from '../utils/metadata.ts' import { createPieceUrl } from '../utils/piece-url.ts' import { getAllPieceMetadataCall } from '../warm-storage/get-all-piece-metadata.ts' diff --git a/packages/synapse-core/src/piece/download.ts b/packages/synapse-core/src/piece/download.ts new file mode 100644 index 00000000..b82000ec --- /dev/null +++ b/packages/synapse-core/src/piece/download.ts @@ -0,0 +1,138 @@ +import { type AbortError, HttpError, type NetworkError, request, type TimeoutError } from 'iso-web/http' +import { DownloadPieceError } from '../errors/pdp.ts' +import { InvalidPieceCIDError } from '../errors/piece.ts' +import { asPieceCID, createPieceCIDStream, type PieceCID } from './piece.ts' + +export namespace download { + export type OptionsType = { + url: string + } + export type ReturnType = Uint8Array + export type ErrorType = DownloadPieceError | TimeoutError | NetworkError | AbortError +} + +/** + * Download a piece from a URL. + * + * @param options - {@link download.OptionsType} + * @returns Data {@link download.ReturnType} + * @throws Errors {@link download.ErrorType} + */ +export async function download(options: download.OptionsType): Promise { + const response = await request.get(options.url) + if (response.error) { + if (HttpError.is(response.error)) { + throw new DownloadPieceError(await response.error.response.text()) + } + throw response.error + } + return new Uint8Array(await response.result.arrayBuffer()) +} + +export namespace downloadAndValidate { + export type OptionsType = { + url: string + expectedPieceCid: string | PieceCID + } + export type ReturnType = Uint8Array + export type ErrorType = DownloadPieceError | TimeoutError | NetworkError | AbortError | InvalidPieceCIDError +} + +/** + * Download data from a URL, validate its PieceCID, and return as Uint8Array + * + * This function: + * 1. Downloads data from the URL + * 2. Calculates PieceCID during streaming + * 3. Collects all chunks into a Uint8Array + * 4. Validates the calculated PieceCID matches the expected value + * + * @param options - {@link downloadAndValidate.OptionsType} + * @returns Data {@link downloadAndValidate.ReturnType} + * @throws Errors {@link downloadAndValidate.ErrorType} + * @example + * ```ts + * import * as Piece from '@filoz/synapse-core/piece' + * const data = await Piece.downloadAndValidate({ + * url: 'https://example.com/piece', + * expectedPieceCid: 'bafkzcib...', + * }) + * console.log(data) + * ``` + */ +export async function downloadAndValidate(options: downloadAndValidate.OptionsType): Promise { + const { url, expectedPieceCid } = options + + // Parse and validate the expected PieceCID + const parsedPieceCid = asPieceCID(expectedPieceCid) + if (parsedPieceCid == null) { + throw new InvalidPieceCIDError(expectedPieceCid) + } + + const rsp = await request.get(url) + if (rsp.error) { + if (HttpError.is(rsp.error)) { + throw new DownloadPieceError(await rsp.error.response.text()) + } + throw rsp.error + } + + if (rsp.result.body == null) { + throw new DownloadPieceError('Response body is null') + } + + // Create PieceCID calculation stream + const { stream: pieceCidStream, getPieceCID } = createPieceCIDStream() + + // Create a stream that collects all chunks into an array + const chunks: Uint8Array[] = [] + const collectStream = new TransformStream({ + transform(chunk: Uint8Array, controller: TransformStreamDefaultController) { + chunks.push(chunk) + controller.enqueue(chunk) + }, + }) + + // Pipe the response through both streams + const pipelineStream = rsp.result.body.pipeThrough(pieceCidStream).pipeThrough(collectStream) + + // Consume the stream to completion + const reader = pipelineStream.getReader() + try { + while (true) { + const { done } = await reader.read() + if (done) break + } + } finally { + reader.releaseLock() + } + + if (chunks.length === 0) { + throw new DownloadPieceError('Response body is empty') + } + + // Get the calculated PieceCID + const calculatedPieceCid = getPieceCID() + + if (calculatedPieceCid == null) { + throw new DownloadPieceError('Failed to calculate PieceCID from stream') + } + + // Verify the PieceCID + if (calculatedPieceCid.toString() !== parsedPieceCid.toString()) { + throw new DownloadPieceError( + `PieceCID verification failed. Expected: ${String(parsedPieceCid)}, Got: ${String(calculatedPieceCid)}` + ) + } + + // Combine all chunks into a single Uint8Array + const totalLength = chunks.reduce((acc, chunk) => acc + chunk.length, 0) + const result = new Uint8Array(totalLength) + let offset = 0 + for (const chunk of chunks) { + result.set(chunk, offset) + offset += chunk.length + } + + return result +} diff --git a/packages/synapse-core/src/piece/index.ts b/packages/synapse-core/src/piece/index.ts new file mode 100644 index 00000000..1842e6b8 --- /dev/null +++ b/packages/synapse-core/src/piece/index.ts @@ -0,0 +1,4 @@ +export * from '../utils/piece-url.ts' +export * from './download.ts' +export * from './piece.ts' +export * from './resolve-piece-url.ts' diff --git a/packages/synapse-core/src/piece.ts b/packages/synapse-core/src/piece/piece.ts similarity index 61% rename from packages/synapse-core/src/piece.ts rename to packages/synapse-core/src/piece/piece.ts index b86e9468..3d7498dd 100644 --- a/packages/synapse-core/src/piece.ts +++ b/packages/synapse-core/src/piece/piece.ts @@ -18,7 +18,6 @@ import { CID } from 'multiformats/cid' import * as Raw from 'multiformats/codecs/raw' import * as Link from 'multiformats/link' import { type Hex, hexToBytes } from 'viem' -import { DownloadPieceError } from './errors/pdp.ts' /** * PieceCID - A constrained CID type for Piece Commitments. @@ -33,41 +32,6 @@ import { DownloadPieceError } from './errors/pdp.ts' */ export type PieceCID = PieceCIDType -/** - * Parse a PieceCID string into a CID and validate it - * @param pieceCidString - The PieceCID as a string (base32 or other multibase encoding) - * @returns The parsed and validated PieceCID CID or null if invalid - */ -function parsePieceCID(pieceCidString: string): PieceCID | null { - try { - const cid = CID.parse(pieceCidString) - if (isValidPieceCID(cid)) { - return cid as PieceCID - } - } catch { - // ignore error - } - return null -} - -/** - * Type guard to check if a value is a CID - * @param value - The value to check - * @returns True if it's a CID - */ -function isCID(value: unknown): value is CID { - return typeof value === 'object' && value !== null && CID.asCID(value as CID) !== null -} - -/** - * Check if a CID is a valid PieceCID - * @param cid - The CID to check - * @returns True if it's a valid PieceCID - */ -function isValidPieceCID(cid: PieceCID | CID): cid is PieceCID { - return cid.code === Raw.code && cid.multihash.code === Hasher.code -} - /** * Convert a PieceCID input (string or CID) to a validated CID * This is the main function to use when accepting PieceCID inputs @@ -80,13 +44,15 @@ export function asPieceCID(pieceCidInput: PieceCID | CID | string | null | undef } if (typeof pieceCidInput === 'string') { - return parsePieceCID(pieceCidInput) + try { + return parse(pieceCidInput) + } catch { + return null + } } - if (isCID(pieceCidInput)) { - if (isValidPieceCID(pieceCidInput)) { - return pieceCidInput - } + if (isPieceCID(pieceCidInput)) { + return pieceCidInput } return null @@ -151,13 +117,17 @@ export function parse(pieceCid: string): PieceCID { } /** - * Check if a CID is a valid PieceCID + * Check if a CID is a valid PieceCIDv2 * @param cid - The CID to check - * @returns True if it's a valid PieceCID + * @returns True if it's a valid PieceCIDv2 */ -export function isPieceCID(cid: Link.Link): cid is PieceCID { +export function isPieceCID(cid: Link.Link | CID): cid is PieceCID { return ( - typeof cid === 'object' && CID.asCID(cid) != null && cid.code === Raw.code && cid.multihash.code === Hasher.code + typeof cid === 'object' && + CID.asCID(cid) != null && + cid.code === Raw.code && + cid.multihash.code === Hasher.code && + cid.version === 1 ) } @@ -262,104 +232,9 @@ export function createPieceCIDStream(): { export function hexToPieceCID(pieceCidHex: Hex | string): PieceCID { const pieceDataBytes = hexToBytes(pieceCidHex as Hex) const possiblePieceCID = CID.decode(pieceDataBytes) - const isValid = isValidPieceCID(possiblePieceCID) + const isValid = isPieceCID(possiblePieceCID) if (!isValid) { throw new Error(`Hex string '${pieceCidHex}' is a valid CID but not a valid PieceCID`) } return possiblePieceCID as PieceCID } - -/** - * Download data from a Response object, validate its PieceCID, and return as Uint8Array - * - * This function: - * 1. Streams data from the Response body - * 2. Calculates PieceCID during streaming - * 3. Collects all chunks into a Uint8Array - * 4. Validates the calculated PieceCID matches the expected value - * - * @param response - The Response object from a fetch() call - * @param expectedPieceCid - The expected PieceCID to validate against - * @returns The downloaded data as a Uint8Array - * @throws Error if PieceCID validation fails or download errors occur - * - * @example - * ```typescript - * const response = await fetch(url) - * const data = await downloadAndValidate(response, 'bafkzcib...') - * ``` - */ -export async function downloadAndValidate( - response: Response, - expectedPieceCid: string | PieceCID -): Promise { - // Parse and validate the expected PieceCID - const parsedPieceCid = asPieceCID(expectedPieceCid) - if (parsedPieceCid == null) { - throw new DownloadPieceError(`Invalid PieceCID: ${String(expectedPieceCid)}`) - } - - // Check response is OK - if (!response.ok) { - throw new DownloadPieceError(`Download failed: ${response.status} ${response.statusText}`) - } - - if (response.body == null) { - throw new DownloadPieceError('Response body is null') - } - - // Create PieceCID calculation stream - const { stream: pieceCidStream, getPieceCID } = createPieceCIDStream() - - // Create a stream that collects all chunks into an array - const chunks: Uint8Array[] = [] - const collectStream = new TransformStream({ - transform(chunk: Uint8Array, controller: TransformStreamDefaultController) { - chunks.push(chunk) - controller.enqueue(chunk) - }, - }) - - // Pipe the response through both streams - const pipelineStream = response.body.pipeThrough(pieceCidStream).pipeThrough(collectStream) - - // Consume the stream to completion - const reader = pipelineStream.getReader() - try { - while (true) { - const { done } = await reader.read() - if (done) break - } - } finally { - reader.releaseLock() - } - - if (chunks.length === 0) { - throw new DownloadPieceError('Response body is empty') - } - - // Get the calculated PieceCID - const calculatedPieceCid = getPieceCID() - - if (calculatedPieceCid == null) { - throw new DownloadPieceError('Failed to calculate PieceCID from stream') - } - - // Verify the PieceCID - if (calculatedPieceCid.toString() !== parsedPieceCid.toString()) { - throw new DownloadPieceError( - `PieceCID verification failed. Expected: ${String(parsedPieceCid)}, Got: ${String(calculatedPieceCid)}` - ) - } - - // Combine all chunks into a single Uint8Array - const totalLength = chunks.reduce((acc, chunk) => acc + chunk.length, 0) - const result = new Uint8Array(totalLength) - let offset = 0 - for (const chunk of chunks) { - result.set(chunk, offset) - offset += chunk.length - } - - return result -} diff --git a/packages/synapse-core/src/piece/resolve-piece-url.ts b/packages/synapse-core/src/piece/resolve-piece-url.ts new file mode 100644 index 00000000..c945c048 --- /dev/null +++ b/packages/synapse-core/src/piece/resolve-piece-url.ts @@ -0,0 +1,208 @@ +import { request } from 'iso-web/http' +import pLocate from 'p-locate' +import pSome from 'p-some' +import type { Address, Chain, Client, Transport } from 'viem' +import { asChain } from '../chains.ts' +import { findPiece } from '../sp/sp.ts' +import type { PDPProvider } from '../sp-registry/types.ts' +import { createPieceUrlPDP } from '../utils/piece-url.ts' +import { getPdpDataSets } from '../warm-storage/get-pdp-data-sets.ts' +import type { PieceCID } from './piece.ts' + +export namespace resolvePieceUrl { + export type ResolverFnType = (options: ResolverFnOptionsType) => Promise + export type OptionsType = { + /** The client to use to resolve the piece URL. */ + client: Client + /** The address of the user. */ + address: Address + /** The piece CID to resolve. */ + pieceCid: PieceCID + /** The signal to abort the request. */ + signal?: AbortSignal + /** The resolvers to use to resolve the piece URL. Defaults to {@link defaultResolvers}. */ + resolvers?: ResolverFnType[] + } + export type ResolverFnOptionsType = Omit + + export type OutputType = string + export type ErrorType = AggregateError +} + +/** + * The default resolvers to use when resolving the piece URL + */ +export const defaultResolvers: resolvePieceUrl.ResolverFnType[] = [filbeamResolver, chainResolver] + +/** + * Resolve the piece URL from the available resolvers + * + * @param options - {@link resolvePieceUrl.OptionsType} + * @returns The piece URL or throws an error if no URL is found + * @throws Errors {@link AggregateError} If no URL is found + * + * @example + * ```ts + * import { resolvePieceUrl } from '@filoz/synapse-core/piece' + * import { getApprovedPDPProviders } from '@filoz/synapse-core/sp-registry' + * const providers = await getApprovedPDPProviders(client) + * + * const pieceUrl = await resolvePieceUrl({ + * client: client, + * address: '0x1234567890123456789012345678901234567890', + * pieceCid: 'bafkzcibcd4bdomn3tgwgrh3g532zopskstnbrd2n3sxfqbze7rxt7vqn7veigmy', + * resolvers: [filbeamResolver, chainResolver, providersResolver(providers)], + * }) + * console.log(pieceUrl) // https://0x1234567890123456789012345678901234567890.mainnet.filbeam.io/bafkzcibcd4bdomn3tgwgrh3g532zopskstnbrd2n3sxfqbze7rxt7vqn7veigmy + */ +export async function resolvePieceUrl(options: resolvePieceUrl.OptionsType): Promise { + const { address, client, pieceCid, signal, resolvers = defaultResolvers } = options + asChain(client.chain) + + const controller = new AbortController() + const _signal = signal ? AbortSignal.any([controller.signal, signal]) : controller.signal + + const result = await pSome( + resolvers.map((resolver) => resolver({ address, client, pieceCid, signal: _signal })), + { count: 1 } + ) + controller.abort() + return result[0] +} + +/** + * Resolve the piece URL from the FilBeam CDN + * + * @param options - {@link resolvePieceUrl.ResolverFnOptionsType} + * @returns The piece URL + * @throws Errors {@link Error} If FilBeam is not supported on this chain + * + * @example + * ```ts + * import { filbeamResolver } from '@filoz/synapse-core/piece' + * const pieceUrl = await filbeamResolver({ + * address: '0x1234567890123456789012345678901234567890', + * pieceCid: 'bafkzcibcd4bdomn3tgwgrh3g532zopskstnbrd2n3sxfqbze7rxt7vqn7veigmy', + * client: client, + * }) + */ +export async function filbeamResolver(options: resolvePieceUrl.ResolverFnOptionsType): Promise { + const { address, pieceCid, signal } = options + const chain = asChain(options.client.chain) + if (chain.filbeam == null) { + throw new Error('FilBeam not supported on this chain') + } + const url = `https://${address}.${chain.filbeam.retrievalDomain}/${pieceCid.toString()}` + const result = await request.head(url, { + signal, + }) + if (result.error) { + throw result.error + } + return url +} + +/** + * Resolve the piece URL from the chain + * + * @param options - {@link resolvePieceUrl.ResolverFnOptionsType} + * @returns The piece URL + * @throws Errors {@link Error} If no provider found + * + * @example + * ```ts + * import { chainResolver } from '@filoz/synapse-core/piece' + * const pieceUrl = await chainResolver({ + * address: '0x1234567890123456789012345678901234567890', + * pieceCid: 'bafkzcibcd4bdomn3tgwgrh3g532zopskstnbrd2n3sxfqbze7rxt7vqn7veigmy', + * client: client, + * }) + */ +export async function chainResolver(options: resolvePieceUrl.ResolverFnOptionsType): Promise { + const { address, client, pieceCid, signal } = options + const dataSets = await getPdpDataSets(client, { + address, + }) + + const providersById = dataSets.reduce((acc, dataSet) => { + if (dataSet.live && dataSet.managed && dataSet.pdpEndEpoch === 0n) { + acc.set(dataSet.providerId, dataSet.provider) + } + return acc + }, new Map()) + const providers = [...providersById.values()] + + const result = await findPieceOnProviders(providers, pieceCid, signal) + if (result == null) { + throw new Error('No provider found') + } + return createPieceUrlPDP({ + cid: pieceCid.toString(), + serviceURL: result.pdp.serviceURL, + }) +} + +/** + * Resolve the piece URL from the providers + * + * @param providers - {@link PDPProvider[]} + * @returns A resolver function that resolves the piece URL from the providers + * @throws Errors {@link Error} If no provider found + * + * @example + * ```ts + * import { providersResolver } from '@filoz/synapse-core/piece' + * const resolver = providersResolver(providers) + * const pieceUrl = await resolver({ + * pieceCid: 'bafkzcibcd4bdomn3tgwgrh3g532zopskstnbrd2n3sxfqbze7rxt7vqn7veigmy', + * }) + */ +export function providersResolver(providers: PDPProvider[]): resolvePieceUrl.ResolverFnType { + return async (options: resolvePieceUrl.ResolverFnOptionsType) => { + const { pieceCid, signal } = options + const result = await findPieceOnProviders(providers, pieceCid, signal) + if (result == null) { + throw new Error('No provider found') + } + + return createPieceUrlPDP({ + cid: pieceCid.toString(), + serviceURL: result.pdp.serviceURL, + }) + } +} + +/** + * Find the piece on the providers + * + * @param providers - {@link PDPProvider[]} + * @param pieceCid - {@link PieceCID} + * @param signal - {@link AbortSignal} + * @returns The piece URL + */ +export async function findPieceOnProviders(providers: PDPProvider[], pieceCid: PieceCID, signal?: AbortSignal) { + const controller = new AbortController() + const _signal = signal ? AbortSignal.any([controller.signal, signal]) : controller.signal + + const result = await pLocate( + providers.map((p) => + findPiece({ + serviceURL: p.pdp.serviceURL, + pieceCid, + signal: _signal, + }).then( + () => p, + () => null + ) + ), + (p) => { + if (p !== null) { + controller.abort() + return true + } + return false + }, + { concurrency: 5 } + ) + return result +} diff --git a/packages/synapse-core/src/sp-registry/get-pdp-provider.ts b/packages/synapse-core/src/sp-registry/get-pdp-provider.ts index 32a0dcb5..e4de5742 100644 --- a/packages/synapse-core/src/sp-registry/get-pdp-provider.ts +++ b/packages/synapse-core/src/sp-registry/get-pdp-provider.ts @@ -3,6 +3,7 @@ import type { Address, Chain, Client, ContractFunctionReturnType, Transport } fr import type { serviceProviderRegistry as serviceProviderRegistryAbi } from '../abis/index.ts' import type { ActionCallChain } from '../types.ts' import { decodePDPOffering } from '../utils/pdp-capabilities.ts' +import { getProviderIdByAddress } from './get-provider-id-by-address.ts' import { getProviderWithProduct, getProviderWithProductCall } from './get-provider-with-product.ts' import { type PDPProvider, PRODUCTS } from './types.ts' @@ -122,3 +123,56 @@ export function parsePDPProvider(data: getPDPProvider.ContractOutputType): PDPPr pdp: decodePDPOffering(data), } } + +export namespace getPDPProviderByAddress { + export type OptionsType = { + /** The provider address. */ + address: Address + /** Service Provider Registry contract address. If not provided, the default is the contract address for the chain. */ + contractAddress?: Address + } + export type OutputType = PDPProvider | null + export type ErrorType = getProviderIdByAddress.ErrorType | getPDPProvider.ErrorType +} + +/** + * Get PDP provider by address + * + * @param client - The client to use to get the provider. + * @param options - {@link getPDPProviderByAddress.OptionsType} + * @returns The PDP provider {@link getPDPProviderByAddress.OutputType} + * @throws Errors {@link getPDPProviderByAddress.ErrorType} + * + * @example + * ```ts + * import { getPDPProviderByAddress } from '@filoz/synapse-core/sp-registry' + * import { createPublicClient, http } from 'viem' + * import { calibration } from '@filoz/synapse-core/chains' + * + * const client = createPublicClient({ + * chain: calibration, + * transport: http(), + * }) + * + * const provider = await getPDPProviderByAddress(client, { + * address: '0x1234567890123456789012345678901234567890', + * }) + * + * console.log(provider.name) + * ``` + */ +export async function getPDPProviderByAddress( + client: Client, + options: getPDPProviderByAddress.OptionsType +): Promise { + const providerId = await getProviderIdByAddress(client, { + providerAddress: options.address, + contractAddress: options.contractAddress, + }) + + if (providerId === 0n) { + return null + } + + return getPDPProvider(client, { providerId, contractAddress: options.contractAddress }) +} diff --git a/packages/synapse-core/src/sp/add-pieces.ts b/packages/synapse-core/src/sp/add-pieces.ts index 25a4b810..9022c8d1 100644 --- a/packages/synapse-core/src/sp/add-pieces.ts +++ b/packages/synapse-core/src/sp/add-pieces.ts @@ -1,6 +1,6 @@ import type { Account, Chain, Client, Transport } from 'viem' import { AtLeastOnePieceRequiredError } from '../errors/warm-storage.ts' -import type { PieceCID } from '../piece.ts' +import type { PieceCID } from '../piece/piece.ts' import { signAddPieces } from '../typed-data/sign-add-pieces.ts' import { type MetadataObject, pieceMetadataObjectToEntry } from '../utils/metadata.ts' import * as PDP from './sp.ts' diff --git a/packages/synapse-core/src/sp/data-sets.ts b/packages/synapse-core/src/sp/data-sets.ts index 3be600d2..d93a8a68 100644 --- a/packages/synapse-core/src/sp/data-sets.ts +++ b/packages/synapse-core/src/sp/data-sets.ts @@ -1,6 +1,6 @@ import type { Account, Address, Chain, Client, Transport } from 'viem' import { asChain, getChain } from '../chains.ts' -import type { PieceCID } from '../piece.ts' +import type { PieceCID } from '../piece/piece.ts' import { signCreateDataSet } from '../typed-data/sign-create-dataset.ts' import { signCreateDataSetAndAddPieces } from '../typed-data/sign-create-dataset-add-pieces.ts' import { datasetMetadataObjectToEntry, type MetadataObject, pieceMetadataObjectToEntry } from '../utils/metadata.ts' diff --git a/packages/synapse-core/src/sp/index.ts b/packages/synapse-core/src/sp/index.ts index e6c32834..aec4e097 100644 --- a/packages/synapse-core/src/sp/index.ts +++ b/packages/synapse-core/src/sp/index.ts @@ -15,7 +15,7 @@ export * from './data-sets.ts' export * from './get-data-set.ts' export * from './schedule-piece-deletion.ts' export type { deletePiece, UploadPieceStreamingData } from './sp.ts' -export { downloadPiece, findPiece, ping, uploadPiece, uploadPieceStreaming } from './sp.ts' +export { findPiece, ping, uploadPiece, uploadPieceStreaming } from './sp.ts' export * from './upload.ts' export * from './wait-for-add-pieces.ts' export * from './wait-for-create-dataset.ts' diff --git a/packages/synapse-core/src/sp/sp.ts b/packages/synapse-core/src/sp/sp.ts index 13349036..4220b98f 100644 --- a/packages/synapse-core/src/sp/sp.ts +++ b/packages/synapse-core/src/sp/sp.ts @@ -12,8 +12,8 @@ import { PostPieceError, UploadPieceError, } from '../errors/pdp.ts' -import type { PieceCID } from '../piece.ts' -import * as Piece from '../piece.ts' +import type { PieceCID } from '../piece/piece.ts' +import * as Piece from '../piece/piece.ts' import type * as TypedData from '../typed-data/index.ts' import { RETRY_CONSTANTS, SIZE_CONSTANTS } from '../utils/constants.ts' import { createPieceUrlPDP } from '../utils/piece-url.ts' @@ -621,24 +621,3 @@ export namespace downloadPiece { export type ReturnType = Uint8Array export type ErrorType = DownloadPieceError | TimeoutError | NetworkError | AbortError } - -/** - * Download a piece and verify from the PDP API. - * - * GET /piece/{pieceCid} - * - * @param options - {@link downloadPiece.OptionsType} - * @returns Data {@link downloadPiece.ReturnType} - * @throws Errors {@link downloadPiece.ErrorType} - */ -export async function downloadPiece(options: downloadPiece.OptionsType): Promise { - const url = createPieceUrlPDP({ cid: options.pieceCid.toString(), serviceURL: options.serviceURL }) - const response = await request.get(url) - if (response.error) { - if (HttpError.is(response.error)) { - throw new DownloadPieceError(await response.error.response.text()) - } - throw response.error - } - return await Piece.downloadAndValidate(response.result, options.pieceCid) -} diff --git a/packages/synapse-core/src/sp/upload.ts b/packages/synapse-core/src/sp/upload.ts index 1a77c862..edb96658 100644 --- a/packages/synapse-core/src/sp/upload.ts +++ b/packages/synapse-core/src/sp/upload.ts @@ -1,7 +1,7 @@ import type { Account, Chain, Client, Transport } from 'viem' import { asChain } from '../chains.ts' import { DataSetNotFoundError } from '../errors/warm-storage.ts' -import * as Piece from '../piece.ts' +import * as Piece from '../piece/piece.ts' import { signAddPieces } from '../typed-data/sign-add-pieces.ts' import { pieceMetadataObjectToEntry } from '../utils/metadata.ts' import { createPieceUrl } from '../utils/piece-url.ts' diff --git a/packages/synapse-core/src/typed-data/sign-add-pieces.ts b/packages/synapse-core/src/typed-data/sign-add-pieces.ts index f3f8cd4d..1d3f9597 100644 --- a/packages/synapse-core/src/typed-data/sign-add-pieces.ts +++ b/packages/synapse-core/src/typed-data/sign-add-pieces.ts @@ -12,7 +12,7 @@ import { } from 'viem' import { signTypedData } from 'viem/actions' import { asChain } from '../chains.ts' -import type { PieceCID } from '../piece.ts' +import type { PieceCID } from '../piece/piece.ts' import { randU256 } from '../utils/rand.ts' import { EIP712Types, getStorageDomain, type MetadataEntry } from './type-definitions.ts' diff --git a/packages/synapse-core/src/typed-data/sign-create-dataset-add-pieces.ts b/packages/synapse-core/src/typed-data/sign-create-dataset-add-pieces.ts index 0a65d766..4b6f298f 100644 --- a/packages/synapse-core/src/typed-data/sign-create-dataset-add-pieces.ts +++ b/packages/synapse-core/src/typed-data/sign-create-dataset-add-pieces.ts @@ -8,7 +8,7 @@ import { type Hex, type Transport, } from 'viem' -import type { PieceCID } from '../piece.ts' +import type { PieceCID } from '../piece/piece.ts' import { randU256 } from '../utils/rand.ts' import { signAddPieces } from './sign-add-pieces.ts' import { signCreateDataSet } from './sign-create-dataset.ts' diff --git a/packages/synapse-core/src/utils/schemas.ts b/packages/synapse-core/src/utils/schemas.ts index 263dfe5f..0d7d5548 100644 --- a/packages/synapse-core/src/utils/schemas.ts +++ b/packages/synapse-core/src/utils/schemas.ts @@ -1,6 +1,6 @@ import { type Hex, isHex } from 'viem' import * as z from 'zod' -import { isPieceCID, type PieceCID, parse } from '../piece.ts' +import { isPieceCID, type PieceCID, parse } from '../piece/piece.ts' export const zHex = z.custom((val) => { return typeof val === 'string' ? isHex(val) : false diff --git a/packages/synapse-core/src/warm-storage/types.ts b/packages/synapse-core/src/warm-storage/types.ts index c58f566d..e1cbf1b0 100644 --- a/packages/synapse-core/src/warm-storage/types.ts +++ b/packages/synapse-core/src/warm-storage/types.ts @@ -1,5 +1,5 @@ import type { Address } from 'viem' -import type { PieceCID } from '../piece.ts' +import type { PieceCID } from '../piece/piece.ts' import type { PDPProvider } from '../sp-registry/types.ts' import type { MetadataObject } from '../utils/metadata.ts' diff --git a/packages/synapse-core/test/download-piece.ts b/packages/synapse-core/test/download-piece.ts new file mode 100644 index 00000000..9430c7fc --- /dev/null +++ b/packages/synapse-core/test/download-piece.ts @@ -0,0 +1,206 @@ +import { assert } from 'chai' +import { setup } from 'iso-web/msw' +import { HttpResponse, http } from 'msw' +import { DownloadPieceError } from '../src/errors/pdp.ts' +import { downloadAndValidate } from '../src/piece/download.ts' +import * as Piece from '../src/piece/piece.ts' +import {} from '../src/sp/sp.ts' + +describe('Piece download and validation', () => { + const server = setup() + + before(async () => { + await server.start() + }) + + after(() => { + server.stop() + }) + + beforeEach(() => { + server.resetHandlers() + }) + + describe('downloadAndValidate', () => { + it('should successfully download and verify piece', async () => { + const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) + const pieceCid = Piece.calculate(testData) + + server.use( + http.get('http://pdp.local/piece/:pieceCid', () => { + return HttpResponse.arrayBuffer(testData.buffer) + }) + ) + + const result = await downloadAndValidate({ + url: `http://pdp.local/piece/${pieceCid.toString()}`, + expectedPieceCid: pieceCid, + }) + assert.deepEqual(result, testData) + }) + + it('should throw on download failure (404)', async () => { + const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) + const pieceCid = Piece.calculate(testData) + + server.use( + http.get('http://pdp.local/piece/:pieceCid', () => { + return HttpResponse.text('Not Found', { + status: 404, + }) + }) + ) + + try { + await downloadAndValidate({ + url: `http://pdp.local/piece/${pieceCid.toString()}`, + expectedPieceCid: pieceCid, + }) + assert.fail('Should have thrown error') + } catch (error) { + assert.instanceOf(error, DownloadPieceError) + assert.include(error.message, 'Failed to download piece') + } + }) + + it('should throw on server error (500)', async () => { + const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) + const pieceCid = Piece.calculate(testData) + + server.use( + http.get('http://pdp.local/piece/:pieceCid', () => { + return HttpResponse.text('Internal Server Error', { + status: 500, + }) + }) + ) + + try { + await downloadAndValidate({ + url: `http://pdp.local/piece/${pieceCid.toString()}`, + expectedPieceCid: pieceCid, + }) + assert.fail('Should have thrown error') + } catch (error) { + assert.instanceOf(error, DownloadPieceError) + assert.include(error.message, 'Failed to download piece') + } + }) + + it('should throw on PieceCID verification failure', async () => { + const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) + const pieceCid = Piece.calculate(testData) + const wrongData = new Uint8Array([9, 9, 9, 9]) // Different data + + server.use( + http.get('http://pdp.local/piece/:pieceCid', () => { + return HttpResponse.arrayBuffer(wrongData.buffer) + }) + ) + + try { + await downloadAndValidate({ + url: `http://pdp.local/piece/${pieceCid.toString()}`, + expectedPieceCid: pieceCid, + }) + assert.fail('Should have thrown error') + } catch (error) { + assert.instanceOf(error, DownloadPieceError) + assert.include(error.message, 'PieceCID verification failed') + } + }) + + it('should handle null response body', async () => { + const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) + const pieceCid = Piece.calculate(testData) + + server.use( + http.get('http://pdp.local/piece/:pieceCid', () => { + return new HttpResponse() + }) + ) + + try { + await downloadAndValidate({ + url: `http://pdp.local/piece/${pieceCid.toString()}`, + expectedPieceCid: pieceCid, + }) + assert.fail('Should have thrown error') + } catch (error) { + assert.instanceOf(error, DownloadPieceError) + // Accept either error message as HttpResponse() behaves differently in Node vs browser + assert.match(error.message, /Response body is (null|empty)/) + } + }) + + it('should correctly stream and verify chunked data', async () => { + const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) + const pieceCid = Piece.calculate(testData) + + server.use( + http.get('http://pdp.local/piece/:pieceCid', () => { + // Split test data into chunks + const chunk1 = testData.slice(0, 4) + const chunk2 = testData.slice(4) + + // Create readable stream that emits chunks + const stream = new ReadableStream({ + async start(controller) { + controller.enqueue(chunk1) + // Small delay to simulate network + await new Promise((resolve) => setTimeout(resolve, 10)) + controller.enqueue(chunk2) + controller.close() + }, + }) + return new HttpResponse(stream, { + status: 200, + }) + }) + ) + + const result = await downloadAndValidate({ + url: `http://pdp.local/piece/${pieceCid.toString()}`, + expectedPieceCid: pieceCid, + }) + // Verify we got all the data correctly reassembled + assert.deepEqual(result, testData) + }) + + it('should handle large chunked downloads', async () => { + // Create larger test data (1KB) + const testData = new Uint8Array(1024) + for (let i = 0; i < testData.length; i++) { + testData[i] = i % 256 + } + const pieceCid = Piece.calculate(testData) + + server.use( + http.get('http://pdp.local/piece/:pieceCid', () => { + // Create readable stream that emits in 128-byte chunks + const chunkSize = 128 + let offset = 0 + + const stream = new ReadableStream({ + async pull(controller) { + if (offset >= testData.length) { + controller.close() + return + } + const chunk = testData.slice(offset, Math.min(offset + chunkSize, testData.length)) + offset += chunkSize + controller.enqueue(chunk) + }, + }) + return new HttpResponse(stream, { status: 200 }) + }) + ) + + const result = await downloadAndValidate({ + url: `http://pdp.local/piece/${pieceCid.toString()}`, + expectedPieceCid: pieceCid, + }) + assert.deepEqual(result, testData) + }) + }) +}) diff --git a/packages/synapse-core/test/get-active-pieces.test.ts b/packages/synapse-core/test/get-active-pieces.test.ts index 1c203ffd..1d6b0020 100644 --- a/packages/synapse-core/test/get-active-pieces.test.ts +++ b/packages/synapse-core/test/get-active-pieces.test.ts @@ -4,7 +4,7 @@ import { createPublicClient, http } from 'viem' import { calibration, mainnet } from '../src/chains.ts' import { JSONRPC, presets } from '../src/mocks/jsonrpc/index.ts' import { getActivePieces, getActivePiecesCall } from '../src/pdp-verifier/get-active-pieces.ts' -import * as Piece from '../src/piece.ts' +import * as Piece from '../src/piece/piece.ts' describe('getActivePieces', () => { const server = setup() diff --git a/packages/synapse-core/test/piece.test.ts b/packages/synapse-core/test/piece.test.ts index 7065d398..1ccf30c2 100644 --- a/packages/synapse-core/test/piece.test.ts +++ b/packages/synapse-core/test/piece.test.ts @@ -2,6 +2,11 @@ * Basic tests for PieceCID utilities */ +import type { API } from '@web3-storage/data-segment' +import { Size, toLink } from '@web3-storage/data-segment/piece' +import { assert } from 'chai' +import { CID } from 'multiformats/cid' +import { bytesToHex } from 'viem/utils' import { asPieceCID, calculate, @@ -9,12 +14,7 @@ import { getSizeFromPieceCID, hexToPieceCID, type PieceCID, -} from '@filoz/synapse-core/piece' -import type { API } from '@web3-storage/data-segment' -import { Size, toLink } from '@web3-storage/data-segment/piece' -import { assert } from 'chai' -import { CID } from 'multiformats/cid' -import { bytesToHex } from 'viem/utils' +} from '../src/piece/index.ts' // https://github.com/filecoin-project/go-fil-commp-hashhash/blob/master/testdata/zero.txt const zeroPieceCidFixture = ` diff --git a/packages/synapse-core/test/resolve-piece-url.test.ts b/packages/synapse-core/test/resolve-piece-url.test.ts new file mode 100644 index 00000000..61184203 --- /dev/null +++ b/packages/synapse-core/test/resolve-piece-url.test.ts @@ -0,0 +1,375 @@ +import assert from 'assert' +import { setup } from 'iso-web/msw' +import { HttpResponse, http } from 'msw' +import { createPublicClient, http as viemHttp } from 'viem' +import { calibration, devnet } from '../src/chains.ts' +import { ADDRESSES, JSONRPC, presets } from '../src/mocks/jsonrpc/index.ts' +import * as Piece from '../src/piece/piece.ts' +import { + chainResolver, + filbeamResolver, + findPieceOnProviders, + providersResolver, + resolvePieceUrl, +} from '../src/piece/resolve-piece-url.ts' +import type { PDPProvider } from '../src/sp-registry/types.ts' + +describe('resolve-piece-url', () => { + const server = setup() + const client = createPublicClient({ + chain: calibration, + transport: viemHttp(), + }) + const pieceCidString = 'bafkzcibcd4bdomn3tgwgrh3g532zopskstnbrd2n3sxfqbze7rxt7vqn7veigmy' + const pieceCid = Piece.parse(pieceCidString) + const expectedPdpUrl = `https://pdp.example.com/piece/${pieceCidString}` + + function createProvider(serviceURL: string, id: bigint = 1n): PDPProvider { + return { + id, + serviceProvider: ADDRESSES.serviceProvider1, + payee: ADDRESSES.payee1, + isActive: true, + name: `provider-${id}`, + description: 'test provider', + pdp: { + serviceURL, + minPieceSizeInBytes: 127n, + maxPieceSizeInBytes: 1024n * 1024n * 1024n, + storagePricePerTibPerDay: 1n, + minProvingPeriodInEpochs: 1n, + location: 'US', + paymentTokenAddress: ADDRESSES.calibration.usdfcToken, + ipniPiece: false, + ipniIpfs: false, + }, + } + } + + before(async () => { + await server.start() + }) + + after(() => { + server.stop() + }) + + beforeEach(() => { + server.resetHandlers() + }) + + describe('resolvePieceUrl', () => { + it('returns the first successful resolver result', async () => { + let aborted = false + + const fastResolver = async () => expectedPdpUrl + const slowResolver = async ({ signal }: resolvePieceUrl.ResolverFnOptionsType) => { + return await new Promise((resolve, reject) => { + signal?.addEventListener('abort', () => { + aborted = true + reject(new Error('aborted')) + }) + setTimeout(() => resolve('https://slow.example/piece'), 100) + }) + } + + const url = await resolvePieceUrl({ + client, + address: ADDRESSES.client1, + pieceCid, + resolvers: [fastResolver, slowResolver], + }) + + assert.equal(url, expectedPdpUrl) + assert.equal(aborted, true) + }) + + it('throws AggregateError when all resolvers fail', async () => { + const failA = async () => { + throw new Error('resolver a failed') + } + const failB = async () => { + throw new Error('resolver b failed') + } + + await assert.rejects( + resolvePieceUrl({ + client, + address: ADDRESSES.client1, + pieceCid, + resolvers: [failA, failB], + }), + AggregateError + ) + }) + + it('uses defaultResolvers and falls back from FilBeam to chain resolver', async () => { + const filbeamUrl = `https://${ADDRESSES.client1}.${calibration.filbeam?.retrievalDomain}/${pieceCidString}` + + server.use( + JSONRPC(presets.basic), + http.head(filbeamUrl, () => HttpResponse.text('not found', { status: 404 })), + http.get('https://pdp.example.com/pdp/piece', ({ request }) => { + const url = new URL(request.url) + return HttpResponse.json({ pieceCid: url.searchParams.get('pieceCid') }, { status: 200 }) + }) + ) + + const result = await resolvePieceUrl({ + client, + address: ADDRESSES.client1, + pieceCid, + }) + + assert.equal(result, expectedPdpUrl) + }) + + it('throws AggregateError when defaultResolvers all fail', async () => { + const filbeamUrl = `https://${ADDRESSES.client1}.${calibration.filbeam?.retrievalDomain}/${pieceCidString}` + + server.use( + JSONRPC({ + ...presets.basic, + warmStorageView: { + ...presets.basic.warmStorageView, + getClientDataSets: () => [[]], + }, + }), + http.head(filbeamUrl, () => HttpResponse.text('not found', { status: 404 })) + ) + + await assert.rejects( + resolvePieceUrl({ + client, + address: ADDRESSES.client1, + pieceCid, + }), + AggregateError + ) + }) + }) + + describe('filbeamResolver', () => { + it('returns filbeam URL when HEAD succeeds', async () => { + const url = `https://${ADDRESSES.client1}.${calibration.filbeam?.retrievalDomain}/${pieceCidString}` + server.use( + http.head(url, () => { + return new HttpResponse(null, { status: 200 }) + }) + ) + + const result = await filbeamResolver({ + client, + address: ADDRESSES.client1, + pieceCid, + }) + assert.equal(result, url) + }) + + it('throws when chain does not support FilBeam', async () => { + const devnetClient = createPublicClient({ + chain: devnet, + transport: viemHttp(), + }) + + await assert.rejects( + filbeamResolver({ + client: devnetClient, + address: ADDRESSES.client1, + pieceCid, + }), + /FilBeam not supported on this chain/ + ) + }) + }) + + describe('pingProviders', () => { + it('returns first provider with piece found', async () => { + const providers: PDPProvider[] = [ + createProvider('https://missing.example.com/'), + createProvider('https://pdp.example.com/', 2n), + ] + + server.use( + http.get('https://missing.example.com/pdp/piece', () => HttpResponse.text('not found', { status: 404 })), + http.get('https://pdp.example.com/pdp/piece', ({ request }) => { + const url = new URL(request.url) + return HttpResponse.json({ pieceCid: url.searchParams.get('pieceCid') }, { status: 200 }) + }) + ) + + const result = await findPieceOnProviders(providers, pieceCid) + assert.ok(result) + assert.equal(result?.id, 2n) + }) + + it('returns undefined when no provider has the piece', async () => { + const providers: PDPProvider[] = [createProvider('https://missing.example.com/')] + + server.use( + http.get('https://missing.example.com/pdp/piece', () => HttpResponse.text('not found', { status: 404 })) + ) + + const result = await findPieceOnProviders(providers, pieceCid) + assert.equal(result, undefined) + }) + }) + + describe('providersResolver', () => { + it('returns serviceURL when a provider contains the piece', async () => { + const providers: PDPProvider[] = [createProvider('https://pdp.example.com/', 5n)] + server.use( + http.get('https://pdp.example.com/pdp/piece', ({ request }) => { + const url = new URL(request.url) + return HttpResponse.json({ pieceCid: url.searchParams.get('pieceCid') }, { status: 200 }) + }) + ) + + const resolver = providersResolver(providers) + const result = await resolver({ + client, + address: ADDRESSES.client1, + pieceCid, + }) + assert.equal(result, `https://pdp.example.com/piece/${pieceCid.toString()}`) + }) + + it('throws when no provider has the piece', async () => { + const providers: PDPProvider[] = [createProvider('https://missing.example.com/')] + server.use( + http.get('https://missing.example.com/pdp/piece', () => HttpResponse.text('not found', { status: 404 })) + ) + + const resolver = providersResolver(providers) + await assert.rejects( + resolver({ + client, + address: ADDRESSES.client1, + pieceCid, + }), + /No provider found/ + ) + }) + }) + + describe('chainResolver', () => { + it('resolves piece URL from on-chain provider list', async () => { + server.use( + JSONRPC(presets.basic), + http.get('https://pdp.example.com/pdp/piece', ({ request }) => { + const url = new URL(request.url) + return HttpResponse.json({ pieceCid: url.searchParams.get('pieceCid') }, { status: 200 }) + }) + ) + + const result = await chainResolver({ + client, + address: ADDRESSES.client1, + pieceCid, + }) + + assert.equal(result, expectedPdpUrl) + }) + + it('throws when client has no active managed data sets', async () => { + server.use( + JSONRPC({ + ...presets.basic, + warmStorageView: { + ...presets.basic.warmStorageView, + getClientDataSets: () => [[]], + }, + }) + ) + + await assert.rejects( + chainResolver({ + client, + address: ADDRESSES.client1, + pieceCid, + }), + /No provider found/ + ) + }) + + it('ignores non-live, unmanaged, and expired data sets', async () => { + server.use( + JSONRPC({ + ...presets.basic, + warmStorageView: { + ...presets.basic.warmStorageView, + getClientDataSets: () => [ + [ + { + pdpRailId: 1n, + cacheMissRailId: 0n, + cdnRailId: 0n, + payer: ADDRESSES.client1, + payee: ADDRESSES.serviceProvider1, + serviceProvider: ADDRESSES.serviceProvider1, + commissionBps: 100n, + clientDataSetId: 0n, + pdpEndEpoch: 0n, + providerId: 1n, + cdnEndEpoch: 0n, + dataSetId: 1n, + }, + { + pdpRailId: 2n, + cacheMissRailId: 0n, + cdnRailId: 0n, + payer: ADDRESSES.client1, + payee: ADDRESSES.serviceProvider2, + serviceProvider: ADDRESSES.serviceProvider2, + commissionBps: 100n, + clientDataSetId: 1n, + pdpEndEpoch: 0n, + providerId: 2n, + cdnEndEpoch: 0n, + dataSetId: 2n, + }, + { + pdpRailId: 3n, + cacheMissRailId: 0n, + cdnRailId: 0n, + payer: ADDRESSES.client1, + payee: ADDRESSES.serviceProvider1, + serviceProvider: ADDRESSES.serviceProvider1, + commissionBps: 100n, + clientDataSetId: 2n, + pdpEndEpoch: 1n, + providerId: 1n, + cdnEndEpoch: 0n, + dataSetId: 3n, + }, + ], + ], + }, + pdpVerifier: { + ...presets.basic.pdpVerifier, + dataSetLive: (args) => { + const [dataSetId] = args + return [dataSetId === 2n] + }, + getDataSetListener: (args) => { + const [dataSetId] = args + if (dataSetId === 2n) { + return [ADDRESSES.zero] + } + return [ADDRESSES.calibration.warmStorage] + }, + }, + }) + ) + + await assert.rejects( + chainResolver({ + client, + address: ADDRESSES.client1, + pieceCid, + }), + /No provider found/ + ) + }) + }) +}) diff --git a/packages/synapse-core/test/sp.test.ts b/packages/synapse-core/test/sp.test.ts index 6daa432f..ed9dfb2f 100644 --- a/packages/synapse-core/test/sp.test.ts +++ b/packages/synapse-core/test/sp.test.ts @@ -8,7 +8,6 @@ import { AddPiecesError, CreateDataSetError, DeletePieceError, - DownloadPieceError, FindPieceError, GetDataSetError, InvalidUploadSizeError, @@ -28,14 +27,13 @@ import { uploadPieceHandler, uploadPieceStreamingHandler, } from '../src/mocks/pdp.ts' -import * as Piece from '../src/piece.ts' +import * as Piece from '../src/piece/piece.ts' import { getDataSet, TimeoutError, waitForAddPieces } from '../src/sp/index.ts' import { addPieces, createDataSet, createDataSetAndAddPieces, deletePiece, - downloadPiece, findPiece, uploadPiece, uploadPieceStreaming, @@ -1450,187 +1448,4 @@ InvalidSignature(address expected, address actual) }) }) }) - - describe('downloadPiece', () => { - it('should successfully download and verify piece', async () => { - const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) - const pieceCid = Piece.calculate(testData) - - server.use( - http.get('http://pdp.local/piece/:pieceCid', () => { - return HttpResponse.arrayBuffer(testData.buffer) - }) - ) - - const result = await downloadPiece({ - serviceURL: 'http://pdp.local', - pieceCid, - }) - assert.deepEqual(result, testData) - }) - - it('should throw on download failure (404)', async () => { - const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) - const pieceCid = Piece.calculate(testData) - - server.use( - http.get('http://pdp.local/piece/:pieceCid', () => { - return HttpResponse.text('Not Found', { - status: 404, - }) - }) - ) - - try { - await downloadPiece({ - serviceURL: 'http://pdp.local', - pieceCid, - }) - assert.fail('Should have thrown error') - } catch (error) { - assert.instanceOf(error, DownloadPieceError) - assert.include(error.message, 'Failed to download piece') - } - }) - - it('should throw on server error (500)', async () => { - const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) - const pieceCid = Piece.calculate(testData) - - server.use( - http.get('http://pdp.local/piece/:pieceCid', () => { - return HttpResponse.text('Internal Server Error', { - status: 500, - }) - }) - ) - - try { - await downloadPiece({ - serviceURL: 'http://pdp.local', - pieceCid, - }) - assert.fail('Should have thrown error') - } catch (error) { - assert.instanceOf(error, DownloadPieceError) - assert.include(error.message, 'Failed to download piece') - } - }) - - it('should throw on PieceCID verification failure', async () => { - const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) - const pieceCid = Piece.calculate(testData) - const wrongData = new Uint8Array([9, 9, 9, 9]) // Different data - - server.use( - http.get('http://pdp.local/piece/:pieceCid', () => { - return HttpResponse.arrayBuffer(wrongData.buffer) - }) - ) - - try { - await downloadPiece({ - serviceURL: 'http://pdp.local', - pieceCid, - }) - assert.fail('Should have thrown error') - } catch (error) { - assert.instanceOf(error, DownloadPieceError) - assert.include(error.message, 'PieceCID verification failed') - } - }) - - it('should handle null response body', async () => { - const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) - const pieceCid = Piece.calculate(testData) - - server.use( - http.get('http://pdp.local/piece/:pieceCid', () => { - return new HttpResponse() - }) - ) - - try { - await downloadPiece({ - serviceURL: 'http://pdp.local', - pieceCid, - }) - assert.fail('Should have thrown error') - } catch (error) { - assert.instanceOf(error, DownloadPieceError) - // Accept either error message as HttpResponse() behaves differently in Node vs browser - assert.match(error.message, /Response body is (null|empty)/) - } - }) - - it('should correctly stream and verify chunked data', async () => { - const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]) - const pieceCid = Piece.calculate(testData) - - server.use( - http.get('http://pdp.local/piece/:pieceCid', () => { - // Split test data into chunks - const chunk1 = testData.slice(0, 4) - const chunk2 = testData.slice(4) - - // Create readable stream that emits chunks - const stream = new ReadableStream({ - async start(controller) { - controller.enqueue(chunk1) - // Small delay to simulate network - await new Promise((resolve) => setTimeout(resolve, 10)) - controller.enqueue(chunk2) - controller.close() - }, - }) - return new HttpResponse(stream, { - status: 200, - }) - }) - ) - - const result = await downloadPiece({ - serviceURL: 'http://pdp.local', - pieceCid, - }) - // Verify we got all the data correctly reassembled - assert.deepEqual(result, testData) - }) - - it('should handle large chunked downloads', async () => { - // Create larger test data (1KB) - const testData = new Uint8Array(1024) - for (let i = 0; i < testData.length; i++) { - testData[i] = i % 256 - } - const pieceCid = Piece.calculate(testData) - - server.use( - http.get('http://pdp.local/piece/:pieceCid', () => { - // Create readable stream that emits in 128-byte chunks - const chunkSize = 128 - let offset = 0 - - const stream = new ReadableStream({ - async pull(controller) { - if (offset >= testData.length) { - controller.close() - return - } - const chunk = testData.slice(offset, Math.min(offset + chunkSize, testData.length)) - offset += chunkSize - controller.enqueue(chunk) - }, - }) - return new HttpResponse(stream, { status: 200 }) - }) - ) - - const result = await downloadPiece({ - serviceURL: 'http://pdp.local', - pieceCid, - }) - assert.deepEqual(result, testData) - }) - }) }) diff --git a/packages/synapse-core/test/typed-data.test.ts b/packages/synapse-core/test/typed-data.test.ts index f5c8e441..1ee8e42a 100644 --- a/packages/synapse-core/test/typed-data.test.ts +++ b/packages/synapse-core/test/typed-data.test.ts @@ -2,7 +2,7 @@ import { assert } from 'chai' import { type Address, createWalletClient, decodeAbiParameters, type Hex, http, parseSignature } from 'viem' import { privateKeyToAccount } from 'viem/accounts' import * as Chains from '../src/chains.ts' -import * as Piece from '../src/piece.ts' +import * as Piece from '../src/piece/piece.ts' import * as TypedData from '../src/typed-data/index.ts' import { getStorageDomain } from '../src/typed-data/type-definitions.ts' diff --git a/packages/synapse-core/tsconfig.json b/packages/synapse-core/tsconfig.json index ee447457..2f18d71d 100644 --- a/packages/synapse-core/tsconfig.json +++ b/packages/synapse-core/tsconfig.json @@ -23,7 +23,7 @@ "src/warm-storage/index.ts", "src/chains.ts", "src/index.ts", - "src/piece.ts", + "src/piece/index.ts", "src/sp/index.ts", "src/usdfc.ts" ] diff --git a/packages/synapse-sdk/src/retriever/chain.ts b/packages/synapse-sdk/src/retriever/chain.ts deleted file mode 100644 index d050e184..00000000 --- a/packages/synapse-sdk/src/retriever/chain.ts +++ /dev/null @@ -1,147 +0,0 @@ -/** - * ChainRetriever - Queries on-chain data to find and retrieve pieces - * - * This retriever uses the Warm Storage service to find service providers - * that have the requested piece, then attempts to download from them. - */ - -import type { PDPProvider } from '@filoz/synapse-core/sp-registry' -import type { Address } from 'viem' -import type { SPRegistryService } from '../sp-registry/index.ts' -import type { PieceFetchOptions, PieceRetriever } from '../types.ts' -import { createError } from '../utils/index.ts' -import type { WarmStorageService } from '../warm-storage/index.ts' -import { fetchPiecesFromProviders } from './utils.ts' - -export interface ChainRetrieverConstructorOptions { - warmStorageService: WarmStorageService - spRegistry: SPRegistryService - childRetriever?: PieceRetriever -} - -interface FindProvidersOptions { - client: Address - providerAddress?: Address -} - -export class ChainRetriever implements PieceRetriever { - private readonly warmStorageService: WarmStorageService - private readonly childRetriever?: PieceRetriever - private readonly spRegistry: SPRegistryService - - /** - * @param options - Constructor options - * @param options.warmStorageService - Warm storage service instance - * @param options.spRegistry - Service provider registry instance - * @param options.childRetriever - Optional fallback retriever - */ - constructor(options: ChainRetrieverConstructorOptions) { - this.warmStorageService = options.warmStorageService - this.spRegistry = options.spRegistry - this.childRetriever = options.childRetriever - } - - /** - * Find providers that can serve pieces for a client - * @param options - Provider discovery options - * @param options.client - The client address - * @param options.providerAddress - Optional specific provider to use - * @returns List of provider info - */ - private async findProviders(options: FindProvidersOptions): Promise { - const { client, providerAddress } = options - - if (providerAddress != null) { - // Direct provider case - skip data set lookup entirely - const provider = await this.spRegistry.getProviderByAddress({ address: providerAddress }) - if (provider == null) { - throw createError('ChainRetriever', 'findProviders', `Provider ${providerAddress} not found in registry`) - } - return [provider] - } - - // Multiple provider case - need data sets to find providers - - // Get client's data sets with details - const dataSets = await this.warmStorageService.getClientDataSetsWithDetails({ address: client }) - - // Filter for live data sets with pieces - const validDataSets = dataSets.filter((ds) => ds.isLive && ds.activePieceCount > 0) - - if (validDataSets.length === 0) { - throw createError('ChainRetriever', 'findProviders', `No active data sets with data found for client ${client}`) - } - - // Get unique provider IDs from data sets (much more reliable than using payee addresses) - const uniqueProviderIds = [...new Set(validDataSets.map((ds) => ds.providerId))] - - // Batch fetch provider info for all unique provider IDs - const providerInfos = await this.spRegistry.getProviders({ providerIds: uniqueProviderIds }) - - // Filter out null values (providers not found in registry) - const validProviderInfos = providerInfos.filter((info): info is PDPProvider => info != null) - - if (validProviderInfos.length === 0) { - throw createError( - 'ChainRetriever', - 'findProviders', - 'No valid providers found (all providers may have been removed from registry or are inactive)' - ) - } - - return validProviderInfos - } - - /** - * Fetch a piece from on-chain discovered providers. - * @param options - Piece retrieval options - * @param options.pieceCid - The piece identifier - * @param options.client - The client address requesting the piece - * @param options.providerAddress - Optional provider address override - * @param options.withCDN - Optional CDN hint passed to child retrievers - * @param options.signal - Optional AbortSignal for request cancellation - * @returns A response containing the piece data - */ - async fetchPiece(options: PieceFetchOptions): Promise { - const { pieceCid, client, providerAddress, withCDN, signal } = options - - // Helper function to try child retriever or throw error - const tryChildOrThrow = async (reason: string, cause?: unknown): Promise => { - if (this.childRetriever !== undefined) { - return await this.childRetriever.fetchPiece({ pieceCid, client, providerAddress, withCDN, signal }) - } - throw createError( - 'ChainRetriever', - 'fetchPiece', - `Failed to retrieve piece ${pieceCid.toString()}: ${reason}`, - cause - ) - } - - // Find providers - let providersToTry: PDPProvider[] = [] - try { - providersToTry = await this.findProviders({ client, providerAddress }) - } catch (error) { - // Provider discovery failed - this is a critical error - const message = error instanceof Error ? error.message : 'Provider discovery failed' - return await tryChildOrThrow(message, error) - } - - // If no providers found, try child retriever - if (providersToTry.length === 0) { - return await tryChildOrThrow('No providers found and no additional retriever method was configured') - } - - // Try to fetch from providers - try { - return await fetchPiecesFromProviders(providersToTry, pieceCid, 'ChainRetriever', signal) - } catch (error) { - // All provider attempts failed - return await tryChildOrThrow( - 'All provider retrieval attempts failed and no additional retriever method was configured', - error - ) - } - } -} diff --git a/packages/synapse-sdk/src/retriever/filbeam.ts b/packages/synapse-sdk/src/retriever/filbeam.ts deleted file mode 100644 index bec8fa7f..00000000 --- a/packages/synapse-sdk/src/retriever/filbeam.ts +++ /dev/null @@ -1,54 +0,0 @@ -/** - * FilBeamRetriever - CDN optimization wrapper for piece retrieval - * - * This intercepts piece requests and attempts CDN retrieval before falling back - * to the base retriever. - */ - -import { asChain, type Chain } from '@filoz/synapse-core/chains' -import type { PieceFetchOptions, PieceRetriever } from '../types.ts' - -export interface FilBeamRetrieverConstructorOptions { - baseRetriever: PieceRetriever - chain: Chain -} - -export class FilBeamRetriever implements PieceRetriever { - private readonly baseRetriever: PieceRetriever - private readonly chain: Chain - - /** - * @param options - Constructor options - * @param options.baseRetriever - Base retriever used as fallback - * @param options.chain - Chain configuration for CDN resolution - */ - constructor(options: FilBeamRetrieverConstructorOptions) { - this.baseRetriever = options.baseRetriever - this.chain = asChain(options.chain) - } - - async fetchPiece(options: PieceFetchOptions): Promise { - const { pieceCid, client, withCDN, signal } = options - - if (withCDN === true && this.chain.filbeam != null) { - const cdnUrl = `https://${client}.${this.chain.filbeam.retrievalDomain}/${pieceCid.toString()}` - try { - const cdnResponse = await fetch(cdnUrl, { signal }) - if (cdnResponse.ok) { - return cdnResponse - } else if (cdnResponse.status === 402) { - console.warn( - 'CDN requires payment. Please initialise Synapse SDK with the option `withCDN: true` and re-upload your files.' - ) - } else { - console.warn('CDN fetch failed with status:', cdnResponse.status) - } - } catch (error) { - console.warn('CDN fetch failed:', error) - } - console.log('Falling back to direct retrieval') - } - - return await this.baseRetriever.fetchPiece(options) - } -} diff --git a/packages/synapse-sdk/src/retriever/index.ts b/packages/synapse-sdk/src/retriever/index.ts deleted file mode 100644 index 9a74a5de..00000000 --- a/packages/synapse-sdk/src/retriever/index.ts +++ /dev/null @@ -1,10 +0,0 @@ -/** - * PieceRetriever implementations for flexible piece fetching - * - * This module provides different strategies for retrieving pieces: - * - ChainRetriever: Queries on-chain data to find providers - * - FilBeamRetriever: CDN optimization wrapper - */ - -export { ChainRetriever } from './chain.ts' -export { FilBeamRetriever } from './filbeam.ts' diff --git a/packages/synapse-sdk/src/retriever/utils.ts b/packages/synapse-sdk/src/retriever/utils.ts deleted file mode 100644 index a6e5fafb..00000000 --- a/packages/synapse-sdk/src/retriever/utils.ts +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Utility to attempt fetching a piece from multiple providers in parallel. - */ - -import * as SP from '@filoz/synapse-core/sp' -import type { PDPProvider } from '@filoz/synapse-core/sp-registry' -import { createPieceUrlPDP } from '@filoz/synapse-core/utils' -import type { PieceCID } from '../types.ts' -import { createError } from '../utils/errors.ts' - -// Define the type for provider attempt results (internal to this function) -interface ProviderAttemptResult { - response: Response - index: number -} - -/** - * Attempt to fetch a piece from multiple providers in parallel - * @param providers - List of providers to try - * @param pieceCid - The piece to fetch - * @param retrieverName - Name of the calling retriever for error reporting - * @param signal - Optional abort signal - * @returns The first successful response - */ -export async function fetchPiecesFromProviders( - providers: PDPProvider[], - pieceCid: PieceCID, - retrieverName: string, - signal?: AbortSignal -): Promise { - // Track failures for error reporting - const failures: Array<{ provider: string; error: string }> = [] - - // Create individual abort controllers for each provider - const abortControllers: AbortController[] = [] - - const providerAttempts: Array> = providers.map(async (provider, index) => { - // Create a dedicated controller for this provider - const controller = new AbortController() - abortControllers[index] = controller - const _signal = signal ? AbortSignal.any([controller.signal, signal]) : controller.signal - - try { - // Phase 1: Check if provider has the piece - - await SP.findPiece({ - serviceURL: provider.pdp.serviceURL, - pieceCid, - signal: _signal, - }) - - // Phase 2: Provider has piece, download it - const downloadUrl = createPieceUrlPDP({ - cid: pieceCid.toString(), - serviceURL: provider.pdp.serviceURL, - }) - const response = await fetch(downloadUrl, { - signal: _signal, - }) - - if (response.ok) { - // Don't cancel here! Let Promise.race decide the winner - return { response, index } - } - - // Download failed - failures.push({ - provider: provider.serviceProvider, - error: `download returned ${response.status}`, - }) - throw new Error(`Download failed with status ${response.status}`) - } catch (error: any) { - // Log actual failures - const errorMsg = error.message ?? 'Unknown error' - if (!failures.some((f) => f.provider === provider.serviceProvider)) { - failures.push({ provider: provider.serviceProvider, error: errorMsg }) - } - if (errorMsg !== 'This operation was aborted') { - // TODO: remove this at some point, it might get noisy - console.warn(`Failed to fetch from provider ${provider.serviceProvider}:`, errorMsg) - } - throw error - } - }) - - try { - // Use Promise.any to get the first successful response - const { response, index: winnerIndex } = await Promise.any(providerAttempts) - - // Now that we have a winner, cancel all other requests - abortControllers.forEach((ctrl, i) => { - if (i !== winnerIndex) { - ctrl.abort() - } - }) - - return response - } catch (error) { - // Promise.any throws AggregateError when all promises reject - if (error instanceof AggregateError) { - // All providers failed - const failureDetails = failures.map((f) => `${f.provider}: ${f.error}`).join('; ') - throw createError( - retrieverName, - 'fetchPiecesFromProviders', - `All providers failed to serve piece ${pieceCid.toString()}. Details: ${failureDetails}` - ) - } - // Re-throw unexpected errors - throw error - } -} diff --git a/packages/synapse-sdk/src/storage/context.ts b/packages/synapse-sdk/src/storage/context.ts index 14bb8537..493a6db7 100644 --- a/packages/synapse-sdk/src/storage/context.ts +++ b/packages/synapse-sdk/src/storage/context.ts @@ -24,8 +24,9 @@ import { asChain, type Chain as FilecoinChain } from '@filoz/synapse-core/chains' import { getProviderIds } from '@filoz/synapse-core/endorsements' +import { InvalidPieceCIDError } from '@filoz/synapse-core/errors' import * as PDPVerifier from '@filoz/synapse-core/pdp-verifier' -import { asPieceCID } from '@filoz/synapse-core/piece' +import * as Piece from '@filoz/synapse-core/piece' import * as SP from '@filoz/synapse-core/sp' import { schedulePieceDeletion, type UploadPieceStreamingData } from '@filoz/synapse-core/sp' import { @@ -1129,10 +1130,25 @@ export class StorageContext { * @returns The downloaded data {@link Uint8Array} */ async download(options: DownloadOptions): Promise { - return this._synapse.storage.download({ - pieceCid: options.pieceCid, - providerAddress: this._provider.serviceProvider, - withCDN: options?.withCDN ?? this._withCDN, + const parsedPieceCID = Piece.asPieceCID(options.pieceCid) + if (parsedPieceCID == null) { + throw new InvalidPieceCIDError(options.pieceCid) + } + const withCDN = options.withCDN ?? this._withCDN + const pieceUrl = await Piece.resolvePieceUrl({ + client: this._client, + address: this._client.account.address, + pieceCid: parsedPieceCID, + resolvers: [ + Piece.providersResolver([this._provider]), + ...(withCDN ? [Piece.filbeamResolver] : []), + Piece.chainResolver, + ], + }) + + return Piece.downloadAndValidate({ + url: pieceUrl, + expectedPieceCid: parsedPieceCID, }) } @@ -1195,7 +1211,7 @@ export class StorageContext { if (this.dataSetId == null) { throw createError('StorageContext', 'getPieceIdByCID', 'Data set not found') } - const parsedPieceCID = asPieceCID(pieceCid) + const parsedPieceCID = Piece.asPieceCID(pieceCid) if (parsedPieceCID == null) { throw createError('StorageContext', 'deletePiece', 'Invalid PieceCID provided') } @@ -1244,7 +1260,7 @@ export class StorageContext { */ async hasPiece(options: { pieceCid: string | PieceCID }): Promise { const { pieceCid } = options - const parsedPieceCID = asPieceCID(pieceCid) + const parsedPieceCID = Piece.asPieceCID(pieceCid) if (parsedPieceCID == null) { return false } @@ -1277,7 +1293,7 @@ export class StorageContext { if (this.dataSetId == null) { throw createError('StorageContext', 'pieceStatus', 'Data set not found') } - const parsedPieceCID = asPieceCID(options.pieceCid) + const parsedPieceCID = Piece.asPieceCID(options.pieceCid) if (parsedPieceCID == null) { throw createError('StorageContext', 'pieceStatus', 'Invalid PieceCID provided') } diff --git a/packages/synapse-sdk/src/storage/manager.ts b/packages/synapse-sdk/src/storage/manager.ts index f88e3daa..0a78da0a 100644 --- a/packages/synapse-sdk/src/storage/manager.ts +++ b/packages/synapse-sdk/src/storage/manager.ts @@ -20,9 +20,9 @@ * ``` */ -import { asPieceCID, downloadAndValidate } from '@filoz/synapse-core/piece' +import * as Piece from '@filoz/synapse-core/piece' import type { UploadPieceStreamingData } from '@filoz/synapse-core/sp' -import { randIndex } from '@filoz/synapse-core/utils' +import { getPDPProviderByAddress } from '@filoz/synapse-core/sp-registry' import { type Address, type Hash, zeroAddress } from 'viem' import { SPRegistryService } from '../sp-registry/index.ts' import type { Synapse } from '../synapse.ts' @@ -32,7 +32,6 @@ import type { EnhancedDataSetInfo, PDPProvider, PieceCID, - PieceRetriever, PreflightInfo, StorageContextCallbacks, StorageInfo, @@ -95,8 +94,6 @@ export interface StorageManagerOptions { synapse: Synapse /** The WarmStorageService instance */ warmStorageService: WarmStorageService - /** The PieceRetriever instance */ - pieceRetriever: PieceRetriever /** Whether to enable CDN services */ withCDN: boolean } @@ -104,7 +101,6 @@ export interface StorageManagerOptions { export class StorageManager { private readonly _synapse: Synapse private readonly _warmStorageService: WarmStorageService - private readonly _pieceRetriever: PieceRetriever private readonly _withCDN: boolean private _defaultContexts?: StorageContext[] @@ -115,7 +111,6 @@ export class StorageManager { constructor(options: StorageManagerOptions) { this._synapse = options.synapse this._warmStorageService = options.warmStorageService - this._pieceRetriever = options.pieceRetriever this._withCDN = options.withCDN } @@ -229,42 +224,49 @@ export class StorageManager { }) } - // SP-agnostic download with fast path optimization - const parsedPieceCID = asPieceCID(options.pieceCid) + const parsedPieceCID = Piece.asPieceCID(options.pieceCid) if (parsedPieceCID == null) { throw createError('StorageManager', 'download', `Invalid PieceCID: ${String(options.pieceCid)}`) } - // Use withCDN setting: option > manager default > synapse default - const withCDN = options?.withCDN ?? this._withCDN + const clientAddress = this._synapse.client.account.address + const withCDN = options.withCDN ?? this._withCDN + let pieceUrl: string - let finalProviderAddress: Address | undefined = options?.providerAddress - // Fast path: If we have a default context with CDN disabled and no specific provider requested, - // check if the piece exists on the default context's provider first - if (this._defaultContexts != null && !withCDN && finalProviderAddress == null) { - // from the default contexts, select a random storage provider that has the piece - const contextsWithoutCDN = this._defaultContexts.filter((context) => context.withCDN === false) - const contextsHavePiece = await Promise.all( - contextsWithoutCDN.map((context) => context.hasPiece({ pieceCid: parsedPieceCID })) - ) - const defaultContextsWithPiece = contextsWithoutCDN.filter((_context, i) => contextsHavePiece[i]) - if (defaultContextsWithPiece.length > 0) { - finalProviderAddress = - defaultContextsWithPiece[randIndex(defaultContextsWithPiece.length)].provider.serviceProvider + if (options.providerAddress) { + // Direct provider download + const provider = await getPDPProviderByAddress(this._synapse.client, { address: options.providerAddress }) + + if (provider == null) { + throw createError('StorageManager', 'download', `Provider ${options.providerAddress} not found`) + } + pieceUrl = Piece.createPieceUrlPDP({ cid: parsedPieceCID.toString(), serviceURL: provider.pdp.serviceURL }) + } else { + // Resolve piece URL from providers + try { + pieceUrl = await Piece.resolvePieceUrl({ + client: this._synapse.client, + address: clientAddress, + pieceCid: parsedPieceCID, + resolvers: [ + ...(withCDN ? [Piece.filbeamResolver] : []), + Piece.chainResolver, + Piece.providersResolver(this._defaultContexts?.map((context) => context.provider) ?? []), + ], + }) + } catch (error) { + throw createError( + 'StorageManager', + 'download', + `All provider retrieval attempts failed and no additional retriever method was configured`, + error + ) } } - - const clientAddress = this._synapse.client.account.address - - // Use piece retriever to fetch - const response = await this._pieceRetriever.fetchPiece({ - pieceCid: parsedPieceCID, - client: clientAddress, - providerAddress: finalProviderAddress, - withCDN, + return Piece.downloadAndValidate({ + expectedPieceCid: parsedPieceCID, + url: pieceUrl, }) - - return await downloadAndValidate(response, parsedPieceCID) } /** diff --git a/packages/synapse-sdk/src/synapse.ts b/packages/synapse-sdk/src/synapse.ts index 1152570d..8375efe7 100644 --- a/packages/synapse-sdk/src/synapse.ts +++ b/packages/synapse-sdk/src/synapse.ts @@ -13,7 +13,6 @@ import { } from 'viem' import { FilBeamService } from './filbeam/index.ts' import { PaymentsService } from './payments/index.ts' -import { ChainRetriever, FilBeamRetriever } from './retriever/index.ts' import { SPRegistryService } from './sp-registry/index.ts' import { StorageManager } from './storage/manager.ts' import type { PDPProvider, SynapseFromClientOptions, SynapseOptions } from './types.ts' @@ -70,13 +69,6 @@ export class Synapse { this._storageManager = new StorageManager({ synapse: this, warmStorageService: this._warmStorageService, - pieceRetriever: new FilBeamRetriever({ - baseRetriever: new ChainRetriever({ - warmStorageService: this._warmStorageService, - spRegistry: this._providers, - }), - chain: this._chain, - }), withCDN: this._withCDN, }) } diff --git a/packages/synapse-sdk/src/test/retriever-chain.test.ts b/packages/synapse-sdk/src/test/retriever-chain.test.ts deleted file mode 100644 index 74475e6a..00000000 --- a/packages/synapse-sdk/src/test/retriever-chain.test.ts +++ /dev/null @@ -1,676 +0,0 @@ -import { calibration } from '@filoz/synapse-core/chains' -import * as Mocks from '@filoz/synapse-core/mocks' -import { asPieceCID } from '@filoz/synapse-core/piece' -import { assert } from 'chai' -import { setup } from 'iso-web/msw' -import { HttpResponse, http } from 'msw' -import { createWalletClient, http as viemHttp } from 'viem' -import { privateKeyToAccount } from 'viem/accounts' -import { ChainRetriever } from '../retriever/chain.ts' -import { SPRegistryService } from '../sp-registry/index.ts' -import type { PieceCID, PieceRetriever } from '../types.ts' -import { WarmStorageService } from '../warm-storage/index.ts' - -// Mock server for testing -const server = setup() - -// Create a mock PieceCID for testing -const mockPieceCID = asPieceCID('bafkzcibeqcad6efnpwn62p5vvs5x3nh3j7xkzfgb3xtitcdm2hulmty3xx4tl3wace') as PieceCID - -// Mock child retriever -const mockChildRetriever: PieceRetriever = { - fetchPiece: async (_options): Promise => { - return new Response('data from child', { status: 200 }) - }, -} - -describe('ChainRetriever', () => { - let warmStorage: WarmStorageService - let spRegistry: SPRegistryService - - before(async () => { - await server.start() - }) - - after(() => { - server.stop() - }) - - beforeEach(async () => { - server.resetHandlers() - // Set up basic JSON-RPC handler before creating services - server.use(Mocks.JSONRPC(Mocks.presets.basic)) - const client = createWalletClient({ - chain: calibration, - transport: viemHttp(), - account: privateKeyToAccount(Mocks.PRIVATE_KEYS.key1), - }) - warmStorage = new WarmStorageService({ client }) - spRegistry = new SPRegistryService({ client }) - }) - - describe('fetchPiece with specific provider', () => { - it('should fetch from specific provider when providerAddress is given', async () => { - let findPieceCalled = false - let downloadCalled = false - - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - serviceRegistry: Mocks.mockServiceProviderRegistry([Mocks.PROVIDERS.provider1]), - }), - http.get('https://provider1.example.com/pdp/piece', async ({ request }) => { - findPieceCalled = true - const url = new URL(request.url) - const pieceCid = url.searchParams.get('pieceCid') - return HttpResponse.json({ pieceCid }) - }), - http.get('https://provider1.example.com/piece/:pieceCid', async () => { - downloadCalled = true - return HttpResponse.text('test data', { status: 200 }) - }) - ) - - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - const response = await retriever.fetchPiece({ - pieceCid: mockPieceCID, - client: Mocks.ADDRESSES.client1, - providerAddress: Mocks.ADDRESSES.serviceProvider1, - }) - - assert.isTrue(findPieceCalled, 'Should call findPiece') - assert.isTrue(downloadCalled, 'Should call download') - assert.equal(response.status, 200) - assert.equal(await response.text(), 'test data') - }) - - it('should fall back to child retriever when specific provider is not approved', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - serviceRegistry: { - ...Mocks.presets.basic.serviceRegistry, - getProviderByAddress: () => [ - { - providerId: 0n, - info: { - serviceProvider: Mocks.ADDRESSES.zero, - payee: Mocks.ADDRESSES.zero, - name: '', - description: '', - isActive: false, - }, - }, - ], - }, - }) - ) - - const retriever = new ChainRetriever({ - warmStorageService: warmStorage, - spRegistry, - childRetriever: mockChildRetriever, - }) - const response = await retriever.fetchPiece({ - pieceCid: mockPieceCID, - client: Mocks.ADDRESSES.client1, - providerAddress: '0xNotApproved', - }) - assert.equal(response.status, 200) - assert.equal(await response.text(), 'data from child') - }) - - it('should throw when specific provider is not approved and no child retriever', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - debug: false, - serviceRegistry: { - ...Mocks.presets.basic.serviceRegistry, - getProviderByAddress: () => [ - { - providerId: 0n, - info: { - serviceProvider: Mocks.ADDRESSES.zero, - payee: Mocks.ADDRESSES.zero, - name: '', - description: '', - isActive: false, - }, - }, - ], - }, - }) - ) - - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - - try { - await retriever.fetchPiece({ - pieceCid: mockPieceCID, - client: Mocks.ADDRESSES.client1, - providerAddress: Mocks.ADDRESSES.client1, - }) - assert.fail('Should have thrown') - } catch (error: any) { - assert.include(error.message, `Provider ${Mocks.ADDRESSES.client1} not found in registry`) - } - }) - }) - - describe('fetchPiece with multiple providers', () => { - it('should wait for successful provider even if others fail first', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - serviceRegistry: Mocks.mockServiceProviderRegistry([Mocks.PROVIDERS.provider1, Mocks.PROVIDERS.provider2]), - warmStorageView: { - ...Mocks.presets.basic.warmStorageView, - clientDataSets: () => [[1n, 2n]], - getDataSet: (args) => { - const [dataSetId] = args - if (dataSetId === 1n) { - return [ - { - pdpRailId: 1n, - cacheMissRailId: 0n, - cdnRailId: 0n, - payer: Mocks.ADDRESSES.client1, - payee: Mocks.ADDRESSES.payee1, - serviceProvider: Mocks.ADDRESSES.serviceProvider1, - commissionBps: 100n, - clientDataSetId: 1n, - pdpEndEpoch: 0n, - providerId: 1n, - paymentEndEpoch: 0n, - dataSetId: 1n, - }, - ] - } - if (dataSetId === 2n) { - return [ - { - pdpRailId: 2n, - cacheMissRailId: 0n, - cdnRailId: 0n, - payer: Mocks.ADDRESSES.client1, - payee: Mocks.ADDRESSES.payee1, - serviceProvider: Mocks.ADDRESSES.serviceProvider2, - commissionBps: 100n, - clientDataSetId: 2n, - pdpEndEpoch: 0n, - providerId: 2n, - paymentEndEpoch: 0n, - dataSetId: 2n, - }, - ] - } - return Mocks.presets.basic.warmStorageView.getDataSet(args) - }, - }, - }), - http.get('https://provider1.example.com/pdp/piece', async () => { - return HttpResponse.json(null, { status: 404 }) - }), - http.get('https://provider2.example.com/pdp/piece', async ({ request }) => { - // Simulate network delay - await new Promise((resolve) => setTimeout(resolve, 50)) - const url = new URL(request.url) - const pieceCid = url.searchParams.get('pieceCid') - return HttpResponse.json({ pieceCid }) - }), - http.get('https://provider2.example.com/piece/:pieceCid', async () => { - return HttpResponse.text('success from provider 2', { status: 200 }) - }) - ) - - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - const response = await retriever.fetchPiece({ pieceCid: mockPieceCID, client: Mocks.ADDRESSES.client1 }) - - // Should get response from provider 2 even though provider 1 failed first - assert.equal(response.status, 200) - assert.equal(await response.text(), 'success from provider 2') - }) - - it('should race multiple providers and return first success', async () => { - let provider1Called = false - let provider2Called = false - - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - serviceRegistry: Mocks.mockServiceProviderRegistry([Mocks.PROVIDERS.provider1, Mocks.PROVIDERS.provider2]), - }), - http.get('https://provider1.example.com/pdp/piece', async ({ request }) => { - provider1Called = true - const url = new URL(request.url) - const pieceCid = url.searchParams.get('pieceCid') - return HttpResponse.json({ pieceCid }) - }), - http.get('https://provider1.example.com/piece/:pieceCid', async () => { - // Simulate slower response from provider1 - await new Promise((resolve) => setTimeout(resolve, 100)) - return HttpResponse.text('data from provider1', { status: 200 }) - }), - http.get('https://provider2.example.com/pdp/piece', async ({ request }) => { - provider2Called = true - const url = new URL(request.url) - const pieceCid = url.searchParams.get('pieceCid') - return HttpResponse.json({ pieceCid }) - }), - http.get('https://provider2.example.com/piece/:pieceCid', async () => { - // Provider2 responds faster - await new Promise((resolve) => setTimeout(resolve, 10)) - return HttpResponse.text('data from provider2', { status: 200 }) - }) - ) - - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - const response = await retriever.fetchPiece({ pieceCid: mockPieceCID, client: Mocks.ADDRESSES.client1 }) - - assert.isTrue(provider1Called || provider2Called, 'At least one provider should be called') - assert.equal(response.status, 200) - const text = await response.text() - assert.include(['data from provider1', 'data from provider2'], text) - }) - - it('should fall back to child retriever when all providers fail', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - serviceRegistry: Mocks.mockServiceProviderRegistry([Mocks.PROVIDERS.provider1]), - warmStorageView: { - ...Mocks.presets.basic.warmStorageView, - clientDataSets: () => [[1n]], - getDataSet: (args) => { - const [dataSetId] = args - if (dataSetId === 1n) { - return [ - { - pdpRailId: 1n, - cacheMissRailId: 0n, - cdnRailId: 0n, - payer: Mocks.ADDRESSES.client1, - payee: Mocks.ADDRESSES.payee1, - serviceProvider: Mocks.ADDRESSES.serviceProvider1, - commissionBps: 100n, - clientDataSetId: 1n, - pdpEndEpoch: 0n, - providerId: 1n, - paymentEndEpoch: 0n, - dataSetId: 1n, - }, - ] - } - return Mocks.presets.basic.warmStorageView.getDataSet(args) - }, - }, - }), - http.get('https://provider1.example.com/pdp/piece', async () => { - return HttpResponse.json(null, { status: 404 }) - }), - http.get('https://provider1.example.com/piece/:pieceCid', async () => { - return HttpResponse.json(null, { status: 404 }) - }) - ) - - const retriever = new ChainRetriever({ - warmStorageService: warmStorage, - spRegistry, - childRetriever: mockChildRetriever, - }) - const response = await retriever.fetchPiece({ pieceCid: mockPieceCID, client: Mocks.ADDRESSES.client1 }) - - assert.equal(response.status, 200) - assert.equal(await response.text(), 'data from child') - }) - - it('should throw when all providers fail and no child retriever', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - serviceRegistry: Mocks.mockServiceProviderRegistry([Mocks.PROVIDERS.provider1]), - warmStorageView: { - ...Mocks.presets.basic.warmStorageView, - clientDataSets: () => [[1n]], - getDataSet: (args) => { - const [dataSetId] = args - if (dataSetId === 1n) { - return [ - { - pdpRailId: 1n, - cacheMissRailId: 0n, - cdnRailId: 0n, - payer: Mocks.ADDRESSES.client1, - payee: Mocks.ADDRESSES.payee1, - serviceProvider: Mocks.ADDRESSES.serviceProvider1, - commissionBps: 100n, - clientDataSetId: 1n, - pdpEndEpoch: 0n, - providerId: 1n, - paymentEndEpoch: 0n, - dataSetId: 1n, - }, - ] - } - return Mocks.presets.basic.warmStorageView.getDataSet(args) - }, - }, - }), - http.get('https://provider1.example.com/pdp/piece', async () => { - return HttpResponse.json(null, { status: 404 }) - }), - http.get('https://provider1.example.com/piece/:pieceCid', async () => { - return HttpResponse.json(null, { status: 404 }) - }) - ) - - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - try { - await retriever.fetchPiece({ pieceCid: mockPieceCID, client: Mocks.ADDRESSES.client1 }) - assert.fail('Should have thrown') - } catch (error: any) { - assert.include(error.message, 'All provider retrieval attempts failed') - } - }) - - it('should handle child retriever when no data sets exist', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - warmStorageView: { - ...Mocks.presets.basic.warmStorageView, - clientDataSets: () => [[]], - }, - }) - ) - - const retriever = new ChainRetriever({ - warmStorageService: warmStorage, - spRegistry, - childRetriever: mockChildRetriever, - }) - const response = await retriever.fetchPiece({ pieceCid: mockPieceCID, client: Mocks.ADDRESSES.client1 }) - assert.equal(response.status, 200) - assert.equal(await response.text(), 'data from child') - }) - - it('should throw when no data sets and no child retriever', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - warmStorageView: { - ...Mocks.presets.basic.warmStorageView, - clientDataSets: () => [[]], - }, - }) - ) - - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - - try { - await retriever.fetchPiece({ pieceCid: mockPieceCID, client: Mocks.ADDRESSES.client1 }) - assert.fail('Should have thrown') - } catch (error: any) { - assert.include(error.message, 'No active data sets with data found') - } - }) - }) - - describe('fetchPiece error handling', () => { - it('should throw error when provider discovery fails', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - warmStorageView: { - ...Mocks.presets.basic.warmStorageView, - clientDataSets: () => { - throw new Error('Database connection failed') - }, - }, - }) - ) - - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - - try { - await retriever.fetchPiece({ pieceCid: mockPieceCID, client: Mocks.ADDRESSES.client1 }) - assert.fail('Should have thrown') - } catch (error: any) { - assert.include(error.message, 'Database connection failed') - } - }) - - it('should handle provider with no PDP product', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - serviceRegistry: Mocks.mockServiceProviderRegistry([Mocks.PROVIDERS.providerNoPDP]), // No PDP product - warmStorageView: { - ...Mocks.presets.basic.warmStorageView, - clientDataSets: () => [[1n]], - getDataSet: (args) => { - const [dataSetId] = args - if (dataSetId === 1n) { - return [ - { - pdpRailId: 1n, - cacheMissRailId: 0n, - cdnRailId: 0n, - payer: Mocks.ADDRESSES.client1, - payee: Mocks.ADDRESSES.payee1, - serviceProvider: Mocks.ADDRESSES.serviceProvider1, - commissionBps: 100n, - clientDataSetId: 1n, - pdpEndEpoch: 0n, - providerId: 1n, - paymentEndEpoch: 0n, - dataSetId: 1n, - }, - ] - } - return Mocks.presets.basic.warmStorageView.getDataSet(args) - }, - }, - }) - ) - - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - - try { - await retriever.fetchPiece({ pieceCid: mockPieceCID, client: Mocks.ADDRESSES.client1 }) - assert.fail('Should have thrown') - } catch (error: any) { - assert.include(error.message, 'Failed to retrieve piece') - } - }) - - it('should handle mixed success and failure from multiple providers', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - serviceRegistry: Mocks.mockServiceProviderRegistry([Mocks.PROVIDERS.provider1, Mocks.PROVIDERS.provider2]), - warmStorageView: { - ...Mocks.presets.basic.warmStorageView, - clientDataSets: () => [[1n, 2n]], - getDataSet: (args) => { - const [dataSetId] = args - if (dataSetId === 1n) { - return [ - { - pdpRailId: 1n, - cacheMissRailId: 0n, - cdnRailId: 0n, - payer: Mocks.ADDRESSES.client1, - payee: Mocks.ADDRESSES.payee1, - serviceProvider: Mocks.ADDRESSES.serviceProvider1, - commissionBps: 100n, - clientDataSetId: 1n, - pdpEndEpoch: 0n, - providerId: 1n, - paymentEndEpoch: 0n, - dataSetId: 1n, - }, - ] - } - if (dataSetId === 2n) { - return [ - { - pdpRailId: 2n, - cacheMissRailId: 0n, - cdnRailId: 0n, - payer: Mocks.ADDRESSES.client1, - payee: Mocks.ADDRESSES.payee1, - serviceProvider: Mocks.ADDRESSES.serviceProvider2, - commissionBps: 100n, - clientDataSetId: 2n, - pdpEndEpoch: 0n, - providerId: 2n, - paymentEndEpoch: 0n, - dataSetId: 2n, - }, - ] - } - return Mocks.presets.basic.warmStorageView.getDataSet(args) - }, - }, - }), - http.get('https://provider1.example.com/pdp/piece', async () => { - return HttpResponse.json(null, { status: 500 }) - }), - http.get('https://provider2.example.com/pdp/piece', async ({ request }) => { - const url = new URL(request.url) - const pieceCid = url.searchParams.get('pieceCid') - return HttpResponse.json({ pieceCid }) - }), - http.get('https://provider2.example.com/piece/:pieceCid', async () => { - return HttpResponse.text('success from provider2', { status: 200 }) - }) - ) - - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - const response = await retriever.fetchPiece({ pieceCid: mockPieceCID, client: Mocks.ADDRESSES.client1 }) - - assert.equal(response.status, 200) - assert.equal(await response.text(), 'success from provider2') - }) - - it('should handle providers with no valid data sets', async () => { - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - warmStorageView: { - ...Mocks.presets.basic.warmStorageView, - clientDataSets: () => [[1n, 2n]], - getDataSet: (args) => { - const [dataSetId] = args - if (dataSetId === 1n || dataSetId === 2n) { - return [ - { - pdpRailId: 1n, - cacheMissRailId: 0n, - cdnRailId: 0n, - payer: Mocks.ADDRESSES.client1, - payee: Mocks.ADDRESSES.payee1, - serviceProvider: Mocks.ADDRESSES.serviceProvider1, - commissionBps: 100n, - clientDataSetId: 1n, - pdpEndEpoch: 0n, - providerId: 1n, - paymentEndEpoch: 0n, - dataSetId: dataSetId, - }, - ] - } - return Mocks.presets.basic.warmStorageView.getDataSet(args) - }, - }, - pdpVerifier: { - ...Mocks.presets.basic.pdpVerifier, - dataSetLive: (args) => { - const [dataSetId] = args - return [dataSetId !== 1n] // Data set 1 not live - }, - getDataSetListener: () => [Mocks.ADDRESSES.calibration.warmStorage], - getActivePieceCount: (args) => { - const [dataSetId] = args - return [dataSetId === 2n ? 0n : 1n] // Data set 2 has no pieces - }, - }, - }) - ) - - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - - try { - await retriever.fetchPiece({ pieceCid: mockPieceCID, client: Mocks.ADDRESSES.client1 }) - assert.fail('Should have thrown') - } catch (error: any) { - assert.include(error.message, 'No active data sets with data found') - } - }) - }) - - describe('AbortSignal support', () => { - it('should pass AbortSignal to provider fetch', async () => { - let signalPassed = false - - server.use( - Mocks.JSONRPC({ - ...Mocks.presets.basic, - serviceRegistry: Mocks.mockServiceProviderRegistry([Mocks.PROVIDERS.provider1]), - warmStorageView: { - ...Mocks.presets.basic.warmStorageView, - clientDataSets: () => [[1n]], - getDataSet: (args) => { - const [dataSetId] = args - if (dataSetId === 1n) { - return [ - { - pdpRailId: 1n, - cacheMissRailId: 0n, - cdnRailId: 0n, - payer: Mocks.ADDRESSES.client1, - payee: Mocks.ADDRESSES.payee1, - serviceProvider: Mocks.ADDRESSES.serviceProvider1, - commissionBps: 100n, - clientDataSetId: 1n, - pdpEndEpoch: 0n, - providerId: 1n, - paymentEndEpoch: 0n, - dataSetId: 1n, - }, - ] - } - return Mocks.presets.basic.warmStorageView.getDataSet(args) - }, - }, - }), - http.get('https://provider1.example.com/pdp/piece', async ({ request }) => { - if (request.signal) { - signalPassed = true - } - const url = new URL(request.url) - const pieceCid = url.searchParams.get('pieceCid') - return HttpResponse.json({ pieceCid }) - }), - http.get('https://provider1.example.com/piece/:pieceCid', async ({ request }) => { - if (request.signal) { - signalPassed = true - } - return HttpResponse.text('test data', { status: 200 }) - }) - ) - - const controller = new AbortController() - const retriever = new ChainRetriever({ warmStorageService: warmStorage, spRegistry }) - await retriever.fetchPiece({ - pieceCid: mockPieceCID, - client: Mocks.ADDRESSES.client1, - signal: controller.signal, - }) - - assert.isTrue(signalPassed, 'AbortSignal should be passed to fetch') - }) - }) -}) diff --git a/packages/synapse-sdk/src/test/retriever-filcdn.test.ts b/packages/synapse-sdk/src/test/retriever-filcdn.test.ts deleted file mode 100644 index 9cde6869..00000000 --- a/packages/synapse-sdk/src/test/retriever-filcdn.test.ts +++ /dev/null @@ -1,231 +0,0 @@ -/* globals describe it */ - -import { calibration, mainnet } from '@filoz/synapse-core/chains' -import { asPieceCID } from '@filoz/synapse-core/piece' -import { assert } from 'chai' -import { FilBeamRetriever } from '../retriever/filbeam.ts' -import type { PieceCID, PieceRetriever } from '../types.ts' - -// Create a mock PieceCID for testing -const mockPieceCID = asPieceCID('bafkzcibeqcad6efnpwn62p5vvs5x3nh3j7xkzfgb3xtitcdm2hulmty3xx4tl3wace') as PieceCID - -describe('FilBeamRetriever', () => { - describe('pass-through behavior', () => { - it('should pass through when withCDN=false', async () => { - let baseCalled = false - const baseResponse = new Response('test data', { status: 200 }) - - const mockBaseRetriever: PieceRetriever = { - fetchPiece: async (options) => { - baseCalled = true - assert.equal(options.pieceCid, mockPieceCID) - assert.equal(options.client, '0xClient') - assert.equal(options?.withCDN, false) - return baseResponse - }, - } - - const originalFetch = global.fetch - global.fetch = async () => { - throw new Error('Should not call fetch when withCDN is false') - } - - try { - const cdnRetriever = new FilBeamRetriever({ baseRetriever: mockBaseRetriever, chain: calibration }) - const response = await cdnRetriever.fetchPiece({ - pieceCid: mockPieceCID, - client: '0xClient', - withCDN: false, - }) - - assert.isTrue(baseCalled, 'Base retriever should be called') - assert.equal(response, baseResponse) - } finally { - global.fetch = originalFetch - } - }) - - it('should propagate abort signal to base retriever', async () => { - const controller = new AbortController() - let signalPropagated = false - - const mockBaseRetriever: PieceRetriever = { - fetchPiece: async (options) => { - if (options?.signal != null) { - signalPropagated = true - assert.equal(options.signal, controller.signal) - } - return new Response('test data') - }, - } - const originalFetch = global.fetch - global.fetch = async () => { - throw new Error('Should not call fetch when withCDN is false') - } - - try { - const cdnRetriever = new FilBeamRetriever({ baseRetriever: mockBaseRetriever, chain: mainnet }) - await cdnRetriever.fetchPiece({ - pieceCid: mockPieceCID, - client: '0xClient', - signal: controller.signal, - withCDN: false, - }) - - assert.isTrue(signalPropagated, 'Signal should be propagated') - } finally { - global.fetch = originalFetch - } - }) - - it('should pass through when CDN responds with 402', async () => { - let baseCalled = false - let cdnCalled = false - const baseResponse = new Response('test data', { status: 200 }) - - const mockBaseRetriever: PieceRetriever = { - fetchPiece: async (options) => { - baseCalled = true - assert.equal(options.pieceCid, mockPieceCID) - assert.equal(options.client, '0xClient') - assert.equal(options?.withCDN, true) - return baseResponse - }, - } - const originalFetch = global.fetch - global.fetch = async () => { - cdnCalled = true - const response = new Response('Payment required', { status: 402 }) - return response - } - - try { - const cdnRetriever = new FilBeamRetriever({ baseRetriever: mockBaseRetriever, chain: calibration }) - const response = await cdnRetriever.fetchPiece({ - pieceCid: mockPieceCID, - client: '0xClient', - withCDN: true, - }) - - assert.isTrue(cdnCalled, 'CDN fetch should be attempted') - assert.isTrue(baseCalled, 'Base retriever should be called') - assert.equal(response, baseResponse) - } finally { - global.fetch = originalFetch - } - }) - - it('should pass through when CDN responds badly', async () => { - let baseCalled = false - let cdnCalled = false - const baseResponse = new Response('test data', { status: 200 }) - - const mockBaseRetriever: PieceRetriever = { - fetchPiece: async (options) => { - baseCalled = true - assert.equal(options.pieceCid, mockPieceCID) - assert.equal(options.client, '0xClient') - assert.equal(options?.withCDN, true) - return baseResponse - }, - } - const originalFetch = global.fetch - global.fetch = async () => { - cdnCalled = true - const response = new Response('Internal Server Error', { status: 500 }) - return response - } - - try { - const cdnRetriever = new FilBeamRetriever({ baseRetriever: mockBaseRetriever, chain: calibration }) - const response = await cdnRetriever.fetchPiece({ - pieceCid: mockPieceCID, - client: '0xClient', - withCDN: true, - }) - - assert.isTrue(cdnCalled, 'CDN fetch should be attempted') - assert.isTrue(baseCalled, 'Base retriever should be called') - assert.equal(response, baseResponse) - } finally { - global.fetch = originalFetch - } - }) - - it('should pass through on network error', async () => { - let baseCalled = false - let cdnCalled = false - const baseResponse = new Response('test data', { status: 200 }) - - const mockBaseRetriever: PieceRetriever = { - fetchPiece: async (options) => { - baseCalled = true - assert.equal(options.pieceCid, mockPieceCID) - assert.equal(options.client, '0xClient') - assert.equal(options?.withCDN, true) - return baseResponse - }, - } - const originalFetch = global.fetch - global.fetch = async () => { - cdnCalled = true - throw new Error('Network error') - } - - try { - const cdnRetriever = new FilBeamRetriever({ baseRetriever: mockBaseRetriever, chain: calibration }) - const response = await cdnRetriever.fetchPiece({ - pieceCid: mockPieceCID, - client: '0xClient', - withCDN: true, - }) - - assert.isTrue(cdnCalled, 'CDN fetch should be attempted') - assert.isTrue(baseCalled, 'Base retriever should be called') - assert.equal(response, baseResponse) - } finally { - global.fetch = originalFetch - } - }) - }) - - describe('CDN handling', () => { - it('should respond and not pass through', async () => { - let baseCalled = false - let cdnCalled = false - const cdnResponse = new Response('CDN data', { status: 200 }) - - const mockBaseRetriever: PieceRetriever = { - fetchPiece: async () => { - baseCalled = true - throw new Error() - }, - } - const originalFetch = global.fetch - global.fetch = async (url) => { - cdnCalled = true - assert.strictEqual( - url, - `https://0xClient.calibration.filbeam.io/${mockPieceCID.toString()}`, - 'CDN URL should be constructed correctly' - ) - return cdnResponse - } - - try { - const cdnRetriever = new FilBeamRetriever({ baseRetriever: mockBaseRetriever, chain: calibration }) - const response = await cdnRetriever.fetchPiece({ - pieceCid: mockPieceCID, - client: '0xClient', - withCDN: true, - }) - - assert.isTrue(cdnCalled, 'CDN fetch should be called') - assert.isFalse(baseCalled, 'Base retriever should not be called') - assert.equal(response, cdnResponse) - } finally { - global.fetch = originalFetch - } - }) - }) -}) diff --git a/packages/synapse-sdk/src/test/storage.test.ts b/packages/synapse-sdk/src/test/storage.test.ts index 7616881c..94336d41 100644 --- a/packages/synapse-sdk/src/test/storage.test.ts +++ b/packages/synapse-sdk/src/test/storage.test.ts @@ -2,6 +2,7 @@ import { type Chain, calibration } from '@filoz/synapse-core/chains' import * as Mocks from '@filoz/synapse-core/mocks' import * as Piece from '@filoz/synapse-core/piece' import { calculate, calculate as calculatePieceCID } from '@filoz/synapse-core/piece' +import { NetworkError } from '@filoz/synapse-core/sp' import { assert } from 'chai' import { setup } from 'iso-web/msw' import { HttpResponse, http } from 'msw' @@ -949,7 +950,7 @@ describe('StorageService', () => { ...Mocks.presets.basic, }), Mocks.PING(), - http.get(`https://${Mocks.ADDRESSES.client1}.calibration.filbeam.io/:cid`, async () => { + http.head(`https://${client.account.address}.calibration.filbeam.io/:cid`, async () => { return HttpResponse.text('Not Found', { status: 404, }) @@ -989,7 +990,7 @@ describe('StorageService', () => { await service.download({ pieceCid: testPieceCID }) assert.fail('Should have thrown') } catch (error: any) { - assert.include(error.message, 'Failed to retrieve piece') + assert.instanceOf(error, NetworkError) } }) diff --git a/packages/synapse-sdk/src/test/synapse.test.ts b/packages/synapse-sdk/src/test/synapse.test.ts index 5b4aa366..2b5e0ef6 100644 --- a/packages/synapse-sdk/src/test/synapse.test.ts +++ b/packages/synapse-sdk/src/test/synapse.test.ts @@ -241,7 +241,6 @@ describe('Synapse', () => { server.use( Mocks.JSONRPC({ ...Mocks.presets.basic, - debug: true, serviceRegistry: { ...Mocks.presets.basic.serviceRegistry, }, @@ -325,6 +324,9 @@ describe('Synapse', () => { const testData = new TextEncoder().encode('test data') server.use( Mocks.JSONRPC({ ...Mocks.presets.basic }), + http.head<{ cid: string; wallet: string }>(`https://:wallet.calibration.filbeam.io/:cid`, async () => { + return HttpResponse.text('ok', { status: 200 }) + }), http.get<{ cid: string; wallet: string }>(`https://:wallet.calibration.filbeam.io/:cid`, async ({ params }) => { deferred.resolve(params) return HttpResponse.arrayBuffer(testData.buffer)