diff --git a/README.md b/README.md index 6bfd4fa..7807bfb 100644 --- a/README.md +++ b/README.md @@ -266,6 +266,8 @@ See [docs/COMMON-ERRORS.md](docs/COMMON-ERRORS.md#upgrades) if the service fails **Chip version 0x00:** Concentrator not responding. Check that the concentrator module is seated, SPI is enabled (`raspi-config` → Interface Options → SPI), and try a full power cycle (unplug for 10+ seconds). Normal chip versions are `0x10` (SX1302) and `0x12` (SX1303). +**`sx1261_check_status` / `lgw_start() failed` on RAK V2:** Clear `radio.sx1261_spi_path` in `local.yaml` (leave it `""`). The SX1261 is not Pi-visible on RAK/SenseCap boards. Use `location.source: uart` for onboard HAT GPS or `gpsd` for USB GPS. See [Common Errors](docs/COMMON-ERRORS.md). + **No packets:** Verify antenna is connected and frequency matches your region. Check `meshpoint logs` for `lgw_receive returned N packet(s)`. **Upstream 401:** Bad API key. Get a free one at [meshradar.io](https://meshradar.io) and re-run `sudo meshpoint setup`. diff --git a/config/default.yaml b/config/default.yaml index b8f7bad..28fe903 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -129,12 +129,14 @@ location: # and are not overwritten by gpsd. # static : use device.latitude/longitude/altitude (default) # gpsd : poll a local or remote gpsd daemon for live fixes - # uart : reserved (RAK Pi HAT GPS, not yet wired) + # uart : on-board RAK Pi HAT GPS via /dev/ttyAMA0 (NMEA GGA) source: "static" # gpsd connection. Defaults match the well-known local socket; only # change when running gpsd on a peer machine on the LAN. gpsd_host: "127.0.0.1" gpsd_port: 2947 + uart_path: "/dev/ttyAMA0" + uart_baud: 9600 # How often the coordinator wakes to read the active source. update_interval_seconds: 5 # Minimum acceptable fix mode: 0=any (incl. no-fix), 1=2D, 2=3D. diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b561b94..8134d0a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,11 @@ - **MQTT broker TLS.** Transport TLS (`mqtts`, CA bundle, cert validation) is not implemented on `mqtt_publisher.py` (plain TCP only). Until then use plain port 1883 or a LAN broker without TLS. +- **UART GPS.** `location.source: uart` reads NMEA GGA from the RAK Pi HAT (`/dev/ttyAMA0` by default). Configuration → GPS UART fieldset; `meshpoint setup` can select uart when the wizard probe gets a fix. Requires `pyserial` in the venv (`pip install -r requirements.txt` after upgrade). +- **Concentrator model ID.** Startup logs SX1302 vs SX1303 when `libloragw` exposes `sx1302_get_model_id` (e.g. `Concentrator model ID: 0x12 (SX1303)`). +- **SX1261 guard.** Non-empty `radio.sx1261_spi_path` on RAK/SenseCap carriers (`radio.carrier_type`) is cleared at configure time with an explicit warning, preventing `lgw_start()` failures from misconfigured spectral scan on Pi-invisible SX1261 wiring. +- **SPI preflight.** Missing `/dev/spidev0.0` raises a clear error before `lgw_start()` (raspi-config / spi group hints). + ### v0.7.6 (June 2026) Meshtastic mesh participant release on `main` (merge `feat/v0.7.6`). Edge-only, pure Python, no concentrator recompile. **Upgrade:** Settings → Updates → **Stable**, or the full SSH block in `docs/COMMON-ERRORS.md` (`git fetch`, `checkout main`, `pull`, `scripts/install.sh`, `restart`). Required this release: new `cryptography` dependency for PKI and an updated `meshpoint.service` unit (RAK V2 reset fix). Pull-only upgrades can miss both. Witness-tested on RAK V2. Settings → Updates RC picker now points at **v0.7.7** on `feat/v0.7.7`. diff --git a/docs/COMMON-ERRORS.md b/docs/COMMON-ERRORS.md index 0616469..6c2f4f1 100644 --- a/docs/COMMON-ERRORS.md +++ b/docs/COMMON-ERRORS.md @@ -430,16 +430,54 @@ disabled in `raspi-config`, or the SPI bus latched after a hard power cut. 3. Full power cycle: `sudo poweroff`, wait for green LED to stop, unplug for 10+ seconds, then plug back in. -Normal chip versions are `0x10` (SX1302) and `0x12` (SX1303). +Normal chip versions are `0x10` (SX1302) and `0x12` (SX1303). Startup +also logs `Concentrator model ID: 0x12 (SX1303)` when the HAL exposes +`sx1302_get_model_id`. + +### `sx1261_check_status: got:0x00 expected:0x22` / `failed to patch sx1261` + +**Cause:** `radio.sx1261_spi_path` is set on a carrier where the SX1261 +companion chip is **not** wired to a Pi-visible SPI bus (RAK2287, RAK5146, +SenseCap M1, most RAK Hotspot V2 units). The HAL probes a chip that is not +there and may refuse `lgw_start()`. + +**Fix:** + +1. Edit `config/local.yaml` and clear the path: + + ```yaml + radio: + sx1261_spi_path: "" + ``` + +2. Restart: `sudo systemctl restart meshpoint`. + +3. Confirm the journal shows `SX1261 spi_path empty; spectral scan disabled` + and `Application startup complete`. + +On current Meshpoint builds with `radio.carrier_type: rak` or +`sensecap_m1` (written by `meshpoint setup`), a mistaken path is cleared +automatically at startup with a warning. Re-run `sudo meshpoint setup` if +`carrier_type` is missing. + +See [Configuration > Spectral Scan](CONFIGURATION.md#spectral-scan-noise-floor). ### `lgw_start() failed` or `Failed to set SX1250_0 in STANDBY_RC mode` -**Cause:** SPI bus latch from a hard power cut. The Meshpoint shutdown -handler holds the concentrator in reset on `sudo reboot` and -`sudo systemctl restart`, so this only appears after yanked-cable shutdowns, -breaker trips, or outages. +**Cause:** Usually one of: + +1. **SX1261 misconfiguration** — see the entry above if the log mentions + `sx1261_check_status` or `failed to patch sx1261`. +2. **SPI bus latch** from a hard power cut. The Meshpoint shutdown handler + holds the concentrator in reset on `sudo reboot` and + `sudo systemctl restart`, so latch typically follows yanked-cable + shutdowns, breaker trips, or outages. +3. **SPI disabled or device missing** — `/dev/spidev0.0` not present; + enable SPI in `raspi-config` and confirm the `meshpoint` user is in + the `spi` group. -**Fix:** Full power cycle: +**Fix:** If SX1261 lines appear in the log, fix `sx1261_spi_path` first. +Otherwise full power cycle: ```bash sudo poweroff diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index f12b077..cf94dd7 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -26,6 +26,7 @@ radio: tx_power_dbm: 22 # SX1302 concentrator output power spectral_scan_interval_seconds: 60 # noise floor sampler cadence (0 disables) sx1261_spi_path: "" # SX1261 SPI device for spectral scan (empty = disabled) + carrier_type: "" # rak | sensecap_m1 (set by meshpoint setup; guards SX1261 path) ``` The region sets the base frequency, spreading factor, and bandwidth automatically. You only need `region` in most cases. Override `frequency_mhz`, `spreading_factor`, or `bandwidth_khz` individually to tune for non-default presets (MediumFast, ShortFast, etc.) or custom frequency slots. @@ -76,6 +77,8 @@ ERROR: failed to patch sx1261 radio for LBT/Spectral Scan …and `lgw_start()` may then refuse to bring up the concentrator. If you see that, revert `sx1261_spi_path` to `""`, restart the service, and stay on the packet-derived fallback. +On RAK and SenseCap carriers, `meshpoint setup` writes `radio.carrier_type`. When that field is `rak` or `sensecap_m1`, Meshpoint **clears** a non-empty `sx1261_spi_path` at startup and logs a warning instead of calling `lgw_sx1261_setconf`. + If your `libloragw` build does not expose the spectral scan symbols at all (older HAL revisions), the service logs a single info line at startup and falls back automatically. ### Standard Meshtastic Presets @@ -169,8 +172,10 @@ location: source: "static" # static | gpsd | uart gpsd_host: "127.0.0.1" # gpsd TCP host (only when source=gpsd) gpsd_port: 2947 # gpsd TCP port + uart_path: "/dev/ttyAMA0" # serial device (only when source=uart) + uart_baud: 9600 # NMEA baud (uart only) update_interval_seconds: 5 # how often the coordinator polls the source - min_fix_quality: 1 # minimum NMEA fix quality (1=2D, 3=3D) + min_fix_quality: 1 # minimum fix quality (1=2D, 2=3D for gpsd/uart) ``` `location.source` selects where the Meshpoint reads **live GPS fixes** @@ -185,7 +190,7 @@ coordinates and mesh position settings hot-reload from the dashboard. |---|---| | `static` (default) | No live GPS hardware. Registered coordinates live in `device.*` only. Skyplot shows the static pin. | | `gpsd` | Reads live fixes from the system `gpsd` daemon over TCP (`127.0.0.1:2947`). Recommended for any USB GPS receiver (u-blox 7, u-blox 8, VFAN puck, generic CDC ACM sticks). Skyplot and stats update from the live fix. | -| `uart` | Reserved for direct-serial reads from a Pi HAT GPS (e.g. RAK 7248). Currently a placeholder; falls back to static and surfaces an explanatory error in the dashboard. | +| `uart` | Reads NMEA GGA from the on-board RAK Pi HAT GPS on `/dev/ttyAMA0` (or `uart_path`). Fix and satellite count update live; full skyplot az/el/SNR needs `gpsd` + USB receiver. | ### Mesh position broadcasts (LoRa / Meshtastic app map) @@ -253,13 +258,35 @@ in `gpsd-clients`) or `gpsmon`. | u-blox 7 USB stick | USB CDC ACM, NMEA + UBX | yes (RAK V2 .141) | | u-blox 8 USB stick | USB CDC ACM, NMEA + UBX | yes | | VFAN ublox 7 USB puck | USB CDC ACM, NMEA + UBX | yes | -| RAK 7248 onboard u-blox via UART (`/dev/ttyAMA0`) | NMEA over UART | placeholder (`source: uart`, not yet wired) | +| RAK 7248 onboard u-blox via UART (`/dev/ttyAMA0`) | NMEA over UART | yes (`source: uart`; `install.sh` enables UART) | Other USB receivers should work as long as `gpsd` recognizes the device's VID. If `cgps` shows data but the dashboard does not, check `journalctl -u meshpoint | grep -i gpsd` for connection errors and confirm `source: gpsd` in `local.yaml`. +### Using uart (RAK Pi HAT onboard GPS) + +`scripts/install.sh` enables the primary UART (`/dev/ttyAMA0`), disables +the serial console, and turns off Bluetooth on the UART pins so the HAT +GPS module can talk to the Pi. + +1. Run `sudo meshpoint setup` outdoors and accept **Use live UART GPS** + when the wizard acquires a fix, **or** set in `local.yaml`: + + ```yaml + location: + source: "uart" + uart_path: "/dev/ttyAMA0" + uart_baud: 9600 + ``` + +2. Restart: `sudo systemctl restart meshpoint`. + +3. Open **Configuration → GPS** → **UART**. Coordinates and satellite + count update every few seconds outdoors. The skyplot stays empty + until GSV parsing is added (use `gpsd` for a full skyplot today). + ### Privacy Three independent surfaces: @@ -693,6 +720,8 @@ location: # GPS / location source source: "static" # static | gpsd | uart gpsd_host: "127.0.0.1" gpsd_port: 2947 + uart_path: "/dev/ttyAMA0" + uart_baud: 9600 update_interval_seconds: 5 min_fix_quality: 1 diff --git a/docs/HARDWARE-MATRIX.md b/docs/HARDWARE-MATRIX.md index f751fcc..e651c6b 100644 --- a/docs/HARDWARE-MATRIX.md +++ b/docs/HARDWARE-MATRIX.md @@ -17,6 +17,7 @@ Application code is plain Python (v0.7.0+); **aarch64** is required. |---|---|---|---|---| | **Host** | Pi 4 (SD) | Pi 4 (SD) | CM4 (eMMC) | Pi 4 (SD) | | **Concentrator** | RAK2287 (SX1302) | WM1303 (SX1303) | SX1302 (onboard) | RAK2287 (SX1302) | +| **HAL chip version log** | `0x10` (SX1302) | `0x12` (SX1303) | `0x10` (SX1302) | `0x10` (SX1302) | | **TX support** | Yes (native, with HAL patch) | Yes (native, with HAL patch) | Yes (native, with HAL patch) | Yes (native, with HAL patch) | | **RX channels** | 8 simultaneous | 8 simultaneous | 8 simultaneous | 8 simultaneous | | **Spreading factors** | SF7-SF12 simultaneous | SF7-SF12 simultaneous | SF7-SF12 simultaneous | SF7-SF12 simultaneous | @@ -212,8 +213,10 @@ window-mounted deployments. For better coverage: GPS antenna (u.FL to SMA pigtail) is optional. If your carrier board has a u-blox GPS module, plugging in a GPS antenna gives you automatic -positioning during the setup wizard. Otherwise enter coordinates manually -(right-click any spot in Google Maps to copy in decimal format). +positioning during the setup wizard. Set `location.source: uart` for the +on-board RAK HAT module, or `location.source: gpsd` for a USB GPS stick. +Otherwise enter coordinates manually (right-click any spot in Google Maps +to copy in decimal format). --- diff --git a/frontend/js/configuration/gps_card.js b/frontend/js/configuration/gps_card.js index 3f90211..8352776 100644 --- a/frontend/js/configuration/gps_card.js +++ b/frontend/js/configuration/gps_card.js @@ -34,9 +34,10 @@ class GpsConfigCard {

GPS and placement

- Registered coordinates go to Meshradar. Choose whether - the LoRa mesh hears your registered pin or a live GPS - fix, with optional privacy rounding on live. + Registered coordinates go to Meshradar. Live fixes from + gpsd (USB), UART (RAK Pi HAT), or static entry feed the + skyplot. Mesh position broadcasts on the LoRa mesh are + configured separately below.

@@ -132,6 +133,42 @@ class GpsConfigCard {

+ +
Mesh position broadcasts

@@ -188,6 +225,11 @@ class GpsConfigCard { this._staticFields = this._root.querySelector('[data-static-fields]'); this._gpsdFields = this._root.querySelector('[data-gpsd-fields]'); + this._uartFields = this._root.querySelector('[data-uart-fields]'); + this._uartPath = this._root.querySelector('[data-uart-path]'); + this._uartBaud = this._root.querySelector('[data-uart-baud]'); + this._uartInterval = this._root.querySelector('[data-uart-interval]'); + this._uartQuality = this._root.querySelector('[data-uart-quality]'); this._meshLiveChip = this._root.querySelector('[data-mesh-live-chip]'); this._meshPrecisionWrap = this._root.querySelector('[data-mesh-precision-wrap]'); this._meshPrecision = this._root.querySelector('[data-mesh-precision]'); @@ -233,6 +275,19 @@ class GpsConfigCard { this._gpsdQuality.value = String(location.min_fix_quality); } + if (this._uartPath) { + this._uartPath.value = location.uart_path || '/dev/ttyAMA0'; + } + if (this._uartBaud) { + this._uartBaud.value = location.uart_baud || 9600; + } + if (this._uartInterval && location.update_interval_seconds) { + this._uartInterval.value = location.update_interval_seconds; + } + if (this._uartQuality && location.min_fix_quality) { + this._uartQuality.value = String(location.min_fix_quality); + } + const position = (config && config.transmit && config.transmit.position) || {}; const meshSource = (position.coordinate_source || 'static').toLowerCase(); const meshRadio = this._root.querySelector( @@ -299,8 +354,10 @@ class GpsConfigCard { _showFieldsetForSource(source) { if (!this._staticFields || !this._gpsdFields) return; const isGpsd = source === 'gpsd'; + const isUart = source === 'uart'; this._staticFields.hidden = false; this._gpsdFields.hidden = !isGpsd; + if (this._uartFields) this._uartFields.hidden = !isUart; } _updateSourceHint(source) { @@ -312,9 +369,8 @@ class GpsConfigCard { + 'to the daemon.'; } else if (source === 'uart') { this._sourceHint.textContent = - 'Reserved for the on-board RAK Pi HAT GPS module. Not yet ' - + 'wired in v0.7.5; falls back to the static coordinates ' - + 'on save.'; + 'Live NMEA from the on-board RAK Pi HAT GPS (/dev/ttyAMA0). ' + + 'Switching source requires a service restart.'; } else { this._sourceHint.textContent = 'Coordinates are entered manually and stay fixed until ' @@ -324,7 +380,7 @@ class GpsConfigCard { _restartPolling(source) { this._stopPolling(); - const interval = source === 'gpsd' ? 2000 : 30000; + const interval = (source === 'gpsd' || source === 'uart') ? 2000 : 30000; this._pollOnce(); this._timer = window.setInterval(() => this._pollOnce(), interval); } @@ -383,6 +439,15 @@ class GpsConfigCard { if (intervalRaw) payload.update_interval_seconds = Number(intervalRaw); const qualityRaw = this._gpsdQuality.value; if (qualityRaw) payload.min_fix_quality = Number(qualityRaw); + } else if (source === 'uart') { + const path = this._uartPath.value.trim(); + if (path) payload.uart_path = path; + const baudRaw = this._uartBaud.value.trim(); + if (baudRaw) payload.uart_baud = Number(baudRaw); + const intervalRaw = this._uartInterval.value.trim(); + if (intervalRaw) payload.update_interval_seconds = Number(intervalRaw); + const qualityRaw = this._uartQuality.value; + if (qualityRaw) payload.min_fix_quality = Number(qualityRaw); } const gpsResult = await this._api.put('/api/config/gps', payload); diff --git a/requirements.txt b/requirements.txt index 89b9c0b..81381fc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,4 @@ meshcore>=2.1.0 paho-mqtt>=2.1.0 bcrypt>=4.2.0 PyJWT>=2.10.0 +pyserial>=3.5 diff --git a/src/api/routes/config_enrichment.py b/src/api/routes/config_enrichment.py index 5f0ae3a..6926d63 100644 --- a/src/api/routes/config_enrichment.py +++ b/src/api/routes/config_enrichment.py @@ -56,11 +56,14 @@ def enrich_config_payload(cfg: AppConfig, base: dict) -> dict: base["radio_advanced"] = { "spectral_scan_interval_seconds": radio.spectral_scan_interval_seconds, "sx1261_spi_path": radio.sx1261_spi_path or "", + "carrier_type": radio.carrier_type or "", } base["location"] = { "source": location.source, "gpsd_host": location.gpsd_host, "gpsd_port": location.gpsd_port, + "uart_path": location.uart_path, + "uart_baud": location.uart_baud, "update_interval_seconds": location.update_interval_seconds, "min_fix_quality": location.min_fix_quality, } diff --git a/src/api/routes/device_config_routes.py b/src/api/routes/device_config_routes.py index 45f5977..83463a6 100644 --- a/src/api/routes/device_config_routes.py +++ b/src/api/routes/device_config_routes.py @@ -74,8 +74,8 @@ class GpsUpdate(BaseModel): * ``gpsd`` -- live position from a running gpsd daemon (defaults to 127.0.0.1:2947). The ``location:`` section of ``local.yaml`` holds the connection details and update cadence. - * ``uart`` -- placeholder for the on-board RAK Pi HAT GPS module. - Not yet wired in v0.7.5; falls back to static. + * ``uart`` -- on-board RAK Pi HAT GPS via NMEA on ``/dev/ttyAMA0`` + (or a custom ``uart_path``). """ source: str = "static" @@ -88,8 +88,11 @@ class GpsUpdate(BaseModel): gpsd_port: Optional[int] = Field(None, ge=1, le=65535) update_interval_seconds: Optional[int] = Field(None, ge=1, le=300) min_fix_quality: Optional[int] = Field(None, ge=1, le=3) - # uart-mode fields (kept for forward-compat; not yet wired) - baud: Optional[int] = Field(None, ge=9600, le=921600) + # uart-mode fields + uart_path: Optional[str] = Field(None, min_length=1, max_length=128) + uart_baud: Optional[int] = Field(None, ge=4800, le=115200) + # Legacy alias accepted from older dashboard builds + baud: Optional[int] = Field(None, ge=4800, le=115200) timeout_seconds: Optional[int] = Field(None, ge=1, le=3600) # Meshtastic POSITION on the LoRa mesh (not Meshradar upstream pin). mesh_coordinate_source: Optional[str] = None @@ -240,6 +243,25 @@ async def update_gps( if source_changed: location.source = "uart" location_updates["source"] = "uart" + baud = req.uart_baud if req.uart_baud is not None else req.baud + if req.uart_path is not None and req.uart_path != location.uart_path: + location.uart_path = req.uart_path.strip() + location_updates["uart_path"] = location.uart_path + if baud is not None and baud != location.uart_baud: + location.uart_baud = baud + location_updates["uart_baud"] = location.uart_baud + if ( + req.update_interval_seconds is not None + and req.update_interval_seconds != location.update_interval_seconds + ): + location.update_interval_seconds = req.update_interval_seconds + location_updates["update_interval_seconds"] = req.update_interval_seconds + if ( + req.min_fix_quality is not None + and req.min_fix_quality != location.min_fix_quality + ): + location.min_fix_quality = req.min_fix_quality + location_updates["min_fix_quality"] = req.min_fix_quality if req.source == "static" and pos.coordinate_source == "live": pos.coordinate_source = "static" @@ -319,6 +341,8 @@ async def update_gps( "gpsd_port": location.gpsd_port, "update_interval_seconds": location.update_interval_seconds, "min_fix_quality": location.min_fix_quality, + "uart_path": location.uart_path, + "uart_baud": location.uart_baud, "mesh_coordinate_source": pos.coordinate_source, "mesh_location_precision": pos.location_precision, }, diff --git a/src/capture/concentrator_source.py b/src/capture/concentrator_source.py index b3cd2fb..bca0568 100644 --- a/src/capture/concentrator_source.py +++ b/src/capture/concentrator_source.py @@ -47,6 +47,7 @@ def __init__( ) self._poll_interval = poll_interval_ms / 1000.0 self._syncword = syncword + self._radio_config = radio_config self._running = False self._restart_lock = asyncio.Lock() @@ -76,6 +77,8 @@ def is_running(self) -> bool: async def start(self) -> None: self._wrapper.load() + if self._radio_config is not None: + self._wrapper.set_carrier_type(self._radio_config.carrier_type) late_reset = os.environ.get("CONCENTRATOR_LATE_RESET", "0") == "1" diff --git a/src/cli/setup_wizard.py b/src/cli/setup_wizard.py index 4c12de7..da56a69 100644 --- a/src/cli/setup_wizard.py +++ b/src/cli/setup_wizard.py @@ -198,6 +198,8 @@ def _step_capture_source(config: dict, report: HardwareReport) -> None: config.setdefault("device", {})["hardware_description"] = ( report.hardware_description ) + if report.carrier_type: + config.setdefault("radio", {})["carrier_type"] = report.carrier_type elif report.serial_ports: port = _choose_from_list( "Select capture serial port:", report.serial_ports @@ -286,7 +288,23 @@ def _step_location( if gps.got_fix: print(f" GPS fix acquired: {gps.latitude}, {gps.longitude}") print(f" Altitude: {gps.altitude}m | Satellites: {gps.satellites}") - if _confirm("Use this GPS position?", default_yes=True): + if _confirm( + "Use live UART GPS for ongoing position (recommended on RAK HAT)?", + default_yes=True, + ): + config.setdefault("location", {}).update({ + "source": "uart", + "uart_path": gps.uart_path, + }) + config.setdefault("device", {}).update({ + "latitude": gps.latitude, + "longitude": gps.longitude, + "altitude": gps.altitude, + }) + print(" Location source set to uart (on-board GPS).") + print() + return + if _confirm("Use this GPS position as static coordinates only?", default_yes=True): config.setdefault("device", {}).update({ "latitude": gps.latitude, "longitude": gps.longitude, diff --git a/src/config.py b/src/config.py index 4622e2b..b44f276 100644 --- a/src/config.py +++ b/src/config.py @@ -60,14 +60,18 @@ class RadioConfig: # (default; spectral scan stays unavailable, packet-derived # noise floor remains in use). # - # On RAK2287 / RAK5146 / SenseCap M1 this is typically - # ``/dev/spidev0.1`` (separate from the SX1302's - # ``/dev/spidev0.0``). Some carriers daisy-chain the SX1261 - # behind the SX1302's SPI router and want this set to the same - # path as the SX1302 SPI device. Wrong path = HAL refuses to - # ``lgw_start`` after our config attempt, so we ship empty by - # default and ask interested users to opt in explicitly. + # On RAK2287 / RAK5146 / SenseCap M1 the SX1261 is behind the + # concentrator's internal SPI router, not on a Pi chip-select — + # leave empty. Only the Semtech reference kit and custom carriers + # with a dedicated SX1261 CE line need a path (e.g. + # ``/dev/spidev0.1``). Wrong path can brick ``lgw_start`` on + # boards without Pi-visible SX1261; ``carrier_type`` guard clears + # mistaken values on RAK/SenseCap. sx1261_spi_path: str = "" + # Carrier board signature from setup wizard I2C probe (``rak``, + # ``sensecap_m1``, or empty). Used to block Pi-visible SX1261 SPI + # paths that brick ``lgw_start()`` on RAK/SenseCap concentrators. + carrier_type: str = "" @dataclass @@ -245,11 +249,11 @@ class LocationConfig: live fixes (skyplot, optional mesh POSITION). Does not change ``device.{lat,lon,alt}`` (Meshradar pin). Auto-installed by ``scripts/install.sh``. - - ``"uart"`` : reserved for direct on-board UART NMEA reading - (RAK Pi HAT GPS). Plumbing exists in - ``src.hal.gps_reader`` but is not wired into - the runtime yet; treated as ``static`` until - the source is implemented. + - ``"uart"`` : read NMEA GGA from an on-board UART GPS (RAK Pi + HAT on ``/dev/ttyAMA0``). Uses + ``src.hal.gps_reader.GpsReader``. Same Meshradar + pin split as gpsd; live fix feeds skyplot and + optional mesh POSITION only. ``gpsd_host`` / ``gpsd_port`` default to gpsd's well-known localhost socket. Override only when running gpsd on a peer @@ -269,6 +273,8 @@ class LocationConfig: source: str = "static" gpsd_host: str = "127.0.0.1" gpsd_port: int = 2947 + uart_path: str = "/dev/ttyAMA0" + uart_baud: int = 9600 update_interval_seconds: int = 5 min_fix_quality: int = 1 diff --git a/src/coordinator.py b/src/coordinator.py index 665e9ca..6264522 100644 --- a/src/coordinator.py +++ b/src/coordinator.py @@ -466,8 +466,9 @@ def _setup_location_banner(self) -> None: detail = f"gpsd @ {host}:{port}" color = GREEN elif source_name == "uart": - detail = "on-board UART (placeholder, falls back to static)" - color = DIM + path = self._config.location.uart_path + detail = f"UART NMEA @ {path}" + color = GREEN else: detail = "static config coordinates" color = DIM diff --git a/src/hal/gps_reader.py b/src/hal/gps_reader.py index 2c756c1..fb48262 100644 --- a/src/hal/gps_reader.py +++ b/src/hal/gps_reader.py @@ -61,14 +61,22 @@ async def stop(self) -> None: logger.info("GPS reader stopped") async def _read_loop(self) -> None: - """Read NMEA sentences from the GPS UART.""" + """Read NMEA sentences from the GPS UART. + + Linux device nodes (``/dev/tty*``) require pyserial; TCP-style + ``asyncio.open_connection`` only applies to network hosts. + """ + if self._uart_path.startswith("/dev/"): + await self._fallback_loop() + return + try: reader, writer = await asyncio.open_connection( self._uart_path, self._baud ) except Exception: logger.warning( - "GPS UART not available at %s -- using fallback polling", + "GPS UART not available at %s -- using serial fallback", self._uart_path, ) await self._fallback_loop() diff --git a/src/hal/location/factory.py b/src/hal/location/factory.py index a4583c4..79e3ad7 100644 --- a/src/hal/location/factory.py +++ b/src/hal/location/factory.py @@ -39,7 +39,11 @@ def build_location_source( min_fix_quality=location_config.min_fix_quality, ) if source == "uart": - return UartSource() + return UartSource( + uart_path=location_config.uart_path, + baud=location_config.uart_baud, + min_fix_quality=location_config.min_fix_quality, + ) logger.warning( "Unknown location.source=%r in config -- falling back to static", diff --git a/src/hal/location/uart_source.py b/src/hal/location/uart_source.py index a2a68b7..51aec31 100644 --- a/src/hal/location/uart_source.py +++ b/src/hal/location/uart_source.py @@ -1,52 +1,141 @@ -"""UART location source: placeholder for direct on-board NMEA reading. - -The existing ``src.hal.gps_reader.GpsReader`` (created during the v0.4.x -multi-region work but never wired into the runtime) parses NMEA from a -serial port and exposes ``current_position``. Wiring it through the -``LocationSource`` contract is a follow-on item: probably v0.7.6 or -when a user actually reports they want to use the RAK Pi HAT's -on-board u-blox. - -For v0.7.5 this source returns ``available=False`` with an explanatory -error so the GPS card surfaces the correct "not yet implemented" -message instead of falling silently to zero coordinates. +"""UART location source: on-board NMEA GPS (RAK Pi HAT /dev/ttyAMA0). + +Wraps ``GpsReader`` so the coordinator and ``GET /api/device/gps-status`` +share the same ``LocationSource`` contract as gpsd. Skyplot az/el/SNR +requires GSV sentences (not parsed yet); GGA provides fix + satellite +count only. """ from __future__ import annotations import logging from datetime import datetime, timezone +from typing import Optional +from src.hal.gps_reader import GpsReader, GpsPosition from src.hal.location.base import LocationSource -from src.hal.location.models import GpsStatus +from src.hal.location.models import ( + GpsDeviceInfo, + GpsStatus, + LocationFix, + SatellitesView, +) logger = logging.getLogger(__name__) +def _gga_fix_mode(fix_quality: int) -> int: + """Map NMEA GGA fix quality to gpsd-style mode (2=2D, 3=3D).""" + if fix_quality >= 2: + return 3 + if fix_quality == 1: + return 2 + return 1 + + +def _min_gga_quality(min_fix_quality: int) -> int: + """Translate LocationConfig min_fix_quality to minimum GGA fix_quality. + + Config uses gpsd semantics: 1=2D, 2=3D. GGA uses 0=no fix, 1=GPS, + 2=DGPS, etc. We accept any non-zero GGA fix when min is 1, and + require GGA fix_quality >= 2 when min is 2. + """ + if min_fix_quality >= 2: + return 2 + return 1 + + class UartSource(LocationSource): - """Reserved location source for on-board UART GPS (RAK Pi HAT etc.).""" + """Live fixes from a serial NMEA GPS on the Pi UART.""" + + def __init__( + self, + uart_path: str = "/dev/ttyAMA0", + baud: int = 9600, + min_fix_quality: int = 1, + ) -> None: + self._uart_path = uart_path + self._baud = baud + self._min_fix_quality = min_fix_quality + self._reader: Optional[GpsReader] = None + self._started = False + self._last_error: Optional[str] = None @property def source_name(self) -> str: return "uart" async def start(self) -> None: + if self._reader is not None: + return + self._reader = GpsReader(uart_path=self._uart_path, baud=self._baud) + await self._reader.start() + self._started = True + self._last_error = None logger.info( - "UART location source: not yet wired -- using static " - "device coordinates as fallback" + "UART location source: reading NMEA from %s @ %d baud", + self._uart_path, + self._baud, ) async def stop(self) -> None: - return + if self._reader is None: + return + await self._reader.stop() + self._reader = None + self._started = False def get_status(self) -> GpsStatus: + now = datetime.now(timezone.utc) + device = GpsDeviceInfo( + driver="nmea", + path=self._uart_path, + model="RAK Pi HAT GPS", + subtype=f"{self._baud} baud", + ) + + if not self._started or self._reader is None: + return GpsStatus( + source="uart", + available=False, + device=device, + error=self._last_error or "UART reader not started", + last_update=now, + ) + + pos = self._reader.latest_position + if pos is None or not self._position_meets_quality(pos): + return GpsStatus( + source="uart", + available=True, + device=device, + error=None, + last_update=now, + ) + return GpsStatus( source="uart", - available=False, - error=( - "UART GPS source is reserved for the RAK Pi HAT on-board " - "module and is not yet wired into the runtime. Switch to " - "'static' or 'gpsd' under Configuration -> GPS." - ), - last_update=datetime.now(timezone.utc), + available=True, + fix=self._position_to_fix(pos), + satellites=self._satellites_from_position(pos), + device=device, + last_update=pos.timestamp, ) + + def _position_meets_quality(self, pos: GpsPosition) -> bool: + return pos.fix_quality >= _min_gga_quality(self._min_fix_quality) + + @staticmethod + def _position_to_fix(pos: GpsPosition) -> LocationFix: + return LocationFix( + mode=_gga_fix_mode(pos.fix_quality), + latitude=pos.latitude, + longitude=pos.longitude, + altitude_m=pos.altitude, + time=pos.timestamp, + ) + + @staticmethod + def _satellites_from_position(pos: GpsPosition) -> SatellitesView: + count = max(0, pos.satellites) + return SatellitesView(in_view=count, used=count, satellites=()) diff --git a/src/hal/sx1302_signatures.py b/src/hal/sx1302_signatures.py index 49b83da..ec0f92a 100644 --- a/src/hal/sx1302_signatures.py +++ b/src/hal/sx1302_signatures.py @@ -89,3 +89,7 @@ def apply_signatures(lib: ctypes.CDLL) -> None: if hasattr(lib, "lgw_sx1261_setconf"): lib.lgw_sx1261_setconf.restype = ctypes.c_int lib.lgw_sx1261_setconf.argtypes = [ctypes.POINTER(LgwConfSx1261S)] + + if hasattr(lib, "sx1302_get_model_id"): + lib.sx1302_get_model_id.restype = ctypes.c_int + lib.sx1302_get_model_id.argtypes = [ctypes.POINTER(ctypes.c_uint8)] diff --git a/src/hal/sx1302_types.py b/src/hal/sx1302_types.py index 3da217b..c7ce1ac 100644 --- a/src/hal/sx1302_types.py +++ b/src/hal/sx1302_types.py @@ -9,6 +9,16 @@ import ctypes +# sx1302_model_id_t values from loragw_sx1302.h (when HAL exposes them). +SX1302_MODEL_ID_SX1302: int = 0x02 +SX1302_MODEL_ID_SX1303: int = 0x03 + +# Carriers where the SX1261 is behind the concentrator SPI router, not +# on a separate Pi chip-select. Non-empty sx1261_spi_path is cleared at +# configure() time on these boards. +CARRIERS_WITHOUT_PI_SX1261: frozenset[str] = frozenset( + {"rak", "sensecap_m1"} +) # ── RX configuration structs ──────────────────────────────────────── diff --git a/src/hal/sx1302_wrapper.py b/src/hal/sx1302_wrapper.py index 47b020b..a46d404 100644 --- a/src/hal/sx1302_wrapper.py +++ b/src/hal/sx1302_wrapper.py @@ -21,6 +21,7 @@ SX1302SpectralScan, ) from src.hal.sx1302_types import ( + CARRIERS_WITHOUT_PI_SX1261, LgwConfBoardS, LgwConfRxifS, LgwConfRxrfS, @@ -28,6 +29,8 @@ LgwPktRxS, LgwPktTxS, LgwTxGainLutS, + SX1302_MODEL_ID_SX1302, + SX1302_MODEL_ID_SX1303, ) logger = logging.getLogger(__name__) @@ -110,6 +113,27 @@ def __init__( self._unknown_status_count = 0 self._spectral_scan: Optional[SX1302SpectralScan] = None self._sx1261_configured = False + self._carrier_type: str = "" + self._concentrator_model_id: Optional[int] = None + + def set_carrier_type(self, carrier_type: str) -> None: + """Set carrier signature for SX1261 path guardrails (from config or wizard).""" + self._carrier_type = (carrier_type or "").strip().lower() + + @property + def concentrator_model_id(self) -> Optional[int]: + """Raw model byte from ``sx1302_get_model_id`` (0x02 / 0x03), or None.""" + return self._concentrator_model_id + + @property + def concentrator_model_label(self) -> str: + """Human label for telemetry and startup banners.""" + mid = self._concentrator_model_id + if mid == SX1302_MODEL_ID_SX1303: + return "SX1303" + if mid == SX1302_MODEL_ID_SX1302: + return "SX1302" + return "unknown" def load(self) -> None: if not self._lib_path or not os.path.exists(self._lib_path): @@ -167,9 +191,12 @@ def configure(self, plan: ConcentratorChannelPlan) -> None: if self._lib is None: self.load() + self._preflight_spi() + self._guard_sx1261_spi_path() self._configure_board() self._configure_rf_chains(plan) self._configure_if_channels(plan) + self._read_concentrator_model_id() self._configure_sx1261_for_spectral_scan() logger.info("Concentrator configured with %d IF channels", len(plan.multi_sf_channels) + (1 if plan.single_sf_channel else 0)) @@ -179,9 +206,15 @@ def start(self) -> None: self.load() result = self._lib.lgw_start() if result != LGW_HAL_SUCCESS: - raise RuntimeError("lgw_start() failed") + raise RuntimeError( + f"lgw_start() failed (spi={self._spi_path}, " + f"model={self.concentrator_model_label})" + ) self._started = True - logger.info("SX1302 concentrator started") + logger.info( + "%s concentrator started", + self.concentrator_model_label, + ) def stop(self) -> None: if self._started and self._lib: @@ -418,6 +451,44 @@ def spectral_scan_supported(self) -> bool: # ── Private: HAL configuration ────────────────────────────────── + def _preflight_spi(self) -> None: + if not os.path.exists(self._spi_path): + raise FileNotFoundError( + f"Concentrator SPI device {self._spi_path} not found. " + "Enable SPI in raspi-config (Interface Options → SPI) and " + "confirm the meshpoint user is in the spi group." + ) + + def _guard_sx1261_spi_path(self) -> None: + """Clear sx1261_spi_path on carriers where the chip is not Pi-visible.""" + if not self._sx1261_spi_path: + return + if self._carrier_type not in CARRIERS_WITHOUT_PI_SX1261: + return + logger.warning( + "radio.sx1261_spi_path=%r is not supported on %s carriers " + "(SX1261 not reachable from the Pi); clearing path. " + "See docs/CONFIGURATION.md § Spectral Scan.", + self._sx1261_spi_path, + self._carrier_type, + ) + self._sx1261_spi_path = "" + + def _read_concentrator_model_id(self) -> None: + if self._lib is None or not hasattr(self._lib, "sx1302_get_model_id"): + return + model = ctypes.c_uint8() + rc = self._lib.sx1302_get_model_id(ctypes.byref(model)) + if rc != LGW_HAL_SUCCESS: + logger.debug("sx1302_get_model_id returned rc=%d", rc) + return + self._concentrator_model_id = int(model.value) + logger.info( + "Concentrator model ID: 0x%02X (%s)", + self._concentrator_model_id, + self.concentrator_model_label, + ) + def _configure_board(self) -> None: conf = LgwConfBoardS() conf.lorawan_public = False diff --git a/tests/test_location_config.py b/tests/test_location_config.py index 92a319b..c6908ea 100644 --- a/tests/test_location_config.py +++ b/tests/test_location_config.py @@ -84,11 +84,18 @@ def test_partial_override_preserves_other_fields(self) -> None: path.unlink() def test_uart_source_round_trips(self) -> None: - path = self._write_yaml("location:\n source: uart\n") + path = self._write_yaml( + "location:\n" + " source: uart\n" + " uart_path: /dev/serial0\n" + " uart_baud: 115200\n" + ) try: cfg = AppConfig() _apply_yaml(cfg, path) self.assertEqual(cfg.location.source, "uart") + self.assertEqual(cfg.location.uart_path, "/dev/serial0") + self.assertEqual(cfg.location.uart_baud, 115200) finally: path.unlink() diff --git a/tests/test_location_static_uart_sources.py b/tests/test_location_static_uart_sources.py index fe3850d..a69c5a9 100644 --- a/tests/test_location_static_uart_sources.py +++ b/tests/test_location_static_uart_sources.py @@ -3,8 +3,11 @@ from __future__ import annotations import unittest +from datetime import datetime, timezone +from unittest.mock import MagicMock from src.config import DeviceConfig +from src.hal.gps_reader import GpsPosition from src.hal.location.static_source import StaticSource from src.hal.location.uart_source import UartSource @@ -75,21 +78,31 @@ async def test_source_name_is_stable(self) -> None: class TestUartSource(unittest.IsolatedAsyncioTestCase): - """``UartSource`` is a placeholder; surfaces an explanatory error.""" + """``UartSource`` exposes live NMEA fixes via ``GpsReader``.""" - async def test_status_is_unavailable_with_explanatory_error(self) -> None: + async def test_status_before_start_is_unavailable(self) -> None: source = UartSource() - await source.start() - try: - status = source.get_status() - self.assertEqual(status.source, "uart") - self.assertFalse(status.available) - self.assertIsNotNone(status.error) - # Error must point users to the working alternatives. - self.assertIn("static", status.error.lower()) - self.assertIn("gpsd", status.error.lower()) - finally: - await source.stop() + status = source.get_status() + self.assertEqual(status.source, "uart") + self.assertFalse(status.available) + self.assertIn("not started", (status.error or "").lower()) + + async def test_status_with_fix(self) -> None: + source = UartSource() + source._reader = MagicMock() + source._started = True + source._reader.latest_position = GpsPosition( + latitude=51.5, + longitude=-0.1, + altitude=20.0, + satellites=6, + fix_quality=2, + timestamp=datetime(2026, 6, 2, tzinfo=timezone.utc), + ) + status = source.get_status() + self.assertTrue(status.available) + self.assertIsNotNone(status.fix) + self.assertEqual(status.device.path, "/dev/ttyAMA0") async def test_source_name_is_stable(self) -> None: source = UartSource() diff --git a/tests/test_sx1302_wrapper_hal_guard.py b/tests/test_sx1302_wrapper_hal_guard.py new file mode 100644 index 0000000..eb9911f --- /dev/null +++ b/tests/test_sx1302_wrapper_hal_guard.py @@ -0,0 +1,91 @@ +"""Tests for SX1302Wrapper SPI preflight, SX1261 carrier guard, and model ID.""" + +from __future__ import annotations + +import ctypes +import unittest +from unittest.mock import MagicMock, patch + +from src.hal.sx1302_types import ( + SX1302_MODEL_ID_SX1302, + SX1302_MODEL_ID_SX1303, +) +from src.hal.sx1302_wrapper import LGW_HAL_SUCCESS, SX1302Wrapper + + +def _build_wrapper() -> SX1302Wrapper: + wrapper = SX1302Wrapper(sx1261_spi_path="/dev/spidev0.1") + wrapper._lib = MagicMock() + return wrapper + + +class TestSx1261CarrierGuard(unittest.TestCase): + def test_rak_carrier_clears_non_empty_sx1261_path(self) -> None: + wrapper = _build_wrapper() + wrapper.set_carrier_type("rak") + wrapper._guard_sx1261_spi_path() + self.assertEqual(wrapper._sx1261_spi_path, "") + + def test_sensecap_carrier_clears_non_empty_sx1261_path(self) -> None: + wrapper = _build_wrapper() + wrapper.set_carrier_type("sensecap_m1") + wrapper._guard_sx1261_spi_path() + self.assertEqual(wrapper._sx1261_spi_path, "") + + def test_unknown_carrier_keeps_sx1261_path(self) -> None: + wrapper = _build_wrapper() + wrapper.set_carrier_type("") + wrapper._guard_sx1261_spi_path() + self.assertEqual(wrapper._sx1261_spi_path, "/dev/spidev0.1") + + def test_empty_sx1261_path_unchanged_on_rak(self) -> None: + wrapper = SX1302Wrapper(sx1261_spi_path="") + wrapper.set_carrier_type("rak") + wrapper._guard_sx1261_spi_path() + self.assertEqual(wrapper._sx1261_spi_path, "") + + +class TestSpiPreflight(unittest.TestCase): + def test_missing_spi_device_raises(self) -> None: + wrapper = SX1302Wrapper(spi_path="/dev/spidev0.0") + with patch("src.hal.sx1302_wrapper.os.path.exists", return_value=False): + with self.assertRaises(FileNotFoundError) as ctx: + wrapper._preflight_spi() + self.assertIn("raspi-config", str(ctx.exception)) + + +class TestConcentratorModelId(unittest.TestCase): + def test_model_label_sx1303(self) -> None: + wrapper = SX1302Wrapper() + wrapper._concentrator_model_id = SX1302_MODEL_ID_SX1303 + self.assertEqual(wrapper.concentrator_model_label, "SX1303") + + def test_model_label_sx1302(self) -> None: + wrapper = SX1302Wrapper() + wrapper._concentrator_model_id = SX1302_MODEL_ID_SX1302 + self.assertEqual(wrapper.concentrator_model_label, "SX1302") + + def test_read_model_id_from_hal(self) -> None: + wrapper = _build_wrapper() + + def fake_get_model(model_ptr): + ptr = ctypes.cast(model_ptr, ctypes.POINTER(ctypes.c_uint8)) + ptr[0] = SX1302_MODEL_ID_SX1303 + return LGW_HAL_SUCCESS + + wrapper._lib.sx1302_get_model_id.side_effect = fake_get_model + wrapper._read_concentrator_model_id() + self.assertEqual(wrapper._concentrator_model_id, SX1302_MODEL_ID_SX1303) + + +class TestSx1261ConfigureSkipsWhenPathCleared(unittest.TestCase): + def test_configure_sx1261_skips_after_rak_guard(self) -> None: + wrapper = _build_wrapper() + wrapper.set_carrier_type("rak") + wrapper._guard_sx1261_spi_path() + wrapper._configure_sx1261_for_spectral_scan() + wrapper._lib.lgw_sx1261_setconf.assert_not_called() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_uart_location_source.py b/tests/test_uart_location_source.py new file mode 100644 index 0000000..978d004 --- /dev/null +++ b/tests/test_uart_location_source.py @@ -0,0 +1,110 @@ +"""Tests for ``UartSource`` and UART-related config defaults.""" + +from __future__ import annotations + +import unittest +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +from src.config import AppConfig, LocationConfig +from src.hal.gps_reader import GpsPosition +from src.hal.location.factory import build_location_source +from src.hal.location.uart_source import UartSource + + +class TestLocationConfigUartDefaults(unittest.TestCase): + def test_uart_path_defaults_to_ttyama0(self) -> None: + cfg = LocationConfig() + self.assertEqual(cfg.uart_path, "/dev/ttyAMA0") + + def test_uart_baud_defaults_to_9600(self) -> None: + cfg = LocationConfig() + self.assertEqual(cfg.uart_baud, 9600) + + +class TestUartSource(unittest.IsolatedAsyncioTestCase): + async def test_no_fix_yet_is_available_without_error(self) -> None: + source = UartSource() + source._reader = MagicMock() + source._reader.latest_position = None + source._started = True + + status = source.get_status() + self.assertEqual(status.source, "uart") + self.assertTrue(status.available) + self.assertIsNone(status.fix) + + async def test_maps_gga_position_to_location_fix(self) -> None: + source = UartSource(min_fix_quality=1) + source._reader = MagicMock() + source._started = True + source._reader.latest_position = GpsPosition( + latitude=40.7128, + longitude=-74.0060, + altitude=12.0, + satellites=8, + fix_quality=1, + timestamp=datetime(2026, 6, 2, 12, 0, 0, tzinfo=timezone.utc), + ) + + status = source.get_status() + self.assertTrue(status.available) + self.assertIsNotNone(status.fix) + assert status.fix is not None + self.assertEqual(status.fix.latitude, 40.7128) + self.assertEqual(status.fix.longitude, -74.0060) + self.assertEqual(status.fix.altitude_m, 12.0) + self.assertIsNotNone(status.satellites) + assert status.satellites is not None + self.assertEqual(status.satellites.used, 8) + + async def test_min_fix_quality_filters_weak_gga(self) -> None: + source = UartSource(min_fix_quality=2) + source._reader = MagicMock() + source._started = True + source._reader.latest_position = GpsPosition( + latitude=40.0, + longitude=-74.0, + altitude=0.0, + satellites=4, + fix_quality=1, + timestamp=datetime.now(timezone.utc), + ) + + status = source.get_status() + self.assertTrue(status.available) + self.assertIsNone(status.fix) + + async def test_start_stop_wires_gps_reader(self) -> None: + source = UartSource(uart_path="/dev/ttyAMA0", baud=9600) + with patch("src.hal.location.uart_source.GpsReader") as reader_cls: + reader = MagicMock() + reader.start = AsyncMock() + reader.stop = AsyncMock() + reader_cls.return_value = reader + + await source.start() + reader_cls.assert_called_once_with( + uart_path="/dev/ttyAMA0", baud=9600 + ) + reader.start.assert_awaited_once() + + await source.stop() + reader.stop.assert_awaited_once() + + +class TestUartFactory(unittest.TestCase): + def test_build_uart_source_from_config(self) -> None: + app = AppConfig() + app.location.source = "uart" + app.location.uart_path = "/dev/serial0" + app.location.uart_baud = 115200 + + source = build_location_source(app.location, app.device) + self.assertIsInstance(source, UartSource) + self.assertEqual(source._uart_path, "/dev/serial0") + self.assertEqual(source._baud, 115200) + + +if __name__ == "__main__": + unittest.main()