From c2fe0a70b6a036cbf77f3c62dca2d8f477305c91 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 5 Jun 2026 21:07:08 +0000 Subject: [PATCH 1/2] Initial plan From 0eb9a1e0e9bc48ac7c405a20129cf2d625545cdf Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 5 Jun 2026 21:30:40 +0000 Subject: [PATCH 2/2] Complete diagnostics forwarding pipeline: compose wiring, Grafana dashboard, docs link, fmt/clippy fixes --- README.md | 6 + components/Cargo.lock | 48 ++ components/Cargo.toml | 6 + .../Dockerfile.diagnostic-source-simulator | 55 +++ .../Dockerfile.fms-diagnostics-consumer | 55 +++ .../Dockerfile.fms-diagnostics-forwarder | 55 +++ .../diagnostic-source-simulator/Cargo.toml | 45 ++ .../diagnostic-source-simulator/src/main.rs | 223 +++++++++ .../fms-diagnostics-consumer/Cargo.toml | 50 ++ .../fms-diagnostics-consumer/src/main.rs | 114 +++++ .../fms-diagnostics-forwarder/Cargo.toml | 52 ++ .../src/diagnostic_abstraction.rs | 466 ++++++++++++++++++ .../fms-diagnostics-forwarder/src/main.rs | 100 ++++ .../src/vehicle_abstraction/kuksa.rs | 6 +- .../src/vehicle_abstraction/vss.rs | 3 +- components/fms-proto/proto/fms/v4/fms.proto | 36 ++ .../fms-proto/tests/diagnostic_status.rs | 39 ++ components/fms-server/src/influx_reader.rs | 177 +++++++ components/fms-server/src/lib.rs | 90 ++++ .../fms-server/src/models/diagnostics.rs | 141 ++++++ components/fms-server/src/models/mod.rs | 1 + components/influx-client/src/lib.rs | 21 + components/influx-client/src/writer.rs | 240 ++++++++- docs/diagnostics-forwarding.md | 247 ++++++++++ fms-blueprint-compose-hono.yaml | 4 + fms-blueprint-compose-zenoh.yaml | 16 + fms-blueprint-compose.yaml | 65 +++ grafana/dashboards/FMS-Diagnostics.json | 153 ++++++ .../dashboards/FMS-Diagnostics.json.license | 18 + spec/overlay/fms.vspec | 97 ++++ spec/overlay/vss.json | 116 +++++ 31 files changed, 2740 insertions(+), 5 deletions(-) create mode 100644 components/Dockerfile.diagnostic-source-simulator create mode 100644 components/Dockerfile.fms-diagnostics-consumer create mode 100644 components/Dockerfile.fms-diagnostics-forwarder create mode 100644 components/diagnostic-source-simulator/Cargo.toml create mode 100644 components/diagnostic-source-simulator/src/main.rs create mode 100644 components/fms-diagnostics-consumer/Cargo.toml create mode 100644 components/fms-diagnostics-consumer/src/main.rs create mode 100644 components/fms-diagnostics-forwarder/Cargo.toml create mode 100644 components/fms-diagnostics-forwarder/src/diagnostic_abstraction.rs create mode 100644 components/fms-diagnostics-forwarder/src/main.rs create mode 100644 components/fms-proto/tests/diagnostic_status.rs create mode 100644 components/fms-server/src/models/diagnostics.rs create mode 100644 docs/diagnostics-forwarding.md create mode 100644 grafana/dashboards/FMS-Diagnostics.json create mode 100644 grafana/dashboards/FMS-Diagnostics.json.license diff --git a/README.md b/README.md index ec114c4..9973716 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,12 @@ setup manually. Additional information can be found in the components' corresponding subfolders. +## Diagnostics Forwarding + +A parallel diagnostics pipeline is available alongside the existing rFMS/VehicleStatus flow. +See [docs/diagnostics-forwarding.md](docs/diagnostics-forwarding.md) for the full architecture, +data path, VSS paths, REST API endpoints, and Grafana dashboard usage. + # Contributing We are looking forward to your ideas and PRs. Each PRs triggers a GitHub action which checks the formating, performs diff --git a/components/Cargo.lock b/components/Cargo.lock index b808d1a..55ce262 100644 --- a/components/Cargo.lock +++ b/components/Cargo.lock @@ -677,6 +677,18 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "diagnostic-source-simulator" +version = "0.1.0-SNAPSHOT" +dependencies = [ + "clap", + "env_logger", + "http 0.2.12", + "kuksa-rust-sdk", + "log", + "tokio", +] + [[package]] name = "digest" version = "0.10.7" @@ -857,6 +869,42 @@ dependencies = [ "up-transport-zenoh", ] +[[package]] +name = "fms-diagnostics-consumer" +version = "0.1.0-SNAPSHOT" +dependencies = [ + "async-trait", + "clap", + "env_logger", + "fms-proto", + "fms-zenoh", + "influx-client", + "log", + "tokio", + "up-rust", + "up-transport-hono-kafka", + "up-transport-zenoh", +] + +[[package]] +name = "fms-diagnostics-forwarder" +version = "0.1.0-SNAPSHOT" +dependencies = [ + "async-trait", + "clap", + "duration-str", + "env_logger", + "fms-proto", + "fms-zenoh", + "http 0.2.12", + "kuksa-rust-sdk", + "log", + "protobuf", + "tokio", + "up-rust", + "up-transport-zenoh", +] + [[package]] name = "fms-forwarder" version = "0.1.0-SNAPSHOT" diff --git a/components/Cargo.toml b/components/Cargo.toml index 04c7406..d1b720d 100644 --- a/components/Cargo.toml +++ b/components/Cargo.toml @@ -19,8 +19,12 @@ [workspace] members = [ + "diagnostic-source-simulator", "fms-consumer", + "fms-diagnostics-consumer", + "fms-diagnostics-forwarder", "fms-forwarder", + "fms-proto", "fms-server", "fms-zenoh", "influx-client", @@ -42,6 +46,7 @@ async-trait = { version = "0.1.89" } bytes = { version = "1.11.0" } chrono = { version = "0.4.42", default-features = false } clap = { version = "4.5.53", default-features = false } +duration-str = { version = "0.12.0", default-features = false, features = ["time"] } env_logger = { version = "0.11.8", default-features = false, features = [ "humantime", ] } @@ -49,6 +54,7 @@ fms-proto = { path = "fms-proto" } fms-zenoh = { path = "fms-zenoh" } influx-client = { path = "influx-client", default-features = false } influxrs = { version = "3.0.1", default-features = false } +kuksa-rust-sdk = { version = "0.2.1" } log = { version = "0.4.28" } protobuf = { version = "3.7.2" } # tokio does not enable features by default diff --git a/components/Dockerfile.diagnostic-source-simulator b/components/Dockerfile.diagnostic-source-simulator new file mode 100644 index 0000000..70395c1 --- /dev/null +++ b/components/Dockerfile.diagnostic-source-simulator @@ -0,0 +1,55 @@ +# SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +FROM ghcr.io/rust-cross/rust-musl-cross:x86_64-musl AS builder-amd64 +ENV BUILDTARGET="x86_64-unknown-linux-musl" + + +FROM ghcr.io/rust-cross/rust-musl-cross:aarch64-musl AS builder-arm64 +ENV BUILDTARGET="aarch64-unknown-linux-musl" + +FROM builder-$TARGETARCH AS builder +ARG TARGETARCH +RUN apt-get update && apt-get install -y ca-certificates \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +# This will speed up fetching the crate.io index in the future, see +# https://blog.rust-lang.org/2022/06/22/sparse-registry-testing.html +ENV CARGO_UNSTABLE_SPARSE_REGISTRY=true +# This is supposedly required for successfully building for arm64 using buildx with QEMU +# see https://github.com/rust-lang/cargo/issues/10583 +ENV CARGO_NET_GIT_FETCH_WITH_CLI=true +RUN cargo install cargo-about + +RUN echo "Building for $TARGETARCH" +RUN mkdir components +COPY . components/ +WORKDIR /home/rust/src/components/diagnostic-source-simulator + +RUN cargo about generate -o /home/rust/licenses.html ../about.hbs +RUN cargo build --release --target $BUILDTARGET +RUN mv ../target/${BUILDTARGET}/release/diagnostic-source-simulator /home/rust + +FROM scratch + +COPY --from=builder /home/rust/diagnostic-source-simulator /app/diagnostic-source-simulator +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=builder /home/rust/licenses.html /app/ + +ENTRYPOINT [ "/app/diagnostic-source-simulator" ] diff --git a/components/Dockerfile.fms-diagnostics-consumer b/components/Dockerfile.fms-diagnostics-consumer new file mode 100644 index 0000000..ebcedf5 --- /dev/null +++ b/components/Dockerfile.fms-diagnostics-consumer @@ -0,0 +1,55 @@ +# SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +FROM ghcr.io/rust-cross/rust-musl-cross:x86_64-musl AS builder-amd64 +ENV BUILDTARGET="x86_64-unknown-linux-musl" + + +FROM ghcr.io/rust-cross/rust-musl-cross:aarch64-musl AS builder-arm64 +ENV BUILDTARGET="aarch64-unknown-linux-musl" + +FROM builder-$TARGETARCH AS builder +ARG TARGETARCH +RUN apt-get update && apt-get install -y ca-certificates \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +# This will speed up fetching the crate.io index in the future, see +# https://blog.rust-lang.org/2022/06/22/sparse-registry-testing.html +ENV CARGO_UNSTABLE_SPARSE_REGISTRY=true +# This is supposedly required for successfully building for arm64 using buildx with QEMU +# see https://github.com/rust-lang/cargo/issues/10583 +ENV CARGO_NET_GIT_FETCH_WITH_CLI=true +RUN cargo install cargo-about + +RUN echo "Building for $TARGETARCH" +RUN mkdir components +COPY . components/ +WORKDIR /home/rust/src/components/fms-diagnostics-consumer + +RUN cargo about generate -o /home/rust/licenses.html ../about.hbs +RUN cargo build --release --target $BUILDTARGET +RUN mv ../target/${BUILDTARGET}/release/fms-diagnostics-consumer /home/rust + +FROM scratch + +COPY --from=builder /home/rust/fms-diagnostics-consumer /app/fms-diagnostics-consumer +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=builder /home/rust/licenses.html /app/ + +ENTRYPOINT [ "/app/fms-diagnostics-consumer" ] diff --git a/components/Dockerfile.fms-diagnostics-forwarder b/components/Dockerfile.fms-diagnostics-forwarder new file mode 100644 index 0000000..674bf72 --- /dev/null +++ b/components/Dockerfile.fms-diagnostics-forwarder @@ -0,0 +1,55 @@ +# SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +FROM ghcr.io/rust-cross/rust-musl-cross:x86_64-musl AS builder-amd64 +ENV BUILDTARGET="x86_64-unknown-linux-musl" + + +FROM ghcr.io/rust-cross/rust-musl-cross:aarch64-musl AS builder-arm64 +ENV BUILDTARGET="aarch64-unknown-linux-musl" + +FROM builder-$TARGETARCH AS builder +ARG TARGETARCH +RUN apt-get update && apt-get install -y ca-certificates \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +# This will speed up fetching the crate.io index in the future, see +# https://blog.rust-lang.org/2022/06/22/sparse-registry-testing.html +ENV CARGO_UNSTABLE_SPARSE_REGISTRY=true +# This is supposedly required for successfully building for arm64 using buildx with QEMU +# see https://github.com/rust-lang/cargo/issues/10583 +ENV CARGO_NET_GIT_FETCH_WITH_CLI=true +RUN cargo install cargo-about + +RUN echo "Building for $TARGETARCH" +RUN mkdir components +COPY . components/ +WORKDIR /home/rust/src/components/fms-diagnostics-forwarder + +RUN cargo about generate -o /home/rust/licenses.html ../about.hbs +RUN cargo build --release --target $BUILDTARGET +RUN mv ../target/${BUILDTARGET}/release/fms-diagnostics-forwarder /home/rust + +FROM scratch + +COPY --from=builder /home/rust/fms-diagnostics-forwarder /app/fms-diagnostics-forwarder +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=builder /home/rust/licenses.html /app/ + +ENTRYPOINT [ "/app/fms-diagnostics-forwarder" ] diff --git a/components/diagnostic-source-simulator/Cargo.toml b/components/diagnostic-source-simulator/Cargo.toml new file mode 100644 index 0000000..fc15101 --- /dev/null +++ b/components/diagnostic-source-simulator/Cargo.toml @@ -0,0 +1,45 @@ +# SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +[package] +name = "diagnostic-source-simulator" +publish = false +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +documentation.workspace = true +readme.workspace = true + +[dependencies] +clap = { workspace = true, features = [ + "std", + "derive", + "env", + "color", + "help", + "usage", + "error-context", + "suggestions", +] } +env_logger = { workspace = true } +http = { version = "0.2" } +kuksa-rust-sdk = { workspace = true } +log = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } diff --git a/components/diagnostic-source-simulator/src/main.rs b/components/diagnostic-source-simulator/src/main.rs new file mode 100644 index 0000000..0eec0d4 --- /dev/null +++ b/components/diagnostic-source-simulator/src/main.rs @@ -0,0 +1,223 @@ +// SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +use clap::Parser; +use http::Uri; +use kuksa_rust_sdk::kuksa::{common::ClientTraitV2, val::v2::KuksaClientV2}; +use kuksa_rust_sdk::v2_proto::value::TypedValue; +use kuksa_rust_sdk::v2_proto::Value; +use log::{info, warn}; + +const VSS_DIAG_ACTIVE_DTC_COUNT: &str = "Vehicle.Diagnostics.ActiveDTCCount"; +const VSS_DIAG_STORED_DTC_COUNT: &str = "Vehicle.Diagnostics.StoredDTCCount"; +const VSS_DIAG_PENDING_DTC_COUNT: &str = "Vehicle.Diagnostics.PendingDTCCount"; +const VSS_DIAG_CRITICAL_DTC_COUNT: &str = "Vehicle.Diagnostics.CriticalDTCCount"; +const VSS_DIAG_WORST_SEVERITY: &str = "Vehicle.Diagnostics.WorstSeverity"; +const VSS_DIAG_LAST_CODE: &str = "Vehicle.Diagnostics.LastCode"; +const VSS_DIAG_LAST_DESCRIPTION: &str = "Vehicle.Diagnostics.LastDescription"; +const VSS_DIAG_LAST_STATUS_MASK: &str = "Vehicle.Diagnostics.LastStatusMask"; +const VSS_DIAG_LAST_LIFECYCLE_STATE: &str = "Vehicle.Diagnostics.LastLifecycleState"; +const VSS_DIAG_LAST_SEVERITY: &str = "Vehicle.Diagnostics.LastSeverity"; +const VSS_DIAG_SOURCE: &str = "Vehicle.Diagnostics.Source"; +const VSS_DIAG_COMPONENT_ID: &str = "Vehicle.Diagnostics.ComponentId"; +const VSS_DIAG_ECU: &str = "Vehicle.Diagnostics.Ecu"; +const VSS_DIAG_E2EV_CRC_OK: &str = "Vehicle.Diagnostics.E2EV.CrcOk"; +const VSS_DIAG_E2EV_ALIVE_COUNTER: &str = "Vehicle.Diagnostics.E2EV.AliveCounter"; + +/// Simulates diagnostic data by writing VSS signals to the Kuksa Databroker. +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct SimulatorCommand { + /// The HTTP(S) URI of the Eclipse Kuksa Databroker's gRPC endpoint. + #[arg( + long = "databroker-uri", + value_name = "URI", + env = "KUKSA_DATABROKER_URI", + default_value = "http://databroker:55556" + )] + databroker_uri: String, + + /// Interval between state transitions (seconds). + #[arg( + long = "interval-secs", + env = "SIMULATOR_INTERVAL_SECS", + default_value = "10" + )] + interval_secs: u64, +} + +fn make_value(typed: TypedValue) -> Value { + Value { + typed_value: Some(typed), + } +} + +async fn publish(client: &mut KuksaClientV2, path: &str, value: TypedValue) { + if let Err(e) = client + .publish_value(path.to_string(), make_value(value)) + .await + { + warn!("failed to write [{path}] to Databroker: {e:?}"); + } +} + +async fn write_state_cleared(client: &mut KuksaClientV2, alive_counter: u32) { + info!("Transitioning to State A (cleared)"); + publish(client, VSS_DIAG_ACTIVE_DTC_COUNT, TypedValue::Uint32(0)).await; + publish(client, VSS_DIAG_STORED_DTC_COUNT, TypedValue::Uint32(0)).await; + publish(client, VSS_DIAG_PENDING_DTC_COUNT, TypedValue::Uint32(0)).await; + publish(client, VSS_DIAG_CRITICAL_DTC_COUNT, TypedValue::Uint32(0)).await; + publish( + client, + VSS_DIAG_WORST_SEVERITY, + TypedValue::String("INFO".to_string()), + ) + .await; + publish( + client, + VSS_DIAG_LAST_LIFECYCLE_STATE, + TypedValue::String("CLEARED".to_string()), + ) + .await; + publish( + client, + VSS_DIAG_LAST_CODE, + TypedValue::String(String::new()), + ) + .await; + publish( + client, + VSS_DIAG_LAST_SEVERITY, + TypedValue::String("INFO".to_string()), + ) + .await; + publish(client, VSS_DIAG_E2EV_CRC_OK, TypedValue::Bool(true)).await; + publish( + client, + VSS_DIAG_E2EV_ALIVE_COUNTER, + TypedValue::Uint32(alive_counter), + ) + .await; +} + +async fn write_state_faulted(client: &mut KuksaClientV2, alive_counter: u32) { + info!("Transitioning to State B (faulted)"); + publish(client, VSS_DIAG_ACTIVE_DTC_COUNT, TypedValue::Uint32(1)).await; + publish(client, VSS_DIAG_STORED_DTC_COUNT, TypedValue::Uint32(1)).await; + publish(client, VSS_DIAG_PENDING_DTC_COUNT, TypedValue::Uint32(0)).await; + publish(client, VSS_DIAG_CRITICAL_DTC_COUNT, TypedValue::Uint32(1)).await; + publish( + client, + VSS_DIAG_WORST_SEVERITY, + TypedValue::String("CRITICAL".to_string()), + ) + .await; + publish( + client, + VSS_DIAG_LAST_LIFECYCLE_STATE, + TypedValue::String("ACTIVE".to_string()), + ) + .await; + publish( + client, + VSS_DIAG_LAST_CODE, + TypedValue::String("0x123456".to_string()), + ) + .await; + publish( + client, + VSS_DIAG_LAST_DESCRIPTION, + TypedValue::String("E2EV signal validation failed".to_string()), + ) + .await; + publish( + client, + VSS_DIAG_LAST_STATUS_MASK, + TypedValue::String("0x2F".to_string()), + ) + .await; + publish( + client, + VSS_DIAG_LAST_SEVERITY, + TypedValue::String("CRITICAL".to_string()), + ) + .await; + publish( + client, + VSS_DIAG_SOURCE, + TypedValue::String("diagnostic-source-simulator".to_string()), + ) + .await; + publish( + client, + VSS_DIAG_COMPONENT_ID, + TypedValue::String("threadx-e2ev".to_string()), + ) + .await; + publish( + client, + VSS_DIAG_ECU, + TypedValue::String("threadx-e2ev".to_string()), + ) + .await; + publish(client, VSS_DIAG_E2EV_CRC_OK, TypedValue::Bool(false)).await; + publish( + client, + VSS_DIAG_E2EV_ALIVE_COUNTER, + TypedValue::Uint32(alive_counter), + ) + .await; +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + let command = SimulatorCommand::parse(); + + info!( + "connecting to Kuksa Databroker at {}", + command.databroker_uri + ); + let uri = Uri::try_from(command.databroker_uri.clone())?; + let mut client = KuksaClientV2::new(uri); + + let interval = Duration::from_secs(command.interval_secs); + let mut ticker = tokio::time::interval(interval); + let mut state_faulted = false; + let mut alive_counter: u32 = 0; + + info!( + "starting diagnostic source simulator (interval: {:?})", + interval + ); + + loop { + ticker.tick().await; + alive_counter = alive_counter.wrapping_add(1); + + if state_faulted { + write_state_cleared(&mut client, alive_counter).await; + } else { + write_state_faulted(&mut client, alive_counter).await; + } + state_faulted = !state_faulted; + } +} diff --git a/components/fms-diagnostics-consumer/Cargo.toml b/components/fms-diagnostics-consumer/Cargo.toml new file mode 100644 index 0000000..d8d6671 --- /dev/null +++ b/components/fms-diagnostics-consumer/Cargo.toml @@ -0,0 +1,50 @@ +# SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +[package] +name = "fms-diagnostics-consumer" +publish = false +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +documentation.workspace = true +readme.workspace = true + +[dependencies] +async-trait = { workspace = true } +clap = { workspace = true, features = [ + "std", + "derive", + "env", + "color", + "help", + "usage", + "error-context", + "suggestions", +] } +env_logger = { workspace = true } +fms-proto = { workspace = true } +fms-zenoh = { workspace = true } +influx-client = { workspace = true, features = ["writer"] } +log = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +up-rust = { workspace = true } +up-transport-hono-kafka = { workspace = true } +up-transport-zenoh = { workspace = true } diff --git a/components/fms-diagnostics-consumer/src/main.rs b/components/fms-diagnostics-consumer/src/main.rs new file mode 100644 index 0000000..5b48d0e --- /dev/null +++ b/components/fms-diagnostics-consumer/src/main.rs @@ -0,0 +1,114 @@ +// SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::str::FromStr; +use std::sync::Arc; +use std::thread; + +use clap::{Parser, Subcommand}; +use fms_proto::fms::DiagnosticStatus; +use fms_zenoh::ZenohTransportConfig; +use influx_client::connection::InfluxConnectionConfig; +use influx_client::writer::InfluxWriter; +use log::info; + +use up_rust::{UListener, UMessage, UTransport, UUri}; +use up_transport_hono_kafka::{HonoKafkaTransport, HonoKafkaTransportConfig}; +use up_transport_zenoh::UPTransportZenoh; + +struct DiagnosticStatusListener { + influx_writer: InfluxWriter, +} + +#[async_trait::async_trait] +impl UListener for DiagnosticStatusListener { + async fn on_receive(&self, msg: UMessage) { + if let Ok(diagnostic_status) = msg.extract_protobuf::() { + self.influx_writer + .write_diagnostic_status(&diagnostic_status) + .await; + } else { + info!("ignoring event with invalid/unknown payload"); + } + } +} + +/// Receives FMS diagnostics data via Zenoh or Hono uProtocol transport +/// and writes them to an InfluxDB server. +#[derive(Parser)] +#[command(version, about, long_about = None, arg_required_else_help = true)] +struct FmsDiagnosticsConsumerCommand { + /// The topic URI pattern to use for consuming diagnostic status events. + #[arg(long = "topic-filter", value_name = "URI", env = "DIAGNOSTIC_TOPIC_FILTER", default_value = "up://*/D110/1/D110", value_parser = up_rust::UUri::from_str)] + diagnostic_topic_filter: UUri, + + /// The local uService address. + #[arg(long = "uservice-uri", value_name = "URI", env = "USERVICE_URI", default_value = "up://fms-diagnostics-consumer/D111/1/0", value_parser = up_rust::UUri::from_str)] + local_uservice_uri: UUri, + + #[command(flatten)] + influxdb_connection: InfluxConnectionConfig, + + #[command(subcommand)] + transport: TransportType, +} + +#[derive(Subcommand)] +#[command(subcommand_required = true)] +enum TransportType { + /// Consumes diagnostic data using the Eclipse Hono/Kafka based uProtocol transport. + #[command(name = "hono")] + Hono(HonoKafkaTransportConfig), + + /// Consumes diagnostic data using the Eclipse Zenoh based uProtocol transport. + #[command(name = "zenoh")] + Zenoh(ZenohTransportConfig), +} + +#[tokio::main] +pub async fn main() -> Result<(), Box> { + env_logger::init(); + + let command = FmsDiagnosticsConsumerCommand::parse(); + + let transport: Arc = match command.transport { + TransportType::Hono(config) => HonoKafkaTransport::new(config).map(Arc::new)?, + TransportType::Zenoh(config) => { + let config = config.try_into()?; + UPTransportZenoh::new(config, command.local_uservice_uri) + .await + .map(Arc::new)? + } + }; + + let influx_writer = InfluxWriter::new(&command.influxdb_connection)?; + let listener = Arc::new(DiagnosticStatusListener { influx_writer }); + info!( + "Registering listener for diagnostic status events [source filter: {}]", + &command.diagnostic_topic_filter.to_uri(false) + ); + transport + .register_listener(&command.diagnostic_topic_filter, None, listener) + .await + .map_err(Box::new)?; + // do not let the listener go out of scope + thread::park(); + + Ok(()) +} diff --git a/components/fms-diagnostics-forwarder/Cargo.toml b/components/fms-diagnostics-forwarder/Cargo.toml new file mode 100644 index 0000000..55a5cb2 --- /dev/null +++ b/components/fms-diagnostics-forwarder/Cargo.toml @@ -0,0 +1,52 @@ +# SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +[package] +name = "fms-diagnostics-forwarder" +publish = false +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +documentation.workspace = true +readme.workspace = true + +[dependencies] +async-trait = { workspace = true } +clap = { workspace = true, features = [ + "std", + "derive", + "env", + "color", + "help", + "usage", + "error-context", + "suggestions", +] } +duration-str = { workspace = true } +env_logger = { workspace = true } +fms-proto = { workspace = true } +fms-zenoh = { workspace = true } +http = { version = "0.2" } +kuksa-rust-sdk = { workspace = true } +log = { workspace = true } +protobuf = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } +up-rust = { workspace = true, features = ["communication"] } +up-transport-zenoh = { workspace = true } diff --git a/components/fms-diagnostics-forwarder/src/diagnostic_abstraction.rs b/components/fms-diagnostics-forwarder/src/diagnostic_abstraction.rs new file mode 100644 index 0000000..1a45212 --- /dev/null +++ b/components/fms-diagnostics-forwarder/src/diagnostic_abstraction.rs @@ -0,0 +1,466 @@ +// SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//! Abstracts diagnostic data from the Eclipse Kuksa Databroker into a [`DiagnosticStatus`]. + +use std::collections::HashMap; +use std::error::Error; +use std::fmt::Display; +use std::time::Duration; + +use clap::Args; +use http::Uri; +use kuksa_rust_sdk::kuksa::{common::ClientTraitV2, val::v2::KuksaClientV2}; +use kuksa_rust_sdk::v2_proto::value::TypedValue; +use log::{debug, error, info, warn}; +use protobuf::well_known_types::timestamp::Timestamp; +use protobuf::MessageField; +use tokio::sync::mpsc::Sender; + +use fms_proto::fms::{DiagnosticCode, DiagnosticStatus, DiagnosticSummary}; + +// --------------------------------------------------------------------------- +// VSS path constants +// --------------------------------------------------------------------------- +pub const VSS_VEHICLE_VIN: &str = "Vehicle.VehicleIdentification.VIN"; +pub const VSS_DIAG_ACTIVE_DTC_COUNT: &str = "Vehicle.Diagnostics.ActiveDTCCount"; +pub const VSS_DIAG_STORED_DTC_COUNT: &str = "Vehicle.Diagnostics.StoredDTCCount"; +pub const VSS_DIAG_PENDING_DTC_COUNT: &str = "Vehicle.Diagnostics.PendingDTCCount"; +pub const VSS_DIAG_CRITICAL_DTC_COUNT: &str = "Vehicle.Diagnostics.CriticalDTCCount"; +pub const VSS_DIAG_WORST_SEVERITY: &str = "Vehicle.Diagnostics.WorstSeverity"; +pub const VSS_DIAG_LAST_CODE: &str = "Vehicle.Diagnostics.LastCode"; +pub const VSS_DIAG_LAST_DESCRIPTION: &str = "Vehicle.Diagnostics.LastDescription"; +pub const VSS_DIAG_LAST_STATUS_MASK: &str = "Vehicle.Diagnostics.LastStatusMask"; +pub const VSS_DIAG_LAST_LIFECYCLE_STATE: &str = "Vehicle.Diagnostics.LastLifecycleState"; +pub const VSS_DIAG_LAST_SEVERITY: &str = "Vehicle.Diagnostics.LastSeverity"; +pub const VSS_DIAG_SOURCE: &str = "Vehicle.Diagnostics.Source"; +pub const VSS_DIAG_COMPONENT_ID: &str = "Vehicle.Diagnostics.ComponentId"; +pub const VSS_DIAG_ECU: &str = "Vehicle.Diagnostics.Ecu"; +pub const VSS_DIAG_E2EV_CRC_OK: &str = "Vehicle.Diagnostics.E2EV.CrcOk"; +pub const VSS_DIAG_E2EV_ALIVE_COUNTER: &str = "Vehicle.Diagnostics.E2EV.AliveCounter"; + +const DIAGNOSTIC_VSS_PATHS: &[&str] = &[ + VSS_VEHICLE_VIN, + VSS_DIAG_ACTIVE_DTC_COUNT, + VSS_DIAG_STORED_DTC_COUNT, + VSS_DIAG_PENDING_DTC_COUNT, + VSS_DIAG_CRITICAL_DTC_COUNT, + VSS_DIAG_WORST_SEVERITY, + VSS_DIAG_LAST_CODE, + VSS_DIAG_LAST_DESCRIPTION, + VSS_DIAG_LAST_STATUS_MASK, + VSS_DIAG_LAST_LIFECYCLE_STATE, + VSS_DIAG_LAST_SEVERITY, + VSS_DIAG_SOURCE, + VSS_DIAG_COMPONENT_ID, + VSS_DIAG_ECU, + VSS_DIAG_E2EV_CRC_OK, + VSS_DIAG_E2EV_ALIVE_COUNTER, +]; + +const PARAM_DATABROKER_URI: &str = "databroker-uri"; +const PARAM_TIMER_INTERVAL: &str = "timer-interval"; + +// --------------------------------------------------------------------------- +// CLI config +// --------------------------------------------------------------------------- + +/// Configuration for connecting to the Eclipse Kuksa Databroker. +#[derive(Args, Clone, Debug)] +pub struct DiagnosticDatabrokerClientConfig { + /// The HTTP(S) URI of the Eclipse Kuksa Databroker's gRPC endpoint. + #[arg( + long = PARAM_DATABROKER_URI, + value_name = "URI", + env = "KUKSA_DATABROKER_URI", + default_value = "http://databroker:55556", + value_parser = clap::builder::NonEmptyStringValueParser::new() + )] + pub databroker_uri: String, + + /// The time period to wait between polling the Databroker for diagnostic data. + #[arg( + long = PARAM_TIMER_INTERVAL, + value_name = "DURATION_SPEC", + env = "FMS_DIAGNOSTICS_FORWARDER_TIMER_INTERVAL", + default_value = "2s", + value_parser = |s: &str| duration_str::parse(s) + )] + pub timer_interval: Duration, +} + +// --------------------------------------------------------------------------- +// Error type +// --------------------------------------------------------------------------- + +#[derive(Debug)] +pub struct DatabrokerError { + description: String, +} + +impl Error for DatabrokerError {} + +impl Display for DatabrokerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "error invoking Databroker: {}", self.description) + } +} + +// --------------------------------------------------------------------------- +// Helpers: convert TypedValue to Rust types +// --------------------------------------------------------------------------- + +fn get_string(data: &HashMap, key: &str) -> String { + data.get(key) + .and_then(|v| String::try_from(v).ok()) + .unwrap_or_default() +} + +fn get_u32(data: &HashMap, key: &str) -> u32 { + data.get(key) + .and_then(|v| u32::try_from(v).ok()) + .unwrap_or(0) +} + +// --------------------------------------------------------------------------- +// Build DiagnosticStatus from VSS data map +// --------------------------------------------------------------------------- + +pub fn build_diagnostic_status(data: HashMap) -> DiagnosticStatus { + let vin = data + .get(VSS_VEHICLE_VIN) + .and_then(|v| String::try_from(v).ok()) + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| "UNKNOWN-VIN".to_string()); + + let source = { + let s = get_string(&data, VSS_DIAG_SOURCE); + if s.is_empty() { + "diagnostic-source-simulator".to_string() + } else { + s + } + }; + + let component_id = { + let s = get_string(&data, VSS_DIAG_COMPONENT_ID); + if s.is_empty() { + "unknown-component".to_string() + } else { + s + } + }; + + let active_count = get_u32(&data, VSS_DIAG_ACTIVE_DTC_COUNT); + let stored_count = get_u32(&data, VSS_DIAG_STORED_DTC_COUNT); + let pending_count = get_u32(&data, VSS_DIAG_PENDING_DTC_COUNT); + let critical_count = get_u32(&data, VSS_DIAG_CRITICAL_DTC_COUNT); + let worst_severity = { + let s = get_string(&data, VSS_DIAG_WORST_SEVERITY); + if s.is_empty() { + "UNKNOWN".to_string() + } else { + s + } + }; + + let created_ts = Timestamp::now(); + + let mut summary = DiagnosticSummary::new(); + summary.active_count = active_count; + summary.stored_count = stored_count; + summary.pending_count = pending_count; + summary.critical_count = critical_count; + summary.has_active_faults = active_count > 0; + summary.worst_severity = worst_severity; + + let mut status = DiagnosticStatus::new(); + status.vin = vin; + status.source = source.clone(); + status.component_id = component_id.clone(); + status.created = MessageField::some(created_ts.clone()); + status.summary = MessageField::some(summary); + + // Build a single DiagnosticCode from the "last code" fields if present + let last_code = get_string(&data, VSS_DIAG_LAST_CODE); + if !last_code.is_empty() { + let lifecycle_state = { + let s = get_string(&data, VSS_DIAG_LAST_LIFECYCLE_STATE); + if s.is_empty() { + "UNKNOWN".to_string() + } else { + s + } + }; + let severity = { + let s = get_string(&data, VSS_DIAG_LAST_SEVERITY); + if s.is_empty() { + "UNKNOWN".to_string() + } else { + s + } + }; + let ecu = { + let s = get_string(&data, VSS_DIAG_ECU); + if s.is_empty() { + "UNKNOWN".to_string() + } else { + s + } + }; + let protocol = { + let src = source.to_lowercase(); + if src.contains("opensovd") || src.contains("openbsw") { + "UDS".to_string() + } else { + "INTERNAL".to_string() + } + }; + + let mut code = DiagnosticCode::new(); + code.code = last_code.clone(); + code.raw_uds_dtc = last_code; + code.protocol = protocol; + code.status_mask = get_string(&data, VSS_DIAG_LAST_STATUS_MASK); + code.description = get_string(&data, VSS_DIAG_LAST_DESCRIPTION); + code.severity = severity; + code.lifecycle_state = lifecycle_state.clone(); + code.ecu = ecu; + code.component_id = component_id; + code.source = source; + code.first_seen = MessageField::some(created_ts.clone()); + code.last_seen = MessageField::some(created_ts); + + match lifecycle_state.as_str() { + "ACTIVE" => status.active_codes.push(code), + "STORED" => status.stored_codes.push(code), + "PENDING" => status.pending_codes.push(code), + _ => status.active_codes.push(code), + } + } + + status +} + +// --------------------------------------------------------------------------- +// KuksaValDatabroker wrapper +// --------------------------------------------------------------------------- + +struct KuksaDiagnosticDatabroker { + client: Box, +} + +impl KuksaDiagnosticDatabroker { + async fn new(config: &DiagnosticDatabrokerClientConfig) -> Result { + info!( + "creating diagnostic client for Eclipse Kuksa Databroker at {}", + config.databroker_uri + ); + Uri::try_from(config.databroker_uri.clone()) + .map_err(|err| { + error!("invalid Databroker URI: {err}"); + DatabrokerError { + description: err.to_string(), + } + }) + .map(|uri| { + let client = KuksaClientV2::new(uri); + KuksaDiagnosticDatabroker { + client: Box::new(client), + } + }) + } + + pub async fn get_diagnostic_status(&mut self) -> Result { + let paths = DIAGNOSTIC_VSS_PATHS.iter().map(|v| v.to_string()).collect(); + + match self.client.get_values(paths).await { + Err(kuksa_rust_sdk::kuksa::common::ClientError::Connection(msg)) => { + warn!("failed to retrieve diagnostic data points from Databroker: {msg}"); + Err(DatabrokerError { description: msg }) + } + Err(kuksa_rust_sdk::kuksa::common::ClientError::Status(status)) => { + warn!( + "failed to retrieve diagnostic data points from Databroker: {}", + status.message() + ); + Err(DatabrokerError { + description: status.message().to_string(), + }) + } + Err(kuksa_rust_sdk::kuksa::common::ClientError::Function(errors)) => { + errors.iter().for_each(|error| { + warn!("failed to retrieve diagnostic data points from Databroker: {error:?}"); + }); + Err(DatabrokerError { + description: "multiple errors while retrieving diagnostic data".to_string(), + }) + } + Ok(get_response) => { + let mut vss_data = HashMap::new(); + let mut idx = 0usize; + get_response.iter().for_each(|data_entry| { + if let (name, Some(value)) = ( + DIAGNOSTIC_VSS_PATHS[idx], + data_entry + .value + .as_ref() + .and_then(|v| v.typed_value.as_ref()), + ) { + debug!("got value [path: {name}]: {value:?}"); + vss_data.insert(name.to_owned(), value.to_owned()); + } + idx += 1; + }); + Ok(build_diagnostic_status(vss_data)) + } + } + } +} + +// --------------------------------------------------------------------------- +// Public init function +// --------------------------------------------------------------------------- + +/// Sets up a connection to the Databroker and starts a timer-based polling loop, +/// sending `DiagnosticStatus` messages through `status_publisher`. +pub async fn init( + config: &DiagnosticDatabrokerClientConfig, + status_publisher: Sender, +) -> Result<(), DatabrokerError> { + let timer_interval = config.timer_interval; + + let mut databroker = KuksaDiagnosticDatabroker::new(config).await?; + + tokio::task::spawn(async move { + let mut interval = tokio::time::interval(timer_interval); + loop { + interval.tick().await; + match databroker.get_diagnostic_status().await { + Err(e) => { + warn!( + "failed to retrieve current diagnostic status from databroker: {}", + e + ); + } + Ok(status) => { + if let Err(e) = status_publisher.send(status).await { + warn!("failed to send diagnostic status: {}", e); + } + } + } + } + }); + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use kuksa_rust_sdk::v2_proto::value::TypedValue; + + fn make_string_value(s: &str) -> TypedValue { + TypedValue::String(s.to_string()) + } + + fn make_uint32_value(v: u32) -> TypedValue { + TypedValue::Uint32(v) + } + + #[test] + fn test_summary_counts_set_correctly() { + let mut data = HashMap::new(); + data.insert(VSS_VEHICLE_VIN.to_string(), make_string_value("TEST-VIN")); + data.insert(VSS_DIAG_ACTIVE_DTC_COUNT.to_string(), make_uint32_value(2)); + data.insert(VSS_DIAG_STORED_DTC_COUNT.to_string(), make_uint32_value(3)); + data.insert(VSS_DIAG_PENDING_DTC_COUNT.to_string(), make_uint32_value(1)); + data.insert( + VSS_DIAG_CRITICAL_DTC_COUNT.to_string(), + make_uint32_value(1), + ); + data.insert( + VSS_DIAG_WORST_SEVERITY.to_string(), + make_string_value("CRITICAL"), + ); + + let status = build_diagnostic_status(data); + let summary = status.summary.as_ref().unwrap(); + assert_eq!(summary.active_count, 2); + assert_eq!(summary.stored_count, 3); + assert_eq!(summary.pending_count, 1); + assert_eq!(summary.critical_count, 1); + assert!(summary.has_active_faults); + assert_eq!(summary.worst_severity, "CRITICAL"); + } + + #[test] + fn test_active_code_routing() { + let mut data = HashMap::new(); + data.insert( + VSS_DIAG_LAST_CODE.to_string(), + make_string_value("0xABCDEF"), + ); + data.insert( + VSS_DIAG_LAST_LIFECYCLE_STATE.to_string(), + make_string_value("ACTIVE"), + ); + + let status = build_diagnostic_status(data); + assert_eq!(status.active_codes.len(), 1); + assert_eq!(status.stored_codes.len(), 0); + assert_eq!(status.pending_codes.len(), 0); + assert_eq!(status.active_codes[0].code, "0xABCDEF"); + } + + #[test] + fn test_stored_code_routing() { + let mut data = HashMap::new(); + data.insert( + VSS_DIAG_LAST_CODE.to_string(), + make_string_value("0x111111"), + ); + data.insert( + VSS_DIAG_LAST_LIFECYCLE_STATE.to_string(), + make_string_value("STORED"), + ); + + let status = build_diagnostic_status(data); + assert_eq!(status.stored_codes.len(), 1); + assert_eq!(status.active_codes.len(), 0); + } + + #[test] + fn test_missing_values_produce_safe_defaults() { + let data = HashMap::new(); + let status = build_diagnostic_status(data); + assert_eq!(status.vin, "UNKNOWN-VIN"); + assert_eq!(status.source, "diagnostic-source-simulator"); + assert_eq!(status.component_id, "unknown-component"); + let summary = status.summary.as_ref().unwrap(); + assert_eq!(summary.active_count, 0); + assert!(!summary.has_active_faults); + assert_eq!(summary.worst_severity, "UNKNOWN"); + assert!(status.active_codes.is_empty()); + } +} diff --git a/components/fms-diagnostics-forwarder/src/main.rs b/components/fms-diagnostics-forwarder/src/main.rs new file mode 100644 index 0000000..ad87b88 --- /dev/null +++ b/components/fms-diagnostics-forwarder/src/main.rs @@ -0,0 +1,100 @@ +// SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::{str::FromStr, sync::Arc}; + +use clap::{Parser, Subcommand}; +use fms_proto::fms::DiagnosticStatus; +use fms_zenoh::ZenohTransportConfig; +use log::{info, warn}; +use tokio::sync::mpsc; +use up_rust::{ + communication::{CallOptions, Publisher, SimplePublisher, UPayload}, + LocalUriProvider, StaticUriProvider, UTransport, UUri, +}; +use up_transport_zenoh::UPTransportZenoh; + +mod diagnostic_abstraction; + +/// Forwards FMS diagnostic data from the Kuksa Databroker to a back end system using uProtocol. +#[derive(Parser)] +#[command(version, about, long_about = None, arg_required_else_help = true)] +struct FmsDiagnosticsForwarderCommand { + /// The topic to publish diagnostic status events to. + #[arg(long = "topic", value_name = "URI", env = "DIAGNOSTIC_TOPIC", default_value = "up://fms-diagnostics-forwarder/D110/1/D110", value_parser = up_rust::UUri::from_str)] + diagnostic_topic: UUri, + + #[command(flatten)] + databroker_connection: diagnostic_abstraction::DiagnosticDatabrokerClientConfig, + + #[command(subcommand)] + transport: TransportType, +} + +#[derive(Subcommand)] +#[command(subcommand_required = true)] +enum TransportType { + /// Forwards diagnostic data via Eclipse uProtocol using Eclipse Zenoh based transport. + #[command(name = "zenoh")] + Zenoh(ZenohTransportConfig), +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + let command = FmsDiagnosticsForwarderCommand::parse(); + let uri_provider = StaticUriProvider::try_from(&command.diagnostic_topic).map(Arc::new)?; + + let transport: Arc = match command.transport { + TransportType::Zenoh(config) => { + let zenoh_config = config.try_into()?; + UPTransportZenoh::new(zenoh_config, uri_provider.get_source_uri()) + .await + .map(Arc::new)? + } + }; + + let origin_resource_id = u16::try_from(command.diagnostic_topic.resource_id)?; + let publisher = Arc::new(SimplePublisher::new(transport, uri_provider)); + info!("starting FMS diagnostics forwarder"); + + let (tx, mut rx) = mpsc::channel::(30); + diagnostic_abstraction::init(&command.databroker_connection, tx).await?; + + while let Some(diagnostic_status) = rx.recv().await { + match UPayload::try_from_protobuf(diagnostic_status) { + Ok(payload) => { + if let Err(e) = publisher + .publish( + origin_resource_id, + CallOptions::for_publish(None, None, None), + Some(payload), + ) + .await + { + warn!("failed to publish diagnostic status event: {}", e); + } + } + Err(e) => { + warn!("failed to serialize diagnostic status: {}", e); + } + } + } + Ok(()) +} diff --git a/components/fms-forwarder/src/vehicle_abstraction/kuksa.rs b/components/fms-forwarder/src/vehicle_abstraction/kuksa.rs index bc4c53d..eb3e335 100644 --- a/components/fms-forwarder/src/vehicle_abstraction/kuksa.rs +++ b/components/fms-forwarder/src/vehicle_abstraction/kuksa.rs @@ -51,13 +51,15 @@ pub fn new_vehicle_status( .parking_brake_engaged = bool::try_from(value).ok(); } - if let Some(value) = data.get(vss::VSS_VEHICLE_BODY_LIGHTS_DIRECTIONINDICATOR_RIGHT_ISSIGNALING) { + if let Some(value) = data.get(vss::VSS_VEHICLE_BODY_LIGHTS_DIRECTIONINDICATOR_RIGHT_ISSIGNALING) + { vehicle_status .snapshot_data .mut_or_insert_default() .direction_indicator_right = bool::try_from(value).ok(); } - if let Some(value) = data.get(vss::VSS_VEHICLE_BODY_LIGHTS_DIRECTIONINDICATOR_LEFT_ISSIGNALING) { + if let Some(value) = data.get(vss::VSS_VEHICLE_BODY_LIGHTS_DIRECTIONINDICATOR_LEFT_ISSIGNALING) + { vehicle_status .snapshot_data .mut_or_insert_default() diff --git a/components/fms-forwarder/src/vehicle_abstraction/vss.rs b/components/fms-forwarder/src/vehicle_abstraction/vss.rs index fd7702b..ec6c5f8 100644 --- a/components/fms-forwarder/src/vehicle_abstraction/vss.rs +++ b/components/fms-forwarder/src/vehicle_abstraction/vss.rs @@ -32,8 +32,7 @@ pub const VSS_VEHICLE_BODY_LIGHTS_DIRECTIONINDICATOR_RIGHT_ISSIGNALING: &str = "Vehicle.Body.Lights.DirectionIndicator.Right.IsSignaling"; pub const VSS_VEHICLE_BODY_LIGHTS_DIRECTIONINDICATOR_LEFT_ISSIGNALING: &str = "Vehicle.Body.Lights.DirectionIndicator.Left.IsSignaling"; -pub const VSS_VEHICLE_BODY_LIGHTS_BRAKE_ISACTIVE: &str = - "Vehicle.Body.Lights.Brake.IsActive"; +pub const VSS_VEHICLE_BODY_LIGHTS_BRAKE_ISACTIVE: &str = "Vehicle.Body.Lights.Brake.IsActive"; pub const VSS_VEHICLE_DRIVER_IDENTIFIER_SUBJECT: &str = "Vehicle.Driver.Identifier.Subject"; pub const VSS_VEHICLE_CURRENTLOCATION_LATITUDE: &str = "Vehicle.CurrentLocation.Latitude"; pub const VSS_VEHICLE_CURRENTLOCATION_LONGITUDE: &str = "Vehicle.CurrentLocation.Longitude"; diff --git a/components/fms-proto/proto/fms/v4/fms.proto b/components/fms-proto/proto/fms/v4/fms.proto index ab32b55..f2fb6fe 100644 --- a/components/fms-proto/proto/fms/v4/fms.proto +++ b/components/fms-proto/proto/fms/v4/fms.proto @@ -288,6 +288,42 @@ message UptimeData { optional uint64 bellow_pressure_rear_axle_right = 13; } +message DiagnosticSummary { + uint32 active_count = 1; + uint32 stored_count = 2; + uint32 pending_count = 3; + uint32 critical_count = 4; + bool has_active_faults = 5; + string worst_severity = 6; +} + +message DiagnosticCode { + string code = 1; + string raw_uds_dtc = 2; + string protocol = 3; // UDS | SOVD | OBD | INTERNAL + string status_mask = 4; + string description = 5; + string severity = 6; // INFO | WARNING | CRITICAL | UNKNOWN + string lifecycle_state = 7; // ACTIVE | STORED | PENDING | CLEARED | UNKNOWN + string ecu = 8; + string component_id = 9; + string source = 10; + google.protobuf.Timestamp first_seen = 11; + google.protobuf.Timestamp last_seen = 12; +} + +message DiagnosticStatus { + string vin = 1; + string vehicle_id = 2; + string source = 3; + string component_id = 4; + google.protobuf.Timestamp created = 5; + repeated DiagnosticCode active_codes = 6; + repeated DiagnosticCode stored_codes = 7; + repeated DiagnosticCode pending_codes = 8; + DiagnosticSummary summary = 9; +} + message VehicleStatus { string vin = 1; Trigger trigger = 2; diff --git a/components/fms-proto/tests/diagnostic_status.rs b/components/fms-proto/tests/diagnostic_status.rs new file mode 100644 index 0000000..88e6c0e --- /dev/null +++ b/components/fms-proto/tests/diagnostic_status.rs @@ -0,0 +1,39 @@ +// SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +use fms_proto::fms::{DiagnosticCode, DiagnosticStatus}; +use protobuf::Message; + +#[test] +fn test_diagnostic_status_round_trip() { + let mut status = DiagnosticStatus::new(); + status.vin = "DEMO-VIN-001".to_string(); + + let mut code = DiagnosticCode::new(); + code.code = "0x123456".to_string(); + code.lifecycle_state = "ACTIVE".to_string(); + status.active_codes.push(code); + + let bytes = status.write_to_bytes().expect("serialization failed"); + let parsed = DiagnosticStatus::parse_from_bytes(&bytes).expect("deserialization failed"); + + assert_eq!(parsed.vin, "DEMO-VIN-001"); + assert_eq!(parsed.active_codes.len(), 1); + assert_eq!(parsed.active_codes[0].code, "0x123456"); +} diff --git a/components/fms-server/src/influx_reader.rs b/components/fms-server/src/influx_reader.rs index 7a346bb..ac8cb29 100644 --- a/components/fms-server/src/influx_reader.rs +++ b/components/fms-server/src/influx_reader.rs @@ -23,6 +23,7 @@ use influx_client::connection::{InfluxConnection, InfluxConnectionConfig}; use influxrs::InfluxError; use log::error; +use crate::models::diagnostics::{DiagnosticCodeObject, DiagnosticSummaryObject}; use crate::models::position::{GnssPositionObject, VehiclePositionObject}; use crate::models::status::{DriverWorkingStateProperty, SnapshotDataObject, VehicleStatusObject}; use crate::models::vehicle::VehicleObject; @@ -401,4 +402,180 @@ impl InfluxReader { .collect() }) } + + /// Returns all VINs that have diagnostic summary data. + pub async fn get_diagnostic_vins(&self) -> Result, InfluxError> { + let query = influxrs::Query::new(format!( + r#" + import "influxdata/influxdb/schema" + schema.tagValues(bucket: "{}", tag: "{}", predicate: (r) => r._measurement == "{}") + "#, + self.influx_con.bucket, + influx_client::TAG_VIN, + influx_client::MEASUREMENT_DIAGNOSTIC_SUMMARY, + )); + self.influx_con.client.query(query).await.map(|rows| { + rows.into_iter() + .filter_map(|e| e.get("_value").map(|v| v.to_string())) + .collect() + }) + } + + /// Returns the latest diagnostic summary for a given VIN. + pub async fn get_diagnostic_summary( + &self, + vin: &str, + ) -> Result, InfluxError> { + let query = influxrs::Query::new(format!(r#"from(bucket: "{}")"#, self.influx_con.bucket)) + .then("range(start: -30d)") + .then(format!( + r#"filter(fn: (r) => r._measurement == "{}")"#, + influx_client::MEASUREMENT_DIAGNOSTIC_SUMMARY + )) + .then(format!( + r#"filter(fn: (r) => r["{}"] == "{}")"#, + influx_client::TAG_VIN, + vin + )) + .then("last()") + .then(r#"pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")"#); + + self.influx_con.client.query(query).await.map(|rows| { + rows.into_iter() + .filter_map(|e| { + e.get(influx_client::TAG_VIN) + .map(|v| DiagnosticSummaryObject { + vin: v.to_string(), + source: e.get(influx_client::TAG_SOURCE).cloned(), + component_id: e.get(influx_client::TAG_COMPONENT_ID).cloned(), + created_date_time: unpack_time( + e.get(influx_client::FIELD_CREATED_DATE_TIME), + ), + active_count: unpack_value_i64( + e.get(influx_client::FIELD_ACTIVE_COUNT), + ), + stored_count: unpack_value_i64( + e.get(influx_client::FIELD_STORED_COUNT), + ), + pending_count: unpack_value_i64( + e.get(influx_client::FIELD_PENDING_COUNT), + ), + critical_count: unpack_value_i64( + e.get(influx_client::FIELD_CRITICAL_COUNT), + ), + has_active_faults: unpack_value_bool( + e.get(influx_client::FIELD_HAS_ACTIVE_FAULTS), + ), + worst_severity: e.get(influx_client::FIELD_WORST_SEVERITY).cloned(), + }) + }) + .collect() + }) + } + + /// Returns all DTC measurements for a given VIN, optionally filtered to active only. + pub async fn get_diagnostic_codes( + &self, + vin: &str, + active_only: bool, + ) -> Result, InfluxError> { + let mut query = + influxrs::Query::new(format!(r#"from(bucket: "{}")"#, self.influx_con.bucket)) + .then("range(start: -30d)") + .then(format!( + r#"filter(fn: (r) => r._measurement == "{}")"#, + influx_client::MEASUREMENT_DIAGNOSTIC_CODE + )) + .then(format!( + r#"filter(fn: (r) => r["{}"] == "{}")"#, + influx_client::TAG_VIN, + vin + )); + + if active_only { + query = query.then(format!( + r#"filter(fn: (r) => r["{}"] == "ACTIVE")"#, + influx_client::TAG_LIFECYCLE_STATE + )); + } + + query = + query.then(r#"pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")"#); + + self.influx_con.client.query(query).await.map(|rows| { + rows.into_iter() + .filter_map(|e| { + e.get(influx_client::TAG_VIN).map(|v| DiagnosticCodeObject { + vin: v.to_string(), + code: e.get(influx_client::TAG_CODE).cloned(), + source: e.get(influx_client::TAG_SOURCE).cloned(), + component_id: e.get(influx_client::TAG_COMPONENT_ID).cloned(), + ecu: e.get(influx_client::TAG_ECU).cloned(), + severity: e.get(influx_client::TAG_SEVERITY).cloned(), + lifecycle_state: e.get(influx_client::TAG_LIFECYCLE_STATE).cloned(), + protocol: e.get(influx_client::TAG_PROTOCOL).cloned(), + raw_uds_dtc: e.get(influx_client::FIELD_RAW_UDS_DTC).cloned(), + status_mask: e.get(influx_client::FIELD_STATUS_MASK).cloned(), + description: e.get(influx_client::FIELD_DESCRIPTION).cloned(), + created_date_time: unpack_time( + e.get(influx_client::FIELD_CREATED_DATE_TIME), + ), + first_seen: unpack_time(e.get(influx_client::FIELD_FIRST_SEEN)), + last_seen: unpack_time(e.get(influx_client::FIELD_LAST_SEEN)), + }) + }) + .collect() + }) + } + + /// Returns recent diagnostic summary points (timeline) for a given VIN. + pub async fn get_diagnostic_timeline( + &self, + vin: &str, + ) -> Result, InfluxError> { + let query = influxrs::Query::new(format!(r#"from(bucket: "{}")"#, self.influx_con.bucket)) + .then("range(start: -24h)") + .then(format!( + r#"filter(fn: (r) => r._measurement == "{}")"#, + influx_client::MEASUREMENT_DIAGNOSTIC_SUMMARY + )) + .then(format!( + r#"filter(fn: (r) => r["{}"] == "{}")"#, + influx_client::TAG_VIN, + vin + )) + .then(r#"pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")"#); + + self.influx_con.client.query(query).await.map(|rows| { + rows.into_iter() + .filter_map(|e| { + e.get(influx_client::TAG_VIN) + .map(|v| DiagnosticSummaryObject { + vin: v.to_string(), + source: e.get(influx_client::TAG_SOURCE).cloned(), + component_id: e.get(influx_client::TAG_COMPONENT_ID).cloned(), + created_date_time: unpack_time( + e.get(influx_client::FIELD_CREATED_DATE_TIME), + ), + active_count: unpack_value_i64( + e.get(influx_client::FIELD_ACTIVE_COUNT), + ), + stored_count: unpack_value_i64( + e.get(influx_client::FIELD_STORED_COUNT), + ), + pending_count: unpack_value_i64( + e.get(influx_client::FIELD_PENDING_COUNT), + ), + critical_count: unpack_value_i64( + e.get(influx_client::FIELD_CRITICAL_COUNT), + ), + has_active_faults: unpack_value_bool( + e.get(influx_client::FIELD_HAS_ACTIVE_FAULTS), + ), + worst_severity: e.get(influx_client::FIELD_WORST_SEVERITY).cloned(), + }) + }) + .collect() + }) + } } diff --git a/components/fms-server/src/lib.rs b/components/fms-server/src/lib.rs index 93c5d86..b31e92d 100644 --- a/components/fms-server/src/lib.rs +++ b/components/fms-server/src/lib.rs @@ -37,6 +37,9 @@ mod influx_reader; mod models; mod query_parser; +use models::diagnostics::{ + DiagnosticCodeListResponse, DiagnosticSummaryListResponse, DiagnosticVehicleListResponse, +}; use models::position::{ VehiclePositionResponseObject, VehiclePositionResponseObjectVehiclePositionResponse, }; @@ -59,6 +62,20 @@ pub fn app(influx_connection_params: &InfluxConnectionConfig) -> Router { .route("/rfms/vehiclepositions", get(get_vehicleposition)) .route("/rfms/vehicles", get(get_vehicles)) .route("/rfms/vehiclestatuses", get(get_vehiclesstatuses)) + .route("/diagnostics/vehicles", get(get_diagnostic_vehicles)) + .route( + "/diagnostics/vehicles/{vin}/summary", + get(get_diagnostic_summary), + ) + .route("/diagnostics/vehicles/{vin}/dtcs", get(get_diagnostic_dtcs)) + .route( + "/diagnostics/vehicles/{vin}/dtcs/active", + get(get_diagnostic_dtcs_active), + ) + .route( + "/diagnostics/vehicles/{vin}/timeline", + get(get_diagnostic_timeline), + ) .with_state(influx_reader) } @@ -146,3 +163,76 @@ async fn get_vehiclesstatuses( StatusCode::INTERNAL_SERVER_ERROR }) } + +// --------------------------------------------------------------------------- +// Diagnostics handlers +// --------------------------------------------------------------------------- + +async fn get_diagnostic_vehicles( + State(influx_server): State>, +) -> Result, StatusCode> { + influx_server + .get_diagnostic_vins() + .await + .map(|vins| Json(json!(DiagnosticVehicleListResponse { vins }))) + .map_err(|e| { + error!("error retrieving diagnostic vehicle list: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + }) +} + +async fn get_diagnostic_summary( + State(influx_server): State>, + axum::extract::Path(vin): axum::extract::Path, +) -> Result, StatusCode> { + influx_server + .get_diagnostic_summary(&vin) + .await + .map(|summaries| Json(json!(DiagnosticSummaryListResponse { summaries }))) + .map_err(|e| { + error!("error retrieving diagnostic summary for {vin}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + }) +} + +async fn get_diagnostic_dtcs( + State(influx_server): State>, + axum::extract::Path(vin): axum::extract::Path, +) -> Result, StatusCode> { + influx_server + .get_diagnostic_codes(&vin, false) + .await + .map(|dtcs| Json(json!(DiagnosticCodeListResponse { dtcs }))) + .map_err(|e| { + error!("error retrieving DTCs for {vin}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + }) +} + +async fn get_diagnostic_dtcs_active( + State(influx_server): State>, + axum::extract::Path(vin): axum::extract::Path, +) -> Result, StatusCode> { + influx_server + .get_diagnostic_codes(&vin, true) + .await + .map(|dtcs| Json(json!(DiagnosticCodeListResponse { dtcs }))) + .map_err(|e| { + error!("error retrieving active DTCs for {vin}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + }) +} + +async fn get_diagnostic_timeline( + State(influx_server): State>, + axum::extract::Path(vin): axum::extract::Path, +) -> Result, StatusCode> { + influx_server + .get_diagnostic_timeline(&vin) + .await + .map(|summaries| Json(json!(DiagnosticSummaryListResponse { summaries }))) + .map_err(|e| { + error!("error retrieving diagnostic timeline for {vin}: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + }) +} diff --git a/components/fms-server/src/models/diagnostics.rs b/components/fms-server/src/models/diagnostics.rs new file mode 100644 index 0000000..6e6ae2a --- /dev/null +++ b/components/fms-server/src/models/diagnostics.rs @@ -0,0 +1,141 @@ +// SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +/// Response model for a single diagnostic summary entry. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DiagnosticSummaryObject { + #[serde(rename = "vin")] + pub vin: String, + + #[serde(rename = "source")] + #[serde(skip_serializing_if = "Option::is_none")] + pub source: Option, + + #[serde(rename = "componentId")] + #[serde(skip_serializing_if = "Option::is_none")] + pub component_id: Option, + + #[serde(rename = "createdDateTime")] + #[serde(skip_serializing_if = "Option::is_none")] + pub created_date_time: Option>, + + #[serde(rename = "activeCount")] + #[serde(skip_serializing_if = "Option::is_none")] + pub active_count: Option, + + #[serde(rename = "storedCount")] + #[serde(skip_serializing_if = "Option::is_none")] + pub stored_count: Option, + + #[serde(rename = "pendingCount")] + #[serde(skip_serializing_if = "Option::is_none")] + pub pending_count: Option, + + #[serde(rename = "criticalCount")] + #[serde(skip_serializing_if = "Option::is_none")] + pub critical_count: Option, + + #[serde(rename = "hasActiveFaults")] + #[serde(skip_serializing_if = "Option::is_none")] + pub has_active_faults: Option, + + #[serde(rename = "worstSeverity")] + #[serde(skip_serializing_if = "Option::is_none")] + pub worst_severity: Option, +} + +/// Response model for a single diagnostic code (DTC) entry. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DiagnosticCodeObject { + #[serde(rename = "vin")] + pub vin: String, + + #[serde(rename = "code")] + #[serde(skip_serializing_if = "Option::is_none")] + pub code: Option, + + #[serde(rename = "source")] + #[serde(skip_serializing_if = "Option::is_none")] + pub source: Option, + + #[serde(rename = "componentId")] + #[serde(skip_serializing_if = "Option::is_none")] + pub component_id: Option, + + #[serde(rename = "ecu")] + #[serde(skip_serializing_if = "Option::is_none")] + pub ecu: Option, + + #[serde(rename = "severity")] + #[serde(skip_serializing_if = "Option::is_none")] + pub severity: Option, + + #[serde(rename = "lifecycleState")] + #[serde(skip_serializing_if = "Option::is_none")] + pub lifecycle_state: Option, + + #[serde(rename = "protocol")] + #[serde(skip_serializing_if = "Option::is_none")] + pub protocol: Option, + + #[serde(rename = "rawUdsDtc")] + #[serde(skip_serializing_if = "Option::is_none")] + pub raw_uds_dtc: Option, + + #[serde(rename = "statusMask")] + #[serde(skip_serializing_if = "Option::is_none")] + pub status_mask: Option, + + #[serde(rename = "description")] + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + + #[serde(rename = "createdDateTime")] + #[serde(skip_serializing_if = "Option::is_none")] + pub created_date_time: Option>, + + #[serde(rename = "firstSeen")] + #[serde(skip_serializing_if = "Option::is_none")] + pub first_seen: Option>, + + #[serde(rename = "lastSeen")] + #[serde(skip_serializing_if = "Option::is_none")] + pub last_seen: Option>, +} + +/// Response wrapper listing VINs that have diagnostic data. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DiagnosticVehicleListResponse { + #[serde(rename = "vins")] + pub vins: Vec, +} + +/// Response wrapper for a list of diagnostic summaries. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DiagnosticSummaryListResponse { + #[serde(rename = "summaries")] + pub summaries: Vec, +} + +/// Response wrapper for a list of diagnostic codes. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DiagnosticCodeListResponse { + #[serde(rename = "dtcs")] + pub dtcs: Vec, +} diff --git a/components/fms-server/src/models/mod.rs b/components/fms-server/src/models/mod.rs index daf4033..2422af8 100644 --- a/components/fms-server/src/models/mod.rs +++ b/components/fms-server/src/models/mod.rs @@ -19,6 +19,7 @@ use crate::models; +pub mod diagnostics; pub mod position; pub mod status; pub mod vehicle; diff --git a/components/influx-client/src/lib.rs b/components/influx-client/src/lib.rs index 43a075f..2b080b2 100644 --- a/components/influx-client/src/lib.rs +++ b/components/influx-client/src/lib.rs @@ -57,9 +57,30 @@ pub const FIELD_WHEEL_BASED_SPEED: &str = "wheelBasedSpeed"; pub const MEASUREMENT_HEADER: &str = "header"; pub const MEASUREMENT_SNAPSHOT: &str = "snapshot"; +pub const MEASUREMENT_DIAGNOSTIC_SUMMARY: &str = "diagnostic_summary"; +pub const MEASUREMENT_DIAGNOSTIC_CODE: &str = "diagnostic_code"; pub const TAG_TRIGGER: &str = "trigger"; pub const TAG_VIN: &str = "vin"; +pub const TAG_SOURCE: &str = "source"; +pub const TAG_COMPONENT_ID: &str = "componentId"; +pub const TAG_ECU: &str = "ecu"; +pub const TAG_CODE: &str = "code"; +pub const TAG_SEVERITY: &str = "severity"; +pub const TAG_LIFECYCLE_STATE: &str = "lifecycleState"; +pub const TAG_PROTOCOL: &str = "protocol"; + +pub const FIELD_ACTIVE_COUNT: &str = "activeCount"; +pub const FIELD_STORED_COUNT: &str = "storedCount"; +pub const FIELD_PENDING_COUNT: &str = "pendingCount"; +pub const FIELD_CRITICAL_COUNT: &str = "criticalCount"; +pub const FIELD_HAS_ACTIVE_FAULTS: &str = "hasActiveFaults"; +pub const FIELD_WORST_SEVERITY: &str = "worstSeverity"; +pub const FIELD_RAW_UDS_DTC: &str = "rawUdsDtc"; +pub const FIELD_STATUS_MASK: &str = "statusMask"; +pub const FIELD_DESCRIPTION: &str = "description"; +pub const FIELD_FIRST_SEEN: &str = "firstSeen"; +pub const FIELD_LAST_SEEN: &str = "lastSeen"; pub mod connection; #[cfg(feature = "writer")] diff --git a/components/influx-client/src/writer.rs b/components/influx-client/src/writer.rs index 49300fc..1f85efd 100644 --- a/components/influx-client/src/writer.rs +++ b/components/influx-client/src/writer.rs @@ -19,7 +19,7 @@ //! Provides means to write a Vehicle's current status properties //! to an InfluxDB as Influx *measurements*. -use fms_proto::fms::VehicleStatus; +use fms_proto::fms::{DiagnosticCode, DiagnosticStatus, VehicleStatus}; use influxrs::Measurement; use log::{debug, warn}; use protobuf::well_known_types::timestamp::Timestamp; @@ -322,4 +322,242 @@ impl InfluxWriter { } } } + + /// Writes DiagnosticStatus information as measurements to the InfluxDB server. + /// + /// Writes two measurement types: + /// - `diagnostic_summary` — one measurement with summary counts and flags. + /// - `diagnostic_code` — one measurement per DTC across active/stored/pending lists. + /// + /// This method is fire-and-forget: it logs failures internally and returns `()`. + pub async fn write_diagnostic_status(&self, diagnostic_status: &DiagnosticStatus) { + if diagnostic_status.vin.is_empty() { + debug!("ignoring diagnostic status without VIN ..."); + return; + } + + let created_ms = diagnostic_created_ms(diagnostic_status); + let mut measurements: Vec = Vec::new(); + + if let Some(measurement) = + build_diagnostic_summary_measurement(diagnostic_status, created_ms) + { + debug!("writing diagnostic_summary measurement to influxdb"); + measurements.push(measurement); + } + for measurement in build_diagnostic_code_measurements(diagnostic_status, created_ms) { + debug!("writing diagnostic_code measurement to influxdb"); + measurements.push(measurement); + } + + if !measurements.is_empty() { + if let Err(e) = self + .influx_con + .client + .write( + self.influx_con.bucket.as_str(), + measurements[..].try_into().unwrap(), + ) + .await + { + warn!("failed to write diagnostic data to influx: {e}"); + } + } + } +} + +// --------------------------------------------------------------------------- +// Diagnostic helper functions +// --------------------------------------------------------------------------- + +fn timestamp_to_millis(ts: Option<&Timestamp>) -> u128 { + ts.and_then(|t| { + >::into(t.clone()) + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_millis()) + }) + .unwrap_or_else(|| { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + }) +} + +fn diagnostic_created_ms(diagnostic_status: &DiagnosticStatus) -> u128 { + timestamp_to_millis(diagnostic_status.created.as_ref()) +} + +fn normalize_tag(value: &str) -> &str { + if value.is_empty() { + "UNKNOWN" + } else { + value + } +} + +fn build_diagnostic_summary_measurement( + ds: &DiagnosticStatus, + created_ms: u128, +) -> Option { + let summary = ds.summary.as_ref()?; + + let builder = Measurement::builder(crate::MEASUREMENT_DIAGNOSTIC_SUMMARY) + .tag(crate::TAG_VIN, ds.vin.as_str()) + .tag(crate::TAG_SOURCE, normalize_tag(ds.source.as_str())) + .tag( + crate::TAG_COMPONENT_ID, + normalize_tag(ds.component_id.as_str()), + ) + .field(crate::FIELD_CREATED_DATE_TIME, created_ms) + .field(crate::FIELD_ACTIVE_COUNT, summary.active_count) + .field(crate::FIELD_STORED_COUNT, summary.stored_count) + .field(crate::FIELD_PENDING_COUNT, summary.pending_count) + .field(crate::FIELD_CRITICAL_COUNT, summary.critical_count) + .field(crate::FIELD_HAS_ACTIVE_FAULTS, summary.has_active_faults) + .field( + crate::FIELD_WORST_SEVERITY, + normalize_tag(summary.worst_severity.as_str()).to_string(), + ); + + match builder.build() { + Ok(measurement) => Some(measurement), + Err(e) => { + debug!("failed to create diagnostic_summary Measurement: {e}"); + None + } + } +} + +fn build_single_code_measurement( + code: &DiagnosticCode, + vin: &str, + created_ms: u128, +) -> Option { + let fallback_ms = created_ms; + let first_seen_ms = timestamp_to_millis(code.first_seen.as_ref()); + let last_seen_ms = timestamp_to_millis(code.last_seen.as_ref()); + let first_seen_ms = if first_seen_ms == 0 { + fallback_ms + } else { + first_seen_ms + }; + let last_seen_ms = if last_seen_ms == 0 { + fallback_ms + } else { + last_seen_ms + }; + + let builder = Measurement::builder(crate::MEASUREMENT_DIAGNOSTIC_CODE) + .tag(crate::TAG_VIN, vin) + .tag(crate::TAG_SOURCE, normalize_tag(code.source.as_str())) + .tag( + crate::TAG_COMPONENT_ID, + normalize_tag(code.component_id.as_str()), + ) + .tag(crate::TAG_ECU, normalize_tag(code.ecu.as_str())) + .tag(crate::TAG_CODE, normalize_tag(code.code.as_str())) + .tag(crate::TAG_SEVERITY, normalize_tag(code.severity.as_str())) + .tag( + crate::TAG_LIFECYCLE_STATE, + normalize_tag(code.lifecycle_state.as_str()), + ) + .tag(crate::TAG_PROTOCOL, normalize_tag(code.protocol.as_str())) + .field(crate::FIELD_CREATED_DATE_TIME, created_ms) + .field(crate::FIELD_RAW_UDS_DTC, code.raw_uds_dtc.clone()) + .field(crate::FIELD_STATUS_MASK, code.status_mask.clone()) + .field(crate::FIELD_DESCRIPTION, code.description.clone()) + .field(crate::FIELD_FIRST_SEEN, first_seen_ms) + .field(crate::FIELD_LAST_SEEN, last_seen_ms); + + match builder.build() { + Ok(measurement) => Some(measurement), + Err(e) => { + debug!("failed to create diagnostic_code Measurement: {e}"); + None + } + } +} + +fn build_diagnostic_code_measurements(ds: &DiagnosticStatus, created_ms: u128) -> Vec { + let vin = ds.vin.as_str(); + ds.active_codes + .iter() + .chain(ds.stored_codes.iter()) + .chain(ds.pending_codes.iter()) + .filter_map(|code| build_single_code_measurement(code, vin, created_ms)) + .collect() +} + +// --------------------------------------------------------------------------- +// Unit tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use fms_proto::fms::{DiagnosticCode, DiagnosticStatus, DiagnosticSummary}; + use protobuf::MessageField; + + fn make_status_with_summary(active: u32) -> DiagnosticStatus { + let mut summary = DiagnosticSummary::new(); + summary.active_count = active; + summary.stored_count = 0; + summary.pending_count = 0; + summary.critical_count = 0; + summary.has_active_faults = active > 0; + summary.worst_severity = "INFO".to_string(); + + let mut status = DiagnosticStatus::new(); + status.vin = "TEST-VIN".to_string(); + status.source = "test-source".to_string(); + status.component_id = "test-comp".to_string(); + status.summary = MessageField::some(summary); + status + } + + fn make_active_code(code: &str) -> DiagnosticCode { + let mut c = DiagnosticCode::new(); + c.code = code.to_string(); + c.lifecycle_state = "ACTIVE".to_string(); + c.severity = "CRITICAL".to_string(); + c.protocol = "UDS".to_string(); + c.source = "test-source".to_string(); + c.component_id = "test-comp".to_string(); + c.ecu = "test-ecu".to_string(); + c + } + + #[test] + fn test_summary_measurement_produced() { + let status = make_status_with_summary(3); + let m = build_diagnostic_summary_measurement(&status, 1000); + assert!(m.is_some(), "expected a summary measurement to be built"); + } + + #[test] + fn test_code_measurement_produced_for_active_dtc() { + let mut status = make_status_with_summary(1); + status.active_codes.push(make_active_code("0x123456")); + let measurements = build_diagnostic_code_measurements(&status, 1000); + assert_eq!(measurements.len(), 1); + } + + #[test] + fn test_empty_source_normalized_to_unknown() { + let mut status = make_status_with_summary(0); + status.source = String::new(); + let m = build_diagnostic_summary_measurement(&status, 1000); + assert!(m.is_some()); + // The measurement was built — tag normalization was applied. + } + + #[test] + fn test_empty_component_id_normalized_to_unknown() { + let mut status = make_status_with_summary(0); + status.component_id = String::new(); + let m = build_diagnostic_summary_measurement(&status, 1000); + assert!(m.is_some()); + } } diff --git a/docs/diagnostics-forwarding.md b/docs/diagnostics-forwarding.md new file mode 100644 index 0000000..788abdb --- /dev/null +++ b/docs/diagnostics-forwarding.md @@ -0,0 +1,247 @@ + + +# Diagnostics Code Forwarding + +This document describes the **parallel diagnostics pipeline** added alongside the existing rFMS/VehicleStatus flow. + +## Baseline Status + +The existing workspace build (`cargo build --workspace`) was verified to complete successfully in this environment. The `fms-proto` crate was previously not included in the workspace `members` list; it has been added as part of this feature. + +Cross-compilation via `rust-musl-cross` Docker images is not available in the sandbox environment and is noted as a limitation for Docker image builds. + +## Architecture + +``` +diagnostic-source-simulator + │ (writes VSS paths via Kuksa gRPC) + ▼ + Kuksa Databroker + (Vehicle.Diagnostics.* VSS paths) + │ (gRPC get_values poll) + ▼ +fms-diagnostics-forwarder + │ (uProtocol Notification, topic up://fms-diagnostics-forwarder/D110/1/D110) + ▼ + Zenoh Router + │ + ▼ +fms-diagnostics-consumer + │ (DiagnosticStatus protobuf decode) + ▼ + InfluxDB 2.7 + (diagnostic_summary, diagnostic_code measurements) + │ + ├──► Grafana FMS-Diagnostics dashboard + └──► fms-server /diagnostics/* REST API +``` + +The existing path is **unchanged**: +``` +csv-provider → Databroker → fms-forwarder → uProtocol → fms-consumer → InfluxDB → Grafana/rFMS API +``` + +## uProtocol Topic + +| Parameter | Value | +|-----------|-------| +| Topic (forwarder) | `up://fms-diagnostics-forwarder/D110/1/D110` | +| Topic filter (consumer) | `up://*/D110/1/D110` | +| Local uService (consumer) | `up://fms-diagnostics-consumer/D111/1/0` | + +`D110` (hex `0x0110` = 272) is the resource ID for diagnostics notifications. + +## Protobuf Messages (`fms.proto`) + +Three new messages are added to `proto/fms/v4/fms.proto` (package `fms.v4`): + +### `DiagnosticSummary` +| Field | Number | Type | Description | +|-------|--------|------|-------------| +| active_count | 1 | uint32 | Number of active DTCs | +| stored_count | 2 | uint32 | Number of stored DTCs | +| pending_count | 3 | uint32 | Number of pending DTCs | +| critical_count | 4 | uint32 | Number of critical DTCs | +| has_active_faults | 5 | bool | True when active_count > 0 | +| worst_severity | 6 | string | INFO \| WARNING \| CRITICAL \| UNKNOWN | + +### `DiagnosticCode` +| Field | Number | Type | Description | +|-------|--------|------|-------------| +| code | 1 | string | DTC code e.g. "0x123456" | +| raw_uds_dtc | 2 | string | Raw UDS DTC bytes | +| protocol | 3 | string | UDS \| SOVD \| OBD \| INTERNAL | +| status_mask | 4 | string | UDS status mask hex | +| description | 5 | string | Human-readable description | +| severity | 6 | string | INFO \| WARNING \| CRITICAL \| UNKNOWN | +| lifecycle_state | 7 | string | ACTIVE \| STORED \| PENDING \| CLEARED \| UNKNOWN | +| ecu | 8 | string | ECU identifier | +| component_id | 9 | string | Component identifier | +| source | 10 | string | Diagnostic source | +| first_seen | 11 | Timestamp | First observation time | +| last_seen | 12 | Timestamp | Last observation time | + +### `DiagnosticStatus` +| Field | Number | Type | Description | +|-------|--------|------|-------------| +| vin | 1 | string | Vehicle identification number | +| vehicle_id | 2 | string | Optional vehicle ID | +| source | 3 | string | Diagnostic source system | +| component_id | 4 | string | Component identifier | +| created | 5 | Timestamp | Message creation time | +| active_codes | 6 | repeated DiagnosticCode | Active DTCs | +| stored_codes | 7 | repeated DiagnosticCode | Stored DTCs | +| pending_codes | 8 | repeated DiagnosticCode | Pending DTCs | +| summary | 9 | DiagnosticSummary | Summary counts and flags | + +## Vehicle.Diagnostics VSS Paths + +The following paths are added to `spec/overlay/vss.json` under the `Vehicle.Diagnostics` branch: + +| Path | Type | Datatype | Description | +|------|------|----------|-------------| +| Vehicle.Diagnostics | branch | — | Diagnostic data | +| Vehicle.Diagnostics.ActiveDTCCount | sensor | uint16 | Active DTC count | +| Vehicle.Diagnostics.StoredDTCCount | sensor | uint16 | Stored DTC count | +| Vehicle.Diagnostics.PendingDTCCount | sensor | uint16 | Pending DTC count | +| Vehicle.Diagnostics.CriticalDTCCount | sensor | uint16 | Critical DTC count | +| Vehicle.Diagnostics.WorstSeverity | sensor | string | INFO/WARNING/CRITICAL/UNKNOWN | +| Vehicle.Diagnostics.LastUpdate | sensor | string | ISO timestamp of last update | +| Vehicle.Diagnostics.Source | sensor | string | Source system name | +| Vehicle.Diagnostics.ComponentId | sensor | string | Component identifier | +| Vehicle.Diagnostics.Ecu | sensor | string | ECU identifier | +| Vehicle.Diagnostics.LastCode | sensor | string | Most recent DTC code | +| Vehicle.Diagnostics.LastDescription | sensor | string | Description of last DTC | +| Vehicle.Diagnostics.LastStatusMask | sensor | string | Status mask of last DTC | +| Vehicle.Diagnostics.LastLifecycleState | sensor | string | ACTIVE/STORED/PENDING/CLEARED/UNKNOWN | +| Vehicle.Diagnostics.LastSeverity | sensor | string | INFO/WARNING/CRITICAL/UNKNOWN | +| Vehicle.Diagnostics.E2EV | branch | — | End-to-End Validation signals | +| Vehicle.Diagnostics.E2EV.CrcOk | sensor | boolean | CRC check result | +| Vehicle.Diagnostics.E2EV.AliveCounter | sensor | uint16 | Alive counter | +| Vehicle.Diagnostics.E2EV.LastFault | sensor | string | Last E2EV fault description | + +> **Note**: `spec/overlay/vss.json` was hand-edited because COVESA `vspec2json.py` regeneration is out of scope for this feature. + +## InfluxDB Measurements + +### `diagnostic_summary` + +| Type | Name | Description | +|------|------|-------------| +| tag | vin | Vehicle identification number | +| tag | source | Diagnostic source | +| tag | componentId | Component identifier | +| field | createdDateTime | Milliseconds since UNIX epoch | +| field | activeCount | Active DTC count | +| field | storedCount | Stored DTC count | +| field | pendingCount | Pending DTC count | +| field | criticalCount | Critical DTC count | +| field | hasActiveFaults | Boolean flag | +| field | worstSeverity | Worst severity string | + +### `diagnostic_code` + +One measurement per DTC across active/stored/pending codes. + +| Type | Name | Description | +|------|------|-------------| +| tag | vin | Vehicle identification number | +| tag | source | Diagnostic source | +| tag | componentId | Component identifier | +| tag | ecu | ECU identifier | +| tag | code | DTC code | +| tag | severity | Severity level | +| tag | lifecycleState | ACTIVE/STORED/PENDING/CLEARED/UNKNOWN | +| tag | protocol | UDS/SOVD/OBD/INTERNAL | +| field | createdDateTime | Milliseconds since UNIX epoch | +| field | rawUdsDtc | Raw UDS DTC bytes | +| field | statusMask | UDS status mask | +| field | description | Human-readable description | +| field | firstSeen | First seen timestamp (ms) | +| field | lastSeen | Last seen timestamp (ms) | + +## Compose Services + +Three new services are added to `fms-blueprint-compose.yaml`: + +| Service | Network(s) | Depends On | +|---------|-----------|------------| +| `diagnostic-source-simulator` | `fms-vehicle` | `databroker` | +| `fms-diagnostics-forwarder` | `fms-backend`, `fms-vehicle` | `databroker`, `diagnostic-source-simulator` | +| `fms-diagnostics-consumer` | `fms-backend` | `influxdb` (healthy) | + +Both `fms-diagnostics-forwarder` and `fms-diagnostics-consumer` get Zenoh overrides in `fms-blueprint-compose-zenoh.yaml`. + +## REST API Endpoints (`fms-server`) + +New routes under `/diagnostics/`: + +| Method | Path | Description | +|--------|------|-------------| +| GET | `/diagnostics/vehicles` | List all VINs with diagnostic data | +| GET | `/diagnostics/vehicles/{vin}/summary` | Latest diagnostic summary for a VIN | +| GET | `/diagnostics/vehicles/{vin}/dtcs` | All DTCs (active + stored + pending) | +| GET | `/diagnostics/vehicles/{vin}/dtcs/active` | Active DTCs only | +| GET | `/diagnostics/vehicles/{vin}/timeline` | Recent summary points over time | + +### Example `curl` commands + +```bash +# List vehicles with diagnostic data +curl http://localhost:8081/diagnostics/vehicles + +# Get summary for a specific VIN +curl http://localhost:8081/diagnostics/vehicles/DEMO-VIN-001/summary + +# Get all DTCs for a VIN +curl http://localhost:8081/diagnostics/vehicles/DEMO-VIN-001/dtcs + +# Get active DTCs only +curl http://localhost:8081/diagnostics/vehicles/DEMO-VIN-001/dtcs/active + +# Get timeline of summaries +curl http://localhost:8081/diagnostics/vehicles/DEMO-VIN-001/timeline +``` + +## Grafana Dashboard + +The `FMS-Diagnostics.json` dashboard is provisioned alongside the existing `FMS-Fleet.json` via the `dashboards_from_filesystem.yaml` provisioner. Panels include: + +- Active DTC Count by VIN (time series) +- Worst Severity by VIN (stat panel) +- Active DTC table (table panel) +- E2EV CRC status (stat panel) +- DTC timeline (time series) + +## Known Limitations + +1. **Single last-code mapping (v1)**: The forwarder maps only one DTC from the databroker (the "last code") per poll cycle. Multi-DTC representation requires a richer VSS or a dedicated adapter. +2. **Zenoh-only transport**: Diagnostics uses Zenoh transport. Hono transport parity is a follow-up task (see `TODO` in `fms-blueprint-compose-hono.yaml`). +3. **OpenSOVD/OpenBSW adapter**: An adapter that translates OpenSOVD/OpenBSW diagnostic data into the Kuksa VSS paths is a follow-up item. +4. **VSS hand-edited**: `spec/overlay/vss.json` was hand-edited because COVESA `vspec2json.py` regeneration is out of scope for this feature. +5. **Docker image builds**: Cross-compilation via `rust-musl-cross` images was not tested in the sandbox environment. The Dockerfiles follow the same pattern as existing forwarder/consumer images. + +## TODO + +- Add Hono transport support to `fms-diagnostics-forwarder` and `fms-diagnostics-consumer`. +- Implement OpenSOVD/OpenBSW adapter to publish real diagnostic data to the Kuksa databroker. +- Support multiple simultaneous DTCs via an extended VSS representation. diff --git a/fms-blueprint-compose-hono.yaml b/fms-blueprint-compose-hono.yaml index ab81fc9..9ad7564 100644 --- a/fms-blueprint-compose-hono.yaml +++ b/fms-blueprint-compose-hono.yaml @@ -39,3 +39,7 @@ services: - type: "bind" source: "${FMS_FORWARDER_CONFIG_FOLDER}" target: "/app/config" + +# TODO: fms-diagnostics-forwarder and fms-diagnostics-consumer do not yet support Hono transport. +# The diagnostics pipeline is currently Zenoh-only. Hono support is a follow-up task. +# See docs/diagnostics-forwarding.md "Known Limitations" for details. diff --git a/fms-blueprint-compose-zenoh.yaml b/fms-blueprint-compose-zenoh.yaml index 76e86fe..b83bd92 100644 --- a/fms-blueprint-compose-zenoh.yaml +++ b/fms-blueprint-compose-zenoh.yaml @@ -49,6 +49,22 @@ services: volumes: - ./config/zenoh/config-client.json5:/zenoh-config.json5 + fms-diagnostics-forwarder: + command: "zenoh -c /zenoh-config.json5" + depends_on: + fms-zenoh-router: + condition: service_started + volumes: + - ./config/zenoh/config-client.json5:/zenoh-config.json5 + + fms-diagnostics-consumer: + command: "zenoh -c /zenoh-config.json5" + depends_on: + fms-zenoh-router: + condition: service_started + volumes: + - ./config/zenoh/config-client.json5:/zenoh-config.json5 + fleet-analysis-backend: build: context: "../../devices/backend-fleet-analysis-java" diff --git a/fms-blueprint-compose.yaml b/fms-blueprint-compose.yaml index 4513867..766d9ff 100644 --- a/fms-blueprint-compose.yaml +++ b/fms-blueprint-compose.yaml @@ -32,6 +32,8 @@ configs: file: "./grafana/provisioning/dashboards/dashboards_from_filesystem.yaml" grafana_fms_dashboard.json: file: "./grafana/dashboards/FMS-Fleet.json" + grafana_fms_diagnostics_dashboard.json: + file: "./grafana/dashboards/FMS-Diagnostics.json" vss_overlay.json: file: "./spec/overlay/vss.json" @@ -100,6 +102,9 @@ services: - source: "grafana_fms_dashboard.json" target: "/etc/dashboards/grafana_fms_dashboard.json" mode: 0644 + - source: "grafana_fms_diagnostics_dashboard.json" + target: "/etc/dashboards/grafana_fms_diagnostics_dashboard.json" + mode: 0644 volumes: - type: "volume" source: "grafana-datasources" @@ -212,3 +217,63 @@ services: RUST_LOG: "${FMS_FORWARDER_LOG_CONFIG:-info,fms_forwarder=info}" TRUST_STORE_PATH: "${FMS_FORWARDER_TRUST_STORE_PATH:-/etc/ssl/certs/ca-certificates.crt}" + diagnostic-source-simulator: + image: "ghcr.io/eclipse-sdv-blueprints/fleet-management/diagnostic-source-simulator:main" + build: + context: "./components" + dockerfile: "Dockerfile.diagnostic-source-simulator" + container_name: "diagnostic-source-simulator" + cap_drop: *default-drops + networks: + - "fms-vehicle" + depends_on: + databroker: + condition: service_started + environment: + KUKSA_DATABROKER_URI: "http://databroker:55556" + RUST_LOG: "${DIAG_SIMULATOR_LOG_CONFIG:-info}" + + fms-diagnostics-forwarder: + image: "ghcr.io/eclipse-sdv-blueprints/fleet-management/fms-diagnostics-forwarder:main" + build: + context: "./components" + dockerfile: "Dockerfile.fms-diagnostics-forwarder" + container_name: "fms-diagnostics-forwarder" + cap_drop: *default-drops + networks: + - "fms-backend" + - "fms-vehicle" + depends_on: + databroker: + condition: service_started + diagnostic-source-simulator: + condition: service_started + environment: + KUKSA_DATABROKER_URI: "http://databroker:55556" + DIAGNOSTIC_TOPIC: "up://fms-diagnostics-forwarder/D110/1/D110" + FMS_DIAGNOSTICS_FORWARDER_TIMER_INTERVAL: "${FMS_DIAGNOSTICS_FORWARDER_TIMER_INTERVAL:-2s}" + RUST_LOG: "${FMS_DIAGNOSTICS_FORWARDER_LOG_CONFIG:-info}" + + fms-diagnostics-consumer: + image: "ghcr.io/eclipse-sdv-blueprints/fleet-management/fms-diagnostics-consumer:main" + build: + context: "./components" + dockerfile: "Dockerfile.fms-diagnostics-consumer" + container_name: "fms-diagnostics-consumer" + cap_drop: *default-drops + networks: + - "fms-backend" + depends_on: + influxdb: + condition: service_healthy + env_file: + - "./influxdb/fms-demo.env" + environment: + INFLUXDB_TOKEN_FILE: "/tmp/fms-demo.token" + DIAGNOSTIC_TOPIC_FILTER: "up://*/D110/1/D110" + RUST_LOG: "${FMS_DIAGNOSTICS_CONSUMER_LOG_CONFIG:-info}" + volumes: + - type: "volume" + source: "influxdb-auth" + target: "/tmp" + read_only: true diff --git a/grafana/dashboards/FMS-Diagnostics.json b/grafana/dashboards/FMS-Diagnostics.json new file mode 100644 index 0000000..b196607 --- /dev/null +++ b/grafana/dashboards/FMS-Diagnostics.json @@ -0,0 +1,153 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { "type": "grafana", "uid": "-- Grafana --" }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "target": { "limit": 100, "matchAny": false, "tags": [], "type": "dashboard" }, + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { "type": "influxdb", "uid": "PDC312342D5DCA611" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { "axisPlacement": "auto", "barAlignment": 0, "drawStyle": "line", "fillOpacity": 10, "gradientMode": "none", "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, "showPoints": "auto", "spanNulls": false }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }, { "color": "red", "value": 1 }] } + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 }, + "id": 1, + "options": { "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, "tooltip": { "mode": "single", "sort": "none" } }, + "targets": [ + { + "datasource": { "type": "influxdb", "uid": "PDC312342D5DCA611" }, + "query": "from(bucket: v.defaultBucket)\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"diagnostic_summary\")\n |> filter(fn: (r) => r._field == \"activeCount\")\n |> group(columns: [\"vin\"])\n |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)", + "refId": "A" + } + ], + "title": "Active DTC Count by VIN", + "type": "timeseries" + }, + { + "datasource": { "type": "influxdb", "uid": "PDC312342D5DCA611" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [ + { "options": { "CRITICAL": { "color": "red", "index": 0 }, "WARNING": { "color": "yellow", "index": 1 }, "INFO": { "color": "green", "index": 2 }, "UNKNOWN": { "color": "gray", "index": 3 } }, "type": "value" } + ], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] } + }, + "overrides": [] + }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 }, + "id": 2, + "options": { "colorMode": "background", "graphMode": "none", "justifyMode": "center", "orientation": "auto", "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "textMode": "value_and_name" }, + "targets": [ + { + "datasource": { "type": "influxdb", "uid": "PDC312342D5DCA611" }, + "query": "from(bucket: v.defaultBucket)\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"diagnostic_summary\")\n |> filter(fn: (r) => r._field == \"worstSeverity\")\n |> group(columns: [\"vin\"])\n |> last()", + "refId": "A" + } + ], + "title": "Worst Severity by VIN", + "type": "stat" + }, + { + "datasource": { "type": "influxdb", "uid": "PDC312342D5DCA611" }, + "fieldConfig": { + "defaults": { "color": { "mode": "thresholds" }, "custom": { "align": "auto", "displayMode": "auto", "inspect": false }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }, { "color": "red", "value": 1 }] } }, + "overrides": [] + }, + "gridPos": { "h": 10, "w": 24, "x": 0, "y": 8 }, + "id": 3, + "options": { "footer": { "fields": "", "reducer": ["sum"], "show": false }, "showHeader": true }, + "targets": [ + { + "datasource": { "type": "influxdb", "uid": "PDC312342D5DCA611" }, + "query": "from(bucket: v.defaultBucket)\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"diagnostic_code\")\n |> filter(fn: (r) => r.lifecycleState == \"ACTIVE\")\n |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n |> group()", + "refId": "A" + } + ], + "title": "Active DTC Table", + "transformations": [ + { "id": "organize", "options": { "excludeByName": { "_start": true, "_stop": true, "result": true, "table": true }, "indexByName": {}, "renameByName": { "vin": "VIN", "code": "Code", "description": "Description", "severity": "Severity", "ecu": "ECU", "lifecycleState": "State" } } } + ], + "type": "table" + }, + { + "datasource": { "type": "influxdb", "uid": "PDC312342D5DCA611" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [{ "options": { "false": { "color": "green", "index": 0, "text": "OK" }, "true": { "color": "red", "index": 1, "text": "FAULT" } }, "type": "value" }], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] } + }, + "overrides": [] + }, + "gridPos": { "h": 6, "w": 8, "x": 0, "y": 18 }, + "id": 4, + "options": { "colorMode": "background", "graphMode": "none", "justifyMode": "center", "orientation": "auto", "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "textMode": "value_and_name" }, + "targets": [ + { + "datasource": { "type": "influxdb", "uid": "PDC312342D5DCA611" }, + "query": "from(bucket: v.defaultBucket)\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"diagnostic_summary\")\n |> filter(fn: (r) => r._field == \"hasActiveFaults\")\n |> group(columns: [\"vin\"])\n |> last()", + "refId": "A" + } + ], + "title": "E2EV CRC Status", + "type": "stat" + }, + { + "datasource": { "type": "influxdb", "uid": "PDC312342D5DCA611" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { "axisPlacement": "auto", "barAlignment": 0, "drawStyle": "line", "fillOpacity": 10, "gradientMode": "none", "lineInterpolation": "stepAfter", "lineWidth": 1, "pointSize": 5, "showPoints": "auto", "spanNulls": false }, + "mappings": [], + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }] } + }, + "overrides": [] + }, + "gridPos": { "h": 6, "w": 16, "x": 8, "y": 18 }, + "id": 5, + "options": { "legend": { "calcs": [], "displayMode": "list", "placement": "bottom", "showLegend": true }, "tooltip": { "mode": "single", "sort": "none" } }, + "targets": [ + { + "datasource": { "type": "influxdb", "uid": "PDC312342D5DCA611" }, + "query": "from(bucket: v.defaultBucket)\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"diagnostic_summary\")\n |> filter(fn: (r) => r._field == \"activeCount\" or r._field == \"storedCount\" or r._field == \"pendingCount\")\n |> group(columns: [\"vin\", \"_field\"])\n |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)", + "refId": "A" + } + ], + "title": "DTC Timeline", + "type": "timeseries" + } + ], + "refresh": "5s", + "schemaVersion": 38, + "style": "dark", + "tags": ["diagnostics", "fms"], + "templating": { "list": [] }, + "time": { "from": "now-1h", "to": "now" }, + "timepicker": {}, + "timezone": "browser", + "title": "FMS Diagnostics", + "uid": "fms-diagnostics-001", + "version": 1, + "weekStart": "" +} diff --git a/grafana/dashboards/FMS-Diagnostics.json.license b/grafana/dashboards/FMS-Diagnostics.json.license new file mode 100644 index 0000000..59baed3 --- /dev/null +++ b/grafana/dashboards/FMS-Diagnostics.json.license @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/spec/overlay/fms.vspec b/spec/overlay/fms.vspec index 977a951..6afaf1c 100644 --- a/spec/overlay/fms.vspec +++ b/spec/overlay/fms.vspec @@ -538,3 +538,100 @@ Vehicle.TraveledDistanceHighRes: Vehicle.VehicleIdentification.VIN: datatype: string type: attribute + +# Diagnostic signals added for the diagnostics forwarding pipeline +Vehicle.Diagnostics: + type: branch + description: Diagnostic data for the vehicle, including DTCs (Diagnostic Trouble Codes). + +Vehicle.Diagnostics.ActiveDTCCount: + datatype: uint16 + type: sensor + description: Number of currently active Diagnostic Trouble Codes. + +Vehicle.Diagnostics.StoredDTCCount: + datatype: uint16 + type: sensor + description: Number of stored Diagnostic Trouble Codes. + +Vehicle.Diagnostics.PendingDTCCount: + datatype: uint16 + type: sensor + description: Number of pending Diagnostic Trouble Codes. + +Vehicle.Diagnostics.CriticalDTCCount: + datatype: uint16 + type: sensor + description: Number of critical-severity Diagnostic Trouble Codes. + +Vehicle.Diagnostics.WorstSeverity: + datatype: string + type: sensor + allowed: [INFO, WARNING, CRITICAL, UNKNOWN] + description: Worst severity level among all active DTCs. + +Vehicle.Diagnostics.LastUpdate: + datatype: string + type: sensor + description: ISO 8601 timestamp of the last diagnostic update. + +Vehicle.Diagnostics.Source: + datatype: string + type: sensor + description: Name of the diagnostic source system. + +Vehicle.Diagnostics.ComponentId: + datatype: string + type: sensor + description: Identifier of the component that generated the diagnostic data. + +Vehicle.Diagnostics.Ecu: + datatype: string + type: sensor + description: ECU identifier associated with the last DTC. + +Vehicle.Diagnostics.LastCode: + datatype: string + type: sensor + description: Most recently observed Diagnostic Trouble Code. + +Vehicle.Diagnostics.LastDescription: + datatype: string + type: sensor + description: Human-readable description of the most recently observed DTC. + +Vehicle.Diagnostics.LastStatusMask: + datatype: string + type: sensor + description: UDS status mask of the most recently observed DTC. + +Vehicle.Diagnostics.LastLifecycleState: + datatype: string + type: sensor + allowed: [ACTIVE, STORED, PENDING, CLEARED, UNKNOWN] + description: Lifecycle state of the most recently observed DTC. + +Vehicle.Diagnostics.LastSeverity: + datatype: string + type: sensor + allowed: [INFO, WARNING, CRITICAL, UNKNOWN] + description: Severity of the most recently observed DTC. + +Vehicle.Diagnostics.E2EV: + type: branch + description: End-to-End Validation (E2EV) signals. + +Vehicle.Diagnostics.E2EV.CrcOk: + datatype: boolean + type: sensor + description: Indicates whether the last E2EV CRC check passed. + +Vehicle.Diagnostics.E2EV.AliveCounter: + datatype: uint16 + type: sensor + description: Monotonically incrementing counter used for E2EV liveness detection. + +Vehicle.Diagnostics.E2EV.LastFault: + datatype: string + type: sensor + description: Description of the last E2EV fault detected. diff --git a/spec/overlay/vss.json b/spec/overlay/vss.json index a83f3c7..a8f3564 100644 --- a/spec/overlay/vss.json +++ b/spec/overlay/vss.json @@ -9024,6 +9024,122 @@ "description": "Overall vehicle width.", "type": "attribute", "unit": "mm" + }, + "Diagnostics": { + "description": "Diagnostic data for the vehicle, including DTCs (Diagnostic Trouble Codes).", + "type": "branch", + "children": { + "ActiveDTCCount": { + "datatype": "uint16", + "description": "Number of currently active Diagnostic Trouble Codes.", + "type": "sensor" + }, + "StoredDTCCount": { + "datatype": "uint16", + "description": "Number of stored Diagnostic Trouble Codes.", + "type": "sensor" + }, + "PendingDTCCount": { + "datatype": "uint16", + "description": "Number of pending Diagnostic Trouble Codes.", + "type": "sensor" + }, + "CriticalDTCCount": { + "datatype": "uint16", + "description": "Number of critical-severity Diagnostic Trouble Codes.", + "type": "sensor" + }, + "WorstSeverity": { + "allowed": [ + "INFO", + "WARNING", + "CRITICAL", + "UNKNOWN" + ], + "datatype": "string", + "description": "Worst severity level among all active DTCs.", + "type": "sensor" + }, + "LastUpdate": { + "datatype": "string", + "description": "ISO 8601 timestamp of the last diagnostic update.", + "type": "sensor" + }, + "Source": { + "datatype": "string", + "description": "Name of the diagnostic source system.", + "type": "sensor" + }, + "ComponentId": { + "datatype": "string", + "description": "Identifier of the component that generated the diagnostic data.", + "type": "sensor" + }, + "Ecu": { + "datatype": "string", + "description": "ECU identifier associated with the last DTC.", + "type": "sensor" + }, + "LastCode": { + "datatype": "string", + "description": "Most recently observed Diagnostic Trouble Code.", + "type": "sensor" + }, + "LastDescription": { + "datatype": "string", + "description": "Human-readable description of the most recently observed DTC.", + "type": "sensor" + }, + "LastStatusMask": { + "datatype": "string", + "description": "UDS status mask of the most recently observed DTC.", + "type": "sensor" + }, + "LastLifecycleState": { + "allowed": [ + "ACTIVE", + "STORED", + "PENDING", + "CLEARED", + "UNKNOWN" + ], + "datatype": "string", + "description": "Lifecycle state of the most recently observed DTC.", + "type": "sensor" + }, + "LastSeverity": { + "allowed": [ + "INFO", + "WARNING", + "CRITICAL", + "UNKNOWN" + ], + "datatype": "string", + "description": "Severity of the most recently observed DTC.", + "type": "sensor" + }, + "E2EV": { + "description": "End-to-End Validation (E2EV) signals.", + "type": "branch", + "children": { + "CrcOk": { + "datatype": "boolean", + "description": "Indicates whether the last E2EV CRC check passed.", + "type": "sensor" + }, + "AliveCounter": { + "datatype": "uint16", + "description": "Monotonically incrementing counter used for E2EV liveness detection.", + "type": "sensor" + }, + "LastFault": { + "datatype": "string", + "description": "Description of the last E2EV fault detected.", + "type": "sensor" + } + } + } + } } }, "description": "High-level vehicle data.",