Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/backend/src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ export const MAX_BLOCK_SIZE = 5 * 1024 * 1024;

export const DEV_TAG = stringToHex("dev");

// First network will be used as default in absence of NETWORKS config
export const SUPPORTED_NETWORKS = ["calibration", "mainnet"] as const;

/**
* Fixed metadata marker key tagging every throwaway data set created by the
* `data_set_lifecycle_check` job. The value is a per-run nonce; the key is the stable
Expand Down
4 changes: 2 additions & 2 deletions apps/backend/src/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export const configValidationSchema = Joi.object({
DATABASE_NAME: Joi.string().required(),

// Blockchain
NETWORK: Joi.string().valid("mainnet", "calibration").default("calibration"),
NETWORK: Joi.string().valid("mainnet", "calibration").required(),
WALLET_ADDRESS: Joi.string().required(),
WALLET_PRIVATE_KEY: Joi.string().optional().empty(""),
RPC_URL: Joi.string()
Expand Down Expand Up @@ -549,7 +549,7 @@ export function loadConfig(): IConfig {
database: process.env.DATABASE_NAME || "filecoin_dealbot",
},
blockchain: {
network: (process.env.NETWORK || "calibration") as Network,
network: process.env.NETWORK as Network,
rpcUrl: process.env.RPC_URL || undefined,
rpcRequestTimeoutMs: Number.parseInt(process.env.RPC_REQUEST_TIMEOUT_MS || "30000", 10),
sessionKeyPrivateKey: (process.env.SESSION_KEY_PRIVATE_KEY || undefined) as `0x${string}` | undefined,
Expand Down
105 changes: 79 additions & 26 deletions apps/backend/src/data-retention/data-retention.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ describe("DataRetentionService", () => {
configServiceMock = {
get: vi.fn((key: keyof IConfig) => {
if (key === "blockchain") {
return { pdpSubgraphEndpoint: "https://example.com/subgraph" };
return { pdpSubgraphEndpoint: "https://example.com/subgraph", network: "calibration" };
}
if (key === "spBlocklists") {
return { ids: new Set(), addresses: new Set() };
Expand Down Expand Up @@ -175,7 +175,7 @@ describe("DataRetentionService", () => {

it("returns early when all providers are blocked for data-retention", async () => {
(configServiceMock.get as ReturnType<typeof vi.fn>).mockImplementation((key: string) => {
if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph" };
if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph", network: "calibration" };
if (key === "spBlocklists") return { ids: new Set(), addresses: new Set([PROVIDER_A, PROVIDER_B]) };
});

Expand All @@ -186,16 +186,16 @@ describe("DataRetentionService", () => {

it("excludes blocked providers from data-retention polling while retaining unblocked ones", async () => {
(configServiceMock.get as ReturnType<typeof vi.fn>).mockImplementation((key: string) => {
if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph" };
if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph", network: "calibration" };
if (key === "spBlocklists") return { ids: new Set(), addresses: new Set([PROVIDER_A]) };
});
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ address: PROVIDER_B })]);

await service.pollDataRetention();

const allAddressesPolled: string[] = (
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls as [{ addresses: string[] }][]
).flatMap(([{ addresses }]) => addresses);
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls as [string, { addresses: string[] }][]
).flatMap(([, { addresses }]) => addresses);
expect(allAddressesPolled).toContain(PROVIDER_B.toLowerCase());
expect(allAddressesPolled).not.toContain(PROVIDER_A.toLowerCase());
});
Expand All @@ -214,7 +214,7 @@ describe("DataRetentionService", () => {
await service.pollDataRetention();

expect(pdpSubgraphServiceMock.fetchSubgraphMeta).toHaveBeenCalled();
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledWith({
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledWith("https://example.com/subgraph", {
blockNumber: 1200,
addresses: [PROVIDER_A, PROVIDER_B],
});
Expand All @@ -228,11 +228,12 @@ describe("DataRetentionService", () => {
expect(mockBaselineRepository.upsert).toHaveBeenCalledWith(
{
providerAddress: PROVIDER_A,
network: "calibration",
faultedPeriods: "10",
successPeriods: "90",
lastBlockNumber: "1200",
},
["providerAddress"],
["providerAddress", "network"],
);
});

Expand Down Expand Up @@ -281,8 +282,20 @@ describe("DataRetentionService", () => {
it("handles multiple providers independently", async () => {
// Seed DB baselines so first poll emits deltas
mockBaselineRepository.find.mockResolvedValueOnce([
{ providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" },
{ providerAddress: PROVIDER_B, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" },
{
providerAddress: PROVIDER_A,
network: "calibration",
faultedPeriods: "0",
successPeriods: "0",
lastBlockNumber: "1000",
},
{
providerAddress: PROVIDER_B,
network: "calibration",
faultedPeriods: "0",
successPeriods: "0",
lastBlockNumber: "1000",
},
]);

const providerA = makeProvider({ address: PROVIDER_A, totalFaultedPeriods: 5n });
Expand All @@ -305,7 +318,13 @@ describe("DataRetentionService", () => {
it("uses subgraph-confirmed totals directly without overdue estimation", async () => {
// Seed baseline so we can verify the computed values via deltas
mockBaselineRepository.find.mockResolvedValueOnce([
{ providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" },
{
providerAddress: PROVIDER_A,
network: "calibration",
faultedPeriods: "0",
successPeriods: "0",
lastBlockNumber: "1000",
},
]);

const provider = makeProvider();
Expand Down Expand Up @@ -342,7 +361,13 @@ describe("DataRetentionService", () => {
it("emits both faulted and success counters from subgraph totals", async () => {
// Seed baseline so we can verify the computed values via deltas
mockBaselineRepository.find.mockResolvedValueOnce([
{ providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" },
{
providerAddress: PROVIDER_A,
network: "calibration",
faultedPeriods: "0",
successPeriods: "0",
lastBlockNumber: "1000",
},
]);

const provider = makeProvider();
Expand Down Expand Up @@ -459,7 +484,13 @@ describe("DataRetentionService", () => {

// Seed baseline at zero so the full largeValue becomes the delta
mockBaselineRepository.find.mockResolvedValueOnce([
{ providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" },
{
providerAddress: PROVIDER_A,
network: "calibration",
faultedPeriods: "0",
successPeriods: "0",
lastBlockNumber: "1000",
},
]);

pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([
Expand All @@ -483,7 +514,13 @@ describe("DataRetentionService", () => {

// Seed baseline at zero so the full value becomes the delta
mockBaselineRepository.find.mockResolvedValueOnce([
{ providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" },
{
providerAddress: PROVIDER_A,
network: "calibration",
faultedPeriods: "0",
successPeriods: "0",
lastBlockNumber: "1000",
},
]);

pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([
Expand All @@ -499,7 +536,13 @@ describe("DataRetentionService", () => {
it("uses only subgraph-confirmed provider-level totals", async () => {
// Seed baseline at zero so subgraph totals are visible as delta
mockBaselineRepository.find.mockResolvedValueOnce([
{ providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" },
{
providerAddress: PROVIDER_A,
network: "calibration",
faultedPeriods: "0",
successPeriods: "0",
lastBlockNumber: "1000",
},
]);

const provider = makeProvider({
Expand Down Expand Up @@ -531,12 +574,16 @@ describe("DataRetentionService", () => {

// Should be called twice: once for first 50, once for remaining 25
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledTimes(2);
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenNthCalledWith(1, {
addresses: expect.arrayContaining([expect.any(String)]),
blockNumber: 1200,
});
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[0][0].addresses).toHaveLength(50);
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][0].addresses).toHaveLength(25);
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenNthCalledWith(
1,
"https://example.com/subgraph",
{
addresses: expect.arrayContaining([expect.any(String)]),
blockNumber: 1200,
},
);
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[0][1].addresses).toHaveLength(50);
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][1].addresses).toHaveLength(25);
});

it("processes all remaining batches before failing when one batch fails", async () => {
Expand Down Expand Up @@ -614,9 +661,9 @@ describe("DataRetentionService", () => {

await service.pollDataRetention();

// Should fetch stale provider info from database
// Should fetch stale provider info from database (network-scoped)
expect(mockSPRepository.find).toHaveBeenCalledWith({
where: { address: expect.anything() },
where: { address: expect.anything(), network: "calibration" },
select: ["address", "providerId", "name", "isApproved"],
});

Expand Down Expand Up @@ -855,9 +902,9 @@ describe("DataRetentionService", () => {

await service.pollDataRetention();

// Should fetch both stale providers in one query
// Should fetch both stale providers in one query (network-scoped)
expect(mockSPRepository.find).toHaveBeenCalledWith({
where: { address: expect.anything() },
where: { address: expect.anything(), network: "calibration" },
select: ["address", "providerId", "name", "isApproved"],
});

Expand Down Expand Up @@ -928,6 +975,7 @@ describe("DataRetentionService", () => {
mockBaselineRepository.find.mockResolvedValueOnce([
{
providerAddress: PROVIDER_A,
network: "calibration",
faultedPeriods: "10",
successPeriods: "90",
lastBlockNumber: "1100",
Expand All @@ -950,6 +998,7 @@ describe("DataRetentionService", () => {
mockBaselineRepository.find.mockResolvedValueOnce([
{
providerAddress: PROVIDER_A,
network: "calibration",
faultedPeriods: "8",
successPeriods: "85",
lastBlockNumber: "1000",
Expand Down Expand Up @@ -1049,6 +1098,7 @@ describe("DataRetentionService", () => {
mockBaselineRepository.find.mockRejectedValueOnce(new Error("DB connection failed")).mockResolvedValueOnce([
{
providerAddress: PROVIDER_A,
network: "calibration",
faultedPeriods: "10",
successPeriods: "90",
lastBlockNumber: "1100",
Expand Down Expand Up @@ -1112,8 +1162,11 @@ describe("DataRetentionService", () => {

await service.pollDataRetention();

// Should delete the baseline from DB
expect(mockBaselineRepository.delete).toHaveBeenCalledWith({ providerAddress: PROVIDER_A });
// Should delete the baseline from DB (network-scoped)
expect(mockBaselineRepository.delete).toHaveBeenCalledWith({
providerAddress: PROVIDER_A,
network: "calibration",
});
});
});

Expand Down
Loading