diff --git a/config/default.yaml b/config/default.yaml index b8f7bad..2c9ebe4 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -11,6 +11,11 @@ radio: sync_word: 0x2B preamble_length: 16 tx_power_dbm: 22 + # HAL GPS/PPS (RAK Pi HAT u-blox → concentrator PPS pin). Off by default. + # gps_pps_enabled: false + # gps_pps_tty_path: "/dev/ttyAMA0" + # gps_family: "ubx7" + # gps_pps_target_baud: 0 # 0 = HAL default (9600) meshtastic: default_key_b64: "AQ==" @@ -50,7 +55,7 @@ device: longitude: 0.0 altitude: 25 hardware_description: "RAK2287 + Raspberry Pi 4" - firmware_version: "0.7.6" + firmware_version: "0.7.5.1" relay: enabled: false @@ -60,6 +65,9 @@ relay: burst_size: 5 min_relay_rssi: -110.0 max_relay_rssi: -50.0 + # blocklist: [] # 8-char hex node IDs — relay only, feed unchanged + # priority_list: [] # bypass burst gate under congestion + # dedup_ttl_seconds: 300 upstream: enabled: true @@ -80,13 +88,6 @@ transmit: nodeinfo: interval_minutes: 180 # 5..1440 min, default 180 (3 hr); 0 = disabled startup_delay_seconds: 60 # delay before first broadcast on boot - position: - interval_minutes: 15 - startup_delay_seconds: 180 - # LoRa mesh POSITION packets (Meshtastic app map). Meshradar fleet pin - # always uses device.latitude/longitude above, not live GPS. - coordinate_source: "static" # static | live (live needs gpsd/uart source) - location_precision: "approximate" # exact | approximate | none (live only) web_auth: # First-run state: hashes are empty -> dashboard forces /setup. @@ -124,18 +125,23 @@ mqtt: homeassistant_discovery: false location: - # Where live GPS fixes come from (skyplot + optional mesh POSITION). - # Registered coordinates for Meshradar live in device.latitude/longitude - # and are not overwritten by gpsd. + # Where the Meshpoint's reported lat/lon/alt comes from. # static : use device.latitude/longitude/altitude (default) # gpsd : poll a local or remote gpsd daemon for live fixes - # uart : reserved (RAK Pi HAT GPS, not yet wired) + # uart : on-board RAK Pi HAT GPS via /dev/ttyAMA0 (NMEA GGA) source: "static" # gpsd connection. Defaults match the well-known local socket; only # change when running gpsd on a peer machine on the LAN. gpsd_host: "127.0.0.1" gpsd_port: 2947 + uart_path: "/dev/ttyAMA0" + uart_baud: 9600 # How often the coordinator wakes to read the active source. update_interval_seconds: 5 # Minimum acceptable fix mode: 0=any (incl. no-fix), 1=2D, 2=3D. min_fix_quality: 1 + +# signal_health: +# green_rssi_floor: -100 # badge green when 1h avg RSSI is above this +# yellow_rssi_floor: -115 # badge yellow above this, red below +# min_packets_per_hour: 5 # hide badge when fewer packets in the last hour diff --git a/frontend/css/configuration.css b/frontend/css/configuration.css index 70b484c..f0352dc 100644 --- a/frontend/css/configuration.css +++ b/frontend/css/configuration.css @@ -41,19 +41,6 @@ line-height: 1.5; } -.cfg-card__hint--nested { - margin: -4px 0 4px; -} - -.cfg-inline-link { - color: var(--accent-cyan, #22d3ee); - text-decoration: none; -} - -.cfg-inline-link:hover { - text-decoration: underline; -} - .cfg-form { display: flex; flex-direction: column; @@ -242,6 +229,60 @@ select.cfg-field__input option:checked { transform: translateX(-50%) translateY(0); } +.cfg-id-list { + display: flex; + flex-direction: column; + gap: 0.35rem; + margin-bottom: 0.5rem; +} + +.cfg-id-empty { + margin: 0; + font-size: 12px; + color: rgba(243, 244, 246, 0.45); + font-style: italic; +} + +.cfg-id-row { + display: flex; + align-items: center; + gap: 0.5rem; +} + +.cfg-id-chip { + font-family: 'JetBrains Mono', Menlo, monospace; + font-size: 12px; + padding: 0.15rem 0.45rem; + background: rgba(6, 182, 212, 0.1); + border: 1px solid rgba(6, 182, 212, 0.25); + border-radius: 6px; + color: #e2e8f0; +} + +.cfg-id-remove { + background: transparent; + border: none; + color: rgba(243, 244, 246, 0.5); + font-size: 1.1rem; + line-height: 1; + cursor: pointer; + padding: 0 0.25rem; +} + +.cfg-id-remove:hover { + color: #ef4444; +} + +.cfg-id-add { + display: flex; + gap: 0.5rem; + align-items: center; +} + +.cfg-id-add .cfg-field__input { + flex: 1; +} + @keyframes cfgFadeIn { from { opacity: 0; transform: translateY(8px); } to { opacity: 1; transform: translateY(0); } diff --git a/frontend/index.html b/frontend/index.html index 239398c..064b792 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -36,6 +36,7 @@ + @@ -677,10 +678,12 @@

Service actions

+ + @@ -718,6 +721,7 @@

Service actions

