Skip to content

refactor: replace the collectd mapper with local flows#4136

Open
didier-wenzek wants to merge 14 commits intothin-edge:mainfrom
didier-wenzek:refactor/replace-collectd-mapper-with-flows
Open

refactor: replace the collectd mapper with local flows#4136
didier-wenzek wants to merge 14 commits intothin-edge:mainfrom
didier-wenzek:refactor/replace-collectd-mapper-with-flows

Conversation

@didier-wenzek
Copy link
Copy Markdown
Contributor

@didier-wenzek didier-wenzek commented Apr 16, 2026

Proposed changes

The goal of this PR is to deprecate the collectd mapper providing the tools to implement the same using flows

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue


Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

@didier-wenzek didier-wenzek added theme:monitoring Theme: Service monitoring and watchdogs theme:flows labels Apr 16, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Apr 16, 2026

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
896 1 4 897 99.89 2h43m45.296348s

Failed Tests

Name Message ⏱️ Duration Suite
Connect to Cumulocity MQTT Service endpoint basic auth tedge connect c8y returned an unexpected exit code stdout: stderr: Connecting to Cumulocity: mapper configuration file: /etc/tedge/mappers/c8y/mapper.toml device id: TST_pass_absolute_ambush cloud profile: <none> cloud host: qaenvironment.eu-latest.cumulocity.com:9883 auth type: Basic credentials path: /etc/tedge/credentials.toml bridge: built-in service manager: systemd mosquitto version: 2.0.11 proxy: Not configured Updating bridge rule templates... ✓ Creating device in Cumulocity cloud... ✗ error: Connection error while creating device in Cumulocity: Connection refused, return code: NotAuthorized [RETRY] FAIL on 3. retry. 48.168 s Tedge Connect Mqtt Service

@didier-wenzek didier-wenzek force-pushed the refactor/replace-collectd-mapper-with-flows branch from a8ba280 to e460f13 Compare April 20, 2026 14:15
@didier-wenzek didier-wenzek force-pushed the refactor/replace-collectd-mapper-with-flows branch from e460f13 to e3a4f91 Compare April 21, 2026 11:22
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 21, 2026

Codecov Report

❌ Patch coverage is 91.22807% with 20 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
crates/common/batcher/src/flows.rs 91.97% 10 Missing and 1 partial ⚠️
crates/extensions/tedge_flows/src/flow.rs 43.75% 9 Missing ⚠️

📢 Thoughts on this report? Let us know!

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Comment on lines -30 to -35
Check running collectd
Service Should Be Running collectd

Is collectd publishing MQTT messages?
${messages}= Should Have MQTT Messages topic=collectd/# minimum=1 maximum=None

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These checks have been moved to the suite setup. They are not system tests but prerequisite checks.

@@ -0,0 +1,18 @@
// Translate an array of thin-edge measurements into a single message with grouped measurements
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I consider promoting this script to a builtin transformer: grouping measurements is a quite common use case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be a good addition. The input topic would also have to be in the tedge measurements format, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The input topic would also have to be in the tedge measurements format, right?

I still have to see what's the best approach to decompose the flow into independent steps that can be re-used / re-combined.

Comment on lines +23 to +27
let batch_config = BatchConfigBuilder::new()
.event_jitter(500)
.delivery_jitter(400) // Heuristic delay that should work out well on a Rpi
.message_leap_limit(500)
.build();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These configuration parameters are simply hairy to get correct, the batching algorithm being complicated and fragile with two independent clocks (batching time and event time). This complexity is furthermore useless in practice, the best way to avoid too many collectd measurements being to reduce the collection frequency.

