From 92b1944dd5aac8cb5bfe31905428c2b36a9607a0 Mon Sep 17 00:00:00 2001
From: iceice400
Date: Tue, 2 Jun 2026 13:10:58 -0400
Subject: [PATCH 1/2] feat: RAK SX1303 HAL guard, UART onboard GPS, and
operator docs
---
README.md | 2 +
config/default.yaml | 4 +-
docs/CHANGELOG.md | 4 +
docs/COMMON-ERRORS.md | 50 +++++++-
docs/CONFIGURATION.md | 37 +++++-
docs/HARDWARE-MATRIX.md | 7 +-
frontend/js/configuration/gps_card.js | 76 +++++++++++-
requirements.txt | 1 +
src/api/routes/config_enrichment.py | 3 +
src/api/routes/device_config_routes.py | 33 ++++-
src/capture/concentrator_source.py | 3 +
src/cli/setup_wizard.py | 20 ++-
src/config.py | 28 +++--
src/coordinator.py | 5 +-
src/hal/gps_reader.py | 12 +-
src/hal/location/factory.py | 6 +-
src/hal/location/uart_source.py | 137 +++++++++++++++++----
src/hal/sx1302_signatures.py | 4 +
src/hal/sx1302_types.py | 10 ++
src/hal/sx1302_wrapper.py | 75 ++++++++++-
tests/test_location_config.py | 9 +-
tests/test_location_static_uart_sources.py | 39 ++++--
tests/test_sx1302_wrapper_hal_guard.py | 91 ++++++++++++++
tests/test_uart_location_source.py | 110 +++++++++++++++++
24 files changed, 684 insertions(+), 82 deletions(-)
create mode 100644 tests/test_sx1302_wrapper_hal_guard.py
create mode 100644 tests/test_uart_location_source.py
diff --git a/README.md b/README.md
index 407813f..27cc915 100644
--- a/README.md
+++ b/README.md
@@ -247,6 +247,8 @@ See [docs/COMMON-ERRORS.md](docs/COMMON-ERRORS.md#upgrades) if the service fails
**Chip version 0x00:** Concentrator not responding. Check that the concentrator module is seated, SPI is enabled (`raspi-config` → Interface Options → SPI), and try a full power cycle (unplug for 10+ seconds). Normal chip versions are `0x10` (SX1302) and `0x12` (SX1303).
+**`sx1261_check_status` / `lgw_start() failed` on RAK V2:** Clear `radio.sx1261_spi_path` in `local.yaml` (leave it `""`). The SX1261 is not Pi-visible on RAK/SenseCap boards. Use `location.source: uart` for onboard HAT GPS or `gpsd` for USB GPS. See [Common Errors](docs/COMMON-ERRORS.md).
+
**No packets:** Verify antenna is connected and frequency matches your region. Check `meshpoint logs` for `lgw_receive returned N packet(s)`.
**Upstream 401:** Bad API key. Get a free one at [meshradar.io](https://meshradar.io) and re-run `sudo meshpoint setup`.
diff --git a/config/default.yaml b/config/default.yaml
index 2f7e30d..13b4c2e 100644
--- a/config/default.yaml
+++ b/config/default.yaml
@@ -120,12 +120,14 @@ location:
# Where the Meshpoint's reported lat/lon/alt comes from.
# static : use device.latitude/longitude/altitude (default)
# gpsd : poll a local or remote gpsd daemon for live fixes
- # uart : reserved (RAK Pi HAT GPS, not yet wired)
+ # uart : on-board RAK Pi HAT GPS via /dev/ttyAMA0 (NMEA GGA)
source: "static"
# gpsd connection. Defaults match the well-known local socket; only
# change when running gpsd on a peer machine on the LAN.
gpsd_host: "127.0.0.1"
gpsd_port: 2947
+ uart_path: "/dev/ttyAMA0"
+ uart_baud: 9600
# How often the coordinator wakes to read the active source.
update_interval_seconds: 5
# Minimum acceptable fix mode: 0=any (incl. no-fix), 1=2D, 2=3D.
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 7cb7fd8..81dcde7 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -4,6 +4,10 @@
Queued for the next version bump (v0.7.6 mesh participant RC on `feat/v0.7.6`).
+- **UART GPS.** `location.source: uart` reads NMEA GGA from the RAK Pi HAT (`/dev/ttyAMA0` by default). Configuration → GPS UART fieldset; `meshpoint setup` can select uart when the wizard probe gets a fix. Requires `pyserial` in the venv (`pip install -r requirements.txt` after upgrade).
+- **Concentrator model ID.** Startup logs SX1302 vs SX1303 when `libloragw` exposes `sx1302_get_model_id` (e.g. `Concentrator model ID: 0x12 (SX1303)`).
+- **SX1261 guard.** Non-empty `radio.sx1261_spi_path` on RAK/SenseCap carriers (`radio.carrier_type`) is cleared at configure time with an explicit warning, preventing `lgw_start()` failures from misconfigured spectral scan on Pi-invisible SX1261 wiring.
+- **SPI preflight.** Missing `/dev/spidev0.0` raises a clear error before `lgw_start()` (raspi-config / spi group hints).
- **MQTT broker TLS (deferred).** Transport TLS (`mqtts`, CA bundle, cert validation) is not implemented on `mqtt_publisher.py` (plain TCP only). Planned for v0.7.6. Until then use plain port 1883 or a LAN broker without TLS.
### v0.7.5.1 (May 2026)
diff --git a/docs/COMMON-ERRORS.md b/docs/COMMON-ERRORS.md
index d269f56..df03d58 100644
--- a/docs/COMMON-ERRORS.md
+++ b/docs/COMMON-ERRORS.md
@@ -401,16 +401,54 @@ disabled in `raspi-config`, or the SPI bus latched after a hard power cut.
3. Full power cycle: `sudo poweroff`, wait for green LED to stop, unplug for
10+ seconds, then plug back in.
-Normal chip versions are `0x10` (SX1302) and `0x12` (SX1303).
+Normal chip versions are `0x10` (SX1302) and `0x12` (SX1303). Startup
+also logs `Concentrator model ID: 0x12 (SX1303)` when the HAL exposes
+`sx1302_get_model_id`.
+
+### `sx1261_check_status: got:0x00 expected:0x22` / `failed to patch sx1261`
+
+**Cause:** `radio.sx1261_spi_path` is set on a carrier where the SX1261
+companion chip is **not** wired to a Pi-visible SPI bus (RAK2287, RAK5146,
+SenseCap M1, most RAK Hotspot V2 units). The HAL probes a chip that is not
+there and may refuse `lgw_start()`.
+
+**Fix:**
+
+1. Edit `config/local.yaml` and clear the path:
+
+ ```yaml
+ radio:
+ sx1261_spi_path: ""
+ ```
+
+2. Restart: `sudo systemctl restart meshpoint`.
+
+3. Confirm the journal shows `SX1261 spi_path empty; spectral scan disabled`
+ and `Application startup complete`.
+
+On current Meshpoint builds with `radio.carrier_type: rak` or
+`sensecap_m1` (written by `meshpoint setup`), a mistaken path is cleared
+automatically at startup with a warning. Re-run `sudo meshpoint setup` if
+`carrier_type` is missing.
+
+See [Configuration > Spectral Scan](CONFIGURATION.md#spectral-scan-noise-floor).
### `lgw_start() failed` or `Failed to set SX1250_0 in STANDBY_RC mode`
-**Cause:** SPI bus latch from a hard power cut. The Meshpoint shutdown
-handler holds the concentrator in reset on `sudo reboot` and
-`sudo systemctl restart`, so this only appears after yanked-cable shutdowns,
-breaker trips, or outages.
+**Cause:** Usually one of:
+
+1. **SX1261 misconfiguration** — see the entry above if the log mentions
+ `sx1261_check_status` or `failed to patch sx1261`.
+2. **SPI bus latch** from a hard power cut. The Meshpoint shutdown handler
+ holds the concentrator in reset on `sudo reboot` and
+ `sudo systemctl restart`, so latch typically follows yanked-cable
+ shutdowns, breaker trips, or outages.
+3. **SPI disabled or device missing** — `/dev/spidev0.0` not present;
+ enable SPI in `raspi-config` and confirm the `meshpoint` user is in
+ the `spi` group.
-**Fix:** Full power cycle:
+**Fix:** If SX1261 lines appear in the log, fix `sx1261_spi_path` first.
+Otherwise full power cycle:
```bash
sudo poweroff
diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md
index 089403c..825772b 100644
--- a/docs/CONFIGURATION.md
+++ b/docs/CONFIGURATION.md
@@ -26,6 +26,7 @@ radio:
tx_power_dbm: 22 # SX1302 concentrator output power
spectral_scan_interval_seconds: 60 # noise floor sampler cadence (0 disables)
sx1261_spi_path: "" # SX1261 SPI device for spectral scan (empty = disabled)
+ carrier_type: "" # rak | sensecap_m1 (set by meshpoint setup; guards SX1261 path)
```
The region sets the base frequency, spreading factor, and bandwidth automatically. You only need `region` in most cases. Override `frequency_mhz`, `spreading_factor`, or `bandwidth_khz` individually to tune for non-default presets (MediumFast, ShortFast, etc.) or custom frequency slots.
@@ -76,6 +77,8 @@ ERROR: failed to patch sx1261 radio for LBT/Spectral Scan
…and `lgw_start()` may then refuse to bring up the concentrator. If you see that, revert `sx1261_spi_path` to `""`, restart the service, and stay on the packet-derived fallback.
+On RAK and SenseCap carriers, `meshpoint setup` writes `radio.carrier_type`. When that field is `rak` or `sensecap_m1`, Meshpoint **clears** a non-empty `sx1261_spi_path` at startup and logs a warning instead of calling `lgw_sx1261_setconf`.
+
If your `libloragw` build does not expose the spectral scan symbols at all (older HAL revisions), the service logs a single info line at startup and falls back automatically.
### Standard Meshtastic Presets
@@ -169,8 +172,10 @@ location:
source: "static" # static | gpsd | uart
gpsd_host: "127.0.0.1" # gpsd TCP host (only when source=gpsd)
gpsd_port: 2947 # gpsd TCP port
+ uart_path: "/dev/ttyAMA0" # serial device (only when source=uart)
+ uart_baud: 9600 # NMEA baud (uart only)
update_interval_seconds: 5 # how often the coordinator polls the source
- min_fix_quality: 1 # minimum NMEA fix quality (1=2D, 3=3D)
+ min_fix_quality: 1 # minimum fix quality (1=2D, 2=3D for gpsd/uart)
```
`location.source` selects where the Meshpoint reads its current
@@ -183,9 +188,9 @@ changes require a service restart; everything else hot-reloads.
|---|---|
| `static` (default) | Uses `device.latitude` / `device.longitude` / `device.altitude` exactly as set during the wizard. No GPS hardware required. |
| `gpsd` | Reads live fixes from the system `gpsd` daemon over TCP (`127.0.0.1:2947`). Recommended for any USB GPS receiver (u-blox 7, u-blox 8, VFAN puck, generic CDC ACM sticks). |
-| `uart` | Reserved for direct-serial reads from a Pi HAT GPS (e.g. RAK 7248). Currently a placeholder; falls back to static and surfaces an explanatory error in the dashboard. |
+| `uart` | Reads NMEA GGA from the on-board RAK Pi HAT GPS on `/dev/ttyAMA0` (or `uart_path`). Fix and satellite count update live; full skyplot az/el/SNR needs `gpsd` + USB receiver. |
-When `source: gpsd` is active and the daemon has a 2D or 3D fix,
+When `source` is `gpsd` or `uart` and a 2D or 3D fix is available,
the coordinator updates `_config.device.latitude` / `.longitude` /
`.altitude` in place every `update_interval_seconds`. Anything that
reads from `device.*` (NodeInfo broadcasts, MQTT, the dashboard map,
@@ -228,13 +233,35 @@ in `gpsd-clients`) or `gpsmon`.
| u-blox 7 USB stick | USB CDC ACM, NMEA + UBX | yes (RAK V2 .141) |
| u-blox 8 USB stick | USB CDC ACM, NMEA + UBX | yes |
| VFAN ublox 7 USB puck | USB CDC ACM, NMEA + UBX | yes |
-| RAK 7248 onboard u-blox via UART (`/dev/ttyAMA0`) | NMEA over UART | placeholder (`source: uart`, not yet wired) |
+| RAK 7248 onboard u-blox via UART (`/dev/ttyAMA0`) | NMEA over UART | yes (`source: uart`; `install.sh` enables UART) |
Other USB receivers should work as long as `gpsd` recognizes the
device's VID. If `cgps` shows data but the dashboard does not,
check `journalctl -u meshpoint | grep -i gpsd` for connection
errors and confirm `source: gpsd` in `local.yaml`.
+### Using uart (RAK Pi HAT onboard GPS)
+
+`scripts/install.sh` enables the primary UART (`/dev/ttyAMA0`), disables
+the serial console, and turns off Bluetooth on the UART pins so the HAT
+GPS module can talk to the Pi.
+
+1. Run `sudo meshpoint setup` outdoors and accept **Use live UART GPS**
+ when the wizard acquires a fix, **or** set in `local.yaml`:
+
+ ```yaml
+ location:
+ source: "uart"
+ uart_path: "/dev/ttyAMA0"
+ uart_baud: 9600
+ ```
+
+2. Restart: `sudo systemctl restart meshpoint`.
+
+3. Open **Configuration → GPS** → **UART**. Coordinates and satellite
+ count update every few seconds outdoors. The skyplot stays empty
+ until GSV parsing is added (use `gpsd` for a full skyplot today).
+
### Privacy
Live GPS coordinates flow through the same surfaces as the static
@@ -663,6 +690,8 @@ location: # GPS / location source
source: "static" # static | gpsd | uart
gpsd_host: "127.0.0.1"
gpsd_port: 2947
+ uart_path: "/dev/ttyAMA0"
+ uart_baud: 9600
update_interval_seconds: 5
min_fix_quality: 1
diff --git a/docs/HARDWARE-MATRIX.md b/docs/HARDWARE-MATRIX.md
index e06708d..c6dd747 100644
--- a/docs/HARDWARE-MATRIX.md
+++ b/docs/HARDWARE-MATRIX.md
@@ -17,6 +17,7 @@ Application code is plain Python (v0.7.0+); **aarch64** is required.
|---|---|---|---|---|
| **Host** | Pi 4 (SD) | Pi 4 (SD) | CM4 (eMMC) | Pi 4 (SD) |
| **Concentrator** | RAK2287 (SX1302) | WM1303 (SX1303) | SX1302 (onboard) | RAK2287 (SX1302) |
+| **HAL chip version log** | `0x10` (SX1302) | `0x12` (SX1303) | `0x10` (SX1302) | `0x10` (SX1302) |
| **TX support** | Yes (native, with HAL patch) | Yes (native, with HAL patch) | Yes (native, with HAL patch) | Yes (native, with HAL patch) |
| **RX channels** | 8 simultaneous | 8 simultaneous | 8 simultaneous | 8 simultaneous |
| **Spreading factors** | SF7-SF12 simultaneous | SF7-SF12 simultaneous | SF7-SF12 simultaneous | SF7-SF12 simultaneous |
@@ -177,8 +178,10 @@ window-mounted deployments. For better coverage:
GPS antenna (u.FL to SMA pigtail) is optional. If your carrier board has a
u-blox GPS module, plugging in a GPS antenna gives you automatic
-positioning during the setup wizard. Otherwise enter coordinates manually
-(right-click any spot in Google Maps to copy in decimal format).
+positioning during the setup wizard. Set `location.source: uart` for the
+on-board RAK HAT module, or `location.source: gpsd` for a USB GPS stick.
+Otherwise enter coordinates manually (right-click any spot in Google Maps
+to copy in decimal format).
---
diff --git a/frontend/js/configuration/gps_card.js b/frontend/js/configuration/gps_card.js
index cacac0b..969c8ae 100644
--- a/frontend/js/configuration/gps_card.js
+++ b/frontend/js/configuration/gps_card.js
@@ -33,8 +33,8 @@ class GpsConfigCard {
GPS and placement
- Live fix from a USB GPS via gpsd, or static coordinates
- you enter manually. Used by the local map and
+ Live fix from gpsd (USB GPS), UART (RAK Pi HAT), or static
+ coordinates you enter manually. Used by the local map and
Meshradar fleet view.
@@ -127,6 +127,42 @@ class GpsConfigCard {
+
+ UART (on-board GPS)
+
+
+ Serial device
+
+
+
+ Baud
+
+
+
+
+
+ Update interval (s)
+
+
+
+ Min fix quality
+
+ 1 — accept any reading
+ 2 — require 2D fix
+ 3 — require 3D fix
+
+
+
+
+ RAK Pi HAT GPS on /dev/ttyAMA0 (install.sh enables UART).
+ Fix and satellite count update live; full skyplot needs gpsd.
+
+
+
Save GPS settings
@@ -155,6 +191,11 @@ class GpsConfigCard {
this._staticFields = this._root.querySelector('[data-static-fields]');
this._gpsdFields = this._root.querySelector('[data-gpsd-fields]');
+ this._uartFields = this._root.querySelector('[data-uart-fields]');
+ this._uartPath = this._root.querySelector('[data-uart-path]');
+ this._uartBaud = this._root.querySelector('[data-uart-baud]');
+ this._uartInterval = this._root.querySelector('[data-uart-interval]');
+ this._uartQuality = this._root.querySelector('[data-uart-quality]');
this._form.addEventListener('submit', (e) => this._onSubmit(e));
this._root.querySelectorAll('input[name="gps-source"]').forEach((radio) => {
@@ -188,6 +229,19 @@ class GpsConfigCard {
this._gpsdQuality.value = String(location.min_fix_quality);
}
+ if (this._uartPath) {
+ this._uartPath.value = location.uart_path || '/dev/ttyAMA0';
+ }
+ if (this._uartBaud) {
+ this._uartBaud.value = location.uart_baud || 9600;
+ }
+ if (this._uartInterval && location.update_interval_seconds) {
+ this._uartInterval.value = location.update_interval_seconds;
+ }
+ if (this._uartQuality && location.min_fix_quality) {
+ this._uartQuality.value = String(location.min_fix_quality);
+ }
+
this._restartPolling(source);
}
@@ -204,8 +258,10 @@ class GpsConfigCard {
if (!this._staticFields || !this._gpsdFields) return;
const isStatic = source === 'static';
const isGpsd = source === 'gpsd';
+ const isUart = source === 'uart';
this._staticFields.hidden = !isStatic;
this._gpsdFields.hidden = !isGpsd;
+ if (this._uartFields) this._uartFields.hidden = !isUart;
}
_updateSourceHint(source) {
@@ -217,9 +273,8 @@ class GpsConfigCard {
+ 'to the daemon.';
} else if (source === 'uart') {
this._sourceHint.textContent =
- 'Reserved for the on-board RAK Pi HAT GPS module. Not yet '
- + 'wired in v0.7.5; falls back to the static coordinates '
- + 'on save.';
+ 'Live NMEA from the on-board RAK Pi HAT GPS (/dev/ttyAMA0). '
+ + 'Switching source requires a service restart.';
} else {
this._sourceHint.textContent =
'Coordinates are entered manually and stay fixed until '
@@ -229,7 +284,7 @@ class GpsConfigCard {
_restartPolling(source) {
this._stopPolling();
- const interval = source === 'gpsd' ? 2000 : 30000;
+ const interval = (source === 'gpsd' || source === 'uart') ? 2000 : 30000;
this._pollOnce();
this._timer = window.setInterval(() => this._pollOnce(), interval);
}
@@ -282,6 +337,15 @@ class GpsConfigCard {
if (intervalRaw) payload.update_interval_seconds = Number(intervalRaw);
const qualityRaw = this._gpsdQuality.value;
if (qualityRaw) payload.min_fix_quality = Number(qualityRaw);
+ } else if (source === 'uart') {
+ const path = this._uartPath.value.trim();
+ if (path) payload.uart_path = path;
+ const baudRaw = this._uartBaud.value.trim();
+ if (baudRaw) payload.uart_baud = Number(baudRaw);
+ const intervalRaw = this._uartInterval.value.trim();
+ if (intervalRaw) payload.update_interval_seconds = Number(intervalRaw);
+ const qualityRaw = this._uartQuality.value;
+ if (qualityRaw) payload.min_fix_quality = Number(qualityRaw);
}
const gpsResult = await this._api.put('/api/config/gps', payload);
diff --git a/requirements.txt b/requirements.txt
index a0aa874..777eb01 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,3 +12,4 @@ meshcore>=2.1.0
paho-mqtt>=2.1.0
bcrypt>=4.2.0
PyJWT>=2.10.0
+pyserial>=3.5
diff --git a/src/api/routes/config_enrichment.py b/src/api/routes/config_enrichment.py
index 80697d7..3f32077 100644
--- a/src/api/routes/config_enrichment.py
+++ b/src/api/routes/config_enrichment.py
@@ -56,11 +56,14 @@ def enrich_config_payload(cfg: AppConfig, base: dict) -> dict:
base["radio_advanced"] = {
"spectral_scan_interval_seconds": radio.spectral_scan_interval_seconds,
"sx1261_spi_path": radio.sx1261_spi_path or "",
+ "carrier_type": radio.carrier_type or "",
}
base["location"] = {
"source": location.source,
"gpsd_host": location.gpsd_host,
"gpsd_port": location.gpsd_port,
+ "uart_path": location.uart_path,
+ "uart_baud": location.uart_baud,
"update_interval_seconds": location.update_interval_seconds,
"min_fix_quality": location.min_fix_quality,
}
diff --git a/src/api/routes/device_config_routes.py b/src/api/routes/device_config_routes.py
index 0474294..e9a5126 100644
--- a/src/api/routes/device_config_routes.py
+++ b/src/api/routes/device_config_routes.py
@@ -63,8 +63,8 @@ class GpsUpdate(BaseModel):
* ``gpsd`` -- live position from a running gpsd daemon (defaults to
127.0.0.1:2947). The ``location:`` section of ``local.yaml`` holds
the connection details and update cadence.
- * ``uart`` -- placeholder for the on-board RAK Pi HAT GPS module.
- Not yet wired in v0.7.5; falls back to static.
+ * ``uart`` -- on-board RAK Pi HAT GPS via NMEA on ``/dev/ttyAMA0``
+ (or a custom ``uart_path``).
"""
source: str = "static"
@@ -77,9 +77,11 @@ class GpsUpdate(BaseModel):
gpsd_port: Optional[int] = Field(None, ge=1, le=65535)
update_interval_seconds: Optional[int] = Field(None, ge=1, le=300)
min_fix_quality: Optional[int] = Field(None, ge=1, le=3)
- # uart-mode fields (kept for forward-compat; not yet wired)
- baud: Optional[int] = Field(None, ge=9600, le=921600)
- timeout_seconds: Optional[int] = Field(None, ge=1, le=3600)
+ # uart-mode fields
+ uart_path: Optional[str] = Field(None, min_length=1, max_length=128)
+ uart_baud: Optional[int] = Field(None, ge=4800, le=115200)
+ # Legacy alias accepted from older dashboard builds
+ baud: Optional[int] = Field(None, ge=4800, le=115200)
@router.put("/device")
@@ -213,6 +215,25 @@ async def update_gps(
if source_changed:
location.source = "uart"
location_updates["source"] = "uart"
+ baud = req.uart_baud if req.uart_baud is not None else req.baud
+ if req.uart_path is not None and req.uart_path != location.uart_path:
+ location.uart_path = req.uart_path.strip()
+ location_updates["uart_path"] = location.uart_path
+ if baud is not None and baud != location.uart_baud:
+ location.uart_baud = baud
+ location_updates["uart_baud"] = location.uart_baud
+ if (
+ req.update_interval_seconds is not None
+ and req.update_interval_seconds != location.update_interval_seconds
+ ):
+ location.update_interval_seconds = req.update_interval_seconds
+ location_updates["update_interval_seconds"] = req.update_interval_seconds
+ if (
+ req.min_fix_quality is not None
+ and req.min_fix_quality != location.min_fix_quality
+ ):
+ location.min_fix_quality = req.min_fix_quality
+ location_updates["min_fix_quality"] = req.min_fix_quality
# ------------------------------------------------------------------
# Persist whatever changed.
@@ -246,5 +267,7 @@ async def update_gps(
"gpsd_port": location.gpsd_port,
"update_interval_seconds": location.update_interval_seconds,
"min_fix_quality": location.min_fix_quality,
+ "uart_path": location.uart_path,
+ "uart_baud": location.uart_baud,
},
}
diff --git a/src/capture/concentrator_source.py b/src/capture/concentrator_source.py
index 7397a48..e276510 100644
--- a/src/capture/concentrator_source.py
+++ b/src/capture/concentrator_source.py
@@ -46,6 +46,7 @@ def __init__(
)
self._poll_interval = poll_interval_ms / 1000.0
self._syncword = syncword
+ self._radio_config = radio_config
self._running = False
self._restart_lock = asyncio.Lock()
@@ -75,6 +76,8 @@ def is_running(self) -> bool:
async def start(self) -> None:
self._wrapper.load()
+ if self._radio_config is not None:
+ self._wrapper.set_carrier_type(self._radio_config.carrier_type)
self._wrapper.reset()
self._wrapper.configure(self._channel_plan)
self._wrapper.start()
diff --git a/src/cli/setup_wizard.py b/src/cli/setup_wizard.py
index 4c12de7..da56a69 100644
--- a/src/cli/setup_wizard.py
+++ b/src/cli/setup_wizard.py
@@ -198,6 +198,8 @@ def _step_capture_source(config: dict, report: HardwareReport) -> None:
config.setdefault("device", {})["hardware_description"] = (
report.hardware_description
)
+ if report.carrier_type:
+ config.setdefault("radio", {})["carrier_type"] = report.carrier_type
elif report.serial_ports:
port = _choose_from_list(
"Select capture serial port:", report.serial_ports
@@ -286,7 +288,23 @@ def _step_location(
if gps.got_fix:
print(f" GPS fix acquired: {gps.latitude}, {gps.longitude}")
print(f" Altitude: {gps.altitude}m | Satellites: {gps.satellites}")
- if _confirm("Use this GPS position?", default_yes=True):
+ if _confirm(
+ "Use live UART GPS for ongoing position (recommended on RAK HAT)?",
+ default_yes=True,
+ ):
+ config.setdefault("location", {}).update({
+ "source": "uart",
+ "uart_path": gps.uart_path,
+ })
+ config.setdefault("device", {}).update({
+ "latitude": gps.latitude,
+ "longitude": gps.longitude,
+ "altitude": gps.altitude,
+ })
+ print(" Location source set to uart (on-board GPS).")
+ print()
+ return
+ if _confirm("Use this GPS position as static coordinates only?", default_yes=True):
config.setdefault("device", {}).update({
"latitude": gps.latitude,
"longitude": gps.longitude,
diff --git a/src/config.py b/src/config.py
index 2e6f566..946cec3 100644
--- a/src/config.py
+++ b/src/config.py
@@ -57,14 +57,18 @@ class RadioConfig:
# (default; spectral scan stays unavailable, packet-derived
# noise floor remains in use).
#
- # On RAK2287 / RAK5146 / SenseCap M1 this is typically
- # ``/dev/spidev0.1`` (separate from the SX1302's
- # ``/dev/spidev0.0``). Some carriers daisy-chain the SX1261
- # behind the SX1302's SPI router and want this set to the same
- # path as the SX1302 SPI device. Wrong path = HAL refuses to
- # ``lgw_start`` after our config attempt, so we ship empty by
- # default and ask interested users to opt in explicitly.
+ # On RAK2287 / RAK5146 / SenseCap M1 the SX1261 is behind the
+ # concentrator's internal SPI router, not on a Pi chip-select —
+ # leave empty. Only the Semtech reference kit and custom carriers
+ # with a dedicated SX1261 CE line need a path (e.g.
+ # ``/dev/spidev0.1``). Wrong path can brick ``lgw_start`` on
+ # boards without Pi-visible SX1261; ``carrier_type`` guard clears
+ # mistaken values on RAK/SenseCap.
sx1261_spi_path: str = ""
+ # Carrier board signature from setup wizard I2C probe (``rak``,
+ # ``sensecap_m1``, or empty). Used to block Pi-visible SX1261 SPI
+ # paths that brick ``lgw_start()`` on RAK/SenseCap concentrators.
+ carrier_type: str = ""
@dataclass
@@ -214,11 +218,9 @@ class LocationConfig:
- ``"gpsd"`` : connect to a local or remote ``gpsd`` daemon and
overwrite ``device.{lat,lon,alt}`` when fixes
arrive. Auto-installed by ``scripts/install.sh``.
- - ``"uart"`` : reserved for direct on-board UART NMEA reading
- (RAK Pi HAT GPS). Plumbing exists in
- ``src.hal.gps_reader`` but is not wired into
- the runtime yet; treated as ``static`` until
- the source is implemented.
+ - ``"uart"`` : read NMEA GGA from an on-board UART GPS (RAK Pi
+ HAT on ``/dev/ttyAMA0``). Uses
+ ``src.hal.gps_reader.GpsReader``.
``gpsd_host`` / ``gpsd_port`` default to gpsd's well-known
localhost socket. Override only when running gpsd on a peer
@@ -238,6 +240,8 @@ class LocationConfig:
source: str = "static"
gpsd_host: str = "127.0.0.1"
gpsd_port: int = 2947
+ uart_path: str = "/dev/ttyAMA0"
+ uart_baud: int = 9600
update_interval_seconds: int = 5
min_fix_quality: int = 1
diff --git a/src/coordinator.py b/src/coordinator.py
index 4c0cb81..05e2fc4 100644
--- a/src/coordinator.py
+++ b/src/coordinator.py
@@ -452,8 +452,9 @@ def _setup_location_banner(self) -> None:
detail = f"gpsd @ {host}:{port}"
color = GREEN
elif source_name == "uart":
- detail = "on-board UART (placeholder, falls back to static)"
- color = DIM
+ path = self._config.location.uart_path
+ detail = f"UART NMEA @ {path}"
+ color = GREEN
else:
detail = "static config coordinates"
color = DIM
diff --git a/src/hal/gps_reader.py b/src/hal/gps_reader.py
index 2c756c1..fb48262 100644
--- a/src/hal/gps_reader.py
+++ b/src/hal/gps_reader.py
@@ -61,14 +61,22 @@ async def stop(self) -> None:
logger.info("GPS reader stopped")
async def _read_loop(self) -> None:
- """Read NMEA sentences from the GPS UART."""
+ """Read NMEA sentences from the GPS UART.
+
+ Linux device nodes (``/dev/tty*``) require pyserial; TCP-style
+ ``asyncio.open_connection`` only applies to network hosts.
+ """
+ if self._uart_path.startswith("/dev/"):
+ await self._fallback_loop()
+ return
+
try:
reader, writer = await asyncio.open_connection(
self._uart_path, self._baud
)
except Exception:
logger.warning(
- "GPS UART not available at %s -- using fallback polling",
+ "GPS UART not available at %s -- using serial fallback",
self._uart_path,
)
await self._fallback_loop()
diff --git a/src/hal/location/factory.py b/src/hal/location/factory.py
index a4583c4..79e3ad7 100644
--- a/src/hal/location/factory.py
+++ b/src/hal/location/factory.py
@@ -39,7 +39,11 @@ def build_location_source(
min_fix_quality=location_config.min_fix_quality,
)
if source == "uart":
- return UartSource()
+ return UartSource(
+ uart_path=location_config.uart_path,
+ baud=location_config.uart_baud,
+ min_fix_quality=location_config.min_fix_quality,
+ )
logger.warning(
"Unknown location.source=%r in config -- falling back to static",
diff --git a/src/hal/location/uart_source.py b/src/hal/location/uart_source.py
index a2a68b7..51aec31 100644
--- a/src/hal/location/uart_source.py
+++ b/src/hal/location/uart_source.py
@@ -1,52 +1,141 @@
-"""UART location source: placeholder for direct on-board NMEA reading.
-
-The existing ``src.hal.gps_reader.GpsReader`` (created during the v0.4.x
-multi-region work but never wired into the runtime) parses NMEA from a
-serial port and exposes ``current_position``. Wiring it through the
-``LocationSource`` contract is a follow-on item: probably v0.7.6 or
-when a user actually reports they want to use the RAK Pi HAT's
-on-board u-blox.
-
-For v0.7.5 this source returns ``available=False`` with an explanatory
-error so the GPS card surfaces the correct "not yet implemented"
-message instead of falling silently to zero coordinates.
+"""UART location source: on-board NMEA GPS (RAK Pi HAT /dev/ttyAMA0).
+
+Wraps ``GpsReader`` so the coordinator and ``GET /api/device/gps-status``
+share the same ``LocationSource`` contract as gpsd. Skyplot az/el/SNR
+requires GSV sentences (not parsed yet); GGA provides fix + satellite
+count only.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
+from typing import Optional
+from src.hal.gps_reader import GpsReader, GpsPosition
from src.hal.location.base import LocationSource
-from src.hal.location.models import GpsStatus
+from src.hal.location.models import (
+ GpsDeviceInfo,
+ GpsStatus,
+ LocationFix,
+ SatellitesView,
+)
logger = logging.getLogger(__name__)
+def _gga_fix_mode(fix_quality: int) -> int:
+ """Map NMEA GGA fix quality to gpsd-style mode (2=2D, 3=3D)."""
+ if fix_quality >= 2:
+ return 3
+ if fix_quality == 1:
+ return 2
+ return 1
+
+
+def _min_gga_quality(min_fix_quality: int) -> int:
+ """Translate LocationConfig min_fix_quality to minimum GGA fix_quality.
+
+ Config uses gpsd semantics: 1=2D, 2=3D. GGA uses 0=no fix, 1=GPS,
+ 2=DGPS, etc. We accept any non-zero GGA fix when min is 1, and
+ require GGA fix_quality >= 2 when min is 2.
+ """
+ if min_fix_quality >= 2:
+ return 2
+ return 1
+
+
class UartSource(LocationSource):
- """Reserved location source for on-board UART GPS (RAK Pi HAT etc.)."""
+ """Live fixes from a serial NMEA GPS on the Pi UART."""
+
+ def __init__(
+ self,
+ uart_path: str = "/dev/ttyAMA0",
+ baud: int = 9600,
+ min_fix_quality: int = 1,
+ ) -> None:
+ self._uart_path = uart_path
+ self._baud = baud
+ self._min_fix_quality = min_fix_quality
+ self._reader: Optional[GpsReader] = None
+ self._started = False
+ self._last_error: Optional[str] = None
@property
def source_name(self) -> str:
return "uart"
async def start(self) -> None:
+ if self._reader is not None:
+ return
+ self._reader = GpsReader(uart_path=self._uart_path, baud=self._baud)
+ await self._reader.start()
+ self._started = True
+ self._last_error = None
logger.info(
- "UART location source: not yet wired -- using static "
- "device coordinates as fallback"
+ "UART location source: reading NMEA from %s @ %d baud",
+ self._uart_path,
+ self._baud,
)
async def stop(self) -> None:
- return
+ if self._reader is None:
+ return
+ await self._reader.stop()
+ self._reader = None
+ self._started = False
def get_status(self) -> GpsStatus:
+ now = datetime.now(timezone.utc)
+ device = GpsDeviceInfo(
+ driver="nmea",
+ path=self._uart_path,
+ model="RAK Pi HAT GPS",
+ subtype=f"{self._baud} baud",
+ )
+
+ if not self._started or self._reader is None:
+ return GpsStatus(
+ source="uart",
+ available=False,
+ device=device,
+ error=self._last_error or "UART reader not started",
+ last_update=now,
+ )
+
+ pos = self._reader.latest_position
+ if pos is None or not self._position_meets_quality(pos):
+ return GpsStatus(
+ source="uart",
+ available=True,
+ device=device,
+ error=None,
+ last_update=now,
+ )
+
return GpsStatus(
source="uart",
- available=False,
- error=(
- "UART GPS source is reserved for the RAK Pi HAT on-board "
- "module and is not yet wired into the runtime. Switch to "
- "'static' or 'gpsd' under Configuration -> GPS."
- ),
- last_update=datetime.now(timezone.utc),
+ available=True,
+ fix=self._position_to_fix(pos),
+ satellites=self._satellites_from_position(pos),
+ device=device,
+ last_update=pos.timestamp,
)
+
+ def _position_meets_quality(self, pos: GpsPosition) -> bool:
+ return pos.fix_quality >= _min_gga_quality(self._min_fix_quality)
+
+ @staticmethod
+ def _position_to_fix(pos: GpsPosition) -> LocationFix:
+ return LocationFix(
+ mode=_gga_fix_mode(pos.fix_quality),
+ latitude=pos.latitude,
+ longitude=pos.longitude,
+ altitude_m=pos.altitude,
+ time=pos.timestamp,
+ )
+
+ @staticmethod
+ def _satellites_from_position(pos: GpsPosition) -> SatellitesView:
+ count = max(0, pos.satellites)
+ return SatellitesView(in_view=count, used=count, satellites=())
diff --git a/src/hal/sx1302_signatures.py b/src/hal/sx1302_signatures.py
index 49b83da..ec0f92a 100644
--- a/src/hal/sx1302_signatures.py
+++ b/src/hal/sx1302_signatures.py
@@ -89,3 +89,7 @@ def apply_signatures(lib: ctypes.CDLL) -> None:
if hasattr(lib, "lgw_sx1261_setconf"):
lib.lgw_sx1261_setconf.restype = ctypes.c_int
lib.lgw_sx1261_setconf.argtypes = [ctypes.POINTER(LgwConfSx1261S)]
+
+ if hasattr(lib, "sx1302_get_model_id"):
+ lib.sx1302_get_model_id.restype = ctypes.c_int
+ lib.sx1302_get_model_id.argtypes = [ctypes.POINTER(ctypes.c_uint8)]
diff --git a/src/hal/sx1302_types.py b/src/hal/sx1302_types.py
index 3da217b..c7ce1ac 100644
--- a/src/hal/sx1302_types.py
+++ b/src/hal/sx1302_types.py
@@ -9,6 +9,16 @@
import ctypes
+# sx1302_model_id_t values from loragw_sx1302.h (when HAL exposes them).
+SX1302_MODEL_ID_SX1302: int = 0x02
+SX1302_MODEL_ID_SX1303: int = 0x03
+
+# Carriers where the SX1261 is behind the concentrator SPI router, not
+# on a separate Pi chip-select. Non-empty sx1261_spi_path is cleared at
+# configure() time on these boards.
+CARRIERS_WITHOUT_PI_SX1261: frozenset[str] = frozenset(
+ {"rak", "sensecap_m1"}
+)
# ── RX configuration structs ────────────────────────────────────────
diff --git a/src/hal/sx1302_wrapper.py b/src/hal/sx1302_wrapper.py
index dfd3128..96e7469 100644
--- a/src/hal/sx1302_wrapper.py
+++ b/src/hal/sx1302_wrapper.py
@@ -21,6 +21,7 @@
SX1302SpectralScan,
)
from src.hal.sx1302_types import (
+ CARRIERS_WITHOUT_PI_SX1261,
LgwConfBoardS,
LgwConfRxifS,
LgwConfRxrfS,
@@ -28,6 +29,8 @@
LgwPktRxS,
LgwPktTxS,
LgwTxGainLutS,
+ SX1302_MODEL_ID_SX1302,
+ SX1302_MODEL_ID_SX1303,
)
logger = logging.getLogger(__name__)
@@ -110,6 +113,27 @@ def __init__(
self._unknown_status_count = 0
self._spectral_scan: Optional[SX1302SpectralScan] = None
self._sx1261_configured = False
+ self._carrier_type: str = ""
+ self._concentrator_model_id: Optional[int] = None
+
+ def set_carrier_type(self, carrier_type: str) -> None:
+ """Set carrier signature for SX1261 path guardrails (from config or wizard)."""
+ self._carrier_type = (carrier_type or "").strip().lower()
+
+ @property
+ def concentrator_model_id(self) -> Optional[int]:
+ """Raw model byte from ``sx1302_get_model_id`` (0x02 / 0x03), or None."""
+ return self._concentrator_model_id
+
+ @property
+ def concentrator_model_label(self) -> str:
+ """Human label for telemetry and startup banners."""
+ mid = self._concentrator_model_id
+ if mid == SX1302_MODEL_ID_SX1303:
+ return "SX1303"
+ if mid == SX1302_MODEL_ID_SX1302:
+ return "SX1302"
+ return "unknown"
def load(self) -> None:
if not self._lib_path or not os.path.exists(self._lib_path):
@@ -161,9 +185,12 @@ def configure(self, plan: ConcentratorChannelPlan) -> None:
if self._lib is None:
self.load()
+ self._preflight_spi()
+ self._guard_sx1261_spi_path()
self._configure_board()
self._configure_rf_chains(plan)
self._configure_if_channels(plan)
+ self._read_concentrator_model_id()
self._configure_sx1261_for_spectral_scan()
logger.info("Concentrator configured with %d IF channels",
len(plan.multi_sf_channels) + (1 if plan.single_sf_channel else 0))
@@ -173,9 +200,15 @@ def start(self) -> None:
self.load()
result = self._lib.lgw_start()
if result != LGW_HAL_SUCCESS:
- raise RuntimeError("lgw_start() failed")
+ raise RuntimeError(
+ f"lgw_start() failed (spi={self._spi_path}, "
+ f"model={self.concentrator_model_label})"
+ )
self._started = True
- logger.info("SX1302 concentrator started")
+ logger.info(
+ "%s concentrator started",
+ self.concentrator_model_label,
+ )
def stop(self) -> None:
if self._started and self._lib:
@@ -412,6 +445,44 @@ def spectral_scan_supported(self) -> bool:
# ── Private: HAL configuration ──────────────────────────────────
+ def _preflight_spi(self) -> None:
+ if not os.path.exists(self._spi_path):
+ raise FileNotFoundError(
+ f"Concentrator SPI device {self._spi_path} not found. "
+ "Enable SPI in raspi-config (Interface Options → SPI) and "
+ "confirm the meshpoint user is in the spi group."
+ )
+
+ def _guard_sx1261_spi_path(self) -> None:
+ """Clear sx1261_spi_path on carriers where the chip is not Pi-visible."""
+ if not self._sx1261_spi_path:
+ return
+ if self._carrier_type not in CARRIERS_WITHOUT_PI_SX1261:
+ return
+ logger.warning(
+ "radio.sx1261_spi_path=%r is not supported on %s carriers "
+ "(SX1261 not reachable from the Pi); clearing path. "
+ "See docs/CONFIGURATION.md § Spectral Scan.",
+ self._sx1261_spi_path,
+ self._carrier_type,
+ )
+ self._sx1261_spi_path = ""
+
+ def _read_concentrator_model_id(self) -> None:
+ if self._lib is None or not hasattr(self._lib, "sx1302_get_model_id"):
+ return
+ model = ctypes.c_uint8()
+ rc = self._lib.sx1302_get_model_id(ctypes.byref(model))
+ if rc != LGW_HAL_SUCCESS:
+ logger.debug("sx1302_get_model_id returned rc=%d", rc)
+ return
+ self._concentrator_model_id = int(model.value)
+ logger.info(
+ "Concentrator model ID: 0x%02X (%s)",
+ self._concentrator_model_id,
+ self.concentrator_model_label,
+ )
+
def _configure_board(self) -> None:
conf = LgwConfBoardS()
conf.lorawan_public = False
diff --git a/tests/test_location_config.py b/tests/test_location_config.py
index 92a319b..c6908ea 100644
--- a/tests/test_location_config.py
+++ b/tests/test_location_config.py
@@ -84,11 +84,18 @@ def test_partial_override_preserves_other_fields(self) -> None:
path.unlink()
def test_uart_source_round_trips(self) -> None:
- path = self._write_yaml("location:\n source: uart\n")
+ path = self._write_yaml(
+ "location:\n"
+ " source: uart\n"
+ " uart_path: /dev/serial0\n"
+ " uart_baud: 115200\n"
+ )
try:
cfg = AppConfig()
_apply_yaml(cfg, path)
self.assertEqual(cfg.location.source, "uart")
+ self.assertEqual(cfg.location.uart_path, "/dev/serial0")
+ self.assertEqual(cfg.location.uart_baud, 115200)
finally:
path.unlink()
diff --git a/tests/test_location_static_uart_sources.py b/tests/test_location_static_uart_sources.py
index fe3850d..a69c5a9 100644
--- a/tests/test_location_static_uart_sources.py
+++ b/tests/test_location_static_uart_sources.py
@@ -3,8 +3,11 @@
from __future__ import annotations
import unittest
+from datetime import datetime, timezone
+from unittest.mock import MagicMock
from src.config import DeviceConfig
+from src.hal.gps_reader import GpsPosition
from src.hal.location.static_source import StaticSource
from src.hal.location.uart_source import UartSource
@@ -75,21 +78,31 @@ async def test_source_name_is_stable(self) -> None:
class TestUartSource(unittest.IsolatedAsyncioTestCase):
- """``UartSource`` is a placeholder; surfaces an explanatory error."""
+ """``UartSource`` exposes live NMEA fixes via ``GpsReader``."""
- async def test_status_is_unavailable_with_explanatory_error(self) -> None:
+ async def test_status_before_start_is_unavailable(self) -> None:
source = UartSource()
- await source.start()
- try:
- status = source.get_status()
- self.assertEqual(status.source, "uart")
- self.assertFalse(status.available)
- self.assertIsNotNone(status.error)
- # Error must point users to the working alternatives.
- self.assertIn("static", status.error.lower())
- self.assertIn("gpsd", status.error.lower())
- finally:
- await source.stop()
+ status = source.get_status()
+ self.assertEqual(status.source, "uart")
+ self.assertFalse(status.available)
+ self.assertIn("not started", (status.error or "").lower())
+
+ async def test_status_with_fix(self) -> None:
+ source = UartSource()
+ source._reader = MagicMock()
+ source._started = True
+ source._reader.latest_position = GpsPosition(
+ latitude=51.5,
+ longitude=-0.1,
+ altitude=20.0,
+ satellites=6,
+ fix_quality=2,
+ timestamp=datetime(2026, 6, 2, tzinfo=timezone.utc),
+ )
+ status = source.get_status()
+ self.assertTrue(status.available)
+ self.assertIsNotNone(status.fix)
+ self.assertEqual(status.device.path, "/dev/ttyAMA0")
async def test_source_name_is_stable(self) -> None:
source = UartSource()
diff --git a/tests/test_sx1302_wrapper_hal_guard.py b/tests/test_sx1302_wrapper_hal_guard.py
new file mode 100644
index 0000000..eb9911f
--- /dev/null
+++ b/tests/test_sx1302_wrapper_hal_guard.py
@@ -0,0 +1,91 @@
+"""Tests for SX1302Wrapper SPI preflight, SX1261 carrier guard, and model ID."""
+
+from __future__ import annotations
+
+import ctypes
+import unittest
+from unittest.mock import MagicMock, patch
+
+from src.hal.sx1302_types import (
+ SX1302_MODEL_ID_SX1302,
+ SX1302_MODEL_ID_SX1303,
+)
+from src.hal.sx1302_wrapper import LGW_HAL_SUCCESS, SX1302Wrapper
+
+
+def _build_wrapper() -> SX1302Wrapper:
+ wrapper = SX1302Wrapper(sx1261_spi_path="/dev/spidev0.1")
+ wrapper._lib = MagicMock()
+ return wrapper
+
+
+class TestSx1261CarrierGuard(unittest.TestCase):
+ def test_rak_carrier_clears_non_empty_sx1261_path(self) -> None:
+ wrapper = _build_wrapper()
+ wrapper.set_carrier_type("rak")
+ wrapper._guard_sx1261_spi_path()
+ self.assertEqual(wrapper._sx1261_spi_path, "")
+
+ def test_sensecap_carrier_clears_non_empty_sx1261_path(self) -> None:
+ wrapper = _build_wrapper()
+ wrapper.set_carrier_type("sensecap_m1")
+ wrapper._guard_sx1261_spi_path()
+ self.assertEqual(wrapper._sx1261_spi_path, "")
+
+ def test_unknown_carrier_keeps_sx1261_path(self) -> None:
+ wrapper = _build_wrapper()
+ wrapper.set_carrier_type("")
+ wrapper._guard_sx1261_spi_path()
+ self.assertEqual(wrapper._sx1261_spi_path, "/dev/spidev0.1")
+
+ def test_empty_sx1261_path_unchanged_on_rak(self) -> None:
+ wrapper = SX1302Wrapper(sx1261_spi_path="")
+ wrapper.set_carrier_type("rak")
+ wrapper._guard_sx1261_spi_path()
+ self.assertEqual(wrapper._sx1261_spi_path, "")
+
+
+class TestSpiPreflight(unittest.TestCase):
+ def test_missing_spi_device_raises(self) -> None:
+ wrapper = SX1302Wrapper(spi_path="/dev/spidev0.0")
+ with patch("src.hal.sx1302_wrapper.os.path.exists", return_value=False):
+ with self.assertRaises(FileNotFoundError) as ctx:
+ wrapper._preflight_spi()
+ self.assertIn("raspi-config", str(ctx.exception))
+
+
+class TestConcentratorModelId(unittest.TestCase):
+ def test_model_label_sx1303(self) -> None:
+ wrapper = SX1302Wrapper()
+ wrapper._concentrator_model_id = SX1302_MODEL_ID_SX1303
+ self.assertEqual(wrapper.concentrator_model_label, "SX1303")
+
+ def test_model_label_sx1302(self) -> None:
+ wrapper = SX1302Wrapper()
+ wrapper._concentrator_model_id = SX1302_MODEL_ID_SX1302
+ self.assertEqual(wrapper.concentrator_model_label, "SX1302")
+
+ def test_read_model_id_from_hal(self) -> None:
+ wrapper = _build_wrapper()
+
+ def fake_get_model(model_ptr):
+ ptr = ctypes.cast(model_ptr, ctypes.POINTER(ctypes.c_uint8))
+ ptr[0] = SX1302_MODEL_ID_SX1303
+ return LGW_HAL_SUCCESS
+
+ wrapper._lib.sx1302_get_model_id.side_effect = fake_get_model
+ wrapper._read_concentrator_model_id()
+ self.assertEqual(wrapper._concentrator_model_id, SX1302_MODEL_ID_SX1303)
+
+
+class TestSx1261ConfigureSkipsWhenPathCleared(unittest.TestCase):
+ def test_configure_sx1261_skips_after_rak_guard(self) -> None:
+ wrapper = _build_wrapper()
+ wrapper.set_carrier_type("rak")
+ wrapper._guard_sx1261_spi_path()
+ wrapper._configure_sx1261_for_spectral_scan()
+ wrapper._lib.lgw_sx1261_setconf.assert_not_called()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_uart_location_source.py b/tests/test_uart_location_source.py
new file mode 100644
index 0000000..978d004
--- /dev/null
+++ b/tests/test_uart_location_source.py
@@ -0,0 +1,110 @@
+"""Tests for ``UartSource`` and UART-related config defaults."""
+
+from __future__ import annotations
+
+import unittest
+from datetime import datetime, timezone
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from src.config import AppConfig, LocationConfig
+from src.hal.gps_reader import GpsPosition
+from src.hal.location.factory import build_location_source
+from src.hal.location.uart_source import UartSource
+
+
+class TestLocationConfigUartDefaults(unittest.TestCase):
+ def test_uart_path_defaults_to_ttyama0(self) -> None:
+ cfg = LocationConfig()
+ self.assertEqual(cfg.uart_path, "/dev/ttyAMA0")
+
+ def test_uart_baud_defaults_to_9600(self) -> None:
+ cfg = LocationConfig()
+ self.assertEqual(cfg.uart_baud, 9600)
+
+
+class TestUartSource(unittest.IsolatedAsyncioTestCase):
+ async def test_no_fix_yet_is_available_without_error(self) -> None:
+ source = UartSource()
+ source._reader = MagicMock()
+ source._reader.latest_position = None
+ source._started = True
+
+ status = source.get_status()
+ self.assertEqual(status.source, "uart")
+ self.assertTrue(status.available)
+ self.assertIsNone(status.fix)
+
+ async def test_maps_gga_position_to_location_fix(self) -> None:
+ source = UartSource(min_fix_quality=1)
+ source._reader = MagicMock()
+ source._started = True
+ source._reader.latest_position = GpsPosition(
+ latitude=40.7128,
+ longitude=-74.0060,
+ altitude=12.0,
+ satellites=8,
+ fix_quality=1,
+ timestamp=datetime(2026, 6, 2, 12, 0, 0, tzinfo=timezone.utc),
+ )
+
+ status = source.get_status()
+ self.assertTrue(status.available)
+ self.assertIsNotNone(status.fix)
+ assert status.fix is not None
+ self.assertEqual(status.fix.latitude, 40.7128)
+ self.assertEqual(status.fix.longitude, -74.0060)
+ self.assertEqual(status.fix.altitude_m, 12.0)
+ self.assertIsNotNone(status.satellites)
+ assert status.satellites is not None
+ self.assertEqual(status.satellites.used, 8)
+
+ async def test_min_fix_quality_filters_weak_gga(self) -> None:
+ source = UartSource(min_fix_quality=2)
+ source._reader = MagicMock()
+ source._started = True
+ source._reader.latest_position = GpsPosition(
+ latitude=40.0,
+ longitude=-74.0,
+ altitude=0.0,
+ satellites=4,
+ fix_quality=1,
+ timestamp=datetime.now(timezone.utc),
+ )
+
+ status = source.get_status()
+ self.assertTrue(status.available)
+ self.assertIsNone(status.fix)
+
+ async def test_start_stop_wires_gps_reader(self) -> None:
+ source = UartSource(uart_path="/dev/ttyAMA0", baud=9600)
+ with patch("src.hal.location.uart_source.GpsReader") as reader_cls:
+ reader = MagicMock()
+ reader.start = AsyncMock()
+ reader.stop = AsyncMock()
+ reader_cls.return_value = reader
+
+ await source.start()
+ reader_cls.assert_called_once_with(
+ uart_path="/dev/ttyAMA0", baud=9600
+ )
+ reader.start.assert_awaited_once()
+
+ await source.stop()
+ reader.stop.assert_awaited_once()
+
+
+class TestUartFactory(unittest.TestCase):
+ def test_build_uart_source_from_config(self) -> None:
+ app = AppConfig()
+ app.location.source = "uart"
+ app.location.uart_path = "/dev/serial0"
+ app.location.uart_baud = 115200
+
+ source = build_location_source(app.location, app.device)
+ self.assertIsInstance(source, UartSource)
+ self.assertEqual(source._uart_path, "/dev/serial0")
+ self.assertEqual(source._baud, 115200)
+
+
+if __name__ == "__main__":
+ unittest.main()
From 7861cd1586731bec9ccd621511579f5d259b9940 Mon Sep 17 00:00:00 2001
From: iceice400
Date: Tue, 2 Jun 2026 13:26:13 -0400
Subject: [PATCH 2/2] feat(hal): GPS PPS timestamp sync via lgw_gps_* for
SX1303
---
config/default.yaml | 5 +
docs/CHANGELOG.md | 1 +
docs/CONFIGURATION.md | 28 +++
docs/plans/gps-pps-follow-up-issue.md | 71 ++++++
pr-body-gps-pps.md | 64 ++++++
src/api/routes/config_enrichment.py | 4 +
src/api/routes/gps_pps_status.py | 59 +++++
src/api/server.py | 5 +
src/capture/concentrator_source.py | 12 +
src/config.py | 26 +++
src/hal/sx1302_gps.py | 307 ++++++++++++++++++++++++++
src/hal/sx1302_gps_signatures.py | 69 ++++++
src/hal/sx1302_gps_types.py | 45 ++++
src/hal/sx1302_signatures.py | 3 +
src/hal/sx1302_wrapper.py | 32 +++
tests/test_hal_gps_pps.py | 117 ++++++++++
16 files changed, 848 insertions(+)
create mode 100644 docs/plans/gps-pps-follow-up-issue.md
create mode 100644 pr-body-gps-pps.md
create mode 100644 src/api/routes/gps_pps_status.py
create mode 100644 src/hal/sx1302_gps.py
create mode 100644 src/hal/sx1302_gps_signatures.py
create mode 100644 src/hal/sx1302_gps_types.py
create mode 100644 tests/test_hal_gps_pps.py
diff --git a/config/default.yaml b/config/default.yaml
index 28fe903..4d8947d 100644
--- a/config/default.yaml
+++ b/config/default.yaml
@@ -11,6 +11,11 @@ radio:
sync_word: 0x2B
preamble_length: 16
tx_power_dbm: 22
+ # HAL GPS/PPS (RAK Pi HAT u-blox → concentrator PPS pin). Off by default.
+ # gps_pps_enabled: false
+ # gps_pps_tty_path: "/dev/ttyAMA0"
+ # gps_family: "ubx7"
+ # gps_pps_target_baud: 0 # 0 = HAL default (9600)
meshtastic:
default_key_b64: "AQ=="
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 8134d0a..d105635 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -8,6 +8,7 @@
- **Concentrator model ID.** Startup logs SX1302 vs SX1303 when `libloragw` exposes `sx1302_get_model_id` (e.g. `Concentrator model ID: 0x12 (SX1303)`).
- **SX1261 guard.** Non-empty `radio.sx1261_spi_path` on RAK/SenseCap carriers (`radio.carrier_type`) is cleared at configure time with an explicit warning, preventing `lgw_start()` failures from misconfigured spectral scan on Pi-invisible SX1261 wiring.
- **SPI preflight.** Missing `/dev/spidev0.0` raises a clear error before `lgw_start()` (raspi-config / spi group hints).
+- **GPS PPS (HAL).** Optional `radio.gps_pps_*` enables `lgw_gps_*` / `sx1302_gps_enable` timestamp sync on RAK Pi HATs with PPS wired to the concentrator. `GET /api/device/gps-pps-status`; config rejects sharing the HAT UART with `location.source: uart`.
### v0.7.6 (June 2026)
diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md
index cf94dd7..8406311 100644
--- a/docs/CONFIGURATION.md
+++ b/docs/CONFIGURATION.md
@@ -81,6 +81,34 @@ On RAK and SenseCap carriers, `meshpoint setup` writes `radio.carrier_type`. Whe
If your `libloragw` build does not expose the spectral scan symbols at all (older HAL revisions), the service logs a single info line at startup and falls back automatically.
+### GPS PPS time sync (HAL)
+
+Separate from [Location (GPS) source](#location-gps-source) (dashboard lat/lon). PPS sync wires the concentrator's internal packet counter to GPS time via Semtech `lgw_gps_*` and `sx1302_gps_enable` — useful when you need accurate `timestamp_us` on RX metadata (logging, TDoA-style analysis, correlating captures to wall clock).
+
+```yaml
+radio:
+ gps_pps_enabled: false # set true on RAK Pi HAT with PPS wired to SX1303
+ gps_pps_tty_path: "/dev/ttyAMA0"
+ gps_family: "ubx7" # passed to lgw_gps_enable
+ gps_pps_target_baud: 0 # 0 = HAL default (9600)
+```
+
+Requirements:
+
+- `libloragw` built from the SX1302 HAL tree with `loragw_gps.c` linked (stock `lora_gateway` packages on some images omit these symbols — startup logs `GPS/PPS sync unavailable` and continues).
+- u-blox on the HAT UART with PPS routed to the concentrator (RAK2287/5146/SX1303 HAT).
+- Clear sky: first `lgw_gps_sync` may take minutes until UBX time messages arrive.
+
+**Do not** enable `gps_pps_enabled` and `location.source: uart` on the same `tty_path`. Only one owner can open the serial port. Typical RAK deployments:
+
+| Goal | `location.source` | `radio.gps_pps_enabled` |
+|---|---|---|
+| Map position from HAT GPS | `uart` | `false` |
+| Accurate packet timestamps + map from USB gpsd | `gpsd` | `true` on `/dev/ttyAMA0` |
+| Timestamps only, static map coords | `static` | `true` |
+
+Status: `GET /api/device/gps-pps-status` (enabled, sync count, last error, reference counter). Config keys also appear under `radio_advanced` in `GET /api/config`.
+
### Standard Meshtastic Presets
To match a Meshtastic preset, set `spreading_factor` and `bandwidth_khz` together:
diff --git a/docs/plans/gps-pps-follow-up-issue.md b/docs/plans/gps-pps-follow-up-issue.md
new file mode 100644
index 0000000..35710b1
--- /dev/null
+++ b/docs/plans/gps-pps-follow-up-issue.md
@@ -0,0 +1,71 @@
+# GitHub issue draft: HAL GPS/PPS sync on SX1303 (follow-up to #65)
+
+**Title:** `feat(hal): GPS PPS timestamp sync via lgw_gps_* for RAK SX1303 HAT`
+
+**Labels:** `enhancement`, `hardware`, `hal`
+
+---
+
+## Context
+
+PR #65 (RAK SX1303 HAL guard + `location.source: uart`) deliberately split two GPS concerns:
+
+| Layer | Config | Purpose |
+|--------|--------|---------|
+| Dashboard / NodeInfo | `location.source` (`static`, `gpsd`, `uart`) | Human-readable lat/lon |
+| Concentrator timestamps | `radio.gps_pps_*` + Semtech `lgw_gps_*` | Align `timestamp_us` with GPS via PPS |
+
+On RAK Pi gateways the u-blox speaks NMEA/UBX on `/dev/ttyAMA0` and drives a PPS line into the SX1302/SX1303. Meshpoint can use the HAL to parse UBX, call `lgw_gps_sync` on `UBX-NAV-TIMEGPS`, and enable `sx1302_gps_enable(true)` — but only when `libloragw` includes `loragw_gps.c`.
+
+## Problem
+
+Without PPS sync, packet `timestamp_us` values are concentrator-relative counters. That is fine for RSSI/SNR analytics but weak for:
+
+- Correlating captures to wall-clock or external sensors
+- Future TDoA / multilateration experiments
+- Debugging “when did this burst happen?” across reboots
+
+## Proposed solution
+
+1. ctypes bindings for `lgw_gps_enable`, `lgw_parse_nmea`/`ubx`, `lgw_gps_get`, `lgw_gps_sync`, `lgw_get_trigcnt`, `lgw_cnt2utc`, `sx1302_gps_enable`.
+2. Background reader thread after `lgw_start()`.
+3. Config under `radio:`:
+ - `gps_pps_enabled` (default `false`)
+ - `gps_pps_tty_path` (default `/dev/ttyAMA0`)
+ - `gps_family` (default `ubx7`)
+ - `gps_pps_target_baud` (default `0` = HAL default)
+4. Startup guard: reject `gps_pps_enabled` + `location.source: uart` on the same TTY.
+5. `GET /api/device/gps-pps-status` for operators.
+6. Docs in `CONFIGURATION.md` with the uart vs PPS matrix.
+
+## Acceptance criteria
+
+- [ ] With `gps_pps_enabled: true` on hardware with PPS wired, logs show `GPS/PPS sync #1 ok` after fix.
+- [ ] `GET /api/device/gps-pps-status` reports `last_sync_ok: true` and increasing `sync_count`.
+- [ ] Misconfigured dual-UART use fails at config load with a clear error.
+- [ ] Graceful degrade when HAL lacks GPS symbols (info log, concentrator still starts).
+- [ ] Unit tests mock `libloragw` GPS entry points.
+
+## Out of scope (this issue)
+
+- Automatic UBX CFG-PPS programming (operators may use u-center / `gpsd` once).
+- Sharing one UART between HAL PPS and `UartSource` (documented conflict; use `gpsd` or `static` for map coords).
+- Spectral scan / SX1261 path changes (covered in #65).
+
+## Hardware test plan
+
+1. RAK2287/5146 + Pi, `chip version 0x12` (SX1303), outdoor antenna.
+2. `config/local.yaml`:
+ ```yaml
+ radio:
+ gps_pps_enabled: true
+ location:
+ source: static # or gpsd on USB
+ ```
+3. `journalctl -u meshpoint -f` → expect PPS sync lines.
+4. `curl -s -H "Authorization: Bearer …" http://127.0.0.1:8080/api/device/gps-pps-status | jq`
+
+## References
+
+- Semtech `loragw_gps.h` / `loragw_sx1302.h` in sx1302_hal
+- Related: #65, RAK `install.sh` UART enable for `/dev/ttyAMA0`
diff --git a/pr-body-gps-pps.md b/pr-body-gps-pps.md
new file mode 100644
index 0000000..968f05c
--- /dev/null
+++ b/pr-body-gps-pps.md
@@ -0,0 +1,64 @@
+## Summary
+
+Follow-up to #65: adds Semtech **HAL GPS/PPS** bindings so the SX1302/SX1303 concentrator can align internal packet `timestamp_us` with GPS time on RAK Pi HATs (u-blox on `/dev/ttyAMA0`, PPS into the concentrator).
+
+This is **orthogonal** to `location.source: uart` / `gpsd`, which only feed dashboard coordinates and NodeInfo.
+
+## Changes
+
+**HAL (`src/hal/`)**
+- `sx1302_gps_types.py` — `TimespecS`, `TrefS`, `CoordS`, GPS message constants.
+- `sx1302_gps_signatures.py` + `apply_gps_signatures()` from `apply_signatures()`.
+- `sx1302_gps.py` — `HalGpsPpsSync`: `lgw_gps_enable`, background UBX/NMEA parser, `lgw_gps_sync` on `UBX-NAV-TIMEGPS`, `sx1302_gps_enable`, optional `lgw_cnt2utc`.
+- `SX1302Wrapper.start_gps_pps()` / `stop_gps_pps()`; `ConcentratorCaptureSource` starts PPS after `lgw_start()` when configured.
+
+**Config**
+- `radio.gps_pps_enabled`, `gps_pps_tty_path`, `gps_family`, `gps_pps_target_baud`.
+- `validate_config_consistency()` rejects `gps_pps_enabled` + `location.source: uart` on the same TTY.
+
+**API**
+- `GET /api/device/gps-pps-status` — sync count, last error, reference counter.
+- `radio_advanced` in `GET /api/config` exposes PPS fields.
+
+**Docs**
+- `CONFIGURATION.md` — GPS PPS section + uart/PPS matrix.
+- `docs/plans/gps-pps-follow-up-issue.md` — issue template for trackers.
+
+## Why
+
+#65 fixed `lgw_start()` on RAK/SX1303 and wired UART for **map** GPS. Operators running SX1303 with PPS still need concentrator-time alignment for accurate packet timestamps — that path lives in `lgw_gps_*`, not in NMEA GGA parsing.
+
+## Operator notes
+
+| Need | Config |
+|------|--------|
+| Live map from HAT UART | `location.source: uart`, `gps_pps_enabled: false` |
+| PPS timestamps + USB gpsd map | `location.source: gpsd`, `gps_pps_enabled: true` |
+| PPS only, fixed coords | `location.source: static`, `gps_pps_enabled: true` |
+
+Requires `libloragw` built with `loragw_gps.c`. If symbols are missing, Meshpoint logs once and continues without PPS.
+
+## Type
+
+- [x] Feature
+- [x] Hardware change
+- [x] Docs
+- [ ] Bug fix
+- [ ] UI (status API only; no dashboard card in this PR)
+
+## Testing
+
+- [x] Local unit tests
+- [ ] Tested on RAK hardware (needs reviewer / field)
+
+```bash
+python -m unittest tests.test_hal_gps_pps tests.test_sx1302_wrapper_hal_guard -v
+```
+
+## Depends on
+
+- Best reviewed **after** #65 merges (stacked from the same fork branch family). Can rebase onto `main` once #65 lands.
+
+## Closes
+
+
diff --git a/src/api/routes/config_enrichment.py b/src/api/routes/config_enrichment.py
index 6926d63..6171ff4 100644
--- a/src/api/routes/config_enrichment.py
+++ b/src/api/routes/config_enrichment.py
@@ -57,6 +57,10 @@ def enrich_config_payload(cfg: AppConfig, base: dict) -> dict:
"spectral_scan_interval_seconds": radio.spectral_scan_interval_seconds,
"sx1261_spi_path": radio.sx1261_spi_path or "",
"carrier_type": radio.carrier_type or "",
+ "gps_pps_enabled": radio.gps_pps_enabled,
+ "gps_pps_tty_path": radio.gps_pps_tty_path,
+ "gps_family": radio.gps_family,
+ "gps_pps_target_baud": radio.gps_pps_target_baud,
}
base["location"] = {
"source": location.source,
diff --git a/src/api/routes/gps_pps_status.py b/src/api/routes/gps_pps_status.py
new file mode 100644
index 0000000..cba2a4f
--- /dev/null
+++ b/src/api/routes/gps_pps_status.py
@@ -0,0 +1,59 @@
+"""Read-only HAL GPS/PPS sync status from the concentrator wrapper."""
+
+from __future__ import annotations
+
+import logging
+from typing import Callable, Optional
+
+from fastapi import APIRouter
+
+from src.hal.sx1302_wrapper import SX1302Wrapper
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/api/device", tags=["device"])
+
+_get_wrapper: Optional[Callable[[], Optional[SX1302Wrapper]]] = None
+
+
+def init_routes(
+ get_wrapper: Callable[[], Optional[SX1302Wrapper]],
+) -> None:
+ global _get_wrapper
+ _get_wrapper = get_wrapper
+
+
+def reset_routes() -> None:
+ global _get_wrapper
+ _get_wrapper = None
+
+
+@router.get("/gps-pps-status")
+async def gps_pps_status() -> dict:
+ """Return HAL PPS sync state (separate from ``/api/device/gps-status``)."""
+ if _get_wrapper is None:
+ return _idle_payload(error="routes not initialized")
+
+ wrapper = _get_wrapper()
+ if wrapper is None:
+ return _idle_payload(error="concentrator capture source not running")
+
+ gps = wrapper.gps_pps
+ if gps is None:
+ return _idle_payload()
+
+ return gps.get_status().to_dict()
+
+
+def _idle_payload(error: Optional[str] = None) -> dict:
+ return {
+ "enabled": False,
+ "available": False,
+ "tty_path": "",
+ "gps_family": "",
+ "sync_count": 0,
+ "last_sync_ok": False,
+ "last_error": error,
+ "xtal_err": None,
+ "reference_count_us": None,
+ }
diff --git a/src/api/server.py b/src/api/server.py
index 3bd4fb0..ebfa5a6 100644
--- a/src/api/server.py
+++ b/src/api/server.py
@@ -41,6 +41,7 @@
dangerous_routes,
device,
device_config_routes,
+ gps_pps_status,
gps_status,
identity_routes,
messages,
@@ -276,6 +277,7 @@ async def lifespan(app: FastAPI):
app.include_router(upstream_config_routes.router, dependencies=protected)
app.include_router(device_config_routes.router, dependencies=protected)
app.include_router(gps_status.router, dependencies=protected)
+ app.include_router(gps_pps_status.router, dependencies=protected)
app.include_router(system_config_routes.router, dependencies=protected)
app.include_router(meshcore_config_routes.router, dependencies=protected)
app.include_router(config_routes.router, dependencies=protected)
@@ -1265,6 +1267,9 @@ def _init_routes(
upstream_config_routes.init_routes(config=config)
device_config_routes.init_routes(config=config, identity=identity)
gps_status.init_routes(location_source=coord.location_source)
+ gps_pps_status.init_routes(
+ get_wrapper=lambda: _get_concentrator_wrapper(coord),
+ )
system_config_routes.init_routes(config=config)
meshcore_config_routes.init_routes(config=config, tx_service=tx_service)
diff --git a/src/capture/concentrator_source.py b/src/capture/concentrator_source.py
index bca0568..b5241a7 100644
--- a/src/capture/concentrator_source.py
+++ b/src/capture/concentrator_source.py
@@ -96,6 +96,17 @@ async def start(self) -> None:
self._wrapper.start()
self._wrapper.set_syncword(self._syncword)
+ if self._radio_config is not None and self._radio_config.gps_pps_enabled:
+ ok = self._wrapper.start_gps_pps(
+ tty_path=self._radio_config.gps_pps_tty_path,
+ gps_family=self._radio_config.gps_family,
+ target_baud=self._radio_config.gps_pps_target_baud,
+ )
+ if not ok:
+ logger.warning(
+ "GPS/PPS requested but HAL sync did not start "
+ "(see earlier warnings)"
+ )
self._running = True
logger.info(
"Concentrator capture started (syncword=0x%02X)",
@@ -104,6 +115,7 @@ async def start(self) -> None:
async def stop(self) -> None:
self._running = False
+ self._wrapper.stop_gps_pps()
self._wrapper.stop()
logger.info("Concentrator capture stopped")
diff --git a/src/config.py b/src/config.py
index b44f276..c5adf37 100644
--- a/src/config.py
+++ b/src/config.py
@@ -72,6 +72,14 @@ class RadioConfig:
# ``sensecap_m1``, or empty). Used to block Pi-visible SX1261 SPI
# paths that brick ``lgw_start()`` on RAK/SenseCap concentrators.
carrier_type: str = ""
+ # HAL GPS/PPS: align concentrator packet timestamps with GPS time via
+ # libloragw ``lgw_gps_*`` + ``sx1302_gps_enable``. Requires a HAL build
+ # that includes loragw_gps.c. Exclusive with ``location.source: uart`` on
+ # the same TTY (only one process may open the GPS serial port).
+ gps_pps_enabled: bool = False
+ gps_pps_tty_path: str = "/dev/ttyAMA0"
+ gps_family: str = "ubx7"
+ gps_pps_target_baud: int = 0
@dataclass
@@ -455,10 +463,28 @@ def load_config(config_path: Optional[str] = None) -> AppConfig:
local = config_path or os.environ.get("CONCENTRATOR_CONFIG", "config/local.yaml")
_apply_yaml(cfg, _validated_config_path(local))
_resolve_radio_frequency(cfg.radio)
+ validate_config_consistency(cfg)
return cfg
+def validate_config_consistency(config: AppConfig) -> None:
+ """Reject impossible radio/location combinations before hardware starts."""
+ if not config.radio.gps_pps_enabled:
+ return
+ if config.location.source != "uart":
+ return
+ pps_tty = os.path.normpath(config.radio.gps_pps_tty_path)
+ uart_tty = os.path.normpath(config.location.uart_path)
+ if pps_tty == uart_tty:
+ raise ValueError(
+ "radio.gps_pps_enabled and location.source=uart cannot share "
+ f"the same serial device ({pps_tty!r}). Use location.source=gpsd "
+ "or static for dashboard coordinates while PPS owns the HAT UART, "
+ "or disable gps_pps_enabled when using UART for location only."
+ )
+
+
def _get_local_yaml_path() -> Path:
"""Resolve the local.yaml path used for user overrides."""
raw = os.environ.get("CONCENTRATOR_CONFIG", "config/local.yaml")
diff --git a/src/hal/sx1302_gps.py b/src/hal/sx1302_gps.py
new file mode 100644
index 0000000..220ffe4
--- /dev/null
+++ b/src/hal/sx1302_gps.py
@@ -0,0 +1,307 @@
+"""Semtech libloragw GPS/PPS time synchronization (lgw_gps_* bindings).
+
+Aligns the SX1302/SX1303 internal packet counter with GPS time using
+the HAL's serial parser and ``lgw_gps_sync``. This is separate from
+``location.source: uart`` / ``gpsd``, which only feed dashboard
+coordinates — PPS sync is for accurate ``timestamp_us`` on RX packets.
+
+Typical RAK Pi HAT wiring: u-blox on ``/dev/ttyAMA0``, PPS into the
+concentrator. Do not enable PPS and ``location.source: uart`` on the
+same TTY simultaneously (exclusive open).
+"""
+
+from __future__ import annotations
+
+import ctypes
+import logging
+import os
+import threading
+import time
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Optional
+
+from src.hal.sx1302_gps_types import (
+ CoordS,
+ GPS_MSG_UBX_NAV_TIMEGPS,
+ LGW_GPS_ERROR,
+ LGW_GPS_SUCCESS,
+ TrefS,
+ TimespecS,
+)
+
+if TYPE_CHECKING:
+ from ctypes import CDLL
+
+logger = logging.getLogger(__name__)
+
+LGW_HAL_SUCCESS = 0
+_READ_CHUNK = 256
+_PARSE_BUF = 4096
+
+
+@dataclass
+class GpsPpsStatus:
+ """Runtime snapshot for dashboards and ``GET /api/device/gps-pps-status``."""
+
+ enabled: bool
+ available: bool
+ tty_path: str
+ gps_family: str
+ sync_count: int
+ last_sync_ok: bool
+ last_error: Optional[str] = None
+ xtal_err: Optional[float] = None
+ reference_count_us: Optional[int] = None
+
+ def to_dict(self) -> dict:
+ return {
+ "enabled": self.enabled,
+ "available": self.available,
+ "tty_path": self.tty_path,
+ "gps_family": self.gps_family,
+ "sync_count": self.sync_count,
+ "last_sync_ok": self.last_sync_ok,
+ "last_error": self.last_error,
+ "xtal_err": self.xtal_err,
+ "reference_count_us": self.reference_count_us,
+ }
+
+
+class HalGpsPpsSync:
+ """Background GPS reader + periodic ``lgw_gps_sync`` against PPS."""
+
+ def __init__(
+ self,
+ lib: CDLL,
+ tty_path: str = "/dev/ttyAMA0",
+ gps_family: str = "ubx7",
+ target_baud: int = 0,
+ ) -> None:
+ self._lib = lib
+ self._tty_path = tty_path
+ self._gps_family = gps_family
+ self._target_baud = target_baud
+ self._fd: int = -1
+ self._thread: Optional[threading.Thread] = None
+ self._stop = threading.Event()
+ self._lock = threading.Lock()
+ self._ref = TrefS()
+ self._ref.systime = 0
+ self._sync_count = 0
+ self._last_sync_ok = False
+ self._last_error: Optional[str] = None
+ self._enabled = False
+
+ @property
+ def supported(self) -> bool:
+ return hasattr(self._lib, "lgw_gps_enable") and hasattr(
+ self._lib, "lgw_gps_sync"
+ )
+
+ def get_status(self) -> GpsPpsStatus:
+ with self._lock:
+ return GpsPpsStatus(
+ enabled=self._enabled,
+ available=self._fd >= 0 and self.supported,
+ tty_path=self._tty_path,
+ gps_family=self._gps_family,
+ sync_count=self._sync_count,
+ last_sync_ok=self._last_sync_ok,
+ last_error=self._last_error,
+ xtal_err=self._ref.xtal_err if self._last_sync_ok else None,
+ reference_count_us=(
+ int(self._ref.count_us) if self._last_sync_ok else None
+ ),
+ )
+
+ def start(self) -> bool:
+ """Open GPS UART via HAL and enable SX1302 PPS sampling if available."""
+ if not self.supported:
+ self._last_error = "libloragw lacks lgw_gps_enable/lgw_gps_sync"
+ logger.info("GPS/PPS sync unavailable: %s", self._last_error)
+ return False
+
+ if self._enabled:
+ return True
+
+ fd = ctypes.c_int(-1)
+ tty_b = self._tty_path.encode("ascii")
+ family_b = self._gps_family.encode("ascii")
+ baud = self._target_baud
+ if hasattr(self._lib, "lgw_gps_enable"):
+ try:
+ from termios import B9600
+
+ if baud == 0:
+ baud = B9600
+ except ImportError:
+ if baud == 0:
+ baud = 13 # B9600 on Linux; Pi-only path
+
+ rc = self._lib.lgw_gps_enable(
+ tty_b, family_b, ctypes.c_uint(baud), ctypes.byref(fd)
+ )
+ if rc != LGW_GPS_SUCCESS:
+ self._last_error = f"lgw_gps_enable({self._tty_path}) failed (rc={rc})"
+ logger.warning("GPS/PPS: %s", self._last_error)
+ return False
+
+ self._fd = int(fd.value)
+ if hasattr(self._lib, "sx1302_gps_enable"):
+ sx_rc = self._lib.sx1302_gps_enable(True)
+ if sx_rc != LGW_HAL_SUCCESS:
+ logger.warning(
+ "sx1302_gps_enable(true) returned %s (PPS may be unavailable)",
+ sx_rc,
+ )
+
+ self._stop.clear()
+ self._thread = threading.Thread(
+ target=self._reader_loop,
+ name="hal-gps-pps",
+ daemon=True,
+ )
+ self._thread.start()
+ self._enabled = True
+ logger.info(
+ "GPS/PPS sync started (tty=%s family=%s fd=%d)",
+ self._tty_path,
+ self._gps_family,
+ self._fd,
+ )
+ return True
+
+ def stop(self) -> None:
+ if not self._enabled:
+ return
+ self._stop.set()
+ if self._thread is not None:
+ self._thread.join(timeout=3.0)
+ self._thread = None
+
+ if self._fd >= 0 and hasattr(self._lib, "lgw_gps_disable"):
+ self._lib.lgw_gps_disable(self._fd)
+ self._fd = -1
+
+ if hasattr(self._lib, "sx1302_gps_enable"):
+ self._lib.sx1302_gps_enable(False)
+
+ self._enabled = False
+ logger.info("GPS/PPS sync stopped")
+
+ def convert_timestamp_us_to_utc(
+ self, count_us: int
+ ) -> Optional[tuple[int, int]]:
+ """Return (tv_sec, tv_nsec) UTC for a concentrator counter, if synced."""
+ if not self._last_sync_ok or not hasattr(self._lib, "lgw_cnt2utc"):
+ return None
+ utc = TimespecS()
+ with self._lock:
+ ref_copy = TrefS()
+ ctypes.memmove(
+ ctypes.byref(ref_copy),
+ ctypes.byref(self._ref),
+ ctypes.sizeof(TrefS),
+ )
+ rc = self._lib.lgw_cnt2utc(ref_copy, count_us, ctypes.byref(utc))
+ if rc != LGW_GPS_SUCCESS:
+ return None
+ return int(utc.tv_sec), int(utc.tv_nsec)
+
+ def _reader_loop(self) -> None:
+ parse_buf = bytearray()
+ while not self._stop.is_set():
+ if self._fd < 0:
+ time.sleep(0.5)
+ continue
+ try:
+ chunk = os.read(self._fd, _READ_CHUNK)
+ except OSError as exc:
+ self._last_error = f"GPS read failed: {exc}"
+ time.sleep(1.0)
+ continue
+
+ if not chunk:
+ time.sleep(0.05)
+ continue
+
+ parse_buf.extend(chunk)
+ if len(parse_buf) > _PARSE_BUF:
+ del parse_buf[: len(parse_buf) - _PARSE_BUF]
+
+ self._consume_parse_buffer(parse_buf)
+
+ def _consume_parse_buffer(self, buf: bytearray) -> None:
+ if not buf:
+ return
+
+ text = bytes(buf).decode("ascii", errors="ignore")
+ for line in text.split("\n"):
+ line = line.strip()
+ if not line:
+ continue
+ if line.startswith("$"):
+ with self._lock:
+ if hasattr(self._lib, "lgw_parse_nmea"):
+ self._lib.lgw_parse_nmea(
+ line.encode("ascii"),
+ len(line) + 1,
+ )
+
+ if hasattr(self._lib, "lgw_parse_ubx"):
+ msg_size = ctypes.c_size_t(0)
+ raw = bytes(buf)
+ with self._lock:
+ msg = self._lib.lgw_parse_ubx(
+ raw,
+ len(raw),
+ ctypes.byref(msg_size),
+ )
+ if msg == GPS_MSG_UBX_NAV_TIMEGPS:
+ self._attempt_sync()
+
+ def _attempt_sync(self) -> None:
+ utc = TimespecS()
+ gps_time = TimespecS()
+ with self._lock:
+ if hasattr(self._lib, "lgw_gps_get"):
+ rc_get = self._lib.lgw_gps_get(
+ ctypes.byref(utc),
+ ctypes.byref(gps_time),
+ None,
+ None,
+ )
+ else:
+ return
+ if rc_get != LGW_GPS_SUCCESS:
+ return
+
+ count_us = ctypes.c_uint32(0)
+ if hasattr(self._lib, "lgw_get_trigcnt"):
+ rc_cnt = self._lib.lgw_get_trigcnt(ctypes.byref(count_us))
+ if rc_cnt != LGW_HAL_SUCCESS:
+ self._last_error = "lgw_get_trigcnt failed before gps_sync"
+ return
+
+ with self._lock:
+ rc = self._lib.lgw_gps_sync(
+ ctypes.byref(self._ref),
+ count_us.value,
+ utc, # struct passed by value per loragw_gps.h
+ gps_time,
+ )
+
+ self._sync_count += 1
+ if rc == LGW_GPS_SUCCESS:
+ self._last_sync_ok = True
+ self._last_error = None
+ if self._sync_count == 1 or self._sync_count % 60 == 0:
+ logger.info(
+ "GPS/PPS sync #%d ok (count_us=%u xtal_err=%.9f)",
+ self._sync_count,
+ count_us.value,
+ self._ref.xtal_err,
+ )
+ else:
+ self._last_sync_ok = False
+ self._last_error = f"lgw_gps_sync returned {rc}"
diff --git a/src/hal/sx1302_gps_signatures.py b/src/hal/sx1302_gps_signatures.py
new file mode 100644
index 0000000..9186c11
--- /dev/null
+++ b/src/hal/sx1302_gps_signatures.py
@@ -0,0 +1,69 @@
+"""ctypes signatures for libloragw GPS/PPS functions (loragw_gps.h)."""
+
+from __future__ import annotations
+
+import ctypes
+
+from src.hal.sx1302_gps_types import CoordS, TrefS, TimespecS
+
+
+def apply_gps_signatures(lib: ctypes.CDLL) -> None:
+ """Register optional GPS symbols when present in libloragw.so."""
+ if hasattr(lib, "lgw_gps_enable"):
+ lib.lgw_gps_enable.restype = ctypes.c_int
+ lib.lgw_gps_enable.argtypes = [
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.c_uint,
+ ctypes.POINTER(ctypes.c_int),
+ ]
+
+ if hasattr(lib, "lgw_gps_disable"):
+ lib.lgw_gps_disable.restype = ctypes.c_int
+ lib.lgw_gps_disable.argtypes = [ctypes.c_int]
+
+ if hasattr(lib, "lgw_parse_nmea"):
+ lib.lgw_parse_nmea.restype = ctypes.c_int
+ lib.lgw_parse_nmea.argtypes = [ctypes.c_char_p, ctypes.c_int]
+
+ if hasattr(lib, "lgw_parse_ubx"):
+ lib.lgw_parse_ubx.restype = ctypes.c_int
+ lib.lgw_parse_ubx.argtypes = [
+ ctypes.c_char_p,
+ ctypes.c_size_t,
+ ctypes.POINTER(ctypes.c_size_t),
+ ]
+
+ if hasattr(lib, "lgw_gps_get"):
+ lib.lgw_gps_get.restype = ctypes.c_int
+ lib.lgw_gps_get.argtypes = [
+ ctypes.POINTER(TimespecS),
+ ctypes.POINTER(TimespecS),
+ ctypes.POINTER(CoordS),
+ ctypes.POINTER(CoordS),
+ ]
+
+ if hasattr(lib, "lgw_gps_sync"):
+ lib.lgw_gps_sync.restype = ctypes.c_int
+ lib.lgw_gps_sync.argtypes = [
+ ctypes.POINTER(TrefS),
+ ctypes.c_uint32,
+ TimespecS,
+ TimespecS,
+ ]
+
+ if hasattr(lib, "lgw_get_trigcnt"):
+ lib.lgw_get_trigcnt.restype = ctypes.c_int
+ lib.lgw_get_trigcnt.argtypes = [ctypes.POINTER(ctypes.c_uint32)]
+
+ if hasattr(lib, "lgw_cnt2utc"):
+ lib.lgw_cnt2utc.restype = ctypes.c_int
+ lib.lgw_cnt2utc.argtypes = [
+ TrefS,
+ ctypes.c_uint32,
+ ctypes.POINTER(TimespecS),
+ ]
+
+ if hasattr(lib, "sx1302_gps_enable"):
+ lib.sx1302_gps_enable.restype = ctypes.c_int
+ lib.sx1302_gps_enable.argtypes = [ctypes.c_bool]
diff --git a/src/hal/sx1302_gps_types.py b/src/hal/sx1302_gps_types.py
new file mode 100644
index 0000000..c7956d0
--- /dev/null
+++ b/src/hal/sx1302_gps_types.py
@@ -0,0 +1,45 @@
+"""ctypes mirrors of loragw_gps.h structures for PPS time sync."""
+
+from __future__ import annotations
+
+import ctypes
+
+
+class TimespecS(ctypes.Structure):
+ """``struct timespec`` as used by the HAL GPS module."""
+
+ _fields_ = [
+ ("tv_sec", ctypes.c_long),
+ ("tv_nsec", ctypes.c_long),
+ ]
+
+
+class TrefS(ctypes.Structure):
+ """``struct tref`` — concentrator counter to GPS/UTC mapping."""
+
+ _fields_ = [
+ ("systime", ctypes.c_long),
+ ("count_us", ctypes.c_uint32),
+ ("utc", TimespecS),
+ ("gps", TimespecS),
+ ("xtal_err", ctypes.c_double),
+ ]
+
+
+class CoordS(ctypes.Structure):
+ """``struct coord_s`` — geodesic coordinates from NMEA/UBX."""
+
+ _fields_ = [
+ ("lat", ctypes.c_double),
+ ("lon", ctypes.c_double),
+ ("alt", ctypes.c_short),
+ ]
+
+
+# loragw_gps.h
+LGW_GPS_SUCCESS = 0
+LGW_GPS_ERROR = -1
+
+# gps_msg enum subset used after parse
+GPS_MSG_UNKNOWN = 0
+GPS_MSG_UBX_NAV_TIMEGPS = 13
diff --git a/src/hal/sx1302_signatures.py b/src/hal/sx1302_signatures.py
index ec0f92a..e27fdb5 100644
--- a/src/hal/sx1302_signatures.py
+++ b/src/hal/sx1302_signatures.py
@@ -12,6 +12,7 @@
import ctypes
+from src.hal.sx1302_gps_signatures import apply_gps_signatures
from src.hal.sx1302_types import (
LgwConfBoardS,
LgwConfRxifS,
@@ -93,3 +94,5 @@ def apply_signatures(lib: ctypes.CDLL) -> None:
if hasattr(lib, "sx1302_get_model_id"):
lib.sx1302_get_model_id.restype = ctypes.c_int
lib.sx1302_get_model_id.argtypes = [ctypes.POINTER(ctypes.c_uint8)]
+
+ apply_gps_signatures(lib)
diff --git a/src/hal/sx1302_wrapper.py b/src/hal/sx1302_wrapper.py
index a46d404..1d79a6f 100644
--- a/src/hal/sx1302_wrapper.py
+++ b/src/hal/sx1302_wrapper.py
@@ -15,6 +15,7 @@
from typing import Optional
from src.hal.concentrator_config import ConcentratorChannelPlan
+from src.hal.sx1302_gps import HalGpsPpsSync
from src.hal.sx1302_signatures import apply_signatures
from src.hal.sx1302_spectral_scan import (
SpectralScanResult,
@@ -115,6 +116,11 @@ def __init__(
self._sx1261_configured = False
self._carrier_type: str = ""
self._concentrator_model_id: Optional[int] = None
+ self._gps_pps: Optional[HalGpsPpsSync] = None
+
+ @property
+ def gps_pps(self) -> Optional[HalGpsPpsSync]:
+ return self._gps_pps
def set_carrier_type(self, carrier_type: str) -> None:
"""Set carrier signature for SX1261 path guardrails (from config or wizard)."""
@@ -217,11 +223,37 @@ def start(self) -> None:
)
def stop(self) -> None:
+ self.stop_gps_pps()
if self._started and self._lib:
self._lib.lgw_stop()
self._started = False
logger.info("SX1302 concentrator stopped")
+ def start_gps_pps(
+ self,
+ tty_path: str = "/dev/ttyAMA0",
+ gps_family: str = "ubx7",
+ target_baud: int = 0,
+ ) -> bool:
+ """Enable HAL GPS UART + PPS sync (call after ``lgw_start``)."""
+ if self._lib is None:
+ self.load()
+ if self._gps_pps is None:
+ self._gps_pps = HalGpsPpsSync(
+ self._lib,
+ tty_path=tty_path,
+ gps_family=gps_family,
+ target_baud=target_baud,
+ )
+ if not self._started:
+ logger.warning("GPS/PPS: concentrator not started yet")
+ return False
+ return self._gps_pps.start()
+
+ def stop_gps_pps(self) -> None:
+ if self._gps_pps is not None:
+ self._gps_pps.stop()
+
def receive(self) -> list[ConcentratorPacket]:
"""Poll for received packets. Non-blocking.
diff --git a/tests/test_hal_gps_pps.py b/tests/test_hal_gps_pps.py
new file mode 100644
index 0000000..d8fc1fa
--- /dev/null
+++ b/tests/test_hal_gps_pps.py
@@ -0,0 +1,117 @@
+"""Tests for HAL GPS/PPS ctypes bindings and wrapper integration."""
+
+from __future__ import annotations
+
+import ctypes
+import unittest
+from unittest.mock import MagicMock, patch
+
+from src.config import AppConfig, LocationConfig, RadioConfig, validate_config_consistency
+from src.hal.sx1302_gps import HalGpsPpsSync
+from src.hal.sx1302_gps_types import GPS_MSG_UBX_NAV_TIMEGPS, LGW_GPS_SUCCESS
+from src.hal.sx1302_wrapper import LGW_HAL_SUCCESS, SX1302Wrapper
+
+
+def _mock_lib_with_gps() -> MagicMock:
+ lib = MagicMock()
+ lib.lgw_gps_enable = MagicMock(return_value=LGW_GPS_SUCCESS)
+ lib.lgw_gps_disable = MagicMock(return_value=LGW_GPS_SUCCESS)
+ lib.lgw_parse_nmea = MagicMock(return_value=0)
+ lib.lgw_parse_ubx = MagicMock(return_value=0)
+ lib.lgw_gps_get = MagicMock(return_value=LGW_GPS_SUCCESS)
+ lib.lgw_get_trigcnt = MagicMock(return_value=LGW_HAL_SUCCESS)
+ lib.lgw_gps_sync = MagicMock(return_value=LGW_GPS_SUCCESS)
+ lib.sx1302_gps_enable = MagicMock(return_value=LGW_HAL_SUCCESS)
+ return lib
+
+
+class TestConfigGpsConflict(unittest.TestCase):
+ def test_uart_and_pps_same_tty_raises(self) -> None:
+ cfg = AppConfig(
+ radio=RadioConfig(gps_pps_enabled=True, gps_pps_tty_path="/dev/ttyAMA0"),
+ location=LocationConfig(source="uart", uart_path="/dev/ttyAMA0"),
+ )
+ with self.assertRaises(ValueError) as ctx:
+ validate_config_consistency(cfg)
+ self.assertIn("cannot share", str(ctx.exception))
+
+ def test_uart_and_pps_different_tty_ok(self) -> None:
+ cfg = AppConfig(
+ radio=RadioConfig(
+ gps_pps_enabled=True, gps_pps_tty_path="/dev/ttyAMA0"
+ ),
+ location=LocationConfig(source="uart", uart_path="/dev/ttyUSB0"),
+ )
+ validate_config_consistency(cfg)
+
+ def test_pps_with_gpsd_ok(self) -> None:
+ cfg = AppConfig(
+ radio=RadioConfig(
+ gps_pps_enabled=True, gps_pps_tty_path="/dev/ttyAMA0"
+ ),
+ location=LocationConfig(source="gpsd"),
+ )
+ validate_config_consistency(cfg)
+
+
+class TestHalGpsPpsSync(unittest.TestCase):
+ def test_unsupported_lib_returns_false(self) -> None:
+ lib = MagicMock(spec=[])
+ sync = HalGpsPpsSync(lib)
+ self.assertFalse(sync.start())
+ self.assertIn("lgw_gps_enable", sync.get_status().last_error or "")
+
+ @patch("src.hal.sx1302_gps.os.read", return_value=b"")
+ def test_start_enables_hal_and_sx1302_pps(self, _read: MagicMock) -> None:
+ lib = _mock_lib_with_gps()
+ fd_holder = {"value": 7}
+
+ def fake_enable(path, family, baud, fd_ptr):
+ ctypes.cast(fd_ptr, ctypes.POINTER(ctypes.c_int))[0] = fd_holder["value"]
+ return LGW_GPS_SUCCESS
+
+ lib.lgw_gps_enable.side_effect = fake_enable
+
+ sync = HalGpsPpsSync(lib, tty_path="/dev/ttyAMA0", gps_family="ubx7")
+ self.assertTrue(sync.start())
+ lib.sx1302_gps_enable.assert_called_once_with(True)
+ status = sync.get_status()
+ self.assertTrue(status.enabled)
+ self.assertTrue(status.available)
+ sync.stop()
+ lib.lgw_gps_disable.assert_called_once_with(7)
+ lib.sx1302_gps_enable.assert_called_with(False)
+
+ def test_ubx_nav_timegps_triggers_sync(self) -> None:
+ lib = _mock_lib_with_gps()
+ lib.lgw_parse_ubx.return_value = GPS_MSG_UBX_NAV_TIMEGPS
+
+ sync = HalGpsPpsSync(lib)
+ sync._enabled = True
+ sync._fd = 1
+ sync._consume_parse_buffer(bytearray(b"\xb5\x62"))
+ lib.lgw_gps_sync.assert_called_once()
+ self.assertTrue(sync.get_status().last_sync_ok)
+
+
+class TestWrapperGpsPps(unittest.TestCase):
+ def test_start_gps_pps_after_concentrator_start(self) -> None:
+ wrapper = SX1302Wrapper()
+ wrapper._lib = _mock_lib_with_gps()
+ wrapper._started = True
+
+ def fake_enable(path, family, baud, fd_ptr):
+ ctypes.cast(fd_ptr, ctypes.POINTER(ctypes.c_int))[0] = 3
+ return LGW_GPS_SUCCESS
+
+ wrapper._lib.lgw_gps_enable.side_effect = fake_enable
+
+ with patch("src.hal.sx1302_gps.os.read", return_value=b""):
+ self.assertTrue(
+ wrapper.start_gps_pps(
+ tty_path="/dev/ttyAMA0",
+ gps_family="ubx7",
+ )
+ )
+ self.assertIsNotNone(wrapper.gps_pps)
+ wrapper.stop_gps_pps()