From 98bec22ddceb748fbc2da673f90ebd16f71160c4 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Thu, 8 Jan 2026 18:59:21 +0100 Subject: [PATCH 01/22] feat: refactor application structure to modularize components and improve initialization process --- console.config.json | 8 + src/index.ts | 26 +-- src/lib/core/arweave/index.ts | 64 +++++++ src/lib/core/is_on/index.ts | 7 + src/lib/core/prover/index.ts | 66 ++++++++ src/lib/core/streamr/index.ts | 51 ++++++ src/lib/utils.ts | 97 +++++++++++ src/logic/mqtt.ts | 309 ---------------------------------- src/types.ts | 44 ++++- src/utils/logger.ts | 12 +- tests/logic/sync.test.ts | 4 +- 11 files changed, 354 insertions(+), 334 deletions(-) create mode 100644 console.config.json create mode 100644 src/lib/core/arweave/index.ts create mode 100644 src/lib/core/is_on/index.ts create mode 100644 src/lib/core/prover/index.ts create mode 100644 src/lib/core/streamr/index.ts create mode 100644 src/lib/utils.ts delete mode 100644 src/logic/mqtt.ts diff --git a/console.config.json b/console.config.json new file mode 100644 index 0000000..2503635 --- /dev/null +++ b/console.config.json @@ -0,0 +1,8 @@ +{ + "modules": [ + "core/arweave", + "core/prover", + "core/streamr", + "core/is_on" + ] +} diff --git a/src/index.ts b/src/index.ts index c2e4d79..3e54d04 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,34 +1,36 @@ import "dotenv/config"; -import { handleUplinks } from "./logic/mqtt"; +import { handleUplinks } from "./services/mqtt"; import { Request, Response } from "express"; -import { app } from "./logic/context"; +import { app } from "./services/context"; +import { loadExtensionsFromConfig, runHook } from "./lib/utils"; import setupDatabase, { getAllMeterRecords, deleteMeterByPublicKey } from "./store/sqlite"; -import { initializeVerifiersCache } from "./logic/sync"; -import { publishHeartbeatToStream } from "./logic/streamr"; // Async initialization function async function initializeApp() { try { console.log("[info] Starting application initialization..."); + // Load extensions from config + await loadExtensionsFromConfig(); + console.log("[info] Extensions loaded successfully"); + + runHook("onBeforeInit"); + // Initialize database tables and jobs setupDatabase(); console.log("[info] Database setup completed"); - // Initialize verifiers cache on startup - // (disable ccip read initialization) - // await initializeVerifiersCache(); - // console.log("[info] Verifiers cache initialized successfully"); + runHook("onDatabaseSetup") // Start MQTT handling - handleUplinks(); - console.log("[info] MQTT uplinks handler started"); - - await publishHeartbeatToStream(); + await handleUplinks(); console.log("[info] Application initialization completed successfully"); + + runHook("onAfterInit"); } catch (error) { console.error("[fatal] Failed to initialize application:", error); + runHook("onInitError", error); process.exit(1); } } diff --git a/src/lib/core/arweave/index.ts b/src/lib/core/arweave/index.ts new file mode 100644 index 0000000..d0e076a --- /dev/null +++ b/src/lib/core/arweave/index.ts @@ -0,0 +1,64 @@ +import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk"; +import { Readable } from "stream"; +import Arweave from "arweave"; +import type { DecodedPayload, Hooks } from "../../../types"; + +export default class implements Hooks { + async onTransactionDistribution(m3terId: number, decoded: DecodedPayload) { + // encode transaction into standard format (payload[0]) + // format: nonce | energy | signature | voltage | device_id | longitude | latitude + const transactionHex = decoded.buf; + + const arweave = Arweave.init({ + host: "arweave.net", + protocol: "https", + port: 443, + }); + + const key = await arweave.wallets.generate(); + const signer = new ArweaveSigner(key); + const turbo = TurboFactory.authenticated({ signer }); + + const contractLabel = process.env.CONTRACT_LABEL || "M3ters"; + + const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8"); + + await turbo.uploadFile({ + fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }), + fileSizeFactory: () => byteLength, + dataItemOpts: { + paidBy: await arweave.wallets.jwkToAddress(key), + tags: [ + { name: "Contract-Label", value: contractLabel }, + { name: "Contract-Use", value: "M3tering Protocol Test" }, + { name: "Content-Type", value: "text/plain" }, + { name: "M3ter-ID", value: m3terId.toString() }, + { name: "Timestamp", value: Date.now().toString() }, + { name: "Nonce", value: decoded.nonce.toString() }, + { name: "Energy", value: decoded.energy.toString() }, + { name: "Signature", value: decoded.signature }, + { name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" }, + { name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" }, + { name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" }, + { name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" }, + ], + }, + events: { + onUploadProgress: (progress) => { + console.log("[arweave] Upload progress:", progress); + }, + onError: (error) => { + console.error("[arweave] Upload error:", error); + }, + onSuccess(event) { + console.log("[arweave] Upload successful! Transaction ID:", event); + }, + onUploadSuccess(event) { + console.log("[arweave] Upload completed! Transaction ID:", event); + }, + }, + }); + + console.log(`[arweave] Uploaded transaction ${decoded.nonce} for M3ter ID ${m3terId} to Arweave.`); + } +} diff --git a/src/lib/core/is_on/index.ts b/src/lib/core/is_on/index.ts new file mode 100644 index 0000000..040e0cf --- /dev/null +++ b/src/lib/core/is_on/index.ts @@ -0,0 +1,7 @@ +import type { Hooks } from "../../../types"; + +export default class implements Hooks { + isOnStateCompute(m3terId: number) { + return true; + } +} diff --git a/src/lib/core/prover/index.ts b/src/lib/core/prover/index.ts new file mode 100644 index 0000000..fcb5383 --- /dev/null +++ b/src/lib/core/prover/index.ts @@ -0,0 +1,66 @@ +import { buildBatchPayload } from "../../utils"; +import type { BatchTransactionPayload, Hooks, TransactionRecord } from "../../../types"; + +const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing"; + +export default class implements Hooks { + async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) { + // send pending transactions to prover node + try { + const proverURL = await getProverURL(); + console.info(`Sending pending transactions to prover: ${proverURL}`); + + const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions); + + console.info("done sending to prover"); + console.info(`Prover response (text): ${await response?.text()}`); + } catch (error) { + console.error(`Error sending pending transactions to prover: ${error}`); + } + } +} + +/** + * Send transactions to prover node for verification + */ +export async function sendTransactionsToProver( + proverURL: string, + transactionData: BatchTransactionPayload[] +): Promise { + try { + const response = await fetch(`${proverURL}/batch-payloads`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(transactionData), + }); + + console.log("[info] received", response.status, "from the prover"); + + if (!response.ok) { + throw new Error(`Prover responded with status: ${response.status}`); + } + return response; + } catch (err: any) { + console.error("Failed to send transactions to prover:", err.message); + return null; + } +} + +export async function getProverURL(): Promise { + return PREFERRED_PROVER_NODE; +} + +export async function sendPendingTransactionsToProver( + proverURL: string, + pendingTransactions: TransactionRecord[] +) { + console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); + + const requestPayload = buildBatchPayload(pendingTransactions); + + console.log("[info] Request payload:", requestPayload); + + return await sendTransactionsToProver(proverURL, requestPayload); +} diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts new file mode 100644 index 0000000..bddec0c --- /dev/null +++ b/src/lib/core/streamr/index.ts @@ -0,0 +1,51 @@ +import { StreamrClient } from "@streamr/sdk"; +import { buildBatchPayload } from "../../utils"; +import type { Hooks, TransactionRecord } from "../../../types"; + +const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env; + +if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) { + throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment variables"); +} + +export const streamrClient = new StreamrClient({ + auth: { + privateKey: ETHEREUM_PRIVATE_KEY, + }, +}); + +const stream = streamrClient.getStream(STREAMR_STREAM_ID); + +stream + .then((stream) => { + console.log(`[Streamr] Connected to stream: ${stream.id}`); + }) + .catch((error) => { + console.error("[Streamr] Error connecting to stream:", error); + }); + +export default class implements Hooks { + async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) { + // send pending transactions to streamr + try { + console.info(`Sending pending transactions to streamr`); + await publishPendingTransactionsToStreamr(pendingTransactions); + console.info(`Successfully sent pending transactions to streamr`); + } catch (error) { + console.error(`Error sending pending transactions to streamr: ${error}`); + } + } +} + +async function getStream() { + return await stream; +} + +async function publishToStream(data: any) { + const stream = await getStream(); + await stream.publish(data); +} + +async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { + await publishToStream(buildBatchPayload(pendingTransactions)); +} diff --git a/src/lib/utils.ts b/src/lib/utils.ts new file mode 100644 index 0000000..8fb7d56 --- /dev/null +++ b/src/lib/utils.ts @@ -0,0 +1,97 @@ +import fs from "fs"; +import path from "path"; +import { createPublicKey, verify } from "crypto"; +import type { TransactionRecord, BatchTransactionPayload, Hooks, AppConfig } from "../types"; + +const extensions: Hooks[] = []; + +export async function loadExtensionsFromConfig(configPath: string = "console.config.json"): Promise { + const config: AppConfig = JSON.parse(fs.readFileSync(configPath, "utf-8")); + + for (const modulePath of config.modules) { + const resolved = path.resolve(__dirname, modulePath); + + const mod = await import(resolved); + extensions.push(new mod.default()); + } + + return extensions; +} + +export async function runHook(hook: K, ...args: Parameters>) { + let result: ReturnType> | boolean = true; + + for (const ext of extensions) { + const fn = ext[hook]; + let functionReturn; + if (fn) functionReturn = await (fn as any)(...args); + + if (typeof functionReturn === "boolean" && hook === "isOnStateCompute") { + result = result && functionReturn; + } + } + + return result; +} + +/** + * Retries a function up to 5 times with exponential backoff + * @param fn Function to retry + * @param maxRetries Maximum number of retries (default: 5) + * @param baseDelay Base delay in milliseconds (default: 1000) + * @returns Promise that resolves with the function result or rejects with the last error + */ +export async function retry(fn: () => Promise, maxRetries: number = 5, baseDelay: number = 1000): Promise { + let lastError: Error; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + return await fn(); + } catch (error) { + lastError = error as Error; + + if (attempt === maxRetries) { + throw lastError; + } + + const delay = baseDelay * Math.pow(2, attempt); + console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`, error); + + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + + throw lastError!; +} + +export function buildBatchPayload(transactions: TransactionRecord[]): BatchTransactionPayload[] { + return transactions.map((transaction) => ({ + m3ter_id: Number(transaction.identifier), + message: transaction.raw, + })); +} + +export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): boolean { + try { + const message = transaction.subarray(0, 8); + const signature = transaction.subarray(8, 72); + + // Wrap raw key in SPKI DER + const spkiPrefix = Buffer.from("302a300506032b6570032100", "hex"); + const derKey = Buffer.concat([new Uint8Array(spkiPrefix), new Uint8Array(rawPubKey)]); + + const publicKey = createPublicKey({ + key: derKey, + format: "der", + type: "spki", + }); + + // Verify + const ok = verify(null, new Uint8Array(message), publicKey, new Uint8Array(signature)); + + return ok; + } catch (error) { + console.error("Error verifying signature:", error); + return false; + } +} diff --git a/src/logic/mqtt.ts b/src/logic/mqtt.ts deleted file mode 100644 index 537a9af..0000000 --- a/src/logic/mqtt.ts +++ /dev/null @@ -1,309 +0,0 @@ -import { connect } from "mqtt"; -import { enqueue } from "./grpc"; -import { interact } from "./arweave"; -import { encode } from "./encode"; -import { m3ter as m3terContract } from "./context"; -import { - deleteMeterByPublicKey, - getAllTransactionRecords, - getMeterByDevEui, - getMeterByPublicKey, - getMeterByTokenId, - insertTransaction, - saveMeter, - updateMeterDevEui, - updateMeterNonce, -} from "../store/sqlite"; -import { State, TransactionRecord } from "../types"; -import { getProverURL, sendPendingTransactionsToProver } from "./prover"; -import { decodePayload } from "./decode"; -import { verifyPayloadSignature } from "../utils"; -import { - getLatestTransactionNonce, - pruneAndSyncOnchain, - getCrossChainRevenue, - getOwedFromPriceContext, -} from "./sync"; -import { createMeterLogger, MeterLogger } from "../utils/logger"; -import { publishPendingTransactionsToStreamr } from "./streamr"; - -const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST; -const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain -const deviceLocks = new Map(); // Lock per devEUI to prevent concurrent message processing - -export function handleUplinks() { - const client = connect({ - host: CHIRPSTACK_HOST, - port: 1883, - clean: true, - connectTimeout: 9000, - reconnectPeriod: 1000, - }); - - client.on("connect", () => { - client.subscribe(`application/${process.env.APPLICATION_ID}/device/+/event/up`, () => { - console.log(`\nConnected & Subscribed to CHIRPSTACK_HOST: ${CHIRPSTACK_HOST}\n`); - }); - }); - - client.on("error", (err) => { - console.error("Connection error: ", err); - client.end(); - process.exit(1); - }); - - client.on("reconnect", () => { - console.log("Reconnecting..."); - }); - - client.on("message", async (_, blob) => { - return await handleMessage(blob); - }); -} - -export async function handleMessage(blob: Buffer) { - let devEui: string | undefined; - let logger: MeterLogger = createMeterLogger({}); // Default logger for early errors - - try { - const message = JSON.parse(blob.toString()); - devEui = message["deviceInfo"]["devEui"]; - - // Initialize logger with devEui context - logger = createMeterLogger({ devEui }); - - if (!devEui) { - console.log("[warn] Message dropped - no devEui found in message"); - return; - } - - // Check if this specific device is already being processed - if (deviceLocks.get(devEui)) { - logger.warn(`Message dropped - device is already being processed`); - return; - } - - let is_on = true; - - // Set lock for this specific device - deviceLocks.set(devEui, true); - - logger.info(`Received uplink from device: ${JSON.stringify(message)}`); - - // encode transaction into standard format (payload is hex string) - // format: nonce | energy | signature | voltage | device_id | longitude | latitude - const transactionHex = Buffer.from(message["data"], "base64"); - const decoded = decodePayload(transactionHex); - let publicKey = decoded.extensions.deviceId; - let payloadHadPublicKey = !!publicKey; - - logger.info(`Decoded payload: ${JSON.stringify(decoded)}`); - - if (!publicKey) { - // try to find public key by DevEui - const meterByDevEui = getMeterByDevEui(devEui); - - if (!meterByDevEui) { - throw new Error("Device EUI not associated with any meter: " + devEui); - } - - publicKey = meterByDevEui.publicKey.replace("0x", ""); - } - - // verify transaction signature - const isValid = verifyPayloadSignature(transactionHex, Buffer.from(publicKey!, "hex")); - if (!isValid) { - throw new Error("Invalid transaction signature for meter with public key: " + publicKey); - } - - logger.info("Verified signature"); - - if (payloadHadPublicKey) { - logger.info(`Payload contained public key: ${publicKey}`); - // save public key with device EUI mapping if not already saved - const existingMeter = getMeterByPublicKey(`0x${publicKey}`); - - if (!existingMeter) { - const tokenId = Number(await m3terContract.tokenID(`0x${publicKey}`)); - - const latestNonce = await getLatestTransactionNonce(tokenId); - - // Update logger with tokenId context now that we have it - logger = createMeterLogger({ devEui, tokenId, publicKey: `0x${publicKey}` }); - - logger.info( - `Fetched tokenId and latestNonce from chain and local state: ${tokenId}, ${latestNonce}` - ); - - // save new meter with devEui - const newMeter = { - publicKey: `0x${publicKey}`, - devEui: message["deviceInfo"]["devEui"], - tokenId, - latestNonce, - }; - - const existingMeter = getMeterByTokenId(tokenId); - - // incase of the public key being updated - if (existingMeter && existingMeter.publicKey !== `0x${publicKey}`) { - deleteMeterByPublicKey(`0x${publicKey}`); - } - - saveMeter(newMeter); - logger.info(`Saved new meter: ${JSON.stringify(newMeter)}`); - } else { - // Update logger with existing meter context - logger = createMeterLogger({ - devEui, - tokenId: existingMeter.tokenId, - publicKey: `0x${publicKey}`, - }); - - // update existing meter with devEui if not already set - if (!existingMeter.devEui || existingMeter.devEui !== message["deviceInfo"]["devEui"]) { - logger.info(`Updating meter with DevEui: ${message["deviceInfo"]["devEui"]}`); - updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]); - } - - // fetch and update latest nonce from chain - const latestNonce = await getLatestTransactionNonce(existingMeter.tokenId); - - logger.info(`Fetched latestNonce from chain and local state: ${latestNonce}`); - - updateMeterNonce(`0x${publicKey}`, latestNonce); - } - } - - let m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; - - if (!m3ter) { - throw new Error("Meter not found for public key: " + publicKey); - } - - // Update logger with complete meter context - logger = createMeterLogger({ devEui, tokenId: m3ter.tokenId, publicKey: `0x${publicKey}` }); - - logger.info(`Found meter: ${JSON.stringify(m3ter)}`); - - // If both latest nonce and received nonce are 0, enqueue 0 immediately - if (m3ter.latestNonce === 0 && decoded.nonce === 0) { - logger.info("Both latest nonce and received nonce are 0, enqueuing 0 immediately"); - - try { - is_on = true; // Always on - // (await getCrossChainRevenue(m3ter.tokenId)) >= - // (await getOwedFromPriceContext(m3ter.tokenId)); - } catch (error) { - logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); - } - - const state = { nonce: 0, is_on }; - - logger.info(`Enqueuing state: ${JSON.stringify(state)}`); - - enqueue( - message["deviceInfo"]["devEui"], - encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) - ); - - return; // Exit early without processing the transaction - } - - if (m3ter.latestNonce % SYNC_EPOCH === 0) { - // sync with blockchain every SYNC_EPOCH transactions - await pruneAndSyncOnchain(m3ter.tokenId); - - logger.info(`Synced meter with blockchain: ${m3ter.tokenId}`); - - m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; - - if (!m3ter) { - throw new Error("Meter not found after sync for public key: " + publicKey); - } - } - - const expectedNonce = m3ter.latestNonce + 1; - - logger.info( - `Received blob for meter ${m3ter?.tokenId}, expected nonce: ${expectedNonce}, got: ${decoded.nonce}` - ); - - if (decoded.nonce !== expectedNonce && decoded.nonce !== 0) { - throw new Error( - `Invalid nonce. Expected ${expectedNonce}, got ${decoded.nonce}. Public key: ${publicKey}` - ); - } - - // if device nonce is correct - if (decoded.nonce === expectedNonce) { - logger.info(`Nonce is valid: ${decoded.nonce}`); - - // Upload to arweave - await interact(m3ter.tokenId, decoded); - - logger.info(`Uploaded transaction to Arweave for meter ${m3ter.tokenId}`); - - // save transaction to local store - const transactionRecord = { - nonce: decoded.nonce, - identifier: m3ter.tokenId, - receivedAt: Date.now(), - raw: transactionHex.toString("hex"), - } as TransactionRecord; - - insertTransaction(transactionRecord); - - updateMeterNonce(`0x${publicKey}`, expectedNonce); - - logger.info(`Updated meter nonce to: ${expectedNonce}`); - - const pendingTransactions = getAllTransactionRecords(); - - // send pending transactions to prover node - try { - const proverURL = await getProverURL(); - logger.info(`Sending pending transactions to prover: ${proverURL}`); - - const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions); - - logger.info("done sending to prover"); - logger.info(`Prover response (text): ${await response?.text()}`); - } catch (error) { - logger.error(`Error sending pending transactions to prover: ${error}`); - } - - // send pending transactions to streamr - try { - logger.info(`Sending pending transactions to streamr`); - await publishPendingTransactionsToStreamr(pendingTransactions); - logger.info(`Successfully sent pending transactions to streamr`); - } catch (error) { - logger.error(`Error sending pending transactions to streamr: ${error}`); - } - } - - try { - is_on = true; // Always on - // (await getCrossChainRevenue(m3ter.tokenId)) >= - // (await getOwedFromPriceContext(m3ter.tokenId)); - } catch (error) { - logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); - } - const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on }; - - logger.info(`Enqueuing state: ${JSON.stringify(state)}`); - - enqueue( - message["deviceInfo"]["devEui"], - encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) - ); - } catch (error) { - logger.error(`Error handling MQTT message: ${error}`); - } finally { - // Release lock for this specific device - if (devEui) { - deviceLocks.delete(devEui); - } - } -} diff --git a/src/types.ts b/src/types.ts index 8082331..f80f4fd 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,45 @@ +import { MqttClient } from "mqtt/*"; + +// Application configuration type (console.config.json) +export type AppConfig = { + modules: string[]; +}; + +// Hooks type for lifecycle events +export type Hooks = { + onBeforeInit?: () => void | Promise; + onDatabaseSetup?: () => void | Promise; + onAfterInit?: () => void | Promise; + onInitError?: (error: any) => void | Promise; + + onMqttConnect?: (client: MqttClient) => void | Promise; + onMqttSubscribed?: (client: MqttClient, topic: string) => void | Promise; + onMqttError?: (error: any, client: MqttClient) => void | Promise; + onMqttReconnect?: (client: MqttClient) => void | Promise; + + onMessageReceived?: (blob: Buffer) => void | Promise; + onMessageDropped?: (reason: string, devEui: string) => void | Promise; + + onMeterCreated?: (newMeter: MeterRecord) => void | Promise; + + onSyncEpochReached?: () => void | Promise; + + onTransactionDistribution?: ( + tokenId: number, + decodedPayload: DecodedPayload, + pendingTransactions: TransactionRecord[] + ) => void | Promise; + + isOnStateCompute?: (m3terId: number) => boolean | Promise; + onIsOnStateComputed?: (m3terId: number, isOn: boolean) => void | Promise; + onIsOnStateComputeError?: (m3terId: number, error: any) => void | Promise; + onStateEnqueued?: (state: any, latitude: number, longitude: number) => void | Promise; + + onMessageError?: (error: any) => void | Promise; + onDeviceUnlocked?: (devEui: string) => void | Promise; + onMessageProcessingComplete?: () => void | Promise; +}; + // Meter interface for database operations export interface MeterRecord { publicKey: string; @@ -50,4 +92,4 @@ export interface VerifierInfo { ensName: string; targetAddress: string; verifierAddress: string; -} \ No newline at end of file +} diff --git a/src/utils/logger.ts b/src/utils/logger.ts index 815b8a1..90b0598 100644 --- a/src/utils/logger.ts +++ b/src/utils/logger.ts @@ -8,18 +8,10 @@ export interface MeterLogger { export interface MeterContext { devEui?: string; - tokenId?: number; - publicKey?: string; } export function createMeterLogger(context: MeterContext): MeterLogger { - const prefix = context.tokenId - ? `[Meter-${context.tokenId}${context.devEui ? `|${context.devEui}` : ''}]` - : context.devEui - ? `[Device-${context.devEui}]` - : context.publicKey - ? `[PubKey-${context.publicKey.slice(-8)}]` - : '[Unknown-Meter]'; + const prefix = context.devEui ? `[Device-${context.devEui}]` : "[Unknown-Meter]"; return { info: (message: string) => console.log(`${prefix} [info] ${message}`), @@ -27,4 +19,4 @@ export function createMeterLogger(context: MeterContext): MeterLogger { error: (message: string) => console.error(`${prefix} [error] ${message}`), debug: (message: string) => console.log(`${prefix} [debug] ${message}`), }; -} \ No newline at end of file +} diff --git a/tests/logic/sync.test.ts b/tests/logic/sync.test.ts index 4582f68..22d7db9 100644 --- a/tests/logic/sync.test.ts +++ b/tests/logic/sync.test.ts @@ -4,7 +4,7 @@ import { isVerifiersCacheInitialized, getCachedVerifiersCount, getCrossChainRevenue, -} from "../../src/logic/sync"; +} from "../../src/lib/sync"; // Mock the context module jest.mock("../../src/logic/context", () => ({ @@ -19,7 +19,7 @@ jest.mock("../../src/logic/context", () => ({ })); // Mock the retry utility -jest.mock("../../src/utils", () => ({ +jest.mock("../../src/lib/utils", () => ({ retry: jest.fn((fn) => fn()), })); From cb0e5c152d768dff7ac11461b51adb976acfa819 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Thu, 8 Jan 2026 19:00:51 +0100 Subject: [PATCH 02/22] feat: remove deprecated logic files and move grpc and mqtt services --- src/logic/arweave.ts | 60 ------- src/logic/decode.ts | 49 ----- src/logic/encode.ts | 92 ---------- src/logic/gps.ts | 16 -- src/logic/prover.ts | 137 -------------- src/logic/streamr.ts | 45 ----- src/logic/sync.ts | 201 --------------------- src/{logic => services}/context.ts | 0 src/{logic => services}/grpc.ts | 0 src/services/mqtt.ts | 275 +++++++++++++++++++++++++++++ 10 files changed, 275 insertions(+), 600 deletions(-) delete mode 100644 src/logic/arweave.ts delete mode 100644 src/logic/decode.ts delete mode 100644 src/logic/encode.ts delete mode 100644 src/logic/gps.ts delete mode 100644 src/logic/prover.ts delete mode 100644 src/logic/streamr.ts delete mode 100644 src/logic/sync.ts rename src/{logic => services}/context.ts (100%) rename src/{logic => services}/grpc.ts (100%) create mode 100644 src/services/mqtt.ts diff --git a/src/logic/arweave.ts b/src/logic/arweave.ts deleted file mode 100644 index a75f0d8..0000000 --- a/src/logic/arweave.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk"; -import { Readable } from "stream"; -import Arweave from "arweave"; -import type { DecodedPayload } from "../types"; - -export async function interact(m3terId: number, decoded: DecodedPayload) { - // encode transaction into standard format (payload[0]) - // format: nonce | energy | signature | voltage | device_id | longitude | latitude - const transactionHex = decoded.buf; - - const arweave = Arweave.init({ - host: "arweave.net", - protocol: "https", - port: 443, - }); - - const key = await arweave.wallets.generate(); - const signer = new ArweaveSigner(key); - const turbo = TurboFactory.authenticated({ signer }); - - const contractLabel = process.env.CONTRACT_LABEL || "M3ters"; - - const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8"); - - return await turbo.uploadFile({ - fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }), - fileSizeFactory: () => byteLength, - dataItemOpts: { - paidBy: await arweave.wallets.jwkToAddress(key), - tags: [ - { name: "Contract-Label", value: contractLabel }, - { name: "Contract-Use", value: "M3tering Protocol Test" }, - { name: "Content-Type", value: "text/plain" }, - { name: "M3ter-ID", value: m3terId.toString() }, - { name: "Timestamp", value: Date.now().toString() }, - { name: "Nonce", value: decoded.nonce.toString() }, - { name: "Energy", value: decoded.energy.toString() }, - { name: "Signature", value: decoded.signature }, - { name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" }, - { name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" }, - { name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" }, - { name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" }, - ], - }, - events: { - onUploadProgress: (progress) => { - console.log("[arweave] Upload progress:", progress); - }, - onError: (error) => { - console.error("[arweave] Upload error:", error); - }, - onSuccess(event) { - console.log("[arweave] Upload successful! Transaction ID:", event); - }, - onUploadSuccess(event) { - console.log("[arweave] Upload completed! Transaction ID:", event); - }, - }, - }); -} diff --git a/src/logic/decode.ts b/src/logic/decode.ts deleted file mode 100644 index 49c9185..0000000 --- a/src/logic/decode.ts +++ /dev/null @@ -1,49 +0,0 @@ -import type { DecodedPayload } from "../types"; - -export function decodePayload(buf: Buffer) { - if (buf.length < 72) { - throw new Error("Payload too short. Must be at least 72 bytes"); - } - - // --- Core fields --- - const nonce = buf.readUInt32BE(0); - - const rawEnergy = buf.readUInt32BE(4); - const energyKWh = rawEnergy / 1e6; - - const signature = buf.subarray(8, 72).toString("hex"); - - // --- Optional extensions --- - const ext = {} as NonNullable; - let offset = 72; - - if (buf.length >= offset + 2) { - ext.voltage = buf.readUInt16BE(offset) / 10; - offset += 2; - } - - if (buf.length >= offset + 32) { - ext.deviceId = buf.subarray(offset, offset + 32).toString("hex"); - offset += 32; - } - - if (buf.length >= offset + 3) { - let lon = buf.readIntBE(offset, 3); // signed 3-byte int - ext.longitude = lon / 1e5; - offset += 3; - } - - if (buf.length >= offset + 3) { - let lat = buf.readIntBE(offset, 3); // signed 3-byte int - ext.latitude = lat / 1e5; - offset += 3; - } - - return { - nonce, - energy: energyKWh, - signature, - extensions: ext, - buf, - }; -} diff --git a/src/logic/encode.ts b/src/logic/encode.ts deleted file mode 100644 index df9234c..0000000 --- a/src/logic/encode.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { State, TransactionRecord } from "../types"; - -export function intToByteArray(num: number, byteLength: number = 4) { - const byteArray = new Uint8Array(byteLength); - for (let i = 0; i < byteLength; i++) { - byteArray[byteLength - 1 - i] = (num >> (i * 8)) & 0xff; - } - return Array.from(byteArray); -} - -export function stringToByteArray(str: string, length: number | null = null) { - const encoder = new TextEncoder(); // UTF-8 - const encoded = encoder.encode(str); - const byteArray = new Uint8Array(length === null ? encoded.length : length); - byteArray.set(encoded.slice(0, length == null ? encoded.length : length)); // truncate if longer - return Array.from(byteArray); -} - -function floatToByteArray(float: number) { - const buffer = new ArrayBuffer(4); // 4 bytes for a float (32-bit) - const view = new DataView(buffer); - view.setFloat32(0, float, false); // false for Big Endian - return Array.from(new Uint8Array(buffer)); -} - -/** - * - * @notice only needs `nonce` from the state - */ -export function encode(state: State, latitude: number, longitude: number) { - let responseBytes = floatToByteArray(latitude).concat( - floatToByteArray(longitude) - ); - - let nonce = state.nonce; - - if (nonce) { - responseBytes = intToByteArray(nonce).concat(responseBytes); - } - - responseBytes.unshift(state.is_on ? 1 : 0); - return responseBytes; -} - -/** - * Encode transaction data into a byte array. - * - * format: nonce(4 bytes) | energy (4 bytes) | signature(64 bytes) | voltage(2 bytes) | device_id(32 bytes) | longitude(3 bytes) | latitude(3 bytes) - * - * @param nonce - The transaction nonce. - * @param energy - The energy value. - * @param signature - The transaction signature. - * @param voltage - The voltage value. - * @param deviceId - The device ID. - * @param longitude - The longitude value. - * @returns The encoded transaction byte array. - */ -export function encodeTransaction({ - nonce, - energy, - signature, - voltage, - deviceId, - longitude, - latitude, -}: { - nonce: number; - energy: number; - signature: string; - voltage: number; - deviceId: string; - longitude: number; - latitude: number; -}) { - const encodedNonce = intToByteArray(nonce, 4); - const encodedEnergy = intToByteArray(energy * 10e6, 4); - const encodedSignature = stringToByteArray(signature, 64); - const encodedVoltage = intToByteArray(voltage * 10, 2); - const encodedDeviceId = stringToByteArray(deviceId, 32); - const encodedLongitude = intToByteArray(longitude * 10e5, 3); - const encodedLatitude = intToByteArray(latitude * 10e5, 3); - - return [ - ...encodedNonce, - ...encodedEnergy, - ...encodedSignature, - ...encodedVoltage, - ...encodedDeviceId, - ...encodedLongitude, - ...encodedLatitude, - ]; -} diff --git a/src/logic/gps.ts b/src/logic/gps.ts deleted file mode 100644 index f7d44c3..0000000 --- a/src/logic/gps.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { readFileSync } from "fs"; -import { join } from "path"; - -export function getGPS(): number[] { - try { - const data = readFileSync(join("gps_service", "data.json"), "utf-8"); - const gpsData = JSON.parse(data); - if (gpsData.onBoard.lat && gpsData.onBoard.lon) { - return [gpsData.onBoard.lat.toFixed(2), gpsData.onBoard.lon.toFixed(2)]; - } else { - return [gpsData.wifi.lat.toFixed(2), gpsData.wifi.lon.toFixed(2)]; - } - } catch (err) { - return [0.00, 0.00]; - } -} diff --git a/src/logic/prover.ts b/src/logic/prover.ts deleted file mode 100644 index c4f828a..0000000 --- a/src/logic/prover.ts +++ /dev/null @@ -1,137 +0,0 @@ -import { BatchTransactionPayload, TransactionRecord } from "../types"; -import { rollup } from "./context"; -import { buildBatchPayload } from "../utils"; -import { getAllTransactionRecords } from "../store/sqlite"; - -const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing"; - -// Prover node structure -export interface ProverNode { - id: string; - url: string; - isActive: boolean; - lastSeen: number; -} - -/** - * Get prover node list from smart contract - * Makes a request to a smart contract to get an array of node structs - */ -export async function getProverNodeList(): Promise { - try { - // TODO: Implement smart contract call to get prover nodes - console.log("Fetching prover node list from smart contract..."); - return []; - } catch (err: any) { - console.error("Failed to get prover node list:", err.message); - return []; - } -} - -/** - * Choose a prover node from the fetched list - * Picks one prover node from the available nodes (e.g., random selection, load balancing, etc.) - */ -export function chooseProverNode(nodes: ProverNode[]): ProverNode | null { - try { - // TODO: Implement node selection logic - // Filter active nodes - const activeNodes = nodes.filter((node) => node.isActive); - - if (activeNodes.length === 0) { - console.warn("No active prover nodes available"); - return null; - } - const randomIndex = Math.floor(Math.random() * activeNodes.length); - const selectedNode = activeNodes[randomIndex]; - - console.log("Selected prover node:", { - id: selectedNode.id, - url: selectedNode.url, - }); - return selectedNode; - } catch (err: any) { - console.error("❌ Failed to choose prover node:", err.message); - return null; - } -} - -/** - * Send transactions to prover node for verification - */ -export async function sendTransactionsToProver( - proverURL: string, - transactionData: BatchTransactionPayload[] -): Promise { - try { - const response = await fetch(`${proverURL}/batch-payloads`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(transactionData), - }); - - console.log("[info] received", response.status, "from the prover"); - - if (!response.ok) { - throw new Error(`Prover responded with status: ${response.status}`); - } - return response; - } catch (err: any) { - console.error("Failed to send transactions to prover:", err.message); - return null; - } -} - -/** - * Check prover node status - */ -export async function checkProverNodeStatus(proverURL: string) { - try { - const response = await fetch(`${proverURL}/status`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - }); - - if (!response.ok) { - return false; - } - - return true; - } catch (err: any) { - console.error("❌ Failed to check prover node status:", err); - return false; - } -} - -/** - * Checks the nonce for a meter id onchain - * - * @returns {Promise} meter nonce onchain - */ -export async function checkNonceOnchain(meterId: string): Promise { - try { - const nonce = await rollup.nonce(meterId); - return nonce; - } catch (err: any) { - console.error("Failed to check nonce onchain:", err); - throw err; - } -} - -export async function getProverURL(): Promise { - return PREFERRED_PROVER_NODE; -} - -export async function sendPendingTransactionsToProver(proverURL: string, pendingTransactions: TransactionRecord[]) { - console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); - - const requestPayload = buildBatchPayload(pendingTransactions); - - console.log("[info] Request payload:", requestPayload); - - return await sendTransactionsToProver(proverURL, requestPayload); -} diff --git a/src/logic/streamr.ts b/src/logic/streamr.ts deleted file mode 100644 index 45ba21c..0000000 --- a/src/logic/streamr.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { StreamrClient } from "@streamr/sdk"; -import { TransactionRecord } from "../types"; -import { buildBatchPayload } from "../utils"; - -const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env; - -if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) { - throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment variables"); -} - -export const streamrClient = new StreamrClient({ - auth: { - privateKey: ETHEREUM_PRIVATE_KEY, - }, -}); - -const stream = streamrClient.getStream(STREAMR_STREAM_ID); - -stream.then((stream) => { - console.log(`[Streamr] Connected to stream: ${stream.id}`); -}).catch((error) => { - console.error("[Streamr] Error connecting to stream:", error); -}); - -async function getStream() { - return await stream; -} - -export async function publishHeartbeatToStream() { - const stream = await getStream(); - const heartbeatPayload = { - timestamp: new Date().toISOString(), - }; - await stream.publish(heartbeatPayload); - console.log("[Streamr] Published heartbeat:", heartbeatPayload); -} - -async function publishToStream(data: any) { - const stream = await getStream(); - await stream.publish(data); -} - -export async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { - await publishToStream(buildBatchPayload(pendingTransactions)); -} \ No newline at end of file diff --git a/src/logic/sync.ts b/src/logic/sync.ts deleted file mode 100644 index 909e397..0000000 --- a/src/logic/sync.ts +++ /dev/null @@ -1,201 +0,0 @@ -import { - getMeterByPublicKey, - getMeterByTokenId, - getTransactionByNonce, - pruneTransactionsAfter, - pruneTransactionsBefore, - updateMeterNonce, -} from "../store/sqlite"; -import { - provider, - rollup as rollupContract, - ccipRevenueReader as ccipRevenueReaderContract, - priceContext as priceContextContract, -} from "./context"; -import { JsonRpcProvider, Contract, ZeroAddress } from "ethers"; -import { retry } from "../utils"; -import type { VerifierInfo } from "../types"; - -// Cache for verifiers - populated once on startup -let verifiersCache: VerifierInfo[] | null = null; -let isCacheInitialized = false; - -/** - * Initialize verifiers cache on program startup - * Fetches all verifiers and resolves their ENS names once - * Throws error if any fetch/resolution fails - */ -export async function initializeVerifiersCache(): Promise { - try { - console.log("[info] Initializing verifiers cache..."); - - // Get the number of verifiers - const verifierCount = Number(await retry(() => ccipRevenueReaderContract.verifierCount())); - console.log(`[info] Found ${verifierCount} verifiers to cache`); - - const verifiers: VerifierInfo[] = []; - - // Fetch all verifiers and resolve their ENS names - for (let i = 0; i < verifierCount; i++) { - try { - // Get verifier info (ensName, targetContractAddress) - const [ensName, targetAddress] = await retry(() => ccipRevenueReaderContract.verifiers(i)); - - console.log(`[info] Fetching verifier ${i}: ENS: ${ensName}, target: ${targetAddress}`); - - // Resolve ENS name to get the verifier address - const verifierAddress = await retry(() => provider.resolveName(ensName)); - - if (!verifierAddress || verifierAddress === ZeroAddress) { - throw new Error(`Failed to resolve ENS name: ${ensName}`); - } - - console.log(`[info] Resolved ${ensName} to verifier address: ${verifierAddress}`); - - verifiers.push({ - ensName, - targetAddress, - verifierAddress, - }); - } catch (error) { - console.error(`[error] Failed to initialize verifier ${i}:`, error); - throw error; // Fail fast as requested - } - } - - // Cache the verifiers - verifiersCache = verifiers; - isCacheInitialized = true; - - console.log(`[info] Successfully cached ${verifiers.length} verifiers`); - } catch (error) { - console.error("[error] Failed to initialize verifiers cache:", error); - isCacheInitialized = false; - verifiersCache = null; - throw error; - } -} - -/** - * Get cached verifiers, throws error if cache is not initialized - */ -function getCachedVerifiers(): VerifierInfo[] { - if (!isCacheInitialized || !verifiersCache) { - throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first."); - } - return verifiersCache; -} - -/** - * Check if verifiers cache is initialized - */ -export function isVerifiersCacheInitialized(): boolean { - return isCacheInitialized && verifiersCache !== null; -} - -/** - * Get the number of cached verifiers - */ -export function getCachedVerifiersCount(): number { - return verifiersCache?.length ?? 0; -} - -export async function pruneAndSyncOnchain(meterIdentifier: number | string): Promise { - const meter = - typeof meterIdentifier === "number" - ? getMeterByTokenId(meterIdentifier) - : getMeterByPublicKey(meterIdentifier); - - if (!meter) { - throw new Error(`Meter with identifier ${meterIdentifier} not found`); - } - - // Check the latest nonce on the blockchain - const onchainNonce = Number(await rollupContract.nonce(meter.tokenId)); - const latestNonce = meter.latestNonce; - - if (onchainNonce > latestNonce) { - const publicKey = meter.publicKey; - // If the onchain nonce is greater, update the local record - updateMeterNonce(publicKey, onchainNonce); - } - // prune transactions with nonce less than or equal to onchainNonce - pruneTransactionsBefore(meter.tokenId, onchainNonce); - - return onchainNonce; -} - -export async function getLatestTransactionNonce(meterIdentifier: number): Promise { - // get latest nonce from chain - let latestNonce = Number(await rollupContract.nonce(meterIdentifier)); - - // check local state for the highest nonce we have - while (true) { - const existingTransaction = getTransactionByNonce(latestNonce + 1, meterIdentifier); - if (existingTransaction) { - latestNonce += 1; - } else { - pruneTransactionsAfter(latestNonce, meterIdentifier); - break; - } - } - - return latestNonce; -} - -// get revenue across suppored chains -export async function getCrossChainRevenue(tokenId: number): Promise { - try { - // Use cached verifiers instead of fetching them each time - const verifiers = getCachedVerifiers(); - - let totalRevenue = 0; - - // Iterate through all cached verifiers and get revenue from each chain - for (const verifier of verifiers) { - try { - console.log(`[info] Getting revenue from ENS: ${verifier.ensName}, target: ${verifier.targetAddress}, verifier: ${verifier.verifierAddress}`); - - // Get revenue from this specific chain using CCIP read - // Parameters: tokenId, target (L2 contract), verifier (resolved from ENS) - const revenue = await retry(() => - ccipRevenueReaderContract.read(tokenId, verifier.targetAddress, verifier.verifierAddress, { - enableCcipRead: true, - }) - ); - const revenueAmount = Number(revenue); - - console.log(`[info] Revenue from ${verifier.ensName} (${verifier.verifierAddress}): ${revenueAmount}`); - totalRevenue += revenueAmount; - } catch (error) { - console.error(`[error] Failed to get revenue from verifier ${verifier.ensName}:`, error); - // Continue with other verifiers even if one fails - } - } - - console.log(`[info] Total cross-chain revenue for token ${tokenId}: ${totalRevenue}`); - return totalRevenue; - } catch (error) { - console.error(`[error] Failed to get cross-chain revenue for token ${tokenId}:`, error); - throw error; - } -} - -// get owed from price context -export async function getOwedFromPriceContext(tokenId: number): Promise { - try { - return await retry(async () => { - console.log(`[info] Getting owed amount for token ${tokenId} from price context`); - - // Call the price context to get the amount the user owes with CCIP read enabled - const owedAmount = await priceContextContract.owed(tokenId); - const owed = Number(owedAmount); - - console.log(`[info] Owed amount for token ${tokenId}: ${owed}`); - return owed; - }); - } catch (error) { - console.error(`[error] Failed to get owed amount for token ${tokenId}:`, error); - throw error; - } -} diff --git a/src/logic/context.ts b/src/services/context.ts similarity index 100% rename from src/logic/context.ts rename to src/services/context.ts diff --git a/src/logic/grpc.ts b/src/services/grpc.ts similarity index 100% rename from src/logic/grpc.ts rename to src/services/grpc.ts diff --git a/src/services/mqtt.ts b/src/services/mqtt.ts new file mode 100644 index 0000000..9c22601 --- /dev/null +++ b/src/services/mqtt.ts @@ -0,0 +1,275 @@ +import { connect } from "mqtt"; +import { enqueue } from "./grpc"; +import { encode } from "../lib/encode"; +import { app, m3ter as m3terContract } from "./context"; +import { + deleteMeterByPublicKey, + getAllTransactionRecords, + getMeterByDevEui, + getMeterByPublicKey, + getMeterByTokenId, + insertTransaction, + saveMeter, + updateMeterDevEui, + updateMeterNonce, +} from "../store/sqlite"; +import type { State, TransactionRecord } from "../types"; +import { decodePayload } from "../lib/decode"; +import { runHook, verifyPayloadSignature } from "../lib/utils"; +import { getLatestTransactionNonce, pruneAndSyncOnchain, getCrossChainRevenue, getOwedFromPriceContext } from "../lib/sync"; +import { createMeterLogger } from "../utils/logger"; + +const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST; +const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain +const deviceLocks = new Map(); // Lock per devEUI to prevent concurrent message processing + +export function handleUplinks(): Promise { + return new Promise(function (resolve, reject) { + const client = connect({ + host: CHIRPSTACK_HOST, + port: 1883, + clean: true, + connectTimeout: 9000, + reconnectPeriod: 1000, + }); + + client.on("reconnect", () => { + runHook("onMqttReconnect", client); + }); + + client.on("message", async (_, blob) => { + return await handleMessage(blob); + }); + + client.on("connect", () => { + const topic = `application/${process.env.APPLICATION_ID}/device/+/event/up`; + client.subscribe(topic, () => { + console.log(`\nConnected & Subscribed to CHIRPSTACK_HOST: ${CHIRPSTACK_HOST}\n`); + + runHook("onMqttSubscribed", client, topic); + resolve(true); + }); + + runHook("onMqttConnect", client); + }); + + client.on("error", (err) => { + console.error("Connection error: ", err); + + runHook("onMqttError", err, client); + + client.end(); + reject(err); + }); + }); +} + +export async function handleMessage(blob: Buffer) { + runHook("onMessageReceived", blob); + + const message = JSON.parse(blob.toString()); + const devEui = message["deviceInfo"]["devEui"] || null; + + // Create a logger with devEui context + const logger = createMeterLogger({ devEui }); + + try { + if (!devEui) { + console.log("[warn] Message dropped - no devEui found in message"); + return; + } + + // Check if this specific device is already being processed + if (deviceLocks.get(devEui)) { + logger.warn(`Message dropped - device is already being processed`); + runHook("onMessageDropped", "locked", devEui); + return; + } + + let is_on = true; + + // Set lock for this specific device + deviceLocks.set(devEui, true); + + logger.info(`Received uplink from device: ${JSON.stringify(message)}`); + + // encoded transaction in standard format (payload is hex string) + // format: nonce | energy | signature | voltage | device_id | longitude | latitude + const transactionHex = Buffer.from(message["data"], "base64"); + const decoded = decodePayload(transactionHex); + let publicKey = decoded.extensions.deviceId; + let payloadHadPublicKey = !!publicKey; + + logger.info(`Decoded payload: ${JSON.stringify(decoded)}`); + + if (!publicKey) { + // try to find public key by DevEui + const meterByDevEui = getMeterByDevEui(devEui); + + if (!meterByDevEui) { + throw new Error("Device EUI not associated with any meter: " + devEui); + } + + publicKey = meterByDevEui.publicKey.replace("0x", ""); + } + + // verify transaction signature + const isValid = verifyPayloadSignature(transactionHex, Buffer.from(publicKey!, "hex")); + if (!isValid) { + throw new Error("Invalid transaction signature for meter with public key: " + publicKey); + } + + logger.info("Verified signature"); + + if (payloadHadPublicKey) { + logger.info(`Payload contained public key: ${publicKey}`); + // save public key with device EUI mapping if not already saved + const existingMeter = getMeterByPublicKey(`0x${publicKey}`); + + if (!existingMeter) { + const tokenId = Number(await m3terContract.tokenID(`0x${publicKey}`)); + + const latestNonce = await getLatestTransactionNonce(tokenId); + + logger.info(`Fetched tokenId and latestNonce from chain and local state: ${tokenId}, ${latestNonce}`); + + // save new meter with devEui + const newMeter = { + publicKey: `0x${publicKey}`, + devEui: message["deviceInfo"]["devEui"], + tokenId, + latestNonce, + }; + + const existingMeter = getMeterByTokenId(tokenId); + + // in-case of the public key being updated + if (existingMeter && existingMeter.publicKey !== `0x${publicKey}`) { + deleteMeterByPublicKey(`0x${publicKey}`); + } + + saveMeter(newMeter); + logger.info(`Saved new meter: ${JSON.stringify(newMeter)}`); + + runHook("onMeterCreated", newMeter); + } else { + // update existing meter with devEui if not already set + if (!existingMeter.devEui || existingMeter.devEui !== message["deviceInfo"]["devEui"]) { + logger.info(`Updating meter with DevEui: ${message["deviceInfo"]["devEui"]}`); + updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]); + } + + // fetch and update latest nonce from chain + const latestNonce = await getLatestTransactionNonce(existingMeter.tokenId); + + logger.info(`Fetched latestNonce from chain and local state: ${latestNonce}`); + + updateMeterNonce(`0x${publicKey}`, latestNonce); + } + } + + let m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; + + if (!m3ter) { + throw new Error("Meter not found for public key: " + publicKey); + } + + logger.info(`Found meter: ${JSON.stringify(m3ter)}`); + + // If both latest nonce and received nonce are 0, enqueue 0 immediately + if (m3ter.latestNonce === 0 && decoded.nonce === 0) { + logger.info("Both latest nonce and received nonce are 0, enqueuing 0 immediately"); + + try { + is_on = true; // Always on + // (await getCrossChainRevenue(m3ter.tokenId)) >= + // (await getOwedFromPriceContext(m3ter.tokenId)); + } catch (error) { + logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); + } + + const state = { nonce: 0, is_on }; + + logger.info(`Enqueuing state: ${JSON.stringify(state)}`); + + enqueue( + message["deviceInfo"]["devEui"], + encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) + ); + + return; // Exit early without processing the transaction + } + + if (m3ter.latestNonce % SYNC_EPOCH === 0) { + // sync with blockchain every SYNC_EPOCH transactions + await pruneAndSyncOnchain(m3ter.tokenId); + + runHook("onSyncEpochReached"); + + logger.info(`Synced meter with blockchain: ${m3ter.tokenId}`); + + m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; + + if (!m3ter) { + throw new Error("Meter not found after sync for public key: " + publicKey); + } + } + + const expectedNonce = m3ter.latestNonce + 1; + + logger.info(`Received blob for meter ${m3ter?.tokenId}, expected nonce: ${expectedNonce}, got: ${decoded.nonce}`); + + if (decoded.nonce !== expectedNonce && decoded.nonce !== 0) { + throw new Error(`Invalid nonce. Expected ${expectedNonce}, got ${decoded.nonce}. Public key: ${publicKey}`); + } + + // if device nonce is correct + if (decoded.nonce === expectedNonce) { + logger.info(`Nonce is valid: ${decoded.nonce}`); + + // save transaction to local store + const transactionRecord = { + nonce: decoded.nonce, + identifier: m3ter.tokenId, + receivedAt: Date.now(), + raw: transactionHex.toString("hex"), + } as TransactionRecord; + insertTransaction(transactionRecord); + updateMeterNonce(`0x${publicKey}`, expectedNonce); + + logger.info(`Updated meter nonce to: ${expectedNonce}`); + + const pendingTransactions = getAllTransactionRecords(); + runHook("onTransactionDistribution", m3ter.tokenId, decoded, pendingTransactions); + + try { + is_on = await runHook("isOnStateCompute", m3ter.tokenId); + } catch (error) { + runHook("onIsOnStateComputeError", m3ter.tokenId, error); + logger.error(`Error in isOnStateCompute hook: ${error}`); + } + + runHook("onIsOnStateComputed", m3ter.tokenId, is_on); + + const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on }; + + logger.info(`Enqueuing state: ${JSON.stringify(state)}`); + + enqueue( + message["deviceInfo"]["devEui"], + encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) + ); + runHook("onStateEnqueued", state, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0); + } + } catch (error) { + logger.error(`Error handling MQTT message: ${error}`); + runHook("onMessageError", error); + } finally { + // Release lock for this specific device + if (devEui) { + deviceLocks.delete(devEui); + runHook("onDeviceUnlocked", devEui); + } + runHook("onMessageProcessingComplete"); + } +} From da21690aca41abe2909ee7ecdafc567608db38dc Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Thu, 8 Jan 2026 19:01:42 +0100 Subject: [PATCH 03/22] feat: add modular decode and encode functions, initialize verifiers cache, and remove unused utils --- .vscode/settings.json | 10 +++ src/lib/decode.ts | 49 ++++++++++ src/lib/encode.ts | 92 +++++++++++++++++++ src/lib/sync.ts | 201 ++++++++++++++++++++++++++++++++++++++++++ src/utils.ts | 69 --------------- 5 files changed, 352 insertions(+), 69 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 src/lib/decode.ts create mode 100644 src/lib/encode.ts create mode 100644 src/lib/sync.ts delete mode 100644 src/utils.ts diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7d1e594 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,10 @@ +{ + "cSpell.words": [ + "ardrive", + "arweave", + "m3ters", + "Emmo00", + "ccip", + "Mauchly", + ] +} \ No newline at end of file diff --git a/src/lib/decode.ts b/src/lib/decode.ts new file mode 100644 index 0000000..49c9185 --- /dev/null +++ b/src/lib/decode.ts @@ -0,0 +1,49 @@ +import type { DecodedPayload } from "../types"; + +export function decodePayload(buf: Buffer) { + if (buf.length < 72) { + throw new Error("Payload too short. Must be at least 72 bytes"); + } + + // --- Core fields --- + const nonce = buf.readUInt32BE(0); + + const rawEnergy = buf.readUInt32BE(4); + const energyKWh = rawEnergy / 1e6; + + const signature = buf.subarray(8, 72).toString("hex"); + + // --- Optional extensions --- + const ext = {} as NonNullable; + let offset = 72; + + if (buf.length >= offset + 2) { + ext.voltage = buf.readUInt16BE(offset) / 10; + offset += 2; + } + + if (buf.length >= offset + 32) { + ext.deviceId = buf.subarray(offset, offset + 32).toString("hex"); + offset += 32; + } + + if (buf.length >= offset + 3) { + let lon = buf.readIntBE(offset, 3); // signed 3-byte int + ext.longitude = lon / 1e5; + offset += 3; + } + + if (buf.length >= offset + 3) { + let lat = buf.readIntBE(offset, 3); // signed 3-byte int + ext.latitude = lat / 1e5; + offset += 3; + } + + return { + nonce, + energy: energyKWh, + signature, + extensions: ext, + buf, + }; +} diff --git a/src/lib/encode.ts b/src/lib/encode.ts new file mode 100644 index 0000000..df9234c --- /dev/null +++ b/src/lib/encode.ts @@ -0,0 +1,92 @@ +import { State, TransactionRecord } from "../types"; + +export function intToByteArray(num: number, byteLength: number = 4) { + const byteArray = new Uint8Array(byteLength); + for (let i = 0; i < byteLength; i++) { + byteArray[byteLength - 1 - i] = (num >> (i * 8)) & 0xff; + } + return Array.from(byteArray); +} + +export function stringToByteArray(str: string, length: number | null = null) { + const encoder = new TextEncoder(); // UTF-8 + const encoded = encoder.encode(str); + const byteArray = new Uint8Array(length === null ? encoded.length : length); + byteArray.set(encoded.slice(0, length == null ? encoded.length : length)); // truncate if longer + return Array.from(byteArray); +} + +function floatToByteArray(float: number) { + const buffer = new ArrayBuffer(4); // 4 bytes for a float (32-bit) + const view = new DataView(buffer); + view.setFloat32(0, float, false); // false for Big Endian + return Array.from(new Uint8Array(buffer)); +} + +/** + * + * @notice only needs `nonce` from the state + */ +export function encode(state: State, latitude: number, longitude: number) { + let responseBytes = floatToByteArray(latitude).concat( + floatToByteArray(longitude) + ); + + let nonce = state.nonce; + + if (nonce) { + responseBytes = intToByteArray(nonce).concat(responseBytes); + } + + responseBytes.unshift(state.is_on ? 1 : 0); + return responseBytes; +} + +/** + * Encode transaction data into a byte array. + * + * format: nonce(4 bytes) | energy (4 bytes) | signature(64 bytes) | voltage(2 bytes) | device_id(32 bytes) | longitude(3 bytes) | latitude(3 bytes) + * + * @param nonce - The transaction nonce. + * @param energy - The energy value. + * @param signature - The transaction signature. + * @param voltage - The voltage value. + * @param deviceId - The device ID. + * @param longitude - The longitude value. + * @returns The encoded transaction byte array. + */ +export function encodeTransaction({ + nonce, + energy, + signature, + voltage, + deviceId, + longitude, + latitude, +}: { + nonce: number; + energy: number; + signature: string; + voltage: number; + deviceId: string; + longitude: number; + latitude: number; +}) { + const encodedNonce = intToByteArray(nonce, 4); + const encodedEnergy = intToByteArray(energy * 10e6, 4); + const encodedSignature = stringToByteArray(signature, 64); + const encodedVoltage = intToByteArray(voltage * 10, 2); + const encodedDeviceId = stringToByteArray(deviceId, 32); + const encodedLongitude = intToByteArray(longitude * 10e5, 3); + const encodedLatitude = intToByteArray(latitude * 10e5, 3); + + return [ + ...encodedNonce, + ...encodedEnergy, + ...encodedSignature, + ...encodedVoltage, + ...encodedDeviceId, + ...encodedLongitude, + ...encodedLatitude, + ]; +} diff --git a/src/lib/sync.ts b/src/lib/sync.ts new file mode 100644 index 0000000..1a241ea --- /dev/null +++ b/src/lib/sync.ts @@ -0,0 +1,201 @@ +import { + getMeterByPublicKey, + getMeterByTokenId, + getTransactionByNonce, + pruneTransactionsAfter, + pruneTransactionsBefore, + updateMeterNonce, +} from "../store/sqlite"; +import { + provider, + rollup as rollupContract, + ccipRevenueReader as ccipRevenueReaderContract, + priceContext as priceContextContract, +} from "../services/context"; +import { JsonRpcProvider, Contract, ZeroAddress } from "ethers"; +import { retry } from "./utils"; +import type { VerifierInfo } from "../types"; + +// Cache for verifiers - populated once on startup +let verifiersCache: VerifierInfo[] | null = null; +let isCacheInitialized = false; + +/** + * Initialize verifiers cache on program startup + * Fetches all verifiers and resolves their ENS names once + * Throws error if any fetch/resolution fails + */ +export async function initializeVerifiersCache(): Promise { + try { + console.log("[info] Initializing verifiers cache..."); + + // Get the number of verifiers + const verifierCount = Number(await retry(() => ccipRevenueReaderContract.verifierCount())); + console.log(`[info] Found ${verifierCount} verifiers to cache`); + + const verifiers: VerifierInfo[] = []; + + // Fetch all verifiers and resolve their ENS names + for (let i = 0; i < verifierCount; i++) { + try { + // Get verifier info (ensName, targetContractAddress) + const [ensName, targetAddress] = await retry(() => ccipRevenueReaderContract.verifiers(i)); + + console.log(`[info] Fetching verifier ${i}: ENS: ${ensName}, target: ${targetAddress}`); + + // Resolve ENS name to get the verifier address + const verifierAddress = await retry(() => provider.resolveName(ensName)); + + if (!verifierAddress || verifierAddress === ZeroAddress) { + throw new Error(`Failed to resolve ENS name: ${ensName}`); + } + + console.log(`[info] Resolved ${ensName} to verifier address: ${verifierAddress}`); + + verifiers.push({ + ensName, + targetAddress, + verifierAddress, + }); + } catch (error) { + console.error(`[error] Failed to initialize verifier ${i}:`, error); + throw error; // Fail fast as requested + } + } + + // Cache the verifiers + verifiersCache = verifiers; + isCacheInitialized = true; + + console.log(`[info] Successfully cached ${verifiers.length} verifiers`); + } catch (error) { + console.error("[error] Failed to initialize verifiers cache:", error); + isCacheInitialized = false; + verifiersCache = null; + throw error; + } +} + +/** + * Get cached verifiers, throws error if cache is not initialized + */ +function getCachedVerifiers(): VerifierInfo[] { + if (!isCacheInitialized || !verifiersCache) { + throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first."); + } + return verifiersCache; +} + +/** + * Check if verifiers cache is initialized + */ +export function isVerifiersCacheInitialized(): boolean { + return isCacheInitialized && verifiersCache !== null; +} + +/** + * Get the number of cached verifiers + */ +export function getCachedVerifiersCount(): number { + return verifiersCache?.length ?? 0; +} + +export async function pruneAndSyncOnchain(meterIdentifier: number | string): Promise { + const meter = + typeof meterIdentifier === "number" + ? getMeterByTokenId(meterIdentifier) + : getMeterByPublicKey(meterIdentifier); + + if (!meter) { + throw new Error(`Meter with identifier ${meterIdentifier} not found`); + } + + // Check the latest nonce on the blockchain + const onchainNonce = Number(await rollupContract.nonce(meter.tokenId)); + const latestNonce = meter.latestNonce; + + if (onchainNonce > latestNonce) { + const publicKey = meter.publicKey; + // If the onchain nonce is greater, update the local record + updateMeterNonce(publicKey, onchainNonce); + } + // prune transactions with nonce less than or equal to onchainNonce + pruneTransactionsBefore(meter.tokenId, onchainNonce); + + return onchainNonce; +} + +export async function getLatestTransactionNonce(meterIdentifier: number): Promise { + // get latest nonce from chain + let latestNonce = Number(await rollupContract.nonce(meterIdentifier)); + + // check local state for the highest nonce we have + while (true) { + const existingTransaction = getTransactionByNonce(latestNonce + 1, meterIdentifier); + if (existingTransaction) { + latestNonce += 1; + } else { + pruneTransactionsAfter(latestNonce, meterIdentifier); + break; + } + } + + return latestNonce; +} + +// get revenue across suppored chains +export async function getCrossChainRevenue(tokenId: number): Promise { + try { + // Use cached verifiers instead of fetching them each time + const verifiers = getCachedVerifiers(); + + let totalRevenue = 0; + + // Iterate through all cached verifiers and get revenue from each chain + for (const verifier of verifiers) { + try { + console.log(`[info] Getting revenue from ENS: ${verifier.ensName}, target: ${verifier.targetAddress}, verifier: ${verifier.verifierAddress}`); + + // Get revenue from this specific chain using CCIP read + // Parameters: tokenId, target (L2 contract), verifier (resolved from ENS) + const revenue = await retry(() => + ccipRevenueReaderContract.read(tokenId, verifier.targetAddress, verifier.verifierAddress, { + enableCcipRead: true, + }) + ); + const revenueAmount = Number(revenue); + + console.log(`[info] Revenue from ${verifier.ensName} (${verifier.verifierAddress}): ${revenueAmount}`); + totalRevenue += revenueAmount; + } catch (error) { + console.error(`[error] Failed to get revenue from verifier ${verifier.ensName}:`, error); + // Continue with other verifiers even if one fails + } + } + + console.log(`[info] Total cross-chain revenue for token ${tokenId}: ${totalRevenue}`); + return totalRevenue; + } catch (error) { + console.error(`[error] Failed to get cross-chain revenue for token ${tokenId}:`, error); + throw error; + } +} + +// get owed from price context +export async function getOwedFromPriceContext(tokenId: number): Promise { + try { + return await retry(async () => { + console.log(`[info] Getting owed amount for token ${tokenId} from price context`); + + // Call the price context to get the amount the user owes with CCIP read enabled + const owedAmount = await priceContextContract.owed(tokenId); + const owed = Number(owedAmount); + + console.log(`[info] Owed amount for token ${tokenId}: ${owed}`); + return owed; + }); + } catch (error) { + console.error(`[error] Failed to get owed amount for token ${tokenId}:`, error); + throw error; + } +} diff --git a/src/utils.ts b/src/utils.ts deleted file mode 100644 index 0fbacdd..0000000 --- a/src/utils.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { TransactionRecord, BatchTransactionPayload } from "./types"; -import { createPublicKey, verify } from "crypto"; -import os from "os"; - -/** - * Retries a function up to 5 times with exponential backoff - * @param fn Function to retry - * @param maxRetries Maximum number of retries (default: 5) - * @param baseDelay Base delay in milliseconds (default: 1000) - * @returns Promise that resolves with the function result or rejects with the last error - */ -export async function retry( - fn: () => Promise, - maxRetries: number = 5, - baseDelay: number = 1000 -): Promise { - let lastError: Error; - - for (let attempt = 0; attempt <= maxRetries; attempt++) { - try { - return await fn(); - } catch (error) { - lastError = error as Error; - - if (attempt === maxRetries) { - throw lastError; - } - - const delay = baseDelay * Math.pow(2, attempt); - console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`, error); - - await new Promise((resolve) => setTimeout(resolve, delay)); - } - } - - throw lastError!; -} - -export function buildBatchPayload(transactions: TransactionRecord[]): BatchTransactionPayload[] { - return transactions.map((transaction) => ({ - m3ter_id: Number(transaction.identifier), - message: transaction.raw, - })); -} - -export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): boolean { - try { - const message = transaction.subarray(0, 8); - const signature = transaction.subarray(8, 72); - - // Wrap raw key in SPKI DER - const spkiPrefix = Buffer.from("302a300506032b6570032100", "hex"); - const derKey = Buffer.concat([new Uint8Array(spkiPrefix), new Uint8Array(rawPubKey)]); - - const publicKey = createPublicKey({ - key: derKey, - format: "der", - type: "spki", - }); - - // Verify - const ok = verify(null, new Uint8Array(message), publicKey, new Uint8Array(signature)); - - return ok; - } catch (error) { - console.error("Error verifying signature:", error); - return false; - } -} From 04047bc8b978f658ffb59d2163b0a6093094962a Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Thu, 8 Jan 2026 19:35:28 +0100 Subject: [PATCH 04/22] docs: add complete hook lifecycle documentation for M3tering Console --- README.md | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/README.md b/README.md index 4074dda..076c822 100644 --- a/README.md +++ b/README.md @@ -54,3 +54,68 @@ npm install npm run dev ``` + + +# Complete Hook Lifecycle for M3tering Console + +## Initialization Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onBeforeInit` | Called before any initialization begins | None | +| `onDatabaseSetup` | Called after SQLite tables/jobs are initialized | None | +| `onAfterInit` | Called after all initialization completes successfully | None | +| `onInitError` | Called when an error occurs during initialization | `error: any` | + +## MQTT Connection Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onMqttConnect` | Called when MQTT client successfully connects to ChirpStack | `client: MqttClient` | +| `onMqttSubscribed` | Called after subscribing to the device uplink topic | `client: MqttClient`, `topic: string` | +| `onMqttError` | Called when an MQTT connection error occurs | `error: any`, `client: MqttClient` | +| `onMqttReconnect` | Called when attempting to reconnect to the MQTT broker | `client: MqttClient` | + +## Message Ingestion Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onMessageReceived` | Called when a raw MQTT message is received (before parsing) | `blob: Buffer` | +| `onMessageDropped` | Called when a message is dropped (e.g., device already locked) | `reason: string`, `devEui: string` | + +## Meter Discovery & Registration Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onMeterCreated` | Called after a new meter is saved to the database | `newMeter: MeterRecord` | + +## Nonce Synchronization Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onSyncEpochReached` | Called when the sync interval is reached for blockchain synchronization | None | + +## Downstream Distribution Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onTransactionDistribution` | Called before sending transactions to Arweave, prover node, Streamr, or other stores/loggers | `tokenId: number`, `decodedPayload: DecodedPayload`, `pendingTransactions: TransactionRecord[]` | + +## State Encoding & Device Response Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `isOnStateCompute` | Called to determine the device's on/off state (returns boolean) | `m3terId: number` | +| `onIsOnStateComputed` | Called after the on/off state has been computed | `m3terId: number`, `isOn: boolean` | +| `onIsOnStateComputeError` | Called when an error occurs during state computation | `m3terId: number`, `error: any` | +| `onStateEnqueued` | Called after the state is enqueued to gRPC for device response | `state: any`, `latitude: number`, `longitude: number` | + +## Error Handling & Cleanup Phase + +| Hook Name | Description | Parameters | +|-----------|-------------|------------| +| `onMessageError` | Called when any error occurs during message processing | `error: any` | +| `onDeviceUnlocked` | Called when a device lock is released (regardless of outcome) | `devEui: string` | +| `onMessageProcessingComplete` | Called when message processing finishes (success or error) | None | + +--- From 70cdaab621b02e53b2ae2f34362e7be97cd0b6fe Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Thu, 8 Jan 2026 19:45:33 +0100 Subject: [PATCH 05/22] refactor: modularize Streamr client initialization, destroy client instance after action and enhance transaction publishing with retry logic --- src/lib/core/streamr/index.ts | 44 ++++++++++++++--------------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts index bddec0c..a16d375 100644 --- a/src/lib/core/streamr/index.ts +++ b/src/lib/core/streamr/index.ts @@ -1,5 +1,5 @@ import { StreamrClient } from "@streamr/sdk"; -import { buildBatchPayload } from "../../utils"; +import { buildBatchPayload, retry } from "../../utils"; import type { Hooks, TransactionRecord } from "../../../types"; const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env; @@ -8,28 +8,14 @@ if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) { throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment variables"); } -export const streamrClient = new StreamrClient({ - auth: { - privateKey: ETHEREUM_PRIVATE_KEY, - }, -}); - -const stream = streamrClient.getStream(STREAMR_STREAM_ID); - -stream - .then((stream) => { - console.log(`[Streamr] Connected to stream: ${stream.id}`); - }) - .catch((error) => { - console.error("[Streamr] Error connecting to stream:", error); - }); - export default class implements Hooks { async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) { // send pending transactions to streamr try { console.info(`Sending pending transactions to streamr`); - await publishPendingTransactionsToStreamr(pendingTransactions); + + await retry(() => publishPendingTransactionsToStreamr(pendingTransactions), 3); + console.info(`Successfully sent pending transactions to streamr`); } catch (error) { console.error(`Error sending pending transactions to streamr: ${error}`); @@ -37,15 +23,19 @@ export default class implements Hooks { } } -async function getStream() { - return await stream; -} +async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { + const streamrClient = new StreamrClient({ + auth: { + privateKey: ETHEREUM_PRIVATE_KEY!, + }, + }); -async function publishToStream(data: any) { - const stream = await getStream(); - await stream.publish(data); -} + const stream = await streamrClient.getStream(STREAMR_STREAM_ID!); -async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { - await publishToStream(buildBatchPayload(pendingTransactions)); + const batchPayload = buildBatchPayload(pendingTransactions); + + await stream.publish(batchPayload); + + // destroy the client to free resources + await streamrClient.destroy(); } From 684a5aaacc82d45bfef9e44272b9d346b99bae55 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Thu, 8 Jan 2026 19:52:15 +0100 Subject: [PATCH 06/22] refactor: update getCachedVerifiers to return a promise and adjust related tests for context module --- src/lib/sync.ts | 8 ++++---- tests/logic/sync.test.ts | 19 +++++-------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/src/lib/sync.ts b/src/lib/sync.ts index 1a241ea..3ca21e5 100644 --- a/src/lib/sync.ts +++ b/src/lib/sync.ts @@ -79,11 +79,11 @@ export async function initializeVerifiersCache(): Promise { /** * Get cached verifiers, throws error if cache is not initialized */ -function getCachedVerifiers(): VerifierInfo[] { +async function getCachedVerifiers(): Promise { if (!isCacheInitialized || !verifiersCache) { - throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first."); + await initializeVerifiersCache(); } - return verifiersCache; + return verifiersCache!; } /** @@ -147,7 +147,7 @@ export async function getLatestTransactionNonce(meterIdentifier: number): Promis export async function getCrossChainRevenue(tokenId: number): Promise { try { // Use cached verifiers instead of fetching them each time - const verifiers = getCachedVerifiers(); + const verifiers = await getCachedVerifiers(); let totalRevenue = 0; diff --git a/tests/logic/sync.test.ts b/tests/logic/sync.test.ts index 22d7db9..de1cb09 100644 --- a/tests/logic/sync.test.ts +++ b/tests/logic/sync.test.ts @@ -7,7 +7,7 @@ import { } from "../../src/lib/sync"; // Mock the context module -jest.mock("../../src/logic/context", () => ({ +jest.mock("../../src/services/context", () => ({ provider: { resolveName: jest.fn(), }, @@ -30,7 +30,7 @@ describe("Verifiers Cache", () => { }); it("should initialize cache successfully", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock successful responses ccipRevenueReader.verifierCount.mockResolvedValue(2n); @@ -55,7 +55,7 @@ describe("Verifiers Cache", () => { }); it("should throw error if ENS resolution fails (returns null)", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock responses with ENS resolution failure ccipRevenueReader.verifierCount.mockResolvedValue(1n); @@ -69,7 +69,7 @@ describe("Verifiers Cache", () => { }); it("should throw error if ENS resolution fails (returns zero address)", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock responses with ENS resolution failure ccipRevenueReader.verifierCount.mockResolvedValue(1n); @@ -82,17 +82,8 @@ describe("Verifiers Cache", () => { expect(getCachedVerifiersCount()).toBe(0); }); - it("should throw error if cache is not initialized when calling getCrossChainRevenue", async () => { - // Ensure cache is not initialized - expect(isVerifiersCacheInitialized()).toBe(false); - - await expect(getCrossChainRevenue(123)).rejects.toThrow( - "Verifiers cache not initialized. Call initializeVerifiersCache() first." - ); - }); - it("should use cached verifiers for getCrossChainRevenue", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock successful initialization ccipRevenueReader.verifierCount.mockResolvedValue(1n); From 337750a2d92e2089a4aa1ec27e648e7760f2c7f6 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 09:07:01 +0100 Subject: [PATCH 07/22] refactor: add node-cron for scheduled transaction publishing and modularize transaction handling methods --- package-lock.json | 10 +++++ package.json | 1 + src/lib/core/prover/index.ts | 72 ++++++++++++++++------------------- src/lib/core/streamr/index.ts | 53 +++++++++++++++----------- 4 files changed, 74 insertions(+), 62 deletions(-) diff --git a/package-lock.json b/package-lock.json index ec0ae02..bbb6486 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,6 +21,7 @@ "express-handlebars": "^7.1.2", "handlebars": "^4.7.8", "mqtt": "^5.5.0", + "node-cron": "^4.2.1", "ssh2": "^1.17.0", "ws": "^8.18.3" }, @@ -11363,6 +11364,15 @@ "integrity": "sha512-Ntyt4AIXyaLIuMHF6IOoTakB3K+RWxwtsHNRxllEoA6vPwP9o4866g6YWDLUdnucilZhmkxiHwHr11gAENw+QA==", "license": "MIT" }, + "node_modules/node-cron": { + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-4.2.1.tgz", + "integrity": "sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==", + "license": "ISC", + "engines": { + "node": ">=6.0.0" + } + }, "node_modules/node-datachannel": { "version": "0.27.0", "resolved": "https://registry.npmjs.org/node-datachannel/-/node-datachannel-0.27.0.tgz", diff --git a/package.json b/package.json index 0451369..73562d8 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ "express-handlebars": "^7.1.2", "handlebars": "^4.7.8", "mqtt": "^5.5.0", + "node-cron": "^4.2.1", "ssh2": "^1.17.0", "ws": "^8.18.3" }, diff --git a/src/lib/core/prover/index.ts b/src/lib/core/prover/index.ts index fcb5383..ca5a017 100644 --- a/src/lib/core/prover/index.ts +++ b/src/lib/core/prover/index.ts @@ -7,10 +7,10 @@ export default class implements Hooks { async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) { // send pending transactions to prover node try { - const proverURL = await getProverURL(); + const proverURL = await this.getProverURL(); console.info(`Sending pending transactions to prover: ${proverURL}`); - const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions); + const response = await this.sendPendingTransactionsToProver(proverURL!, pendingTransactions); console.info("done sending to prover"); console.info(`Prover response (text): ${await response?.text()}`); @@ -18,49 +18,43 @@ export default class implements Hooks { console.error(`Error sending pending transactions to prover: ${error}`); } } -} - -/** - * Send transactions to prover node for verification - */ -export async function sendTransactionsToProver( - proverURL: string, - transactionData: BatchTransactionPayload[] -): Promise { - try { - const response = await fetch(`${proverURL}/batch-payloads`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(transactionData), - }); - console.log("[info] received", response.status, "from the prover"); - - if (!response.ok) { - throw new Error(`Prover responded with status: ${response.status}`); + async sendTransactionsToProver( + proverURL: string, + transactionData: BatchTransactionPayload[] + ): Promise { + try { + const response = await fetch(`${proverURL}/batch-payloads`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(transactionData), + }); + + console.log("[info] received", response.status, "from the prover"); + + if (!response.ok) { + throw new Error(`Prover responded with status: ${response.status}`); + } + return response; + } catch (err: any) { + console.error("Failed to send transactions to prover:", err.message); + return null; } - return response; - } catch (err: any) { - console.error("Failed to send transactions to prover:", err.message); - return null; } -} -export async function getProverURL(): Promise { - return PREFERRED_PROVER_NODE; -} + async getProverURL(): Promise { + return PREFERRED_PROVER_NODE; + } -export async function sendPendingTransactionsToProver( - proverURL: string, - pendingTransactions: TransactionRecord[] -) { - console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); + async sendPendingTransactionsToProver(proverURL: string, pendingTransactions: TransactionRecord[]) { + console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); - const requestPayload = buildBatchPayload(pendingTransactions); + const requestPayload = buildBatchPayload(pendingTransactions); - console.log("[info] Request payload:", requestPayload); + console.log("[info] Request payload:", requestPayload); - return await sendTransactionsToProver(proverURL, requestPayload); + return await this.sendTransactionsToProver(proverURL, requestPayload); + } } diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts index a16d375..4378dc4 100644 --- a/src/lib/core/streamr/index.ts +++ b/src/lib/core/streamr/index.ts @@ -1,6 +1,8 @@ +import cron from "node-cron"; import { StreamrClient } from "@streamr/sdk"; +import { getAllTransactionRecords } from "../../../store/sqlite"; import { buildBatchPayload, retry } from "../../utils"; -import type { Hooks, TransactionRecord } from "../../../types"; +import type { BatchTransactionPayload, Hooks, TransactionRecord } from "../../../types"; const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env; @@ -9,33 +11,38 @@ if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) { } export default class implements Hooks { - async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) { - // send pending transactions to streamr - try { - console.info(`Sending pending transactions to streamr`); - - await retry(() => publishPendingTransactionsToStreamr(pendingTransactions), 3); + async onAfterInit() { + // Schedule a cron job to publish pending transactions every hour + cron.schedule("0 * * * *", async () => { + console.log("Publishing pending transactions to Streamr..."); + + const pendingTransactions = await this.getPendingTransactions(); + if (pendingTransactions.length > 0) { + await retry(() => this.publishPendingTransactionsToStreamr(pendingTransactions), 3, 2000); + } + }); + + return; + } - console.info(`Successfully sent pending transactions to streamr`); - } catch (error) { - console.error(`Error sending pending transactions to streamr: ${error}`); - } + async getPendingTransactions(): Promise { + return getAllTransactionRecords(); } -} -async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { - const streamrClient = new StreamrClient({ - auth: { - privateKey: ETHEREUM_PRIVATE_KEY!, - }, - }); + async publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { + const streamrClient = new StreamrClient({ + auth: { + privateKey: ETHEREUM_PRIVATE_KEY!, + }, + }); - const stream = await streamrClient.getStream(STREAMR_STREAM_ID!); + const stream = await streamrClient.getStream(STREAMR_STREAM_ID!); - const batchPayload = buildBatchPayload(pendingTransactions); + const batchPayload = buildBatchPayload(pendingTransactions); - await stream.publish(batchPayload); + await stream.publish(batchPayload); - // destroy the client to free resources - await streamrClient.destroy(); + // destroy the client to free resources + await streamrClient.destroy(); + } } From 569fc8a58f2a0040263b68233719ed30cd318b55 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 09:20:43 +0100 Subject: [PATCH 08/22] refactor: update .gitignore to include console configuration file --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 195e462..fa5348b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# console configuration file +console.config.json + #test scripts emulate.ts test-database.ts From be73e486e6e2c913a767dbcd7b3559e9265839a6 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 09:21:27 +0100 Subject: [PATCH 09/22] refactor: remove STREAMR_STREAM_ID from .env.example and README, delete console.config.json, and add console.example.config.json --- .env.example | 1 - README.md | 1 - console.config.json | 8 -------- console.example.config.json | 14 ++++++++++++++ 4 files changed, 14 insertions(+), 10 deletions(-) delete mode 100644 console.config.json create mode 100644 console.example.config.json diff --git a/.env.example b/.env.example index bb64c2b..50f95b1 100644 --- a/.env.example +++ b/.env.example @@ -5,5 +5,4 @@ CONTRACT_LABEL="M3ters" CHIRPSTACK_HOST="localhost" MAINNET_RPC="https://sepolia.drpc.org" PREFERRED_PROVER_NODE="http://prover.m3ter.ing" -STREAMR_STREAM_ID="0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test" ETHEREUM_PRIVATE_KEY="..." diff --git a/README.md b/README.md index 076c822..0fd89d7 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,6 @@ CHIRPSTACK_HOST=localhost MAINNET_RPC=https://sepolia.drpc.org PREFERRED_PROVER_NODE=http://34.244.149.153 - STREAMR_STREAM_ID="0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test" ETHEREUM_PRIVATE_KEY="..." ``` diff --git a/console.config.json b/console.config.json deleted file mode 100644 index 2503635..0000000 --- a/console.config.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "modules": [ - "core/arweave", - "core/prover", - "core/streamr", - "core/is_on" - ] -} diff --git a/console.example.config.json b/console.example.config.json new file mode 100644 index 0000000..b456fec --- /dev/null +++ b/console.example.config.json @@ -0,0 +1,14 @@ +{ + "modules": [ + "core/arweave", + "core/prover", + "core/streamr", + "core/is_on" + ], + "streamr": { + "streamId": [ + "0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test" + ], + "cronSchedule": "0 * * * *" + } +} From 606f4cc8b229a45f202ff3f78490c5d19520241b Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 09:35:13 +0100 Subject: [PATCH 10/22] refactor: modularize Streamr transaction handling and configuration loading --- src/lib/core/streamr/index.ts | 21 +++++++++++++-------- src/lib/utils.ts | 19 ++++++++++++++++++- src/types.ts | 4 ++++ 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts index 4378dc4..50f587e 100644 --- a/src/lib/core/streamr/index.ts +++ b/src/lib/core/streamr/index.ts @@ -1,24 +1,29 @@ import cron from "node-cron"; import { StreamrClient } from "@streamr/sdk"; import { getAllTransactionRecords } from "../../../store/sqlite"; -import { buildBatchPayload, retry } from "../../utils"; +import { buildBatchPayload, loadConfigurations, retry } from "../../utils"; import type { BatchTransactionPayload, Hooks, TransactionRecord } from "../../../types"; -const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env; +const { ETHEREUM_PRIVATE_KEY } = process.env; -if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) { - throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment variables"); +if (!ETHEREUM_PRIVATE_KEY) { + throw new Error("Missing ETHEREUM_PRIVATE_KEY in environment variables"); } export default class implements Hooks { + private config = loadConfigurations(); + async onAfterInit() { // Schedule a cron job to publish pending transactions every hour - cron.schedule("0 * * * *", async () => { + cron.schedule(this.config.streamr.cronSchedule, async () => { console.log("Publishing pending transactions to Streamr..."); - + const pendingTransactions = await this.getPendingTransactions(); if (pendingTransactions.length > 0) { - await retry(() => this.publishPendingTransactionsToStreamr(pendingTransactions), 3, 2000); + for (const STREAMR_STREAM_ID of this.config.streamr.streamId) { + console.log(`Publishing to Streamr stream: ${STREAMR_STREAM_ID}`); + await retry(() => this.publishPendingTransactionsToStreamr(STREAMR_STREAM_ID, pendingTransactions), 3, 2000); + } } }); @@ -29,7 +34,7 @@ export default class implements Hooks { return getAllTransactionRecords(); } - async publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { + async publishPendingTransactionsToStreamr(STREAMR_STREAM_ID: string, pendingTransactions: TransactionRecord[]) { const streamrClient = new StreamrClient({ auth: { privateKey: ETHEREUM_PRIVATE_KEY!, diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 8fb7d56..b9e268e 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -4,9 +4,26 @@ import { createPublicKey, verify } from "crypto"; import type { TransactionRecord, BatchTransactionPayload, Hooks, AppConfig } from "../types"; const extensions: Hooks[] = []; +export const defaultConfigurations: AppConfig = { + modules: ["core/arweave", "core/prover", "core/streamr", "core/is_on"], + streamr: { + streamId: ["0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test"], + cronSchedule: "0 * * * *", + }, +}; + +export function loadConfigurations(configPath: string = "console.config.json"): AppConfig { + try { + const config: AppConfig = JSON.parse(fs.readFileSync(configPath, "utf-8")); + return config; + } catch (error) { + console.warn(`Could not load configuration from ${configPath}, using default configurations. Error:`, error); + return defaultConfigurations; + } +} export async function loadExtensionsFromConfig(configPath: string = "console.config.json"): Promise { - const config: AppConfig = JSON.parse(fs.readFileSync(configPath, "utf-8")); + const config: AppConfig = loadConfigurations(configPath); for (const modulePath of config.modules) { const resolved = path.resolve(__dirname, modulePath); diff --git a/src/types.ts b/src/types.ts index f80f4fd..198aa6d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -3,6 +3,10 @@ import { MqttClient } from "mqtt/*"; // Application configuration type (console.config.json) export type AppConfig = { modules: string[]; + streamr: { + streamId: string[]; + cronSchedule: string; + }; }; // Hooks type for lifecycle events From dc166c98de319aaf220d6db29b31402ee4e3f8f4 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 09:42:17 +0100 Subject: [PATCH 11/22] refactor: add prune_sync module and update configurations in example and utils --- console.example.config.json | 6 +++++- src/lib/utils.ts | 5 ++++- src/types.ts | 3 +++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/console.example.config.json b/console.example.config.json index b456fec..76d3876 100644 --- a/console.example.config.json +++ b/console.example.config.json @@ -3,12 +3,16 @@ "core/arweave", "core/prover", "core/streamr", - "core/is_on" + "core/is_on", + "core/prune_sync" ], "streamr": { "streamId": [ "0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test" ], "cronSchedule": "0 * * * *" + }, + "prune_sync": { + "cronSchedule": "0 * * * *" } } diff --git a/src/lib/utils.ts b/src/lib/utils.ts index b9e268e..20d36df 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -5,11 +5,14 @@ import type { TransactionRecord, BatchTransactionPayload, Hooks, AppConfig } fro const extensions: Hooks[] = []; export const defaultConfigurations: AppConfig = { - modules: ["core/arweave", "core/prover", "core/streamr", "core/is_on"], + modules: ["core/arweave", "core/prover", "core/streamr", "core/is_on", "core/prune_sync"], streamr: { streamId: ["0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test"], cronSchedule: "0 * * * *", }, + prune_sync: { + cronSchedule: "0 * * * *", + }, }; export function loadConfigurations(configPath: string = "console.config.json"): AppConfig { diff --git a/src/types.ts b/src/types.ts index 198aa6d..20afbc8 100644 --- a/src/types.ts +++ b/src/types.ts @@ -7,6 +7,9 @@ export type AppConfig = { streamId: string[]; cronSchedule: string; }; + prune_sync: { + cronSchedule: string; + }; }; // Hooks type for lifecycle events From 51c247aa27efe9b42a1c6a92629d13bd5bab3244 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 09:52:53 +0100 Subject: [PATCH 12/22] refactor: implement prune_sync module for scheduled transaction pruning and syncing --- src/lib/core/prune_sync/index.ts | 25 +++++++++++++++++++++++++ src/services/mqtt.ts | 16 ---------------- 2 files changed, 25 insertions(+), 16 deletions(-) create mode 100644 src/lib/core/prune_sync/index.ts diff --git a/src/lib/core/prune_sync/index.ts b/src/lib/core/prune_sync/index.ts new file mode 100644 index 0000000..2cc2003 --- /dev/null +++ b/src/lib/core/prune_sync/index.ts @@ -0,0 +1,25 @@ +import cron from "node-cron"; +import { Hooks } from "../../../types"; +import { getAllMeterRecords } from "../../../store/sqlite"; +import { loadConfigurations } from "../../utils"; +import { pruneAndSyncOnchain } from "../../sync"; + +export default class implements Hooks { + private config = loadConfigurations(); + + async onAfterInit() { + console.log("Registering prune_sync cron job..."); + + // Schedule a cron job to perform prune verified transactions and sync with onchain state + cron.schedule(this.config.prune_sync.cronSchedule, async () => { + const m3ters = getAllMeterRecords(); + for (const m3ter of m3ters) { + try { + pruneAndSyncOnchain(m3ter.publicKey); + } catch (error) { + console.error(`Error pruning and syncing meter ${m3ter.publicKey}:`, error); + } + } + }); + } +} diff --git a/src/services/mqtt.ts b/src/services/mqtt.ts index 9c22601..d5572ee 100644 --- a/src/services/mqtt.ts +++ b/src/services/mqtt.ts @@ -20,7 +20,6 @@ import { getLatestTransactionNonce, pruneAndSyncOnchain, getCrossChainRevenue, g import { createMeterLogger } from "../utils/logger"; const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST; -const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain const deviceLocks = new Map(); // Lock per devEUI to prevent concurrent message processing export function handleUplinks(): Promise { @@ -200,21 +199,6 @@ export async function handleMessage(blob: Buffer) { return; // Exit early without processing the transaction } - if (m3ter.latestNonce % SYNC_EPOCH === 0) { - // sync with blockchain every SYNC_EPOCH transactions - await pruneAndSyncOnchain(m3ter.tokenId); - - runHook("onSyncEpochReached"); - - logger.info(`Synced meter with blockchain: ${m3ter.tokenId}`); - - m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; - - if (!m3ter) { - throw new Error("Meter not found after sync for public key: " + publicKey); - } - } - const expectedNonce = m3ter.latestNonce + 1; logger.info(`Received blob for meter ${m3ter?.tokenId}, expected nonce: ${expectedNonce}, got: ${decoded.nonce}`); From 06008529df4a46a046ec2ad2e0d49985f91b2740 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 09:53:03 +0100 Subject: [PATCH 13/22] refactor: remove unused BatchTransactionPayload import and update cron job comment --- src/lib/core/streamr/index.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts index 50f587e..20192f4 100644 --- a/src/lib/core/streamr/index.ts +++ b/src/lib/core/streamr/index.ts @@ -2,7 +2,7 @@ import cron from "node-cron"; import { StreamrClient } from "@streamr/sdk"; import { getAllTransactionRecords } from "../../../store/sqlite"; import { buildBatchPayload, loadConfigurations, retry } from "../../utils"; -import type { BatchTransactionPayload, Hooks, TransactionRecord } from "../../../types"; +import type { Hooks, TransactionRecord } from "../../../types"; const { ETHEREUM_PRIVATE_KEY } = process.env; @@ -14,7 +14,9 @@ export default class implements Hooks { private config = loadConfigurations(); async onAfterInit() { - // Schedule a cron job to publish pending transactions every hour + console.log("Registering Streamr cron job..."); + + // Schedule a cron job to publish pending transactions cron.schedule(this.config.streamr.cronSchedule, async () => { console.log("Publishing pending transactions to Streamr..."); From a1ddba4e90e4439158ea0a9ef01ed76b79618e3d Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 13:33:15 +0100 Subject: [PATCH 14/22] refactor: enhance MQTT initialization error handling and improve Streamr cron job configuration --- src/index.ts | 5 +++++ src/lib/core/streamr/index.ts | 28 ++++++++++++++++++---------- src/lib/utils.ts | 2 +- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/index.ts b/src/index.ts index 3e54d04..45a799e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,8 +22,13 @@ async function initializeApp() { runHook("onDatabaseSetup") + try{ // Start MQTT handling await handleUplinks(); + } catch (mqttError) { + console.error("[error] MQTT initialization failed:", mqttError); + // throw mqttError; + } console.log("[info] Application initialization completed successfully"); diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts index 20192f4..501b750 100644 --- a/src/lib/core/streamr/index.ts +++ b/src/lib/core/streamr/index.ts @@ -17,17 +17,25 @@ export default class implements Hooks { console.log("Registering Streamr cron job..."); // Schedule a cron job to publish pending transactions - cron.schedule(this.config.streamr.cronSchedule, async () => { - console.log("Publishing pending transactions to Streamr..."); - - const pendingTransactions = await this.getPendingTransactions(); - if (pendingTransactions.length > 0) { - for (const STREAMR_STREAM_ID of this.config.streamr.streamId) { - console.log(`Publishing to Streamr stream: ${STREAMR_STREAM_ID}`); - await retry(() => this.publishPendingTransactionsToStreamr(STREAMR_STREAM_ID, pendingTransactions), 3, 2000); + cron.schedule( + this.config.streamr.cronSchedule, + async () => { + console.log("Publishing pending transactions to Streamr..."); + + const pendingTransactions = await this.getPendingTransactions(); + if (pendingTransactions.length > 0) { + for (const STREAMR_STREAM_ID of this.config.streamr.streamId) { + console.log(`Publishing to Streamr stream: ${STREAMR_STREAM_ID}`); + await retry( + () => this.publishPendingTransactionsToStreamr(STREAMR_STREAM_ID, pendingTransactions), + 3, + 2000 + ); + } } - } - }); + }, + { name: "streamr-publish-pending-transactions", noOverlap: true } + ); return; } diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 20d36df..09ba1ed 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -44,7 +44,7 @@ export async function runHook(hook: K, ...args: Parameter for (const ext of extensions) { const fn = ext[hook]; let functionReturn; - if (fn) functionReturn = await (fn as any)(...args); + if (fn) functionReturn = await (fn as any).call(ext, ...args); if (typeof functionReturn === "boolean" && hook === "isOnStateCompute") { result = result && functionReturn; From 80f2a9b318c3b3b716f9da9651422cdf0ca3ad12 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 13:35:03 +0100 Subject: [PATCH 15/22] refactor: add logging for cron job registration in prune_sync and Streamr modules --- src/lib/core/prune_sync/index.ts | 3 +++ src/lib/core/streamr/index.ts | 1 + 2 files changed, 4 insertions(+) diff --git a/src/lib/core/prune_sync/index.ts b/src/lib/core/prune_sync/index.ts index 2cc2003..aed1354 100644 --- a/src/lib/core/prune_sync/index.ts +++ b/src/lib/core/prune_sync/index.ts @@ -21,5 +21,8 @@ export default class implements Hooks { } } }); + + console.log("prune_sync cron job registered."); + return; } } diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts index 501b750..76a4244 100644 --- a/src/lib/core/streamr/index.ts +++ b/src/lib/core/streamr/index.ts @@ -37,6 +37,7 @@ export default class implements Hooks { { name: "streamr-publish-pending-transactions", noOverlap: true } ); + console.log("Streamr cron job registered."); return; } From ff9ed94e59f8d1591022e6e8c7b31e0130ba0f70 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 13:35:43 +0100 Subject: [PATCH 16/22] refactor: simplify error logging in loadConfigurations function --- src/lib/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 09ba1ed..b67e4ad 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -20,7 +20,7 @@ export function loadConfigurations(configPath: string = "console.config.json"): const config: AppConfig = JSON.parse(fs.readFileSync(configPath, "utf-8")); return config; } catch (error) { - console.warn(`Could not load configuration from ${configPath}, using default configurations. Error:`, error); + console.warn(`Could not load configuration from ${configPath}, using default configurations.`); return defaultConfigurations; } } From fac32fc2126caed7331628176cd7bf06047ef947 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 14:05:31 +0100 Subject: [PATCH 17/22] refactor: remove unused nonce synchronization phase and improve MQTT message handling --- README.md | 6 ------ src/lib/utils.ts | 2 +- src/services/mqtt.ts | 4 ++-- src/types.ts | 2 -- 4 files changed, 3 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 0fd89d7..d8c0eba 100644 --- a/README.md +++ b/README.md @@ -88,12 +88,6 @@ npm run dev |-----------|-------------|------------| | `onMeterCreated` | Called after a new meter is saved to the database | `newMeter: MeterRecord` | -## Nonce Synchronization Phase - -| Hook Name | Description | Parameters | -|-----------|-------------|------------| -| `onSyncEpochReached` | Called when the sync interval is reached for blockchain synchronization | None | - ## Downstream Distribution Phase | Hook Name | Description | Parameters | diff --git a/src/lib/utils.ts b/src/lib/utils.ts index b67e4ad..b56e0e0 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -20,7 +20,7 @@ export function loadConfigurations(configPath: string = "console.config.json"): const config: AppConfig = JSON.parse(fs.readFileSync(configPath, "utf-8")); return config; } catch (error) { - console.warn(`Could not load configuration from ${configPath}, using default configurations.`); + console.warn(`Could not load configuration from ${configPath}, using default configurations.` ); return defaultConfigurations; } } diff --git a/src/services/mqtt.ts b/src/services/mqtt.ts index d5572ee..b2c0a56 100644 --- a/src/services/mqtt.ts +++ b/src/services/mqtt.ts @@ -224,7 +224,8 @@ export async function handleMessage(blob: Buffer) { logger.info(`Updated meter nonce to: ${expectedNonce}`); const pendingTransactions = getAllTransactionRecords(); - runHook("onTransactionDistribution", m3ter.tokenId, decoded, pendingTransactions); + await runHook("onTransactionDistribution", m3ter.tokenId, decoded, pendingTransactions); + } try { is_on = await runHook("isOnStateCompute", m3ter.tokenId); @@ -244,7 +245,6 @@ export async function handleMessage(blob: Buffer) { encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) ); runHook("onStateEnqueued", state, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0); - } } catch (error) { logger.error(`Error handling MQTT message: ${error}`); runHook("onMessageError", error); diff --git a/src/types.ts b/src/types.ts index 20afbc8..97b54bd 100644 --- a/src/types.ts +++ b/src/types.ts @@ -29,8 +29,6 @@ export type Hooks = { onMeterCreated?: (newMeter: MeterRecord) => void | Promise; - onSyncEpochReached?: () => void | Promise; - onTransactionDistribution?: ( tokenId: number, decodedPayload: DecodedPayload, From d0192f9e1316a320f815d88cb6df521277d314f2 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 14:14:26 +0100 Subject: [PATCH 18/22] refactor: add check for prover URL before sending pending transactions --- src/lib/core/prover/index.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/lib/core/prover/index.ts b/src/lib/core/prover/index.ts index ca5a017..8ac956c 100644 --- a/src/lib/core/prover/index.ts +++ b/src/lib/core/prover/index.ts @@ -8,6 +8,12 @@ export default class implements Hooks { // send pending transactions to prover node try { const proverURL = await this.getProverURL(); + + if (!proverURL) { + console.warn("No prover URL configured. Skipping sending transactions to prover."); + return; + } + console.info(`Sending pending transactions to prover: ${proverURL}`); const response = await this.sendPendingTransactionsToProver(proverURL!, pendingTransactions); From 4d826c303c0cfe83d11c35d1e943e643be67cd3b Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Fri, 16 Jan 2026 14:15:10 +0100 Subject: [PATCH 19/22] refactor: remove unnecessary whitespace and logging of request payload in sendPendingTransactionsToProver --- src/lib/core/prover/index.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/lib/core/prover/index.ts b/src/lib/core/prover/index.ts index 8ac956c..e647812 100644 --- a/src/lib/core/prover/index.ts +++ b/src/lib/core/prover/index.ts @@ -8,7 +8,7 @@ export default class implements Hooks { // send pending transactions to prover node try { const proverURL = await this.getProverURL(); - + if (!proverURL) { console.warn("No prover URL configured. Skipping sending transactions to prover."); return; @@ -59,8 +59,6 @@ export default class implements Hooks { const requestPayload = buildBatchPayload(pendingTransactions); - console.log("[info] Request payload:", requestPayload); - return await this.sendTransactionsToProver(proverURL, requestPayload); } } From 9d728340bd9dcede4a13201df22788951b21edd3 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Mon, 19 Jan 2026 22:11:14 +0100 Subject: [PATCH 20/22] refactor: implement UI extension system and integrate dynamic module handling --- console.example.config.json | 3 + src/index.ts | 36 ++++++-- src/lib/core/streamr/index.ts | 17 ++-- src/lib/core/streamr/ui.ts | 150 ++++++++++++++++++++++++++++++++++ src/lib/utils.ts | 110 ++++++++++++++++++++++++- src/public/js/actions.js | 119 +++++++++++++++++++++++++++ src/types.ts | 62 ++++++++++++++ src/views/index.hbs | 40 +++++++++ src/views/layouts/main.hbs | 1 + 9 files changed, 524 insertions(+), 14 deletions(-) create mode 100644 src/lib/core/streamr/ui.ts create mode 100644 src/public/js/actions.js diff --git a/console.example.config.json b/console.example.config.json index 76d3876..628ab5f 100644 --- a/console.example.config.json +++ b/console.example.config.json @@ -6,6 +6,9 @@ "core/is_on", "core/prune_sync" ], + "uiModules": { + "streamr": "core/streamr/ui" + }, "streamr": { "streamId": [ "0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test" diff --git a/src/index.ts b/src/index.ts index 45a799e..bafbf45 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,7 +2,7 @@ import "dotenv/config"; import { handleUplinks } from "./services/mqtt"; import { Request, Response } from "express"; import { app } from "./services/context"; -import { loadExtensionsFromConfig, runHook } from "./lib/utils"; +import { loadExtensionsFromConfig, loadUIExtensionsFromConfig, getUIComponents, invokeUIAction, runHook } from "./lib/utils"; import setupDatabase, { getAllMeterRecords, deleteMeterByPublicKey } from "./store/sqlite"; // Async initialization function @@ -14,20 +14,24 @@ async function initializeApp() { await loadExtensionsFromConfig(); console.log("[info] Extensions loaded successfully"); + // Load UI extensions + await loadUIExtensionsFromConfig(); + console.log("[info] UI extensions loaded successfully"); + runHook("onBeforeInit"); // Initialize database tables and jobs setupDatabase(); console.log("[info] Database setup completed"); - runHook("onDatabaseSetup") + runHook("onDatabaseSetup"); - try{ - // Start MQTT handling - await handleUplinks(); + try { + // Start MQTT handling + await handleUplinks(); } catch (mqttError) { console.error("[error] MQTT initialization failed:", mqttError); - // throw mqttError; + // throw mqttError; // TODO: uncomment after testing } console.log("[info] Application initialization completed successfully"); @@ -45,10 +49,28 @@ initializeApp(); app.get("/", async (req: Request, res: Response) => { const m3ters = getAllMeterRecords(); - res.render("index", { m3ters }); + + // Get UI components from loaded UI extensions + const { icons, windows } = await getUIComponents(); + + res.render("index", { m3ters, icons, windows }); console.log("[server]: Server handled GET request at `/`"); }); +// API endpoint to invoke UI actions +app.post("/api/actions/:moduleId/:actionId", async (req: Request, res: Response) => { + const { moduleId, actionId } = req.params; + console.log(`[server]: Invoking action '${actionId}' from module '${moduleId}'`); + + const result = await invokeUIAction(moduleId, actionId); + + if (result.success) { + res.status(200).json(result); + } else { + res.status(400).json(result); + } +}); + app.delete("/delete-meter", async (req: Request, res: Response) => { let publicKey = decodeURIComponent(req.query?.publicKey?.toString() as string); if (publicKey) { diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts index 76a4244..90b72fd 100644 --- a/src/lib/core/streamr/index.ts +++ b/src/lib/core/streamr/index.ts @@ -52,13 +52,18 @@ export default class implements Hooks { }, }); - const stream = await streamrClient.getStream(STREAMR_STREAM_ID!); + try { + const stream = await streamrClient.getStream(STREAMR_STREAM_ID!); - const batchPayload = buildBatchPayload(pendingTransactions); + const batchPayload = buildBatchPayload(pendingTransactions); - await stream.publish(batchPayload); - - // destroy the client to free resources - await streamrClient.destroy(); + await stream.publish(batchPayload); + } catch (error) { + console.error(`Failed to publish to Streamr: ${(error as Error).message}`); + throw error; + } finally { + // destroy the client to free resources + await streamrClient.destroy(); + } } } diff --git a/src/lib/core/streamr/ui.ts b/src/lib/core/streamr/ui.ts new file mode 100644 index 0000000..9ceaa00 --- /dev/null +++ b/src/lib/core/streamr/ui.ts @@ -0,0 +1,150 @@ +import { getAllTransactionRecords } from "../../../store/sqlite"; +import { buildBatchPayload, loadConfigurations, retry } from "../../utils"; +import type { UIHooks, UIAppIcon, UIAppWindow, UIAction, TransactionRecord } from "../../../types"; +import { StreamrClient } from "@streamr/sdk"; + +const { ETHEREUM_PRIVATE_KEY } = process.env; + +/** + * Streamr UI Module + * Provides UI components for the Streamr integration, including + * a panel showing stream configuration and a manual publish action + */ +export default class implements UIHooks { + private config = loadConfigurations(); + private lastPublishTime: Date | null = null; + private lastPublishStatus: "success" | "error" | null = null; + + getAppIcon(): UIAppIcon { + return { + id: "streamr", + label: "Streamr", + iconHtml: '', + buttonClass: "is-primary", + }; + } + + async getAppWindow(): Promise { + const pendingCount = (await this.getPendingTransactions()).length; + const streamIds = this.config.streamr.streamId; + const cronSchedule = this.config.streamr.cronSchedule; + + return { + id: "streamr", + title: "Streamr Publisher", + containerClass: "", + contentHtml: ` +
+

