diff --git a/.env.example b/.env.example index bb64c2b..50f95b1 100644 --- a/.env.example +++ b/.env.example @@ -5,5 +5,4 @@ CONTRACT_LABEL="M3ters" CHIRPSTACK_HOST="localhost" MAINNET_RPC="https://sepolia.drpc.org" PREFERRED_PROVER_NODE="http://prover.m3ter.ing" -STREAMR_STREAM_ID="0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test" ETHEREUM_PRIVATE_KEY="..." diff --git a/.gitignore b/.gitignore index 195e462..fa5348b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# console configuration file +console.config.json + #test scripts emulate.ts test-database.ts diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7d1e594 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,10 @@ +{ + "cSpell.words": [ + "ardrive", + "arweave", + "m3ters", + "Emmo00", + "ccip", + "Mauchly", + ] +} \ No newline at end of file diff --git a/README.md b/README.md index 4074dda..6ad68be 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,11 @@ -# M3tering Console Setup +# M3tering Console + +A modular, extensible service console for providers on the M3tering protocol. Features a hook-based architecture for backend extensibility and a UI extension system for frontend customization. ## Pre-setup - Make sure Public key is set on the M3ter contract -- Make sure the price for evergy as been set on the PriceContext contract +- Make sure the price for energy has been set on the PriceContext contract - Make sure the Console has been granted publish permission on the Streamr stream ## Quick Setup @@ -27,7 +29,6 @@ CHIRPSTACK_HOST=localhost MAINNET_RPC=https://sepolia.drpc.org PREFERRED_PROVER_NODE=http://34.244.149.153 - STREAMR_STREAM_ID="0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test" ETHEREUM_PRIVATE_KEY="..." ``` @@ -54,3 +55,263 @@ npm install npm run dev ``` + +--- + +# Extension System + +The M3tering Console provides two complementary extension systems: + +1. **Backend Hooks** - Hook into the console lifecycle (MQTT, database, message processing) +2. **UI Hooks** - Add custom icons, panels, and actions to the web interface + +Both systems use a config-driven approach where modules are loaded dynamically from paths specified in `console.config.json`. + +## Configuration + +```json +{ + "modules": [ + "core/arweave", + "core/prover", + "core/streamr", + "core/is_on", + "core/prune_sync" + ], + "uiModules": { + "streamr": "core/streamr/ui" + }, + "streamr": { + "streamId": ["0x.../m3tering/test"], + "cronSchedule": "0 * * * *" + } +} +``` + +- **`modules`**: Array of paths to backend hook modules (relative to `src/lib/`) +- **`uiModules`**: Object mapping module IDs to UI module paths (relative to `src/lib/`) + +--- + +# Backend Hooks + +Backend hooks allow modules to react to console lifecycle events. Each module exports a default class implementing the `Hooks` interface. + +## Creating a Backend Module + +```typescript +// src/lib/core/my-module/index.ts +import type { Hooks } from "../../../types"; + +export default class implements Hooks { + onAfterInit() { + console.log("My module initialized!"); + } + + onTransactionDistribution(tokenId, decodedPayload, pendingTransactions) { + // Process transactions + } +} +``` + +Add to `console.config.json`: +```json +{ + "modules": ["core/my-module"] +} +``` + +## Hook Lifecycle Reference + +### Initialization Phase + +| Hook | Description | Parameters | +|------|-------------|------------| +| `onBeforeInit` | Before any initialization begins | None | +| `onDatabaseSetup` | After SQLite tables/jobs are initialized | None | +| `onAfterInit` | After all initialization completes successfully | None | +| `onInitError` | When an error occurs during initialization | `error: any` | + +### MQTT Connection Phase + +| Hook | Description | Parameters | +|------|-------------|------------| +| `onMqttConnect` | MQTT client successfully connects to ChirpStack | `client: MqttClient` | +| `onMqttSubscribed` | After subscribing to the device uplink topic | `client: MqttClient`, `topic: string` | +| `onMqttError` | MQTT connection error occurs | `error: any`, `client: MqttClient` | +| `onMqttReconnect` | Attempting to reconnect to MQTT broker | `client: MqttClient` | + +### Message Processing Phase + +| Hook | Description | Parameters | +|------|-------------|------------| +| `onMessageReceived` | Raw MQTT message received (before parsing) | `blob: Buffer` | +| `onMessageDropped` | Message dropped (e.g., device locked) | `reason: string`, `devEui: string` | +| `onMeterCreated` | New meter saved to database | `newMeter: MeterRecord` | +| `onTransactionDistribution` | Before sending to Arweave/prover/Streamr | `tokenId: number`, `decodedPayload: DecodedPayload`, `pendingTransactions: TransactionRecord[]` | + +### State Computation Phase + +| Hook | Description | Parameters | +|------|-------------|------------| +| `isOnStateCompute` | Determine device on/off state (returns `boolean`) | `m3terId: number` | +| `onIsOnStateComputed` | After on/off state computed | `m3terId: number`, `isOn: boolean` | +| `onIsOnStateComputeError` | Error during state computation | `m3terId: number`, `error: any` | +| `onStateEnqueued` | State enqueued to gRPC for device response | `state: any`, `latitude: number`, `longitude: number` | + +### Error & Cleanup Phase + +| Hook | Description | Parameters | +|------|-------------|------------| +| `onMessageError` | Error during message processing | `error: any` | +| `onDeviceUnlocked` | Device lock released (regardless of outcome) | `devEui: string` | +| `onMessageProcessingComplete` | Message processing finished | None | + +--- + +# UI Hooks + +UI Hooks allow modules to extend the web interface at `http://localhost:3000`. Modules can add desktop icons, app windows/panels, and trigger-able actions. + +## Creating a UI Module + +```typescript +// src/lib/core/my-module/ui.ts +import type { UIHooks, UIAppIcon, UIAppWindow, UIAction } from "../../../types"; + +export default class implements UIHooks { + getAppIcon(): UIAppIcon { + return { + id: "my-module", + label: "My Module", + iconHtml: '', + buttonClass: "is-primary", + }; + } + + getAppWindow(): UIAppWindow { + return { + id: "my-module", + title: "My Module Panel", + contentHtml: ` +

Hello from my module!