+ diff --git a/frontend/js/configuration/configuration_panel.js b/frontend/js/configuration/configuration_panel.js index 6972cc1..a2ef107 100644 --- a/frontend/js/configuration/configuration_panel.js +++ b/frontend/js/configuration/configuration_panel.js @@ -124,10 +124,18 @@ class ConfigurationPanel { } else if (section === 'advanced' && window.AdvancedConfigCard) { const host = document.getElementById('cfg-advanced-panel'); if (host) { - host.innerHTML = ''; + host.innerHTML = ` +
+
+ `; const card = new window.AdvancedConfigCard(api); - card.mount(host); + card.mount(host.querySelector('[data-cfg-advanced]')); this._cards.set('advanced', card); + if (window.RelayFiltersCard) { + const relayCard = new window.RelayFiltersCard(api); + relayCard.mount(host.querySelector('[data-cfg-relay-filters]')); + this._cards.set('relay-filters', relayCard); + } } } this._mounted.add(section); diff --git a/frontend/js/configuration/relay_filters_card.js b/frontend/js/configuration/relay_filters_card.js new file mode 100644 index 0000000..ae1b7ea --- /dev/null +++ b/frontend/js/configuration/relay_filters_card.js @@ -0,0 +1,181 @@ +/** + * Configuration → Advanced — relay filter controls. + * + * Blocklist, priority list, and dedup TTL via PUT /api/config/relay. + * Filter changes hot-reload without a full service restart. + */ + +class RelayFiltersCard { + static NODE_ID_RE = /^[0-9a-f]{8}$/i; + + constructor(api) { + this._api = api; + this._root = null; + this._blocklist = []; + this._priority = []; + } + + mount(root) { + this._root = root; + this._root.innerHTML = ` +
+
+

Relay filters

+

+ Blocklist affects relay only — packets still appear in the live feed. + Priority nodes bypass the burst gate (not the per-minute cap). Dedup default is 300 s. +

+
+
+
+ Blocklist +
+
+ + +
+
+
+ Priority list +
+
+ + +
+
+ +
+ +
+

+
+
+ `; + + this._blocklistEl = this._root.querySelector('[data-blocklist]'); + this._priorityEl = this._root.querySelector('[data-priority-list]'); + this._form = this._root.querySelector('[data-relay-filters-form]'); + this._statusEl = this._root.querySelector('[data-relay-filters-status]'); + + this._root.querySelector('[data-blocklist-add]').addEventListener('click', () => { + this._addId('blocklist'); + }); + this._root.querySelector('[data-priority-add]').addEventListener('click', () => { + this._addId('priority'); + }); + this._form.addEventListener('submit', (e) => this._onSubmit(e)); + } + + render(config) { + const relay = config.relay || {}; + this._blocklist = (relay.blocklist || []).map((id) => this._normalizeId(id)); + this._priority = (relay.priority_list || []).map((id) => this._normalizeId(id)); + this._paintLists(); + const ttl = relay.dedup_ttl_seconds; + const ttlEl = this._root.querySelector('[data-dedup-ttl]'); + if (ttlEl) ttlEl.value = ttl != null ? ttl : 300; + } + + _normalizeId(raw) { + return String(raw || '').trim().toLowerCase().replace(/^!/, ''); + } + + _addId(which) { + const input = this._root.querySelector( + which === 'blocklist' ? '[data-blocklist-input]' : '[data-priority-input]', + ); + const id = this._normalizeId(input.value); + if (!RelayFiltersCard.NODE_ID_RE.test(id)) { + this._setStatus('error', 'Node ID must be 8 hex characters (no ! prefix).'); + return; + } + const list = which === 'blocklist' ? this._blocklist : this._priority; + if (!list.includes(id)) list.push(id); + input.value = ''; + this._paintLists(); + } + + _removeId(which, id) { + if (which === 'blocklist') { + this._blocklist = this._blocklist.filter((x) => x !== id); + } else { + this._priority = this._priority.filter((x) => x !== id); + } + this._paintLists(); + } + + _paintLists() { + this._blocklistEl.innerHTML = this._listHtml('blocklist', this._blocklist); + this._priorityEl.innerHTML = this._listHtml('priority', this._priority); + this._blocklistEl.querySelectorAll('[data-remove-id]').forEach((btn) => { + btn.addEventListener('click', () => { + this._removeId(btn.dataset.list, btn.dataset.id); + }); + }); + this._priorityEl.querySelectorAll('[data-remove-id]').forEach((btn) => { + btn.addEventListener('click', () => { + this._removeId(btn.dataset.list, btn.dataset.id); + }); + }); + } + + _listHtml(which, ids) { + if (!ids.length) { + return '

None

'; + } + return ids.map((id) => ` +
+ !${this._api.escape(id)} + +
+ `).join(''); + } + + async _onSubmit(event) { + event.preventDefault(); + const ttl = Number(this._root.querySelector('[data-dedup-ttl]').value); + if (!Number.isFinite(ttl) || ttl < 5 || ttl > 3600) { + this._setStatus('error', 'Dedup TTL must be between 5 and 3600 seconds.'); + return; + } + this._setStatus('pending', 'Saving…'); + const result = await this._api.put('/api/config/relay', { + blocklist: this._blocklist, + priority_list: this._priority, + dedup_ttl_seconds: ttl, + }); + if (!result) { + this._setStatus('error', 'Save failed.'); + return; + } + this._setStatus('success', 'Saved.'); + if (result.restart_required) { + this._api.signalRestart('Relay settings updated.'); + } else { + this._api.toast('Relay filters applied (no restart required).'); + } + this._api.refresh(); + } + + _setStatus(kind, message) { + if (!this._statusEl) return; + this._statusEl.dataset.kind = kind; + this._statusEl.textContent = message; + } +} + +window.RelayFiltersCard = RelayFiltersCard; diff --git a/src/api/routes/config_enrichment.py b/src/api/routes/config_enrichment.py index 5f0ae3a..c034f91 100644 --- a/src/api/routes/config_enrichment.py +++ b/src/api/routes/config_enrichment.py @@ -52,24 +52,32 @@ def enrich_config_payload(cfg: AppConfig, base: dict) -> dict: "burst_size": relay.burst_size, "min_relay_rssi": relay.min_relay_rssi, "max_relay_rssi": relay.max_relay_rssi, + "blocklist": list(relay.blocklist or []), + "priority_list": list(relay.priority_list or []), + "dedup_ttl_seconds": relay.dedup_ttl_seconds, } base["radio_advanced"] = { "spectral_scan_interval_seconds": radio.spectral_scan_interval_seconds, "sx1261_spi_path": radio.sx1261_spi_path or "", + "carrier_type": radio.carrier_type or "", + "gps_pps_enabled": radio.gps_pps_enabled, + "gps_pps_tty_path": radio.gps_pps_tty_path, + "gps_family": radio.gps_family, + "gps_pps_target_baud": radio.gps_pps_target_baud, } base["location"] = { "source": location.source, "gpsd_host": location.gpsd_host, "gpsd_port": location.gpsd_port, + "uart_path": location.uart_path, + "uart_baud": location.uart_baud, "update_interval_seconds": location.update_interval_seconds, "min_fix_quality": location.min_fix_quality, } - pos = cfg.transmit.position - if "transmit" in base: - base["transmit"]["position"] = { - "interval_minutes": pos.interval_minutes, - "startup_delay_seconds": pos.startup_delay_seconds, - "coordinate_source": pos.coordinate_source, - "location_precision": pos.location_precision, - } + sh = cfg.signal_health + base["signal_health"] = { + "green_rssi_floor": sh.green_rssi_floor, + "yellow_rssi_floor": sh.yellow_rssi_floor, + "min_packets_per_hour": sh.min_packets_per_hour, + } return base diff --git a/src/api/routes/system_config_routes.py b/src/api/routes/system_config_routes.py index fa2a14f..97081e4 100644 --- a/src/api/routes/system_config_routes.py +++ b/src/api/routes/system_config_routes.py @@ -13,22 +13,30 @@ from src.api.auth.dependencies import require_admin from src.api.auth.jwt_session import SessionClaims from src.config import AppConfig, save_section_to_yaml +from src.relay.node_id import validate_node_ids +from src.relay.relay_manager import RelayManager logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/config", tags=["config"]) _config: AppConfig | None = None +_relay_manager: RelayManager | None = None -def init_routes(config: AppConfig) -> None: - global _config +def init_routes( + config: AppConfig, + relay_manager: RelayManager | None = None, +) -> None: + global _config, _relay_manager _config = config + _relay_manager = relay_manager def reset_routes() -> None: - global _config + global _config, _relay_manager _config = None + _relay_manager = None class StorageUpdate(BaseModel): @@ -51,6 +59,9 @@ class RelayUpdate(BaseModel): burst_size: Optional[int] = Field(None, ge=1, le=50) min_relay_rssi: Optional[float] = Field(None, ge=-150, le=0) max_relay_rssi: Optional[float] = Field(None, ge=-150, le=0) + blocklist: Optional[list[str]] = None + priority_list: Optional[list[str]] = None + dedup_ttl_seconds: Optional[int] = Field(None, ge=5, le=3600) class RadioAdvancedUpdate(BaseModel): @@ -195,6 +206,26 @@ async def update_relay( relay.max_relay_rssi = req.max_relay_rssi updates["max_relay_rssi"] = req.max_relay_rssi + filter_reload = False + if req.blocklist is not None: + try: + relay.blocklist = validate_node_ids(req.blocklist) + except ValueError as exc: + raise HTTPException(400, str(exc)) from exc + updates["blocklist"] = relay.blocklist + filter_reload = True + if req.priority_list is not None: + try: + relay.priority_list = validate_node_ids(req.priority_list) + except ValueError as exc: + raise HTTPException(400, str(exc)) from exc + updates["priority_list"] = relay.priority_list + filter_reload = True + if req.dedup_ttl_seconds is not None: + relay.dedup_ttl_seconds = req.dedup_ttl_seconds + updates["dedup_ttl_seconds"] = req.dedup_ttl_seconds + filter_reload = True + if not updates: return {"saved": False, "restart_required": False} @@ -206,6 +237,13 @@ async def update_relay( except PermissionError as exc: raise HTTPException(403, str(exc)) from exc + if filter_reload and _relay_manager is not None: + _relay_manager.reload_filters( + blocklist=relay.blocklist, + priority_list=relay.priority_list, + dedup_ttl_seconds=relay.dedup_ttl_seconds, + ) + return {"saved": True, "restart_required": restart_needed, "updates": updates} diff --git a/src/api/server.py b/src/api/server.py index 3bd4fb0..6609f73 100644 --- a/src/api/server.py +++ b/src/api/server.py @@ -41,6 +41,7 @@ dangerous_routes, device, device_config_routes, + gps_pps_status, gps_status, identity_routes, messages, @@ -76,9 +77,6 @@ NodeInfoBroadcaster, clamp_interval_minutes, ) -from src.transmit.position_broadcaster import PositionBroadcaster -from src.transmit.telemetry_broadcaster import TelemetryBroadcaster -from src.transmit.meshtastic_inbound_handler import MeshtasticInboundHandler from src.transmit.tx_service import TxService from src.version import __version__ @@ -89,8 +87,6 @@ pipeline: PipelineCoordinator | None = None upstream: UpstreamClient | None = None nodeinfo_broadcaster: NodeInfoBroadcaster | None = None -telemetry_broadcaster: TelemetryBroadcaster | None = None -position_broadcaster: PositionBroadcaster | None = None noise_floor_tracker = NoiseFloorTracker() _noise_floor_emitter_task = None _spectral_scan_service: SpectralScanService | None = None @@ -131,7 +127,6 @@ def create_app(config: AppConfig | None = None) -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): global pipeline, upstream, nodeinfo_broadcaster - global telemetry_broadcaster, position_broadcaster warn_if_stale_so_files() validate_activation(config) identity = DeviceIdentity( @@ -150,12 +145,20 @@ async def lifespan(app: FastAPI): pipeline.on_packet(lambda pkt: print_packet(pkt)) pipeline.on_packet(public_radar_routes.public_radar_packet_callback) + # Mirror live GPS fixes from the location source into DeviceIdentity + # so /api/device (the local map) and the upstream registration + # payload (Meshradar fleet view) both see fresh coordinates. + def _sync_identity_position(lat, lon, alt): + identity.latitude = lat + identity.longitude = lon + if alt is not None: + identity.altitude = alt + + pipeline.on_location_update(_sync_identity_position) + if config.transmit.enabled: _inject_tx_gain_into_source(pipeline) - _bootstrap_pki(config, pipeline) - await _hydrate_public_keys(pipeline) - await pipeline.start() message_repo = MessageRepository(pipeline.database) @@ -170,9 +173,8 @@ async def lifespan(app: FastAPI): _send_meshcore_advert(meshcore_tx_ref, mc_source) ) _setup_message_interception( - pipeline, message_repo, config, meshcore_tx_ref, tx_service + pipeline, message_repo, config, meshcore_tx_ref ) - _setup_inbound_responder(pipeline, tx_service, config) setup_meshcore_contact_enrichment(pipeline, meshcore_tx_ref) if meshcore_tx_ref and meshcore_tx_ref.connected: import asyncio @@ -195,18 +197,6 @@ async def lifespan(app: FastAPI): if nodeinfo_broadcaster is not None: await nodeinfo_broadcaster.start() - telemetry_broadcaster = _build_telemetry_broadcaster( - config, tx_service, pipeline - ) - if telemetry_broadcaster is not None: - await telemetry_broadcaster.start() - - position_broadcaster = _build_position_broadcaster( - config, tx_service, pipeline - ) - if position_broadcaster is not None: - await position_broadcaster.start() - _wire_native_relay(pipeline, tx_service) global _noise_floor_emitter_task @@ -239,10 +229,6 @@ async def lifespan(app: FastAPI): pass if nodeinfo_broadcaster is not None: await nodeinfo_broadcaster.stop() - if telemetry_broadcaster is not None: - await telemetry_broadcaster.stop() - if position_broadcaster is not None: - await position_broadcaster.stop() await upstream.stop() await pipeline.stop() session_manager.shutdown() @@ -276,6 +262,7 @@ async def lifespan(app: FastAPI): app.include_router(upstream_config_routes.router, dependencies=protected) app.include_router(device_config_routes.router, dependencies=protected) app.include_router(gps_status.router, dependencies=protected) + app.include_router(gps_pps_status.router, dependencies=protected) app.include_router(system_config_routes.router, dependencies=protected) app.include_router(meshcore_config_routes.router, dependencies=protected) app.include_router(config_routes.router, dependencies=protected) @@ -394,212 +381,6 @@ def _add_meshcore_usb_source( ) -def _resolve_mesh_node_id(config: AppConfig) -> int | None: - configured = config.transmit.node_id - if configured is not None: - return configured - device_id = (config.device.device_id or "").strip() - if not device_id: - return None - try: - return TxService._derive_node_id(device_id) - except RuntimeError: - return None - - -def _bootstrap_pki(config: AppConfig, coord: PipelineCoordinator) -> None: - """Load PKI keypair and wire decoder identity before packet capture starts.""" - from src.identity.keypair import ( - KeypairStore, - resolve_keypair_path, - resolve_keypair_path_from_env, - ) - - if not hasattr(coord, "_crypto"): - return - - coord._crypto.set_node_db_path(config.storage.database_path) - our_node_id = _resolve_mesh_node_id(config) - if our_node_id is not None: - coord._router.meshtastic_decoder.configure_identity(our_node_id) - logger.info( - "Meshtastic PKI identity configured: 0x%08x", our_node_id - ) - - override = resolve_keypair_path_from_env() - key_path = override or resolve_keypair_path(config.storage.database_path) - try: - keypair = KeypairStore(key_path).load_or_create() - coord._crypto.set_keypair(keypair.private_key, keypair.public_key) - logger.info("Meshtastic PKI keypair loaded from %s", key_path) - except Exception: - logger.exception("Failed to load Meshtastic PKI keypair") - - -async def _hydrate_public_keys(coord: PipelineCoordinator) -> None: - if not hasattr(coord, "_crypto"): - return - await coord.database.connect() - rows = await coord.database.fetch_all( - """ - SELECT node_id, public_key FROM nodes - WHERE public_key IS NOT NULL AND public_key != '' - LIMIT 5000 - """ - ) - for row in rows: - node_id = row.get("node_id") - public_key = row.get("public_key") - if not node_id or not public_key: - continue - try: - coord._crypto.register_public_key( - int(node_id, 16), - bytes.fromhex(public_key), - ) - except ValueError: - continue - - -def _telemetry_metrics_providers( - tx_service: TxService, - coord: PipelineCoordinator, - service_started: float, - *, - noise_floor_tracker=None, - relay_manager=None, -): - """Shared device_metrics / local_stats snapshots for TX paths.""" - import time - - duty = getattr(tx_service, "_duty", None) - stats = getattr(coord, "stats_reporter", None) - - def device_metrics() -> dict: - air_util = duty.current_usage_percent() if duty else 0.0 - return { - "battery_level": 101, - "voltage": 5.0, - "channel_utilization": 0.0, - "air_util_tx": round(air_util, 2), - "uptime_seconds": int(time.monotonic() - service_started), - } - - def local_stats() -> dict: - dm = device_metrics() - noise_floor = None - if noise_floor_tracker is not None: - snap = noise_floor_tracker.snapshot() - value_dbm = snap.get("value_dbm") - if value_dbm is not None: - noise_floor = int(round(value_dbm)) - relayed = 0 - if relay_manager is not None: - relayed = int(relay_manager.get_stats().get("relayed", 0)) - return { - "uptime_seconds": dm["uptime_seconds"], - "channel_utilization": dm["channel_utilization"], - "air_util_tx": dm["air_util_tx"], - "num_packets_tx": 0, - "num_packets_rx": stats.total_packets if stats else 0, - "num_packets_rx_bad": 0, - "num_online_nodes": 0, - "num_total_nodes": 0, - "num_tx_relay": relayed, - "noise_floor": noise_floor, - } - - return device_metrics, local_stats - - -def _setup_inbound_responder( - coord: PipelineCoordinator, - tx_service: TxService | None, - config: AppConfig, -) -> None: - if tx_service is None or not tx_service.meshtastic_enabled: - return - - import time - - our_node_id = tx_service.source_node_id - our_node_hex = f"{our_node_id:08x}" - coord._router.meshtastic_decoder.configure_identity(our_node_id) - coord.relay_manager.set_local_node_id(our_node_hex) - device_fn, local_fn = _telemetry_metrics_providers( - tx_service, - coord, - time.monotonic(), - noise_floor_tracker=noise_floor_tracker, - relay_manager=coord.relay_manager, - ) - tx_service.set_telemetry_reply_providers(device_fn, local_fn) - handler = MeshtasticInboundHandler(tx_service, our_node_id) - - def on_packet(packet: Packet) -> None: - import asyncio - - try: - asyncio.get_running_loop().create_task(handler.handle(packet)) - except RuntimeError: - pass - - coord.on_packet(on_packet) - - -def _build_telemetry_broadcaster( - config: AppConfig, - tx_service: TxService | None, - coord: PipelineCoordinator, -) -> TelemetryBroadcaster | None: - if tx_service is None or not tx_service.meshtastic_enabled: - return None - telem = config.transmit.telemetry - if clamp_interval_minutes(telem.interval_minutes) == 0: - return None - - import time - - service_started = time.monotonic() - device_fn, _local_fn = _telemetry_metrics_providers( - tx_service, - coord, - service_started, - noise_floor_tracker=noise_floor_tracker, - relay_manager=coord.relay_manager, - ) - - return TelemetryBroadcaster( - tx_service, - interval_minutes=telem.interval_minutes, - startup_delay_seconds=telem.startup_delay_seconds, - metrics_provider=device_fn, - ) - - -def _build_position_broadcaster( - config: AppConfig, - tx_service: TxService | None, - coord: PipelineCoordinator, -) -> PositionBroadcaster | None: - if tx_service is None or not tx_service.meshtastic_enabled: - return None - pos_cfg = config.transmit.position - if clamp_interval_minutes(pos_cfg.interval_minutes) == 0: - return None - - from src.transmit.mesh_position_resolver import MeshPositionResolver - - resolver = MeshPositionResolver(config, coord.location_source) - - return PositionBroadcaster( - tx_service, - interval_minutes=pos_cfg.interval_minutes, - startup_delay_seconds=pos_cfg.startup_delay_seconds, - coords_provider=resolver.resolve, - ) - - def _build_tx_service( config: AppConfig, coord: PipelineCoordinator ) -> TxService | None: @@ -690,7 +471,6 @@ async def _native_relay(packet): ) relay.set_transmit_function(_native_relay) - relay.set_local_node_id(f"{tx_service.source_node_id:08x}") logger.info( "Relay backend: native onboard SX1302 (identity-preserving)" ) @@ -925,7 +705,6 @@ def _setup_message_interception( message_repo: MessageRepository, config: AppConfig, meshcore_tx=None, - tx_service: TxService | None = None, ) -> None: """Register a callback to intercept TEXT messages for storage. @@ -933,14 +712,9 @@ def _setup_message_interception( conversations. DMs between other nodes are tagged as 'overheard'. MeshCore DMs use destination_id='self' to indicate they're for us. """ - from src.api.message_name_resolver import MessageNameResolver from src.models.packet import PacketType, Protocol - name_resolver = MessageNameResolver(coord.node_repo, meshcore_tx) - our_node_id = config.transmit.node_id - if our_node_id is None and tx_service is not None: - our_node_id = tx_service.source_node_id our_node_hex = f"{our_node_id:08x}" if our_node_id else "" mc_name_cache: dict[str, str] = {} @@ -1080,17 +854,19 @@ async def _save_and_notify() -> None: if ( packet.protocol == Protocol.MESHTASTIC and direction == "received" + and not node_name ): - sender_lookup = ( - (packet.source_id or "") - if is_broadcast - else node_id - ) - node_name = await name_resolver.resolve( - sender_lookup, - packet.protocol.value, - node_name or packet.source_id or "", - ) + src_id = (packet.source_id or "").lower() + if src_id: + row = await coord.node_repo._db.fetch_one( + "SELECT long_name, short_name FROM nodes " + "WHERE LOWER(node_id) = ? AND protocol = 'meshtastic'", + (src_id,), + ) + if row: + node_name = row["long_name"] or row["short_name"] or "" + if not node_name: + node_name = packet.source_id or "" if is_broadcast and packet.protocol == Protocol.MESHCORE: node_name = (packet.decoded_payload or {}).get("long_name", "") @@ -1170,20 +946,10 @@ async def _save_and_notify() -> None: "snr": round(row["snr"], 1) if row and row["snr"] else None, }) else: - ws_name_lookup = ( - (packet.source_id or "") - if is_broadcast and packet.protocol == Protocol.MESHTASTIC - else node_id - ) - display_name = await name_resolver.resolve( - ws_name_lookup, - packet.protocol.value, - node_name, - ) ws_payload = { "text": text, "node_id": node_id, - "node_name": display_name, + "node_name": node_name, "protocol": packet.protocol.value, "direction": direction, "packet_id": packet.packet_id or "", @@ -1247,7 +1013,6 @@ def _init_routes( node_repo=coord.node_repo, meshcore_tx=meshcore_tx, config=config, - packet_repo=coord.packet_repo, ) crypto = coord._crypto if hasattr(coord, "_crypto") else None @@ -1265,7 +1030,13 @@ def _init_routes( upstream_config_routes.init_routes(config=config) device_config_routes.init_routes(config=config, identity=identity) gps_status.init_routes(location_source=coord.location_source) - system_config_routes.init_routes(config=config) + gps_pps_status.init_routes( + get_wrapper=lambda: _get_concentrator_wrapper(coord), + ) + system_config_routes.init_routes( + config=config, + relay_manager=coord.relay_manager, + ) meshcore_config_routes.init_routes(config=config, tx_service=tx_service) diff --git a/src/config.py b/src/config.py index 4622e2b..df8efd3 100644 --- a/src/config.py +++ b/src/config.py @@ -1,7 +1,6 @@ from __future__ import annotations import dataclasses -import logging import os import sys from dataclasses import dataclass, field @@ -12,8 +11,6 @@ from src.version import __version__ -logger = logging.getLogger(__name__) - # Band-start frequencies (MHz) for the Meshtastic slot formula # freq = freqStart + BW/2000 + (slot-1) * BW/1000 @@ -60,14 +57,26 @@ class RadioConfig: # (default; spectral scan stays unavailable, packet-derived # noise floor remains in use). # - # On RAK2287 / RAK5146 / SenseCap M1 this is typically - # ``/dev/spidev0.1`` (separate from the SX1302's - # ``/dev/spidev0.0``). Some carriers daisy-chain the SX1261 - # behind the SX1302's SPI router and want this set to the same - # path as the SX1302 SPI device. Wrong path = HAL refuses to - # ``lgw_start`` after our config attempt, so we ship empty by - # default and ask interested users to opt in explicitly. + # On RAK2287 / RAK5146 / SenseCap M1 the SX1261 is behind the + # concentrator's internal SPI router, not on a Pi chip-select — + # leave empty. Only the Semtech reference kit and custom carriers + # with a dedicated SX1261 CE line need a path (e.g. + # ``/dev/spidev0.1``). Wrong path can brick ``lgw_start`` on + # boards without Pi-visible SX1261; ``carrier_type`` guard clears + # mistaken values on RAK/SenseCap. sx1261_spi_path: str = "" + # Carrier board signature from setup wizard I2C probe (``rak``, + # ``sensecap_m1``, or empty). Used to block Pi-visible SX1261 SPI + # paths that brick ``lgw_start()`` on RAK/SenseCap concentrators. + carrier_type: str = "" + # HAL GPS/PPS: align concentrator packet timestamps with GPS time via + # libloragw ``lgw_gps_*`` + ``sx1302_gps_enable``. Requires a HAL build + # that includes loragw_gps.c. Exclusive with ``location.source: uart`` on + # the same TTY (only one process may open the GPS serial port). + gps_pps_enabled: bool = False + gps_pps_tty_path: str = "/dev/ttyAMA0" + gps_family: str = "ubx7" + gps_pps_target_baud: int = 0 @dataclass @@ -150,29 +159,9 @@ class RelayConfig: burst_size: int = 5 min_relay_rssi: float = -110.0 max_relay_rssi: float = -50.0 - - -@dataclass -class TelemetryConfig: - """Periodic device_metrics telemetry broadcast settings.""" - - interval_minutes: int = 30 - startup_delay_seconds: int = 120 - - -@dataclass -class PositionConfig: - """Periodic POSITION broadcast settings.""" - - interval_minutes: int = 15 - startup_delay_seconds: int = 180 - # Coordinates sent on the public LoRa mesh (Meshtastic POSITION packets). - # ``static`` uses ``device.{latitude,longitude,altitude}`` (wizard pin). - # ``live`` reads the active ``LocationSource`` (gpsd/uart) when a fix exists. - coordinate_source: str = "static" - # Privacy when ``coordinate_source`` is ``live``: exact, approximate - # (~1.1 km rounding), or none (skip position on mesh). Ignored for static. - location_precision: str = "approximate" + blocklist: list[str] = field(default_factory=list) + priority_list: list[str] = field(default_factory=list) + dedup_ttl_seconds: int = 300 @dataclass @@ -184,8 +173,6 @@ class MqttConfig: password: str = "large4cats" topic_root: str = "msh" region: str = "US" - tls_enabled: bool = False - tls_ca_cert: str = "" # Optional ``!xxxxxxxx`` override; blank uses MD5 hash of device name. gateway_id: Optional[str] = None publish_channels: list[str] = field(default_factory=lambda: ["LongFast", "MeshCore"]) @@ -230,8 +217,6 @@ class TransmitConfig: short_name: str = "MPNT" hop_limit: int = 3 nodeinfo: NodeInfoConfig = field(default_factory=NodeInfoConfig) - telemetry: TelemetryConfig = field(default_factory=TelemetryConfig) - position: PositionConfig = field(default_factory=PositionConfig) @dataclass @@ -241,15 +226,12 @@ class LocationConfig: ``source`` values: - ``"static"`` : use ``device.latitude/longitude/altitude`` from ``local.yaml``. Backward-compatible default. - - ``"gpsd"`` : connect to a local or remote ``gpsd`` daemon for - live fixes (skyplot, optional mesh POSITION). - Does not change ``device.{lat,lon,alt}`` (Meshradar - pin). Auto-installed by ``scripts/install.sh``. - - ``"uart"`` : reserved for direct on-board UART NMEA reading - (RAK Pi HAT GPS). Plumbing exists in - ``src.hal.gps_reader`` but is not wired into - the runtime yet; treated as ``static`` until - the source is implemented. + - ``"gpsd"`` : connect to a local or remote ``gpsd`` daemon and + overwrite ``device.{lat,lon,alt}`` when fixes + arrive. Auto-installed by ``scripts/install.sh``. + - ``"uart"`` : read NMEA GGA from an on-board UART GPS (RAK Pi + HAT on ``/dev/ttyAMA0``). Uses + ``src.hal.gps_reader.GpsReader``. ``gpsd_host`` / ``gpsd_port`` default to gpsd's well-known localhost socket. Override only when running gpsd on a peer @@ -269,6 +251,8 @@ class LocationConfig: source: str = "static" gpsd_host: str = "127.0.0.1" gpsd_port: int = 2947 + uart_path: str = "/dev/ttyAMA0" + uart_baud: int = 9600 update_interval_seconds: int = 5 min_fix_quality: int = 1 @@ -304,6 +288,15 @@ class WebAuthConfig: session_version: int = 1 +@dataclass +class SignalHealthConfig: + """Thresholds for node-card RSSI health badges and sparklines.""" + + green_rssi_floor: float = -100 + yellow_rssi_floor: float = -115 + min_packets_per_hour: int = 5 + + @dataclass class AppConfig: radio: RadioConfig = field(default_factory=RadioConfig) @@ -319,6 +312,7 @@ class AppConfig: transmit: TransmitConfig = field(default_factory=TransmitConfig) web_auth: WebAuthConfig = field(default_factory=WebAuthConfig) location: LocationConfig = field(default_factory=LocationConfig) + signal_health: SignalHealthConfig = field(default_factory=SignalHealthConfig) def _resolve_radio_frequency(radio: "RadioConfig") -> None: @@ -356,25 +350,6 @@ def _merge_dataclass(instance, overrides: dict): setattr(instance, key, value) -def _collect_unknown_keys(instance, overrides: dict, prefix: str = "") -> list[str]: - """Return dotted paths of override keys with no matching dataclass field. - - Mirrors the descent rules in :func:`_merge_dataclass`: it only recurses - into a nested dataclass (e.g. ``transmit.nodeinfo``), so user-supplied - mapping fields such as ``meshtastic.channel_keys`` are treated as opaque - values rather than scanned for "unknown" keys. - """ - unknown: list[str] = [] - for key, value in overrides.items(): - if not hasattr(instance, key): - unknown.append(f"{prefix}{key}") - continue - current = getattr(instance, key) - if dataclasses.is_dataclass(current) and isinstance(value, dict): - unknown.extend(_collect_unknown_keys(current, value, f"{prefix}{key}.")) - return unknown - - def _apply_yaml(cfg: AppConfig, path: Path) -> None: """Merge a single YAML file into an existing AppConfig.""" if not path.exists(): @@ -383,10 +358,6 @@ def _apply_yaml(cfg: AppConfig, path: Path) -> None: with open(path, "r") as fh: raw = yaml.safe_load(fh) or {} - if not isinstance(raw, dict): - logger.warning("Ignoring %s: top-level YAML is not a mapping.", path) - return - section_map = { "radio": cfg.radio, "meshtastic": cfg.meshtastic, @@ -401,28 +372,12 @@ def _apply_yaml(cfg: AppConfig, path: Path) -> None: "transmit": cfg.transmit, "web_auth": cfg.web_auth, "location": cfg.location, + "signal_health": cfg.signal_health, } - unknown_keys: list[str] = [] - for section_name, section_value in raw.items(): - section_instance = section_map.get(section_name) - if section_instance is None: - unknown_keys.append(section_name) - continue - _merge_dataclass(section_instance, section_value) - if isinstance(section_value, dict): - unknown_keys.extend( - _collect_unknown_keys(section_instance, section_value, f"{section_name}.") - ) - - if unknown_keys: - logger.warning( - "Ignoring %d unknown config key(s) in %s: %s. " - "These were not applied -- check for typos against the documented schema.", - len(unknown_keys), - path, - ", ".join(sorted(unknown_keys)), - ) + for section_name, section_instance in section_map.items(): + if section_name in raw: + _merge_dataclass(section_instance, raw[section_name]) _VALID_CONFIG_EXTENSIONS = {".yaml", ".yml"} @@ -449,10 +404,28 @@ def load_config(config_path: Optional[str] = None) -> AppConfig: local = config_path or os.environ.get("CONCENTRATOR_CONFIG", "config/local.yaml") _apply_yaml(cfg, _validated_config_path(local)) _resolve_radio_frequency(cfg.radio) + validate_config_consistency(cfg) return cfg +def validate_config_consistency(config: AppConfig) -> None: + """Reject impossible radio/location combinations before hardware starts.""" + if not config.radio.gps_pps_enabled: + return + if config.location.source != "uart": + return + pps_tty = os.path.normpath(config.radio.gps_pps_tty_path) + uart_tty = os.path.normpath(config.location.uart_path) + if pps_tty == uart_tty: + raise ValueError( + "radio.gps_pps_enabled and location.source=uart cannot share " + f"the same serial device ({pps_tty!r}). Use location.source=gpsd " + "or static for dashboard coordinates while PPS owns the HAT UART, " + "or disable gps_pps_enabled when using UART for location only." + ) + + def _get_local_yaml_path() -> Path: """Resolve the local.yaml path used for user overrides.""" raw = os.environ.get("CONCENTRATOR_CONFIG", "config/local.yaml") diff --git a/src/coordinator.py b/src/coordinator.py index 665e9ca..bad773f 100644 --- a/src/coordinator.py +++ b/src/coordinator.py @@ -49,6 +49,9 @@ def __init__(self, config: AppConfig): burst_size=relay_cfg.burst_size, min_relay_rssi=relay_cfg.min_relay_rssi, max_relay_rssi=relay_cfg.max_relay_rssi, + dedup_ttl_seconds=relay_cfg.dedup_ttl_seconds, + blocklist=relay_cfg.blocklist, + priority_list=relay_cfg.priority_list, ) self._transmitter: Optional[MeshtasticTransmitter] = None self._mqtt: Optional[MqttPublisher] = None @@ -70,9 +73,6 @@ def __init__(self, config: AppConfig): self._pipeline_task: Optional[asyncio.Task] = None self._cleanup_task: Optional[asyncio.Task] = None self._location_refresh_task: Optional[asyncio.Task] = None - self._last_live_lat: Optional[float] = None - self._last_live_lon: Optional[float] = None - self._last_live_alt: Optional[float] = None @property def database(self) -> DatabaseManager: @@ -121,10 +121,13 @@ def on_location_update( self, callback: Callable[[Optional[float], Optional[float], Optional[float]], None], ) -> None: - """Register a callback fired when a live GPS source publishes a new fix. + """Register a callback fired when the live location source publishes + a fresh position fix. - ``device.{latitude,longitude,altitude}`` (the Meshradar pin) is never - mutated. Callbacks receive live fix coordinates only. + Called with ``(latitude, longitude, altitude_m)``. Fires only when + the new fix actually differs from the cached device position, so + listeners can rely on it as a real change signal instead of + polling. """ self._on_location_callbacks.append(callback) @@ -202,9 +205,16 @@ async def _cleanup_loop(self) -> None: async def _location_refresh_loop(self) -> None: """Periodically pull the latest fix from the active location source. - Live sources (gpsd/uart) notify listeners when the fix changes. - ``device.{latitude,longitude,altitude}`` stays the registered Meshradar - pin and is not overwritten here. + When the source publishes a real position fix, write it back into + ``self._config.device.latitude/longitude/altitude`` so every + downstream consumer (farthest-direct calculation, heartbeat, + node-position math, dashboard map placement) sees the live + coordinate without further plumbing. + + Static sources also flow through this loop: their ``get_status`` + returns the same configured coordinates every tick, so it's a + cheap no-op. We deliberately do not write back to ``local.yaml`` + on every tick: in-memory updates suffice. """ interval = max(1, self._config.location.update_interval_seconds) try: @@ -217,33 +227,28 @@ async def _location_refresh_loop(self) -> None: logger.exception("Location refresh loop error") def _apply_latest_location_fix(self) -> None: - if self._location_source.source_name == "static": - return - status = self._location_source.get_status() if not status.available or status.fix is None: return if not status.fix.has_position: return - lat = status.fix.latitude - lon = status.fix.longitude - alt = status.fix.altitude_m - + device = self._config.device if ( - self._last_live_lat == lat - and self._last_live_lon == lon - and self._last_live_alt == alt + device.latitude == status.fix.latitude + and device.longitude == status.fix.longitude + and device.altitude == status.fix.altitude_m ): - return + return # no change - self._last_live_lat = lat - self._last_live_lon = lon - self._last_live_alt = alt + device.latitude = status.fix.latitude + device.longitude = status.fix.longitude + if status.fix.altitude_m is not None: + device.altitude = status.fix.altitude_m for cb in self._on_location_callbacks: try: - cb(lat, lon, alt) + cb(device.latitude, device.longitude, device.altitude) except Exception: logger.exception("Location update callback failed") @@ -270,10 +275,10 @@ async def _process_capture(self, raw: RawCapture) -> None: packet.capture_source = raw.capture_source await self._store_packet(packet) - self._notify_callbacks(packet) await self._relay.process_packet(packet) self._publish_mqtt(packet) self._record_stats(packet) + self._notify_callbacks(packet) @staticmethod def _adapt_meshcore_usb(raw: RawCapture) -> Optional[Packet]: @@ -299,22 +304,6 @@ async def _update_node(self, packet: Packet) -> None: await self._node_repo.upsert(node_update) self._last_node_update[node_update.node_id] = node_update self._stats_reporter.record_node(node_update.to_dict()) - if node_update.public_key: - try: - node_int = int(node_update.node_id, 16) - new_key = bytes.fromhex(node_update.public_key) - prior = self._crypto.lookup_public_key(node_int) - self._crypto.register_public_key(node_int, new_key) - if prior != new_key: - logger.info( - "Updated peer PKI public_key for %s", - node_update.node_id, - ) - except ValueError: - logger.debug( - "Ignoring invalid public_key for node %s", - node_update.node_id, - ) elif packet.source_id: await self._node_repo.increment_packet_count(packet.source_id) @@ -466,8 +455,9 @@ def _setup_location_banner(self) -> None: detail = f"gpsd @ {host}:{port}" color = GREEN elif source_name == "uart": - detail = "on-board UART (placeholder, falls back to static)" - color = DIM + path = self._config.location.uart_path + detail = f"UART NMEA @ {path}" + color = GREEN else: detail = "static config coordinates" color = DIM diff --git a/src/relay/dedup_filter.py b/src/relay/dedup_filter.py index 9df04fe..68b0b44 100644 --- a/src/relay/dedup_filter.py +++ b/src/relay/dedup_filter.py @@ -54,5 +54,13 @@ def _enforce_max_size(self) -> None: def size(self) -> int: return len(self._seen) + @property + def ttl_seconds(self) -> float: + return self._ttl + + def set_ttl(self, ttl_seconds: float) -> None: + """Update duplicate window without clearing the cache.""" + self._ttl = max(1.0, float(ttl_seconds)) + def clear(self) -> None: self._seen.clear() diff --git a/src/relay/node_id.py b/src/relay/node_id.py new file mode 100644 index 0000000..f7b5be1 --- /dev/null +++ b/src/relay/node_id.py @@ -0,0 +1,28 @@ +"""Normalize and validate Meshtastic node IDs for relay filter lists.""" + +from __future__ import annotations + +import re + +_NODE_ID_RE = re.compile(r"^[0-9a-f]{8}$", re.IGNORECASE) + + +def normalize_node_id(node_id: str) -> str: + """Strip optional ``!`` prefix and lowercase for consistent matching.""" + return (node_id or "").strip().lower().lstrip("!") + + +def validate_node_ids(node_ids: list[str]) -> list[str]: + """Return normalized unique node IDs or raise ValueError.""" + seen: set[str] = set() + normalized: list[str] = [] + for raw in node_ids: + nid = normalize_node_id(raw) + if not _NODE_ID_RE.match(nid): + raise ValueError( + f"Invalid node ID {raw!r} — expected 8 hex chars (no ! prefix)" + ) + if nid not in seen: + seen.add(nid) + normalized.append(nid) + return normalized diff --git a/src/relay/rate_limiter.py b/src/relay/rate_limiter.py index 41e6cc7..c109c23 100644 --- a/src/relay/rate_limiter.py +++ b/src/relay/rate_limiter.py @@ -41,6 +41,17 @@ def allow(self) -> bool: self._timestamps.append(now) return True + def allow_priority(self) -> bool: + """Allow relay while enforcing per-minute cap but skipping burst gate.""" + now = time.monotonic() + self._prune(now) + + if len(self._timestamps) >= self._max_per_minute: + return False + + self._timestamps.append(now) + return True + def _prune(self, now: float) -> None: cutoff = now - self._window_seconds while self._timestamps and self._timestamps[0] < cutoff: diff --git a/src/relay/relay_manager.py b/src/relay/relay_manager.py index 835fc06..079067d 100644 --- a/src/relay/relay_manager.py +++ b/src/relay/relay_manager.py @@ -6,6 +6,7 @@ from src.models.packet import Packet, PacketType from src.relay.dedup_filter import DeduplicationFilter +from src.relay.node_id import normalize_node_id, validate_node_ids from src.relay.rate_limiter import RateLimiter logger = logging.getLogger(__name__) @@ -38,7 +39,6 @@ class RelayManager: - Hop filtering: don't relay packets with 0 hops remaining - Type filtering: only relay useful packet types - Signal filtering: don't relay strong signals (nearby nodes) - - Destination filtering: never relay unicast packets addressed to us The actual transmission is handled by an external radio (SX1262 via meshtastic-python serial interface). @@ -51,22 +51,22 @@ def __init__( min_relay_rssi: float = -110.0, max_relay_rssi: float = -50.0, enabled: bool = False, + dedup_ttl_seconds: float = 300.0, + blocklist: list[str] | None = None, + priority_list: list[str] | None = None, ): - self._dedup = DeduplicationFilter() + self._dedup = DeduplicationFilter(ttl_seconds=dedup_ttl_seconds) self._limiter = RateLimiter(max_relay_per_minute, burst_size) self._min_rssi = min_relay_rssi self._max_rssi = max_relay_rssi self._enabled = enabled - self._local_node_hex: str | None = None + self._blocklist = set(validate_node_ids(blocklist or [])) + self._priority_list = set(validate_node_ids(priority_list or [])) self._relay_count = 0 self._rejected_count = 0 self._rejection_reasons: dict[str, int] = {} self._transmit_fn: Optional[callable] = None - def set_local_node_id(self, node_hex: str) -> None: - """Skip relay for unicast packets addressed to this Meshpoint.""" - self._local_node_hex = node_hex.lower() - @property def enabled(self) -> bool: return self._enabled @@ -80,25 +80,42 @@ def set_transmit_function(self, fn: callable) -> None: """Register the function used to transmit relay packets.""" self._transmit_fn = fn + def reload_filters( + self, + *, + blocklist: list[str] | None = None, + priority_list: list[str] | None = None, + dedup_ttl_seconds: float | None = None, + ) -> None: + """Apply relay filter changes without a full service restart.""" + if blocklist is not None: + self._blocklist = set(validate_node_ids(blocklist)) + if priority_list is not None: + self._priority_list = set(validate_node_ids(priority_list)) + if dedup_ttl_seconds is not None: + self._dedup.set_ttl(dedup_ttl_seconds) + logger.info( + "Relay filters reloaded (blocklist=%d, priority=%d, dedup_ttl=%.0fs)", + len(self._blocklist), + len(self._priority_list), + self._dedup.ttl_seconds, + ) + def evaluate(self, packet: Packet) -> RelayDecision: """Decide whether a captured packet should be relayed.""" if not self._enabled: return RelayDecision(False, "relay_disabled") + source_id = normalize_node_id(packet.source_id) + if source_id in self._blocklist: + return RelayDecision(False, "blocklisted") + if self._dedup.is_duplicate(packet.source_id, packet.packet_id): return RelayDecision(False, "duplicate") if packet.hop_limit <= 0: return RelayDecision(False, "no_hops_remaining") - dest = (packet.destination_id or "").lower() - if ( - self._local_node_hex - and dest == self._local_node_hex - and dest not in (BROADCAST_ADDR_MESHTASTIC, BROADCAST_ADDR_MESHCORE) - ): - return RelayDecision(False, "dest_local") - if packet.packet_type not in RELAY_WORTHY_TYPES: return RelayDecision(False, "non_relayable_type") @@ -108,6 +125,10 @@ def evaluate(self, packet: Packet) -> RelayDecision: if packet.signal.rssi < self._min_rssi: return RelayDecision(False, "signal_too_weak") + if source_id in self._priority_list: + if self._limiter.allow_priority(): + return RelayDecision(True, "approved_priority") + if not self._limiter.allow(): return RelayDecision(False, "rate_limited") diff --git a/tests/test_relay_filters.py b/tests/test_relay_filters.py new file mode 100644 index 0000000..0b208e2 --- /dev/null +++ b/tests/test_relay_filters.py @@ -0,0 +1,179 @@ +"""Relay blocklist, priority list, and dedup TTL controls.""" + +from __future__ import annotations + +import unittest +from unittest.mock import MagicMock, patch + +from src.config import RelayConfig +from src.models.packet import Packet, PacketType, Protocol +from src.models.signal import SignalMetrics +from src.relay.dedup_filter import DeduplicationFilter +from src.relay.node_id import validate_node_ids +from src.relay.rate_limiter import RateLimiter +from src.relay.relay_manager import RelayManager + + +def _relay_packet( + source_id: str = "a3f2b1c0", + packet_id: str = "pkt001", +) -> Packet: + return Packet( + packet_id=packet_id, + source_id=source_id, + destination_id="ffffffff", + protocol=Protocol.MESHTASTIC, + packet_type=PacketType.TEXT, + hop_limit=2, + hop_start=3, + signal=SignalMetrics( + rssi=-95.0, + snr=5.0, + frequency_mhz=906.875, + spreading_factor=11, + bandwidth_khz=250.0, + ), + ) + + +class TestRelayManagerFilters(unittest.TestCase): + def test_blocklist_rejects_before_dedup(self): + manager = RelayManager( + enabled=True, + blocklist=["a3f2b1c0"], + ) + pkt = _relay_packet(source_id="a3f2b1c0") + decision = manager.evaluate(pkt) + self.assertFalse(decision.should_relay) + self.assertEqual(decision.reason, "blocklisted") + + def test_priority_bypasses_burst_gate(self): + manager = RelayManager( + enabled=True, + max_relay_per_minute=10, + burst_size=1, + priority_list=["deadbeef"], + ) + first = manager.evaluate(_relay_packet(source_id="11111111", packet_id="p1")) + second = manager.evaluate(_relay_packet(source_id="22222222", packet_id="p2")) + self.assertTrue(first.should_relay) + self.assertFalse(second.should_relay) + self.assertEqual(second.reason, "rate_limited") + + priority = manager.evaluate( + _relay_packet(source_id="deadbeef", packet_id="p3"), + ) + self.assertTrue(priority.should_relay) + self.assertEqual(priority.reason, "approved_priority") + + def test_reload_filters_updates_runtime_state(self): + manager = RelayManager(enabled=True, dedup_ttl_seconds=300) + manager.reload_filters( + blocklist=["abcdef01"], + priority_list=["12345678"], + dedup_ttl_seconds=60, + ) + self.assertEqual(manager._dedup.ttl_seconds, 60) + pkt = _relay_packet(source_id="abcdef01") + self.assertEqual(manager.evaluate(pkt).reason, "blocklisted") + + def test_dedup_ttl_is_configurable(self): + dedup = DeduplicationFilter(ttl_seconds=10.0) + dedup.set_ttl(600) + self.assertEqual(dedup.ttl_seconds, 600) + + +class TestRateLimiterPriority(unittest.TestCase): + def test_allow_priority_skips_burst_only(self): + limiter = RateLimiter(max_per_minute=5, burst_size=1) + self.assertTrue(limiter.allow()) + self.assertFalse(limiter.allow()) + self.assertTrue(limiter.allow_priority()) + + +class TestValidateNodeIds(unittest.TestCase): + def test_accepts_normalized_ids(self): + ids = validate_node_ids(["!A3F2B1C0", "deadbeef"]) + self.assertEqual(ids, ["a3f2b1c0", "deadbeef"]) + + def test_rejects_invalid_id(self): + with self.assertRaises(ValueError): + validate_node_ids(["not-a-node"]) + + +class TestRelayConfigApi(unittest.TestCase): + def setUp(self): + try: + from fastapi import FastAPI + from fastapi.testclient import TestClient + from src.api.auth.dependencies import require_admin + from src.api.auth.jwt_session import ROLE_ADMIN, SessionClaims + from src.api.routes import system_config_routes as relay_cfg_module + except ImportError as exc: + raise unittest.SkipTest(f"API test deps unavailable: {exc}") from exc + + def _admin_claims() -> SessionClaims: + return SessionClaims(subject="admin", role=ROLE_ADMIN, session_version=1) + + cfg = MagicMock() + cfg.relay = RelayConfig( + enabled=True, + blocklist=[], + priority_list=[], + dedup_ttl_seconds=300, + ) + self.manager = RelayManager(enabled=True) + relay_cfg_module._config = cfg + relay_cfg_module._relay_manager = self.manager + self.app = FastAPI() + self.app.dependency_overrides[require_admin] = _admin_claims + from src.api.audit.dependencies import get_audit_writer + + audit = MagicMock() + + class _Ctx: + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + return False + + audit.timed_action.return_value = _Ctx() + self.app.dependency_overrides[get_audit_writer] = lambda: audit + self.app.include_router(relay_cfg_module.router) + self.client = TestClient(self.app) + self._relay_cfg_module = relay_cfg_module + + def tearDown(self): + if hasattr(self, "_relay_cfg_module"): + self._relay_cfg_module.reset_routes() + + def test_rejects_invalid_blocklist_id(self): + resp = self.client.put( + "/api/config/relay", + json={"blocklist": ["bad-id"]}, + ) + self.assertEqual(resp.status_code, 400) + + @patch("src.api.routes.system_config_routes.save_section_to_yaml") + def test_filter_update_hot_reloads_without_restart(self, mock_save): + resp = self.client.put( + "/api/config/relay", + json={ + "blocklist": ["a3f2b1c0"], + "priority_list": ["deadbeef"], + "dedup_ttl_seconds": 120, + }, + ) + self.assertEqual(resp.status_code, 200) + body = resp.json() + self.assertTrue(body["saved"]) + self.assertFalse(body["restart_required"]) + mock_save.assert_called_once() + pkt = _relay_packet(source_id="a3f2b1c0") + self.assertEqual(self.manager.evaluate(pkt).reason, "blocklisted") + self.assertEqual(self.manager._dedup.ttl_seconds, 120) + + +if __name__ == "__main__": + unittest.main()