diff --git a/config/default.yaml b/config/default.yaml index b8f7bad..374989d 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -60,6 +60,8 @@ relay: burst_size: 5 min_relay_rssi: -110.0 max_relay_rssi: -50.0 + # channel_throttle_percent: # per-channel relay duty budget (est. ToA) + # "0": 100 # omit = 100% all channels; EU capped at 1% upstream: enabled: true diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index f12b077..ff5c1c8 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -405,6 +405,19 @@ relay: The relay path is independent from RX: transmission never blocks packet reception. Packets are deduplicated by ID, rate-limited, and filtered by signal strength before relay. +### Per-channel relay throttle (est.) + +Optional rolling 1-hour ToA budget per Meshtastic channel index. **Relay TX only** — does not change native `transmit` messaging. Omitted channels default to 100%. + +```yaml +relay: + channel_throttle_percent: + "0": 100 + "1": 50 +``` + +On EU868 the effective ceiling is capped at the 1% regional hint. Values are **estimates**, not spectrum-analyser measurements. Configure from **Configuration → Advanced → Relay channel throttle** or edit `local.yaml`. + --- ## Transmit (Native Messaging) @@ -717,6 +730,7 @@ relay: # experimental: re-broadcast captured packets via USB rad burst_size: 5 min_relay_rssi: -110.0 max_relay_rssi: -50.0 + channel_throttle_percent: {} # per-channel relay ToA budget (est.); omit = 100% upstream: # cloud (Meshradar) connection enabled: true diff --git a/frontend/css/configuration.css b/frontend/css/configuration.css index 70b484c..ac5fe46 100644 --- a/frontend/css/configuration.css +++ b/frontend/css/configuration.css @@ -41,19 +41,6 @@ line-height: 1.5; } -.cfg-card__hint--nested { - margin: -4px 0 4px; -} - -.cfg-inline-link { - color: var(--accent-cyan, #22d3ee); - text-decoration: none; -} - -.cfg-inline-link:hover { - text-decoration: underline; -} - .cfg-form { display: flex; flex-direction: column; @@ -242,6 +229,92 @@ select.cfg-field__input option:checked { transform: translateX(-50%) translateY(0); } +.cfg-id-list { + display: flex; + flex-direction: column; + gap: 0.35rem; + margin-bottom: 0.5rem; +} + +.cfg-id-empty { + margin: 0; + font-size: 12px; + color: rgba(243, 244, 246, 0.45); + font-style: italic; +} + +.cfg-id-row { + display: flex; + align-items: center; + gap: 0.5rem; +} + +.cfg-id-chip { + font-family: 'JetBrains Mono', Menlo, monospace; + font-size: 12px; + padding: 0.15rem 0.45rem; + background: rgba(6, 182, 212, 0.1); + border: 1px solid rgba(6, 182, 212, 0.25); + border-radius: 6px; + color: #e2e8f0; +} + +.cfg-id-remove { + background: transparent; + border: none; + color: rgba(243, 244, 246, 0.5); + font-size: 1.1rem; + line-height: 1; + cursor: pointer; + padding: 0 0.25rem; +} + +.cfg-id-remove:hover { + color: #ef4444; +} + +.cfg-id-add { + display: flex; + gap: 0.5rem; + align-items: center; +} + +.cfg-id-add .cfg-field__input { + flex: 1; +} + +.cfg-throttle-grid { + display: grid; + grid-template-columns: repeat(auto-fill, minmax(220px, 1fr)); + gap: 0.5rem 1rem; + margin-top: 0.5rem; +} + +.cfg-throttle-row { + display: grid; + grid-template-columns: 2.5rem 1fr 3rem; + align-items: center; + gap: 0.5rem; + font-size: 12px; +} + +.cfg-throttle-label { + color: rgba(243, 244, 246, 0.65); + font-family: 'JetBrains Mono', Menlo, monospace; +} + +.cfg-throttle-range { + width: 100%; + accent-color: var(--brand-cyan, #06b6d4); +} + +.cfg-throttle-value { + text-align: right; + color: var(--brand-amber, #ffb84d); + font-family: 'JetBrains Mono', Menlo, monospace; + font-size: 11px; +} + @keyframes cfgFadeIn { from { opacity: 0; transform: translateY(8px); } to { opacity: 1; transform: translateY(0); } diff --git a/frontend/css/stats.css b/frontend/css/stats.css index 82eadef..21c4710 100644 --- a/frontend/css/stats.css +++ b/frontend/css/stats.css @@ -160,6 +160,68 @@ max-height: 220px; } +.stats-card--hourly { + min-height: 300px; +} + +.stats-card--hourly canvas { + max-height: 260px; +} + +.stats-duty-meter { + display: flex; + flex-direction: column; + gap: 0.45rem; + margin-top: 0.5rem; +} + +.stats-duty-row { + display: grid; + grid-template-columns: 2.5rem 1fr 5.5rem; + align-items: center; + gap: 0.5rem; + font-size: 11px; +} + +.stats-duty-ch { + color: var(--text-secondary, #94a3b8); + font-family: 'JetBrains Mono', monospace; +} + +.stats-duty-bar { + height: 8px; + background: rgba(30, 41, 59, 0.8); + border-radius: 4px; + overflow: hidden; +} + +.stats-duty-fill { + height: 100%; + background: linear-gradient(90deg, #06b6d4, #f59e0b); + border-radius: 4px; + transition: width 0.3s ease; +} + +.stats-duty-val { + text-align: right; + color: var(--text-secondary, #94a3b8); + font-family: 'JetBrains Mono', monospace; + font-size: 10px; +} + +.stats-duty-empty { + margin: 0; + font-size: 12px; + color: var(--text-secondary, #94a3b8); + font-style: italic; +} + +.stats-duty-total { + margin: 0.35rem 0 0; + font-size: 12px; + color: var(--text-primary, #e2e8f0); +} + /* --- Range cards --- */ .stats-range-grid { display: grid; diff --git a/frontend/index.html b/frontend/index.html index 239398c..5d036b0 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -718,6 +718,7 @@