+ + `, + }; + } + + getActions(): UIAction[] { + return [ + { + id: "do-something", + label: "Do Something", + handler: async () => { + // Perform action + return { message: "Action completed!" }; + }, + }, + ]; + } +} +``` + +Add to `console.config.json`: +```json +{ + "uiModules": { + "my-module": "core/my-module/ui" + } +} +``` + +## UIHooks Interface + +| Method | Return Type | Description | +|--------|-------------|-------------| +| `getAppIcon()` | `UIAppIcon` | Desktop icon displayed in the app grid | +| `getAppWindow()` | `UIAppWindow` | Window/panel shown when icon is clicked | +| `getActions()` | `UIAction[]` | Actions invokable from the frontend | +| `getStatusData()` | `Record` | Metadata for display (optional) | + +## Type Definitions + +### UIAppIcon + +```typescript +interface UIAppIcon { + id: string; // Unique identifier + label: string; // Display label below icon + iconHtml: string; // HTML content (supports NES.css icons) + buttonClass?: string; // Optional button class (e.g., "is-primary") +} +``` + +### UIAppWindow + +```typescript +interface UIAppWindow { + id: string; // Must match icon id + title: string; // Window title bar text + contentHtml: string; // HTML content for window body + containerClass?: string; // Optional container class +} +``` + +### UIAction + +```typescript +interface UIAction { + id: string; // Action identifier + label: string; // Button label + buttonClass?: string; // Optional button class + handler: () => void | Promise<{ message?: string; data?: any }>; +} +``` + +## Frontend API + +### Invoking Actions + +From your panel HTML, use the global `invokeAction()` function: + +```javascript +// invokeAction(moduleId, actionId, buttonElement?) +invokeAction('my-module', 'do-something', this); +``` + +The function: +- Shows loading state on the button (if provided) +- Calls `POST /api/actions/:moduleId/:actionId` +- Displays success/error notification using NES.css styling + +### REST Endpoint + +``` +POST /api/actions/:moduleId/:actionId + +Response: { success: boolean, message?: string, data?: any } +``` + +--- + +# Built-in Modules + +## Backend Modules + +| Module | Description | +|--------|-------------| +| `core/arweave` | Uploads transaction data to Arweave permanent storage | +| `core/prover` | Sends batched transactions to the prover node | +| `core/streamr` | Publishes transactions to Streamr streams on a cron schedule | +| `core/is_on` | Computes device on/off state based on balance | +| `core/prune_sync` | Cleans up old synchronized transactions | + +## UI Modules + +| Module | Description | +|--------|-------------| +| `streamr` | Panel showing stream config, pending count, and "Publish Now" action | + +--- diff --git a/console.example.config.json b/console.example.config.json new file mode 100644 index 0000000..628ab5f --- /dev/null +++ b/console.example.config.json @@ -0,0 +1,21 @@ +{ + "modules": [ + "core/arweave", + "core/prover", + "core/streamr", + "core/is_on", + "core/prune_sync" + ], + "uiModules": { + "streamr": "core/streamr/ui" + }, + "streamr": { + "streamId": [ + "0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test" + ], + "cronSchedule": "0 * * * *" + }, + "prune_sync": { + "cronSchedule": "0 * * * *" + } +} diff --git a/package-lock.json b/package-lock.json index ec0ae02..bbb6486 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,6 +21,7 @@ "express-handlebars": "^7.1.2", "handlebars": "^4.7.8", "mqtt": "^5.5.0", + "node-cron": "^4.2.1", "ssh2": "^1.17.0", "ws": "^8.18.3" }, @@ -11363,6 +11364,15 @@ "integrity": "sha512-Ntyt4AIXyaLIuMHF6IOoTakB3K+RWxwtsHNRxllEoA6vPwP9o4866g6YWDLUdnucilZhmkxiHwHr11gAENw+QA==", "license": "MIT" }, + "node_modules/node-cron": { + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-4.2.1.tgz", + "integrity": "sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==", + "license": "ISC", + "engines": { + "node": ">=6.0.0" + } + }, "node_modules/node-datachannel": { "version": "0.27.0", "resolved": "https://registry.npmjs.org/node-datachannel/-/node-datachannel-0.27.0.tgz", diff --git a/package.json b/package.json index 0451369..73562d8 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ "express-handlebars": "^7.1.2", "handlebars": "^4.7.8", "mqtt": "^5.5.0", + "node-cron": "^4.2.1", "ssh2": "^1.17.0", "ws": "^8.18.3" }, diff --git a/src/index.ts b/src/index.ts index c2e4d79..ab8b856 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,34 +1,45 @@ import "dotenv/config"; -import { handleUplinks } from "./logic/mqtt"; +import { handleUplinks } from "./services/mqtt"; import { Request, Response } from "express"; -import { app } from "./logic/context"; +import { app } from "./services/context"; +import { loadExtensionsFromConfig, loadUIExtensionsFromConfig, getUIComponents, invokeUIAction, runHook } from "./lib/utils"; import setupDatabase, { getAllMeterRecords, deleteMeterByPublicKey } from "./store/sqlite"; -import { initializeVerifiersCache } from "./logic/sync"; -import { publishHeartbeatToStream } from "./logic/streamr"; // Async initialization function async function initializeApp() { try { console.log("[info] Starting application initialization..."); + // Load extensions from config + await loadExtensionsFromConfig(); + console.log("[info] Extensions loaded successfully"); + + // Load UI extensions + await loadUIExtensionsFromConfig(); + console.log("[info] UI extensions loaded successfully"); + + runHook("onBeforeInit"); + // Initialize database tables and jobs setupDatabase(); console.log("[info] Database setup completed"); - // Initialize verifiers cache on startup - // (disable ccip read initialization) - // await initializeVerifiersCache(); - // console.log("[info] Verifiers cache initialized successfully"); + runHook("onDatabaseSetup"); - // Start MQTT handling - handleUplinks(); - console.log("[info] MQTT uplinks handler started"); - - await publishHeartbeatToStream(); + try { + // Start MQTT handling + await handleUplinks(); + } catch (mqttError) { + console.error("[error] MQTT initialization failed:", mqttError); + throw mqttError; + } console.log("[info] Application initialization completed successfully"); + + runHook("onAfterInit"); } catch (error) { console.error("[fatal] Failed to initialize application:", error); + runHook("onInitError", error); process.exit(1); } } @@ -38,10 +49,28 @@ initializeApp(); app.get("/", async (req: Request, res: Response) => { const m3ters = getAllMeterRecords(); - res.render("index", { m3ters }); + + // Get UI components from loaded UI extensions + const { icons, windows } = await getUIComponents(); + + res.render("index", { m3ters, icons, windows }); console.log("[server]: Server handled GET request at `/`"); }); +// API endpoint to invoke UI actions +app.post("/api/actions/:moduleId/:actionId", async (req: Request, res: Response) => { + const { moduleId, actionId } = req.params; + console.log(`[server]: Invoking action '${actionId}' from module '${moduleId}'`); + + const result = await invokeUIAction(moduleId, actionId); + + if (result.success) { + res.status(200).json(result); + } else { + res.status(400).json(result); + } +}); + app.delete("/delete-meter", async (req: Request, res: Response) => { let publicKey = decodeURIComponent(req.query?.publicKey?.toString() as string); if (publicKey) { diff --git a/src/lib/core/arweave/index.ts b/src/lib/core/arweave/index.ts new file mode 100644 index 0000000..d0e076a --- /dev/null +++ b/src/lib/core/arweave/index.ts @@ -0,0 +1,64 @@ +import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk"; +import { Readable } from "stream"; +import Arweave from "arweave"; +import type { DecodedPayload, Hooks } from "../../../types"; + +export default class implements Hooks { + async onTransactionDistribution(m3terId: number, decoded: DecodedPayload) { + // encode transaction into standard format (payload[0]) + // format: nonce | energy | signature | voltage | device_id | longitude | latitude + const transactionHex = decoded.buf; + + const arweave = Arweave.init({ + host: "arweave.net", + protocol: "https", + port: 443, + }); + + const key = await arweave.wallets.generate(); + const signer = new ArweaveSigner(key); + const turbo = TurboFactory.authenticated({ signer }); + + const contractLabel = process.env.CONTRACT_LABEL || "M3ters"; + + const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8"); + + await turbo.uploadFile({ + fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }), + fileSizeFactory: () => byteLength, + dataItemOpts: { + paidBy: await arweave.wallets.jwkToAddress(key), + tags: [ + { name: "Contract-Label", value: contractLabel }, + { name: "Contract-Use", value: "M3tering Protocol Test" }, + { name: "Content-Type", value: "text/plain" }, + { name: "M3ter-ID", value: m3terId.toString() }, + { name: "Timestamp", value: Date.now().toString() }, + { name: "Nonce", value: decoded.nonce.toString() }, + { name: "Energy", value: decoded.energy.toString() }, + { name: "Signature", value: decoded.signature }, + { name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" }, + { name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" }, + { name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" }, + { name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" }, + ], + }, + events: { + onUploadProgress: (progress) => { + console.log("[arweave] Upload progress:", progress); + }, + onError: (error) => { + console.error("[arweave] Upload error:", error); + }, + onSuccess(event) { + console.log("[arweave] Upload successful! Transaction ID:", event); + }, + onUploadSuccess(event) { + console.log("[arweave] Upload completed! Transaction ID:", event); + }, + }, + }); + + console.log(`[arweave] Uploaded transaction ${decoded.nonce} for M3ter ID ${m3terId} to Arweave.`); + } +} diff --git a/src/lib/core/is_on/index.ts b/src/lib/core/is_on/index.ts new file mode 100644 index 0000000..040e0cf --- /dev/null +++ b/src/lib/core/is_on/index.ts @@ -0,0 +1,7 @@ +import type { Hooks } from "../../../types"; + +export default class implements Hooks { + isOnStateCompute(m3terId: number) { + return true; + } +} diff --git a/src/lib/core/prover/index.ts b/src/lib/core/prover/index.ts new file mode 100644 index 0000000..e647812 --- /dev/null +++ b/src/lib/core/prover/index.ts @@ -0,0 +1,64 @@ +import { buildBatchPayload } from "../../utils"; +import type { BatchTransactionPayload, Hooks, TransactionRecord } from "../../../types"; + +const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing"; + +export default class implements Hooks { + async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) { + // send pending transactions to prover node + try { + const proverURL = await this.getProverURL(); + + if (!proverURL) { + console.warn("No prover URL configured. Skipping sending transactions to prover."); + return; + } + + console.info(`Sending pending transactions to prover: ${proverURL}`); + + const response = await this.sendPendingTransactionsToProver(proverURL!, pendingTransactions); + + console.info("done sending to prover"); + console.info(`Prover response (text): ${await response?.text()}`); + } catch (error) { + console.error(`Error sending pending transactions to prover: ${error}`); + } + } + + async sendTransactionsToProver( + proverURL: string, + transactionData: BatchTransactionPayload[] + ): Promise { + try { + const response = await fetch(`${proverURL}/batch-payloads`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(transactionData), + }); + + console.log("[info] received", response.status, "from the prover"); + + if (!response.ok) { + throw new Error(`Prover responded with status: ${response.status}`); + } + return response; + } catch (err: any) { + console.error("Failed to send transactions to prover:", err.message); + return null; + } + } + + async getProverURL(): Promise { + return PREFERRED_PROVER_NODE; + } + + async sendPendingTransactionsToProver(proverURL: string, pendingTransactions: TransactionRecord[]) { + console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); + + const requestPayload = buildBatchPayload(pendingTransactions); + + return await this.sendTransactionsToProver(proverURL, requestPayload); + } +} diff --git a/src/lib/core/prune_sync/index.ts b/src/lib/core/prune_sync/index.ts new file mode 100644 index 0000000..aed1354 --- /dev/null +++ b/src/lib/core/prune_sync/index.ts @@ -0,0 +1,28 @@ +import cron from "node-cron"; +import { Hooks } from "../../../types"; +import { getAllMeterRecords } from "../../../store/sqlite"; +import { loadConfigurations } from "../../utils"; +import { pruneAndSyncOnchain } from "../../sync"; + +export default class implements Hooks { + private config = loadConfigurations(); + + async onAfterInit() { + console.log("Registering prune_sync cron job..."); + + // Schedule a cron job to perform prune verified transactions and sync with onchain state + cron.schedule(this.config.prune_sync.cronSchedule, async () => { + const m3ters = getAllMeterRecords(); + for (const m3ter of m3ters) { + try { + pruneAndSyncOnchain(m3ter.publicKey); + } catch (error) { + console.error(`Error pruning and syncing meter ${m3ter.publicKey}:`, error); + } + } + }); + + console.log("prune_sync cron job registered."); + return; + } +} diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts new file mode 100644 index 0000000..90b72fd --- /dev/null +++ b/src/lib/core/streamr/index.ts @@ -0,0 +1,69 @@ +import cron from "node-cron"; +import { StreamrClient } from "@streamr/sdk"; +import { getAllTransactionRecords } from "../../../store/sqlite"; +import { buildBatchPayload, loadConfigurations, retry } from "../../utils"; +import type { Hooks, TransactionRecord } from "../../../types"; + +const { ETHEREUM_PRIVATE_KEY } = process.env; + +if (!ETHEREUM_PRIVATE_KEY) { + throw new Error("Missing ETHEREUM_PRIVATE_KEY in environment variables"); +} + +export default class implements Hooks { + private config = loadConfigurations(); + + async onAfterInit() { + console.log("Registering Streamr cron job..."); + + // Schedule a cron job to publish pending transactions + cron.schedule( + this.config.streamr.cronSchedule, + async () => { + console.log("Publishing pending transactions to Streamr..."); + + const pendingTransactions = await this.getPendingTransactions(); + if (pendingTransactions.length > 0) { + for (const STREAMR_STREAM_ID of this.config.streamr.streamId) { + console.log(`Publishing to Streamr stream: ${STREAMR_STREAM_ID}`); + await retry( + () => this.publishPendingTransactionsToStreamr(STREAMR_STREAM_ID, pendingTransactions), + 3, + 2000 + ); + } + } + }, + { name: "streamr-publish-pending-transactions", noOverlap: true } + ); + + console.log("Streamr cron job registered."); + return; + } + + async getPendingTransactions(): Promise { + return getAllTransactionRecords(); + } + + async publishPendingTransactionsToStreamr(STREAMR_STREAM_ID: string, pendingTransactions: TransactionRecord[]) { + const streamrClient = new StreamrClient({ + auth: { + privateKey: ETHEREUM_PRIVATE_KEY!, + }, + }); + + try { + const stream = await streamrClient.getStream(STREAMR_STREAM_ID!); + + const batchPayload = buildBatchPayload(pendingTransactions); + + await stream.publish(batchPayload); + } catch (error) { + console.error(`Failed to publish to Streamr: ${(error as Error).message}`); + throw error; + } finally { + // destroy the client to free resources + await streamrClient.destroy(); + } + } +} diff --git a/src/lib/core/streamr/ui.ts b/src/lib/core/streamr/ui.ts new file mode 100644 index 0000000..9ceaa00 --- /dev/null +++ b/src/lib/core/streamr/ui.ts @@ -0,0 +1,150 @@ +import { getAllTransactionRecords } from "../../../store/sqlite"; +import { buildBatchPayload, loadConfigurations, retry } from "../../utils"; +import type { UIHooks, UIAppIcon, UIAppWindow, UIAction, TransactionRecord } from "../../../types"; +import { StreamrClient } from "@streamr/sdk"; + +const { ETHEREUM_PRIVATE_KEY } = process.env; + +/** + * Streamr UI Module + * Provides UI components for the Streamr integration, including + * a panel showing stream configuration and a manual publish action + */ +export default class implements UIHooks { + private config = loadConfigurations(); + private lastPublishTime: Date | null = null; + private lastPublishStatus: "success" | "error" | null = null; + + getAppIcon(): UIAppIcon { + return { + id: "streamr", + label: "Streamr", + iconHtml: '', + buttonClass: "is-primary", + }; + } + + async getAppWindow(): Promise { + const pendingCount = (await this.getPendingTransactions()).length; + const streamIds = this.config.streamr.streamId; + const cronSchedule = this.config.streamr.cronSchedule; + + return { + id: "streamr", + title: "Streamr Publisher", + containerClass: "", + contentHtml: ` +
+

