diff --git a/.env.example b/.env.example
index bb64c2b..50f95b1 100644
--- a/.env.example
+++ b/.env.example
@@ -5,5 +5,4 @@ CONTRACT_LABEL="M3ters"
CHIRPSTACK_HOST="localhost"
MAINNET_RPC="https://sepolia.drpc.org"
PREFERRED_PROVER_NODE="http://prover.m3ter.ing"
-STREAMR_STREAM_ID="0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test"
ETHEREUM_PRIVATE_KEY="..."
diff --git a/.gitignore b/.gitignore
index 195e462..fa5348b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+# console configuration file
+console.config.json
+
#test scripts
emulate.ts
test-database.ts
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..7d1e594
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,10 @@
+{
+ "cSpell.words": [
+ "ardrive",
+ "arweave",
+ "m3ters",
+ "Emmo00",
+ "ccip",
+ "Mauchly",
+ ]
+}
\ No newline at end of file
diff --git a/README.md b/README.md
index 4074dda..6ad68be 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,11 @@
-# M3tering Console Setup
+# M3tering Console
+
+A modular, extensible service console for providers on the M3tering protocol. Features a hook-based architecture for backend extensibility and a UI extension system for frontend customization.
## Pre-setup
- Make sure Public key is set on the M3ter contract
-- Make sure the price for evergy as been set on the PriceContext contract
+- Make sure the price for energy has been set on the PriceContext contract
- Make sure the Console has been granted publish permission on the Streamr stream
## Quick Setup
@@ -27,7 +29,6 @@
CHIRPSTACK_HOST=localhost
MAINNET_RPC=https://sepolia.drpc.org
PREFERRED_PROVER_NODE=http://34.244.149.153
- STREAMR_STREAM_ID="0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test"
ETHEREUM_PRIVATE_KEY="..."
```
@@ -54,3 +55,263 @@
npm install
npm run dev
```
+
+---
+
+# Extension System
+
+The M3tering Console provides two complementary extension systems:
+
+1. **Backend Hooks** - Hook into the console lifecycle (MQTT, database, message processing)
+2. **UI Hooks** - Add custom icons, panels, and actions to the web interface
+
+Both systems use a config-driven approach where modules are loaded dynamically from paths specified in `console.config.json`.
+
+## Configuration
+
+```json
+{
+ "modules": [
+ "core/arweave",
+ "core/prover",
+ "core/streamr",
+ "core/is_on",
+ "core/prune_sync"
+ ],
+ "uiModules": {
+ "streamr": "core/streamr/ui"
+ },
+ "streamr": {
+ "streamId": ["0x.../m3tering/test"],
+ "cronSchedule": "0 * * * *"
+ }
+}
+```
+
+- **`modules`**: Array of paths to backend hook modules (relative to `src/lib/`)
+- **`uiModules`**: Object mapping module IDs to UI module paths (relative to `src/lib/`)
+
+---
+
+# Backend Hooks
+
+Backend hooks allow modules to react to console lifecycle events. Each module exports a default class implementing the `Hooks` interface.
+
+## Creating a Backend Module
+
+```typescript
+// src/lib/core/my-module/index.ts
+import type { Hooks } from "../../../types";
+
+export default class implements Hooks {
+ onAfterInit() {
+ console.log("My module initialized!");
+ }
+
+ onTransactionDistribution(tokenId, decodedPayload, pendingTransactions) {
+ // Process transactions
+ }
+}
+```
+
+Add to `console.config.json`:
+```json
+{
+ "modules": ["core/my-module"]
+}
+```
+
+## Hook Lifecycle Reference
+
+### Initialization Phase
+
+| Hook | Description | Parameters |
+|------|-------------|------------|
+| `onBeforeInit` | Before any initialization begins | None |
+| `onDatabaseSetup` | After SQLite tables/jobs are initialized | None |
+| `onAfterInit` | After all initialization completes successfully | None |
+| `onInitError` | When an error occurs during initialization | `error: any` |
+
+### MQTT Connection Phase
+
+| Hook | Description | Parameters |
+|------|-------------|------------|
+| `onMqttConnect` | MQTT client successfully connects to ChirpStack | `client: MqttClient` |
+| `onMqttSubscribed` | After subscribing to the device uplink topic | `client: MqttClient`, `topic: string` |
+| `onMqttError` | MQTT connection error occurs | `error: any`, `client: MqttClient` |
+| `onMqttReconnect` | Attempting to reconnect to MQTT broker | `client: MqttClient` |
+
+### Message Processing Phase
+
+| Hook | Description | Parameters |
+|------|-------------|------------|
+| `onMessageReceived` | Raw MQTT message received (before parsing) | `blob: Buffer` |
+| `onMessageDropped` | Message dropped (e.g., device locked) | `reason: string`, `devEui: string` |
+| `onMeterCreated` | New meter saved to database | `newMeter: MeterRecord` |
+| `onTransactionDistribution` | Before sending to Arweave/prover/Streamr | `tokenId: number`, `decodedPayload: DecodedPayload`, `pendingTransactions: TransactionRecord[]` |
+
+### State Computation Phase
+
+| Hook | Description | Parameters |
+|------|-------------|------------|
+| `isOnStateCompute` | Determine device on/off state (returns `boolean`) | `m3terId: number` |
+| `onIsOnStateComputed` | After on/off state computed | `m3terId: number`, `isOn: boolean` |
+| `onIsOnStateComputeError` | Error during state computation | `m3terId: number`, `error: any` |
+| `onStateEnqueued` | State enqueued to gRPC for device response | `state: any`, `latitude: number`, `longitude: number` |
+
+### Error & Cleanup Phase
+
+| Hook | Description | Parameters |
+|------|-------------|------------|
+| `onMessageError` | Error during message processing | `error: any` |
+| `onDeviceUnlocked` | Device lock released (regardless of outcome) | `devEui: string` |
+| `onMessageProcessingComplete` | Message processing finished | None |
+
+---
+
+# UI Hooks
+
+UI Hooks allow modules to extend the web interface at `http://localhost:3000`. Modules can add desktop icons, app windows/panels, and trigger-able actions.
+
+## Creating a UI Module
+
+```typescript
+// src/lib/core/my-module/ui.ts
+import type { UIHooks, UIAppIcon, UIAppWindow, UIAction } from "../../../types";
+
+export default class implements UIHooks {
+ getAppIcon(): UIAppIcon {
+ return {
+ id: "my-module",
+ label: "My Module",
+ iconHtml: '',
+ buttonClass: "is-primary",
+ };
+ }
+
+ getAppWindow(): UIAppWindow {
+ return {
+ id: "my-module",
+ title: "My Module Panel",
+ contentHtml: `
+
Hello from my module!
+
+ `,
+ };
+ }
+
+ getActions(): UIAction[] {
+ return [
+ {
+ id: "do-something",
+ label: "Do Something",
+ handler: async () => {
+ // Perform action
+ return { message: "Action completed!" };
+ },
+ },
+ ];
+ }
+}
+```
+
+Add to `console.config.json`:
+```json
+{
+ "uiModules": {
+ "my-module": "core/my-module/ui"
+ }
+}
+```
+
+## UIHooks Interface
+
+| Method | Return Type | Description |
+|--------|-------------|-------------|
+| `getAppIcon()` | `UIAppIcon` | Desktop icon displayed in the app grid |
+| `getAppWindow()` | `UIAppWindow` | Window/panel shown when icon is clicked |
+| `getActions()` | `UIAction[]` | Actions invokable from the frontend |
+| `getStatusData()` | `Record` | Metadata for display (optional) |
+
+## Type Definitions
+
+### UIAppIcon
+
+```typescript
+interface UIAppIcon {
+ id: string; // Unique identifier
+ label: string; // Display label below icon
+ iconHtml: string; // HTML content (supports NES.css icons)
+ buttonClass?: string; // Optional button class (e.g., "is-primary")
+}
+```
+
+### UIAppWindow
+
+```typescript
+interface UIAppWindow {
+ id: string; // Must match icon id
+ title: string; // Window title bar text
+ contentHtml: string; // HTML content for window body
+ containerClass?: string; // Optional container class
+}
+```
+
+### UIAction
+
+```typescript
+interface UIAction {
+ id: string; // Action identifier
+ label: string; // Button label
+ buttonClass?: string; // Optional button class
+ handler: () => void | Promise<{ message?: string; data?: any }>;
+}
+```
+
+## Frontend API
+
+### Invoking Actions
+
+From your panel HTML, use the global `invokeAction()` function:
+
+```javascript
+// invokeAction(moduleId, actionId, buttonElement?)
+invokeAction('my-module', 'do-something', this);
+```
+
+The function:
+- Shows loading state on the button (if provided)
+- Calls `POST /api/actions/:moduleId/:actionId`
+- Displays success/error notification using NES.css styling
+
+### REST Endpoint
+
+```
+POST /api/actions/:moduleId/:actionId
+
+Response: { success: boolean, message?: string, data?: any }
+```
+
+---
+
+# Built-in Modules
+
+## Backend Modules
+
+| Module | Description |
+|--------|-------------|
+| `core/arweave` | Uploads transaction data to Arweave permanent storage |
+| `core/prover` | Sends batched transactions to the prover node |
+| `core/streamr` | Publishes transactions to Streamr streams on a cron schedule |
+| `core/is_on` | Computes device on/off state based on balance |
+| `core/prune_sync` | Cleans up old synchronized transactions |
+
+## UI Modules
+
+| Module | Description |
+|--------|-------------|
+| `streamr` | Panel showing stream config, pending count, and "Publish Now" action |
+
+---
diff --git a/console.example.config.json b/console.example.config.json
new file mode 100644
index 0000000..628ab5f
--- /dev/null
+++ b/console.example.config.json
@@ -0,0 +1,21 @@
+{
+ "modules": [
+ "core/arweave",
+ "core/prover",
+ "core/streamr",
+ "core/is_on",
+ "core/prune_sync"
+ ],
+ "uiModules": {
+ "streamr": "core/streamr/ui"
+ },
+ "streamr": {
+ "streamId": [
+ "0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test"
+ ],
+ "cronSchedule": "0 * * * *"
+ },
+ "prune_sync": {
+ "cronSchedule": "0 * * * *"
+ }
+}
diff --git a/package-lock.json b/package-lock.json
index ec0ae02..bbb6486 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -21,6 +21,7 @@
"express-handlebars": "^7.1.2",
"handlebars": "^4.7.8",
"mqtt": "^5.5.0",
+ "node-cron": "^4.2.1",
"ssh2": "^1.17.0",
"ws": "^8.18.3"
},
@@ -11363,6 +11364,15 @@
"integrity": "sha512-Ntyt4AIXyaLIuMHF6IOoTakB3K+RWxwtsHNRxllEoA6vPwP9o4866g6YWDLUdnucilZhmkxiHwHr11gAENw+QA==",
"license": "MIT"
},
+ "node_modules/node-cron": {
+ "version": "4.2.1",
+ "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-4.2.1.tgz",
+ "integrity": "sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==",
+ "license": "ISC",
+ "engines": {
+ "node": ">=6.0.0"
+ }
+ },
"node_modules/node-datachannel": {
"version": "0.27.0",
"resolved": "https://registry.npmjs.org/node-datachannel/-/node-datachannel-0.27.0.tgz",
diff --git a/package.json b/package.json
index 0451369..73562d8 100644
--- a/package.json
+++ b/package.json
@@ -24,6 +24,7 @@
"express-handlebars": "^7.1.2",
"handlebars": "^4.7.8",
"mqtt": "^5.5.0",
+ "node-cron": "^4.2.1",
"ssh2": "^1.17.0",
"ws": "^8.18.3"
},
diff --git a/src/index.ts b/src/index.ts
index c2e4d79..ab8b856 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,34 +1,45 @@
import "dotenv/config";
-import { handleUplinks } from "./logic/mqtt";
+import { handleUplinks } from "./services/mqtt";
import { Request, Response } from "express";
-import { app } from "./logic/context";
+import { app } from "./services/context";
+import { loadExtensionsFromConfig, loadUIExtensionsFromConfig, getUIComponents, invokeUIAction, runHook } from "./lib/utils";
import setupDatabase, { getAllMeterRecords, deleteMeterByPublicKey } from "./store/sqlite";
-import { initializeVerifiersCache } from "./logic/sync";
-import { publishHeartbeatToStream } from "./logic/streamr";
// Async initialization function
async function initializeApp() {
try {
console.log("[info] Starting application initialization...");
+ // Load extensions from config
+ await loadExtensionsFromConfig();
+ console.log("[info] Extensions loaded successfully");
+
+ // Load UI extensions
+ await loadUIExtensionsFromConfig();
+ console.log("[info] UI extensions loaded successfully");
+
+ runHook("onBeforeInit");
+
// Initialize database tables and jobs
setupDatabase();
console.log("[info] Database setup completed");
- // Initialize verifiers cache on startup
- // (disable ccip read initialization)
- // await initializeVerifiersCache();
- // console.log("[info] Verifiers cache initialized successfully");
+ runHook("onDatabaseSetup");
- // Start MQTT handling
- handleUplinks();
- console.log("[info] MQTT uplinks handler started");
-
- await publishHeartbeatToStream();
+ try {
+ // Start MQTT handling
+ await handleUplinks();
+ } catch (mqttError) {
+ console.error("[error] MQTT initialization failed:", mqttError);
+ throw mqttError;
+ }
console.log("[info] Application initialization completed successfully");
+
+ runHook("onAfterInit");
} catch (error) {
console.error("[fatal] Failed to initialize application:", error);
+ runHook("onInitError", error);
process.exit(1);
}
}
@@ -38,10 +49,28 @@ initializeApp();
app.get("/", async (req: Request, res: Response) => {
const m3ters = getAllMeterRecords();
- res.render("index", { m3ters });
+
+ // Get UI components from loaded UI extensions
+ const { icons, windows } = await getUIComponents();
+
+ res.render("index", { m3ters, icons, windows });
console.log("[server]: Server handled GET request at `/`");
});
+// API endpoint to invoke UI actions
+app.post("/api/actions/:moduleId/:actionId", async (req: Request, res: Response) => {
+ const { moduleId, actionId } = req.params;
+ console.log(`[server]: Invoking action '${actionId}' from module '${moduleId}'`);
+
+ const result = await invokeUIAction(moduleId, actionId);
+
+ if (result.success) {
+ res.status(200).json(result);
+ } else {
+ res.status(400).json(result);
+ }
+});
+
app.delete("/delete-meter", async (req: Request, res: Response) => {
let publicKey = decodeURIComponent(req.query?.publicKey?.toString() as string);
if (publicKey) {
diff --git a/src/lib/core/arweave/index.ts b/src/lib/core/arweave/index.ts
new file mode 100644
index 0000000..d0e076a
--- /dev/null
+++ b/src/lib/core/arweave/index.ts
@@ -0,0 +1,64 @@
+import { ArweaveSigner, TurboFactory } from "@ardrive/turbo-sdk";
+import { Readable } from "stream";
+import Arweave from "arweave";
+import type { DecodedPayload, Hooks } from "../../../types";
+
+export default class implements Hooks {
+ async onTransactionDistribution(m3terId: number, decoded: DecodedPayload) {
+ // encode transaction into standard format (payload[0])
+ // format: nonce | energy | signature | voltage | device_id | longitude | latitude
+ const transactionHex = decoded.buf;
+
+ const arweave = Arweave.init({
+ host: "arweave.net",
+ protocol: "https",
+ port: 443,
+ });
+
+ const key = await arweave.wallets.generate();
+ const signer = new ArweaveSigner(key);
+ const turbo = TurboFactory.authenticated({ signer });
+
+ const contractLabel = process.env.CONTRACT_LABEL || "M3ters";
+
+ const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8");
+
+ await turbo.uploadFile({
+ fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }),
+ fileSizeFactory: () => byteLength,
+ dataItemOpts: {
+ paidBy: await arweave.wallets.jwkToAddress(key),
+ tags: [
+ { name: "Contract-Label", value: contractLabel },
+ { name: "Contract-Use", value: "M3tering Protocol Test" },
+ { name: "Content-Type", value: "text/plain" },
+ { name: "M3ter-ID", value: m3terId.toString() },
+ { name: "Timestamp", value: Date.now().toString() },
+ { name: "Nonce", value: decoded.nonce.toString() },
+ { name: "Energy", value: decoded.energy.toString() },
+ { name: "Signature", value: decoded.signature },
+ { name: "Voltage", value: decoded.extensions?.voltage?.toString() ?? "" },
+ { name: "Device-ID", value: decoded.extensions?.deviceId?.toString() ?? "" },
+ { name: "Longitude", value: decoded.extensions?.longitude?.toString() ?? "" },
+ { name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" },
+ ],
+ },
+ events: {
+ onUploadProgress: (progress) => {
+ console.log("[arweave] Upload progress:", progress);
+ },
+ onError: (error) => {
+ console.error("[arweave] Upload error:", error);
+ },
+ onSuccess(event) {
+ console.log("[arweave] Upload successful! Transaction ID:", event);
+ },
+ onUploadSuccess(event) {
+ console.log("[arweave] Upload completed! Transaction ID:", event);
+ },
+ },
+ });
+
+ console.log(`[arweave] Uploaded transaction ${decoded.nonce} for M3ter ID ${m3terId} to Arweave.`);
+ }
+}
diff --git a/src/lib/core/is_on/index.ts b/src/lib/core/is_on/index.ts
new file mode 100644
index 0000000..040e0cf
--- /dev/null
+++ b/src/lib/core/is_on/index.ts
@@ -0,0 +1,7 @@
+import type { Hooks } from "../../../types";
+
+export default class implements Hooks {
+ isOnStateCompute(m3terId: number) {
+ return true;
+ }
+}
diff --git a/src/lib/core/prover/index.ts b/src/lib/core/prover/index.ts
new file mode 100644
index 0000000..e647812
--- /dev/null
+++ b/src/lib/core/prover/index.ts
@@ -0,0 +1,64 @@
+import { buildBatchPayload } from "../../utils";
+import type { BatchTransactionPayload, Hooks, TransactionRecord } from "../../../types";
+
+const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing";
+
+export default class implements Hooks {
+ async onTransactionDistribution(_: any, __: any, pendingTransactions: TransactionRecord[]) {
+ // send pending transactions to prover node
+ try {
+ const proverURL = await this.getProverURL();
+
+ if (!proverURL) {
+ console.warn("No prover URL configured. Skipping sending transactions to prover.");
+ return;
+ }
+
+ console.info(`Sending pending transactions to prover: ${proverURL}`);
+
+ const response = await this.sendPendingTransactionsToProver(proverURL!, pendingTransactions);
+
+ console.info("done sending to prover");
+ console.info(`Prover response (text): ${await response?.text()}`);
+ } catch (error) {
+ console.error(`Error sending pending transactions to prover: ${error}`);
+ }
+ }
+
+ async sendTransactionsToProver(
+ proverURL: string,
+ transactionData: BatchTransactionPayload[]
+ ): Promise {
+ try {
+ const response = await fetch(`${proverURL}/batch-payloads`, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ },
+ body: JSON.stringify(transactionData),
+ });
+
+ console.log("[info] received", response.status, "from the prover");
+
+ if (!response.ok) {
+ throw new Error(`Prover responded with status: ${response.status}`);
+ }
+ return response;
+ } catch (err: any) {
+ console.error("Failed to send transactions to prover:", err.message);
+ return null;
+ }
+ }
+
+ async getProverURL(): Promise {
+ return PREFERRED_PROVER_NODE;
+ }
+
+ async sendPendingTransactionsToProver(proverURL: string, pendingTransactions: TransactionRecord[]) {
+ console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL);
+
+ const requestPayload = buildBatchPayload(pendingTransactions);
+
+ return await this.sendTransactionsToProver(proverURL, requestPayload);
+ }
+}
diff --git a/src/lib/core/prune_sync/index.ts b/src/lib/core/prune_sync/index.ts
new file mode 100644
index 0000000..aed1354
--- /dev/null
+++ b/src/lib/core/prune_sync/index.ts
@@ -0,0 +1,28 @@
+import cron from "node-cron";
+import { Hooks } from "../../../types";
+import { getAllMeterRecords } from "../../../store/sqlite";
+import { loadConfigurations } from "../../utils";
+import { pruneAndSyncOnchain } from "../../sync";
+
+export default class implements Hooks {
+ private config = loadConfigurations();
+
+ async onAfterInit() {
+ console.log("Registering prune_sync cron job...");
+
+ // Schedule a cron job to perform prune verified transactions and sync with onchain state
+ cron.schedule(this.config.prune_sync.cronSchedule, async () => {
+ const m3ters = getAllMeterRecords();
+ for (const m3ter of m3ters) {
+ try {
+ pruneAndSyncOnchain(m3ter.publicKey);
+ } catch (error) {
+ console.error(`Error pruning and syncing meter ${m3ter.publicKey}:`, error);
+ }
+ }
+ });
+
+ console.log("prune_sync cron job registered.");
+ return;
+ }
+}
diff --git a/src/lib/core/streamr/index.ts b/src/lib/core/streamr/index.ts
new file mode 100644
index 0000000..90b72fd
--- /dev/null
+++ b/src/lib/core/streamr/index.ts
@@ -0,0 +1,69 @@
+import cron from "node-cron";
+import { StreamrClient } from "@streamr/sdk";
+import { getAllTransactionRecords } from "../../../store/sqlite";
+import { buildBatchPayload, loadConfigurations, retry } from "../../utils";
+import type { Hooks, TransactionRecord } from "../../../types";
+
+const { ETHEREUM_PRIVATE_KEY } = process.env;
+
+if (!ETHEREUM_PRIVATE_KEY) {
+ throw new Error("Missing ETHEREUM_PRIVATE_KEY in environment variables");
+}
+
+export default class implements Hooks {
+ private config = loadConfigurations();
+
+ async onAfterInit() {
+ console.log("Registering Streamr cron job...");
+
+ // Schedule a cron job to publish pending transactions
+ cron.schedule(
+ this.config.streamr.cronSchedule,
+ async () => {
+ console.log("Publishing pending transactions to Streamr...");
+
+ const pendingTransactions = await this.getPendingTransactions();
+ if (pendingTransactions.length > 0) {
+ for (const STREAMR_STREAM_ID of this.config.streamr.streamId) {
+ console.log(`Publishing to Streamr stream: ${STREAMR_STREAM_ID}`);
+ await retry(
+ () => this.publishPendingTransactionsToStreamr(STREAMR_STREAM_ID, pendingTransactions),
+ 3,
+ 2000
+ );
+ }
+ }
+ },
+ { name: "streamr-publish-pending-transactions", noOverlap: true }
+ );
+
+ console.log("Streamr cron job registered.");
+ return;
+ }
+
+ async getPendingTransactions(): Promise {
+ return getAllTransactionRecords();
+ }
+
+ async publishPendingTransactionsToStreamr(STREAMR_STREAM_ID: string, pendingTransactions: TransactionRecord[]) {
+ const streamrClient = new StreamrClient({
+ auth: {
+ privateKey: ETHEREUM_PRIVATE_KEY!,
+ },
+ });
+
+ try {
+ const stream = await streamrClient.getStream(STREAMR_STREAM_ID!);
+
+ const batchPayload = buildBatchPayload(pendingTransactions);
+
+ await stream.publish(batchPayload);
+ } catch (error) {
+ console.error(`Failed to publish to Streamr: ${(error as Error).message}`);
+ throw error;
+ } finally {
+ // destroy the client to free resources
+ await streamrClient.destroy();
+ }
+ }
+}
diff --git a/src/lib/core/streamr/ui.ts b/src/lib/core/streamr/ui.ts
new file mode 100644
index 0000000..9ceaa00
--- /dev/null
+++ b/src/lib/core/streamr/ui.ts
@@ -0,0 +1,150 @@
+import { getAllTransactionRecords } from "../../../store/sqlite";
+import { buildBatchPayload, loadConfigurations, retry } from "../../utils";
+import type { UIHooks, UIAppIcon, UIAppWindow, UIAction, TransactionRecord } from "../../../types";
+import { StreamrClient } from "@streamr/sdk";
+
+const { ETHEREUM_PRIVATE_KEY } = process.env;
+
+/**
+ * Streamr UI Module
+ * Provides UI components for the Streamr integration, including
+ * a panel showing stream configuration and a manual publish action
+ */
+export default class implements UIHooks {
+ private config = loadConfigurations();
+ private lastPublishTime: Date | null = null;
+ private lastPublishStatus: "success" | "error" | null = null;
+
+ getAppIcon(): UIAppIcon {
+ return {
+ id: "streamr",
+ label: "Streamr",
+ iconHtml: '',
+ buttonClass: "is-primary",
+ };
+ }
+
+ async getAppWindow(): Promise {
+ const pendingCount = (await this.getPendingTransactions()).length;
+ const streamIds = this.config.streamr.streamId;
+ const cronSchedule = this.config.streamr.cronSchedule;
+
+ return {
+ id: "streamr",
+ title: "Streamr Publisher",
+ containerClass: "",
+ contentHtml: `
+
+
Configuration
+
+ Cron Schedule:
+ ${cronSchedule}
+
+
+
Stream IDs:
+
+ ${streamIds.map((id) => `- ${id}
`).join("")}
+
+
+
+
+
+
Status
+
+ Pending Transactions:
+ ${pendingCount}
+
+
+ Last Publish:
+ ${this.lastPublishTime ? this.lastPublishTime.toLocaleString() : "Never"}
+ ${this.lastPublishStatus ? `(${this.lastPublishStatus})` : ""}
+
+
+
+
+
+
+
+ `,
+ };
+ }
+
+ getActions(): UIAction[] {
+ return [
+ {
+ id: "publish-now",
+ label: "Publish Now",
+ buttonClass: "is-warning",
+ handler: async () => {
+ const pendingTransactions = await this.getPendingTransactions();
+
+ if (pendingTransactions.length === 0) {
+ return { message: "No pending transactions to publish" };
+ }
+
+ try {
+ for (const streamId of this.config.streamr.streamId) {
+ console.log(`[streamr-ui] Publishing to stream: ${streamId}`);
+ await retry(() => this.publishToStreamr(streamId, pendingTransactions), 3, 2000);
+ }
+
+ this.lastPublishTime = new Date();
+ this.lastPublishStatus = "success";
+
+ return {
+ message: `Published ${pendingTransactions.length} transactions to ${this.config.streamr.streamId.length} stream(s)`,
+ data: { count: pendingTransactions.length },
+ };
+ } catch (error: any) {
+ this.lastPublishStatus = "error";
+ throw new Error(`Failed to publish: ${error.message}`);
+ }
+ },
+ },
+ ];
+ }
+
+ async getStatusData(): Promise> {
+ const pendingTransactions = await this.getPendingTransactions();
+ return {
+ pendingCount: pendingTransactions.length,
+ streamIds: this.config.streamr.streamId,
+ cronSchedule: this.config.streamr.cronSchedule,
+ lastPublishTime: this.lastPublishTime,
+ lastPublishStatus: this.lastPublishStatus,
+ };
+ }
+
+ private async getPendingTransactions(): Promise {
+ return getAllTransactionRecords();
+ }
+
+ private async publishToStreamr(streamId: string, pendingTransactions: TransactionRecord[]) {
+ if (!ETHEREUM_PRIVATE_KEY) {
+ throw new Error("Missing ETHEREUM_PRIVATE_KEY");
+ }
+
+ const streamrClient = new StreamrClient({
+ auth: { privateKey: ETHEREUM_PRIVATE_KEY },
+ });
+
+ try {
+ const stream = await streamrClient.getStream(streamId);
+ const batchPayload = buildBatchPayload(pendingTransactions);
+ await stream.publish(batchPayload);
+ } finally {
+ await streamrClient.destroy();
+ }
+ }
+}
diff --git a/src/logic/decode.ts b/src/lib/decode.ts
similarity index 100%
rename from src/logic/decode.ts
rename to src/lib/decode.ts
diff --git a/src/logic/encode.ts b/src/lib/encode.ts
similarity index 100%
rename from src/logic/encode.ts
rename to src/lib/encode.ts
diff --git a/src/logic/sync.ts b/src/lib/sync.ts
similarity index 96%
rename from src/logic/sync.ts
rename to src/lib/sync.ts
index 909e397..3ca21e5 100644
--- a/src/logic/sync.ts
+++ b/src/lib/sync.ts
@@ -11,9 +11,9 @@ import {
rollup as rollupContract,
ccipRevenueReader as ccipRevenueReaderContract,
priceContext as priceContextContract,
-} from "./context";
+} from "../services/context";
import { JsonRpcProvider, Contract, ZeroAddress } from "ethers";
-import { retry } from "../utils";
+import { retry } from "./utils";
import type { VerifierInfo } from "../types";
// Cache for verifiers - populated once on startup
@@ -79,11 +79,11 @@ export async function initializeVerifiersCache(): Promise {
/**
* Get cached verifiers, throws error if cache is not initialized
*/
-function getCachedVerifiers(): VerifierInfo[] {
+async function getCachedVerifiers(): Promise {
if (!isCacheInitialized || !verifiersCache) {
- throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first.");
+ await initializeVerifiersCache();
}
- return verifiersCache;
+ return verifiersCache!;
}
/**
@@ -147,7 +147,7 @@ export async function getLatestTransactionNonce(meterIdentifier: number): Promis
export async function getCrossChainRevenue(tokenId: number): Promise {
try {
// Use cached verifiers instead of fetching them each time
- const verifiers = getCachedVerifiers();
+ const verifiers = await getCachedVerifiers();
let totalRevenue = 0;
diff --git a/src/lib/utils.ts b/src/lib/utils.ts
new file mode 100644
index 0000000..e3bfdcc
--- /dev/null
+++ b/src/lib/utils.ts
@@ -0,0 +1,225 @@
+import fs from "fs";
+import path from "path";
+import { createPublicKey, verify } from "crypto";
+import type { TransactionRecord, BatchTransactionPayload, Hooks, AppConfig, UIHooks, UIAppIcon, UIAppWindow, UIAction } from "../types";
+
+const extensions: Hooks[] = [];
+const uiExtensions: Map = new Map();
+export const defaultConfigurations: AppConfig = {
+ modules: ["core/arweave", "core/prover", "core/streamr", "core/is_on", "core/prune_sync"],
+ streamr: {
+ streamId: ["0x567853282663b601bfdb9203819b1fbb3fe18926/m3tering/test"],
+ cronSchedule: "0 * * * *",
+ },
+ prune_sync: {
+ cronSchedule: "0 * * * *",
+ },
+};
+
+export function loadConfigurations(configPath: string = "console.config.json"): AppConfig {
+ try {
+ const config: AppConfig = JSON.parse(fs.readFileSync(configPath, "utf-8"));
+ return config;
+ } catch (error) {
+ console.warn(`Could not load configuration from ${configPath}, using default configurations.` );
+ return defaultConfigurations;
+ }
+}
+
+export async function loadExtensionsFromConfig(configPath: string = "console.config.json"): Promise {
+ const config: AppConfig = loadConfigurations(configPath);
+
+ for (const modulePath of config.modules) {
+ const resolved = path.resolve(__dirname, modulePath);
+
+ const mod = await import(resolved);
+ extensions.push(new mod.default());
+ }
+
+ return extensions;
+}
+
+export async function runHook(hook: K, ...args: Parameters>) {
+ let result: ReturnType> | boolean = true;
+
+ for (const ext of extensions) {
+ const fn = ext[hook];
+ let functionReturn;
+ if (fn) functionReturn = await (fn as any).call(ext, ...args);
+
+ if (typeof functionReturn === "boolean" && hook === "isOnStateCompute") {
+ result = result && functionReturn;
+ }
+ }
+
+ return result;
+}
+
+// ==========================================
+// UI Extension System
+// ==========================================
+
+/**
+ * Load UI extensions from configuration file
+ * Looks for 'uiModules' key in config, which maps module IDs to their paths
+ */
+export async function loadUIExtensionsFromConfig(configPath: string = "console.config.json"): Promise