Configuration

+
+ Cron Schedule: + ${cronSchedule} +
+
+ Stream IDs: +
    + ${streamIds.map((id) => `
  • ${id}
  • `).join("")} +
+
+
+ +
+

Status

+
+ Pending Transactions: + ${pendingCount} +
+
+ Last Publish: + ${this.lastPublishTime ? this.lastPublishTime.toLocaleString() : "Never"} + ${this.lastPublishStatus ? `(${this.lastPublishStatus})` : ""} +
+
+ +
+ + +
+ `, + }; + } + + getActions(): UIAction[] { + return [ + { + id: "publish-now", + label: "Publish Now", + buttonClass: "is-warning", + handler: async () => { + const pendingTransactions = await this.getPendingTransactions(); + + if (pendingTransactions.length === 0) { + return { message: "No pending transactions to publish" }; + } + + try { + for (const streamId of this.config.streamr.streamId) { + console.log(`[streamr-ui] Publishing to stream: ${streamId}`); + await retry(() => this.publishToStreamr(streamId, pendingTransactions), 3, 2000); + } + + this.lastPublishTime = new Date(); + this.lastPublishStatus = "success"; + + return { + message: `Published ${pendingTransactions.length} transactions to ${this.config.streamr.streamId.length} stream(s)`, + data: { count: pendingTransactions.length }, + }; + } catch (error: any) { + this.lastPublishStatus = "error"; + throw new Error(`Failed to publish: ${error.message}`); + } + }, + }, + ]; + } + + async getStatusData(): Promise> { + const pendingTransactions = await this.getPendingTransactions(); + return { + pendingCount: pendingTransactions.length, + streamIds: this.config.streamr.streamId, + cronSchedule: this.config.streamr.cronSchedule, + lastPublishTime: this.lastPublishTime, + lastPublishStatus: this.lastPublishStatus, + }; + } + + private async getPendingTransactions(): Promise { + return getAllTransactionRecords(); + } + + private async publishToStreamr(streamId: string, pendingTransactions: TransactionRecord[]) { + if (!ETHEREUM_PRIVATE_KEY) { + throw new Error("Missing ETHEREUM_PRIVATE_KEY"); + } + + const streamrClient = new StreamrClient({ + auth: { privateKey: ETHEREUM_PRIVATE_KEY }, + }); + + try { + const stream = await streamrClient.getStream(streamId); + const batchPayload = buildBatchPayload(pendingTransactions); + await stream.publish(batchPayload); + } finally { + await streamrClient.destroy(); + } + } +} diff --git a/src/lib/utils.ts b/src/lib/utils.ts index b56e0e0..e3bfdcc 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -1,9 +1,10 @@ import fs from "fs"; import path from "path"; import { createPublicKey, verify } from "crypto"; -import type { TransactionRecord, BatchTransactionPayload, Hooks, AppConfig } from "../types"; +import type { TransactionRecord, BatchTransactionPayload, Hooks, AppConfig, UIHooks, UIAppIcon, UIAppWindow, UIAction } from "../types"; const extensions: Hooks[] = []; +const uiExtensions: Map = new Map(); export const defaultConfigurations: AppConfig = { modules: ["core/arweave", "core/prover", "core/streamr", "core/is_on", "core/prune_sync"], streamr: { @@ -54,6 +55,113 @@ export async function runHook(hook: K, ...args: Parameter return result; } +// ========================================== +// UI Extension System +// ========================================== + +/** + * Load UI extensions from configuration file + * Looks for 'uiModules' key in config, which maps module IDs to their paths + */ +export async function loadUIExtensionsFromConfig(configPath: string = "console.config.json"): Promise> { + const config = loadConfigurations(configPath) as AppConfig & { uiModules?: Record }; + + if (!config.uiModules) { + console.log("[ui] No UI modules configured"); + return uiExtensions; + } + + for (const [moduleId, modulePath] of Object.entries(config.uiModules)) { + try { + const resolved = path.resolve(__dirname, modulePath); + const mod = await import(resolved); + const instance = new mod.default(); + uiExtensions.set(moduleId, instance); + console.log(`[ui] Loaded UI module: ${moduleId}`); + } catch (error) { + console.error(`[ui] Failed to load UI module ${moduleId}:`, error); + } + } + + return uiExtensions; +} + +/** + * Get all UI components (icons, windows, actions) from loaded UI extensions + */ +export async function getUIComponents(): Promise<{ + icons: UIAppIcon[]; + windows: UIAppWindow[]; + actions: Map; +}> { + const icons: UIAppIcon[] = []; + const windows: UIAppWindow[] = []; + const actions: Map = new Map(); + + for (const [moduleId, ext] of uiExtensions) { + try { + if (ext.getAppIcon) { + const icon = await ext.getAppIcon(); + icons.push(icon); + } + if (ext.getAppWindow) { + const window = await ext.getAppWindow(); + windows.push(window); + } + if (ext.getActions) { + const moduleActions = await ext.getActions(); + actions.set(moduleId, moduleActions); + } + } catch (error) { + console.error(`[ui] Error getting components from ${moduleId}:`, error); + } + } + + return { icons, windows, actions }; +} + +/** + * Get a specific UI extension by module ID + */ +export function getUIExtension(moduleId: string): UIHooks | undefined { + return uiExtensions.get(moduleId); +} + +/** + * Invoke an action from a UI module + */ +export async function invokeUIAction( + moduleId: string, + actionId: string +): Promise<{ success: boolean; message?: string; data?: any }> { + const ext = uiExtensions.get(moduleId); + if (!ext) { + return { success: false, message: `Module '${moduleId}' not found` }; + } + + if (!ext.getActions) { + return { success: false, message: `Module '${moduleId}' has no actions` }; + } + + const actions = await ext.getActions(); + const action = actions.find((a) => a.id === actionId); + + if (!action) { + return { success: false, message: `Action '${actionId}' not found in module '${moduleId}'` }; + } + + try { + const result = await action.handler(); + return { + success: true, + message: result?.message || `Action '${actionId}' executed successfully`, + data: result?.data, + }; + } catch (error: any) { + return { success: false, message: error.message || "Action failed" }; + } +} + /** * Retries a function up to 5 times with exponential backoff * @param fn Function to retry diff --git a/src/public/js/actions.js b/src/public/js/actions.js new file mode 100644 index 0000000..0de1df0 --- /dev/null +++ b/src/public/js/actions.js @@ -0,0 +1,119 @@ +/** + * UI Actions Handler + * Provides functions for invoking module actions from the frontend + */ + +/** + * Invoke an action from a UI module + * @param {string} moduleId - The module identifier + * @param {string} actionId - The action identifier + * @param {HTMLElement} [buttonElement] - Optional button element to show loading state + * @returns {Promise<{success: boolean, message?: string, data?: any}>} + */ +async function invokeAction(moduleId, actionId, buttonElement) { + // Show loading state if button provided + let originalContent = null; + if (buttonElement) { + originalContent = buttonElement.innerHTML; + buttonElement.innerHTML = "..."; + buttonElement.disabled = true; + } + + try { + const response = await fetch(`/api/actions/${moduleId}/${actionId}`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + }); + + const result = await response.json(); + + // Show result feedback + if (result.success) { + showNotification(result.message || "Action completed successfully", "success"); + } else { + showNotification(result.message || "Action failed", "error"); + } + + return result; + } catch (error) { + console.error("Action invocation failed:", error); + showNotification("Failed to invoke action: " + error.message, "error"); + return { success: false, message: error.message }; + } finally { + // Restore button state + if (buttonElement && originalContent !== null) { + buttonElement.innerHTML = originalContent; + buttonElement.disabled = false; + } + } +} + +/** + * Show a notification message using NES.css styling + * @param {string} message - The message to display + * @param {string} type - 'success' or 'error' + */ +function showNotification(message, type) { + // Remove any existing notification + const existing = document.getElementById("action-notification"); + if (existing) { + existing.remove(); + } + + // Create notification element + const notification = document.createElement("div"); + notification.id = "action-notification"; + notification.className = `nes-container ${type === "success" ? "is-success" : "is-error"}`; + notification.style.cssText = ` + position: fixed; + bottom: 20px; + right: 20px; + z-index: 9999; + max-width: 400px; + animation: slideIn 0.3s ease-out; + `; + notification.innerHTML = ` +