Configuration

+
+ Cron Schedule: + ${cronSchedule} +
+
+ Stream IDs: +
    + ${streamIds.map((id) => `
  • ${id}
  • `).join("")} +
+
+
+ +
+

Status

+
+ Pending Transactions: + ${pendingCount} +
+
+ Last Publish: + ${this.lastPublishTime ? this.lastPublishTime.toLocaleString() : "Never"} + ${this.lastPublishStatus ? `(${this.lastPublishStatus})` : ""} +
+
+ +
+ + +
+ `, + }; + } + + getActions(): UIAction[] { + return [ + { + id: "publish-now", + label: "Publish Now", + buttonClass: "is-warning", + handler: async () => { + const pendingTransactions = await this.getPendingTransactions(); + + if (pendingTransactions.length === 0) { + return { message: "No pending transactions to publish" }; + } + + try { + for (const streamId of this.config.streamr.streamId) { + console.log(`[streamr-ui] Publishing to stream: ${streamId}`); + await retry(() => this.publishToStreamr(streamId, pendingTransactions), 3, 2000); + } + + this.lastPublishTime = new Date(); + this.lastPublishStatus = "success"; + + return { + message: `Published ${pendingTransactions.length} transactions to ${this.config.streamr.streamId.length} stream(s)`, + data: { count: pendingTransactions.length }, + }; + } catch (error: any) { + this.lastPublishStatus = "error"; + throw new Error(`Failed to publish: ${error.message}`); + } + }, + }, + ]; + } + + async getStatusData(): Promise> { + const pendingTransactions = await this.getPendingTransactions(); + return { + pendingCount: pendingTransactions.length, + streamIds: this.config.streamr.streamId, + cronSchedule: this.config.streamr.cronSchedule, + lastPublishTime: this.lastPublishTime, + lastPublishStatus: this.lastPublishStatus, + }; + } + + private async getPendingTransactions(): Promise { + return getAllTransactionRecords(); + } + + private async publishToStreamr(streamId: string, pendingTransactions: TransactionRecord[]) { + if (!ETHEREUM_PRIVATE_KEY) { + throw new Error("Missing ETHEREUM_PRIVATE_KEY"); + } + + const streamrClient = new StreamrClient({ + auth: { privateKey: ETHEREUM_PRIVATE_KEY }, + }); + + try { + const stream = await streamrClient.getStream(streamId); + const batchPayload = buildBatchPayload(pendingTransactions); + await stream.publish(batchPayload); + } finally { + await streamrClient.destroy(); + } + } +} diff --git a/src/logic/decode.ts b/src/lib/decode.ts similarity index 100% rename from src/logic/decode.ts rename to src/lib/decode.ts diff --git a/src/logic/encode.ts b/src/lib/encode.ts similarity index 100% rename from src/logic/encode.ts rename to src/lib/encode.ts diff --git a/src/logic/sync.ts b/src/lib/sync.ts similarity index 96% rename from src/logic/sync.ts rename to src/lib/sync.ts index 909e397..3ca21e5 100644 --- a/src/logic/sync.ts +++ b/src/lib/sync.ts @@ -11,9 +11,9 @@ import { rollup as rollupContract, ccipRevenueReader as ccipRevenueReaderContract, priceContext as priceContextContract, -} from "./context"; +} from "../services/context"; import { JsonRpcProvider, Contract, ZeroAddress } from "ethers"; -import { retry } from "../utils"; +import { retry } from "./utils"; import type { VerifierInfo } from "../types"; // Cache for verifiers - populated once on startup @@ -79,11 +79,11 @@ export async function initializeVerifiersCache(): Promise { /** * Get cached verifiers, throws error if cache is not initialized */ -function getCachedVerifiers(): VerifierInfo[] { +async function getCachedVerifiers(): Promise { if (!isCacheInitialized || !verifiersCache) { - throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first."); + await initializeVerifiersCache(); } - return verifiersCache; + return verifiersCache!; } /** @@ -147,7 +147,7 @@ export async function getLatestTransactionNonce(meterIdentifier: number): Promis export async function getCrossChainRevenue(tokenId: number): Promise { try { // Use cached verifiers instead of fetching them each time - const verifiers = getCachedVerifiers(); + const verifiers = await getCachedVerifiers(); let totalRevenue = 0; diff --git a/src/lib/utils.ts b/src/lib/utils.ts new file mode 100644 index 0000000..e3bfdcc --- /dev/null +++ b/src/lib/utils.ts @@ -0,0 +1,225 @@ +import fs from "fs"; +import path from "path"; +import { createPublicKey, verify } from "crypto"; +import type { TransactionRecord, BatchTransactionPayload, Hooks, AppConfig, UIHooks, UIAppIcon, UIAppWindow, UIAction } from "../types"; + +const extensions: Hooks[] = []; +const uiExtensions: Map = new Map(); +export const defaultConfigurations: AppConfig = { + modules: ["core/arweave", "core/prover", "core/streamr", "core/is_on", "core/prune_sync"], + streamr: { + streamId: ["0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test"], + cronSchedule: "0 * * * *", + }, + prune_sync: { + cronSchedule: "0 * * * *", + }, +}; + +export function loadConfigurations(configPath: string = "console.config.json"): AppConfig { + try { + const config: AppConfig = JSON.parse(fs.readFileSync(configPath, "utf-8")); + return config; + } catch (error) { + console.warn(`Could not load configuration from ${configPath}, using default configurations.` ); + return defaultConfigurations; + } +} + +export async function loadExtensionsFromConfig(configPath: string = "console.config.json"): Promise { + const config: AppConfig = loadConfigurations(configPath); + + for (const modulePath of config.modules) { + const resolved = path.resolve(__dirname, modulePath); + + const mod = await import(resolved); + extensions.push(new mod.default()); + } + + return extensions; +} + +export async function runHook(hook: K, ...args: Parameters>) { + let result: ReturnType> | boolean = true; + + for (const ext of extensions) { + const fn = ext[hook]; + let functionReturn; + if (fn) functionReturn = await (fn as any).call(ext, ...args); + + if (typeof functionReturn === "boolean" && hook === "isOnStateCompute") { + result = result && functionReturn; + } + } + + return result; +} + +// ========================================== +// UI Extension System +// ========================================== + +/** + * Load UI extensions from configuration file + * Looks for 'uiModules' key in config, which maps module IDs to their paths + */ +export async function loadUIExtensionsFromConfig(configPath: string = "console.config.json"): Promise> { + const config = loadConfigurations(configPath) as AppConfig & { uiModules?: Record }; + + if (!config.uiModules) { + console.log("[ui] No UI modules configured"); + return uiExtensions; + } + + for (const [moduleId, modulePath] of Object.entries(config.uiModules)) { + try { + const resolved = path.resolve(__dirname, modulePath); + const mod = await import(resolved); + const instance = new mod.default(); + uiExtensions.set(moduleId, instance); + console.log(`[ui] Loaded UI module: ${moduleId}`); + } catch (error) { + console.error(`[ui] Failed to load UI module ${moduleId}:`, error); + } + } + + return uiExtensions; +} + +/** + * Get all UI components (icons, windows, actions) from loaded UI extensions + */ +export async function getUIComponents(): Promise<{ + icons: UIAppIcon[]; + windows: UIAppWindow[]; + actions: Map; +}> { + const icons: UIAppIcon[] = []; + const windows: UIAppWindow[] = []; + const actions: Map = new Map(); + + for (const [moduleId, ext] of uiExtensions) { + try { + if (ext.getAppIcon) { + const icon = await ext.getAppIcon(); + icons.push(icon); + } + if (ext.getAppWindow) { + const window = await ext.getAppWindow(); + windows.push(window); + } + if (ext.getActions) { + const moduleActions = await ext.getActions(); + actions.set(moduleId, moduleActions); + } + } catch (error) { + console.error(`[ui] Error getting components from ${moduleId}:`, error); + } + } + + return { icons, windows, actions }; +} + +/** + * Get a specific UI extension by module ID + */ +export function getUIExtension(moduleId: string): UIHooks | undefined { + return uiExtensions.get(moduleId); +} + +/** + * Invoke an action from a UI module + */ +export async function invokeUIAction( + moduleId: string, + actionId: string +): Promise<{ success: boolean; message?: string; data?: any }> { + const ext = uiExtensions.get(moduleId); + if (!ext) { + return { success: false, message: `Module '${moduleId}' not found` }; + } + + if (!ext.getActions) { + return { success: false, message: `Module '${moduleId}' has no actions` }; + } + + const actions = await ext.getActions(); + const action = actions.find((a) => a.id === actionId); + + if (!action) { + return { success: false, message: `Action '${actionId}' not found in module '${moduleId}'` }; + } + + try { + const result = await action.handler(); + return { + success: true, + message: result?.message || `Action '${actionId}' executed successfully`, + data: result?.data, + }; + } catch (error: any) { + return { success: false, message: error.message || "Action failed" }; + } +} + +/** + * Retries a function up to 5 times with exponential backoff + * @param fn Function to retry + * @param maxRetries Maximum number of retries (default: 5) + * @param baseDelay Base delay in milliseconds (default: 1000) + * @returns Promise that resolves with the function result or rejects with the last error + */ +export async function retry(fn: () => Promise, maxRetries: number = 5, baseDelay: number = 1000): Promise { + let lastError: Error; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + return await fn(); + } catch (error) { + lastError = error as Error; + + if (attempt === maxRetries) { + throw lastError; + } + + const delay = baseDelay * Math.pow(2, attempt); + console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`, error); + + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + + throw lastError!; +} + +export function buildBatchPayload(transactions: TransactionRecord[]): BatchTransactionPayload[] { + return transactions.map((transaction) => ({ + m3ter_id: Number(transaction.identifier), + message: transaction.raw, + })); +} + +export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): boolean { + try { + const message = transaction.subarray(0, 8); + const signature = transaction.subarray(8, 72); + + // Wrap raw key in SPKI DER + const spkiPrefix = Buffer.from("302a300506032b6570032100", "hex"); + const derKey = Buffer.concat([new Uint8Array(spkiPrefix), new Uint8Array(rawPubKey)]); + + const publicKey = createPublicKey({ + key: derKey, + format: "der", + type: "spki", + }); + + // Verify + const ok = verify(null, new Uint8Array(message), publicKey, new Uint8Array(signature)); + + return ok; + } catch (error) { + console.error("Error verifying signature:", error); + return false; + } +} diff --git a/src/logic/arweave.ts b/src/logic/arweave.ts deleted file mode 100644 index a75f0d8..0000000 --- a/src/logic/arweave.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk"; -import { Readable } from "stream"; -import Arweave from "arweave"; -import type { DecodedPayload } from "../types"; - -export async function interact(m3terId: number, decoded: DecodedPayload) { - // encode transaction into standard format (payload[0]) - // format: nonce | energy | signature | voltage | device_id | longitude | latitude - const transactionHex = decoded.buf; - - const arweave = Arweave.init({ - host: "arweave.net", - protocol: "https", - port: 443, - }); - - const key = await arweave.wallets.generate(); - const signer = new ArweaveSigner(key); - const turbo = TurboFactory.authenticated({ signer }); - - const contractLabel = process.env.CONTRACT_LABEL || "M3ters"; - - const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8"); - - return await turbo.uploadFile({ - fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }), - fileSizeFactory: () => byteLength, - dataItemOpts: { - paidBy: await arweave.wallets.jwkToAddress(key), - tags: [ - { name: "Contract-Label", value: contractLabel }, - { name: "Contract-Use", value: "M3tering Protocol Test" }, - { name: "Content-Type", value: "text/plain" }, - { name: "M3ter-ID", value: m3terId.toString() }, - { name: "Timestamp", value: Date.now().toString() }, - { name: "Nonce", value: decoded.nonce.toString() }, - { name: "Energy", value: decoded.energy.toString() }, - { name: "Signature", value: decoded.signature }, - { name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" }, - { name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" }, - { name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" }, - { name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" }, - ], - }, - events: { - onUploadProgress: (progress) => { - console.log("[arweave] Upload progress:", progress); - }, - onError: (error) => { - console.error("[arweave] Upload error:", error); - }, - onSuccess(event) { - console.log("[arweave] Upload successful! Transaction ID:", event); - }, - onUploadSuccess(event) { - console.log("[arweave] Upload completed! Transaction ID:", event); - }, - }, - }); -} diff --git a/src/logic/gps.ts b/src/logic/gps.ts deleted file mode 100644 index f7d44c3..0000000 --- a/src/logic/gps.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { readFileSync } from "fs"; -import { join } from "path"; - -export function getGPS(): number[] { - try { - const data = readFileSync(join("gps_service", "data.json"), "utf-8"); - const gpsData = JSON.parse(data); - if (gpsData.onBoard.lat && gpsData.onBoard.lon) { - return [gpsData.onBoard.lat.toFixed(2), gpsData.onBoard.lon.toFixed(2)]; - } else { - return [gpsData.wifi.lat.toFixed(2), gpsData.wifi.lon.toFixed(2)]; - } - } catch (err) { - return [0.00, 0.00]; - } -} diff --git a/src/logic/prover.ts b/src/logic/prover.ts deleted file mode 100644 index c4f828a..0000000 --- a/src/logic/prover.ts +++ /dev/null @@ -1,137 +0,0 @@ -import { BatchTransactionPayload, TransactionRecord } from "../types"; -import { rollup } from "./context"; -import { buildBatchPayload } from "../utils"; -import { getAllTransactionRecords } from "../store/sqlite"; - -const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing"; - -// Prover node structure -export interface ProverNode { - id: string; - url: string; - isActive: boolean; - lastSeen: number; -} - -/** - * Get prover node list from smart contract - * Makes a request to a smart contract to get an array of node structs - */ -export async function getProverNodeList(): Promise { - try { - // TODO: Implement smart contract call to get prover nodes - console.log("Fetching prover node list from smart contract..."); - return []; - } catch (err: any) { - console.error("Failed to get prover node list:", err.message); - return []; - } -} - -/** - * Choose a prover node from the fetched list - * Picks one prover node from the available nodes (e.g., random selection, load balancing, etc.) - */ -export function chooseProverNode(nodes: ProverNode[]): ProverNode | null { - try { - // TODO: Implement node selection logic - // Filter active nodes - const activeNodes = nodes.filter((node) => node.isActive); - - if (activeNodes.length === 0) { - console.warn("No active prover nodes available"); - return null; - } - const randomIndex = Math.floor(Math.random() * activeNodes.length); - const selectedNode = activeNodes[randomIndex]; - - console.log("Selected prover node:", { - id: selectedNode.id, - url: selectedNode.url, - }); - return selectedNode; - } catch (err: any) { - console.error("❌ Failed to choose prover node:", err.message); - return null; - } -} - -/** - * Send transactions to prover node for verification - */ -export async function sendTransactionsToProver( - proverURL: string, - transactionData: BatchTransactionPayload[] -): Promise { - try { - const response = await fetch(`${proverURL}/batch-payloads`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(transactionData), - }); - - console.log("[info] received", response.status, "from the prover"); - - if (!response.ok) { - throw new Error(`Prover responded with status: ${response.status}`); - } - return response; - } catch (err: any) { - console.error("Failed to send transactions to prover:", err.message); - return null; - } -} - -/** - * Check prover node status - */ -export async function checkProverNodeStatus(proverURL: string) { - try { - const response = await fetch(`${proverURL}/status`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - }); - - if (!response.ok) { - return false; - } - - return true; - } catch (err: any) { - console.error("❌ Failed to check prover node status:", err); - return false; - } -} - -/** - * Checks the nonce for a meter id onchain - * - * @returns {Promise} meter nonce onchain - */ -export async function checkNonceOnchain(meterId: string): Promise { - try { - const nonce = await rollup.nonce(meterId); - return nonce; - } catch (err: any) { - console.error("Failed to check nonce onchain:", err); - throw err; - } -} - -export async function getProverURL(): Promise { - return PREFERRED_PROVER_NODE; -} - -export async function sendPendingTransactionsToProver(proverURL: string, pendingTransactions: TransactionRecord[]) { - console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); - - const requestPayload = buildBatchPayload(pendingTransactions); - - console.log("[info] Request payload:", requestPayload); - - return await sendTransactionsToProver(proverURL, requestPayload); -} diff --git a/src/logic/streamr.ts b/src/logic/streamr.ts deleted file mode 100644 index 45ba21c..0000000 --- a/src/logic/streamr.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { StreamrClient } from "@streamr/sdk"; -import { TransactionRecord } from "../types"; -import { buildBatchPayload } from "../utils"; - -const { STREAMR_STREAM_ID, ETHEREUM_PRIVATE_KEY } = process.env; - -if (!STREAMR_STREAM_ID || !ETHEREUM_PRIVATE_KEY) { - throw new Error("Missing STREAMR_STREAM_ID or ETHEREUM_PRIVATE_KEY in environment variables"); -} - -export const streamrClient = new StreamrClient({ - auth: { - privateKey: ETHEREUM_PRIVATE_KEY, - }, -}); - -const stream = streamrClient.getStream(STREAMR_STREAM_ID); - -stream.then((stream) => { - console.log(`[Streamr] Connected to stream: ${stream.id}`); -}).catch((error) => { - console.error("[Streamr] Error connecting to stream:", error); -}); - -async function getStream() { - return await stream; -} - -export async function publishHeartbeatToStream() { - const stream = await getStream(); - const heartbeatPayload = { - timestamp: new Date().toISOString(), - }; - await stream.publish(heartbeatPayload); - console.log("[Streamr] Published heartbeat:", heartbeatPayload); -} - -async function publishToStream(data: any) { - const stream = await getStream(); - await stream.publish(data); -} - -export async function publishPendingTransactionsToStreamr(pendingTransactions: TransactionRecord[]) { - await publishToStream(buildBatchPayload(pendingTransactions)); -} \ No newline at end of file diff --git a/src/public/js/actions.js b/src/public/js/actions.js new file mode 100644 index 0000000..0de1df0 --- /dev/null +++ b/src/public/js/actions.js @@ -0,0 +1,119 @@ +/** + * UI Actions Handler + * Provides functions for invoking module actions from the frontend + */ + +/** + * Invoke an action from a UI module + * @param {string} moduleId - The module identifier + * @param {string} actionId - The action identifier + * @param {HTMLElement} [buttonElement] - Optional button element to show loading state + * @returns {Promise<{success: boolean, message?: string, data?: any}>} + */ +async function invokeAction(moduleId, actionId, buttonElement) { + // Show loading state if button provided + let originalContent = null; + if (buttonElement) { + originalContent = buttonElement.innerHTML; + buttonElement.innerHTML = "..."; + buttonElement.disabled = true; + } + + try { + const response = await fetch(`/api/actions/${moduleId}/${actionId}`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + }); + + const result = await response.json(); + + // Show result feedback + if (result.success) { + showNotification(result.message || "Action completed successfully", "success"); + } else { + showNotification(result.message || "Action failed", "error"); + } + + return result; + } catch (error) { + console.error("Action invocation failed:", error); + showNotification("Failed to invoke action: " + error.message, "error"); + return { success: false, message: error.message }; + } finally { + // Restore button state + if (buttonElement && originalContent !== null) { + buttonElement.innerHTML = originalContent; + buttonElement.disabled = false; + } + } +} + +/** + * Show a notification message using NES.css styling + * @param {string} message - The message to display + * @param {string} type - 'success' or 'error' + */ +function showNotification(message, type) { + // Remove any existing notification + const existing = document.getElementById("action-notification"); + if (existing) { + existing.remove(); + } + + // Create notification element + const notification = document.createElement("div"); + notification.id = "action-notification"; + notification.className = `nes-container ${type === "success" ? "is-success" : "is-error"}`; + notification.style.cssText = ` + position: fixed; + bottom: 20px; + right: 20px; + z-index: 9999; + max-width: 400px; + animation: slideIn 0.3s ease-out; + `; + notification.innerHTML = ` +

${message}

+ `; + + document.body.appendChild(notification); + + // Play click sound for feedback + if (typeof clickButton === "function") { + clickButton(); + } + + // Auto-remove after 4 seconds + setTimeout(() => { + notification.style.animation = "slideOut 0.3s ease-in"; + setTimeout(() => notification.remove(), 300); + }, 4000); +} + +// Add CSS animation styles +const styleSheet = document.createElement("style"); +styleSheet.textContent = ` + @keyframes slideIn { + from { + transform: translateX(100%); + opacity: 0; + } + to { + transform: translateX(0); + opacity: 1; + } + } + @keyframes slideOut { + from { + transform: translateX(0); + opacity: 1; + } + to { + transform: translateX(100%); + opacity: 0; + } + } +`; +document.head.appendChild(styleSheet); diff --git a/src/logic/context.ts b/src/services/context.ts similarity index 100% rename from src/logic/context.ts rename to src/services/context.ts diff --git a/src/logic/grpc.ts b/src/services/grpc.ts similarity index 100% rename from src/logic/grpc.ts rename to src/services/grpc.ts diff --git a/src/logic/mqtt.ts b/src/services/mqtt.ts similarity index 54% rename from src/logic/mqtt.ts rename to src/services/mqtt.ts index 537a9af..b2c0a56 100644 --- a/src/logic/mqtt.ts +++ b/src/services/mqtt.ts @@ -1,8 +1,7 @@ import { connect } from "mqtt"; import { enqueue } from "./grpc"; -import { interact } from "./arweave"; -import { encode } from "./encode"; -import { m3ter as m3terContract } from "./context"; +import { encode } from "../lib/encode"; +import { app, m3ter as m3terContract } from "./context"; import { deleteMeterByPublicKey, getAllTransactionRecords, @@ -14,64 +13,66 @@ import { updateMeterDevEui, updateMeterNonce, } from "../store/sqlite"; -import { State, TransactionRecord } from "../types"; -import { getProverURL, sendPendingTransactionsToProver } from "./prover"; -import { decodePayload } from "./decode"; -import { verifyPayloadSignature } from "../utils"; -import { - getLatestTransactionNonce, - pruneAndSyncOnchain, - getCrossChainRevenue, - getOwedFromPriceContext, -} from "./sync"; -import { createMeterLogger, MeterLogger } from "../utils/logger"; -import { publishPendingTransactionsToStreamr } from "./streamr"; +import type { State, TransactionRecord } from "../types"; +import { decodePayload } from "../lib/decode"; +import { runHook, verifyPayloadSignature } from "../lib/utils"; +import { getLatestTransactionNonce, pruneAndSyncOnchain, getCrossChainRevenue, getOwedFromPriceContext } from "../lib/sync"; +import { createMeterLogger } from "../utils/logger"; const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST; -const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain const deviceLocks = new Map(); // Lock per devEUI to prevent concurrent message processing -export function handleUplinks() { - const client = connect({ - host: CHIRPSTACK_HOST, - port: 1883, - clean: true, - connectTimeout: 9000, - reconnectPeriod: 1000, - }); +export function handleUplinks(): Promise { + return new Promise(function (resolve, reject) { + const client = connect({ + host: CHIRPSTACK_HOST, + port: 1883, + clean: true, + connectTimeout: 9000, + reconnectPeriod: 1000, + }); - client.on("connect", () => { - client.subscribe(`application/${process.env.APPLICATION_ID}/device/+/event/up`, () => { - console.log(`\nConnected & Subscribed to CHIRPSTACK_HOST: ${CHIRPSTACK_HOST}\n`); + client.on("reconnect", () => { + runHook("onMqttReconnect", client); }); - }); - client.on("error", (err) => { - console.error("Connection error: ", err); - client.end(); - process.exit(1); - }); + client.on("message", async (_, blob) => { + return await handleMessage(blob); + }); - client.on("reconnect", () => { - console.log("Reconnecting..."); - }); + client.on("connect", () => { + const topic = `application/${process.env.APPLICATION_ID}/device/+/event/up`; + client.subscribe(topic, () => { + console.log(`\nConnected & Subscribed to CHIRPSTACK_HOST: ${CHIRPSTACK_HOST}\n`); + + runHook("onMqttSubscribed", client, topic); + resolve(true); + }); + + runHook("onMqttConnect", client); + }); + + client.on("error", (err) => { + console.error("Connection error: ", err); + + runHook("onMqttError", err, client); - client.on("message", async (_, blob) => { - return await handleMessage(blob); + client.end(); + reject(err); + }); }); } export async function handleMessage(blob: Buffer) { - let devEui: string | undefined; - let logger: MeterLogger = createMeterLogger({}); // Default logger for early errors + runHook("onMessageReceived", blob); - try { - const message = JSON.parse(blob.toString()); - devEui = message["deviceInfo"]["devEui"]; + const message = JSON.parse(blob.toString()); + const devEui = message["deviceInfo"]["devEui"] || null; - // Initialize logger with devEui context - logger = createMeterLogger({ devEui }); + // Create a logger with devEui context + const logger = createMeterLogger({ devEui }); + try { if (!devEui) { console.log("[warn] Message dropped - no devEui found in message"); return; @@ -80,6 +81,7 @@ export async function handleMessage(blob: Buffer) { // Check if this specific device is already being processed if (deviceLocks.get(devEui)) { logger.warn(`Message dropped - device is already being processed`); + runHook("onMessageDropped", "locked", devEui); return; } @@ -90,7 +92,7 @@ export async function handleMessage(blob: Buffer) { logger.info(`Received uplink from device: ${JSON.stringify(message)}`); - // encode transaction into standard format (payload is hex string) + // encoded transaction in standard format (payload is hex string) // format: nonce | energy | signature | voltage | device_id | longitude | latitude const transactionHex = Buffer.from(message["data"], "base64"); const decoded = decodePayload(transactionHex); @@ -128,12 +130,7 @@ export async function handleMessage(blob: Buffer) { const latestNonce = await getLatestTransactionNonce(tokenId); - // Update logger with tokenId context now that we have it - logger = createMeterLogger({ devEui, tokenId, publicKey: `0x${publicKey}` }); - - logger.info( - `Fetched tokenId and latestNonce from chain and local state: ${tokenId}, ${latestNonce}` - ); + logger.info(`Fetched tokenId and latestNonce from chain and local state: ${tokenId}, ${latestNonce}`); // save new meter with devEui const newMeter = { @@ -145,21 +142,16 @@ export async function handleMessage(blob: Buffer) { const existingMeter = getMeterByTokenId(tokenId); - // incase of the public key being updated + // in-case of the public key being updated if (existingMeter && existingMeter.publicKey !== `0x${publicKey}`) { deleteMeterByPublicKey(`0x${publicKey}`); } saveMeter(newMeter); logger.info(`Saved new meter: ${JSON.stringify(newMeter)}`); - } else { - // Update logger with existing meter context - logger = createMeterLogger({ - devEui, - tokenId: existingMeter.tokenId, - publicKey: `0x${publicKey}`, - }); + runHook("onMeterCreated", newMeter); + } else { // update existing meter with devEui if not already set if (!existingMeter.devEui || existingMeter.devEui !== message["deviceInfo"]["devEui"]) { logger.info(`Updating meter with DevEui: ${message["deviceInfo"]["devEui"]}`); @@ -181,9 +173,6 @@ export async function handleMessage(blob: Buffer) { throw new Error("Meter not found for public key: " + publicKey); } - // Update logger with complete meter context - logger = createMeterLogger({ devEui, tokenId: m3ter.tokenId, publicKey: `0x${publicKey}` }); - logger.info(`Found meter: ${JSON.stringify(m3ter)}`); // If both latest nonce and received nonce are 0, enqueue 0 immediately @@ -191,9 +180,9 @@ export async function handleMessage(blob: Buffer) { logger.info("Both latest nonce and received nonce are 0, enqueuing 0 immediately"); try { - is_on = true; // Always on - // (await getCrossChainRevenue(m3ter.tokenId)) >= - // (await getOwedFromPriceContext(m3ter.tokenId)); + is_on = true; // Always on + // (await getCrossChainRevenue(m3ter.tokenId)) >= + // (await getOwedFromPriceContext(m3ter.tokenId)); } catch (error) { logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); } @@ -210,40 +199,18 @@ export async function handleMessage(blob: Buffer) { return; // Exit early without processing the transaction } - if (m3ter.latestNonce % SYNC_EPOCH === 0) { - // sync with blockchain every SYNC_EPOCH transactions - await pruneAndSyncOnchain(m3ter.tokenId); - - logger.info(`Synced meter with blockchain: ${m3ter.tokenId}`); - - m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; - - if (!m3ter) { - throw new Error("Meter not found after sync for public key: " + publicKey); - } - } - const expectedNonce = m3ter.latestNonce + 1; - logger.info( - `Received blob for meter ${m3ter?.tokenId}, expected nonce: ${expectedNonce}, got: ${decoded.nonce}` - ); + logger.info(`Received blob for meter ${m3ter?.tokenId}, expected nonce: ${expectedNonce}, got: ${decoded.nonce}`); if (decoded.nonce !== expectedNonce && decoded.nonce !== 0) { - throw new Error( - `Invalid nonce. Expected ${expectedNonce}, got ${decoded.nonce}. Public key: ${publicKey}` - ); + throw new Error(`Invalid nonce. Expected ${expectedNonce}, got ${decoded.nonce}. Public key: ${publicKey}`); } // if device nonce is correct if (decoded.nonce === expectedNonce) { logger.info(`Nonce is valid: ${decoded.nonce}`); - // Upload to arweave - await interact(m3ter.tokenId, decoded); - - logger.info(`Uploaded transaction to Arweave for meter ${m3ter.tokenId}`); - // save transaction to local store const transactionRecord = { nonce: decoded.nonce, @@ -251,59 +218,42 @@ export async function handleMessage(blob: Buffer) { receivedAt: Date.now(), raw: transactionHex.toString("hex"), } as TransactionRecord; - insertTransaction(transactionRecord); - updateMeterNonce(`0x${publicKey}`, expectedNonce); logger.info(`Updated meter nonce to: ${expectedNonce}`); const pendingTransactions = getAllTransactionRecords(); + await runHook("onTransactionDistribution", m3ter.tokenId, decoded, pendingTransactions); + } - // send pending transactions to prover node try { - const proverURL = await getProverURL(); - logger.info(`Sending pending transactions to prover: ${proverURL}`); - - const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions); - - logger.info("done sending to prover"); - logger.info(`Prover response (text): ${await response?.text()}`); + is_on = await runHook("isOnStateCompute", m3ter.tokenId); } catch (error) { - logger.error(`Error sending pending transactions to prover: ${error}`); + runHook("onIsOnStateComputeError", m3ter.tokenId, error); + logger.error(`Error in isOnStateCompute hook: ${error}`); } - // send pending transactions to streamr - try { - logger.info(`Sending pending transactions to streamr`); - await publishPendingTransactionsToStreamr(pendingTransactions); - logger.info(`Successfully sent pending transactions to streamr`); - } catch (error) { - logger.error(`Error sending pending transactions to streamr: ${error}`); - } - } + runHook("onIsOnStateComputed", m3ter.tokenId, is_on); - try { - is_on = true; // Always on - // (await getCrossChainRevenue(m3ter.tokenId)) >= - // (await getOwedFromPriceContext(m3ter.tokenId)); - } catch (error) { - logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); - } - const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on }; + const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on }; - logger.info(`Enqueuing state: ${JSON.stringify(state)}`); + logger.info(`Enqueuing state: ${JSON.stringify(state)}`); - enqueue( - message["deviceInfo"]["devEui"], - encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) - ); + enqueue( + message["deviceInfo"]["devEui"], + encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) + ); + runHook("onStateEnqueued", state, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0); } catch (error) { logger.error(`Error handling MQTT message: ${error}`); + runHook("onMessageError", error); } finally { // Release lock for this specific device if (devEui) { deviceLocks.delete(devEui); + runHook("onDeviceUnlocked", devEui); } + runHook("onMessageProcessingComplete"); } } diff --git a/src/types.ts b/src/types.ts index 8082331..3004ac9 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,51 @@ +import { MqttClient } from "mqtt/*"; + +// Application configuration type (console.config.json) +export type AppConfig = { + modules: string[]; + uiModules?: Record; + streamr: { + streamId: string[]; + cronSchedule: string; + }; + prune_sync: { + cronSchedule: string; + }; +}; + +// Hooks type for lifecycle events +export type Hooks = { + onBeforeInit?: () => void | Promise; + onDatabaseSetup?: () => void | Promise; + onAfterInit?: () => void | Promise; + onInitError?: (error: any) => void | Promise; + + onMqttConnect?: (client: MqttClient) => void | Promise; + onMqttSubscribed?: (client: MqttClient, topic: string) => void | Promise; + onMqttError?: (error: any, client: MqttClient) => void | Promise; + onMqttReconnect?: (client: MqttClient) => void | Promise; + + onMessageReceived?: (blob: Buffer) => void | Promise; + onMessageDropped?: (reason: string, devEui: string) => void | Promise; + + onMeterCreated?: (newMeter: MeterRecord) => void | Promise; + + onTransactionDistribution?: ( + tokenId: number, + decodedPayload: DecodedPayload, + pendingTransactions: TransactionRecord[] + ) => void | Promise; + + isOnStateCompute?: (m3terId: number) => boolean | Promise; + onIsOnStateComputed?: (m3terId: number, isOn: boolean) => void | Promise; + onIsOnStateComputeError?: (m3terId: number, error: any) => void | Promise; + onStateEnqueued?: (state: any, latitude: number, longitude: number) => void | Promise; + + onMessageError?: (error: any) => void | Promise; + onDeviceUnlocked?: (devEui: string) => void | Promise; + onMessageProcessingComplete?: () => void | Promise; +}; + // Meter interface for database operations export interface MeterRecord { publicKey: string; @@ -50,4 +98,65 @@ export interface VerifierInfo { ensName: string; targetAddress: string; verifierAddress: string; -} \ No newline at end of file +} + +// ========================================== +// UI Extension Types +// ========================================== + +/** + * Represents an app icon displayed in the desktop UI + */ +export interface UIAppIcon { + /** Unique identifier for the icon */ + id: string; + /** Display label shown below the icon */ + label: string; + /** HTML content for the icon (can use NES.css classes) */ + iconHtml: string; + /** Optional CSS class for the button */ + buttonClass?: string; +} + +/** + * Represents an app window/panel in the UI + */ +export interface UIAppWindow { + /** Unique identifier matching the icon id */ + id: string; + /** Window title */ + title: string; + /** HTML content for the window body */ + contentHtml: string; + /** Optional CSS class for the container */ + containerClass?: string; +} + +/** + * Represents an action that can be triggered from the UI + */ +export interface UIAction { + /** Unique identifier for the action */ + id: string; + /** Display label for the action button */ + label: string; + /** Optional CSS class for the button (e.g., 'is-primary', 'is-warning') */ + buttonClass?: string; + /** Handler function called when action is triggered */ + handler: () => void | Promise; +} + +/** + * UI Hooks interface for extending the console UI + * Modules can implement this to add icons, windows, and actions to the web interface + */ +export type UIHooks = { + /** Return an app icon to display on the desktop */ + getAppIcon?: () => UIAppIcon | Promise; + /** Return an app window/panel configuration */ + getAppWindow?: () => UIAppWindow | Promise; + /** Return available actions for this module */ + getActions?: () => UIAction[] | Promise; + /** Return metadata/status data for display */ + getStatusData?: () => Record | Promise>; +} diff --git a/src/utils.ts b/src/utils.ts deleted file mode 100644 index 0fbacdd..0000000 --- a/src/utils.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { TransactionRecord, BatchTransactionPayload } from "./types"; -import { createPublicKey, verify } from "crypto"; -import os from "os"; - -/** - * Retries a function up to 5 times with exponential backoff - * @param fn Function to retry - * @param maxRetries Maximum number of retries (default: 5) - * @param baseDelay Base delay in milliseconds (default: 1000) - * @returns Promise that resolves with the function result or rejects with the last error - */ -export async function retry( - fn: () => Promise, - maxRetries: number = 5, - baseDelay: number = 1000 -): Promise { - let lastError: Error; - - for (let attempt = 0; attempt <= maxRetries; attempt++) { - try { - return await fn(); - } catch (error) { - lastError = error as Error; - - if (attempt === maxRetries) { - throw lastError; - } - - const delay = baseDelay * Math.pow(2, attempt); - console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`, error); - - await new Promise((resolve) => setTimeout(resolve, delay)); - } - } - - throw lastError!; -} - -export function buildBatchPayload(transactions: TransactionRecord[]): BatchTransactionPayload[] { - return transactions.map((transaction) => ({ - m3ter_id: Number(transaction.identifier), - message: transaction.raw, - })); -} - -export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): boolean { - try { - const message = transaction.subarray(0, 8); - const signature = transaction.subarray(8, 72); - - // Wrap raw key in SPKI DER - const spkiPrefix = Buffer.from("302a300506032b6570032100", "hex"); - const derKey = Buffer.concat([new Uint8Array(spkiPrefix), new Uint8Array(rawPubKey)]); - - const publicKey = createPublicKey({ - key: derKey, - format: "der", - type: "spki", - }); - - // Verify - const ok = verify(null, new Uint8Array(message), publicKey, new Uint8Array(signature)); - - return ok; - } catch (error) { - console.error("Error verifying signature:", error); - return false; - } -} diff --git a/src/utils/logger.ts b/src/utils/logger.ts index 815b8a1..90b0598 100644 --- a/src/utils/logger.ts +++ b/src/utils/logger.ts @@ -8,18 +8,10 @@ export interface MeterLogger { export interface MeterContext { devEui?: string; - tokenId?: number; - publicKey?: string; } export function createMeterLogger(context: MeterContext): MeterLogger { - const prefix = context.tokenId - ? `[Meter-${context.tokenId}${context.devEui ? `|${context.devEui}` : ''}]` - : context.devEui - ? `[Device-${context.devEui}]` - : context.publicKey - ? `[PubKey-${context.publicKey.slice(-8)}]` - : '[Unknown-Meter]'; + const prefix = context.devEui ? `[Device-${context.devEui}]` : "[Unknown-Meter]"; return { info: (message: string) => console.log(`${prefix} [info] ${message}`), @@ -27,4 +19,4 @@ export function createMeterLogger(context: MeterContext): MeterLogger { error: (message: string) => console.error(`${prefix} [error] ${message}`), debug: (message: string) => console.log(`${prefix} [debug] ${message}`), }; -} \ No newline at end of file +} diff --git a/src/views/index.hbs b/src/views/index.hbs index 738061c..249a735 100644 --- a/src/views/index.hbs +++ b/src/views/index.hbs @@ -97,6 +97,21 @@ + {{! {/* Dynamic Module Icons */} }} + {{#each icons}} +
+
+ +

{{label}}

+
+
+ {{/each}} + {{! {/* M3ters Table */} }} + + {{! {/* Dynamic Module Windows */} }} + {{#each windows}} + + {{/each}} diff --git a/src/views/layouts/main.hbs b/src/views/layouts/main.hbs index 9c91081..4d12805 100644 --- a/src/views/layouts/main.hbs +++ b/src/views/layouts/main.hbs @@ -20,4 +20,5 @@ + \ No newline at end of file diff --git a/tests/logic/sync.test.ts b/tests/logic/sync.test.ts index 4582f68..de1cb09 100644 --- a/tests/logic/sync.test.ts +++ b/tests/logic/sync.test.ts @@ -4,10 +4,10 @@ import { isVerifiersCacheInitialized, getCachedVerifiersCount, getCrossChainRevenue, -} from "../../src/logic/sync"; +} from "../../src/lib/sync"; // Mock the context module -jest.mock("../../src/logic/context", () => ({ +jest.mock("../../src/services/context", () => ({ provider: { resolveName: jest.fn(), }, @@ -19,7 +19,7 @@ jest.mock("../../src/logic/context", () => ({ })); // Mock the retry utility -jest.mock("../../src/utils", () => ({ +jest.mock("../../src/lib/utils", () => ({ retry: jest.fn((fn) => fn()), })); @@ -30,7 +30,7 @@ describe("Verifiers Cache", () => { }); it("should initialize cache successfully", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock successful responses ccipRevenueReader.verifierCount.mockResolvedValue(2n); @@ -55,7 +55,7 @@ describe("Verifiers Cache", () => { }); it("should throw error if ENS resolution fails (returns null)", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock responses with ENS resolution failure ccipRevenueReader.verifierCount.mockResolvedValue(1n); @@ -69,7 +69,7 @@ describe("Verifiers Cache", () => { }); it("should throw error if ENS resolution fails (returns zero address)", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock responses with ENS resolution failure ccipRevenueReader.verifierCount.mockResolvedValue(1n); @@ -82,17 +82,8 @@ describe("Verifiers Cache", () => { expect(getCachedVerifiersCount()).toBe(0); }); - it("should throw error if cache is not initialized when calling getCrossChainRevenue", async () => { - // Ensure cache is not initialized - expect(isVerifiersCacheInitialized()).toBe(false); - - await expect(getCrossChainRevenue(123)).rejects.toThrow( - "Verifiers cache not initialized. Call initializeVerifiersCache() first." - ); - }); - it("should use cached verifiers for getCrossChainRevenue", async () => { - const { provider, ccipRevenueReader } = require("../../src/logic/context"); + const { provider, ccipRevenueReader } = require("../../src/services/context"); // Mock successful initialization ccipRevenueReader.verifierCount.mockResolvedValue(1n);