Add parallel diagnostics forwarding pipeline (DiagnosticStatus → Zenoh → InfluxDB → Grafana + REST)#1
Draft
Copilot wants to merge 2 commits into
Draft
Conversation
…hboard, docs link, fmt/clippy fixes
Copilot
AI
changed the title
[WIP] Add parallel diagnostics pipeline for forwarding
Add parallel diagnostics forwarding pipeline (DiagnosticStatus → Zenoh → InfluxDB → Grafana + REST)
Jun 5, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Adds a complete diagnostics data path alongside the existing rFMS/VehicleStatus flow, without touching any existing behavior. The pipeline runs:
diagnostic-source-simulator → Kuksa Databroker → fms-diagnostics-forwarder → uProtocol/Zenoh (D110) → fms-diagnostics-consumer → InfluxDB → Grafana + REST API.Proto (
fms-proto)DiagnosticSummary,DiagnosticCode,DiagnosticStatusinfms.proto(proto3, packagefms.v4)fms-protoadded to workspace members; integration test covers round-trip serializationInfluxDB (
influx-client)MEASUREMENT_DIAGNOSTIC_SUMMARY,MEASUREMENT_DIAGNOSTIC_CODE, and associated tag/field nameswrite_diagnostic_status(&DiagnosticStatus)— fire-and-forget async method mirroringwrite_vehicle_status; writes onediagnostic_summaryrow and onediagnostic_coderow per active/stored/pending DTC; normalizes empty tags to"UNKNOWN"New crates
fms-diagnostics-forwarderVehicle.Diagnostics.*VSS paths from Kuksa, buildsDiagnosticStatus, publishes to topicup://fms-diagnostics-forwarder/D110/1/D110fms-diagnostics-consumerup://*/D110/1/D110, decodesDiagnosticStatus, writes to InfluxDBdiagnostic-source-simulatorpublish_valueVSS overlay (
spec/overlay/vss.json+fms.vspec)Vehicle.Diagnosticsbranch: 4 DTC count fields (uint16), 7 string signals withallowedenumerations for severity/lifecycle state,E2EVsub-branch (CrcOk bool, AliveCounter uint16, LastFault string)fms-server — diagnostics REST API
Five new routes (existing rFMS routes unchanged):
Backed by four new Flux queries on
diagnostic_summary/diagnostic_codemeasurements.Compose & Grafana
fms-blueprint-compose.yamlwith Zenoh overrides in-zenoh.yaml; Hono compose annotated as Zenoh-only for nowgrafana/dashboards/FMS-Diagnostics.json: 5 panels — Active DTC Count by VIN, Worst Severity by VIN, Active DTC table, Active Faults status, DTC timeline; provisioned via existing filesystem loaderrust-musl-crosspatternOriginal prompt
Extend Fleet Management Blueprint with Diagnostic Code Forwarding
Repository and baseline
chheis/fleet-managementeedc2c373efb0a89d1de26c7ab9843bc574f8884feature/diagnostic-code-forwardingcomponents/, Docker Compose orchestration, Kuksa Databroker (VSS), uProtocol over Zenoh/Hono, InfluxDB 2.7, Grafana 9.5.Goal
Add a parallel diagnostics pipeline alongside the existing rFMS/VehicleStatus flow, without changing existing behavior. Target data path:
diagnostic source -> Kuksa Databroker diagnostic VSS paths -> fms-diagnostics-forwarder -> uProtocol notification (Zenoh) -> fms-diagnostics-consumer -> InfluxDB -> Grafana diagnostics dashboard + fms-server diagnostics REST APIThe existing path MUST keep working unchanged:
csv-provider -> Databroker -> fms-forwarder -> uProtocol -> fms-consumer -> InfluxDB -> Grafana/rFMS APIHard constraints
VehicleStatusmessage, its writer, or its rFMS endpoints in a behavior-changing way.cargo fmt --all,cargo clippy --workspace --all-targets -- -D warnings, andcargo test --workspacemust pass for everything you add. Run all cargo commands fromcomponents/..licensesidecar, following existing files (Apache-2.0, "Contributors to the Eclipse Foundation").docs/diagnostics-forwarding.mdand proceed best-effort; still ensure new code compiles in the workspace.Repository facts you must rely on (verified)
components/Cargo.toml[workspace].membersdoes NOT includefms-proto(members arefms-consumer, fms-forwarder, fms-server, fms-zenoh, influx-client, up-transport-hono-kafka, up-transport-hono-mqtt). Add"fms-proto"plus the two new crates tomembers.components/fms-proto/build.rscompiles onlyproto/fms/v4/fms.protointo cargo out dir modulefms;src/lib.rsdoesinclude!(concat!(env!("OUT_DIR"), "/fms/mod.rs")). New messages are reachable asfms_proto::fms::*. Nobuild.rschange required.proto/fms/v4/fms.protoisproto3, packagefms.v4, already importsgoogle/protobuf/timestamp.proto.components/influx-client/src/lib.rsdefines constants includingTAG_VIN = "vin",FIELD_CREATED_DATE_TIME = "createdDateTime",MEASUREMENT_HEADER,MEASUREMENT_SNAPSHOT,TAG_TRIGGER. ReuseTAG_VINandFIELD_CREATED_DATE_TIME.components/influx-client/src/writer.rs:InfluxWriterwrapsInfluxConnection.write_vehicle_status(&self, &VehicleStatus)returns()and logs viawarn!/debug!(fire-and-forget). Usesinfluxrs::Measurement::builder(name).tag(..).field(..).build() -> Result. Timestamps useprotobuf::well_known_types::timestamp::Timestampconverted toSystemTimethen millis sinceUNIX_EPOCHasu128. The crate error type isinfluxrs::InfluxError(NOTinfluxrs::Error).components/fms-consumer/src/main.rs: subscribes to--topic-filterdefaultup://*/D100/1/D100, local uServiceup://fms-consumer/D101/1/0,TransportType { Hono(HonoKafkaTransportConfig), Zenoh(ZenohTransportConfig) }, listener callsmsg.extract_protobuf::<VehicleStatus>()theninflux_writer.write_vehicle_status(...).components/fms-forwarder/src/main.rs: publishes to--topicdefaultup://fms-forwarder/D100/1/D100usingSimplePublisher+StaticUriProvider::try_from(&topic); payload viaUPayload::try_from_protobuf(...);origin_resource_id = u16::try_from(topic.resource_id).TransportType { Hono(HonoMqttTransportConfig), Zenoh(ZenohTransportConfig) }. Vehicle data comes frommod vehicle_abstraction(vehicle_abstraction.rs+vehicle_abstraction/{kuksa.rs,vss.rs}).vehicle_abstraction.rs:KuksaDatabrokerClientConfig(clapArgs) exposes--databroker-uri(envKUKSA_DATABROKER_URI, defaulthttp://127.0.0.1:55555) and--timer-interval(envTIMER_INTERVAL).KuksaValDatabrokeruseskuksa_rust_sdk::kuksa::val::v2::KuksaClientV2,get_values(paths),subscribe(...). VIN path constantVehicle.VehicleIdentification.VIN.components/fms-server/src/lib.rs: builds an axumRouterwith/,/rfms/vehiclepositions,/rfms/vehicles,/rfms/vehiclestatuses, stateArc<InfluxReader>.influx_reader.rsbuilds Flux queries againstMEASUREMENT_SNAPSHOTetc. Response models live insrc/models/{mod.rs,position.rs,status.rs,vehicle.rs}.fms-blueprint-compose.yaml: networksfms-backend,fms-vehicle;databrokermounts./spec/overlay/vss.jsonas configvss_overlay.json-> `/vss_o...This pull request was created from Copilot chat.