@didier-wenzek didier-wenzek force-pushed the refactor/replace-collectd-mapper-with-flows branch from dedbc3a to e00dafc Compare April 22, 2026 13:29
Comment on lines +28 to +32
fn event_time(&self) -> OffsetDateTime {
self.timestamp
.map(|t| t.into())
.unwrap_or_else(OffsetDateTime::now_utc)
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This defeats the purpose of the batcher: i.e using the processing time to batch message, ignoring the event time.

I will fix that with a flow config giving a JSON path to the message event time.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally, this code will be let unchanged, i.e. the timestamp of the message being used as the event time.

For that purpose the flow engine has been improved. Now, message timestamps can be set by a script and used by the subsequent steps.

type Key = String;

fn key(&self) -> Self::Key {
self.topic.clone()
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the topic as a key make the batcher transformer a bit complicated to use: the previous step needs to be aware that the topic is used as a key by the batcher.

I will fix that with a way to configure the transformer with a path to the key.

Copy link
Copy Markdown
Member

@rina23q rina23q left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess still code is in progress, so I reviewed the documentation changes.

## Enabling the systemd watchdog feature for a tedge service

Enabling systemd watchdog for a %%te%% service (tedge-agent, tedge-mapper-c8y/az/collectd) is a two-step process.
Enabling systemd watchdog for a %%te%% service (tedge-agent, tedge-mapper-c8y/az) is a two-step process.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(unrelated) Out of curiosity, The watchdog feature doesn't work also for aws and local mappers?
I found this document is actually unlisted, so maybe it doesn't matter how precise information we have here.

Comment thread docs/src/operate/troubleshooting/log-files.md Outdated
Comment thread docs/src/operate/configuration/mosquitto-configuration.md Outdated
Run `tedge_mapper --debug aws` to log more debug messages
:::

### Device monitoring logs {#device-logs}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we can also remove the section "Collectd logs" below (inside "Third-party component logs"). Instead, we can mention how to see the log of collectd in the new repository for the collectd flow.

Install Collectd
Execute Command
... sudo apt-get update && sudo apt-get install -y --no-install-recommends collectd-core libmosquitto1
Execute Command sudo cp /etc/tedge/contrib/collectd/collectd.conf /etc/collectd/collectd.conf
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this collectd.conf file is also a candidate to move out of the thin-edge.io repository later.

@didier-wenzek didier-wenzek force-pushed the refactor/replace-collectd-mapper-with-flows branch from e00dafc to 9bb4852 Compare April 23, 2026 08:28
${start_time}= Get Unix Timestamp
Execute Command
... tedge mqtt pub collectd/localhost/temperature/temp1 "`date +%s.%N`:50" && tedge mqtt pub collectd/localhost/temperature/temp2 "`date +%s.%N`:40" && tedge mqtt pub collectd/localhost/pressure/pres1 "`date +%s.%N`:10" && tedge mqtt pub collectd/localhost/pressure/pres2 "`date +%s.%N`:20"
Execute Command tedge mqtt pub collectd/localhost/temperature/temp1 "`date +%s.%N`:50"
Copy link
Copy Markdown
Contributor

@albinsuresh albinsuresh Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, the chaining of the pub commands in the former version was done to minimise the timestamp drift between the first measurement and the last measurement, to make sure that they get batched together. So, this version might end up being flaky.

This was the point that we always wanted to improve, at least by making the batch window configurable, so that we don't have to rely on fragile tricks like that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this version might end up being flaky.

This is not what I observe:

$ invoke flake-finder --test-name  "Check grouping of measurements" --iterations 25 --outputdir output_ff --clean

------------------------------
Overall: PASSED
Results: 25 iterations, 25 passed, 0 failed
Elapsed time: 0:10:34.572869

This was the point that we always wanted to improve, at least by making the batch window configurable,

I will do that in this PR.

let grouped_measurements = {}

for (let measurement of measurements) {
Object.assign(grouped_measurements, measurement.payload)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would get the timestamp wrong, no? The timestamp of the last measurement would replace the former ones, while the expectation is for the batch to keep the timestamp of the first measurement that triggered the batch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not say this is wrong. Not your expectation apparently, but as meaningful.

Anyway, this script is only used for testing purposes and will be replaced by a builtin transformer.

An the joke side. According this comment, messages must happen in a blink to be batched! So first or last timestamp should not be a big deal ;-)

@@ -0,0 +1,18 @@
// Translate an array of thin-edge measurements into a single message with grouped measurements
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be a good addition. The input topic would also have to be in the tedge measurements format, right?

The fact that the batcher is always initialized with default values
highlights that something is wrong. It's really difficult to pick sensible values.
And in practice nobody care.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The following sections have still to be deeply rewritten:
- Getting Started / A tour of thin-edge.io / Monitor the device
- Getting Started / Monitoring
- Operate Devices / Troubleshooting / Device Monitoring

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
A value created in JS using `new Date()` cannot be directly returned to the flow engine.
A script has then to invoke `valueOf()` method to assign a date to a message timestamp.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
@didier-wenzek didier-wenzek force-pushed the refactor/replace-collectd-mapper-with-flows branch from 89338e2 to 080f3a6 Compare April 24, 2026 12:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

theme:flows theme:monitoring Theme: Service monitoring and watchdogs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants