diff --git a/config/default.yaml b/config/default.yaml
index b8f7bad..2c9ebe4 100644
--- a/config/default.yaml
+++ b/config/default.yaml
@@ -11,6 +11,11 @@ radio:
sync_word: 0x2B
preamble_length: 16
tx_power_dbm: 22
+ # HAL GPS/PPS (RAK Pi HAT u-blox → concentrator PPS pin). Off by default.
+ # gps_pps_enabled: false
+ # gps_pps_tty_path: "/dev/ttyAMA0"
+ # gps_family: "ubx7"
+ # gps_pps_target_baud: 0 # 0 = HAL default (9600)
meshtastic:
default_key_b64: "AQ=="
@@ -50,7 +55,7 @@ device:
longitude: 0.0
altitude: 25
hardware_description: "RAK2287 + Raspberry Pi 4"
- firmware_version: "0.7.6"
+ firmware_version: "0.7.5.1"
relay:
enabled: false
@@ -60,6 +65,9 @@ relay:
burst_size: 5
min_relay_rssi: -110.0
max_relay_rssi: -50.0
+ # blocklist: [] # 8-char hex node IDs — relay only, feed unchanged
+ # priority_list: [] # bypass burst gate under congestion
+ # dedup_ttl_seconds: 300
upstream:
enabled: true
@@ -80,13 +88,6 @@ transmit:
nodeinfo:
interval_minutes: 180 # 5..1440 min, default 180 (3 hr); 0 = disabled
startup_delay_seconds: 60 # delay before first broadcast on boot
- position:
- interval_minutes: 15
- startup_delay_seconds: 180
- # LoRa mesh POSITION packets (Meshtastic app map). Meshradar fleet pin
- # always uses device.latitude/longitude above, not live GPS.
- coordinate_source: "static" # static | live (live needs gpsd/uart source)
- location_precision: "approximate" # exact | approximate | none (live only)
web_auth:
# First-run state: hashes are empty -> dashboard forces /setup.
@@ -124,18 +125,23 @@ mqtt:
homeassistant_discovery: false
location:
- # Where live GPS fixes come from (skyplot + optional mesh POSITION).
- # Registered coordinates for Meshradar live in device.latitude/longitude
- # and are not overwritten by gpsd.
+ # Where the Meshpoint's reported lat/lon/alt comes from.
# static : use device.latitude/longitude/altitude (default)
# gpsd : poll a local or remote gpsd daemon for live fixes
- # uart : reserved (RAK Pi HAT GPS, not yet wired)
+ # uart : on-board RAK Pi HAT GPS via /dev/ttyAMA0 (NMEA GGA)
source: "static"
# gpsd connection. Defaults match the well-known local socket; only
# change when running gpsd on a peer machine on the LAN.
gpsd_host: "127.0.0.1"
gpsd_port: 2947
+ uart_path: "/dev/ttyAMA0"
+ uart_baud: 9600
# How often the coordinator wakes to read the active source.
update_interval_seconds: 5
# Minimum acceptable fix mode: 0=any (incl. no-fix), 1=2D, 2=3D.
min_fix_quality: 1
+
+# signal_health:
+# green_rssi_floor: -100 # badge green when 1h avg RSSI is above this
+# yellow_rssi_floor: -115 # badge yellow above this, red below
+# min_packets_per_hour: 5 # hide badge when fewer packets in the last hour
diff --git a/frontend/css/configuration.css b/frontend/css/configuration.css
index 70b484c..f0352dc 100644
--- a/frontend/css/configuration.css
+++ b/frontend/css/configuration.css
@@ -41,19 +41,6 @@
line-height: 1.5;
}
-.cfg-card__hint--nested {
- margin: -4px 0 4px;
-}
-
-.cfg-inline-link {
- color: var(--accent-cyan, #22d3ee);
- text-decoration: none;
-}
-
-.cfg-inline-link:hover {
- text-decoration: underline;
-}
-
.cfg-form {
display: flex;
flex-direction: column;
@@ -242,6 +229,60 @@ select.cfg-field__input option:checked {
transform: translateX(-50%) translateY(0);
}
+.cfg-id-list {
+ display: flex;
+ flex-direction: column;
+ gap: 0.35rem;
+ margin-bottom: 0.5rem;
+}
+
+.cfg-id-empty {
+ margin: 0;
+ font-size: 12px;
+ color: rgba(243, 244, 246, 0.45);
+ font-style: italic;
+}
+
+.cfg-id-row {
+ display: flex;
+ align-items: center;
+ gap: 0.5rem;
+}
+
+.cfg-id-chip {
+ font-family: 'JetBrains Mono', Menlo, monospace;
+ font-size: 12px;
+ padding: 0.15rem 0.45rem;
+ background: rgba(6, 182, 212, 0.1);
+ border: 1px solid rgba(6, 182, 212, 0.25);
+ border-radius: 6px;
+ color: #e2e8f0;
+}
+
+.cfg-id-remove {
+ background: transparent;
+ border: none;
+ color: rgba(243, 244, 246, 0.5);
+ font-size: 1.1rem;
+ line-height: 1;
+ cursor: pointer;
+ padding: 0 0.25rem;
+}
+
+.cfg-id-remove:hover {
+ color: #ef4444;
+}
+
+.cfg-id-add {
+ display: flex;
+ gap: 0.5rem;
+ align-items: center;
+}
+
+.cfg-id-add .cfg-field__input {
+ flex: 1;
+}
+
@keyframes cfgFadeIn {
from { opacity: 0; transform: translateY(8px); }
to { opacity: 1; transform: translateY(0); }
diff --git a/frontend/index.html b/frontend/index.html
index 239398c..064b792 100644
--- a/frontend/index.html
+++ b/frontend/index.html
@@ -36,6 +36,7 @@
+
@@ -677,10 +678,12 @@
Service actions
+
+
@@ -718,6 +721,7 @@ Service actions
+
diff --git a/frontend/js/configuration/configuration_panel.js b/frontend/js/configuration/configuration_panel.js
index 6972cc1..a2ef107 100644
--- a/frontend/js/configuration/configuration_panel.js
+++ b/frontend/js/configuration/configuration_panel.js
@@ -124,10 +124,18 @@ class ConfigurationPanel {
} else if (section === 'advanced' && window.AdvancedConfigCard) {
const host = document.getElementById('cfg-advanced-panel');
if (host) {
- host.innerHTML = '';
+ host.innerHTML = `
+
+
+ `;
const card = new window.AdvancedConfigCard(api);
- card.mount(host);
+ card.mount(host.querySelector('[data-cfg-advanced]'));
this._cards.set('advanced', card);
+ if (window.RelayFiltersCard) {
+ const relayCard = new window.RelayFiltersCard(api);
+ relayCard.mount(host.querySelector('[data-cfg-relay-filters]'));
+ this._cards.set('relay-filters', relayCard);
+ }
}
}
this._mounted.add(section);
diff --git a/frontend/js/configuration/relay_filters_card.js b/frontend/js/configuration/relay_filters_card.js
new file mode 100644
index 0000000..ae1b7ea
--- /dev/null
+++ b/frontend/js/configuration/relay_filters_card.js
@@ -0,0 +1,181 @@
+/**
+ * Configuration → Advanced — relay filter controls.
+ *
+ * Blocklist, priority list, and dedup TTL via PUT /api/config/relay.
+ * Filter changes hot-reload without a full service restart.
+ */
+
+class RelayFiltersCard {
+ static NODE_ID_RE = /^[0-9a-f]{8}$/i;
+
+ constructor(api) {
+ this._api = api;
+ this._root = null;
+ this._blocklist = [];
+ this._priority = [];
+ }
+
+ mount(root) {
+ this._root = root;
+ this._root.innerHTML = `
+
+
+
+
+ `;
+
+ this._blocklistEl = this._root.querySelector('[data-blocklist]');
+ this._priorityEl = this._root.querySelector('[data-priority-list]');
+ this._form = this._root.querySelector('[data-relay-filters-form]');
+ this._statusEl = this._root.querySelector('[data-relay-filters-status]');
+
+ this._root.querySelector('[data-blocklist-add]').addEventListener('click', () => {
+ this._addId('blocklist');
+ });
+ this._root.querySelector('[data-priority-add]').addEventListener('click', () => {
+ this._addId('priority');
+ });
+ this._form.addEventListener('submit', (e) => this._onSubmit(e));
+ }
+
+ render(config) {
+ const relay = config.relay || {};
+ this._blocklist = (relay.blocklist || []).map((id) => this._normalizeId(id));
+ this._priority = (relay.priority_list || []).map((id) => this._normalizeId(id));
+ this._paintLists();
+ const ttl = relay.dedup_ttl_seconds;
+ const ttlEl = this._root.querySelector('[data-dedup-ttl]');
+ if (ttlEl) ttlEl.value = ttl != null ? ttl : 300;
+ }
+
+ _normalizeId(raw) {
+ return String(raw || '').trim().toLowerCase().replace(/^!/, '');
+ }
+
+ _addId(which) {
+ const input = this._root.querySelector(
+ which === 'blocklist' ? '[data-blocklist-input]' : '[data-priority-input]',
+ );
+ const id = this._normalizeId(input.value);
+ if (!RelayFiltersCard.NODE_ID_RE.test(id)) {
+ this._setStatus('error', 'Node ID must be 8 hex characters (no ! prefix).');
+ return;
+ }
+ const list = which === 'blocklist' ? this._blocklist : this._priority;
+ if (!list.includes(id)) list.push(id);
+ input.value = '';
+ this._paintLists();
+ }
+
+ _removeId(which, id) {
+ if (which === 'blocklist') {
+ this._blocklist = this._blocklist.filter((x) => x !== id);
+ } else {
+ this._priority = this._priority.filter((x) => x !== id);
+ }
+ this._paintLists();
+ }
+
+ _paintLists() {
+ this._blocklistEl.innerHTML = this._listHtml('blocklist', this._blocklist);
+ this._priorityEl.innerHTML = this._listHtml('priority', this._priority);
+ this._blocklistEl.querySelectorAll('[data-remove-id]').forEach((btn) => {
+ btn.addEventListener('click', () => {
+ this._removeId(btn.dataset.list, btn.dataset.id);
+ });
+ });
+ this._priorityEl.querySelectorAll('[data-remove-id]').forEach((btn) => {
+ btn.addEventListener('click', () => {
+ this._removeId(btn.dataset.list, btn.dataset.id);
+ });
+ });
+ }
+
+ _listHtml(which, ids) {
+ if (!ids.length) {
+ return 'None
';
+ }
+ return ids.map((id) => `
+
+ !${this._api.escape(id)}
+ ×
+
+ `).join('');
+ }
+
+ async _onSubmit(event) {
+ event.preventDefault();
+ const ttl = Number(this._root.querySelector('[data-dedup-ttl]').value);
+ if (!Number.isFinite(ttl) || ttl < 5 || ttl > 3600) {
+ this._setStatus('error', 'Dedup TTL must be between 5 and 3600 seconds.');
+ return;
+ }
+ this._setStatus('pending', 'Saving…');
+ const result = await this._api.put('/api/config/relay', {
+ blocklist: this._blocklist,
+ priority_list: this._priority,
+ dedup_ttl_seconds: ttl,
+ });
+ if (!result) {
+ this._setStatus('error', 'Save failed.');
+ return;
+ }
+ this._setStatus('success', 'Saved.');
+ if (result.restart_required) {
+ this._api.signalRestart('Relay settings updated.');
+ } else {
+ this._api.toast('Relay filters applied (no restart required).');
+ }
+ this._api.refresh();
+ }
+
+ _setStatus(kind, message) {
+ if (!this._statusEl) return;
+ this._statusEl.dataset.kind = kind;
+ this._statusEl.textContent = message;
+ }
+}
+
+window.RelayFiltersCard = RelayFiltersCard;
diff --git a/src/api/routes/config_enrichment.py b/src/api/routes/config_enrichment.py
index 5f0ae3a..c034f91 100644
--- a/src/api/routes/config_enrichment.py
+++ b/src/api/routes/config_enrichment.py
@@ -52,24 +52,32 @@ def enrich_config_payload(cfg: AppConfig, base: dict) -> dict:
"burst_size": relay.burst_size,
"min_relay_rssi": relay.min_relay_rssi,
"max_relay_rssi": relay.max_relay_rssi,
+ "blocklist": list(relay.blocklist or []),
+ "priority_list": list(relay.priority_list or []),
+ "dedup_ttl_seconds": relay.dedup_ttl_seconds,
}
base["radio_advanced"] = {
"spectral_scan_interval_seconds": radio.spectral_scan_interval_seconds,
"sx1261_spi_path": radio.sx1261_spi_path or "",
+ "carrier_type": radio.carrier_type or "",
+ "gps_pps_enabled": radio.gps_pps_enabled,
+ "gps_pps_tty_path": radio.gps_pps_tty_path,
+ "gps_family": radio.gps_family,
+ "gps_pps_target_baud": radio.gps_pps_target_baud,
}
base["location"] = {
"source": location.source,
"gpsd_host": location.gpsd_host,
"gpsd_port": location.gpsd_port,
+ "uart_path": location.uart_path,
+ "uart_baud": location.uart_baud,
"update_interval_seconds": location.update_interval_seconds,
"min_fix_quality": location.min_fix_quality,
}
- pos = cfg.transmit.position
- if "transmit" in base:
- base["transmit"]["position"] = {
- "interval_minutes": pos.interval_minutes,
- "startup_delay_seconds": pos.startup_delay_seconds,
- "coordinate_source": pos.coordinate_source,
- "location_precision": pos.location_precision,
- }
+ sh = cfg.signal_health
+ base["signal_health"] = {
+ "green_rssi_floor": sh.green_rssi_floor,
+ "yellow_rssi_floor": sh.yellow_rssi_floor,
+ "min_packets_per_hour": sh.min_packets_per_hour,
+ }
return base
diff --git a/src/api/routes/system_config_routes.py b/src/api/routes/system_config_routes.py
index fa2a14f..97081e4 100644
--- a/src/api/routes/system_config_routes.py
+++ b/src/api/routes/system_config_routes.py
@@ -13,22 +13,30 @@
from src.api.auth.dependencies import require_admin
from src.api.auth.jwt_session import SessionClaims
from src.config import AppConfig, save_section_to_yaml
+from src.relay.node_id import validate_node_ids
+from src.relay.relay_manager import RelayManager
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/config", tags=["config"])
_config: AppConfig | None = None
+_relay_manager: RelayManager | None = None
-def init_routes(config: AppConfig) -> None:
- global _config
+def init_routes(
+ config: AppConfig,
+ relay_manager: RelayManager | None = None,
+) -> None:
+ global _config, _relay_manager
_config = config
+ _relay_manager = relay_manager
def reset_routes() -> None:
- global _config
+ global _config, _relay_manager
_config = None
+ _relay_manager = None
class StorageUpdate(BaseModel):
@@ -51,6 +59,9 @@ class RelayUpdate(BaseModel):
burst_size: Optional[int] = Field(None, ge=1, le=50)
min_relay_rssi: Optional[float] = Field(None, ge=-150, le=0)
max_relay_rssi: Optional[float] = Field(None, ge=-150, le=0)
+ blocklist: Optional[list[str]] = None
+ priority_list: Optional[list[str]] = None
+ dedup_ttl_seconds: Optional[int] = Field(None, ge=5, le=3600)
class RadioAdvancedUpdate(BaseModel):
@@ -195,6 +206,26 @@ async def update_relay(
relay.max_relay_rssi = req.max_relay_rssi
updates["max_relay_rssi"] = req.max_relay_rssi
+ filter_reload = False
+ if req.blocklist is not None:
+ try:
+ relay.blocklist = validate_node_ids(req.blocklist)
+ except ValueError as exc:
+ raise HTTPException(400, str(exc)) from exc
+ updates["blocklist"] = relay.blocklist
+ filter_reload = True
+ if req.priority_list is not None:
+ try:
+ relay.priority_list = validate_node_ids(req.priority_list)
+ except ValueError as exc:
+ raise HTTPException(400, str(exc)) from exc
+ updates["priority_list"] = relay.priority_list
+ filter_reload = True
+ if req.dedup_ttl_seconds is not None:
+ relay.dedup_ttl_seconds = req.dedup_ttl_seconds
+ updates["dedup_ttl_seconds"] = req.dedup_ttl_seconds
+ filter_reload = True
+
if not updates:
return {"saved": False, "restart_required": False}
@@ -206,6 +237,13 @@ async def update_relay(
except PermissionError as exc:
raise HTTPException(403, str(exc)) from exc
+ if filter_reload and _relay_manager is not None:
+ _relay_manager.reload_filters(
+ blocklist=relay.blocklist,
+ priority_list=relay.priority_list,
+ dedup_ttl_seconds=relay.dedup_ttl_seconds,
+ )
+
return {"saved": True, "restart_required": restart_needed, "updates": updates}
diff --git a/src/api/server.py b/src/api/server.py
index 3bd4fb0..6609f73 100644
--- a/src/api/server.py
+++ b/src/api/server.py
@@ -41,6 +41,7 @@
dangerous_routes,
device,
device_config_routes,
+ gps_pps_status,
gps_status,
identity_routes,
messages,
@@ -76,9 +77,6 @@
NodeInfoBroadcaster,
clamp_interval_minutes,
)
-from src.transmit.position_broadcaster import PositionBroadcaster
-from src.transmit.telemetry_broadcaster import TelemetryBroadcaster
-from src.transmit.meshtastic_inbound_handler import MeshtasticInboundHandler
from src.transmit.tx_service import TxService
from src.version import __version__
@@ -89,8 +87,6 @@
pipeline: PipelineCoordinator | None = None
upstream: UpstreamClient | None = None
nodeinfo_broadcaster: NodeInfoBroadcaster | None = None
-telemetry_broadcaster: TelemetryBroadcaster | None = None
-position_broadcaster: PositionBroadcaster | None = None
noise_floor_tracker = NoiseFloorTracker()
_noise_floor_emitter_task = None
_spectral_scan_service: SpectralScanService | None = None
@@ -131,7 +127,6 @@ def create_app(config: AppConfig | None = None) -> FastAPI:
@asynccontextmanager
async def lifespan(app: FastAPI):
global pipeline, upstream, nodeinfo_broadcaster
- global telemetry_broadcaster, position_broadcaster
warn_if_stale_so_files()
validate_activation(config)
identity = DeviceIdentity(
@@ -150,12 +145,20 @@ async def lifespan(app: FastAPI):
pipeline.on_packet(lambda pkt: print_packet(pkt))
pipeline.on_packet(public_radar_routes.public_radar_packet_callback)
+ # Mirror live GPS fixes from the location source into DeviceIdentity
+ # so /api/device (the local map) and the upstream registration
+ # payload (Meshradar fleet view) both see fresh coordinates.
+ def _sync_identity_position(lat, lon, alt):
+ identity.latitude = lat
+ identity.longitude = lon
+ if alt is not None:
+ identity.altitude = alt
+
+ pipeline.on_location_update(_sync_identity_position)
+
if config.transmit.enabled:
_inject_tx_gain_into_source(pipeline)
- _bootstrap_pki(config, pipeline)
- await _hydrate_public_keys(pipeline)
-
await pipeline.start()
message_repo = MessageRepository(pipeline.database)
@@ -170,9 +173,8 @@ async def lifespan(app: FastAPI):
_send_meshcore_advert(meshcore_tx_ref, mc_source)
)
_setup_message_interception(
- pipeline, message_repo, config, meshcore_tx_ref, tx_service
+ pipeline, message_repo, config, meshcore_tx_ref
)
- _setup_inbound_responder(pipeline, tx_service, config)
setup_meshcore_contact_enrichment(pipeline, meshcore_tx_ref)
if meshcore_tx_ref and meshcore_tx_ref.connected:
import asyncio
@@ -195,18 +197,6 @@ async def lifespan(app: FastAPI):
if nodeinfo_broadcaster is not None:
await nodeinfo_broadcaster.start()
- telemetry_broadcaster = _build_telemetry_broadcaster(
- config, tx_service, pipeline
- )
- if telemetry_broadcaster is not None:
- await telemetry_broadcaster.start()
-
- position_broadcaster = _build_position_broadcaster(
- config, tx_service, pipeline
- )
- if position_broadcaster is not None:
- await position_broadcaster.start()
-
_wire_native_relay(pipeline, tx_service)
global _noise_floor_emitter_task
@@ -239,10 +229,6 @@ async def lifespan(app: FastAPI):
pass
if nodeinfo_broadcaster is not None:
await nodeinfo_broadcaster.stop()
- if telemetry_broadcaster is not None:
- await telemetry_broadcaster.stop()
- if position_broadcaster is not None:
- await position_broadcaster.stop()
await upstream.stop()
await pipeline.stop()
session_manager.shutdown()
@@ -276,6 +262,7 @@ async def lifespan(app: FastAPI):
app.include_router(upstream_config_routes.router, dependencies=protected)
app.include_router(device_config_routes.router, dependencies=protected)
app.include_router(gps_status.router, dependencies=protected)
+ app.include_router(gps_pps_status.router, dependencies=protected)
app.include_router(system_config_routes.router, dependencies=protected)
app.include_router(meshcore_config_routes.router, dependencies=protected)
app.include_router(config_routes.router, dependencies=protected)
@@ -394,212 +381,6 @@ def _add_meshcore_usb_source(
)
-def _resolve_mesh_node_id(config: AppConfig) -> int | None:
- configured = config.transmit.node_id
- if configured is not None:
- return configured
- device_id = (config.device.device_id or "").strip()
- if not device_id:
- return None
- try:
- return TxService._derive_node_id(device_id)
- except RuntimeError:
- return None
-
-
-def _bootstrap_pki(config: AppConfig, coord: PipelineCoordinator) -> None:
- """Load PKI keypair and wire decoder identity before packet capture starts."""
- from src.identity.keypair import (
- KeypairStore,
- resolve_keypair_path,
- resolve_keypair_path_from_env,
- )
-
- if not hasattr(coord, "_crypto"):
- return
-
- coord._crypto.set_node_db_path(config.storage.database_path)
- our_node_id = _resolve_mesh_node_id(config)
- if our_node_id is not None:
- coord._router.meshtastic_decoder.configure_identity(our_node_id)
- logger.info(
- "Meshtastic PKI identity configured: 0x%08x", our_node_id
- )
-
- override = resolve_keypair_path_from_env()
- key_path = override or resolve_keypair_path(config.storage.database_path)
- try:
- keypair = KeypairStore(key_path).load_or_create()
- coord._crypto.set_keypair(keypair.private_key, keypair.public_key)
- logger.info("Meshtastic PKI keypair loaded from %s", key_path)
- except Exception:
- logger.exception("Failed to load Meshtastic PKI keypair")
-
-
-async def _hydrate_public_keys(coord: PipelineCoordinator) -> None:
- if not hasattr(coord, "_crypto"):
- return
- await coord.database.connect()
- rows = await coord.database.fetch_all(
- """
- SELECT node_id, public_key FROM nodes
- WHERE public_key IS NOT NULL AND public_key != ''
- LIMIT 5000
- """
- )
- for row in rows:
- node_id = row.get("node_id")
- public_key = row.get("public_key")
- if not node_id or not public_key:
- continue
- try:
- coord._crypto.register_public_key(
- int(node_id, 16),
- bytes.fromhex(public_key),
- )
- except ValueError:
- continue
-
-
-def _telemetry_metrics_providers(
- tx_service: TxService,
- coord: PipelineCoordinator,
- service_started: float,
- *,
- noise_floor_tracker=None,
- relay_manager=None,
-):
- """Shared device_metrics / local_stats snapshots for TX paths."""
- import time
-
- duty = getattr(tx_service, "_duty", None)
- stats = getattr(coord, "stats_reporter", None)
-
- def device_metrics() -> dict:
- air_util = duty.current_usage_percent() if duty else 0.0
- return {
- "battery_level": 101,
- "voltage": 5.0,
- "channel_utilization": 0.0,
- "air_util_tx": round(air_util, 2),
- "uptime_seconds": int(time.monotonic() - service_started),
- }
-
- def local_stats() -> dict:
- dm = device_metrics()
- noise_floor = None
- if noise_floor_tracker is not None:
- snap = noise_floor_tracker.snapshot()
- value_dbm = snap.get("value_dbm")
- if value_dbm is not None:
- noise_floor = int(round(value_dbm))
- relayed = 0
- if relay_manager is not None:
- relayed = int(relay_manager.get_stats().get("relayed", 0))
- return {
- "uptime_seconds": dm["uptime_seconds"],
- "channel_utilization": dm["channel_utilization"],
- "air_util_tx": dm["air_util_tx"],
- "num_packets_tx": 0,
- "num_packets_rx": stats.total_packets if stats else 0,
- "num_packets_rx_bad": 0,
- "num_online_nodes": 0,
- "num_total_nodes": 0,
- "num_tx_relay": relayed,
- "noise_floor": noise_floor,
- }
-
- return device_metrics, local_stats
-
-
-def _setup_inbound_responder(
- coord: PipelineCoordinator,
- tx_service: TxService | None,
- config: AppConfig,
-) -> None:
- if tx_service is None or not tx_service.meshtastic_enabled:
- return
-
- import time
-
- our_node_id = tx_service.source_node_id
- our_node_hex = f"{our_node_id:08x}"
- coord._router.meshtastic_decoder.configure_identity(our_node_id)
- coord.relay_manager.set_local_node_id(our_node_hex)
- device_fn, local_fn = _telemetry_metrics_providers(
- tx_service,
- coord,
- time.monotonic(),
- noise_floor_tracker=noise_floor_tracker,
- relay_manager=coord.relay_manager,
- )
- tx_service.set_telemetry_reply_providers(device_fn, local_fn)
- handler = MeshtasticInboundHandler(tx_service, our_node_id)
-
- def on_packet(packet: Packet) -> None:
- import asyncio
-
- try:
- asyncio.get_running_loop().create_task(handler.handle(packet))
- except RuntimeError:
- pass
-
- coord.on_packet(on_packet)
-
-
-def _build_telemetry_broadcaster(
- config: AppConfig,
- tx_service: TxService | None,
- coord: PipelineCoordinator,
-) -> TelemetryBroadcaster | None:
- if tx_service is None or not tx_service.meshtastic_enabled:
- return None
- telem = config.transmit.telemetry
- if clamp_interval_minutes(telem.interval_minutes) == 0:
- return None
-
- import time
-
- service_started = time.monotonic()
- device_fn, _local_fn = _telemetry_metrics_providers(
- tx_service,
- coord,
- service_started,
- noise_floor_tracker=noise_floor_tracker,
- relay_manager=coord.relay_manager,
- )
-
- return TelemetryBroadcaster(
- tx_service,
- interval_minutes=telem.interval_minutes,
- startup_delay_seconds=telem.startup_delay_seconds,
- metrics_provider=device_fn,
- )
-
-
-def _build_position_broadcaster(
- config: AppConfig,
- tx_service: TxService | None,
- coord: PipelineCoordinator,
-) -> PositionBroadcaster | None:
- if tx_service is None or not tx_service.meshtastic_enabled:
- return None
- pos_cfg = config.transmit.position
- if clamp_interval_minutes(pos_cfg.interval_minutes) == 0:
- return None
-
- from src.transmit.mesh_position_resolver import MeshPositionResolver
-
- resolver = MeshPositionResolver(config, coord.location_source)
-
- return PositionBroadcaster(
- tx_service,
- interval_minutes=pos_cfg.interval_minutes,
- startup_delay_seconds=pos_cfg.startup_delay_seconds,
- coords_provider=resolver.resolve,
- )
-
-
def _build_tx_service(
config: AppConfig, coord: PipelineCoordinator
) -> TxService | None:
@@ -690,7 +471,6 @@ async def _native_relay(packet):
)
relay.set_transmit_function(_native_relay)
- relay.set_local_node_id(f"{tx_service.source_node_id:08x}")
logger.info(
"Relay backend: native onboard SX1302 (identity-preserving)"
)
@@ -925,7 +705,6 @@ def _setup_message_interception(
message_repo: MessageRepository,
config: AppConfig,
meshcore_tx=None,
- tx_service: TxService | None = None,
) -> None:
"""Register a callback to intercept TEXT messages for storage.
@@ -933,14 +712,9 @@ def _setup_message_interception(
conversations. DMs between other nodes are tagged as 'overheard'.
MeshCore DMs use destination_id='self' to indicate they're for us.
"""
- from src.api.message_name_resolver import MessageNameResolver
from src.models.packet import PacketType, Protocol
- name_resolver = MessageNameResolver(coord.node_repo, meshcore_tx)
-
our_node_id = config.transmit.node_id
- if our_node_id is None and tx_service is not None:
- our_node_id = tx_service.source_node_id
our_node_hex = f"{our_node_id:08x}" if our_node_id else ""
mc_name_cache: dict[str, str] = {}
@@ -1080,17 +854,19 @@ async def _save_and_notify() -> None:
if (
packet.protocol == Protocol.MESHTASTIC
and direction == "received"
+ and not node_name
):
- sender_lookup = (
- (packet.source_id or "")
- if is_broadcast
- else node_id
- )
- node_name = await name_resolver.resolve(
- sender_lookup,
- packet.protocol.value,
- node_name or packet.source_id or "",
- )
+ src_id = (packet.source_id or "").lower()
+ if src_id:
+ row = await coord.node_repo._db.fetch_one(
+ "SELECT long_name, short_name FROM nodes "
+ "WHERE LOWER(node_id) = ? AND protocol = 'meshtastic'",
+ (src_id,),
+ )
+ if row:
+ node_name = row["long_name"] or row["short_name"] or ""
+ if not node_name:
+ node_name = packet.source_id or ""
if is_broadcast and packet.protocol == Protocol.MESHCORE:
node_name = (packet.decoded_payload or {}).get("long_name", "")
@@ -1170,20 +946,10 @@ async def _save_and_notify() -> None:
"snr": round(row["snr"], 1) if row and row["snr"] else None,
})
else:
- ws_name_lookup = (
- (packet.source_id or "")
- if is_broadcast and packet.protocol == Protocol.MESHTASTIC
- else node_id
- )
- display_name = await name_resolver.resolve(
- ws_name_lookup,
- packet.protocol.value,
- node_name,
- )
ws_payload = {
"text": text,
"node_id": node_id,
- "node_name": display_name,
+ "node_name": node_name,
"protocol": packet.protocol.value,
"direction": direction,
"packet_id": packet.packet_id or "",
@@ -1247,7 +1013,6 @@ def _init_routes(
node_repo=coord.node_repo,
meshcore_tx=meshcore_tx,
config=config,
- packet_repo=coord.packet_repo,
)
crypto = coord._crypto if hasattr(coord, "_crypto") else None
@@ -1265,7 +1030,13 @@ def _init_routes(
upstream_config_routes.init_routes(config=config)
device_config_routes.init_routes(config=config, identity=identity)
gps_status.init_routes(location_source=coord.location_source)
- system_config_routes.init_routes(config=config)
+ gps_pps_status.init_routes(
+ get_wrapper=lambda: _get_concentrator_wrapper(coord),
+ )
+ system_config_routes.init_routes(
+ config=config,
+ relay_manager=coord.relay_manager,
+ )
meshcore_config_routes.init_routes(config=config, tx_service=tx_service)
diff --git a/src/config.py b/src/config.py
index 4622e2b..df8efd3 100644
--- a/src/config.py
+++ b/src/config.py
@@ -1,7 +1,6 @@
from __future__ import annotations
import dataclasses
-import logging
import os
import sys
from dataclasses import dataclass, field
@@ -12,8 +11,6 @@
from src.version import __version__
-logger = logging.getLogger(__name__)
-
# Band-start frequencies (MHz) for the Meshtastic slot formula
# freq = freqStart + BW/2000 + (slot-1) * BW/1000
@@ -60,14 +57,26 @@ class RadioConfig:
# (default; spectral scan stays unavailable, packet-derived
# noise floor remains in use).
#
- # On RAK2287 / RAK5146 / SenseCap M1 this is typically
- # ``/dev/spidev0.1`` (separate from the SX1302's
- # ``/dev/spidev0.0``). Some carriers daisy-chain the SX1261
- # behind the SX1302's SPI router and want this set to the same
- # path as the SX1302 SPI device. Wrong path = HAL refuses to
- # ``lgw_start`` after our config attempt, so we ship empty by
- # default and ask interested users to opt in explicitly.
+ # On RAK2287 / RAK5146 / SenseCap M1 the SX1261 is behind the
+ # concentrator's internal SPI router, not on a Pi chip-select —
+ # leave empty. Only the Semtech reference kit and custom carriers
+ # with a dedicated SX1261 CE line need a path (e.g.
+ # ``/dev/spidev0.1``). Wrong path can brick ``lgw_start`` on
+ # boards without Pi-visible SX1261; ``carrier_type`` guard clears
+ # mistaken values on RAK/SenseCap.
sx1261_spi_path: str = ""
+ # Carrier board signature from setup wizard I2C probe (``rak``,
+ # ``sensecap_m1``, or empty). Used to block Pi-visible SX1261 SPI
+ # paths that brick ``lgw_start()`` on RAK/SenseCap concentrators.
+ carrier_type: str = ""
+ # HAL GPS/PPS: align concentrator packet timestamps with GPS time via
+ # libloragw ``lgw_gps_*`` + ``sx1302_gps_enable``. Requires a HAL build
+ # that includes loragw_gps.c. Exclusive with ``location.source: uart`` on
+ # the same TTY (only one process may open the GPS serial port).
+ gps_pps_enabled: bool = False
+ gps_pps_tty_path: str = "/dev/ttyAMA0"
+ gps_family: str = "ubx7"
+ gps_pps_target_baud: int = 0
@dataclass
@@ -150,29 +159,9 @@ class RelayConfig:
burst_size: int = 5
min_relay_rssi: float = -110.0
max_relay_rssi: float = -50.0
-
-
-@dataclass
-class TelemetryConfig:
- """Periodic device_metrics telemetry broadcast settings."""
-
- interval_minutes: int = 30
- startup_delay_seconds: int = 120
-
-
-@dataclass
-class PositionConfig:
- """Periodic POSITION broadcast settings."""
-
- interval_minutes: int = 15
- startup_delay_seconds: int = 180
- # Coordinates sent on the public LoRa mesh (Meshtastic POSITION packets).
- # ``static`` uses ``device.{latitude,longitude,altitude}`` (wizard pin).
- # ``live`` reads the active ``LocationSource`` (gpsd/uart) when a fix exists.
- coordinate_source: str = "static"
- # Privacy when ``coordinate_source`` is ``live``: exact, approximate
- # (~1.1 km rounding), or none (skip position on mesh). Ignored for static.
- location_precision: str = "approximate"
+ blocklist: list[str] = field(default_factory=list)
+ priority_list: list[str] = field(default_factory=list)
+ dedup_ttl_seconds: int = 300
@dataclass
@@ -184,8 +173,6 @@ class MqttConfig:
password: str = "large4cats"
topic_root: str = "msh"
region: str = "US"
- tls_enabled: bool = False
- tls_ca_cert: str = ""
# Optional ``!xxxxxxxx`` override; blank uses MD5 hash of device name.
gateway_id: Optional[str] = None
publish_channels: list[str] = field(default_factory=lambda: ["LongFast", "MeshCore"])
@@ -230,8 +217,6 @@ class TransmitConfig:
short_name: str = "MPNT"
hop_limit: int = 3
nodeinfo: NodeInfoConfig = field(default_factory=NodeInfoConfig)
- telemetry: TelemetryConfig = field(default_factory=TelemetryConfig)
- position: PositionConfig = field(default_factory=PositionConfig)
@dataclass
@@ -241,15 +226,12 @@ class LocationConfig:
``source`` values:
- ``"static"`` : use ``device.latitude/longitude/altitude`` from
``local.yaml``. Backward-compatible default.
- - ``"gpsd"`` : connect to a local or remote ``gpsd`` daemon for
- live fixes (skyplot, optional mesh POSITION).
- Does not change ``device.{lat,lon,alt}`` (Meshradar
- pin). Auto-installed by ``scripts/install.sh``.
- - ``"uart"`` : reserved for direct on-board UART NMEA reading
- (RAK Pi HAT GPS). Plumbing exists in
- ``src.hal.gps_reader`` but is not wired into
- the runtime yet; treated as ``static`` until
- the source is implemented.
+ - ``"gpsd"`` : connect to a local or remote ``gpsd`` daemon and
+ overwrite ``device.{lat,lon,alt}`` when fixes
+ arrive. Auto-installed by ``scripts/install.sh``.
+ - ``"uart"`` : read NMEA GGA from an on-board UART GPS (RAK Pi
+ HAT on ``/dev/ttyAMA0``). Uses
+ ``src.hal.gps_reader.GpsReader``.
``gpsd_host`` / ``gpsd_port`` default to gpsd's well-known
localhost socket. Override only when running gpsd on a peer
@@ -269,6 +251,8 @@ class LocationConfig:
source: str = "static"
gpsd_host: str = "127.0.0.1"
gpsd_port: int = 2947
+ uart_path: str = "/dev/ttyAMA0"
+ uart_baud: int = 9600
update_interval_seconds: int = 5
min_fix_quality: int = 1
@@ -304,6 +288,15 @@ class WebAuthConfig:
session_version: int = 1
+@dataclass
+class SignalHealthConfig:
+ """Thresholds for node-card RSSI health badges and sparklines."""
+
+ green_rssi_floor: float = -100
+ yellow_rssi_floor: float = -115
+ min_packets_per_hour: int = 5
+
+
@dataclass
class AppConfig:
radio: RadioConfig = field(default_factory=RadioConfig)
@@ -319,6 +312,7 @@ class AppConfig:
transmit: TransmitConfig = field(default_factory=TransmitConfig)
web_auth: WebAuthConfig = field(default_factory=WebAuthConfig)
location: LocationConfig = field(default_factory=LocationConfig)
+ signal_health: SignalHealthConfig = field(default_factory=SignalHealthConfig)
def _resolve_radio_frequency(radio: "RadioConfig") -> None:
@@ -356,25 +350,6 @@ def _merge_dataclass(instance, overrides: dict):
setattr(instance, key, value)
-def _collect_unknown_keys(instance, overrides: dict, prefix: str = "") -> list[str]:
- """Return dotted paths of override keys with no matching dataclass field.
-
- Mirrors the descent rules in :func:`_merge_dataclass`: it only recurses
- into a nested dataclass (e.g. ``transmit.nodeinfo``), so user-supplied
- mapping fields such as ``meshtastic.channel_keys`` are treated as opaque
- values rather than scanned for "unknown" keys.
- """
- unknown: list[str] = []
- for key, value in overrides.items():
- if not hasattr(instance, key):
- unknown.append(f"{prefix}{key}")
- continue
- current = getattr(instance, key)
- if dataclasses.is_dataclass(current) and isinstance(value, dict):
- unknown.extend(_collect_unknown_keys(current, value, f"{prefix}{key}."))
- return unknown
-
-
def _apply_yaml(cfg: AppConfig, path: Path) -> None:
"""Merge a single YAML file into an existing AppConfig."""
if not path.exists():
@@ -383,10 +358,6 @@ def _apply_yaml(cfg: AppConfig, path: Path) -> None:
with open(path, "r") as fh:
raw = yaml.safe_load(fh) or {}
- if not isinstance(raw, dict):
- logger.warning("Ignoring %s: top-level YAML is not a mapping.", path)
- return
-
section_map = {
"radio": cfg.radio,
"meshtastic": cfg.meshtastic,
@@ -401,28 +372,12 @@ def _apply_yaml(cfg: AppConfig, path: Path) -> None:
"transmit": cfg.transmit,
"web_auth": cfg.web_auth,
"location": cfg.location,
+ "signal_health": cfg.signal_health,
}
- unknown_keys: list[str] = []
- for section_name, section_value in raw.items():
- section_instance = section_map.get(section_name)
- if section_instance is None:
- unknown_keys.append(section_name)
- continue
- _merge_dataclass(section_instance, section_value)
- if isinstance(section_value, dict):
- unknown_keys.extend(
- _collect_unknown_keys(section_instance, section_value, f"{section_name}.")
- )
-
- if unknown_keys:
- logger.warning(
- "Ignoring %d unknown config key(s) in %s: %s. "
- "These were not applied -- check for typos against the documented schema.",
- len(unknown_keys),
- path,
- ", ".join(sorted(unknown_keys)),
- )
+ for section_name, section_instance in section_map.items():
+ if section_name in raw:
+ _merge_dataclass(section_instance, raw[section_name])
_VALID_CONFIG_EXTENSIONS = {".yaml", ".yml"}
@@ -449,10 +404,28 @@ def load_config(config_path: Optional[str] = None) -> AppConfig:
local = config_path or os.environ.get("CONCENTRATOR_CONFIG", "config/local.yaml")
_apply_yaml(cfg, _validated_config_path(local))
_resolve_radio_frequency(cfg.radio)
+ validate_config_consistency(cfg)
return cfg
+def validate_config_consistency(config: AppConfig) -> None:
+ """Reject impossible radio/location combinations before hardware starts."""
+ if not config.radio.gps_pps_enabled:
+ return
+ if config.location.source != "uart":
+ return
+ pps_tty = os.path.normpath(config.radio.gps_pps_tty_path)
+ uart_tty = os.path.normpath(config.location.uart_path)
+ if pps_tty == uart_tty:
+ raise ValueError(
+ "radio.gps_pps_enabled and location.source=uart cannot share "
+ f"the same serial device ({pps_tty!r}). Use location.source=gpsd "
+ "or static for dashboard coordinates while PPS owns the HAT UART, "
+ "or disable gps_pps_enabled when using UART for location only."
+ )
+
+
def _get_local_yaml_path() -> Path:
"""Resolve the local.yaml path used for user overrides."""
raw = os.environ.get("CONCENTRATOR_CONFIG", "config/local.yaml")
diff --git a/src/coordinator.py b/src/coordinator.py
index 665e9ca..bad773f 100644
--- a/src/coordinator.py
+++ b/src/coordinator.py
@@ -49,6 +49,9 @@ def __init__(self, config: AppConfig):
burst_size=relay_cfg.burst_size,
min_relay_rssi=relay_cfg.min_relay_rssi,
max_relay_rssi=relay_cfg.max_relay_rssi,
+ dedup_ttl_seconds=relay_cfg.dedup_ttl_seconds,
+ blocklist=relay_cfg.blocklist,
+ priority_list=relay_cfg.priority_list,
)
self._transmitter: Optional[MeshtasticTransmitter] = None
self._mqtt: Optional[MqttPublisher] = None
@@ -70,9 +73,6 @@ def __init__(self, config: AppConfig):
self._pipeline_task: Optional[asyncio.Task] = None
self._cleanup_task: Optional[asyncio.Task] = None
self._location_refresh_task: Optional[asyncio.Task] = None
- self._last_live_lat: Optional[float] = None
- self._last_live_lon: Optional[float] = None
- self._last_live_alt: Optional[float] = None
@property
def database(self) -> DatabaseManager:
@@ -121,10 +121,13 @@ def on_location_update(
self,
callback: Callable[[Optional[float], Optional[float], Optional[float]], None],
) -> None:
- """Register a callback fired when a live GPS source publishes a new fix.
+ """Register a callback fired when the live location source publishes
+ a fresh position fix.
- ``device.{latitude,longitude,altitude}`` (the Meshradar pin) is never
- mutated. Callbacks receive live fix coordinates only.
+ Called with ``(latitude, longitude, altitude_m)``. Fires only when
+ the new fix actually differs from the cached device position, so
+ listeners can rely on it as a real change signal instead of
+ polling.
"""
self._on_location_callbacks.append(callback)
@@ -202,9 +205,16 @@ async def _cleanup_loop(self) -> None:
async def _location_refresh_loop(self) -> None:
"""Periodically pull the latest fix from the active location source.
- Live sources (gpsd/uart) notify listeners when the fix changes.
- ``device.{latitude,longitude,altitude}`` stays the registered Meshradar
- pin and is not overwritten here.
+ When the source publishes a real position fix, write it back into
+ ``self._config.device.latitude/longitude/altitude`` so every
+ downstream consumer (farthest-direct calculation, heartbeat,
+ node-position math, dashboard map placement) sees the live
+ coordinate without further plumbing.
+
+ Static sources also flow through this loop: their ``get_status``
+ returns the same configured coordinates every tick, so it's a
+ cheap no-op. We deliberately do not write back to ``local.yaml``
+ on every tick: in-memory updates suffice.
"""
interval = max(1, self._config.location.update_interval_seconds)
try:
@@ -217,33 +227,28 @@ async def _location_refresh_loop(self) -> None:
logger.exception("Location refresh loop error")
def _apply_latest_location_fix(self) -> None:
- if self._location_source.source_name == "static":
- return
-
status = self._location_source.get_status()
if not status.available or status.fix is None:
return
if not status.fix.has_position:
return
- lat = status.fix.latitude
- lon = status.fix.longitude
- alt = status.fix.altitude_m
-
+ device = self._config.device
if (
- self._last_live_lat == lat
- and self._last_live_lon == lon
- and self._last_live_alt == alt
+ device.latitude == status.fix.latitude
+ and device.longitude == status.fix.longitude
+ and device.altitude == status.fix.altitude_m
):
- return
+ return # no change
- self._last_live_lat = lat
- self._last_live_lon = lon
- self._last_live_alt = alt
+ device.latitude = status.fix.latitude
+ device.longitude = status.fix.longitude
+ if status.fix.altitude_m is not None:
+ device.altitude = status.fix.altitude_m
for cb in self._on_location_callbacks:
try:
- cb(lat, lon, alt)
+ cb(device.latitude, device.longitude, device.altitude)
except Exception:
logger.exception("Location update callback failed")
@@ -270,10 +275,10 @@ async def _process_capture(self, raw: RawCapture) -> None:
packet.capture_source = raw.capture_source
await self._store_packet(packet)
- self._notify_callbacks(packet)
await self._relay.process_packet(packet)
self._publish_mqtt(packet)
self._record_stats(packet)
+ self._notify_callbacks(packet)
@staticmethod
def _adapt_meshcore_usb(raw: RawCapture) -> Optional[Packet]:
@@ -299,22 +304,6 @@ async def _update_node(self, packet: Packet) -> None:
await self._node_repo.upsert(node_update)
self._last_node_update[node_update.node_id] = node_update
self._stats_reporter.record_node(node_update.to_dict())
- if node_update.public_key:
- try:
- node_int = int(node_update.node_id, 16)
- new_key = bytes.fromhex(node_update.public_key)
- prior = self._crypto.lookup_public_key(node_int)
- self._crypto.register_public_key(node_int, new_key)
- if prior != new_key:
- logger.info(
- "Updated peer PKI public_key for %s",
- node_update.node_id,
- )
- except ValueError:
- logger.debug(
- "Ignoring invalid public_key for node %s",
- node_update.node_id,
- )
elif packet.source_id:
await self._node_repo.increment_packet_count(packet.source_id)
@@ -466,8 +455,9 @@ def _setup_location_banner(self) -> None:
detail = f"gpsd @ {host}:{port}"
color = GREEN
elif source_name == "uart":
- detail = "on-board UART (placeholder, falls back to static)"
- color = DIM
+ path = self._config.location.uart_path
+ detail = f"UART NMEA @ {path}"
+ color = GREEN
else:
detail = "static config coordinates"
color = DIM
diff --git a/src/relay/dedup_filter.py b/src/relay/dedup_filter.py
index 9df04fe..68b0b44 100644
--- a/src/relay/dedup_filter.py
+++ b/src/relay/dedup_filter.py
@@ -54,5 +54,13 @@ def _enforce_max_size(self) -> None:
def size(self) -> int:
return len(self._seen)
+ @property
+ def ttl_seconds(self) -> float:
+ return self._ttl
+
+ def set_ttl(self, ttl_seconds: float) -> None:
+ """Update duplicate window without clearing the cache."""
+ self._ttl = max(1.0, float(ttl_seconds))
+
def clear(self) -> None:
self._seen.clear()
diff --git a/src/relay/node_id.py b/src/relay/node_id.py
new file mode 100644
index 0000000..f7b5be1
--- /dev/null
+++ b/src/relay/node_id.py
@@ -0,0 +1,28 @@
+"""Normalize and validate Meshtastic node IDs for relay filter lists."""
+
+from __future__ import annotations
+
+import re
+
+_NODE_ID_RE = re.compile(r"^[0-9a-f]{8}$", re.IGNORECASE)
+
+
+def normalize_node_id(node_id: str) -> str:
+ """Strip optional ``!`` prefix and lowercase for consistent matching."""
+ return (node_id or "").strip().lower().lstrip("!")
+
+
+def validate_node_ids(node_ids: list[str]) -> list[str]:
+ """Return normalized unique node IDs or raise ValueError."""
+ seen: set[str] = set()
+ normalized: list[str] = []
+ for raw in node_ids:
+ nid = normalize_node_id(raw)
+ if not _NODE_ID_RE.match(nid):
+ raise ValueError(
+ f"Invalid node ID {raw!r} — expected 8 hex chars (no ! prefix)"
+ )
+ if nid not in seen:
+ seen.add(nid)
+ normalized.append(nid)
+ return normalized
diff --git a/src/relay/rate_limiter.py b/src/relay/rate_limiter.py
index 41e6cc7..c109c23 100644
--- a/src/relay/rate_limiter.py
+++ b/src/relay/rate_limiter.py
@@ -41,6 +41,17 @@ def allow(self) -> bool:
self._timestamps.append(now)
return True
+ def allow_priority(self) -> bool:
+ """Allow relay while enforcing per-minute cap but skipping burst gate."""
+ now = time.monotonic()
+ self._prune(now)
+
+ if len(self._timestamps) >= self._max_per_minute:
+ return False
+
+ self._timestamps.append(now)
+ return True
+
def _prune(self, now: float) -> None:
cutoff = now - self._window_seconds
while self._timestamps and self._timestamps[0] < cutoff:
diff --git a/src/relay/relay_manager.py b/src/relay/relay_manager.py
index 835fc06..079067d 100644
--- a/src/relay/relay_manager.py
+++ b/src/relay/relay_manager.py
@@ -6,6 +6,7 @@
from src.models.packet import Packet, PacketType
from src.relay.dedup_filter import DeduplicationFilter
+from src.relay.node_id import normalize_node_id, validate_node_ids
from src.relay.rate_limiter import RateLimiter
logger = logging.getLogger(__name__)
@@ -38,7 +39,6 @@ class RelayManager:
- Hop filtering: don't relay packets with 0 hops remaining
- Type filtering: only relay useful packet types
- Signal filtering: don't relay strong signals (nearby nodes)
- - Destination filtering: never relay unicast packets addressed to us
The actual transmission is handled by an external radio
(SX1262 via meshtastic-python serial interface).
@@ -51,22 +51,22 @@ def __init__(
min_relay_rssi: float = -110.0,
max_relay_rssi: float = -50.0,
enabled: bool = False,
+ dedup_ttl_seconds: float = 300.0,
+ blocklist: list[str] | None = None,
+ priority_list: list[str] | None = None,
):
- self._dedup = DeduplicationFilter()
+ self._dedup = DeduplicationFilter(ttl_seconds=dedup_ttl_seconds)
self._limiter = RateLimiter(max_relay_per_minute, burst_size)
self._min_rssi = min_relay_rssi
self._max_rssi = max_relay_rssi
self._enabled = enabled
- self._local_node_hex: str | None = None
+ self._blocklist = set(validate_node_ids(blocklist or []))
+ self._priority_list = set(validate_node_ids(priority_list or []))
self._relay_count = 0
self._rejected_count = 0
self._rejection_reasons: dict[str, int] = {}
self._transmit_fn: Optional[callable] = None
- def set_local_node_id(self, node_hex: str) -> None:
- """Skip relay for unicast packets addressed to this Meshpoint."""
- self._local_node_hex = node_hex.lower()
-
@property
def enabled(self) -> bool:
return self._enabled
@@ -80,25 +80,42 @@ def set_transmit_function(self, fn: callable) -> None:
"""Register the function used to transmit relay packets."""
self._transmit_fn = fn
+ def reload_filters(
+ self,
+ *,
+ blocklist: list[str] | None = None,
+ priority_list: list[str] | None = None,
+ dedup_ttl_seconds: float | None = None,
+ ) -> None:
+ """Apply relay filter changes without a full service restart."""
+ if blocklist is not None:
+ self._blocklist = set(validate_node_ids(blocklist))
+ if priority_list is not None:
+ self._priority_list = set(validate_node_ids(priority_list))
+ if dedup_ttl_seconds is not None:
+ self._dedup.set_ttl(dedup_ttl_seconds)
+ logger.info(
+ "Relay filters reloaded (blocklist=%d, priority=%d, dedup_ttl=%.0fs)",
+ len(self._blocklist),
+ len(self._priority_list),
+ self._dedup.ttl_seconds,
+ )
+
def evaluate(self, packet: Packet) -> RelayDecision:
"""Decide whether a captured packet should be relayed."""
if not self._enabled:
return RelayDecision(False, "relay_disabled")
+ source_id = normalize_node_id(packet.source_id)
+ if source_id in self._blocklist:
+ return RelayDecision(False, "blocklisted")
+
if self._dedup.is_duplicate(packet.source_id, packet.packet_id):
return RelayDecision(False, "duplicate")
if packet.hop_limit <= 0:
return RelayDecision(False, "no_hops_remaining")
- dest = (packet.destination_id or "").lower()
- if (
- self._local_node_hex
- and dest == self._local_node_hex
- and dest not in (BROADCAST_ADDR_MESHTASTIC, BROADCAST_ADDR_MESHCORE)
- ):
- return RelayDecision(False, "dest_local")
-
if packet.packet_type not in RELAY_WORTHY_TYPES:
return RelayDecision(False, "non_relayable_type")
@@ -108,6 +125,10 @@ def evaluate(self, packet: Packet) -> RelayDecision:
if packet.signal.rssi < self._min_rssi:
return RelayDecision(False, "signal_too_weak")
+ if source_id in self._priority_list:
+ if self._limiter.allow_priority():
+ return RelayDecision(True, "approved_priority")
+
if not self._limiter.allow():
return RelayDecision(False, "rate_limited")
diff --git a/tests/test_relay_filters.py b/tests/test_relay_filters.py
new file mode 100644
index 0000000..0b208e2
--- /dev/null
+++ b/tests/test_relay_filters.py
@@ -0,0 +1,179 @@
+"""Relay blocklist, priority list, and dedup TTL controls."""
+
+from __future__ import annotations
+
+import unittest
+from unittest.mock import MagicMock, patch
+
+from src.config import RelayConfig
+from src.models.packet import Packet, PacketType, Protocol
+from src.models.signal import SignalMetrics
+from src.relay.dedup_filter import DeduplicationFilter
+from src.relay.node_id import validate_node_ids
+from src.relay.rate_limiter import RateLimiter
+from src.relay.relay_manager import RelayManager
+
+
+def _relay_packet(
+ source_id: str = "a3f2b1c0",
+ packet_id: str = "pkt001",
+) -> Packet:
+ return Packet(
+ packet_id=packet_id,
+ source_id=source_id,
+ destination_id="ffffffff",
+ protocol=Protocol.MESHTASTIC,
+ packet_type=PacketType.TEXT,
+ hop_limit=2,
+ hop_start=3,
+ signal=SignalMetrics(
+ rssi=-95.0,
+ snr=5.0,
+ frequency_mhz=906.875,
+ spreading_factor=11,
+ bandwidth_khz=250.0,
+ ),
+ )
+
+
+class TestRelayManagerFilters(unittest.TestCase):
+ def test_blocklist_rejects_before_dedup(self):
+ manager = RelayManager(
+ enabled=True,
+ blocklist=["a3f2b1c0"],
+ )
+ pkt = _relay_packet(source_id="a3f2b1c0")
+ decision = manager.evaluate(pkt)
+ self.assertFalse(decision.should_relay)
+ self.assertEqual(decision.reason, "blocklisted")
+
+ def test_priority_bypasses_burst_gate(self):
+ manager = RelayManager(
+ enabled=True,
+ max_relay_per_minute=10,
+ burst_size=1,
+ priority_list=["deadbeef"],
+ )
+ first = manager.evaluate(_relay_packet(source_id="11111111", packet_id="p1"))
+ second = manager.evaluate(_relay_packet(source_id="22222222", packet_id="p2"))
+ self.assertTrue(first.should_relay)
+ self.assertFalse(second.should_relay)
+ self.assertEqual(second.reason, "rate_limited")
+
+ priority = manager.evaluate(
+ _relay_packet(source_id="deadbeef", packet_id="p3"),
+ )
+ self.assertTrue(priority.should_relay)
+ self.assertEqual(priority.reason, "approved_priority")
+
+ def test_reload_filters_updates_runtime_state(self):
+ manager = RelayManager(enabled=True, dedup_ttl_seconds=300)
+ manager.reload_filters(
+ blocklist=["abcdef01"],
+ priority_list=["12345678"],
+ dedup_ttl_seconds=60,
+ )
+ self.assertEqual(manager._dedup.ttl_seconds, 60)
+ pkt = _relay_packet(source_id="abcdef01")
+ self.assertEqual(manager.evaluate(pkt).reason, "blocklisted")
+
+ def test_dedup_ttl_is_configurable(self):
+ dedup = DeduplicationFilter(ttl_seconds=10.0)
+ dedup.set_ttl(600)
+ self.assertEqual(dedup.ttl_seconds, 600)
+
+
+class TestRateLimiterPriority(unittest.TestCase):
+ def test_allow_priority_skips_burst_only(self):
+ limiter = RateLimiter(max_per_minute=5, burst_size=1)
+ self.assertTrue(limiter.allow())
+ self.assertFalse(limiter.allow())
+ self.assertTrue(limiter.allow_priority())
+
+
+class TestValidateNodeIds(unittest.TestCase):
+ def test_accepts_normalized_ids(self):
+ ids = validate_node_ids(["!A3F2B1C0", "deadbeef"])
+ self.assertEqual(ids, ["a3f2b1c0", "deadbeef"])
+
+ def test_rejects_invalid_id(self):
+ with self.assertRaises(ValueError):
+ validate_node_ids(["not-a-node"])
+
+
+class TestRelayConfigApi(unittest.TestCase):
+ def setUp(self):
+ try:
+ from fastapi import FastAPI
+ from fastapi.testclient import TestClient
+ from src.api.auth.dependencies import require_admin
+ from src.api.auth.jwt_session import ROLE_ADMIN, SessionClaims
+ from src.api.routes import system_config_routes as relay_cfg_module
+ except ImportError as exc:
+ raise unittest.SkipTest(f"API test deps unavailable: {exc}") from exc
+
+ def _admin_claims() -> SessionClaims:
+ return SessionClaims(subject="admin", role=ROLE_ADMIN, session_version=1)
+
+ cfg = MagicMock()
+ cfg.relay = RelayConfig(
+ enabled=True,
+ blocklist=[],
+ priority_list=[],
+ dedup_ttl_seconds=300,
+ )
+ self.manager = RelayManager(enabled=True)
+ relay_cfg_module._config = cfg
+ relay_cfg_module._relay_manager = self.manager
+ self.app = FastAPI()
+ self.app.dependency_overrides[require_admin] = _admin_claims
+ from src.api.audit.dependencies import get_audit_writer
+
+ audit = MagicMock()
+
+ class _Ctx:
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ return False
+
+ audit.timed_action.return_value = _Ctx()
+ self.app.dependency_overrides[get_audit_writer] = lambda: audit
+ self.app.include_router(relay_cfg_module.router)
+ self.client = TestClient(self.app)
+ self._relay_cfg_module = relay_cfg_module
+
+ def tearDown(self):
+ if hasattr(self, "_relay_cfg_module"):
+ self._relay_cfg_module.reset_routes()
+
+ def test_rejects_invalid_blocklist_id(self):
+ resp = self.client.put(
+ "/api/config/relay",
+ json={"blocklist": ["bad-id"]},
+ )
+ self.assertEqual(resp.status_code, 400)
+
+ @patch("src.api.routes.system_config_routes.save_section_to_yaml")
+ def test_filter_update_hot_reloads_without_restart(self, mock_save):
+ resp = self.client.put(
+ "/api/config/relay",
+ json={
+ "blocklist": ["a3f2b1c0"],
+ "priority_list": ["deadbeef"],
+ "dedup_ttl_seconds": 120,
+ },
+ )
+ self.assertEqual(resp.status_code, 200)
+ body = resp.json()
+ self.assertTrue(body["saved"])
+ self.assertFalse(body["restart_required"])
+ mock_save.assert_called_once()
+ pkt = _relay_packet(source_id="a3f2b1c0")
+ self.assertEqual(self.manager.evaluate(pkt).reason, "blocklisted")
+ self.assertEqual(self.manager._dedup.ttl_seconds, 120)
+
+
+if __name__ == "__main__":
+ unittest.main()