${message}

+ `; + + document.body.appendChild(notification); + + // Play click sound for feedback + if (typeof clickButton === "function") { + clickButton(); + } + + // Auto-remove after 4 seconds + setTimeout(() => { + notification.style.animation = "slideOut 0.3s ease-in"; + setTimeout(() => notification.remove(), 300); + }, 4000); +} + +// Add CSS animation styles +const styleSheet = document.createElement("style"); +styleSheet.textContent = ` + @keyframes slideIn { + from { + transform: translateX(100%); + opacity: 0; + } + to { + transform: translateX(0); + opacity: 1; + } + } + @keyframes slideOut { + from { + transform: translateX(0); + opacity: 1; + } + to { + transform: translateX(100%); + opacity: 0; + } + } +`; +document.head.appendChild(styleSheet); diff --git a/src/types.ts b/src/types.ts index 97b54bd..3004ac9 100644 --- a/src/types.ts +++ b/src/types.ts @@ -3,6 +3,7 @@ import { MqttClient } from "mqtt/*"; // Application configuration type (console.config.json) export type AppConfig = { modules: string[]; + uiModules?: Record; streamr: { streamId: string[]; cronSchedule: string; @@ -98,3 +99,64 @@ export interface VerifierInfo { targetAddress: string; verifierAddress: string; } + +// ========================================== +// UI Extension Types +// ========================================== + +/** + * Represents an app icon displayed in the desktop UI + */ +export interface UIAppIcon { + /** Unique identifier for the icon */ + id: string; + /** Display label shown below the icon */ + label: string; + /** HTML content for the icon (can use NES.css classes) */ + iconHtml: string; + /** Optional CSS class for the button */ + buttonClass?: string; +} + +/** + * Represents an app window/panel in the UI + */ +export interface UIAppWindow { + /** Unique identifier matching the icon id */ + id: string; + /** Window title */ + title: string; + /** HTML content for the window body */ + contentHtml: string; + /** Optional CSS class for the container */ + containerClass?: string; +} + +/** + * Represents an action that can be triggered from the UI + */ +export interface UIAction { + /** Unique identifier for the action */ + id: string; + /** Display label for the action button */ + label: string; + /** Optional CSS class for the button (e.g., 'is-primary', 'is-warning') */ + buttonClass?: string; + /** Handler function called when action is triggered */ + handler: () => void | Promise; +} + +/** + * UI Hooks interface for extending the console UI + * Modules can implement this to add icons, windows, and actions to the web interface + */ +export type UIHooks = { + /** Return an app icon to display on the desktop */ + getAppIcon?: () => UIAppIcon | Promise; + /** Return an app window/panel configuration */ + getAppWindow?: () => UIAppWindow | Promise; + /** Return available actions for this module */ + getActions?: () => UIAction[] | Promise; + /** Return metadata/status data for display */ + getStatusData?: () => Record | Promise>; +} diff --git a/src/views/index.hbs b/src/views/index.hbs index 738061c..249a735 100644 --- a/src/views/index.hbs +++ b/src/views/index.hbs @@ -97,6 +97,21 @@ + {{! {/* Dynamic Module Icons */} }} + {{#each icons}} +
+
+ +

{{label}}

+
+
+ {{/each}} + {{! {/* M3ters Table */} }} + + {{! {/* Dynamic Module Windows */} }} + {{#each windows}} + + {{/each}} diff --git a/src/views/layouts/main.hbs b/src/views/layouts/main.hbs index 9c91081..4d12805 100644 --- a/src/views/layouts/main.hbs +++ b/src/views/layouts/main.hbs @@ -20,4 +20,5 @@ + \ No newline at end of file From 61ed8262f81c1b11170f01af5c9244eea7f84710 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Mon, 19 Jan 2026 22:11:31 +0100 Subject: [PATCH 21/22] refactor: update README for clarity and enhance extension system documentation --- README.md | 289 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 246 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index d8c0eba..6ad68be 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,11 @@ -# M3tering Console Setup +# M3tering Console + +A modular, extensible service console for providers on the M3tering protocol. Features a hook-based architecture for backend extensibility and a UI extension system for frontend customization. ## Pre-setup - Make sure Public key is set on the M3ter contract -- Make sure the price for evergy as been set on the PriceContext contract +- Make sure the price for energy has been set on the PriceContext contract - Make sure the Console has been granted publish permission on the Streamr stream ## Quick Setup @@ -54,61 +56,262 @@ npm install npm run dev ``` +--- + +# Extension System + +The M3tering Console provides two complementary extension systems: + +1. **Backend Hooks** - Hook into the console lifecycle (MQTT, database, message processing) +2. **UI Hooks** - Add custom icons, panels, and actions to the web interface + +Both systems use a config-driven approach where modules are loaded dynamically from paths specified in `console.config.json`. + +## Configuration + +```json +{ + "modules": [ + "core/arweave", + "core/prover", + "core/streamr", + "core/is_on", + "core/prune_sync" + ], + "uiModules": { + "streamr": "core/streamr/ui" + }, + "streamr": { + "streamId": ["0x.../m3tering/test"], + "cronSchedule": "0 * * * *" + } +} +``` + +- **`modules`**: Array of paths to backend hook modules (relative to `src/lib/`) +- **`uiModules`**: Object mapping module IDs to UI module paths (relative to `src/lib/`) + +--- + +# Backend Hooks + +Backend hooks allow modules to react to console lifecycle events. Each module exports a default class implementing the `Hooks` interface. + +## Creating a Backend Module + +```typescript +// src/lib/core/my-module/index.ts +import type { Hooks } from "../../../types"; + +export default class implements Hooks { + onAfterInit() { + console.log("My module initialized!"); + } + + onTransactionDistribution(tokenId, decodedPayload, pendingTransactions) { + // Process transactions + } +} +``` + +Add to `console.config.json`: +```json +{ + "modules": ["core/my-module"] +} +``` + +## Hook Lifecycle Reference + +### Initialization Phase -# Complete Hook Lifecycle for M3tering Console +| Hook | Description | Parameters | +|------|-------------|------------| +| `onBeforeInit` | Before any initialization begins | None | +| `onDatabaseSetup` | After SQLite tables/jobs are initialized | None | +| `onAfterInit` | After all initialization completes successfully | None | +| `onInitError` | When an error occurs during initialization | `error: any` | -## Initialization Phase +### MQTT Connection Phase -| Hook Name | Description | Parameters | -|-----------|-------------|------------| -| `onBeforeInit` | Called before any initialization begins | None | -| `onDatabaseSetup` | Called after SQLite tables/jobs are initialized | None | -| `onAfterInit` | Called after all initialization completes successfully | None | -| `onInitError` | Called when an error occurs during initialization | `error: any` | +| Hook | Description | Parameters | +|------|-------------|------------| +| `onMqttConnect` | MQTT client successfully connects to ChirpStack | `client: MqttClient` | +| `onMqttSubscribed` | After subscribing to the device uplink topic | `client: MqttClient`, `topic: string` | +| `onMqttError` | MQTT connection error occurs | `error: any`, `client: MqttClient` | +| `onMqttReconnect` | Attempting to reconnect to MQTT broker | `client: MqttClient` | -## MQTT Connection Phase +### Message Processing Phase + +| Hook | Description | Parameters | +|------|-------------|------------| +| `onMessageReceived` | Raw MQTT message received (before parsing) | `blob: Buffer` | +| `onMessageDropped` | Message dropped (e.g., device locked) | `reason: string`, `devEui: string` | +| `onMeterCreated` | New meter saved to database | `newMeter: MeterRecord` | +| `onTransactionDistribution` | Before sending to Arweave/prover/Streamr | `tokenId: number`, `decodedPayload: DecodedPayload`, `pendingTransactions: TransactionRecord[]` | + +### State Computation Phase + +| Hook | Description | Parameters | +|------|-------------|------------| +| `isOnStateCompute` | Determine device on/off state (returns `boolean`) | `m3terId: number` | +| `onIsOnStateComputed` | After on/off state computed | `m3terId: number`, `isOn: boolean` | +| `onIsOnStateComputeError` | Error during state computation | `m3terId: number`, `error: any` | +| `onStateEnqueued` | State enqueued to gRPC for device response | `state: any`, `latitude: number`, `longitude: number` | + +### Error & Cleanup Phase + +| Hook | Description | Parameters | +|------|-------------|------------| +| `onMessageError` | Error during message processing | `error: any` | +| `onDeviceUnlocked` | Device lock released (regardless of outcome) | `devEui: string` | +| `onMessageProcessingComplete` | Message processing finished | None | + +--- -| Hook Name | Description | Parameters | -|-----------|-------------|------------| -| `onMqttConnect` | Called when MQTT client successfully connects to ChirpStack | `client: MqttClient` | -| `onMqttSubscribed` | Called after subscribing to the device uplink topic | `client: MqttClient`, `topic: string` | -| `onMqttError` | Called when an MQTT connection error occurs | `error: any`, `client: MqttClient` | -| `onMqttReconnect` | Called when attempting to reconnect to the MQTT broker | `client: MqttClient` | +# UI Hooks -## Message Ingestion Phase +UI Hooks allow modules to extend the web interface at `http://localhost:3000`. Modules can add desktop icons, app windows/panels, and trigger-able actions. -| Hook Name | Description | Parameters | -|-----------|-------------|------------| -| `onMessageReceived` | Called when a raw MQTT message is received (before parsing) | `blob: Buffer` | -| `onMessageDropped` | Called when a message is dropped (e.g., device already locked) | `reason: string`, `devEui: string` | +## Creating a UI Module -## Meter Discovery & Registration Phase +```typescript +// src/lib/core/my-module/ui.ts +import type { UIHooks, UIAppIcon, UIAppWindow, UIAction } from "../../../types"; -| Hook Name | Description | Parameters | -|-----------|-------------|------------| -| `onMeterCreated` | Called after a new meter is saved to the database | `newMeter: MeterRecord` | +export default class implements UIHooks { + getAppIcon(): UIAppIcon { + return { + id: "my-module", + label: "My Module", + iconHtml: '', + buttonClass: "is-primary", + }; + } -## Downstream Distribution Phase + getAppWindow(): UIAppWindow { + return { + id: "my-module", + title: "My Module Panel", + contentHtml: ` +

Hello from my module!

+ + `, + }; + } + + getActions(): UIAction[] { + return [ + { + id: "do-something", + label: "Do Something", + handler: async () => { + // Perform action + return { message: "Action completed!" }; + }, + }, + ]; + } +} +``` + +Add to `console.config.json`: +```json +{ + "uiModules": { + "my-module": "core/my-module/ui" + } +} +``` + +## UIHooks Interface + +| Method | Return Type | Description | +|--------|-------------|-------------| +| `getAppIcon()` | `UIAppIcon` | Desktop icon displayed in the app grid | +| `getAppWindow()` | `UIAppWindow` | Window/panel shown when icon is clicked | +| `getActions()` | `UIAction[]` | Actions invokable from the frontend | +| `getStatusData()` | `Record` | Metadata for display (optional) | + +## Type Definitions + +### UIAppIcon + +```typescript +interface UIAppIcon { + id: string; // Unique identifier + label: string; // Display label below icon + iconHtml: string; // HTML content (supports NES.css icons) + buttonClass?: string; // Optional button class (e.g., "is-primary") +} +``` + +### UIAppWindow + +```typescript +interface UIAppWindow { + id: string; // Must match icon id + title: string; // Window title bar text + contentHtml: string; // HTML content for window body + containerClass?: string; // Optional container class +} +``` + +### UIAction + +```typescript +interface UIAction { + id: string; // Action identifier + label: string; // Button label + buttonClass?: string; // Optional button class + handler: () => void | Promise<{ message?: string; data?: any }>; +} +``` + +## Frontend API + +### Invoking Actions + +From your panel HTML, use the global `invokeAction()` function: + +```javascript +// invokeAction(moduleId, actionId, buttonElement?) +invokeAction('my-module', 'do-something', this); +``` + +The function: +- Shows loading state on the button (if provided) +- Calls `POST /api/actions/:moduleId/:actionId` +- Displays success/error notification using NES.css styling + +### REST Endpoint + +``` +POST /api/actions/:moduleId/:actionId + +Response: { success: boolean, message?: string, data?: any } +``` + +--- -| Hook Name | Description | Parameters | -|-----------|-------------|------------| -| `onTransactionDistribution` | Called before sending transactions to Arweave, prover node, Streamr, or other stores/loggers | `tokenId: number`, `decodedPayload: DecodedPayload`, `pendingTransactions: TransactionRecord[]` | +# Built-in Modules -## State Encoding & Device Response Phase +## Backend Modules -| Hook Name | Description | Parameters | -|-----------|-------------|------------| -| `isOnStateCompute` | Called to determine the device's on/off state (returns boolean) | `m3terId: number` | -| `onIsOnStateComputed` | Called after the on/off state has been computed | `m3terId: number`, `isOn: boolean` | -| `onIsOnStateComputeError` | Called when an error occurs during state computation | `m3terId: number`, `error: any` | -| `onStateEnqueued` | Called after the state is enqueued to gRPC for device response | `state: any`, `latitude: number`, `longitude: number` | +| Module | Description | +|--------|-------------| +| `core/arweave` | Uploads transaction data to Arweave permanent storage | +| `core/prover` | Sends batched transactions to the prover node | +| `core/streamr` | Publishes transactions to Streamr streams on a cron schedule | +| `core/is_on` | Computes device on/off state based on balance | +| `core/prune_sync` | Cleans up old synchronized transactions | -## Error Handling & Cleanup Phase +## UI Modules -| Hook Name | Description | Parameters | -|-----------|-------------|------------| -| `onMessageError` | Called when any error occurs during message processing | `error: any` | -| `onDeviceUnlocked` | Called when a device lock is released (regardless of outcome) | `devEui: string` | -| `onMessageProcessingComplete` | Called when message processing finishes (success or error) | None | +| Module | Description | +|--------|-------------| +| `streamr` | Panel showing stream config, pending count, and "Publish Now" action | --- From 6d26c6216d8b5448d32f6ef5b22d99a545cb23f4 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Mon, 19 Jan 2026 22:12:53 +0100 Subject: [PATCH 22/22] refactor: enable MQTT error throwing during application initialization --- src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index bafbf45..ab8b856 100644 --- a/src/index.ts +++ b/src/index.ts @@ -31,7 +31,7 @@ async function initializeApp() { await handleUplinks(); } catch (mqttError) { console.error("[error] MQTT initialization failed:", mqttError); - // throw mqttError; // TODO: uncomment after testing + throw mqttError; } console.log("[info] Application initialization completed successfully");