diff --git a/README.md b/README.md index 6bfd4fa..7807bfb 100644 --- a/README.md +++ b/README.md @@ -266,6 +266,8 @@ See [docs/COMMON-ERRORS.md](docs/COMMON-ERRORS.md#upgrades) if the service fails **Chip version 0x00:** Concentrator not responding. Check that the concentrator module is seated, SPI is enabled (`raspi-config` → Interface Options → SPI), and try a full power cycle (unplug for 10+ seconds). Normal chip versions are `0x10` (SX1302) and `0x12` (SX1303). +**`sx1261_check_status` / `lgw_start() failed` on RAK V2:** Clear `radio.sx1261_spi_path` in `local.yaml` (leave it `""`). The SX1261 is not Pi-visible on RAK/SenseCap boards. Use `location.source: uart` for onboard HAT GPS or `gpsd` for USB GPS. See [Common Errors](docs/COMMON-ERRORS.md). + **No packets:** Verify antenna is connected and frequency matches your region. Check `meshpoint logs` for `lgw_receive returned N packet(s)`. **Upstream 401:** Bad API key. Get a free one at [meshradar.io](https://meshradar.io) and re-run `sudo meshpoint setup`. diff --git a/config/default.yaml b/config/default.yaml index b8f7bad..4d8947d 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -11,6 +11,11 @@ radio: sync_word: 0x2B preamble_length: 16 tx_power_dbm: 22 + # HAL GPS/PPS (RAK Pi HAT u-blox → concentrator PPS pin). Off by default. + # gps_pps_enabled: false + # gps_pps_tty_path: "/dev/ttyAMA0" + # gps_family: "ubx7" + # gps_pps_target_baud: 0 # 0 = HAL default (9600) meshtastic: default_key_b64: "AQ==" @@ -129,12 +134,14 @@ location: # and are not overwritten by gpsd. # static : use device.latitude/longitude/altitude (default) # gpsd : poll a local or remote gpsd daemon for live fixes - # uart : reserved (RAK Pi HAT GPS, not yet wired) + # uart : on-board RAK Pi HAT GPS via /dev/ttyAMA0 (NMEA GGA) source: "static" # gpsd connection. Defaults match the well-known local socket; only # change when running gpsd on a peer machine on the LAN. gpsd_host: "127.0.0.1" gpsd_port: 2947 + uart_path: "/dev/ttyAMA0" + uart_baud: 9600 # How often the coordinator wakes to read the active source. update_interval_seconds: 5 # Minimum acceptable fix mode: 0=any (incl. no-fix), 1=2D, 2=3D. diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b561b94..d105635 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,12 @@ - **MQTT broker TLS.** Transport TLS (`mqtts`, CA bundle, cert validation) is not implemented on `mqtt_publisher.py` (plain TCP only). Until then use plain port 1883 or a LAN broker without TLS. +- **UART GPS.** `location.source: uart` reads NMEA GGA from the RAK Pi HAT (`/dev/ttyAMA0` by default). Configuration → GPS UART fieldset; `meshpoint setup` can select uart when the wizard probe gets a fix. Requires `pyserial` in the venv (`pip install -r requirements.txt` after upgrade). +- **Concentrator model ID.** Startup logs SX1302 vs SX1303 when `libloragw` exposes `sx1302_get_model_id` (e.g. `Concentrator model ID: 0x12 (SX1303)`). +- **SX1261 guard.** Non-empty `radio.sx1261_spi_path` on RAK/SenseCap carriers (`radio.carrier_type`) is cleared at configure time with an explicit warning, preventing `lgw_start()` failures from misconfigured spectral scan on Pi-invisible SX1261 wiring. +- **SPI preflight.** Missing `/dev/spidev0.0` raises a clear error before `lgw_start()` (raspi-config / spi group hints). +- **GPS PPS (HAL).** Optional `radio.gps_pps_*` enables `lgw_gps_*` / `sx1302_gps_enable` timestamp sync on RAK Pi HATs with PPS wired to the concentrator. `GET /api/device/gps-pps-status`; config rejects sharing the HAT UART with `location.source: uart`. + ### v0.7.6 (June 2026) Meshtastic mesh participant release on `main` (merge `feat/v0.7.6`). Edge-only, pure Python, no concentrator recompile. **Upgrade:** Settings → Updates → **Stable**, or the full SSH block in `docs/COMMON-ERRORS.md` (`git fetch`, `checkout main`, `pull`, `scripts/install.sh`, `restart`). Required this release: new `cryptography` dependency for PKI and an updated `meshpoint.service` unit (RAK V2 reset fix). Pull-only upgrades can miss both. Witness-tested on RAK V2. Settings → Updates RC picker now points at **v0.7.7** on `feat/v0.7.7`. diff --git a/docs/COMMON-ERRORS.md b/docs/COMMON-ERRORS.md index 0616469..6c2f4f1 100644 --- a/docs/COMMON-ERRORS.md +++ b/docs/COMMON-ERRORS.md @@ -430,16 +430,54 @@ disabled in `raspi-config`, or the SPI bus latched after a hard power cut. 3. Full power cycle: `sudo poweroff`, wait for green LED to stop, unplug for 10+ seconds, then plug back in. -Normal chip versions are `0x10` (SX1302) and `0x12` (SX1303). +Normal chip versions are `0x10` (SX1302) and `0x12` (SX1303). Startup +also logs `Concentrator model ID: 0x12 (SX1303)` when the HAL exposes +`sx1302_get_model_id`. + +### `sx1261_check_status: got:0x00 expected:0x22` / `failed to patch sx1261` + +**Cause:** `radio.sx1261_spi_path` is set on a carrier where the SX1261 +companion chip is **not** wired to a Pi-visible SPI bus (RAK2287, RAK5146, +SenseCap M1, most RAK Hotspot V2 units). The HAL probes a chip that is not +there and may refuse `lgw_start()`. + +**Fix:** + +1. Edit `config/local.yaml` and clear the path: + + ```yaml + radio: + sx1261_spi_path: "" + ``` + +2. Restart: `sudo systemctl restart meshpoint`. + +3. Confirm the journal shows `SX1261 spi_path empty; spectral scan disabled` + and `Application startup complete`. + +On current Meshpoint builds with `radio.carrier_type: rak` or +`sensecap_m1` (written by `meshpoint setup`), a mistaken path is cleared +automatically at startup with a warning. Re-run `sudo meshpoint setup` if +`carrier_type` is missing. + +See [Configuration > Spectral Scan](CONFIGURATION.md#spectral-scan-noise-floor). ### `lgw_start() failed` or `Failed to set SX1250_0 in STANDBY_RC mode` -**Cause:** SPI bus latch from a hard power cut. The Meshpoint shutdown -handler holds the concentrator in reset on `sudo reboot` and -`sudo systemctl restart`, so this only appears after yanked-cable shutdowns, -breaker trips, or outages. +**Cause:** Usually one of: + +1. **SX1261 misconfiguration** — see the entry above if the log mentions + `sx1261_check_status` or `failed to patch sx1261`. +2. **SPI bus latch** from a hard power cut. The Meshpoint shutdown handler + holds the concentrator in reset on `sudo reboot` and + `sudo systemctl restart`, so latch typically follows yanked-cable + shutdowns, breaker trips, or outages. +3. **SPI disabled or device missing** — `/dev/spidev0.0` not present; + enable SPI in `raspi-config` and confirm the `meshpoint` user is in + the `spi` group. -**Fix:** Full power cycle: +**Fix:** If SX1261 lines appear in the log, fix `sx1261_spi_path` first. +Otherwise full power cycle: ```bash sudo poweroff diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index f12b077..8406311 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -26,6 +26,7 @@ radio: tx_power_dbm: 22 # SX1302 concentrator output power spectral_scan_interval_seconds: 60 # noise floor sampler cadence (0 disables) sx1261_spi_path: "" # SX1261 SPI device for spectral scan (empty = disabled) + carrier_type: "" # rak | sensecap_m1 (set by meshpoint setup; guards SX1261 path) ``` The region sets the base frequency, spreading factor, and bandwidth automatically. You only need `region` in most cases. Override `frequency_mhz`, `spreading_factor`, or `bandwidth_khz` individually to tune for non-default presets (MediumFast, ShortFast, etc.) or custom frequency slots. @@ -76,8 +77,38 @@ ERROR: failed to patch sx1261 radio for LBT/Spectral Scan …and `lgw_start()` may then refuse to bring up the concentrator. If you see that, revert `sx1261_spi_path` to `""`, restart the service, and stay on the packet-derived fallback. +On RAK and SenseCap carriers, `meshpoint setup` writes `radio.carrier_type`. When that field is `rak` or `sensecap_m1`, Meshpoint **clears** a non-empty `sx1261_spi_path` at startup and logs a warning instead of calling `lgw_sx1261_setconf`. + If your `libloragw` build does not expose the spectral scan symbols at all (older HAL revisions), the service logs a single info line at startup and falls back automatically. +### GPS PPS time sync (HAL) + +Separate from [Location (GPS) source](#location-gps-source) (dashboard lat/lon). PPS sync wires the concentrator's internal packet counter to GPS time via Semtech `lgw_gps_*` and `sx1302_gps_enable` — useful when you need accurate `timestamp_us` on RX metadata (logging, TDoA-style analysis, correlating captures to wall clock). + +```yaml +radio: + gps_pps_enabled: false # set true on RAK Pi HAT with PPS wired to SX1303 + gps_pps_tty_path: "/dev/ttyAMA0" + gps_family: "ubx7" # passed to lgw_gps_enable + gps_pps_target_baud: 0 # 0 = HAL default (9600) +``` + +Requirements: + +- `libloragw` built from the SX1302 HAL tree with `loragw_gps.c` linked (stock `lora_gateway` packages on some images omit these symbols — startup logs `GPS/PPS sync unavailable` and continues). +- u-blox on the HAT UART with PPS routed to the concentrator (RAK2287/5146/SX1303 HAT). +- Clear sky: first `lgw_gps_sync` may take minutes until UBX time messages arrive. + +**Do not** enable `gps_pps_enabled` and `location.source: uart` on the same `tty_path`. Only one owner can open the serial port. Typical RAK deployments: + +| Goal | `location.source` | `radio.gps_pps_enabled` | +|---|---|---| +| Map position from HAT GPS | `uart` | `false` | +| Accurate packet timestamps + map from USB gpsd | `gpsd` | `true` on `/dev/ttyAMA0` | +| Timestamps only, static map coords | `static` | `true` | + +Status: `GET /api/device/gps-pps-status` (enabled, sync count, last error, reference counter). Config keys also appear under `radio_advanced` in `GET /api/config`. + ### Standard Meshtastic Presets To match a Meshtastic preset, set `spreading_factor` and `bandwidth_khz` together: @@ -169,8 +200,10 @@ location: source: "static" # static | gpsd | uart gpsd_host: "127.0.0.1" # gpsd TCP host (only when source=gpsd) gpsd_port: 2947 # gpsd TCP port + uart_path: "/dev/ttyAMA0" # serial device (only when source=uart) + uart_baud: 9600 # NMEA baud (uart only) update_interval_seconds: 5 # how often the coordinator polls the source - min_fix_quality: 1 # minimum NMEA fix quality (1=2D, 3=3D) + min_fix_quality: 1 # minimum fix quality (1=2D, 2=3D for gpsd/uart) ``` `location.source` selects where the Meshpoint reads **live GPS fixes** @@ -185,7 +218,7 @@ coordinates and mesh position settings hot-reload from the dashboard. |---|---| | `static` (default) | No live GPS hardware. Registered coordinates live in `device.*` only. Skyplot shows the static pin. | | `gpsd` | Reads live fixes from the system `gpsd` daemon over TCP (`127.0.0.1:2947`). Recommended for any USB GPS receiver (u-blox 7, u-blox 8, VFAN puck, generic CDC ACM sticks). Skyplot and stats update from the live fix. | -| `uart` | Reserved for direct-serial reads from a Pi HAT GPS (e.g. RAK 7248). Currently a placeholder; falls back to static and surfaces an explanatory error in the dashboard. | +| `uart` | Reads NMEA GGA from the on-board RAK Pi HAT GPS on `/dev/ttyAMA0` (or `uart_path`). Fix and satellite count update live; full skyplot az/el/SNR needs `gpsd` + USB receiver. | ### Mesh position broadcasts (LoRa / Meshtastic app map) @@ -253,13 +286,35 @@ in `gpsd-clients`) or `gpsmon`. | u-blox 7 USB stick | USB CDC ACM, NMEA + UBX | yes (RAK V2 .141) | | u-blox 8 USB stick | USB CDC ACM, NMEA + UBX | yes | | VFAN ublox 7 USB puck | USB CDC ACM, NMEA + UBX | yes | -| RAK 7248 onboard u-blox via UART (`/dev/ttyAMA0`) | NMEA over UART | placeholder (`source: uart`, not yet wired) | +| RAK 7248 onboard u-blox via UART (`/dev/ttyAMA0`) | NMEA over UART | yes (`source: uart`; `install.sh` enables UART) | Other USB receivers should work as long as `gpsd` recognizes the device's VID. If `cgps` shows data but the dashboard does not, check `journalctl -u meshpoint | grep -i gpsd` for connection errors and confirm `source: gpsd` in `local.yaml`. +### Using uart (RAK Pi HAT onboard GPS) + +`scripts/install.sh` enables the primary UART (`/dev/ttyAMA0`), disables +the serial console, and turns off Bluetooth on the UART pins so the HAT +GPS module can talk to the Pi. + +1. Run `sudo meshpoint setup` outdoors and accept **Use live UART GPS** + when the wizard acquires a fix, **or** set in `local.yaml`: + + ```yaml + location: + source: "uart" + uart_path: "/dev/ttyAMA0" + uart_baud: 9600 + ``` + +2. Restart: `sudo systemctl restart meshpoint`. + +3. Open **Configuration → GPS** → **UART**. Coordinates and satellite + count update every few seconds outdoors. The skyplot stays empty + until GSV parsing is added (use `gpsd` for a full skyplot today). + ### Privacy Three independent surfaces: @@ -693,6 +748,8 @@ location: # GPS / location source source: "static" # static | gpsd | uart gpsd_host: "127.0.0.1" gpsd_port: 2947 + uart_path: "/dev/ttyAMA0" + uart_baud: 9600 update_interval_seconds: 5 min_fix_quality: 1 diff --git a/docs/HARDWARE-MATRIX.md b/docs/HARDWARE-MATRIX.md index f751fcc..e651c6b 100644 --- a/docs/HARDWARE-MATRIX.md +++ b/docs/HARDWARE-MATRIX.md @@ -17,6 +17,7 @@ Application code is plain Python (v0.7.0+); **aarch64** is required. |---|---|---|---|---| | **Host** | Pi 4 (SD) | Pi 4 (SD) | CM4 (eMMC) | Pi 4 (SD) | | **Concentrator** | RAK2287 (SX1302) | WM1303 (SX1303) | SX1302 (onboard) | RAK2287 (SX1302) | +| **HAL chip version log** | `0x10` (SX1302) | `0x12` (SX1303) | `0x10` (SX1302) | `0x10` (SX1302) | | **TX support** | Yes (native, with HAL patch) | Yes (native, with HAL patch) | Yes (native, with HAL patch) | Yes (native, with HAL patch) | | **RX channels** | 8 simultaneous | 8 simultaneous | 8 simultaneous | 8 simultaneous | | **Spreading factors** | SF7-SF12 simultaneous | SF7-SF12 simultaneous | SF7-SF12 simultaneous | SF7-SF12 simultaneous | @@ -212,8 +213,10 @@ window-mounted deployments. For better coverage: GPS antenna (u.FL to SMA pigtail) is optional. If your carrier board has a u-blox GPS module, plugging in a GPS antenna gives you automatic -positioning during the setup wizard. Otherwise enter coordinates manually -(right-click any spot in Google Maps to copy in decimal format). +positioning during the setup wizard. Set `location.source: uart` for the +on-board RAK HAT module, or `location.source: gpsd` for a USB GPS stick. +Otherwise enter coordinates manually (right-click any spot in Google Maps +to copy in decimal format). --- diff --git a/docs/plans/gps-pps-follow-up-issue.md b/docs/plans/gps-pps-follow-up-issue.md new file mode 100644 index 0000000..35710b1 --- /dev/null +++ b/docs/plans/gps-pps-follow-up-issue.md @@ -0,0 +1,71 @@ +# GitHub issue draft: HAL GPS/PPS sync on SX1303 (follow-up to #65) + +**Title:** `feat(hal): GPS PPS timestamp sync via lgw_gps_* for RAK SX1303 HAT` + +**Labels:** `enhancement`, `hardware`, `hal` + +--- + +## Context + +PR #65 (RAK SX1303 HAL guard + `location.source: uart`) deliberately split two GPS concerns: + +| Layer | Config | Purpose | +|--------|--------|---------| +| Dashboard / NodeInfo | `location.source` (`static`, `gpsd`, `uart`) | Human-readable lat/lon | +| Concentrator timestamps | `radio.gps_pps_*` + Semtech `lgw_gps_*` | Align `timestamp_us` with GPS via PPS | + +On RAK Pi gateways the u-blox speaks NMEA/UBX on `/dev/ttyAMA0` and drives a PPS line into the SX1302/SX1303. Meshpoint can use the HAL to parse UBX, call `lgw_gps_sync` on `UBX-NAV-TIMEGPS`, and enable `sx1302_gps_enable(true)` — but only when `libloragw` includes `loragw_gps.c`. + +## Problem + +Without PPS sync, packet `timestamp_us` values are concentrator-relative counters. That is fine for RSSI/SNR analytics but weak for: + +- Correlating captures to wall-clock or external sensors +- Future TDoA / multilateration experiments +- Debugging “when did this burst happen?” across reboots + +## Proposed solution + +1. ctypes bindings for `lgw_gps_enable`, `lgw_parse_nmea`/`ubx`, `lgw_gps_get`, `lgw_gps_sync`, `lgw_get_trigcnt`, `lgw_cnt2utc`, `sx1302_gps_enable`. +2. Background reader thread after `lgw_start()`. +3. Config under `radio:`: + - `gps_pps_enabled` (default `false`) + - `gps_pps_tty_path` (default `/dev/ttyAMA0`) + - `gps_family` (default `ubx7`) + - `gps_pps_target_baud` (default `0` = HAL default) +4. Startup guard: reject `gps_pps_enabled` + `location.source: uart` on the same TTY. +5. `GET /api/device/gps-pps-status` for operators. +6. Docs in `CONFIGURATION.md` with the uart vs PPS matrix. + +## Acceptance criteria + +- [ ] With `gps_pps_enabled: true` on hardware with PPS wired, logs show `GPS/PPS sync #1 ok` after fix. +- [ ] `GET /api/device/gps-pps-status` reports `last_sync_ok: true` and increasing `sync_count`. +- [ ] Misconfigured dual-UART use fails at config load with a clear error. +- [ ] Graceful degrade when HAL lacks GPS symbols (info log, concentrator still starts). +- [ ] Unit tests mock `libloragw` GPS entry points. + +## Out of scope (this issue) + +- Automatic UBX CFG-PPS programming (operators may use u-center / `gpsd` once). +- Sharing one UART between HAL PPS and `UartSource` (documented conflict; use `gpsd` or `static` for map coords). +- Spectral scan / SX1261 path changes (covered in #65). + +## Hardware test plan + +1. RAK2287/5146 + Pi, `chip version 0x12` (SX1303), outdoor antenna. +2. `config/local.yaml`: + ```yaml + radio: + gps_pps_enabled: true + location: + source: static # or gpsd on USB + ``` +3. `journalctl -u meshpoint -f` → expect PPS sync lines. +4. `curl -s -H "Authorization: Bearer …" http://127.0.0.1:8080/api/device/gps-pps-status | jq` + +## References + +- Semtech `loragw_gps.h` / `loragw_sx1302.h` in sx1302_hal +- Related: #65, RAK `install.sh` UART enable for `/dev/ttyAMA0` diff --git a/frontend/js/configuration/gps_card.js b/frontend/js/configuration/gps_card.js index 3f90211..8352776 100644 --- a/frontend/js/configuration/gps_card.js +++ b/frontend/js/configuration/gps_card.js @@ -34,9 +34,10 @@ class GpsConfigCard {

GPS and placement

- Registered coordinates go to Meshradar. Choose whether - the LoRa mesh hears your registered pin or a live GPS - fix, with optional privacy rounding on live. + Registered coordinates go to Meshradar. Live fixes from + gpsd (USB), UART (RAK Pi HAT), or static entry feed the + skyplot. Mesh position broadcasts on the LoRa mesh are + configured separately below.

@@ -132,6 +133,42 @@ class GpsConfigCard {

+ +
Mesh position broadcasts

@@ -188,6 +225,11 @@ class GpsConfigCard { this._staticFields = this._root.querySelector('[data-static-fields]'); this._gpsdFields = this._root.querySelector('[data-gpsd-fields]'); + this._uartFields = this._root.querySelector('[data-uart-fields]'); + this._uartPath = this._root.querySelector('[data-uart-path]'); + this._uartBaud = this._root.querySelector('[data-uart-baud]'); + this._uartInterval = this._root.querySelector('[data-uart-interval]'); + this._uartQuality = this._root.querySelector('[data-uart-quality]'); this._meshLiveChip = this._root.querySelector('[data-mesh-live-chip]'); this._meshPrecisionWrap = this._root.querySelector('[data-mesh-precision-wrap]'); this._meshPrecision = this._root.querySelector('[data-mesh-precision]'); @@ -233,6 +275,19 @@ class GpsConfigCard { this._gpsdQuality.value = String(location.min_fix_quality); } + if (this._uartPath) { + this._uartPath.value = location.uart_path || '/dev/ttyAMA0'; + } + if (this._uartBaud) { + this._uartBaud.value = location.uart_baud || 9600; + } + if (this._uartInterval && location.update_interval_seconds) { + this._uartInterval.value = location.update_interval_seconds; + } + if (this._uartQuality && location.min_fix_quality) { + this._uartQuality.value = String(location.min_fix_quality); + } + const position = (config && config.transmit && config.transmit.position) || {}; const meshSource = (position.coordinate_source || 'static').toLowerCase(); const meshRadio = this._root.querySelector( @@ -299,8 +354,10 @@ class GpsConfigCard { _showFieldsetForSource(source) { if (!this._staticFields || !this._gpsdFields) return; const isGpsd = source === 'gpsd'; + const isUart = source === 'uart'; this._staticFields.hidden = false; this._gpsdFields.hidden = !isGpsd; + if (this._uartFields) this._uartFields.hidden = !isUart; } _updateSourceHint(source) { @@ -312,9 +369,8 @@ class GpsConfigCard { + 'to the daemon.'; } else if (source === 'uart') { this._sourceHint.textContent = - 'Reserved for the on-board RAK Pi HAT GPS module. Not yet ' - + 'wired in v0.7.5; falls back to the static coordinates ' - + 'on save.'; + 'Live NMEA from the on-board RAK Pi HAT GPS (/dev/ttyAMA0). ' + + 'Switching source requires a service restart.'; } else { this._sourceHint.textContent = 'Coordinates are entered manually and stay fixed until ' @@ -324,7 +380,7 @@ class GpsConfigCard { _restartPolling(source) { this._stopPolling(); - const interval = source === 'gpsd' ? 2000 : 30000; + const interval = (source === 'gpsd' || source === 'uart') ? 2000 : 30000; this._pollOnce(); this._timer = window.setInterval(() => this._pollOnce(), interval); } @@ -383,6 +439,15 @@ class GpsConfigCard { if (intervalRaw) payload.update_interval_seconds = Number(intervalRaw); const qualityRaw = this._gpsdQuality.value; if (qualityRaw) payload.min_fix_quality = Number(qualityRaw); + } else if (source === 'uart') { + const path = this._uartPath.value.trim(); + if (path) payload.uart_path = path; + const baudRaw = this._uartBaud.value.trim(); + if (baudRaw) payload.uart_baud = Number(baudRaw); + const intervalRaw = this._uartInterval.value.trim(); + if (intervalRaw) payload.update_interval_seconds = Number(intervalRaw); + const qualityRaw = this._uartQuality.value; + if (qualityRaw) payload.min_fix_quality = Number(qualityRaw); } const gpsResult = await this._api.put('/api/config/gps', payload); diff --git a/pr-body-gps-pps.md b/pr-body-gps-pps.md new file mode 100644 index 0000000..968f05c --- /dev/null +++ b/pr-body-gps-pps.md @@ -0,0 +1,64 @@ +## Summary + +Follow-up to #65: adds Semtech **HAL GPS/PPS** bindings so the SX1302/SX1303 concentrator can align internal packet `timestamp_us` with GPS time on RAK Pi HATs (u-blox on `/dev/ttyAMA0`, PPS into the concentrator). + +This is **orthogonal** to `location.source: uart` / `gpsd`, which only feed dashboard coordinates and NodeInfo. + +## Changes + +**HAL (`src/hal/`)** +- `sx1302_gps_types.py` — `TimespecS`, `TrefS`, `CoordS`, GPS message constants. +- `sx1302_gps_signatures.py` + `apply_gps_signatures()` from `apply_signatures()`. +- `sx1302_gps.py` — `HalGpsPpsSync`: `lgw_gps_enable`, background UBX/NMEA parser, `lgw_gps_sync` on `UBX-NAV-TIMEGPS`, `sx1302_gps_enable`, optional `lgw_cnt2utc`. +- `SX1302Wrapper.start_gps_pps()` / `stop_gps_pps()`; `ConcentratorCaptureSource` starts PPS after `lgw_start()` when configured. + +**Config** +- `radio.gps_pps_enabled`, `gps_pps_tty_path`, `gps_family`, `gps_pps_target_baud`. +- `validate_config_consistency()` rejects `gps_pps_enabled` + `location.source: uart` on the same TTY. + +**API** +- `GET /api/device/gps-pps-status` — sync count, last error, reference counter. +- `radio_advanced` in `GET /api/config` exposes PPS fields. + +**Docs** +- `CONFIGURATION.md` — GPS PPS section + uart/PPS matrix. +- `docs/plans/gps-pps-follow-up-issue.md` — issue template for trackers. + +## Why + +#65 fixed `lgw_start()` on RAK/SX1303 and wired UART for **map** GPS. Operators running SX1303 with PPS still need concentrator-time alignment for accurate packet timestamps — that path lives in `lgw_gps_*`, not in NMEA GGA parsing. + +## Operator notes + +| Need | Config | +|------|--------| +| Live map from HAT UART | `location.source: uart`, `gps_pps_enabled: false` | +| PPS timestamps + USB gpsd map | `location.source: gpsd`, `gps_pps_enabled: true` | +| PPS only, fixed coords | `location.source: static`, `gps_pps_enabled: true` | + +Requires `libloragw` built with `loragw_gps.c`. If symbols are missing, Meshpoint logs once and continues without PPS. + +## Type + +- [x] Feature +- [x] Hardware change +- [x] Docs +- [ ] Bug fix +- [ ] UI (status API only; no dashboard card in this PR) + +## Testing + +- [x] Local unit tests +- [ ] Tested on RAK hardware (needs reviewer / field) + +```bash +python -m unittest tests.test_hal_gps_pps tests.test_sx1302_wrapper_hal_guard -v +``` + +## Depends on + +- Best reviewed **after** #65 merges (stacked from the same fork branch family). Can rebase onto `main` once #65 lands. + +## Closes + + diff --git a/requirements.txt b/requirements.txt index 89b9c0b..81381fc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,4 @@ meshcore>=2.1.0 paho-mqtt>=2.1.0 bcrypt>=4.2.0 PyJWT>=2.10.0 +pyserial>=3.5 diff --git a/src/api/routes/config_enrichment.py b/src/api/routes/config_enrichment.py index 5f0ae3a..6171ff4 100644 --- a/src/api/routes/config_enrichment.py +++ b/src/api/routes/config_enrichment.py @@ -56,11 +56,18 @@ def enrich_config_payload(cfg: AppConfig, base: dict) -> dict: base["radio_advanced"] = { "spectral_scan_interval_seconds": radio.spectral_scan_interval_seconds, "sx1261_spi_path": radio.sx1261_spi_path or "", + "carrier_type": radio.carrier_type or "", + "gps_pps_enabled": radio.gps_pps_enabled, + "gps_pps_tty_path": radio.gps_pps_tty_path, + "gps_family": radio.gps_family, + "gps_pps_target_baud": radio.gps_pps_target_baud, } base["location"] = { "source": location.source, "gpsd_host": location.gpsd_host, "gpsd_port": location.gpsd_port, + "uart_path": location.uart_path, + "uart_baud": location.uart_baud, "update_interval_seconds": location.update_interval_seconds, "min_fix_quality": location.min_fix_quality, } diff --git a/src/api/routes/device_config_routes.py b/src/api/routes/device_config_routes.py index 45f5977..83463a6 100644 --- a/src/api/routes/device_config_routes.py +++ b/src/api/routes/device_config_routes.py @@ -74,8 +74,8 @@ class GpsUpdate(BaseModel): * ``gpsd`` -- live position from a running gpsd daemon (defaults to 127.0.0.1:2947). The ``location:`` section of ``local.yaml`` holds the connection details and update cadence. - * ``uart`` -- placeholder for the on-board RAK Pi HAT GPS module. - Not yet wired in v0.7.5; falls back to static. + * ``uart`` -- on-board RAK Pi HAT GPS via NMEA on ``/dev/ttyAMA0`` + (or a custom ``uart_path``). """ source: str = "static" @@ -88,8 +88,11 @@ class GpsUpdate(BaseModel): gpsd_port: Optional[int] = Field(None, ge=1, le=65535) update_interval_seconds: Optional[int] = Field(None, ge=1, le=300) min_fix_quality: Optional[int] = Field(None, ge=1, le=3) - # uart-mode fields (kept for forward-compat; not yet wired) - baud: Optional[int] = Field(None, ge=9600, le=921600) + # uart-mode fields + uart_path: Optional[str] = Field(None, min_length=1, max_length=128) + uart_baud: Optional[int] = Field(None, ge=4800, le=115200) + # Legacy alias accepted from older dashboard builds + baud: Optional[int] = Field(None, ge=4800, le=115200) timeout_seconds: Optional[int] = Field(None, ge=1, le=3600) # Meshtastic POSITION on the LoRa mesh (not Meshradar upstream pin). mesh_coordinate_source: Optional[str] = None @@ -240,6 +243,25 @@ async def update_gps( if source_changed: location.source = "uart" location_updates["source"] = "uart" + baud = req.uart_baud if req.uart_baud is not None else req.baud + if req.uart_path is not None and req.uart_path != location.uart_path: + location.uart_path = req.uart_path.strip() + location_updates["uart_path"] = location.uart_path + if baud is not None and baud != location.uart_baud: + location.uart_baud = baud + location_updates["uart_baud"] = location.uart_baud + if ( + req.update_interval_seconds is not None + and req.update_interval_seconds != location.update_interval_seconds + ): + location.update_interval_seconds = req.update_interval_seconds + location_updates["update_interval_seconds"] = req.update_interval_seconds + if ( + req.min_fix_quality is not None + and req.min_fix_quality != location.min_fix_quality + ): + location.min_fix_quality = req.min_fix_quality + location_updates["min_fix_quality"] = req.min_fix_quality if req.source == "static" and pos.coordinate_source == "live": pos.coordinate_source = "static" @@ -319,6 +341,8 @@ async def update_gps( "gpsd_port": location.gpsd_port, "update_interval_seconds": location.update_interval_seconds, "min_fix_quality": location.min_fix_quality, + "uart_path": location.uart_path, + "uart_baud": location.uart_baud, "mesh_coordinate_source": pos.coordinate_source, "mesh_location_precision": pos.location_precision, }, diff --git a/src/api/routes/gps_pps_status.py b/src/api/routes/gps_pps_status.py new file mode 100644 index 0000000..cba2a4f --- /dev/null +++ b/src/api/routes/gps_pps_status.py @@ -0,0 +1,59 @@ +"""Read-only HAL GPS/PPS sync status from the concentrator wrapper.""" + +from __future__ import annotations + +import logging +from typing import Callable, Optional + +from fastapi import APIRouter + +from src.hal.sx1302_wrapper import SX1302Wrapper + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/device", tags=["device"]) + +_get_wrapper: Optional[Callable[[], Optional[SX1302Wrapper]]] = None + + +def init_routes( + get_wrapper: Callable[[], Optional[SX1302Wrapper]], +) -> None: + global _get_wrapper + _get_wrapper = get_wrapper + + +def reset_routes() -> None: + global _get_wrapper + _get_wrapper = None + + +@router.get("/gps-pps-status") +async def gps_pps_status() -> dict: + """Return HAL PPS sync state (separate from ``/api/device/gps-status``).""" + if _get_wrapper is None: + return _idle_payload(error="routes not initialized") + + wrapper = _get_wrapper() + if wrapper is None: + return _idle_payload(error="concentrator capture source not running") + + gps = wrapper.gps_pps + if gps is None: + return _idle_payload() + + return gps.get_status().to_dict() + + +def _idle_payload(error: Optional[str] = None) -> dict: + return { + "enabled": False, + "available": False, + "tty_path": "", + "gps_family": "", + "sync_count": 0, + "last_sync_ok": False, + "last_error": error, + "xtal_err": None, + "reference_count_us": None, + } diff --git a/src/api/server.py b/src/api/server.py index 3bd4fb0..ebfa5a6 100644 --- a/src/api/server.py +++ b/src/api/server.py @@ -41,6 +41,7 @@ dangerous_routes, device, device_config_routes, + gps_pps_status, gps_status, identity_routes, messages, @@ -276,6 +277,7 @@ async def lifespan(app: FastAPI): app.include_router(upstream_config_routes.router, dependencies=protected) app.include_router(device_config_routes.router, dependencies=protected) app.include_router(gps_status.router, dependencies=protected) + app.include_router(gps_pps_status.router, dependencies=protected) app.include_router(system_config_routes.router, dependencies=protected) app.include_router(meshcore_config_routes.router, dependencies=protected) app.include_router(config_routes.router, dependencies=protected) @@ -1265,6 +1267,9 @@ def _init_routes( upstream_config_routes.init_routes(config=config) device_config_routes.init_routes(config=config, identity=identity) gps_status.init_routes(location_source=coord.location_source) + gps_pps_status.init_routes( + get_wrapper=lambda: _get_concentrator_wrapper(coord), + ) system_config_routes.init_routes(config=config) meshcore_config_routes.init_routes(config=config, tx_service=tx_service) diff --git a/src/capture/concentrator_source.py b/src/capture/concentrator_source.py index b3cd2fb..b5241a7 100644 --- a/src/capture/concentrator_source.py +++ b/src/capture/concentrator_source.py @@ -47,6 +47,7 @@ def __init__( ) self._poll_interval = poll_interval_ms / 1000.0 self._syncword = syncword + self._radio_config = radio_config self._running = False self._restart_lock = asyncio.Lock() @@ -76,6 +77,8 @@ def is_running(self) -> bool: async def start(self) -> None: self._wrapper.load() + if self._radio_config is not None: + self._wrapper.set_carrier_type(self._radio_config.carrier_type) late_reset = os.environ.get("CONCENTRATOR_LATE_RESET", "0") == "1" @@ -93,6 +96,17 @@ async def start(self) -> None: self._wrapper.start() self._wrapper.set_syncword(self._syncword) + if self._radio_config is not None and self._radio_config.gps_pps_enabled: + ok = self._wrapper.start_gps_pps( + tty_path=self._radio_config.gps_pps_tty_path, + gps_family=self._radio_config.gps_family, + target_baud=self._radio_config.gps_pps_target_baud, + ) + if not ok: + logger.warning( + "GPS/PPS requested but HAL sync did not start " + "(see earlier warnings)" + ) self._running = True logger.info( "Concentrator capture started (syncword=0x%02X)", @@ -101,6 +115,7 @@ async def start(self) -> None: async def stop(self) -> None: self._running = False + self._wrapper.stop_gps_pps() self._wrapper.stop() logger.info("Concentrator capture stopped") diff --git a/src/cli/setup_wizard.py b/src/cli/setup_wizard.py index 4c12de7..da56a69 100644 --- a/src/cli/setup_wizard.py +++ b/src/cli/setup_wizard.py @@ -198,6 +198,8 @@ def _step_capture_source(config: dict, report: HardwareReport) -> None: config.setdefault("device", {})["hardware_description"] = ( report.hardware_description ) + if report.carrier_type: + config.setdefault("radio", {})["carrier_type"] = report.carrier_type elif report.serial_ports: port = _choose_from_list( "Select capture serial port:", report.serial_ports @@ -286,7 +288,23 @@ def _step_location( if gps.got_fix: print(f" GPS fix acquired: {gps.latitude}, {gps.longitude}") print(f" Altitude: {gps.altitude}m | Satellites: {gps.satellites}") - if _confirm("Use this GPS position?", default_yes=True): + if _confirm( + "Use live UART GPS for ongoing position (recommended on RAK HAT)?", + default_yes=True, + ): + config.setdefault("location", {}).update({ + "source": "uart", + "uart_path": gps.uart_path, + }) + config.setdefault("device", {}).update({ + "latitude": gps.latitude, + "longitude": gps.longitude, + "altitude": gps.altitude, + }) + print(" Location source set to uart (on-board GPS).") + print() + return + if _confirm("Use this GPS position as static coordinates only?", default_yes=True): config.setdefault("device", {}).update({ "latitude": gps.latitude, "longitude": gps.longitude, diff --git a/src/config.py b/src/config.py index 4622e2b..c5adf37 100644 --- a/src/config.py +++ b/src/config.py @@ -60,14 +60,26 @@ class RadioConfig: # (default; spectral scan stays unavailable, packet-derived # noise floor remains in use). # - # On RAK2287 / RAK5146 / SenseCap M1 this is typically - # ``/dev/spidev0.1`` (separate from the SX1302's - # ``/dev/spidev0.0``). Some carriers daisy-chain the SX1261 - # behind the SX1302's SPI router and want this set to the same - # path as the SX1302 SPI device. Wrong path = HAL refuses to - # ``lgw_start`` after our config attempt, so we ship empty by - # default and ask interested users to opt in explicitly. + # On RAK2287 / RAK5146 / SenseCap M1 the SX1261 is behind the + # concentrator's internal SPI router, not on a Pi chip-select — + # leave empty. Only the Semtech reference kit and custom carriers + # with a dedicated SX1261 CE line need a path (e.g. + # ``/dev/spidev0.1``). Wrong path can brick ``lgw_start`` on + # boards without Pi-visible SX1261; ``carrier_type`` guard clears + # mistaken values on RAK/SenseCap. sx1261_spi_path: str = "" + # Carrier board signature from setup wizard I2C probe (``rak``, + # ``sensecap_m1``, or empty). Used to block Pi-visible SX1261 SPI + # paths that brick ``lgw_start()`` on RAK/SenseCap concentrators. + carrier_type: str = "" + # HAL GPS/PPS: align concentrator packet timestamps with GPS time via + # libloragw ``lgw_gps_*`` + ``sx1302_gps_enable``. Requires a HAL build + # that includes loragw_gps.c. Exclusive with ``location.source: uart`` on + # the same TTY (only one process may open the GPS serial port). + gps_pps_enabled: bool = False + gps_pps_tty_path: str = "/dev/ttyAMA0" + gps_family: str = "ubx7" + gps_pps_target_baud: int = 0 @dataclass @@ -245,11 +257,11 @@ class LocationConfig: live fixes (skyplot, optional mesh POSITION). Does not change ``device.{lat,lon,alt}`` (Meshradar pin). Auto-installed by ``scripts/install.sh``. - - ``"uart"`` : reserved for direct on-board UART NMEA reading - (RAK Pi HAT GPS). Plumbing exists in - ``src.hal.gps_reader`` but is not wired into - the runtime yet; treated as ``static`` until - the source is implemented. + - ``"uart"`` : read NMEA GGA from an on-board UART GPS (RAK Pi + HAT on ``/dev/ttyAMA0``). Uses + ``src.hal.gps_reader.GpsReader``. Same Meshradar + pin split as gpsd; live fix feeds skyplot and + optional mesh POSITION only. ``gpsd_host`` / ``gpsd_port`` default to gpsd's well-known localhost socket. Override only when running gpsd on a peer @@ -269,6 +281,8 @@ class LocationConfig: source: str = "static" gpsd_host: str = "127.0.0.1" gpsd_port: int = 2947 + uart_path: str = "/dev/ttyAMA0" + uart_baud: int = 9600 update_interval_seconds: int = 5 min_fix_quality: int = 1 @@ -449,10 +463,28 @@ def load_config(config_path: Optional[str] = None) -> AppConfig: local = config_path or os.environ.get("CONCENTRATOR_CONFIG", "config/local.yaml") _apply_yaml(cfg, _validated_config_path(local)) _resolve_radio_frequency(cfg.radio) + validate_config_consistency(cfg) return cfg +def validate_config_consistency(config: AppConfig) -> None: + """Reject impossible radio/location combinations before hardware starts.""" + if not config.radio.gps_pps_enabled: + return + if config.location.source != "uart": + return + pps_tty = os.path.normpath(config.radio.gps_pps_tty_path) + uart_tty = os.path.normpath(config.location.uart_path) + if pps_tty == uart_tty: + raise ValueError( + "radio.gps_pps_enabled and location.source=uart cannot share " + f"the same serial device ({pps_tty!r}). Use location.source=gpsd " + "or static for dashboard coordinates while PPS owns the HAT UART, " + "or disable gps_pps_enabled when using UART for location only." + ) + + def _get_local_yaml_path() -> Path: """Resolve the local.yaml path used for user overrides.""" raw = os.environ.get("CONCENTRATOR_CONFIG", "config/local.yaml") diff --git a/src/coordinator.py b/src/coordinator.py index 665e9ca..6264522 100644 --- a/src/coordinator.py +++ b/src/coordinator.py @@ -466,8 +466,9 @@ def _setup_location_banner(self) -> None: detail = f"gpsd @ {host}:{port}" color = GREEN elif source_name == "uart": - detail = "on-board UART (placeholder, falls back to static)" - color = DIM + path = self._config.location.uart_path + detail = f"UART NMEA @ {path}" + color = GREEN else: detail = "static config coordinates" color = DIM diff --git a/src/hal/gps_reader.py b/src/hal/gps_reader.py index 2c756c1..fb48262 100644 --- a/src/hal/gps_reader.py +++ b/src/hal/gps_reader.py @@ -61,14 +61,22 @@ async def stop(self) -> None: logger.info("GPS reader stopped") async def _read_loop(self) -> None: - """Read NMEA sentences from the GPS UART.""" + """Read NMEA sentences from the GPS UART. + + Linux device nodes (``/dev/tty*``) require pyserial; TCP-style + ``asyncio.open_connection`` only applies to network hosts. + """ + if self._uart_path.startswith("/dev/"): + await self._fallback_loop() + return + try: reader, writer = await asyncio.open_connection( self._uart_path, self._baud ) except Exception: logger.warning( - "GPS UART not available at %s -- using fallback polling", + "GPS UART not available at %s -- using serial fallback", self._uart_path, ) await self._fallback_loop() diff --git a/src/hal/location/factory.py b/src/hal/location/factory.py index a4583c4..79e3ad7 100644 --- a/src/hal/location/factory.py +++ b/src/hal/location/factory.py @@ -39,7 +39,11 @@ def build_location_source( min_fix_quality=location_config.min_fix_quality, ) if source == "uart": - return UartSource() + return UartSource( + uart_path=location_config.uart_path, + baud=location_config.uart_baud, + min_fix_quality=location_config.min_fix_quality, + ) logger.warning( "Unknown location.source=%r in config -- falling back to static", diff --git a/src/hal/location/uart_source.py b/src/hal/location/uart_source.py index a2a68b7..51aec31 100644 --- a/src/hal/location/uart_source.py +++ b/src/hal/location/uart_source.py @@ -1,52 +1,141 @@ -"""UART location source: placeholder for direct on-board NMEA reading. - -The existing ``src.hal.gps_reader.GpsReader`` (created during the v0.4.x -multi-region work but never wired into the runtime) parses NMEA from a -serial port and exposes ``current_position``. Wiring it through the -``LocationSource`` contract is a follow-on item: probably v0.7.6 or -when a user actually reports they want to use the RAK Pi HAT's -on-board u-blox. - -For v0.7.5 this source returns ``available=False`` with an explanatory -error so the GPS card surfaces the correct "not yet implemented" -message instead of falling silently to zero coordinates. +"""UART location source: on-board NMEA GPS (RAK Pi HAT /dev/ttyAMA0). + +Wraps ``GpsReader`` so the coordinator and ``GET /api/device/gps-status`` +share the same ``LocationSource`` contract as gpsd. Skyplot az/el/SNR +requires GSV sentences (not parsed yet); GGA provides fix + satellite +count only. """ from __future__ import annotations import logging from datetime import datetime, timezone +from typing import Optional +from src.hal.gps_reader import GpsReader, GpsPosition from src.hal.location.base import LocationSource -from src.hal.location.models import GpsStatus +from src.hal.location.models import ( + GpsDeviceInfo, + GpsStatus, + LocationFix, + SatellitesView, +) logger = logging.getLogger(__name__) +def _gga_fix_mode(fix_quality: int) -> int: + """Map NMEA GGA fix quality to gpsd-style mode (2=2D, 3=3D).""" + if fix_quality >= 2: + return 3 + if fix_quality == 1: + return 2 + return 1 + + +def _min_gga_quality(min_fix_quality: int) -> int: + """Translate LocationConfig min_fix_quality to minimum GGA fix_quality. + + Config uses gpsd semantics: 1=2D, 2=3D. GGA uses 0=no fix, 1=GPS, + 2=DGPS, etc. We accept any non-zero GGA fix when min is 1, and + require GGA fix_quality >= 2 when min is 2. + """ + if min_fix_quality >= 2: + return 2 + return 1 + + class UartSource(LocationSource): - """Reserved location source for on-board UART GPS (RAK Pi HAT etc.).""" + """Live fixes from a serial NMEA GPS on the Pi UART.""" + + def __init__( + self, + uart_path: str = "/dev/ttyAMA0", + baud: int = 9600, + min_fix_quality: int = 1, + ) -> None: + self._uart_path = uart_path + self._baud = baud + self._min_fix_quality = min_fix_quality + self._reader: Optional[GpsReader] = None + self._started = False + self._last_error: Optional[str] = None @property def source_name(self) -> str: return "uart" async def start(self) -> None: + if self._reader is not None: + return + self._reader = GpsReader(uart_path=self._uart_path, baud=self._baud) + await self._reader.start() + self._started = True + self._last_error = None logger.info( - "UART location source: not yet wired -- using static " - "device coordinates as fallback" + "UART location source: reading NMEA from %s @ %d baud", + self._uart_path, + self._baud, ) async def stop(self) -> None: - return + if self._reader is None: + return + await self._reader.stop() + self._reader = None + self._started = False def get_status(self) -> GpsStatus: + now = datetime.now(timezone.utc) + device = GpsDeviceInfo( + driver="nmea", + path=self._uart_path, + model="RAK Pi HAT GPS", + subtype=f"{self._baud} baud", + ) + + if not self._started or self._reader is None: + return GpsStatus( + source="uart", + available=False, + device=device, + error=self._last_error or "UART reader not started", + last_update=now, + ) + + pos = self._reader.latest_position + if pos is None or not self._position_meets_quality(pos): + return GpsStatus( + source="uart", + available=True, + device=device, + error=None, + last_update=now, + ) + return GpsStatus( source="uart", - available=False, - error=( - "UART GPS source is reserved for the RAK Pi HAT on-board " - "module and is not yet wired into the runtime. Switch to " - "'static' or 'gpsd' under Configuration -> GPS." - ), - last_update=datetime.now(timezone.utc), + available=True, + fix=self._position_to_fix(pos), + satellites=self._satellites_from_position(pos), + device=device, + last_update=pos.timestamp, ) + + def _position_meets_quality(self, pos: GpsPosition) -> bool: + return pos.fix_quality >= _min_gga_quality(self._min_fix_quality) + + @staticmethod + def _position_to_fix(pos: GpsPosition) -> LocationFix: + return LocationFix( + mode=_gga_fix_mode(pos.fix_quality), + latitude=pos.latitude, + longitude=pos.longitude, + altitude_m=pos.altitude, + time=pos.timestamp, + ) + + @staticmethod + def _satellites_from_position(pos: GpsPosition) -> SatellitesView: + count = max(0, pos.satellites) + return SatellitesView(in_view=count, used=count, satellites=()) diff --git a/src/hal/sx1302_gps.py b/src/hal/sx1302_gps.py new file mode 100644 index 0000000..220ffe4 --- /dev/null +++ b/src/hal/sx1302_gps.py @@ -0,0 +1,307 @@ +"""Semtech libloragw GPS/PPS time synchronization (lgw_gps_* bindings). + +Aligns the SX1302/SX1303 internal packet counter with GPS time using +the HAL's serial parser and ``lgw_gps_sync``. This is separate from +``location.source: uart`` / ``gpsd``, which only feed dashboard +coordinates — PPS sync is for accurate ``timestamp_us`` on RX packets. + +Typical RAK Pi HAT wiring: u-blox on ``/dev/ttyAMA0``, PPS into the +concentrator. Do not enable PPS and ``location.source: uart`` on the +same TTY simultaneously (exclusive open). +""" + +from __future__ import annotations + +import ctypes +import logging +import os +import threading +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING, Optional + +from src.hal.sx1302_gps_types import ( + CoordS, + GPS_MSG_UBX_NAV_TIMEGPS, + LGW_GPS_ERROR, + LGW_GPS_SUCCESS, + TrefS, + TimespecS, +) + +if TYPE_CHECKING: + from ctypes import CDLL + +logger = logging.getLogger(__name__) + +LGW_HAL_SUCCESS = 0 +_READ_CHUNK = 256 +_PARSE_BUF = 4096 + + +@dataclass +class GpsPpsStatus: + """Runtime snapshot for dashboards and ``GET /api/device/gps-pps-status``.""" + + enabled: bool + available: bool + tty_path: str + gps_family: str + sync_count: int + last_sync_ok: bool + last_error: Optional[str] = None + xtal_err: Optional[float] = None + reference_count_us: Optional[int] = None + + def to_dict(self) -> dict: + return { + "enabled": self.enabled, + "available": self.available, + "tty_path": self.tty_path, + "gps_family": self.gps_family, + "sync_count": self.sync_count, + "last_sync_ok": self.last_sync_ok, + "last_error": self.last_error, + "xtal_err": self.xtal_err, + "reference_count_us": self.reference_count_us, + } + + +class HalGpsPpsSync: + """Background GPS reader + periodic ``lgw_gps_sync`` against PPS.""" + + def __init__( + self, + lib: CDLL, + tty_path: str = "/dev/ttyAMA0", + gps_family: str = "ubx7", + target_baud: int = 0, + ) -> None: + self._lib = lib + self._tty_path = tty_path + self._gps_family = gps_family + self._target_baud = target_baud + self._fd: int = -1 + self._thread: Optional[threading.Thread] = None + self._stop = threading.Event() + self._lock = threading.Lock() + self._ref = TrefS() + self._ref.systime = 0 + self._sync_count = 0 + self._last_sync_ok = False + self._last_error: Optional[str] = None + self._enabled = False + + @property + def supported(self) -> bool: + return hasattr(self._lib, "lgw_gps_enable") and hasattr( + self._lib, "lgw_gps_sync" + ) + + def get_status(self) -> GpsPpsStatus: + with self._lock: + return GpsPpsStatus( + enabled=self._enabled, + available=self._fd >= 0 and self.supported, + tty_path=self._tty_path, + gps_family=self._gps_family, + sync_count=self._sync_count, + last_sync_ok=self._last_sync_ok, + last_error=self._last_error, + xtal_err=self._ref.xtal_err if self._last_sync_ok else None, + reference_count_us=( + int(self._ref.count_us) if self._last_sync_ok else None + ), + ) + + def start(self) -> bool: + """Open GPS UART via HAL and enable SX1302 PPS sampling if available.""" + if not self.supported: + self._last_error = "libloragw lacks lgw_gps_enable/lgw_gps_sync" + logger.info("GPS/PPS sync unavailable: %s", self._last_error) + return False + + if self._enabled: + return True + + fd = ctypes.c_int(-1) + tty_b = self._tty_path.encode("ascii") + family_b = self._gps_family.encode("ascii") + baud = self._target_baud + if hasattr(self._lib, "lgw_gps_enable"): + try: + from termios import B9600 + + if baud == 0: + baud = B9600 + except ImportError: + if baud == 0: + baud = 13 # B9600 on Linux; Pi-only path + + rc = self._lib.lgw_gps_enable( + tty_b, family_b, ctypes.c_uint(baud), ctypes.byref(fd) + ) + if rc != LGW_GPS_SUCCESS: + self._last_error = f"lgw_gps_enable({self._tty_path}) failed (rc={rc})" + logger.warning("GPS/PPS: %s", self._last_error) + return False + + self._fd = int(fd.value) + if hasattr(self._lib, "sx1302_gps_enable"): + sx_rc = self._lib.sx1302_gps_enable(True) + if sx_rc != LGW_HAL_SUCCESS: + logger.warning( + "sx1302_gps_enable(true) returned %s (PPS may be unavailable)", + sx_rc, + ) + + self._stop.clear() + self._thread = threading.Thread( + target=self._reader_loop, + name="hal-gps-pps", + daemon=True, + ) + self._thread.start() + self._enabled = True + logger.info( + "GPS/PPS sync started (tty=%s family=%s fd=%d)", + self._tty_path, + self._gps_family, + self._fd, + ) + return True + + def stop(self) -> None: + if not self._enabled: + return + self._stop.set() + if self._thread is not None: + self._thread.join(timeout=3.0) + self._thread = None + + if self._fd >= 0 and hasattr(self._lib, "lgw_gps_disable"): + self._lib.lgw_gps_disable(self._fd) + self._fd = -1 + + if hasattr(self._lib, "sx1302_gps_enable"): + self._lib.sx1302_gps_enable(False) + + self._enabled = False + logger.info("GPS/PPS sync stopped") + + def convert_timestamp_us_to_utc( + self, count_us: int + ) -> Optional[tuple[int, int]]: + """Return (tv_sec, tv_nsec) UTC for a concentrator counter, if synced.""" + if not self._last_sync_ok or not hasattr(self._lib, "lgw_cnt2utc"): + return None + utc = TimespecS() + with self._lock: + ref_copy = TrefS() + ctypes.memmove( + ctypes.byref(ref_copy), + ctypes.byref(self._ref), + ctypes.sizeof(TrefS), + ) + rc = self._lib.lgw_cnt2utc(ref_copy, count_us, ctypes.byref(utc)) + if rc != LGW_GPS_SUCCESS: + return None + return int(utc.tv_sec), int(utc.tv_nsec) + + def _reader_loop(self) -> None: + parse_buf = bytearray() + while not self._stop.is_set(): + if self._fd < 0: + time.sleep(0.5) + continue + try: + chunk = os.read(self._fd, _READ_CHUNK) + except OSError as exc: + self._last_error = f"GPS read failed: {exc}" + time.sleep(1.0) + continue + + if not chunk: + time.sleep(0.05) + continue + + parse_buf.extend(chunk) + if len(parse_buf) > _PARSE_BUF: + del parse_buf[: len(parse_buf) - _PARSE_BUF] + + self._consume_parse_buffer(parse_buf) + + def _consume_parse_buffer(self, buf: bytearray) -> None: + if not buf: + return + + text = bytes(buf).decode("ascii", errors="ignore") + for line in text.split("\n"): + line = line.strip() + if not line: + continue + if line.startswith("$"): + with self._lock: + if hasattr(self._lib, "lgw_parse_nmea"): + self._lib.lgw_parse_nmea( + line.encode("ascii"), + len(line) + 1, + ) + + if hasattr(self._lib, "lgw_parse_ubx"): + msg_size = ctypes.c_size_t(0) + raw = bytes(buf) + with self._lock: + msg = self._lib.lgw_parse_ubx( + raw, + len(raw), + ctypes.byref(msg_size), + ) + if msg == GPS_MSG_UBX_NAV_TIMEGPS: + self._attempt_sync() + + def _attempt_sync(self) -> None: + utc = TimespecS() + gps_time = TimespecS() + with self._lock: + if hasattr(self._lib, "lgw_gps_get"): + rc_get = self._lib.lgw_gps_get( + ctypes.byref(utc), + ctypes.byref(gps_time), + None, + None, + ) + else: + return + if rc_get != LGW_GPS_SUCCESS: + return + + count_us = ctypes.c_uint32(0) + if hasattr(self._lib, "lgw_get_trigcnt"): + rc_cnt = self._lib.lgw_get_trigcnt(ctypes.byref(count_us)) + if rc_cnt != LGW_HAL_SUCCESS: + self._last_error = "lgw_get_trigcnt failed before gps_sync" + return + + with self._lock: + rc = self._lib.lgw_gps_sync( + ctypes.byref(self._ref), + count_us.value, + utc, # struct passed by value per loragw_gps.h + gps_time, + ) + + self._sync_count += 1 + if rc == LGW_GPS_SUCCESS: + self._last_sync_ok = True + self._last_error = None + if self._sync_count == 1 or self._sync_count % 60 == 0: + logger.info( + "GPS/PPS sync #%d ok (count_us=%u xtal_err=%.9f)", + self._sync_count, + count_us.value, + self._ref.xtal_err, + ) + else: + self._last_sync_ok = False + self._last_error = f"lgw_gps_sync returned {rc}" diff --git a/src/hal/sx1302_gps_signatures.py b/src/hal/sx1302_gps_signatures.py new file mode 100644 index 0000000..9186c11 --- /dev/null +++ b/src/hal/sx1302_gps_signatures.py @@ -0,0 +1,69 @@ +"""ctypes signatures for libloragw GPS/PPS functions (loragw_gps.h).""" + +from __future__ import annotations + +import ctypes + +from src.hal.sx1302_gps_types import CoordS, TrefS, TimespecS + + +def apply_gps_signatures(lib: ctypes.CDLL) -> None: + """Register optional GPS symbols when present in libloragw.so.""" + if hasattr(lib, "lgw_gps_enable"): + lib.lgw_gps_enable.restype = ctypes.c_int + lib.lgw_gps_enable.argtypes = [ + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_uint, + ctypes.POINTER(ctypes.c_int), + ] + + if hasattr(lib, "lgw_gps_disable"): + lib.lgw_gps_disable.restype = ctypes.c_int + lib.lgw_gps_disable.argtypes = [ctypes.c_int] + + if hasattr(lib, "lgw_parse_nmea"): + lib.lgw_parse_nmea.restype = ctypes.c_int + lib.lgw_parse_nmea.argtypes = [ctypes.c_char_p, ctypes.c_int] + + if hasattr(lib, "lgw_parse_ubx"): + lib.lgw_parse_ubx.restype = ctypes.c_int + lib.lgw_parse_ubx.argtypes = [ + ctypes.c_char_p, + ctypes.c_size_t, + ctypes.POINTER(ctypes.c_size_t), + ] + + if hasattr(lib, "lgw_gps_get"): + lib.lgw_gps_get.restype = ctypes.c_int + lib.lgw_gps_get.argtypes = [ + ctypes.POINTER(TimespecS), + ctypes.POINTER(TimespecS), + ctypes.POINTER(CoordS), + ctypes.POINTER(CoordS), + ] + + if hasattr(lib, "lgw_gps_sync"): + lib.lgw_gps_sync.restype = ctypes.c_int + lib.lgw_gps_sync.argtypes = [ + ctypes.POINTER(TrefS), + ctypes.c_uint32, + TimespecS, + TimespecS, + ] + + if hasattr(lib, "lgw_get_trigcnt"): + lib.lgw_get_trigcnt.restype = ctypes.c_int + lib.lgw_get_trigcnt.argtypes = [ctypes.POINTER(ctypes.c_uint32)] + + if hasattr(lib, "lgw_cnt2utc"): + lib.lgw_cnt2utc.restype = ctypes.c_int + lib.lgw_cnt2utc.argtypes = [ + TrefS, + ctypes.c_uint32, + ctypes.POINTER(TimespecS), + ] + + if hasattr(lib, "sx1302_gps_enable"): + lib.sx1302_gps_enable.restype = ctypes.c_int + lib.sx1302_gps_enable.argtypes = [ctypes.c_bool] diff --git a/src/hal/sx1302_gps_types.py b/src/hal/sx1302_gps_types.py new file mode 100644 index 0000000..c7956d0 --- /dev/null +++ b/src/hal/sx1302_gps_types.py @@ -0,0 +1,45 @@ +"""ctypes mirrors of loragw_gps.h structures for PPS time sync.""" + +from __future__ import annotations + +import ctypes + + +class TimespecS(ctypes.Structure): + """``struct timespec`` as used by the HAL GPS module.""" + + _fields_ = [ + ("tv_sec", ctypes.c_long), + ("tv_nsec", ctypes.c_long), + ] + + +class TrefS(ctypes.Structure): + """``struct tref`` — concentrator counter to GPS/UTC mapping.""" + + _fields_ = [ + ("systime", ctypes.c_long), + ("count_us", ctypes.c_uint32), + ("utc", TimespecS), + ("gps", TimespecS), + ("xtal_err", ctypes.c_double), + ] + + +class CoordS(ctypes.Structure): + """``struct coord_s`` — geodesic coordinates from NMEA/UBX.""" + + _fields_ = [ + ("lat", ctypes.c_double), + ("lon", ctypes.c_double), + ("alt", ctypes.c_short), + ] + + +# loragw_gps.h +LGW_GPS_SUCCESS = 0 +LGW_GPS_ERROR = -1 + +# gps_msg enum subset used after parse +GPS_MSG_UNKNOWN = 0 +GPS_MSG_UBX_NAV_TIMEGPS = 13 diff --git a/src/hal/sx1302_signatures.py b/src/hal/sx1302_signatures.py index 49b83da..e27fdb5 100644 --- a/src/hal/sx1302_signatures.py +++ b/src/hal/sx1302_signatures.py @@ -12,6 +12,7 @@ import ctypes +from src.hal.sx1302_gps_signatures import apply_gps_signatures from src.hal.sx1302_types import ( LgwConfBoardS, LgwConfRxifS, @@ -89,3 +90,9 @@ def apply_signatures(lib: ctypes.CDLL) -> None: if hasattr(lib, "lgw_sx1261_setconf"): lib.lgw_sx1261_setconf.restype = ctypes.c_int lib.lgw_sx1261_setconf.argtypes = [ctypes.POINTER(LgwConfSx1261S)] + + if hasattr(lib, "sx1302_get_model_id"): + lib.sx1302_get_model_id.restype = ctypes.c_int + lib.sx1302_get_model_id.argtypes = [ctypes.POINTER(ctypes.c_uint8)] + + apply_gps_signatures(lib) diff --git a/src/hal/sx1302_types.py b/src/hal/sx1302_types.py index 3da217b..c7ce1ac 100644 --- a/src/hal/sx1302_types.py +++ b/src/hal/sx1302_types.py @@ -9,6 +9,16 @@ import ctypes +# sx1302_model_id_t values from loragw_sx1302.h (when HAL exposes them). +SX1302_MODEL_ID_SX1302: int = 0x02 +SX1302_MODEL_ID_SX1303: int = 0x03 + +# Carriers where the SX1261 is behind the concentrator SPI router, not +# on a separate Pi chip-select. Non-empty sx1261_spi_path is cleared at +# configure() time on these boards. +CARRIERS_WITHOUT_PI_SX1261: frozenset[str] = frozenset( + {"rak", "sensecap_m1"} +) # ── RX configuration structs ──────────────────────────────────────── diff --git a/src/hal/sx1302_wrapper.py b/src/hal/sx1302_wrapper.py index 47b020b..1d79a6f 100644 --- a/src/hal/sx1302_wrapper.py +++ b/src/hal/sx1302_wrapper.py @@ -15,12 +15,14 @@ from typing import Optional from src.hal.concentrator_config import ConcentratorChannelPlan +from src.hal.sx1302_gps import HalGpsPpsSync from src.hal.sx1302_signatures import apply_signatures from src.hal.sx1302_spectral_scan import ( SpectralScanResult, SX1302SpectralScan, ) from src.hal.sx1302_types import ( + CARRIERS_WITHOUT_PI_SX1261, LgwConfBoardS, LgwConfRxifS, LgwConfRxrfS, @@ -28,6 +30,8 @@ LgwPktRxS, LgwPktTxS, LgwTxGainLutS, + SX1302_MODEL_ID_SX1302, + SX1302_MODEL_ID_SX1303, ) logger = logging.getLogger(__name__) @@ -110,6 +114,32 @@ def __init__( self._unknown_status_count = 0 self._spectral_scan: Optional[SX1302SpectralScan] = None self._sx1261_configured = False + self._carrier_type: str = "" + self._concentrator_model_id: Optional[int] = None + self._gps_pps: Optional[HalGpsPpsSync] = None + + @property + def gps_pps(self) -> Optional[HalGpsPpsSync]: + return self._gps_pps + + def set_carrier_type(self, carrier_type: str) -> None: + """Set carrier signature for SX1261 path guardrails (from config or wizard).""" + self._carrier_type = (carrier_type or "").strip().lower() + + @property + def concentrator_model_id(self) -> Optional[int]: + """Raw model byte from ``sx1302_get_model_id`` (0x02 / 0x03), or None.""" + return self._concentrator_model_id + + @property + def concentrator_model_label(self) -> str: + """Human label for telemetry and startup banners.""" + mid = self._concentrator_model_id + if mid == SX1302_MODEL_ID_SX1303: + return "SX1303" + if mid == SX1302_MODEL_ID_SX1302: + return "SX1302" + return "unknown" def load(self) -> None: if not self._lib_path or not os.path.exists(self._lib_path): @@ -167,9 +197,12 @@ def configure(self, plan: ConcentratorChannelPlan) -> None: if self._lib is None: self.load() + self._preflight_spi() + self._guard_sx1261_spi_path() self._configure_board() self._configure_rf_chains(plan) self._configure_if_channels(plan) + self._read_concentrator_model_id() self._configure_sx1261_for_spectral_scan() logger.info("Concentrator configured with %d IF channels", len(plan.multi_sf_channels) + (1 if plan.single_sf_channel else 0)) @@ -179,16 +212,48 @@ def start(self) -> None: self.load() result = self._lib.lgw_start() if result != LGW_HAL_SUCCESS: - raise RuntimeError("lgw_start() failed") + raise RuntimeError( + f"lgw_start() failed (spi={self._spi_path}, " + f"model={self.concentrator_model_label})" + ) self._started = True - logger.info("SX1302 concentrator started") + logger.info( + "%s concentrator started", + self.concentrator_model_label, + ) def stop(self) -> None: + self.stop_gps_pps() if self._started and self._lib: self._lib.lgw_stop() self._started = False logger.info("SX1302 concentrator stopped") + def start_gps_pps( + self, + tty_path: str = "/dev/ttyAMA0", + gps_family: str = "ubx7", + target_baud: int = 0, + ) -> bool: + """Enable HAL GPS UART + PPS sync (call after ``lgw_start``).""" + if self._lib is None: + self.load() + if self._gps_pps is None: + self._gps_pps = HalGpsPpsSync( + self._lib, + tty_path=tty_path, + gps_family=gps_family, + target_baud=target_baud, + ) + if not self._started: + logger.warning("GPS/PPS: concentrator not started yet") + return False + return self._gps_pps.start() + + def stop_gps_pps(self) -> None: + if self._gps_pps is not None: + self._gps_pps.stop() + def receive(self) -> list[ConcentratorPacket]: """Poll for received packets. Non-blocking. @@ -418,6 +483,44 @@ def spectral_scan_supported(self) -> bool: # ── Private: HAL configuration ────────────────────────────────── + def _preflight_spi(self) -> None: + if not os.path.exists(self._spi_path): + raise FileNotFoundError( + f"Concentrator SPI device {self._spi_path} not found. " + "Enable SPI in raspi-config (Interface Options → SPI) and " + "confirm the meshpoint user is in the spi group." + ) + + def _guard_sx1261_spi_path(self) -> None: + """Clear sx1261_spi_path on carriers where the chip is not Pi-visible.""" + if not self._sx1261_spi_path: + return + if self._carrier_type not in CARRIERS_WITHOUT_PI_SX1261: + return + logger.warning( + "radio.sx1261_spi_path=%r is not supported on %s carriers " + "(SX1261 not reachable from the Pi); clearing path. " + "See docs/CONFIGURATION.md § Spectral Scan.", + self._sx1261_spi_path, + self._carrier_type, + ) + self._sx1261_spi_path = "" + + def _read_concentrator_model_id(self) -> None: + if self._lib is None or not hasattr(self._lib, "sx1302_get_model_id"): + return + model = ctypes.c_uint8() + rc = self._lib.sx1302_get_model_id(ctypes.byref(model)) + if rc != LGW_HAL_SUCCESS: + logger.debug("sx1302_get_model_id returned rc=%d", rc) + return + self._concentrator_model_id = int(model.value) + logger.info( + "Concentrator model ID: 0x%02X (%s)", + self._concentrator_model_id, + self.concentrator_model_label, + ) + def _configure_board(self) -> None: conf = LgwConfBoardS() conf.lorawan_public = False diff --git a/tests/test_hal_gps_pps.py b/tests/test_hal_gps_pps.py new file mode 100644 index 0000000..d8fc1fa --- /dev/null +++ b/tests/test_hal_gps_pps.py @@ -0,0 +1,117 @@ +"""Tests for HAL GPS/PPS ctypes bindings and wrapper integration.""" + +from __future__ import annotations + +import ctypes +import unittest +from unittest.mock import MagicMock, patch + +from src.config import AppConfig, LocationConfig, RadioConfig, validate_config_consistency +from src.hal.sx1302_gps import HalGpsPpsSync +from src.hal.sx1302_gps_types import GPS_MSG_UBX_NAV_TIMEGPS, LGW_GPS_SUCCESS +from src.hal.sx1302_wrapper import LGW_HAL_SUCCESS, SX1302Wrapper + + +def _mock_lib_with_gps() -> MagicMock: + lib = MagicMock() + lib.lgw_gps_enable = MagicMock(return_value=LGW_GPS_SUCCESS) + lib.lgw_gps_disable = MagicMock(return_value=LGW_GPS_SUCCESS) + lib.lgw_parse_nmea = MagicMock(return_value=0) + lib.lgw_parse_ubx = MagicMock(return_value=0) + lib.lgw_gps_get = MagicMock(return_value=LGW_GPS_SUCCESS) + lib.lgw_get_trigcnt = MagicMock(return_value=LGW_HAL_SUCCESS) + lib.lgw_gps_sync = MagicMock(return_value=LGW_GPS_SUCCESS) + lib.sx1302_gps_enable = MagicMock(return_value=LGW_HAL_SUCCESS) + return lib + + +class TestConfigGpsConflict(unittest.TestCase): + def test_uart_and_pps_same_tty_raises(self) -> None: + cfg = AppConfig( + radio=RadioConfig(gps_pps_enabled=True, gps_pps_tty_path="/dev/ttyAMA0"), + location=LocationConfig(source="uart", uart_path="/dev/ttyAMA0"), + ) + with self.assertRaises(ValueError) as ctx: + validate_config_consistency(cfg) + self.assertIn("cannot share", str(ctx.exception)) + + def test_uart_and_pps_different_tty_ok(self) -> None: + cfg = AppConfig( + radio=RadioConfig( + gps_pps_enabled=True, gps_pps_tty_path="/dev/ttyAMA0" + ), + location=LocationConfig(source="uart", uart_path="/dev/ttyUSB0"), + ) + validate_config_consistency(cfg) + + def test_pps_with_gpsd_ok(self) -> None: + cfg = AppConfig( + radio=RadioConfig( + gps_pps_enabled=True, gps_pps_tty_path="/dev/ttyAMA0" + ), + location=LocationConfig(source="gpsd"), + ) + validate_config_consistency(cfg) + + +class TestHalGpsPpsSync(unittest.TestCase): + def test_unsupported_lib_returns_false(self) -> None: + lib = MagicMock(spec=[]) + sync = HalGpsPpsSync(lib) + self.assertFalse(sync.start()) + self.assertIn("lgw_gps_enable", sync.get_status().last_error or "") + + @patch("src.hal.sx1302_gps.os.read", return_value=b"") + def test_start_enables_hal_and_sx1302_pps(self, _read: MagicMock) -> None: + lib = _mock_lib_with_gps() + fd_holder = {"value": 7} + + def fake_enable(path, family, baud, fd_ptr): + ctypes.cast(fd_ptr, ctypes.POINTER(ctypes.c_int))[0] = fd_holder["value"] + return LGW_GPS_SUCCESS + + lib.lgw_gps_enable.side_effect = fake_enable + + sync = HalGpsPpsSync(lib, tty_path="/dev/ttyAMA0", gps_family="ubx7") + self.assertTrue(sync.start()) + lib.sx1302_gps_enable.assert_called_once_with(True) + status = sync.get_status() + self.assertTrue(status.enabled) + self.assertTrue(status.available) + sync.stop() + lib.lgw_gps_disable.assert_called_once_with(7) + lib.sx1302_gps_enable.assert_called_with(False) + + def test_ubx_nav_timegps_triggers_sync(self) -> None: + lib = _mock_lib_with_gps() + lib.lgw_parse_ubx.return_value = GPS_MSG_UBX_NAV_TIMEGPS + + sync = HalGpsPpsSync(lib) + sync._enabled = True + sync._fd = 1 + sync._consume_parse_buffer(bytearray(b"\xb5\x62")) + lib.lgw_gps_sync.assert_called_once() + self.assertTrue(sync.get_status().last_sync_ok) + + +class TestWrapperGpsPps(unittest.TestCase): + def test_start_gps_pps_after_concentrator_start(self) -> None: + wrapper = SX1302Wrapper() + wrapper._lib = _mock_lib_with_gps() + wrapper._started = True + + def fake_enable(path, family, baud, fd_ptr): + ctypes.cast(fd_ptr, ctypes.POINTER(ctypes.c_int))[0] = 3 + return LGW_GPS_SUCCESS + + wrapper._lib.lgw_gps_enable.side_effect = fake_enable + + with patch("src.hal.sx1302_gps.os.read", return_value=b""): + self.assertTrue( + wrapper.start_gps_pps( + tty_path="/dev/ttyAMA0", + gps_family="ubx7", + ) + ) + self.assertIsNotNone(wrapper.gps_pps) + wrapper.stop_gps_pps() diff --git a/tests/test_location_config.py b/tests/test_location_config.py index 92a319b..c6908ea 100644 --- a/tests/test_location_config.py +++ b/tests/test_location_config.py @@ -84,11 +84,18 @@ def test_partial_override_preserves_other_fields(self) -> None: path.unlink() def test_uart_source_round_trips(self) -> None: - path = self._write_yaml("location:\n source: uart\n") + path = self._write_yaml( + "location:\n" + " source: uart\n" + " uart_path: /dev/serial0\n" + " uart_baud: 115200\n" + ) try: cfg = AppConfig() _apply_yaml(cfg, path) self.assertEqual(cfg.location.source, "uart") + self.assertEqual(cfg.location.uart_path, "/dev/serial0") + self.assertEqual(cfg.location.uart_baud, 115200) finally: path.unlink() diff --git a/tests/test_location_static_uart_sources.py b/tests/test_location_static_uart_sources.py index fe3850d..a69c5a9 100644 --- a/tests/test_location_static_uart_sources.py +++ b/tests/test_location_static_uart_sources.py @@ -3,8 +3,11 @@ from __future__ import annotations import unittest +from datetime import datetime, timezone +from unittest.mock import MagicMock from src.config import DeviceConfig +from src.hal.gps_reader import GpsPosition from src.hal.location.static_source import StaticSource from src.hal.location.uart_source import UartSource @@ -75,21 +78,31 @@ async def test_source_name_is_stable(self) -> None: class TestUartSource(unittest.IsolatedAsyncioTestCase): - """``UartSource`` is a placeholder; surfaces an explanatory error.""" + """``UartSource`` exposes live NMEA fixes via ``GpsReader``.""" - async def test_status_is_unavailable_with_explanatory_error(self) -> None: + async def test_status_before_start_is_unavailable(self) -> None: source = UartSource() - await source.start() - try: - status = source.get_status() - self.assertEqual(status.source, "uart") - self.assertFalse(status.available) - self.assertIsNotNone(status.error) - # Error must point users to the working alternatives. - self.assertIn("static", status.error.lower()) - self.assertIn("gpsd", status.error.lower()) - finally: - await source.stop() + status = source.get_status() + self.assertEqual(status.source, "uart") + self.assertFalse(status.available) + self.assertIn("not started", (status.error or "").lower()) + + async def test_status_with_fix(self) -> None: + source = UartSource() + source._reader = MagicMock() + source._started = True + source._reader.latest_position = GpsPosition( + latitude=51.5, + longitude=-0.1, + altitude=20.0, + satellites=6, + fix_quality=2, + timestamp=datetime(2026, 6, 2, tzinfo=timezone.utc), + ) + status = source.get_status() + self.assertTrue(status.available) + self.assertIsNotNone(status.fix) + self.assertEqual(status.device.path, "/dev/ttyAMA0") async def test_source_name_is_stable(self) -> None: source = UartSource() diff --git a/tests/test_sx1302_wrapper_hal_guard.py b/tests/test_sx1302_wrapper_hal_guard.py new file mode 100644 index 0000000..eb9911f --- /dev/null +++ b/tests/test_sx1302_wrapper_hal_guard.py @@ -0,0 +1,91 @@ +"""Tests for SX1302Wrapper SPI preflight, SX1261 carrier guard, and model ID.""" + +from __future__ import annotations + +import ctypes +import unittest +from unittest.mock import MagicMock, patch + +from src.hal.sx1302_types import ( + SX1302_MODEL_ID_SX1302, + SX1302_MODEL_ID_SX1303, +) +from src.hal.sx1302_wrapper import LGW_HAL_SUCCESS, SX1302Wrapper + + +def _build_wrapper() -> SX1302Wrapper: + wrapper = SX1302Wrapper(sx1261_spi_path="/dev/spidev0.1") + wrapper._lib = MagicMock() + return wrapper + + +class TestSx1261CarrierGuard(unittest.TestCase): + def test_rak_carrier_clears_non_empty_sx1261_path(self) -> None: + wrapper = _build_wrapper() + wrapper.set_carrier_type("rak") + wrapper._guard_sx1261_spi_path() + self.assertEqual(wrapper._sx1261_spi_path, "") + + def test_sensecap_carrier_clears_non_empty_sx1261_path(self) -> None: + wrapper = _build_wrapper() + wrapper.set_carrier_type("sensecap_m1") + wrapper._guard_sx1261_spi_path() + self.assertEqual(wrapper._sx1261_spi_path, "") + + def test_unknown_carrier_keeps_sx1261_path(self) -> None: + wrapper = _build_wrapper() + wrapper.set_carrier_type("") + wrapper._guard_sx1261_spi_path() + self.assertEqual(wrapper._sx1261_spi_path, "/dev/spidev0.1") + + def test_empty_sx1261_path_unchanged_on_rak(self) -> None: + wrapper = SX1302Wrapper(sx1261_spi_path="") + wrapper.set_carrier_type("rak") + wrapper._guard_sx1261_spi_path() + self.assertEqual(wrapper._sx1261_spi_path, "") + + +class TestSpiPreflight(unittest.TestCase): + def test_missing_spi_device_raises(self) -> None: + wrapper = SX1302Wrapper(spi_path="/dev/spidev0.0") + with patch("src.hal.sx1302_wrapper.os.path.exists", return_value=False): + with self.assertRaises(FileNotFoundError) as ctx: + wrapper._preflight_spi() + self.assertIn("raspi-config", str(ctx.exception)) + + +class TestConcentratorModelId(unittest.TestCase): + def test_model_label_sx1303(self) -> None: + wrapper = SX1302Wrapper() + wrapper._concentrator_model_id = SX1302_MODEL_ID_SX1303 + self.assertEqual(wrapper.concentrator_model_label, "SX1303") + + def test_model_label_sx1302(self) -> None: + wrapper = SX1302Wrapper() + wrapper._concentrator_model_id = SX1302_MODEL_ID_SX1302 + self.assertEqual(wrapper.concentrator_model_label, "SX1302") + + def test_read_model_id_from_hal(self) -> None: + wrapper = _build_wrapper() + + def fake_get_model(model_ptr): + ptr = ctypes.cast(model_ptr, ctypes.POINTER(ctypes.c_uint8)) + ptr[0] = SX1302_MODEL_ID_SX1303 + return LGW_HAL_SUCCESS + + wrapper._lib.sx1302_get_model_id.side_effect = fake_get_model + wrapper._read_concentrator_model_id() + self.assertEqual(wrapper._concentrator_model_id, SX1302_MODEL_ID_SX1303) + + +class TestSx1261ConfigureSkipsWhenPathCleared(unittest.TestCase): + def test_configure_sx1261_skips_after_rak_guard(self) -> None: + wrapper = _build_wrapper() + wrapper.set_carrier_type("rak") + wrapper._guard_sx1261_spi_path() + wrapper._configure_sx1261_for_spectral_scan() + wrapper._lib.lgw_sx1261_setconf.assert_not_called() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_uart_location_source.py b/tests/test_uart_location_source.py new file mode 100644 index 0000000..978d004 --- /dev/null +++ b/tests/test_uart_location_source.py @@ -0,0 +1,110 @@ +"""Tests for ``UartSource`` and UART-related config defaults.""" + +from __future__ import annotations + +import unittest +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +from src.config import AppConfig, LocationConfig +from src.hal.gps_reader import GpsPosition +from src.hal.location.factory import build_location_source +from src.hal.location.uart_source import UartSource + + +class TestLocationConfigUartDefaults(unittest.TestCase): + def test_uart_path_defaults_to_ttyama0(self) -> None: + cfg = LocationConfig() + self.assertEqual(cfg.uart_path, "/dev/ttyAMA0") + + def test_uart_baud_defaults_to_9600(self) -> None: + cfg = LocationConfig() + self.assertEqual(cfg.uart_baud, 9600) + + +class TestUartSource(unittest.IsolatedAsyncioTestCase): + async def test_no_fix_yet_is_available_without_error(self) -> None: + source = UartSource() + source._reader = MagicMock() + source._reader.latest_position = None + source._started = True + + status = source.get_status() + self.assertEqual(status.source, "uart") + self.assertTrue(status.available) + self.assertIsNone(status.fix) + + async def test_maps_gga_position_to_location_fix(self) -> None: + source = UartSource(min_fix_quality=1) + source._reader = MagicMock() + source._started = True + source._reader.latest_position = GpsPosition( + latitude=40.7128, + longitude=-74.0060, + altitude=12.0, + satellites=8, + fix_quality=1, + timestamp=datetime(2026, 6, 2, 12, 0, 0, tzinfo=timezone.utc), + ) + + status = source.get_status() + self.assertTrue(status.available) + self.assertIsNotNone(status.fix) + assert status.fix is not None + self.assertEqual(status.fix.latitude, 40.7128) + self.assertEqual(status.fix.longitude, -74.0060) + self.assertEqual(status.fix.altitude_m, 12.0) + self.assertIsNotNone(status.satellites) + assert status.satellites is not None + self.assertEqual(status.satellites.used, 8) + + async def test_min_fix_quality_filters_weak_gga(self) -> None: + source = UartSource(min_fix_quality=2) + source._reader = MagicMock() + source._started = True + source._reader.latest_position = GpsPosition( + latitude=40.0, + longitude=-74.0, + altitude=0.0, + satellites=4, + fix_quality=1, + timestamp=datetime.now(timezone.utc), + ) + + status = source.get_status() + self.assertTrue(status.available) + self.assertIsNone(status.fix) + + async def test_start_stop_wires_gps_reader(self) -> None: + source = UartSource(uart_path="/dev/ttyAMA0", baud=9600) + with patch("src.hal.location.uart_source.GpsReader") as reader_cls: + reader = MagicMock() + reader.start = AsyncMock() + reader.stop = AsyncMock() + reader_cls.return_value = reader + + await source.start() + reader_cls.assert_called_once_with( + uart_path="/dev/ttyAMA0", baud=9600 + ) + reader.start.assert_awaited_once() + + await source.stop() + reader.stop.assert_awaited_once() + + +class TestUartFactory(unittest.TestCase): + def test_build_uart_source_from_config(self) -> None: + app = AppConfig() + app.location.source = "uart" + app.location.uart_path = "/dev/serial0" + app.location.uart_baud = 115200 + + source = build_location_source(app.location, app.device) + self.assertIsInstance(source, UartSource) + self.assertEqual(source._uart_path, "/dev/serial0") + self.assertEqual(source._baud, 115200) + + +if __name__ == "__main__": + unittest.main()