Service actions

+ diff --git a/frontend/js/configuration/configuration_panel.js b/frontend/js/configuration/configuration_panel.js index 6972cc1..474faac 100644 --- a/frontend/js/configuration/configuration_panel.js +++ b/frontend/js/configuration/configuration_panel.js @@ -121,13 +121,21 @@ class ConfigurationPanel { card.mount(host); this._cards.set('gps', card); } - } else if (section === 'advanced' && window.AdvancedConfigCard) { + } else if (section === 'advanced') { const host = document.getElementById('cfg-advanced-panel'); if (host) { - host.innerHTML = ''; - const card = new window.AdvancedConfigCard(api); - card.mount(host); - this._cards.set('advanced', card); + host.innerHTML = '
'; + const mount = host.querySelector('[data-adv-mount]'); + if (window.AdvancedConfigCard) { + const card = new window.AdvancedConfigCard(api); + card.mount(mount); + this._cards.set('advanced', card); + } + if (window.RelayThrottleCard) { + const throttle = new window.RelayThrottleCard(api); + throttle.mount(mount); + this._cards.set('relay-throttle', throttle); + } } } this._mounted.add(section); diff --git a/frontend/js/configuration/relay_throttle_card.js b/frontend/js/configuration/relay_throttle_card.js new file mode 100644 index 0000000..0915b9b --- /dev/null +++ b/frontend/js/configuration/relay_throttle_card.js @@ -0,0 +1,105 @@ +/** + * Configuration → Advanced — per-channel relay duty throttle (est.). + */ + +class RelayThrottleCard { + constructor(api) { + this._api = api; + this._root = null; + this._throttle = {}; + } + + mount(root) { + this._root = root; + this._root.innerHTML = ` +
+
+

Relay channel throttle (est.)

+

+ Rolling 1 h ToA budget per channel. Relay TX only — does not limit native messaging. + EU868 capped at 1% regulatory ceiling. +

+
+
+
+
+ +
+

+
+
+ `; + this._throttleGrid = this._root.querySelector('[data-throttle-grid]'); + this._statusEl = this._root.querySelector('[data-relay-throttle-status]'); + this._root.querySelector('[data-relay-throttle-form]') + .addEventListener('submit', (e) => this._onSubmit(e)); + this._paintThrottleGrid(); + } + + render(config) { + const relay = config.relay || {}; + this._throttle = { ...(relay.channel_throttle_percent || {}) }; + this._paintThrottleGrid(); + } + + _paintThrottleGrid() { + if (!this._throttleGrid) return; + const rows = []; + for (let ch = 0; ch <= 7; ch += 1) { + const key = String(ch); + const value = this._throttle[key] != null ? this._throttle[key] : 100; + rows.push(` + + `); + } + this._throttleGrid.innerHTML = rows.join(''); + this._throttleGrid.querySelectorAll('[data-throttle-ch]').forEach((input) => { + input.addEventListener('input', () => { + const ch = input.dataset.throttleCh; + const pct = Number(input.value); + this._throttle[ch] = pct; + const valEl = this._throttleGrid.querySelector(`[data-throttle-val="${ch}"]`); + if (valEl) valEl.textContent = `${pct}%`; + }); + }); + } + + _throttlePayload() { + const payload = {}; + for (let ch = 0; ch <= 7; ch += 1) { + const key = String(ch); + const pct = Number(this._throttle[key] != null ? this._throttle[key] : 100); + if (pct !== 100) payload[key] = pct; + } + return payload; + } + + async _onSubmit(event) { + event.preventDefault(); + this._setStatus('pending', 'Saving…'); + const result = await this._api.put('/api/config/relay', { + channel_throttle_percent: this._throttlePayload(), + }); + if (!result) { + this._setStatus('error', 'Save failed.'); + return; + } + this._setStatus('success', 'Saved.'); + this._api.toast('Relay throttle applied (no restart required).'); + this._api.refresh(); + } + + _setStatus(kind, message) { + if (!this._statusEl) return; + this._statusEl.dataset.kind = kind; + this._statusEl.textContent = message; + } +} + +window.RelayThrottleCard = RelayThrottleCard; diff --git a/frontend/js/stats_tab.js b/frontend/js/stats_tab.js index bafc368..1f81930 100644 --- a/frontend/js/stats_tab.js +++ b/frontend/js/stats_tab.js @@ -69,6 +69,7 @@ class StatsTab { this._container = document.getElementById(containerId); this._charts = {}; this._refreshInterval = null; + this._dutyInterval = null; this._rendered = false; } @@ -93,9 +94,47 @@ class StatsTab { } else { clearInterval(this._refreshInterval); this._refreshInterval = null; + this._stopDutyRefresh(); } }, 15000); } + + this._ensureDutyRefresh(); + } + + async refreshDutyCycle() { + try { + const res = await fetch('/api/stats/duty_cycle'); + if (!res.ok) return; + const data = await res.json(); + this._updateDutyMeter(data); + } catch (e) { + console.error('Duty cycle refresh failed:', e); + } + } + + _ensureDutyRefresh() { + const section = document.querySelector('[data-section="stats"]'); + if (!section || !section.classList.contains('section--active')) return; + + this.refreshDutyCycle(); + if (!this._dutyInterval) { + this._dutyInterval = setInterval(() => { + const active = document.querySelector('[data-section="stats"]'); + if (active && active.classList.contains('section--active')) { + this.refreshDutyCycle(); + } else { + this._stopDutyRefresh(); + } + }, 60000); + } + } + + _stopDutyRefresh() { + if (this._dutyInterval) { + clearInterval(this._dutyInterval); + this._dutyInterval = null; + } } _buildLayout() { @@ -254,6 +293,11 @@ class StatsTab {
Why packets were not relayed
+
+
Relay duty budget (est.)
+
Per-channel rolling 1 h ToA usage vs throttle
+
+
@@ -500,6 +544,46 @@ class StatsTab { this._renderHorizontalBar('sc-reject', labels, values, '#ef4444'); } + _updateDutyMeter(data) { + const el = document.getElementById('sc-duty-meter'); + const desc = document.getElementById('sc-duty-desc'); + if (!el) return; + + const channels = data.channels || []; + const ceiling = data.regulatory_ceiling_percent; + let descText = 'Per-channel rolling 1 h relay ToA usage vs throttle (est.)'; + if (ceiling != null) { + descText += ` — EU regulatory ceiling ${ceiling}%`; + } + if (desc) desc.textContent = descText; + + const visible = channels.filter( + (c) => c.used_toa_ms_estimated > 0 || c.throttle_percent < 100, + ); + const rows = (visible.length ? visible : channels.slice(0, 4)).map((c) => { + const limit = c.effective_limit_percent || 100; + const fill = limit > 0 + ? Math.min(100, (c.usage_percent / limit) * 100) + : 0; + return ` +
+ Ch ${c.channel} +
+
+
+ ${c.usage_percent}% / ${limit}% +
+ `; + }).join(''); + + const total = data.relay_total_usage_percent != null + ? data.relay_total_usage_percent + : 0; + el.innerHTML = `${rows || '

No relay airtime in the current window.

'}` + + `

Aggregate relay usage (est.): ` + + `${total}%

`; + } + _renderDoughnut(canvasId, labels, values, colors, centerText) { const centerPlugin = centerText != null ? { id: `center-${canvasId}`, diff --git a/src/analytics/toa_estimate.py b/src/analytics/toa_estimate.py new file mode 100644 index 0000000..0fcc6b0 --- /dev/null +++ b/src/analytics/toa_estimate.py @@ -0,0 +1,103 @@ +"""LoRa time-on-air estimates for RX traffic analytics. + +Shared by the 24-hour traffic histogram (PR 2) and future duty-cycle +work (PR 7). Uses the same fallback formula as ``TxService._estimate_airtime`` +so dashboard estimates stay consistent with TX budgeting when HAL ToA +is unavailable. +""" + +from __future__ import annotations + +DEFAULT_PAYLOAD_BYTES = 40 +MIN_PAYLOAD_BYTES = 20 + + +def estimate_toa_ms( + spreading_factor: int, + bandwidth_khz: float, + *, + preamble_length: int = 16, + payload_bytes: int = DEFAULT_PAYLOAD_BYTES, +) -> int: + """Rough airtime per packet in milliseconds. + + Args: + spreading_factor: LoRa SF (7–12 typical). + bandwidth_khz: Channel bandwidth in kHz. + preamble_length: Preamble symbol count from radio config. + payload_bytes: Payload size used for the symbol estimate. + """ + if spreading_factor <= 0 or bandwidth_khz <= 0: + return 0 + + payload = max(MIN_PAYLOAD_BYTES, int(payload_bytes)) + symbol_time_ms = (2 ** spreading_factor) / bandwidth_khz + n_symbols = 8 + max( + ( + (8 * payload - 4 * spreading_factor + 28 + 16) + // (4 * spreading_factor) + ) + * 5 + + 8, + 0, + ) + return int((preamble_length + n_symbols) * symbol_time_ms) + + +def estimate_packet_toa_ms( + packet, + *, + default_sf: int, + default_bw_khz: float, + default_preamble: int = 16, +) -> int: + """Estimate airtime for a single packet using signal metadata when present.""" + sf = default_sf + bw = default_bw_khz + signal = getattr(packet, "signal", None) + if signal is not None: + if getattr(signal, "spreading_factor", None): + sf = int(signal.spreading_factor) + if getattr(signal, "bandwidth_khz", None): + bw = float(signal.bandwidth_khz) + + payload_bytes = DEFAULT_PAYLOAD_BYTES + raw = getattr(packet, "raw_app_payload", None) + if raw: + payload_bytes = len(raw) + else: + decoded = getattr(packet, "decoded_payload", None) or {} + text = decoded.get("text") + if text: + payload_bytes = len(str(text).encode("utf-8")) + + return estimate_toa_ms( + sf, + bw, + preamble_length=default_preamble, + payload_bytes=payload_bytes, + ) + + +def sum_hourly_toa_ms( + buckets: list[dict], + *, + default_sf: int, + default_bw_khz: float, + default_preamble: int = 16, + default_payload_bytes: int = DEFAULT_PAYLOAD_BYTES, +) -> int: + """Sum estimated airtime for modem-grouped hourly buckets.""" + total = 0 + for row in buckets: + sf = int(row.get("sf") or default_sf) + bw = float(row.get("bw") or default_bw_khz) + count = int(row.get("packet_count") or 0) + payload = int(row.get("avg_payload") or default_payload_bytes) + total += count * estimate_toa_ms( + sf, + bw, + preamble_length=default_preamble, + payload_bytes=payload, + ) + return total diff --git a/src/api/routes/config_enrichment.py b/src/api/routes/config_enrichment.py index 5f0ae3a..b687649 100644 --- a/src/api/routes/config_enrichment.py +++ b/src/api/routes/config_enrichment.py @@ -52,6 +52,7 @@ def enrich_config_payload(cfg: AppConfig, base: dict) -> dict: "burst_size": relay.burst_size, "min_relay_rssi": relay.min_relay_rssi, "max_relay_rssi": relay.max_relay_rssi, + "channel_throttle_percent": dict(relay.channel_throttle_percent or {}), } base["radio_advanced"] = { "spectral_scan_interval_seconds": radio.spectral_scan_interval_seconds, diff --git a/src/api/routes/stats_routes.py b/src/api/routes/stats_routes.py index a8e5c06..6df7b72 100644 --- a/src/api/routes/stats_routes.py +++ b/src/api/routes/stats_routes.py @@ -108,6 +108,23 @@ async def stats_summary(): } +@router.get("/duty_cycle") +async def stats_duty_cycle(): + """Live per-channel relay duty budget (ToA estimates).""" + if _relay_manager is None: + return { + "region": "US", + "regulatory_ceiling_percent": None, + "window_seconds": 3600, + "estimate_note": ( + "ToA estimates — not spectrum analyser measurements" + ), + "relay_total_usage_percent": 0.0, + "channels": [], + } + return _relay_manager.get_stats().get("channel_budget", {}) + + def _get_device_context() -> dict: try: config = load_config() diff --git a/src/api/routes/system_config_routes.py b/src/api/routes/system_config_routes.py index fa2a14f..12af610 100644 --- a/src/api/routes/system_config_routes.py +++ b/src/api/routes/system_config_routes.py @@ -13,22 +13,30 @@ from src.api.auth.dependencies import require_admin from src.api.auth.jwt_session import SessionClaims from src.config import AppConfig, save_section_to_yaml +from src.relay.channel_budget import normalize_channel_throttle +from src.relay.relay_manager import RelayManager logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/config", tags=["config"]) _config: AppConfig | None = None +_relay_manager: RelayManager | None = None -def init_routes(config: AppConfig) -> None: - global _config +def init_routes( + config: AppConfig, + relay_manager: RelayManager | None = None, +) -> None: + global _config, _relay_manager _config = config + _relay_manager = relay_manager def reset_routes() -> None: - global _config + global _config, _relay_manager _config = None + _relay_manager = None class StorageUpdate(BaseModel): @@ -51,6 +59,7 @@ class RelayUpdate(BaseModel): burst_size: Optional[int] = Field(None, ge=1, le=50) min_relay_rssi: Optional[float] = Field(None, ge=-150, le=0) max_relay_rssi: Optional[float] = Field(None, ge=-150, le=0) + channel_throttle_percent: Optional[dict[str, float]] = None class RadioAdvancedUpdate(BaseModel): @@ -194,6 +203,16 @@ async def update_relay( raise HTTPException(400, "max_relay_rssi must be greater than min_relay_rssi") relay.max_relay_rssi = req.max_relay_rssi updates["max_relay_rssi"] = req.max_relay_rssi + throttle_reload = False + if req.channel_throttle_percent is not None: + try: + relay.channel_throttle_percent = normalize_channel_throttle( + req.channel_throttle_percent + ) + except ValueError as exc: + raise HTTPException(400, str(exc)) from exc + updates["channel_throttle_percent"] = relay.channel_throttle_percent + throttle_reload = True if not updates: return {"saved": False, "restart_required": False} @@ -206,6 +225,13 @@ async def update_relay( except PermissionError as exc: raise HTTPException(403, str(exc)) from exc + if throttle_reload and _relay_manager is not None: + region = _config.radio.region if _config else None + _relay_manager.reload_channel_budget( + channel_throttle_percent=relay.channel_throttle_percent, + region=region, + ) + return {"saved": True, "restart_required": restart_needed, "updates": updates} diff --git a/src/api/server.py b/src/api/server.py index 3bd4fb0..a3fe504 100644 --- a/src/api/server.py +++ b/src/api/server.py @@ -1265,7 +1265,10 @@ def _init_routes( upstream_config_routes.init_routes(config=config) device_config_routes.init_routes(config=config, identity=identity) gps_status.init_routes(location_source=coord.location_source) - system_config_routes.init_routes(config=config) + system_config_routes.init_routes( + config=config, + relay_manager=coord.relay_manager, + ) meshcore_config_routes.init_routes(config=config, tx_service=tx_service) diff --git a/src/config.py b/src/config.py index 4622e2b..bd75faa 100644 --- a/src/config.py +++ b/src/config.py @@ -150,6 +150,7 @@ class RelayConfig: burst_size: int = 5 min_relay_rssi: float = -110.0 max_relay_rssi: float = -50.0 + channel_throttle_percent: dict[str, float] = field(default_factory=dict) @dataclass diff --git a/src/coordinator.py b/src/coordinator.py index 665e9ca..e09c4c6 100644 --- a/src/coordinator.py +++ b/src/coordinator.py @@ -43,12 +43,18 @@ def __init__(self, config: AppConfig): self._router = PacketRouter(self._crypto) self._capture = CaptureCoordinator() relay_cfg = config.relay + radio = config.radio self._relay = RelayManager( enabled=relay_cfg.enabled, max_relay_per_minute=relay_cfg.max_relay_per_minute, burst_size=relay_cfg.burst_size, min_relay_rssi=relay_cfg.min_relay_rssi, max_relay_rssi=relay_cfg.max_relay_rssi, + channel_throttle_percent=relay_cfg.channel_throttle_percent, + region=radio.region, + default_sf=radio.spreading_factor, + default_bw_khz=radio.bandwidth_khz, + default_preamble=radio.preamble_length, ) self._transmitter: Optional[MeshtasticTransmitter] = None self._mqtt: Optional[MqttPublisher] = None diff --git a/src/relay/channel_budget.py b/src/relay/channel_budget.py new file mode 100644 index 0000000..22e8ab5 --- /dev/null +++ b/src/relay/channel_budget.py @@ -0,0 +1,223 @@ +"""Per-channel rolling ToA budget for relay TX only. + +Tracks estimated airtime per Meshtastic channel index over a sliding +window and enforces operator throttle percentages. Does not affect +native concentrator TX (``transmit.enabled``). +""" + +from __future__ import annotations + +import time +from collections import defaultdict, deque +from dataclasses import dataclass + +from src.analytics.toa_estimate import estimate_packet_toa_ms +from src.models.packet import Packet +from src.transmit.duty_cycle import DEFAULT_WINDOW_SECONDS, DUTY_CYCLE_LIMITS + +_MAX_CHANNEL = 7 + + +def resolve_relay_regulatory_ceiling(region: str) -> float | None: + """Return regional duty ceiling when below 100%, else operator-only.""" + limit = DUTY_CYCLE_LIMITS.get(region, 1.0) + if limit >= 100.0: + return None + return limit + + +def normalize_channel_throttle( + throttle_percent: dict[str, float] | None, +) -> dict[str, float]: + """Validate and normalize channel throttle map (keys 0–7, values 1–100).""" + if not throttle_percent: + return {} + normalized: dict[str, float] = {} + for key, value in throttle_percent.items(): + ch = str(key).strip() + if not ch.isdigit() or int(ch) < 0 or int(ch) > _MAX_CHANNEL: + raise ValueError(f"invalid channel throttle key: {key!r}") + pct = float(value) + if not 1.0 <= pct <= 100.0: + raise ValueError( + f"channel {ch} throttle must be between 1 and 100 percent" + ) + normalized[ch] = pct + return normalized + + +def throttle_percent_for( + channel: int, + throttle_percent: dict[str, float], +) -> float: + """Configured throttle for *channel*; default 100% when omitted.""" + return float(throttle_percent.get(str(channel), 100.0)) + + +def effective_limit_percent( + channel: int, + throttle_percent: dict[str, float], + regulatory_ceiling: float | None, +) -> float: + """Operator throttle capped by regional ceiling when present.""" + throttle = throttle_percent_for(channel, throttle_percent) + if regulatory_ceiling is None: + return throttle + return min(throttle, regulatory_ceiling) + + +@dataclass +class _TxRecord: + timestamp: float + airtime_ms: int + + +class ChannelBudget: + """Rolling per-channel relay airtime budget.""" + + def __init__( + self, + *, + throttle_percent: dict[str, float] | None = None, + region: str = "US", + window_seconds: int = DEFAULT_WINDOW_SECONDS, + default_sf: int = 11, + default_bw_khz: float = 250.0, + default_preamble: int = 16, + ): + self._throttle = normalize_channel_throttle(throttle_percent) + self._region = region + self._regulatory_ceiling = resolve_relay_regulatory_ceiling(region) + self._window_seconds = window_seconds + self._default_sf = default_sf + self._default_bw_khz = default_bw_khz + self._default_preamble = default_preamble + self._records: dict[int, deque[_TxRecord]] = defaultdict(deque) + + @property + def region(self) -> str: + return self._region + + @property + def regulatory_ceiling_percent(self) -> float | None: + return self._regulatory_ceiling + + @property + def window_seconds(self) -> int: + return self._window_seconds + + def reload( + self, + *, + throttle_percent: dict[str, float] | None = None, + region: str | None = None, + ) -> None: + if throttle_percent is not None: + self._throttle = normalize_channel_throttle(throttle_percent) + if region is not None: + self._region = region + self._regulatory_ceiling = resolve_relay_regulatory_ceiling(region) + + def estimate_packet_toa_ms(self, packet: Packet) -> int: + return estimate_packet_toa_ms( + packet, + default_sf=self._default_sf, + default_bw_khz=self._default_bw_khz, + default_preamble=self._default_preamble, + ) + + def _channel_index(self, packet: Packet) -> int: + ch = int(packet.channel_hash or 0) + return max(0, min(_MAX_CHANNEL, ch)) + + def check_packet(self, packet: Packet) -> bool: + channel = self._channel_index(packet) + return self.check_budget(channel, self.estimate_packet_toa_ms(packet)) + + def check_budget(self, channel: int, airtime_ms: int) -> bool: + channel = max(0, min(_MAX_CHANNEL, int(channel))) + self._prune(channel) + limit_pct = effective_limit_percent( + channel, self._throttle, self._regulatory_ceiling + ) + window_ms = self._window_seconds * 1000 + current_ms = sum(r.airtime_ms for r in self._records[channel]) + projected = ((current_ms + airtime_ms) / window_ms) * 100 + return projected <= limit_pct + + def record_packet(self, packet: Packet) -> None: + channel = self._channel_index(packet) + self.record_tx(channel, self.estimate_packet_toa_ms(packet)) + + def record_tx(self, channel: int, airtime_ms: int) -> None: + channel = max(0, min(_MAX_CHANNEL, int(channel))) + self._records[channel].append( + _TxRecord(timestamp=time.monotonic(), airtime_ms=airtime_ms) + ) + self._prune(channel) + + def channel_status(self, channel: int) -> dict: + channel = max(0, min(_MAX_CHANNEL, int(channel))) + self._prune(channel) + throttle = throttle_percent_for(channel, self._throttle) + limit_pct = effective_limit_percent( + channel, self._throttle, self._regulatory_ceiling + ) + used_ms = sum(r.airtime_ms for r in self._records[channel]) + window_ms = self._window_seconds * 1000 + budget_ms = int(window_ms * limit_pct / 100) + usage_pct = round((used_ms / window_ms) * 100, 2) if window_ms else 0.0 + return { + "channel": channel, + "throttle_percent": throttle, + "effective_limit_percent": limit_pct, + "usage_percent": usage_pct, + "used_toa_ms_estimated": used_ms, + "budget_toa_ms_estimated": budget_ms, + "remaining_toa_ms_estimated": max(0, budget_ms - used_ms), + } + + def all_channel_status(self) -> list[dict]: + return [self.channel_status(ch) for ch in range(_MAX_CHANNEL + 1)] + + def summary(self) -> dict: + channels = self.all_channel_status() + total_used = sum(c["used_toa_ms_estimated"] for c in channels) + total_budget = sum(c["budget_toa_ms_estimated"] for c in channels) + window_ms = self._window_seconds * 1000 + aggregate_usage = ( + round((total_used / window_ms) * 100, 2) if window_ms else 0.0 + ) + return { + "region": self._region, + "regulatory_ceiling_percent": self._regulatory_ceiling, + "window_seconds": self._window_seconds, + "estimate_note": ( + "ToA estimates — not spectrum analyser measurements" + ), + "relay_total_usage_percent": aggregate_usage, + "channels": channels, + } + + def _prune(self, channel: int) -> None: + cutoff = time.monotonic() - self._window_seconds + records = self._records[channel] + while records and records[0].timestamp < cutoff: + records.popleft() + + +def build_channel_budget( + *, + throttle_percent: dict[str, float] | None, + region: str, + default_sf: int, + default_bw_khz: float, + default_preamble: int = 16, +) -> ChannelBudget: + return ChannelBudget( + throttle_percent=throttle_percent, + region=region, + default_sf=default_sf, + default_bw_khz=default_bw_khz, + default_preamble=default_preamble, + ) diff --git a/src/relay/relay_manager.py b/src/relay/relay_manager.py index 835fc06..c178e74 100644 --- a/src/relay/relay_manager.py +++ b/src/relay/relay_manager.py @@ -5,6 +5,7 @@ from typing import Optional from src.models.packet import Packet, PacketType +from src.relay.channel_budget import ChannelBudget, build_channel_budget from src.relay.dedup_filter import DeduplicationFilter from src.relay.rate_limiter import RateLimiter @@ -51,12 +52,25 @@ def __init__( min_relay_rssi: float = -110.0, max_relay_rssi: float = -50.0, enabled: bool = False, + channel_budget: ChannelBudget | None = None, + channel_throttle_percent: dict[str, float] | None = None, + region: str = "US", + default_sf: int = 11, + default_bw_khz: float = 250.0, + default_preamble: int = 16, ): self._dedup = DeduplicationFilter() self._limiter = RateLimiter(max_relay_per_minute, burst_size) self._min_rssi = min_relay_rssi self._max_rssi = max_relay_rssi self._enabled = enabled + self._channel_budget = channel_budget or build_channel_budget( + throttle_percent=channel_throttle_percent, + region=region, + default_sf=default_sf, + default_bw_khz=default_bw_khz, + default_preamble=default_preamble, + ) self._local_node_hex: str | None = None self._relay_count = 0 self._rejected_count = 0 @@ -80,6 +94,18 @@ def set_transmit_function(self, fn: callable) -> None: """Register the function used to transmit relay packets.""" self._transmit_fn = fn + def reload_channel_budget( + self, + *, + channel_throttle_percent: dict[str, float] | None = None, + region: str | None = None, + ) -> None: + """Hot-reload per-channel relay throttle settings.""" + self._channel_budget.reload( + throttle_percent=channel_throttle_percent, + region=region, + ) + def evaluate(self, packet: Packet) -> RelayDecision: """Decide whether a captured packet should be relayed.""" if not self._enabled: @@ -111,6 +137,9 @@ def evaluate(self, packet: Packet) -> RelayDecision: if not self._limiter.allow(): return RelayDecision(False, "rate_limited") + if not self._channel_budget.check_packet(packet): + return RelayDecision(False, "channel_throttled") + return RelayDecision(True, "approved") async def process_packet(self, packet: Packet) -> None: @@ -161,6 +190,7 @@ async def _relay(self, packet: Packet) -> None: # Sync transmit (legacy USB-companion path) blocks on # serial I/O, so it must run off the event loop. await asyncio.to_thread(self._transmit_fn, packet) + self._channel_budget.record_packet(packet) except Exception: logger.exception("Relay transmission failed") @@ -173,4 +203,5 @@ def get_stats(self) -> dict: "dedup_cache_size": self._dedup.size, "rate_remaining": self._limiter.remaining_capacity, "current_rate": self._limiter.current_rate, + "channel_budget": self._channel_budget.summary(), } diff --git a/tests/test_relay_duty_cycle.py b/tests/test_relay_duty_cycle.py new file mode 100644 index 0000000..cfb673a --- /dev/null +++ b/tests/test_relay_duty_cycle.py @@ -0,0 +1,151 @@ +"""Per-channel relay duty budget and throttle controls.""" + +from __future__ import annotations + +import unittest + +from src.models.packet import Packet, PacketType, Protocol +from src.models.signal import SignalMetrics +from src.relay.channel_budget import ( + ChannelBudget, + effective_limit_percent, + normalize_channel_throttle, + resolve_relay_regulatory_ceiling, +) +from src.relay.relay_manager import RelayManager + + +def _relay_packet( + *, + source_id: str = "a3f2b1c0", + packet_id: str = "pkt001", + channel_hash: int = 0, +) -> Packet: + return Packet( + packet_id=packet_id, + source_id=source_id, + destination_id="ffffffff", + protocol=Protocol.MESHTASTIC, + packet_type=PacketType.TEXT, + hop_limit=2, + hop_start=3, + channel_hash=channel_hash, + signal=SignalMetrics( + rssi=-95.0, + snr=5.0, + frequency_mhz=906.875, + spreading_factor=11, + bandwidth_khz=250.0, + ), + ) + + +class TestChannelBudget(unittest.TestCase): + def test_omitted_throttle_defaults_to_full_budget(self) -> None: + budget = ChannelBudget(region="US", window_seconds=3600) + toa = budget.estimate_packet_toa_ms(_relay_packet()) + self.assertTrue(budget.check_budget(0, toa)) + + def test_fifty_percent_throttle_blocks_after_budget_exhausted(self) -> None: + budget = ChannelBudget( + throttle_percent={"0": 50}, + region="US", + window_seconds=3600, + ) + toa = budget.estimate_packet_toa_ms(_relay_packet()) + window_ms = 3600 * 1000 + max_ms = int(window_ms * 0.5) + packets_to_fill = max(1, max_ms // max(toa, 1)) + + for i in range(packets_to_fill): + self.assertTrue( + budget.check_budget(0, toa), + f"packet {i} should fit in 50% budget", + ) + budget.record_tx(0, toa) + + self.assertFalse(budget.check_budget(0, toa)) + + def test_eu_regulatory_ceiling_caps_effective_limit(self) -> None: + self.assertEqual(resolve_relay_regulatory_ceiling("EU_868"), 1.0) + self.assertIsNone(resolve_relay_regulatory_ceiling("US")) + self.assertEqual( + effective_limit_percent(0, {"0": 100}, 1.0), + 1.0, + ) + self.assertEqual( + effective_limit_percent(0, {"0": 50}, 1.0), + 1.0, + ) + + def test_normalize_rejects_invalid_channel(self) -> None: + with self.assertRaises(ValueError): + normalize_channel_throttle({"99": 50}) + + def test_summary_includes_all_channels(self) -> None: + budget = ChannelBudget(throttle_percent={"1": 75}, region="US") + summary = budget.summary() + self.assertEqual(len(summary["channels"]), 8) + self.assertEqual(summary["channels"][1]["throttle_percent"], 75) + + +class TestRelayManagerDutyCycle(unittest.TestCase): + def test_channel_throttle_rejects_under_synthetic_load(self) -> None: + manager = RelayManager( + enabled=True, + max_relay_per_minute=10_000, + burst_size=10_000, + channel_throttle_percent={"0": 50}, + region="US", + ) + toa = manager._channel_budget.estimate_packet_toa_ms(_relay_packet()) + window_ms = 3600 * 1000 + max_ms = int(window_ms * 0.5) + packets_to_fill = max(1, max_ms // max(toa, 1)) + + for i in range(packets_to_fill): + pkt = _relay_packet(packet_id=f"p{i}") + decision = manager.evaluate(pkt) + self.assertTrue(decision.should_relay, decision.reason) + manager._channel_budget.record_packet(pkt) + + blocked = manager.evaluate(_relay_packet(packet_id="overflow")) + self.assertFalse(blocked.should_relay) + self.assertEqual(blocked.reason, "channel_throttled") + + def test_full_throttle_allows_more_than_fifty_percent(self) -> None: + full = RelayManager( + enabled=True, + max_relay_per_minute=10_000, + burst_size=10_000, + channel_throttle_percent={}, + region="US", + ) + half = RelayManager( + enabled=True, + max_relay_per_minute=10_000, + burst_size=10_000, + channel_throttle_percent={"0": 50}, + region="US", + ) + toa = full._channel_budget.estimate_packet_toa_ms(_relay_packet()) + window_ms = 3600 * 1000 + half_max = int(window_ms * 0.5) + packets_to_fill = max(1, half_max // max(toa, 1)) + 1 + + for i in range(packets_to_fill): + full._channel_budget.record_tx(0, toa) + half._channel_budget.record_tx(0, toa) + + self.assertTrue(full._channel_budget.check_budget(0, toa)) + self.assertFalse(half._channel_budget.check_budget(0, toa)) + + def test_reload_channel_budget_updates_throttle(self) -> None: + manager = RelayManager(enabled=True, channel_throttle_percent={"0": 100}) + manager.reload_channel_budget(channel_throttle_percent={"0": 1}) + status = manager._channel_budget.channel_status(0) + self.assertEqual(status["effective_limit_percent"], 1) + + +if __name__ == "__main__": + unittest.main()