diff --git a/Cargo.lock b/Cargo.lock index b1e5a737..ad985d16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -360,6 +360,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arrayref" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" + [[package]] name = "arrayvec" version = "0.7.6" @@ -642,6 +648,20 @@ dependencies = [ "digest", ] +[[package]] +name = "blake3" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468ef7d57b3fb7e16b576e8377cdbde2320c60e1491e961d11da40fc4f02a2d" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", + "cpufeatures", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -1871,9 +1891,9 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" [[package]] name = "env_filter" -version = "0.1.4" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2" +checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" dependencies = [ "log", "regex", @@ -1887,9 +1907,9 @@ checksum = "c7f84e12ccf0a7ddc17a6c41c93326024c42920d7ee630d04950e6926645c0fe" [[package]] name = "env_logger" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" dependencies = [ "anstream", "anstyle", @@ -1987,6 +2007,21 @@ dependencies = [ "users 0.11.0", ] +[[package]] +name = "filescream" +version = "0.1.0" +source = "git+https://github.com/tinythings/filescream.git?branch=master#36633c3b91018bbace7ea3113f09e0df275e90e6" +dependencies = [ + "bitflags 2.10.0", + "blake3", + "globset", + "hashbrown 0.16.1", + "ignore", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -2445,6 +2480,8 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" dependencies = [ + "allocator-api2", + "equivalent", "foldhash 0.2.0", ] @@ -3430,6 +3467,30 @@ dependencies = [ "tokio-task-scheduler", ] +[[package]] +name = "libsensors" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "colored", + "dashmap", + "env_logger", + "fastrand", + "filescream", + "indexmap 2.13.0", + "lazy_static", + "libcommon", + "libsysinspect", + "log", + "serde", + "serde_json", + "serde_yaml", + "tempfile", + "tokio", + "walkdir", +] + [[package]] name = "libsetup" version = "0.1.0" @@ -7100,6 +7161,7 @@ dependencies = [ "libcommon", "libdpq", "libmodpak", + "libsensors", "libsetup", "libsysinspect", "libsysproto", @@ -7183,9 +7245,9 @@ checksum = "b1dd07eb858a2067e2f3c7155d54e929265c264e6f37efe3ee7a8d1b5a1dd0ba" [[package]] name = "tempfile" -version = "3.24.0" +version = "3.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" +checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" dependencies = [ "fastrand", "getrandom 0.3.4", diff --git a/Cargo.toml b/Cargo.toml index 555ced08..8e4e469e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,8 @@ members = [ "sysclient", "libdpq", "libsysproto", - "libcommon"] + "libcommon", + "libsensors"] [profile.release] strip = true diff --git a/docs/eventsensors/fsnotify.rst b/docs/eventsensors/fsnotify.rst new file mode 100644 index 00000000..0de4d7ae --- /dev/null +++ b/docs/eventsensors/fsnotify.rst @@ -0,0 +1,94 @@ +``fsnotify``: React to Filesystem Events +================================================= + +The ``fsnotify`` sensor uses the fsnotify library to monitor file system events. +It can detect events such as file creation, modification, deletion, and more. +This sensor is useful for monitoring specific directories or files for changes. + +Synopsis +-------- + +Sensor configuration as follows: + +.. code-block:: + + : + [profile]: + - + description: + listener: fsnotify + opts: + - # created | changed | deleted + args: + path: + tag: # optional, default is fsnotify + +``profile`` +^^^^^^^^^^^ + + **Optional** + + The list of profiles to which this sensor belongs. If current Minion is attached to + any other profile, the sensor will be inactive. + +``description`` +^^^^^^^^^^^^^^^ + + A human-readable description of the sensor. + +``listener`` +^^^^^^^^^^^^ + + The type of listener used by the sensor. In this case, it is ``fsnotify``. + +``opts`` +^^^^^^^^^^ + + A list of file events to monitor. Possible values include: + + - ``created``: Triggered when a file is created. + - ``changed``: Triggered when a file is modified. + - ``deleted``: Triggered when a file is deleted. + +``args`` +^^^^^^^^^^ + Arguments specific to the listener. For the ``fsnotify`` sensor, the following argument is required: + + - ``path``: The path to the file or directory to monitor. + +``tag`` +^^^^^^^^^^ + + An optional tag to associate with the event. If specified, the event name will include this tag, + allowing for easier identification and filtering of events. Example: + + .. code-block:: yaml + + tag: my-tag + + In case event is defined as ``some-id`` listening to ``/tmp`` directory, then if ``/tmp/foobar`` file + is created, this results to the following event name: + + .. code-block:: text + + some-id|fsnotify@my-tag|created@/tmp/foobar|0 + +Example +------- + +Here is an example of how to use the ``fsnotify`` sensor to monitor a directory for file creation events: + +.. code-block:: yaml + + ssh_config_change: + description: Monitor SSH configuration changes + listener: fsnotify + opts: + - changed + args: + path: /etc/ssh/sshd_config + + # If defined, an extra tag will be added to the event name: + # ssh_config_change|fsnotify@my-tag|changed@/etc/ssh/sshd_config|0 + tag: my-tag + diff --git a/docs/eventsensors/overview.rst b/docs/eventsensors/overview.rst new file mode 100644 index 00000000..6d306044 --- /dev/null +++ b/docs/eventsensors/overview.rst @@ -0,0 +1,153 @@ +Event Sensors +============== + +.. note:: + + This document explains how to use **event sensors** and how Sysinspect routes events to handlers. + +Overview +======== + +Every time in your system something happens. That "something" is an event. SysInspect can react to specific +events by listening to them in real time. This allows you to build powerful monitoring and automation solutions +that respond to changes in your system as they happen. + +Sensors are defined separately from the models. While models are downloaded and refreshed on demand, every time +minion is calling them, sensors are always running in the background, listening for events right as Minion starts. +Sensor configuration cannot be changed on the fly just like that, because they are always running. Sensor updates +are applied on the next Minion restart or when you issue cluster sync command: + +.. code-block:: bash + + sysinspect --sync + +This is because sensors facility must stop listening to the whole system, sync configuration, pick a proper profile +and start again. + +Sensors, just like models, are merged together from a different snippets. On a Master, they are placed in the ``sensors`` +subdirectory. By default it is under ``/etc/sysinspect/data/sensors``. The rest of the directory structure is completely +up to you. They are merged together into a single configuration on the Master and then distributed to Minions. + +.. important:: + + If you accidentally define same IDs with the different configurations, the rule is "first wins" — the files are sorted + alphabetically. + + +Synopsis +-------- + +There are few important things to keep in mind when working with sensors: + +1. **Local Mode Behavior**: When sensors are defined in your configuration, the local version of Sysinspect + will run continuously in the foreground. To run it as a background daemon, use the ``--daemon`` flag + with the ``sysinspect`` command. + +2. **Event Handler Responsibility**: It is the responsibility of the user to define what actions should be + taken when events occur. Sensors themselves are purely passive listeners; they only emit the captured + event data in JSON format to the event handler. No automatic actions are triggered by sensors alone. + +3. **Separation of Concerns**: Sensors are independent components and not part of any data model. They serve + as the listening configuration layer for a Sysinspect agent (whether SysMinion or local Sysinspect mode). + Their sole purpose is to detect and report system events. The decision of what to do in response, which + models to invoke, or whether to take any action at all, is entirely up to the user. + +4. **Event Model Integration**: Event definitions are separated from models and moved into their own sensors + configuration namespace. Once an event is emitted, it can be routed to any handler, which can then trigger + models or other actions based on the event data. + + +Sensor Configuration Syntax +----------------------------- + +The following shows the complete structure for configuring sensors. This YAML-based syntax defines how +sensors listen for events and emit notifications: + +.. code-block:: text + + sensors: + interval: + max: int # default 10 + min: int # default 3 + unit: seconds|minutes|hours + + : + profile: + - + - + description: + listener: + interval: + opts: + - + - + args: + key: + event: + +``profile`` +^^^^^^^^^^^ + + **Optional** + + This is the list of Minion profiles that matches the sensor. For example, if a minion has profile ``QNX``, then a sensor with + the profile ``Linux`` won't be picked up. IF not defined, **all sensors** will be applied. + +``description`` +^^^^^^^^^^^^^^^ + + A human-readable description of the sensor. This is purely informational and does not affect the sensor's behavior. + +``listener`` +^^^^^^^^^^^^ + + This specifies the type of implemented supported listener that the sensor uses to detect events. + Refer to the list of supported listeners in the documentation for more details on how to configure each listener type. + +``interval`` +^^^^^^^^^^^^ + + Typically, all sensors are made on polling, which means they check for events at regular intervals. This is a pragmatic choice + to provide a simple and consistent way to detect events across different systems and use cases. The ``interval`` field allows + you to specify how often the sensor should check for events. You can set it globally for all sensors or override it for individual sensors. + + .. important:: + + Most sensors are **using polling** under the hood, in order to provide a consistent and simple way to detect events across different + systems and use cases. + + The interval can be set manually or calculated randomly between a minimum and maximum value. This is useful to avoid thundering + herd problems when you have many sensors running at the same time. + +``opts`` +^^^^^^^^^^ + + **Optional** + + This is a list of options specific to the listener type. Please refer to the documentation of a specific sensor. + +``args`` +^^^^^^^^^^ + + **Optional** + + This is a dictionary of additional arguments that the listener might require. Please refer to the documentation + of a specific sensor for details on what arguments are needed. + +``event`` +^^^^^^^^^^ + + A sensor typically emits an event, constructing its own event path, based on the sensor specifics and implementation. + However, in some corner cases it might be necessary to define (override) the event ID with a specific static value. + + Highly **not recommended** to use on a regular basis. But the feature is here if needed. + +Available Sensors +================= + +Here are the available sensors. This list is not exhaustive and may be updated as new sensors are added: + +.. toctree:: + :maxdepth: 1 + + fsnotify diff --git a/docs/evthandlers/action_event_routing.rst b/docs/evthandlers/action_event_routing.rst new file mode 100644 index 00000000..3c27e9d2 --- /dev/null +++ b/docs/evthandlers/action_event_routing.rst @@ -0,0 +1,66 @@ +Action Events Routing +===================== + +.. important:: + + 🚨 If you define the filter format incorrectly, you won't get any reaction and the event will be ignored. + Additionally, **no logging will be emitted** unless in debug mode. + +An event is represented as a single positional string, delimited by ``|``: + +.. code-block:: text + + action|entity|state|exit-code + +Where: + +* ``action`` is the action ID (e.g., a check you run) +* ``entity`` is the bound entity ID (e.g., a host/service/thing you target) +* ``state`` is the state ID inside the action (e.g., ok/warn/crit or your own state name) +* ``exit-code`` is the result indicator: + + * ``0`` = success + * ``E`` = error/failure + * ``$`` = any exit code (match everything) + +Given the module structure below, you can write event filters like this: + +.. code-block:: yaml + + actions: + my-great-action: # This is the action ID + ... + bind: + - some-entity # This is the entity ID + state: + some-state: # This is the state ID + ... + + events: + my-great-action|some-entity|some-state|0: # React only on successes + ... + + my-great-action|some-entity|some-state|E: # React only on errors + ... + + my-great-action|some-entity|some-state|$: # React on all exit codes + ... + +Each segment can be wildcarded using ``$``. + +Example: ``my-action|$|some-state|0`` + +This will trigger only when: + +* the action is ``my-action`` +* the state is ``some-state`` +* the result exit code is ``0`` (success) +* the entity can be anything (because of the ``$``) + +More practical examples: + +* ``$|$|$|E``: catch *all* errors, regardless of action/entity/state +* ``deploy|prod|$|$``: catch all deploy events for the ``prod`` entity, any state, any exit code +* ``backup|$|$|0``: catch all successful backups + +If you truly want to catch everything from everywhere, use ``$|$|$|$``. diff --git a/docs/evthandlers/console_logger.rst b/docs/evthandlers/console_logger.rst index 16ea3b57..4cdc6e66 100644 --- a/docs/evthandlers/console_logger.rst +++ b/docs/evthandlers/console_logger.rst @@ -46,7 +46,7 @@ Example events: # Capture all events - $/$/$/$: + $|$|$|$: handlers: console-logger diff --git a/docs/evthandlers/outcome_logger.rst b/docs/evthandlers/outcome_logger.rst index f52e6ada..9593bf4e 100644 --- a/docs/evthandlers/outcome_logger.rst +++ b/docs/evthandlers/outcome_logger.rst @@ -36,7 +36,7 @@ Example events: # Capture all events - $/$/$/$: + $|$|$|$: handlers: outcome-logger diff --git a/docs/evthandlers/overview.rst b/docs/evthandlers/overview.rst index 0ca69738..d252d391 100644 --- a/docs/evthandlers/overview.rst +++ b/docs/evthandlers/overview.rst @@ -1,5 +1,5 @@ -Event Handlers -============== +Sensors and Events Routing +========================== .. note:: @@ -21,72 +21,18 @@ handler that can do useful things like: You configure handlers by defining an **event filter** (what to match) and a **handler action** (what to do). If the filter matches, the handler runs. If it doesn't match, nothing happens. -Event Filter Format -------------------- -.. important:: +Routing Events to Handlers +--------------------------- - 🚨 If you define the filter format incorrectly, you won't get any reaction and the event will be ignored. - Additionally, **no logging will be emitted** unless in debug mode. +Actions and sensors produce different events with different payloads and are handled slightly differently. +You can route those events to handlers by defining event filters. -An event is represented as a single path-like string: - -.. code-block:: text - - action/entity/state/exit-code - -Where: - -* ``action`` is the action ID (e.g., a check you run) -* ``entity`` is the bound entity ID (e.g., a host/service/thing you target) -* ``state`` is the state ID inside the action (e.g., ok/warn/crit or your own state name) -* ``exit-code`` is the result indicator: - - * ``0`` = success - * ``E`` = error/failure - * ``$`` = any exit code (match everything) - -Given the module structure below, you can write event filters like this: - -.. code-block:: yaml - - actions: - my-great-action: # This is the action ID - ... - bind: - - some-entity # This is the entity ID - state: - some-state: # This is the state ID - ... - - events: - my-great-action/some-entity/some-state/0: # React only on successes - ... - - my-great-action/some-entity/some-state/E: # React only on errors - ... - - my-great-action/some-entity/some-state/$: # React on all exit codes - ... - -Each segment can be wildcarded using ``$``. - -Example: ``my-action/$/some-state/0`` - -This will trigger only when: - -* the action is ``my-action`` -* the state is ``some-state`` -* the result exit code is ``0`` (success) -* the entity can be anything (because of the ``$``) - -More practical examples: - -* ``$/$/$/E``: catch *all* errors, regardless of action/entity/state -* ``deploy/prod/$/$``: catch all deploy events for the ``prod`` entity, any state, any exit code -* ``backup/$/$/0``: catch all successful backups +.. toctree:: + :maxdepth: 1 -If you truly want to catch everything from everywhere, use ``$/$/$/$``. + action_event_routing + sensor_event_routing Available Handlers diff --git a/docs/evthandlers/pipeline.rst b/docs/evthandlers/pipeline.rst index d2855702..62681f27 100644 --- a/docs/evthandlers/pipeline.rst +++ b/docs/evthandlers/pipeline.rst @@ -34,7 +34,7 @@ How It Works When an action finishes, SysInspect emits an event that contains the action result (return code, stdout/stderr, and any structured ``data`` the action produced). The *pipeline* handler listens for events that match your event filter. -Most pipelines are wired to successful runs only, by using the ``.../0`` return-code filter (``0`` = success). With that +Most pipelines are wired to successful runs only, by using the ``...|0`` return-code filter (``0`` = success). With that setup, a failed action (non-zero return code) simply will not match the filter, so the pipeline won't run for it. You'll still see the failure in logs, and you can add a separate event filter if you also want to react to failures. @@ -140,7 +140,7 @@ So instead of hard-coding ``data:500``, you map it from the event like ``data: $ events: # Only react to successful actions (return code 0) - $/$/$/0: + $|$|$|0: handlers: - pipeline diff --git a/docs/evthandlers/pipescript.rst b/docs/evthandlers/pipescript.rst index 57502e83..6773555b 100644 --- a/docs/evthandlers/pipescript.rst +++ b/docs/evthandlers/pipescript.rst @@ -33,7 +33,7 @@ Options ------- ``program`` -^^^^^^^^^^ +^^^^^^^^^^^^ This is the full command line for the script or program you want to run. The handler will send the action's output to this program through STDIN. For example: @@ -78,7 +78,7 @@ Here is an example of how to set up the pipescript handler in your configuration events: # Only react to successful actions (return code 0) - $/$/$/0: + $|$|$|0: handlers: pipescript diff --git a/docs/evthandlers/sensor_event_routing.rst b/docs/evthandlers/sensor_event_routing.rst new file mode 100644 index 00000000..7591d9df --- /dev/null +++ b/docs/evthandlers/sensor_event_routing.rst @@ -0,0 +1,54 @@ +Sensor Events Routing +===================== + +.. important:: + + 🚨 If you define the filter format incorrectly, you won't get any reaction and the event will be ignored. + Additionally, **no logging will be emitted** unless your system is running in debug mode. + +A sensor event is a single positional string, delimited by ``|``: + +.. code-block:: text + + sensor-id|listener|action[@specifier]|exit-code + +Where: + +* ``sensor-id`` is the ID of the sensor that emitted the event (e.g., ``my-tmp-dir`` — a check you run) +* ``listener`` is the listener ID that processed the sensor (e.g., ``fsnotify`` — the sensor plugin that runs the check) +* ``action`` is the action the sensor emits. For example, for ``fsnotify`` listener, the action is the event type like ``create``, ``modify``, ``delete`` +* ``specifier`` is an optional part that can be used to further specify the event details, like a specific file path or a glob pattern. For example, ``create@/tmp/$``. +* ``exit-code`` is always ``0`` for sensor events, since they don't have a return code like actions do + +An event routing is essentially defined like this: + +.. code-block:: yaml + + sensors: + tmp-watch: + ... + + events: + tmp-watch|some-sensor|result-action|0: + ... + +Each segment can be wildcarded using ``$``. +Example: ``tmp-watch|fsnotify|$|0`` + +Events for sensors are defined with the following synopsis: + +.. code-block:: text + + events: + ||action[@specifier]|0: + handlers: + - handler-id + - handler-id + - ... + + handler-id: + # handler configuration + ... + +Each event handler has its own configuration. Please refer to the documentation of each handler for more +details on how to set them up and use them effectively. diff --git a/docs/global_config.rst b/docs/global_config.rst index 421ccafa..665c24d8 100644 --- a/docs/global_config.rst +++ b/docs/global_config.rst @@ -201,14 +201,29 @@ Below are directives for the configuration of the File Server service: Type: **string** - Relative path where are the master models kept. + Relative path where are the master models kept. Default: ``/models``. ``fileserver.models`` ###################### Type: **list** - List of subdirectories within ``fileserver.models.root``, exporting models. If a model is not + List of subdirectories within ``fileserver.models``, exporting models. If a model is not + in the list, it will not be available for the minions. + +``fileserver.sensors.root`` +########################### + + Type: **string** + + Relative path where are the master sensors kept. Default: ``/sensors``. + +``fileserver.sensors`` +###################### + + Type: **list** + + List of subdirectories within ``fileserver.sensors``, exporting sensors. If a sensor is not in the list, it will not be available for the minions. ``api.enabled`` diff --git a/docs/index.rst b/docs/index.rst index f3a2faad..680481b7 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -35,6 +35,7 @@ welcome—see the section on contributing for how to get involved. moddev/overview moddescr/overview evthandlers/overview + eventsensors/overview uix/ui apidoc/overview diff --git a/libdpq/examples/dpq-demo.rs b/libdpq/examples/dpq-demo.rs index 23cf2b70..74b81895 100644 --- a/libdpq/examples/dpq-demo.rs +++ b/libdpq/examples/dpq-demo.rs @@ -13,6 +13,9 @@ async fn main() -> Result<(), Box> { WorkItem::MasterCommand(msg) => { println!("JOB {job_id}: got MasterMessage: {msg:#?}"); } + WorkItem::EventCommand(msg) => { + println!("JOB {job_id}: got EventMessage: {msg:#?}"); + } } if let Err(e) = q3.ack(job_id) { diff --git a/libdpq/src/lib.rs b/libdpq/src/lib.rs index 62d7a58c..905aaea7 100644 --- a/libdpq/src/lib.rs +++ b/libdpq/src/lib.rs @@ -8,8 +8,7 @@ use tokio::sync::mpsc; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum WorkItem { MasterCommand(MasterMessage), - // Next: - // Kick(KickRequest), + EventCommand(MasterMessage), } /// Disk-backed persistent queue for background job processing. diff --git a/libsensors/Cargo.toml b/libsensors/Cargo.toml new file mode 100644 index 00000000..5a5a0388 --- /dev/null +++ b/libsensors/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "libsensors" +version = "0.1.0" +edition = "2024" + +[lib] +name = "libsensors" +path = "src/lib.rs" + +[dependencies] +anyhow = "1.0.101" +async-trait = "0.1.89" +dashmap = "6.1.0" +env_logger = "0.11.9" +fastrand = "2.3.0" +filescream = {git = "https://github.com/tinythings/filescream.git", branch = "master"} +indexmap = { version = "2.13.0", features = ["serde"] } +lazy_static = "1.5.0" +log = "0.4.29" +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.149" +serde_yaml = "0.9.34" +tempfile = "3.25.0" +tokio = { version = "1.49.0", features = ["full"] } +walkdir = "2.5.0" +libsysinspect = { path = "../libsysinspect"} +libcommon = { path = "../libcommon" } +colored = "3.1.1" diff --git a/libsensors/Makefile b/libsensors/Makefile new file mode 100644 index 00000000..768a224e --- /dev/null +++ b/libsensors/Makefile @@ -0,0 +1,73 @@ +# Makefile — run all examples in this subcrate + +CARGO ?= cargo +PROFILE ?= debug +RUST_LOG ?= info + +# Optional extras: +# make run-svc ARGS="--foo bar" +# make run-all FEATURES="something" NO_DEFAULT_FEATURES=1 + +ARGS ?= +FEATURES ?= +NO_DEFAULT_FEATURES ?= + +# Feature flags assembly +FEATURE_FLAGS := +ifneq ($(strip $(FEATURES)),) +FEATURE_FLAGS += --features "$(FEATURES)" +endif +ifeq ($(NO_DEFAULT_FEATURES),1) +FEATURE_FLAGS += --no-default-features +endif + +# Profile flags +ifeq ($(PROFILE),release) +PROFILE_FLAG := --release +else +PROFILE_FLAG := +endif + +EXAMPLES := svc loader + +.PHONY: help list examples run-all $(addprefix run-,$(EXAMPLES)) clean + +help: + @echo "Targets:" + @echo " make list - list known examples" + @echo " make run-all - run all examples" + @echo " make run-svc - run examples/svc.rs" + @echo " make run-loader - run examples/loader.rs" + @echo "" + @echo "Vars:" + @echo " PROFILE=debug|release (default: debug)" + @echo " RUST_LOG=info|debug|trace (default: info)" + @echo " ARGS='...' (passed after --)" + @echo " FEATURES='a b' (cargo --features)" + @echo " NO_DEFAULT_FEATURES=1 (cargo --no-default-features)" + @echo "" + @echo "Examples:" + @echo " make run-all RUST_LOG=debug" + @echo " make run-svc ARGS='--config ./test.json' RUST_LOG=trace" + @echo " make run-loader PROFILE=release FEATURES='foo'" + +list examples: + @echo $(EXAMPLES) + +run-all: + @set -e; \ + for ex in $(EXAMPLES); do \ + echo "==> running example: $$ex"; \ + $(MAKE) --no-print-directory run-$$ex; \ + done + +run-svc: + @echo "RUST_LOG=$(RUST_LOG) PROFILE=$(PROFILE) FEATURES='$(FEATURES)'" + @RUST_LOG=$(RUST_LOG) $(CARGO) run $(PROFILE_FLAG) $(FEATURE_FLAGS) --example svc -- $(ARGS) + +run-loader: + @echo "RUST_LOG=$(RUST_LOG) PROFILE=$(PROFILE) FEATURES='$(FEATURES)'" + @RUST_LOG=$(RUST_LOG) $(CARGO) run $(PROFILE_FLAG) $(FEATURE_FLAGS) --example loader -- $(ARGS) + +clean: + @$(CARGO) clean diff --git a/libsensors/etc/sensors.cfg b/libsensors/etc/sensors.cfg new file mode 100644 index 00000000..33760a9a --- /dev/null +++ b/libsensors/etc/sensors.cfg @@ -0,0 +1,162 @@ +# Conditions: +# +# 1. If sensors are defined, local version runs forever +# Use --daemon to "sysinspect" to go background in local mode. +# +# 2. It is up to luser define what to do on the events. Sensors only +# emit the data in JSON to the event handler and nothing else. +# +# 3. Sensors are not a part of any model. It is the listening configuration +# of a Sysinspect agent (SysMinion or Sysinspect local mode). Their purpose +# is merely scream if stuff happens. What to do, what model to call (or not) +# is all up to luser. +# +# 4. Events must be removed from the model itself, because they are always the same +# anyways and must be part of sensors now. +# + +# SYNOPSIS +# sensors: +# interval: +# max: 10 +# min: 3 +# unit: seconds|minutes|hours +# +# : +# profile: +# - +# - +# description: +# listener: +# interval: +# opts: +# - +# - +# args: +# key: +# +# event: + +sensors: + # If interval is not specified in the sensor explicitly, + # A random one within this range will be selected + interval: + min: 400 + max: 5000 + unit: milliseconds + + # A random ID + tmp-watch: + # Default if not defined at all + profile: + - default + + description: Watches /tmp for changes + listener: fsnotify + opts: + - created + - changed + - deleted + args: + # interval: 5 + # unit: second + + # Listener-specific arg + path: /tmp + + # Extra tag + tag: foobar + + pid-watch: + profile: + - system + - default + description: If a particular process appears + listener: process + opts: + - appeared + args: + interval: 5 + unit: second + + # Listener-specific arg + process: vivaldi-bin + + # If event is not explicitly overridden, then default event ID is used + # in the following format: + # + # event: listener/id/process:[opts]/0 + # + # In this particular case it will be the following: + # + # event: process/pid-watch/vivaldi-bin:appeared/0 + # + # To capture that — one must be setting up corresponding event handler + # what to do on that event. + + io: + description: Measure peak of disk[s] IO + listener: loadaverage + opts: + - disk-io + + # Possible options + # - free-mem + # - free-swap + # - cpu-load + args: + # A random interval. See above. + + # Listener-specific arg + threshold: ">10" + + logged-in-users: + description: Capture if anyone logged into the system via SSH + listener: users + opts: + # React if any of these appears + - any + args: + # By UID + uid: + - 0 + - 10650 + + # By user name + uname: + - john + + syslog-watch: + description: Watching a syslod for a specific pattern + listener: syslog + opts: + - any + args: + pattern: + - "snap-store" + - "E: Failed to fetch https://mirror.company.com" + + diskfree: + description: See if we have enough disk space + listener: disk + opts: + - free + args: + threshold: ">5GB" + +# Events are almost identical to events in models, but they are emitted by sensors and not by model actions. +# The difference is the error code doesn't returned, but rather the result from a sensor. For example, +# fsnotify will return the actual file or path configured. This way you can setup one sensor but capture +# different events from it and route them differently. +events: + $|$|$|$: + handlers: + - console-logger + - outcome-logger + + console-logger: + concise: false + prefix: Single Call + + outcome-logger: + prefix: Single Call diff --git a/libsensors/examples/loader.rs b/libsensors/examples/loader.rs new file mode 100644 index 00000000..369d6de6 --- /dev/null +++ b/libsensors/examples/loader.rs @@ -0,0 +1,14 @@ +use libsensors::load; +use std::path::Path; + +fn main() { + let spec = match load(Path::new(".")) { + Ok(s) => s, + Err(e) => { + eprintln!("Error loading sensor specifications: {}", e); + return; + } + }; + + println!("{:#?}", spec); +} diff --git a/libsensors/examples/svc.rs b/libsensors/examples/svc.rs new file mode 100644 index 00000000..b7c4a6d7 --- /dev/null +++ b/libsensors/examples/svc.rs @@ -0,0 +1,16 @@ +use libsensors::{load, service::SensorService}; +use std::path::Path; + +#[tokio::main] +async fn main() { + env_logger::init(); + let spec = load(Path::new(".")).unwrap(); + let mut svc = SensorService::new(spec); + + let _handles = svc.start(); + + // keep process alive + loop { + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + } +} diff --git a/libsensors/src/lib.rs b/libsensors/src/lib.rs new file mode 100644 index 00000000..3021b4c4 --- /dev/null +++ b/libsensors/src/lib.rs @@ -0,0 +1,90 @@ +pub mod sensors; +pub mod service; +pub mod sspec; + +use crate::sspec::{IntervalRange, SensorConf, SensorSpec}; +use indexmap::IndexMap; +use libcommon::SysinspectError; +use serde::Deserialize; +use std::{fs, path::Path}; +use walkdir::WalkDir; + +#[derive(Deserialize)] +struct Wrapper { + sensors: Option, + #[serde(default)] + events: Option, +} + +pub fn load(p: &Path) -> Result { + log::info!("Loading sensor specifications from {}", p.display()); + + let mut interval: Option = None; + let mut sensors: IndexMap = IndexMap::new(); + let mut events: Option = None; + + let mut chunks: Vec<_> = WalkDir::new(p) + .into_iter() + .filter_map(Result::ok) + .filter(|e| e.file_type().is_file()) + .filter(|e| e.path().extension().and_then(|x| x.to_str()) == Some("cfg")) + .map(|e| e.into_path()) + .collect(); + + chunks.sort(); + + for path in chunks { + log::debug!("Loading sensors chunk: {}", path.display()); + let w: Wrapper = match serde_yaml::from_str(&fs::read_to_string(&path)?) { + Ok(p) => p, + Err(err) => { + log::warn!("Skipping invalid DSL file {}: {}", path.display(), err); + continue; + } + }; + + let Some(mut spec) = w.sensors else { + continue; + }; + + // first interval wins + if interval.is_none() { + interval = spec.interval_range().cloned(); + } else if spec.interval_range().is_some() { + log::warn!("Interval already defined. Ignoring interval in {}", path.display()); + } + + // first sensor wins + for (k, v) in spec.items() { + if sensors.contains_key(&k) { + log::warn!("Duplicate sensor '{}' in {} ignored (first wins)", k, path.display()); + continue; + } + sensors.insert(k.clone(), v.clone()); + } + + // first events block wins (same rule as interval) + if events.is_none() { + events = w.events.clone(); + } else if w.events.is_some() { + log::warn!("Events already defined. Ignoring events in {}", path.display()); + } + } + + // Sort sensors alphabetically + let mut sorted: Vec<_> = sensors.into_iter().collect(); + sorted.sort_by(|a, b| a.0.cmp(&b.0)); + + let mut sensors = IndexMap::new(); + for (k, v) in sorted { + sensors.insert(k, v); + } + + let mut out = SensorSpec::new(interval, sensors); + + if let Some(ev) = events { + out.set_events_yaml(ev)?; // we’ll add this setter in sspec.rs + } + + Ok(out) +} diff --git a/libsensors/src/sensors/fsnotify.rs b/libsensors/src/sensors/fsnotify.rs new file mode 100644 index 00000000..855090e6 --- /dev/null +++ b/libsensors/src/sensors/fsnotify.rs @@ -0,0 +1,99 @@ +use super::sensor::{Sensor, SensorEvent}; +use crate::sspec::SensorConf; +use async_trait::async_trait; +use filescream::events::{Callback, EventMask, FileScreamEvent}; +use filescream::{FileScream, FileScreamConfig}; +use serde_json::json; +use std::{fmt, time::Duration}; +use tokio::sync::mpsc; + +#[derive(Clone)] +pub struct FsNotifySensor { + sid: String, + cfg: SensorConf, +} + +impl fmt::Debug for FsNotifySensor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FsNotifySensor").field("sid", &self.sid).field("listener", &self.cfg.listener()).finish() + } +} + +impl FsNotifySensor { + fn arg_str(cfg: &SensorConf, key: &str) -> Option { + cfg.args().get(key).and_then(|v| v.as_str()).map(|s| s.to_string()) + } + + fn arg_u64(cfg: &SensorConf, key: &str) -> Option { + cfg.args().get(key).and_then(|v| v.as_i64()).map(|i| i as u64) + } +} + +#[async_trait] +impl Sensor for FsNotifySensor { + fn new(id: String, cfg: SensorConf) -> Self { + Self { sid: id, cfg } + } + + fn id() -> String { + "fsnotify".to_string() + } + + async fn run(&self, emit: &(dyn Fn(SensorEvent) + Send + Sync)) { + // required args + let Some(path) = Self::arg_str(&self.cfg, "path") else { + log::warn!("fsnotify '{}' missing args.path; not starting", self.sid); + return; + }; + + let pulse = self.cfg.interval().unwrap_or_else(|| Duration::from_secs(3)); + log::info!("fsnotify '{}' watching '{}' with pulse {:?} and opts {:?}", self.sid, path, pulse, self.cfg.opts()); + + let mut fs = FileScream::new(Some(FileScreamConfig::default().pulse(pulse))); + fs.watch(&path); + + // EventMask + let mut mask = EventMask::empty(); + for o in self.cfg.opts() { + match o.as_str() { + "created" => mask |= EventMask::CREATED, + "changed" => mask |= EventMask::CHANGED, + "deleted" => mask |= EventMask::REMOVED, + _ => { + log::warn!("fsnotify '{}' unknown opt '{}'", self.sid, o); + } + } + } + + let cb = Callback::new(mask).on(|ev| async move { + match ev { + FileScreamEvent::Created { path } => Some(json!({"action":"created","file":path.to_string_lossy()})), + FileScreamEvent::Changed { path } => Some(json!({"action":"changed","file":path.to_string_lossy()})), + FileScreamEvent::Removed { path } => Some(json!({"action":"deleted","file":path.to_string_lossy()})), + } + }); + fs.add_callback(cb); + + // Channel to receive callback JSON + let (tx, mut rx) = mpsc::channel::(0xfff); + fs.set_callback_channel(tx); + + tokio::spawn(fs.run()); + + // Forward results + while let Some(r) = rx.recv().await { + let action = r.get("action").and_then(|v| v.as_str()).unwrap_or("unknown"); + let file = r.get("file").and_then(|v| v.as_str()).unwrap_or("unknown"); + + let lstid = format!("{}{}{}", FsNotifySensor::id(), if self.cfg.tag().is_none() { "" } else { "@" }, self.cfg.tag().unwrap_or("")); + let eid = format!("{}|{}|{}@{}|{}", self.sid, lstid, action, file, 0); + + (emit)(json!({ + "eid": eid, + "sensor": self.sid, + "listener": FsNotifySensor::id(), + "data": r, + })); + } + } +} diff --git a/libsensors/src/sensors/mod.rs b/libsensors/src/sensors/mod.rs new file mode 100644 index 00000000..64afaadf --- /dev/null +++ b/libsensors/src/sensors/mod.rs @@ -0,0 +1,25 @@ +pub mod fsnotify; +pub mod sensor; + +use crate::{sensors::sensor::Sensor, sspec::SensorConf}; +use dashmap::DashMap; +use lazy_static::lazy_static; + +pub type SensorFactory = fn(String, SensorConf) -> Box; +pub type SensorRegistry = DashMap; + +lazy_static! { + pub static ref REGISTRY: SensorRegistry = DashMap::new(); +} + +pub fn init_sensor(listener: &str, sid: String, cfg: SensorConf) -> Option> { + REGISTRY.get(listener).map(|f| f(sid, cfg)) +} + +pub fn init_registry() { + if !REGISTRY.is_empty() { + return; + } + + REGISTRY.insert(fsnotify::FsNotifySensor::id(), |sid: String, cfg: SensorConf| Box::new(fsnotify::FsNotifySensor::new(sid, cfg))); +} diff --git a/libsensors/src/sensors/sensor.rs b/libsensors/src/sensors/sensor.rs new file mode 100644 index 00000000..055cc698 --- /dev/null +++ b/libsensors/src/sensors/sensor.rs @@ -0,0 +1,18 @@ +use async_trait::async_trait; +use serde_json::Value; +use std::fmt::Debug; + +pub type SensorEvent = Value; + +#[async_trait] +pub trait Sensor: Debug + Send + Sync { + fn new(id: String, cfg: crate::sspec::SensorConf) -> Self + where + Self: Sized; + + fn id() -> String + where + Self: Sized; + + async fn run(&self, emit: &(dyn Fn(SensorEvent) + Send + Sync)); +} diff --git a/libsensors/src/service.rs b/libsensors/src/service.rs new file mode 100644 index 00000000..c873657f --- /dev/null +++ b/libsensors/src/service.rs @@ -0,0 +1,66 @@ +use crate::sensors; +use crate::sspec::SensorSpec; +use colored::Colorize; +use libsysinspect::intp::actproc::response::ActionResponse; +use libsysinspect::reactor::evtproc::EventProcessor; +use serde_json::Value as JsonValue; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; + +pub struct SensorService { + spec: SensorSpec, + reactor: Option>>, +} + +impl SensorService { + pub fn new(spec: SensorSpec) -> Self { + sensors::init_registry(); + Self { spec, reactor: None } + } + + /// Start all sensors in the service spec, returning a list of JoinHandles for the running tasks. + pub fn start(&mut self) -> Vec> { + let reactor = self.reactor.clone(); + let mut handles = Vec::new(); + + for (sid, cfg) in self.spec.items() { + log::debug!("Starting sensor '{}' with listener '{}'", sid, cfg.listener()); + + let Some(sensor) = sensors::init_sensor(cfg.listener(), sid.to_string(), cfg.clone()) else { + log::error!("Unknown sensor listener '{}' for '{}'", cfg.listener(), sid); + continue; + }; + + log::info!("Initialized sensor '{}'", format!("{}/{}", sid, cfg.listener()).bright_yellow()); + + let sid = sid.to_string(); + let reactor = reactor.clone(); + + let emit = move |ev: JsonValue| { + log::debug!("Registering event from sensor to reactor: {ev}"); + + let Some(reactor) = reactor.clone() else { + log::warn!("No reactor attached for sensor '{}': {}", sid, ev); + return; + }; + + let eid = ev.get("eid").and_then(|v| v.as_str()).map(|s| s.to_string()).unwrap_or_else(|| sid.clone()); + tokio::spawn(async move { + let response = ActionResponse::from_sensor(ev); + reactor.lock().await.receiver().register(eid, response); + }); + }; + + handles.push(tokio::spawn(async move { + sensor.run(&emit).await; + })); + } + + handles + } + + pub fn set_event_processor(&mut self, events: Arc>) { + self.reactor = Some(events); + } +} diff --git a/libsensors/src/sspec.rs b/libsensors/src/sspec.rs new file mode 100644 index 00000000..2f9cafe4 --- /dev/null +++ b/libsensors/src/sspec.rs @@ -0,0 +1,269 @@ +use indexmap::IndexMap; +use libcommon::SysinspectError; +use libsysinspect::intp::conf::EventsConfig; +use serde::Deserialize; +use serde_yaml::Value as YamlValue; +use std::{ + mem, + str::FromStr, + time::{Duration, SystemTime}, +}; + +/// Global default interval range from `sensors.interval`. +#[derive(Debug, Deserialize, Clone)] +/// Represents a range of intervals with a unit. +/// +/// This struct defines minimum and maximum bounds for time intervals and +/// provides methods to normalize and pick random values within the range. +pub struct IntervalRange { + /// Minimum interval value + pub min: u64, + /// Maximum interval value + pub max: u64, + /// Unit of time (e.g., "seconds", "ms", "minutes") + pub unit: String, +} + +impl IntervalRange { + /// Returns a tuple of (min, max) with min <= max and both >= 1. + /// + /// Normalizes the interval range by ensuring both values are at least 1 + /// and that min <= max. If the input has min > max, they are swapped. + /// + /// # Returns + /// + /// A tuple `(min, max)` where `min <= max` and both values are >= 1. + /// + /// # Examples + /// + /// ``` + /// let range = IntervalRange { min: 5, max: 10, unit: "seconds".to_string() }; + /// assert_eq!(range.range(), (5, 10)); + /// ``` + /// + /// ``` + /// let range = IntervalRange { min: 10, max: 5, unit: "seconds".to_string() }; + /// assert_eq!(range.range(), (5, 10)); // swapped + /// ``` + pub fn range(&self) -> (u64, u64) { + let mut a = self.min.max(1); + let mut b = self.max.max(1); + if a > b { + mem::swap(&mut a, &mut b); + } + (a, b) + } + + /// Picks a random value within the interval range using the current system time as seed. + /// + /// This method normalizes the interval range and generates a pseudo-random value + /// within that range using the system time in nanoseconds as the random seed. + /// + /// # Returns + /// + /// A random `u64` value within the normalized interval range (inclusive). + /// + /// # Examples + /// + /// ``` + /// let range = IntervalRange { min: 1, max: 10, unit: "seconds".to_string() }; + /// let value = range.pick(); + /// assert!(value >= 1 && value <= 10); + /// ``` + pub fn pick(&self) -> u64 { + let (min, max) = self.range(); + fastrand::u64(min..=max) + } +} + +#[derive(Debug, Deserialize)] +pub struct SensorSpec { + #[serde(default)] + interval: Option, + + #[serde(flatten)] + items: IndexMap, + + #[serde(skip)] + updated: bool, // Marker that items were updated with the interval + + #[serde(skip)] + events: Option, // EventConfig placeholder, added later +} + +impl SensorSpec { + pub fn new(interval: Option, items: IndexMap) -> Self { + SensorSpec { interval, items, updated: false, events: None } + } + + /// For loader merge (first wins). + pub fn interval_range(&self) -> Option<&IntervalRange> { + self.interval.as_ref() + } + + fn pick_range(min: u64, max: u64) -> u64 { + let mut a = min.max(1); + let mut b = max.max(1); + if a > b { + std::mem::swap(&mut a, &mut b); + } + fastrand::seed(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64); + fastrand::u64(a..=b) + } + + fn global_range(&self) -> IntervalRange { + self.interval.clone().unwrap_or(IntervalRange { min: 3, max: 10, unit: "seconds".to_string() }) + } + + fn u2d(v: u64, unit: &str) -> Duration { + match unit.to_lowercase().as_str() { + "ms" | "msec" | "millisecond" | "milliseconds" => Duration::from_millis(v), + "s" | "sec" | "second" | "seconds" => Duration::from_secs(v), + "m" | "min" | "minute" | "minutes" => Duration::from_secs(v.saturating_mul(60)), + "h" | "hr" | "hour" | "hours" => Duration::from_secs(v.saturating_mul(60 * 60)), + _ => Duration::from_secs(v), + } + } + + pub fn interval(&self) -> Duration { + let range = self.global_range(); + let mut min = range.min.max(1); + let mut max = range.max.max(1); + if min > max { + std::mem::swap(&mut min, &mut max); + } + Self::u2d(Self::pick_range(min, max), &range.unit) + } + + /// Updates sensorconf with the interval, if not defined + pub fn items(&mut self) -> IndexMap { + if self.updated { + return self.items.clone(); + } + + let range = self.global_range(); + for (_, config) in self.items.iter_mut() { + if config.interval().is_none() { + let mut c = config.clone(); + c.interval = Some(Self::u2d(Self::pick_range(range.min, range.max), &range.unit)); + *config = c; + } + } + self.updated = true; + self.items.clone() + } + + /// Get a sensor settings configuration by its name + pub fn sensor_config(&self, name: &str) -> Option<&SensorConf> { + self.items.get(name) + } + + /// Get the entire events configuration, if defined. This is added later by the loader. + pub fn events_config(&self) -> Option<&EventsConfig> { + self.events.as_ref() + } + + pub fn set_events_yaml(&mut self, ev: serde_yaml::Value) -> Result<(), SysinspectError> { + let mut cfg = EventsConfig::default(); + cfg.set_events(&ev)?; + self.events = Some(cfg); + Ok(()) + } +} + +#[derive(Debug, Deserialize, Clone)] +pub struct SensorConf { + #[serde(default)] + profile: Option>, + + #[serde(default)] + description: Option, + + listener: String, + + #[serde(default)] + opts: Vec, + + #[serde(default)] + args: YamlValue, + + #[serde(default)] + tag: Option, + + #[serde(default)] + interval: Option, +} + +impl SensorConf { + /// Returns the profile list, defaulting to `["default"]` if not specified. + /// + /// All profile names are converted to lowercase for consistent comparison. + pub fn profile(&self) -> Vec { + self.profile.clone().unwrap_or_else(|| vec!["default".to_string()]).into_iter().map(|p| p.to_lowercase()).collect() + } + + /// Checks if any of this sensor's profiles match any in the provided list (case-insensitive). + /// + /// # Arguments + /// + /// * `profiles` - A slice of profile names to match against + /// + /// # Returns + /// + /// `true` if there is at least one matching profile, `false` otherwise. + pub fn matches_profile(&self, profiles: &[String]) -> bool { + self.profile().iter().any(|tag| profiles.iter().any(|m| m.eq_ignore_ascii_case(tag))) + } + + /// Returns the sensor's description, if defined. + pub fn description(&self) -> Option<&str> { + self.description.as_deref() + } + + /// Returns the listener name for this sensor configuration. + pub fn listener(&self) -> &str { + &self.listener + } + + /// Returns the command-line options for this sensor. + pub fn opts(&self) -> &[String] { + &self.opts + } + + /// Returns the YAML arguments for this sensor. + pub fn args(&self) -> &YamlValue { + &self.args + } + + /// Returns the tag associated with this sensor, if defined. + pub fn tag(&self) -> Option<&str> { + self.tag.as_deref() + } + + /// Returns the interval duration for this sensor, if defined. + pub fn interval(&self) -> Option { + self.interval + } +} + +impl FromStr for SensorSpec { + type Err = SysinspectError; + + fn from_str(s: &str) -> Result { + #[derive(Deserialize)] + struct Wrapper { + sensors: SensorSpec, + #[serde(default)] + events: Option, + } + let w = serde_yaml::from_str::(s)?; + let mut spec = w.sensors; + if let Some(ev) = w.events { + let mut cfg = EventsConfig::default(); + cfg.set_events(&ev)?; + spec.events = Some(cfg); + } + + Ok(spec) + } +} diff --git a/libsensors/tests/loader_merge.rs b/libsensors/tests/loader_merge.rs new file mode 100644 index 00000000..051e7dbb --- /dev/null +++ b/libsensors/tests/loader_merge.rs @@ -0,0 +1,132 @@ +mod loader_merge_test { + use libsensors::load; + use std::{fs, path::Path}; + use tempfile::TempDir; + + fn write(p: &Path, name: &str, content: &str) { + fs::write(p.join(name), content).unwrap(); + } + + #[test] + fn test_loader_merges_recursive_cfg_files_and_sorts() { + let td = TempDir::new().unwrap(); + let root = td.path(); + + fs::create_dir_all(root.join("my-crappy-sensors")).unwrap(); + fs::create_dir_all(root.join("my-better-sensors/sub")).unwrap(); + + write( + root.join("my-crappy-sensors").as_path(), + "a.cfg", + r#" +sensors: + interval: + min: 3 + max: 10 + unit: seconds + + zebra: + listener: file + + alpha: + listener: process +"#, + ); + + write( + root.join("my-better-sensors/sub").as_path(), + "b.cfg", + r#" +sensors: + beta: + listener: disk +"#, + ); + + write(root, "nope.txt", "sensors: { }"); + write(root, "bad.cfg", "this: is: not: valid: yaml: ["); + + let mut spec = load(root).unwrap(); + + // interval range from first file + let ir = spec.interval_range().unwrap(); + assert_eq!(ir.min, 3); + assert_eq!(ir.max, 10); + assert_eq!(ir.unit, "seconds"); + + // keys sorted alphabetically + let keys: Vec = spec.items().keys().cloned().collect(); + assert_eq!(keys, vec!["alpha".to_string(), "beta".to_string(), "zebra".to_string()]); + } + + #[test] + fn test_loader_first_wins_interval_and_sensor_id() { + let td = TempDir::new().unwrap(); + let root = td.path(); + + fs::create_dir_all(root.join("x")).unwrap(); + fs::create_dir_all(root.join("y")).unwrap(); + + write( + root.join("x").as_path(), + "1.cfg", + r#" +sensors: + interval: + min: 1 + max: 2 + unit: seconds + + dup: + listener: file +"#, + ); + + write( + root.join("y").as_path(), + "2.cfg", + r#" +sensors: + interval: + min: 99 + max: 100 + unit: hours + + dup: + listener: process +"#, + ); + + let mut spec = load(root).unwrap(); + + // first interval wins + let ir = spec.interval_range().unwrap(); + assert_eq!(ir.min, 1); + assert_eq!(ir.max, 2); + assert_eq!(ir.unit, "seconds"); + + // first sensor wins + let dup = spec.items().get("dup").cloned().unwrap(); + assert_eq!(dup.listener(), "file"); + } + + #[test] + fn test_loader_ignores_cfg_without_sensors_key() { + let td = TempDir::new().unwrap(); + let root = td.path(); + + write( + root, + "x.cfg", + r#" +not_sensors: + a: + listener: file +"#, + ); + + let mut spec = load(root).unwrap(); + assert_eq!(spec.items().len(), 0); + assert!(spec.interval_range().is_none()); + } +} diff --git a/libsensors/tests/profile_match.rs b/libsensors/tests/profile_match.rs new file mode 100644 index 00000000..47e589b9 --- /dev/null +++ b/libsensors/tests/profile_match.rs @@ -0,0 +1,46 @@ +mod profile_match_test { + use libsensors::sspec::SensorSpec; + use std::str::FromStr; + + #[test] + fn test_matches_profile_default_when_missing() { + let y = r#" +sensors: + a: + listener: file +"#; + + let mut spec = SensorSpec::from_str(y).unwrap(); + let items = spec.items(); + let c = items.get("a").unwrap(); + + let profiles = vec!["default".to_string()]; + assert!(c.matches_profile(&profiles)); + + let profiles2 = vec!["banana".to_string()]; + assert!(!c.matches_profile(&profiles2)); + } + + #[test] + fn test_matches_profile_case_insensitive() { + let y = r#" +sensors: + a: + listener: file + profile: [BrownStinkyBanana] +"#; + + let mut spec = SensorSpec::from_str(y).unwrap(); + let items = spec.items(); + let c = items.get("a").unwrap(); + + let profiles = vec!["brownstinkybanana".to_string()]; + assert!(c.matches_profile(&profiles)); + + let profiles2 = vec!["BROWNSTINKYBANANA".to_string()]; + assert!(c.matches_profile(&profiles2)); + + let profiles3 = vec!["default".to_string()]; + assert!(!c.matches_profile(&profiles3)); + } +} diff --git a/libsensors/tests/registry_init.rs b/libsensors/tests/registry_init.rs new file mode 100644 index 00000000..97526004 --- /dev/null +++ b/libsensors/tests/registry_init.rs @@ -0,0 +1,55 @@ +mod registry_test { + use libsensors::sensors; + use libsensors::sspec::SensorSpec; + use std::str::FromStr; + + #[test] + fn test_registry_has_fsnotify_after_init() { + sensors::init_registry(); + + // should be able to create sensor by listener id + let y = r#" +sensors: + ssh-conf: + listener: fsnotify + opts: [changed] + args: + path: /tmp +"#; + + let mut spec = SensorSpec::from_str(y).unwrap(); + let items = spec.items(); + let (sid, cfg) = items.iter().next().unwrap(); + + let s = sensors::init_sensor(cfg.listener(), sid.to_string(), cfg.clone()); + assert!(s.is_some(), "fsnotify must be registered"); + } + + #[test] + fn test_registry_unknown_listener_returns_none() { + sensors::init_registry(); + + let y = r#" +sensors: + x: + listener: does-not-exist +"#; + + let mut spec = SensorSpec::from_str(y).unwrap(); + let items = spec.items(); + let (sid, cfg) = items.iter().next().unwrap(); + + let s = sensors::init_sensor(cfg.listener(), sid.to_string(), cfg.clone()); + assert!(s.is_none(), "unknown listener must return None"); + } + + #[test] + fn test_registry_init_is_idempotent() { + sensors::init_registry(); + let n1 = sensors::REGISTRY.len(); + sensors::init_registry(); + let n2 = sensors::REGISTRY.len(); + assert_eq!(n1, n2); + assert!(n1 >= 1); + } +} diff --git a/libsensors/tests/spec_items.rs b/libsensors/tests/spec_items.rs new file mode 100644 index 00000000..d7c64060 --- /dev/null +++ b/libsensors/tests/spec_items.rs @@ -0,0 +1,33 @@ +mod spec_items_test { + use libsensors::sspec::SensorSpec; + use std::str::FromStr; + + #[test] + fn test_items_exposes_sensor_ids() { + let y = r#" +sensors: + b: + listener: fsnotify + args: { path: /tmp } + a: + listener: fsnotify + args: { path: /etc } +"#; + + let mut spec = SensorSpec::from_str(y).unwrap(); + assert!(spec.items().contains_key("a")); + assert!(spec.items().contains_key("b")); + } + + #[test] + fn test_missing_sensors_key_is_error() { + let y = r#" +not_sensors: + a: + listener: fsnotify +"#; + + let r = SensorSpec::from_str(y); + assert!(r.is_err()); + } +} diff --git a/libsensors/tests/sspec_parse.rs b/libsensors/tests/sspec_parse.rs new file mode 100644 index 00000000..c998fb21 --- /dev/null +++ b/libsensors/tests/sspec_parse.rs @@ -0,0 +1,119 @@ +mod sspec_parse_test { + use libsensors::sspec::SensorSpec; + use std::str::FromStr; + + #[test] + fn test_parse_minimal_sensor() { + let y = r#" +sensors: + ssh-conf: + listener: file +"#; + + let mut spec = SensorSpec::from_str(y).unwrap(); + let items = spec.items(); + assert_eq!(items.len(), 1); + + let c = items.get("ssh-conf").unwrap(); + assert_eq!(c.listener(), "file"); + assert!(c.description().is_none()); + assert!(c.opts().is_empty()); + assert!(c.tag().is_none()); + } + + #[test] + fn test_parse_interval_range() { + let y = r#" +sensors: + interval: + min: 3 + max: 10 + unit: seconds + + a: + listener: file +"#; + + let spec = SensorSpec::from_str(y).unwrap(); + + let ir = spec.interval_range().unwrap(); + let ivl = spec.interval(); + println!("interval range: {:?}", ivl); + assert_eq!(ir.min, 3); + assert_eq!(ir.max, 10); + assert_eq!(ir.unit, "seconds"); + } + + #[test] + fn test_parse_opts_args_event_profile() { + let y = r#" +sensors: + ssh-conf: + profile: [default, system] + description: Watches SSH config + listener: file + opts: [changed, deleted] + args: + path: /etc/ssh/ssh_config + interval: 5 + unit: second + tag: stuff +"#; + + let mut spec = SensorSpec::from_str(y).unwrap(); + let items = spec.items(); + let c = items.get("ssh-conf").unwrap(); + + assert_eq!(c.listener(), "file"); + assert_eq!(c.description().unwrap(), "Watches SSH config"); + assert_eq!(c.opts(), &vec!["changed".to_string(), "deleted".to_string()]); + assert_eq!(c.tag().unwrap(), "stuff"); + + let p = c.profile(); + assert!(p.contains(&"default".to_string())); + assert!(p.contains(&"system".to_string())); + + let args = c.args(); + assert!(args.is_mapping()); + } + + #[test] + fn test_interval_injection_is_random_and_stable_per_spec() { + use libsensors::sspec::SensorSpec; + use std::{collections::HashSet, str::FromStr}; + + // Wide range -> collisions become hilariously unlikely. + let y = r#" +sensors: + interval: + min: 1 + max: 1000 + unit: seconds + + a: + listener: file + args: + path: /tmp/whatever +"#; + + let mut seen = HashSet::new(); + + for _ in 0..100 { + let mut spec = SensorSpec::from_str(y).unwrap(); + + let first = spec.items(); + let a1 = first.get("a").unwrap(); + + let seen_ivl = a1.interval(); // e.g. returns u64 / usize / i64 etc. + seen.insert(seen_ivl); + + let second = spec.items(); + let a2 = second.get("a").unwrap(); + let coming_ivl = a2.interval(); + + assert_eq!(seen_ivl, coming_ivl, "interval changed after injection; injected flag isn't working"); + } + + assert!(seen.len() >= 2, "interval injection doesn't look random: got only one value across runs: {:?}", seen); + } +} diff --git a/libsetup/src/mnsetup.rs b/libsetup/src/mnsetup.rs index ab863c50..5a015b3b 100644 --- a/libsetup/src/mnsetup.rs +++ b/libsetup/src/mnsetup.rs @@ -1,8 +1,8 @@ use colored::Colorize; use libcommon::SysinspectError; use libsysinspect::cfg::mmconf::{ - CFG_AUTOSYNC_SHALLOW, DEFAULT_MODULES_DIR, DEFAULT_MODULES_LIB_DIR, DEFAULT_MODULES_SHARELIB, DEFAULT_SYSINSPECT_ROOT, MinionConfig, - SysInspectConfig, + CFG_AUTOSYNC_SHALLOW, CFG_SENSORS_ROOT, DEFAULT_MODULES_DIR, DEFAULT_MODULES_LIB_DIR, DEFAULT_MODULES_SHARELIB, DEFAULT_SYSINSPECT_ROOT, + MinionConfig, SysInspectConfig, }; use std::{ fs::{self, File}, @@ -117,6 +117,7 @@ impl MinionSetup { self.get_db(), self.get_shared_subdir(DEFAULT_MODULES_DIR), self.get_shared_subdir(DEFAULT_MODULES_LIB_DIR), + self.get_shared_subdir(CFG_SENSORS_ROOT), ]; for d in dirs { diff --git a/libsysinspect/src/cfg/mmconf.rs b/libsysinspect/src/cfg/mmconf.rs index 53802758..7d875936 100644 --- a/libsysinspect/src/cfg/mmconf.rs +++ b/libsysinspect/src/cfg/mmconf.rs @@ -85,6 +85,9 @@ pub static CFG_TRAITS_ROOT: &str = "traits"; // Trait custom functions within the CFG_FILESERVER_ROOT pub static CFG_TRAIT_FUNCTIONS_ROOT: &str = "functions"; +/// Directory within the `DEFAULT_MODULES_SHARELIB` for sensors +pub static CFG_SENSORS_ROOT: &str = "sensors"; + // Key names // --------- pub static CFG_MASTER_KEY_PUB: &str = "master.rsa.pub"; @@ -489,6 +492,11 @@ impl MinionConfig { self.root_dir().join(CFG_TRAITS_ROOT) } + /// Get root directory for sensors config + pub fn sensors_dir(&self) -> PathBuf { + self.root_dir().join(CFG_SENSORS_ROOT) + } + /// Return machine Id path pub fn machine_id_path(&self) -> PathBuf { if let Some(mid) = self.machine_id.clone() { @@ -731,12 +739,18 @@ pub struct MasterConfig { // Exported models path root on the fileserver #[serde(rename = "fileserver.models.root")] - fsr_models_root: String, + fsr_models_root: Option, + + #[serde(rename = "fileserver.sensors.root")] + fsr_sensors_root: Option, // Exported models on the fileserver #[serde(rename = "fileserver.models")] fsr_models: Vec, + #[serde(rename = "fileserver.sensors")] + fsr_sensors: Option>, + #[serde(rename = "api.enabled")] api_enabled: Option, @@ -916,15 +930,36 @@ impl MasterConfig { &self.fsr_models } + /// Get a list of exported sensors from the fileserver + pub fn fileserver_sensors(&self) -> Vec { + self.fsr_sensors.clone().unwrap_or_default() + } + /// Get fileserver root pub fn fileserver_root(&self) -> PathBuf { self.root_dir().join(CFG_FILESERVER_ROOT) } /// Get models root on the fileserver - pub fn fileserver_mdl_root(&self, alone: bool) -> PathBuf { - let mr = PathBuf::from(&self.fsr_models_root.strip_prefix("/").unwrap_or_default()); - if alone { mr } else { self.fileserver_root().join(mr) } + pub fn fileserver_models_root(&self, uri_only: bool) -> PathBuf { + if uri_only { + if let Some(models_root) = &self.fsr_models_root { + return PathBuf::from(models_root.trim_start_matches('/')); + } else { + return PathBuf::from(CFG_MODELS_ROOT); + } + } + + self.fileserver_root().join(PathBuf::from(self.fsr_models_root.clone().unwrap_or(CFG_MODELS_ROOT.to_string()).trim_start_matches('/'))) + } + + /// Get sensors root on the fileserver + pub fn fileserver_sensors_root(&self) -> PathBuf { + if let Some(sensors_root) = &self.fsr_sensors_root { + self.fileserver_root().join(PathBuf::from(sensors_root.trim_start_matches('/'))) + } else { + self.fileserver_root().join(CFG_SENSORS_ROOT) + } } /// Get default sysinspect root. For master it is always /etc/sysinspect diff --git a/libsysinspect/src/inspector.rs b/libsysinspect/src/inspector.rs index 1aed0f7b..4ac5274d 100644 --- a/libsysinspect/src/inspector.rs +++ b/libsysinspect/src/inspector.rs @@ -12,6 +12,7 @@ use libcommon::SysinspectError; use libdpq::DiskPersistentQueue; use once_cell::sync::OnceCell; use std::sync::Arc; +use tokio::sync::Mutex; static MINION_CONFIG: OnceCell> = OnceCell::new(); static DPQ_HANDLE: OnceCell> = OnceCell::new(); @@ -42,6 +43,9 @@ pub struct SysInspectRunner { // Called after all actions at the end model_callbacks: Vec>, + + // Event processor for handling action responses and emitting events to the telemetry system + evtproc: Option>>, } impl SysInspectRunner { @@ -149,12 +153,17 @@ impl SysInspectRunner { match SysInspector::new(spec.clone(), Some(Self::minion_cfg().sharelib_dir().clone()), self.context.clone().unwrap_or_default()) { Ok(isp) => { // Setup event processor - let mut evtproc = EventProcessor::new().set_config(isp.cfg(), spec.telemetry()); - for c in std::mem::take(&mut self.action_callbacks) { - evtproc.add_action_callback(c); - } - for c in std::mem::take(&mut self.model_callbacks) { - evtproc.add_model_callback(c); + self.evtproc = Some(Arc::new(Mutex::new(EventProcessor::new().set_config(Arc::new(isp.cfg().clone()), spec.telemetry())))); + let evtproc = self.evtproc.as_ref().unwrap().clone(); + + { + let mut guard = evtproc.lock().await; + for c in std::mem::take(&mut self.action_callbacks) { + guard.add_action_callback(c); + } + for c in std::mem::take(&mut self.model_callbacks) { + guard.add_model_callback(c); + } } let actions = if !self.cb_labels.is_empty() { @@ -176,7 +185,7 @@ impl SysInspectRunner { let response = response.unwrap_or(ActionResponse::default()); self.update_cstr_eval(&response); log::trace!("Action response for '{}': {:#?}", ac.id(), response); - evtproc.receiver().register(response.eid().to_owned(), response); + evtproc.lock().await.receiver().register(response.eid().to_owned(), response); } Err(err) => { log::error!("{err}") @@ -198,7 +207,7 @@ impl SysInspectRunner { } } log::debug!("Starting event processor cycle"); - evtproc.process().await; + evtproc.lock().await.process(false).await; log::debug!("Event processing cycle finished"); } Err(err) => log::error!("{err}"), diff --git a/libsysinspect/src/intp/actproc/response.rs b/libsysinspect/src/intp/actproc/response.rs index 7ef8a9a8..5ec8b282 100644 --- a/libsysinspect/src/intp/actproc/response.rs +++ b/libsysinspect/src/intp/actproc/response.rs @@ -129,6 +129,11 @@ pub struct ActionModResponse { } impl ActionModResponse { + /// Create new with return code + pub fn with_retcode(retcode: i32) -> Self { + Self { retcode, warning: None, message: String::new(), data: None } + } + /// Get a return code pub fn retcode(&self) -> i32 { self.retcode @@ -213,6 +218,35 @@ impl ActionResponse { Self { eid, aid, sid, response, constraints, cid: "".to_string(), timestamp: Utc::now(), telemetry: vec![] } } + /// Build an ActionResponse from sensor JSON event payload. + /// ``` + /// { + /// "eid": "tmp-watch/fsnotify/created/0", + /// "sensor": "...", + /// "listener": "...", + /// "data": {...} + /// } + /// ``` + pub fn from_sensor(v: serde_json::Value) -> Self { + let mut ar = ActionResponse::default(); + let eid_str = v.get("eid").and_then(|x| x.as_str()).unwrap_or("$|$|$|$").to_string(); + + // match_eid expects: /// + let parts: Vec<&str> = eid_str.split('|').collect(); + if parts.len() == 4 { + ar.aid = parts[0].to_string(); + ar.eid = parts[1].to_string(); + ar.sid = parts[2].to_string(); + ar.response.retcode = 0; + } else { + // fallback + ar.eid = eid_str.clone(); + } + + ar.response.set_data(v); + ar + } + /// Return an Entity Id to which this action was bound to pub fn eid(&self) -> &str { &self.eid @@ -255,6 +289,15 @@ impl ActionResponse { /// - `0..255` - specific code /// - `E` - error only (non-0) /// + /// Rules: + /// 1. no @ — exact match (current behavior) + /// 2. @ splits into kind@detailpattern + /// 3. $ inside detailpattern means * (glob) + /// + /// Example: + /// - kind@$ means “kind@anything” + /// - also allow plain + /// pub fn match_eid(&self, evt_id: &str) -> bool { // If explicitly specified and already matching for expr in self.constraints.expressions() { @@ -265,14 +308,63 @@ impl ActionResponse { } } - let p_eid = evt_id.split('/').map(|s| s.trim()).collect::>(); + let p_eid = evt_id.split('|').map(|s| s.trim()).collect::>(); p_eid.len() == 4 && (self.aid().eq(p_eid[0]) || p_eid[0] == "$") && (self.eid().eq(p_eid[1]) || p_eid[1] == "$") - && (self.sid().eq(p_eid[2]) || p_eid[2] == "$") + && Self::sid_matches(self.sid(), p_eid[2]) && ((p_eid[3] == "$") || (p_eid[3].eq("E") && self.response.retcode() > 0) || p_eid[3].eq(&self.response.retcode().to_string())) } + pub fn sid_matches(value: &str, pattern: &str) -> bool { + let pat = pattern.trim(); + if pat == "$" { + return true; + } + + // old behavior: exact + let Some((pkind, pdetail)) = pat.split_once('@') else { + return value == pat; + }; + + // event value must also have '@' if pattern uses it + let Some((vkind, vdetail)) = value.split_once('@') else { + return false; + }; + + if vkind != pkind { + return false; + } + + // "kind@$" => any detail + if pdetail == "$" { + return true; + } + + Self::glob_match(pdetail, vdetail) + } + + pub fn glob_match(pat: &str, text: &str) -> bool { + // very small glob: only '$' means ".*" + // escape everything else, then replace \$ with .* + let mut re = String::from("^"); + for ch in pat.chars() { + match ch { + '$' => re.push_str(".*"), + // escape regex meta (but NOT '$' because it's handled above) + '.' | '+' | '*' | '?' | '(' | ')' | '[' | ']' | '{' | '}' | '^' | '|' | '\\' => { + re.push('\\'); + re.push(ch); + } + _ => re.push(ch), + } + } + + re.push('$'); + + regex::Regex::new(&re).map(|r| r.is_match(text)).unwrap_or(false) + } + /// Set telemetry configuration for data processing pub fn set_telemetry_config(&mut self, telemetry: Vec) { self.telemetry = telemetry; diff --git a/libsysinspect/src/intp/conf.rs b/libsysinspect/src/intp/conf.rs index c3ddda05..e3cf9852 100644 --- a/libsysinspect/src/intp/conf.rs +++ b/libsysinspect/src/intp/conf.rs @@ -79,16 +79,16 @@ impl EventConfig { /// The entire config #[derive(Debug, Serialize, Deserialize, Clone, Default)] -pub struct Config { +pub struct EventsConfig { modules: Option, // EventId to config, added later events: Option>, } -impl Config { +impl EventsConfig { pub fn new(obj: &Value) -> Result { - if let Ok(instance) = serde_yaml::from_value::(obj.to_owned()) { + if let Ok(instance) = serde_yaml::from_value::(obj.to_owned()) { return Ok(instance); } @@ -130,7 +130,7 @@ impl Config { } /// Set events config - pub(crate) fn set_events(&mut self, obj: &Value) -> Result<(), SysinspectError> { + pub fn set_events(&mut self, obj: &Value) -> Result<(), SysinspectError> { if let Ok(cfg) = serde_yaml::from_value::>(obj.to_owned()) { self.events = Some(cfg); } else { diff --git a/libsysinspect/src/intp/inspector.rs b/libsysinspect/src/intp/inspector.rs index 95124332..3b6dda3f 100644 --- a/libsysinspect/src/intp/inspector.rs +++ b/libsysinspect/src/intp/inspector.rs @@ -1,7 +1,7 @@ use super::{ actions::Action, checkbook::CheckbookSection, - conf::Config, + conf::EventsConfig, constraints::Constraint, entities::Entity, functions::{ClaimNamespace, ModArgFunction, StaticNamespace}, @@ -52,7 +52,7 @@ pub struct SysInspector { actions: IndexMap, constraints: IndexMap, checkbook: Vec, - config: Config, + config: EventsConfig, spec: ModelSpec, context: IndexMap, schemaonly: bool, @@ -81,7 +81,7 @@ impl SysInspector { actions: IndexMap::new(), constraints: IndexMap::new(), checkbook: Vec::default(), - config: Config::default(), + config: EventsConfig::default(), spec, context: IndexMap::new(), schemaonly: true, @@ -151,7 +151,7 @@ impl SysInspector { // Load config if directive == DSL_IDX_CFG { - self.config = Config::new(v_obj.unwrap())?; + self.config = EventsConfig::new(v_obj.unwrap())?; } if directive == DSL_IDX_EVENTS_CFG { @@ -247,7 +247,7 @@ impl SysInspector { } /// Return config reference - pub fn cfg(&self) -> &Config { + pub fn cfg(&self) -> &EventsConfig { &self.config } diff --git a/libsysinspect/src/reactor/evtproc.rs b/libsysinspect/src/reactor/evtproc.rs index 4831133d..c7c29d0e 100644 --- a/libsysinspect/src/reactor/evtproc.rs +++ b/libsysinspect/src/reactor/evtproc.rs @@ -1,21 +1,23 @@ +use std::sync::Arc; use super::{callback::EventProcessorCallback, handlers::evthandler::EventHandler, receiver::Receiver}; use crate::{ - intp::conf::Config, + intp::conf::EventsConfig, mdescr::telemetry::TelemetrySpec, reactor::handlers::{self}, }; -pub struct EventProcessor<'a> { +#[derive(Debug)] +pub struct EventProcessor { receiver: Receiver, - cfg: Option<&'a Config>, + cfg: Option>, handlers: Vec>, action_callbacks: Vec>, model_callbacks: Vec>, telemetry_cfg: Option, } -impl<'a> EventProcessor<'a> { +impl EventProcessor { pub fn new() -> Self { EventProcessor { receiver: Receiver::default(), @@ -35,7 +37,7 @@ impl<'a> EventProcessor<'a> { self.telemetry_cfg = telemetry_config; - let cfg = self.cfg.unwrap(); + let cfg = self.cfg.as_ref().unwrap(); for evt_id in cfg.get_event_ids() { let evt_cfg = cfg.get_event(&evt_id).unwrap(); for handler_id in evt_cfg.get_bound_handlers() { @@ -68,26 +70,26 @@ impl<'a> EventProcessor<'a> { } /// Set the configuration of a model - pub fn set_config(mut self, cfg: &'a Config, tcfg: Option) -> Self { + pub fn set_config(mut self, cfg: Arc, tcfg: Option) -> Self { self.cfg = Some(cfg); self.setup(tcfg) } /// Process all handlers - pub async fn process(&mut self) { - for ar in self.receiver.get_all() { - // For each action handle events + pub async fn process(&mut self, drain: bool) { + let batch = if drain { self.receiver.drain_all() } else { self.receiver.get_all() }; + let last = batch.last().cloned(); + + for ar in batch { for h in &self.handlers { h.handle(&ar); } - // Each action response sent via callback for ac in &mut self.action_callbacks { _ = ac.on_action_response(ar.clone()).await; } } - // Call model callbacks for the last action response (it is usually only passes the minion reference) - if let Some(ar) = self.receiver.get_last() { + if let Some(ar) = last { for cb in &mut self.model_callbacks { _ = cb.on_action_response(ar.clone()).await; } @@ -95,7 +97,7 @@ impl<'a> EventProcessor<'a> { } } -impl Default for EventProcessor<'_> { +impl Default for EventProcessor { fn default() -> Self { Self::new() } diff --git a/libsysinspect/src/reactor/handlers/pipeline.rs b/libsysinspect/src/reactor/handlers/pipeline.rs index 785ded31..d0222625 100644 --- a/libsysinspect/src/reactor/handlers/pipeline.rs +++ b/libsysinspect/src/reactor/handlers/pipeline.rs @@ -73,6 +73,34 @@ impl PipelineHandler { } } + /// This is a little tokenizer: tokens start with "$." and then continue until whitespace or quote + fn get_jsonpath_tokens(s: &str) -> Vec { + let bytes = s.as_bytes(); + let mut i = 0usize; + let mut out = Vec::new(); + + while i + 1 < bytes.len() { + if bytes[i] == b'$' && bytes[i + 1] == b'.' { + let start = i; + i += 2; + while i < bytes.len() { + let c = bytes[i] as char; + if c.is_whitespace() || c == '"' || c == '\'' { + break; + } + i += 1; + } + out.push(s[start..i].to_string()); + } else { + i += 1; + } + } + + // dedup while preserving order + let mut seen = std::collections::HashSet::new(); + out.into_iter().filter(|t| seen.insert(t.clone())).collect() + } + fn scalar2s(v: &Value) -> String { match v { Value::String(s) => s.trim_end().to_string(), @@ -87,27 +115,55 @@ impl PipelineHandler { self.config().unwrap_or_default().get("verbose").and_then(|v| v.as_bool()).unwrap_or(false) } + fn is_pure_jsonpath(s: &str) -> bool { + let t = s.trim(); + t.starts_with("$.") && !t.contains(char::is_whitespace) && !t.contains('"') && !t.contains('\'') + } + fn eval_context(&self, evt: &ActionResponse, call: &mut Call) { let data = evt.response.data().unwrap_or(json!({})); - let updates: Vec<(String, Value)> = call .context .iter() .map(|(k, v_yaml)| { - // 1) YAML scalar → clean string JSONPath - let path = Self::scalar2s(v_yaml); - - // 2) Run JSONPath - let out = match data.query(&path) { - Ok(h) if !h.is_empty() => match &h[0] { - serde_json::Value::String(s) => Value::String(s.clone()), - serde_json::Value::Number(n) => Value::Number(serde_yaml::Number::from(n.as_f64().unwrap_or(0.0))), - serde_json::Value::Bool(b) => Value::Bool(*b), - _ => Value::Null, - }, - _ => Value::Null, + // 1) scalar to clean string JSONPath + let raw = Self::scalar2s(v_yaml); + + // 2) Resolve value: + // - if it's exactly a JSONPath (starts with "$.") => query and return scalar + // - if it contains JSONPath tokens inside a string => interpolate into a string + // - otherwise => literal string + let out = if Self::is_pure_jsonpath(&raw) { + match data.query(&raw) { + Ok(h) if !h.is_empty() => match &h[0] { + serde_json::Value::String(s) => Value::String(s.clone()), + serde_json::Value::Number(n) => Value::Number(serde_yaml::Number::from(n.as_f64().unwrap_or(0.0))), + serde_json::Value::Bool(b) => Value::Bool(*b), + _ => Value::String(raw.clone()), // not a scalar -> keep literal + }, + _ => Value::String(raw.clone()), // invalid path or no hits -> keep literal + } + } else if raw.contains("$.") { + // interpolate: replace each "$." with its first scalar result + // If jsonpath is wrong, it won't interpolate and will just return the original string as is. + let mut out_s = raw.clone(); + for tok in Self::get_jsonpath_tokens(&raw) { + if let Ok(h) = data.query(&tok) + && let Some(val) = h.first() { + let repl = match val { + serde_json::Value::String(s) => s.clone(), + serde_json::Value::Number(n) => n.to_string(), + serde_json::Value::Bool(b) => b.to_string(), + serde_json::Value::Null => "".to_string(), + _ => "".to_string(), + }; + out_s = out_s.replace(&tok, &repl); + } + } + Value::String(out_s) + } else { + Value::String(raw.clone()) }; - (k.clone(), out) }) .collect(); @@ -153,7 +209,7 @@ impl EventHandler for PipelineHandler { log::info!( "[{}] Event {} doesn't match handler {}", PipelineHandler::id().bright_blue(), - format!("{}/{}/{}/{}", evt.aid(), evt.eid(), evt.sid(), evt.response.retcode()).bright_yellow(), + format!("{}|{}|{}|{}", evt.aid(), evt.eid(), evt.sid(), evt.response.retcode()).bright_yellow(), self.eid.bright_yellow() ); } diff --git a/libsysinspect/src/reactor/receiver.rs b/libsysinspect/src/reactor/receiver.rs index d6bcc0d9..e0790490 100644 --- a/libsysinspect/src/reactor/receiver.rs +++ b/libsysinspect/src/reactor/receiver.rs @@ -14,15 +14,28 @@ pub struct Receiver { impl Receiver { /// Add an action. - /// Requires: - /// `eid` - Entity Id - /// `response` - ActionResponse object + /// NOTE: the order of responses is not guaranteed, if async is used. + /// + /// Parameters: + /// - `eid`: Entity Id, which is a string identifier of the entity that produced the response. It can be used to group responses by their source. + /// - `response`: The actual response object, which contains the data produced by the action. It can be of any type that implements the `ActionResponse` trait. + /// + /// Returns: None. This method modifies the internal state of the `Receiver` by adding the response to the list of responses associated with the given Entity Id. pub fn register(&mut self, eid: String, response: ActionResponse) { // XXX: And process here as well! log::debug!("Registered action response: {response:#?}"); self.actions.entry(eid).or_default().push(response); } + /// Drain all action responses (consumes stored ones). + pub fn drain_all(&mut self) -> Vec { + let mut out = Vec::new(); + for (_, mut v) in self.actions.drain(..) { + out.append(&mut v); + } + out + } + /// Get an action response by Entity Id pub fn get_by_eid(&self, eid: String) -> Option> { self.actions.get(&eid).cloned() diff --git a/libsysinspect/tests/action_response.rs b/libsysinspect/tests/action_response.rs new file mode 100644 index 00000000..05f994af --- /dev/null +++ b/libsysinspect/tests/action_response.rs @@ -0,0 +1,225 @@ +#[cfg(test)] +mod tests { + use libsysinspect::intp::actproc::response::{ActionModResponse, ActionResponse, ConstraintResponse}; + use serde_json::json; + + fn mk_ar(aid: &str, eid: &str, sid: &str, ret: i32) -> ActionResponse { + ActionResponse::new(eid.to_string(), aid.to_string(), sid.to_string(), ActionModResponse::with_retcode(ret), ConstraintResponse::default()) + } + + // ------------------------- + // from_sensor() tests + // ------------------------- + + #[test] + fn from_sensor_parses_4_parts_and_sets_fields() { + let v = json!({ + "eid": "tmp-watch|fsnotify|deleted@/tmp/x|0", + "sensor": "tmp-watch", + "listener": "fsnotify", + "data": {"kind":"deleted","path":"/tmp/x"} + }); + + let ar = ActionResponse::from_sensor(v.clone()); + + assert_eq!(ar.aid(), "tmp-watch"); + assert_eq!(ar.eid(), "fsnotify"); + assert_eq!(ar.sid(), "deleted@/tmp/x"); + // current behavior: retcode forced to 0 even if eid contains "|123" + assert_eq!(ar.response.retcode(), 0); + + // payload stored as data + assert_eq!(ar.response.data().unwrap(), v); + } + + #[test] + fn from_sensor_fallback_when_not_4_parts() { + let v = json!({"eid":"nonsense", "data":{"x":1}}); + let ar = ActionResponse::from_sensor(v.clone()); + + // fallback: only ar.eid set + assert_eq!(ar.eid(), "nonsense"); + assert_eq!(ar.aid(), ""); + // sid() returns "$" when empty + assert_eq!(ar.sid(), "$"); + assert_eq!(ar.response.retcode(), 0); + assert_eq!(ar.response.data().unwrap(), v); + } + + #[test] + fn from_sensor_default_eid_when_missing() { + let v = json!({"data":{"x":1}}); + let ar = ActionResponse::from_sensor(v.clone()); + assert_eq!(ar.aid(), "$"); + assert_eq!(ar.eid(), "$"); + assert_eq!(ar.sid(), "$"); + assert_eq!(ar.response.retcode(), 0); + assert_eq!(ar.response.data().unwrap(), v); + } + + // ------------------------- + // glob_match() tests + // ------------------------- + + #[test] + fn glob_match_basic_dollar_is_wildcard() { + assert!(ActionResponse::glob_match("$", "")); + assert!(ActionResponse::glob_match("$", "anything")); + assert!(ActionResponse::glob_match("/tmp/$", "/tmp/x")); + assert!(ActionResponse::glob_match("/tmp/$", "/tmp/foo/bar")); + assert!(!ActionResponse::glob_match("/tmp/$", "/etc/x")); + } + + #[test] + fn glob_match_middle_wildcard() { + assert!(ActionResponse::glob_match("/tmp/$/x", "/tmp/a/x")); + assert!(ActionResponse::glob_match("/tmp/$/x", "/tmp/a/b/c/x")); + assert!(!ActionResponse::glob_match("/tmp/$/x", "/tmp/a/y")); + } + + #[test] + fn glob_match_escapes_regex_metachars() { + // '.' should be literal dot, not any-char + assert!(ActionResponse::glob_match("a.b", "a.b")); + assert!(!ActionResponse::glob_match("a.b", "acb")); + + // '+' literal + assert!(ActionResponse::glob_match("a+b", "a+b")); + assert!(!ActionResponse::glob_match("a+b", "aaab")); + + // brackets literal + assert!(ActionResponse::glob_match("[x]", "[x]")); + assert!(!ActionResponse::glob_match("[x]", "x")); + } + + // ------------------------- + // sid_matches() tests + // ------------------------- + + #[test] + fn sid_matches_dollar_matches_anything() { + assert!(ActionResponse::sid_matches("whatever", "$")); + assert!(ActionResponse::sid_matches("deleted@/tmp/x", "$")); + assert!(ActionResponse::sid_matches("", "$")); + } + + #[test] + fn sid_matches_exact_without_at() { + assert!(ActionResponse::sid_matches("deleted:/tmp/x", "deleted:/tmp/x")); + assert!(!ActionResponse::sid_matches("deleted:/tmp/x", "deleted:/tmp/y")); + } + + #[test] + fn sid_matches_pattern_with_at_requires_value_with_at() { + assert!(!ActionResponse::sid_matches("deleted:/tmp/x", "deleted@/tmp/$")); + assert!(!ActionResponse::sid_matches("deleted", "deleted@$")); + } + + #[test] + fn sid_matches_kind_must_match() { + assert!(!ActionResponse::sid_matches("created@/tmp/x", "deleted@/tmp/$")); + assert!(ActionResponse::sid_matches("deleted@/tmp/x", "deleted@/tmp/$")); + } + + #[test] + fn sid_matches_kind_at_dollar_means_any_detail() { + assert!(ActionResponse::sid_matches("deleted@/tmp/x", "deleted@$")); + assert!(ActionResponse::sid_matches("deleted@/etc/passwd", "deleted@$")); + assert!(!ActionResponse::sid_matches("created@/tmp/x", "deleted@$")); + } + + #[test] + fn sid_matches_detail_glob_with_dollar() { + assert!(ActionResponse::sid_matches("deleted@/tmp/x", "deleted@/tmp/$")); + assert!(ActionResponse::sid_matches("deleted@/tmp/foo/bar", "deleted@/tmp/$")); + assert!(!ActionResponse::sid_matches("deleted@/etc/x", "deleted@/tmp/$")); + } + + // ------------------------- + // match_eid() matrix tests + // ------------------------- + + #[test] + fn match_eid_exact_match_all_parts() { + let ar = mk_ar("tmp-watch", "fsnotify", "deleted@/tmp/x", 0); + assert!(ar.match_eid("tmp-watch|fsnotify|deleted@/tmp/x|0")); + assert!(!ar.match_eid("tmp-watch|fsnotify|deleted@/tmp/y|0")); + assert!(!ar.match_eid("tmp-watch|other|deleted@/tmp/x|0")); + assert!(!ar.match_eid("other|fsnotify|deleted@/tmp/x|0")); + } + + #[test] + fn match_eid_wildcards_dollar_in_any_field() { + let ar = mk_ar("A", "B", "C@/tmp/x", 0); + + assert!(ar.match_eid("$|$|$|$")); + assert!(ar.match_eid("$|B|C@/tmp/x|0")); + assert!(ar.match_eid("A|$|C@/tmp/x|0")); + assert!(ar.match_eid("A|B|$|0")); + assert!(ar.match_eid("A|B|C@/tmp/$|0")); // sid glob + } + + #[test] + fn match_eid_retcode_dollar_matches_any_retcode() { + let ar0 = mk_ar("A", "B", "C@/x", 0); + let ar5 = mk_ar("A", "B", "C@/x", 5); + + assert!(ar0.match_eid("A|B|C@/x|$")); + assert!(ar5.match_eid("A|B|C@/x|$")); + } + + #[test] + fn match_eid_retcode_exact_numeric() { + let ar0 = mk_ar("A", "B", "C@/x", 0); + let ar5 = mk_ar("A", "B", "C@/x", 5); + + assert!(ar0.match_eid("A|B|C@/x|0")); + assert!(!ar0.match_eid("A|B|C@/x|5")); + + assert!(ar5.match_eid("A|B|C@/x|5")); + assert!(!ar5.match_eid("A|B|C@/x|0")); + } + + #[test] + fn match_eid_retcode_e_means_error_only() { + let ok = mk_ar("A", "B", "C@/x", 0); + let err = mk_ar("A", "B", "C@/x", 2); + + assert!(!ok.match_eid("A|B|C@/x|E")); + assert!(err.match_eid("A|B|C@/x|E")); + } + + #[test] + fn match_eid_rejects_bad_format() { + let ar = mk_ar("A", "B", "C@/x", 0); + + assert!(!ar.match_eid("A|B|C@/x")); // 3 parts + assert!(!ar.match_eid("A|B|C@/x|0|junk")); // 5 parts + assert!(!ar.match_eid("")); // 0 parts + } + + // "Diagonal" matrix: many cases in a table-like loop + #[test] + fn match_eid_matrix_diagonal() { + let cases = vec![ + // (ar_aid, ar_eid, ar_sid, ar_ret, pattern, expected) + ("a", "b", "k@/tmp/x", 0, "a|b|k@/tmp/x|0", true), + ("a", "b", "k@/tmp/x", 0, "a|b|k@/tmp/$|0", true), + ("a", "b", "k@/tmp/x", 0, "a|b|k@/etc/$|0", false), + ("a", "b", "k@/tmp/x", 3, "a|b|k@/tmp/$|E", true), + ("a", "b", "k@/tmp/x", 0, "a|b|k@/tmp/$|E", false), + ("a", "b", "k@/tmp/x", 7, "$|b|k@/tmp/$|7", true), + ("a", "b", "k@/tmp/x", 7, "$|b|k@/tmp/$|8", false), + ("a", "b", "k@/tmp/x", 7, "a|$|k@/tmp/$|$", true), + ("a", "b", "k@/tmp/x", 7, "x|b|k@/tmp/$|$", false), + ("a", "b", "k@/tmp/x", 7, "a|b|missing_at|$", false), // pattern has no '@', expects exact sid + ("a", "b", "k@/tmp/x", 7, "a|b|k@$|$", true), + ("a", "b", "k@/tmp/x", 7, "a|b|z@$|$", false), + ]; + + for (aid, eid, sid, ret, pat, exp) in cases { + let ar = mk_ar(aid, eid, sid, ret); + assert_eq!(ar.match_eid(pat), exp, "case failed: ar=({aid},{eid},{sid},{ret}) pat={pat}"); + } + } +} diff --git a/libsysproto/src/lib.rs b/libsysproto/src/lib.rs index 4b63f4a1..e22bb2a4 100644 --- a/libsysproto/src/lib.rs +++ b/libsysproto/src/lib.rs @@ -41,7 +41,7 @@ impl MasterMessage { /// Creates a command message with a new cycle ID. This is used by the pipeline handler to create a master /// message with the URI specified in the event configuration. pub fn command() -> MasterMessage { - MasterMessage::new(RequestType::Command, json!({"files": {}, "sid": "", "models_root": "", "uri": ""})) + MasterMessage::new(RequestType::Command, json!({"files": {}, "sid": "", "models_root": "", "sensors_root":"", "uri": ""})) } /// Add a target. diff --git a/libsysproto/src/payload.rs b/libsysproto/src/payload.rs index 5f3f2644..2fa8bfae 100644 --- a/libsysproto/src/payload.rs +++ b/libsysproto/src/payload.rs @@ -35,6 +35,10 @@ pub struct ModStatePayload { // It will be substracted from each file path when saving models_root: String, + // Root where sensors starts. It corresponds to "fileserver.sensors.root" conf of Master. + // It will be substracted from each file path when saving + sensors_root: String, + // session Id sid: String, @@ -65,6 +69,12 @@ impl ModStatePayload { self } + /// Set sensors root + pub fn set_sensors_root(mut self, sr: &str) -> Self { + self.sensors_root = sr.to_string(); + self + } + /// Get list of files to download pub fn files(&self) -> &IndexMap { &self.files @@ -84,6 +94,11 @@ impl ModStatePayload { pub fn models_root(&self) -> &str { &self.models_root } + + /// Get root of sensors + pub fn sensors_root(&self) -> &str { + &self.sensors_root + } } #[derive(Debug, Clone, Serialize, Deserialize, Default)] diff --git a/libsysproto/src/rqtypes.rs b/libsysproto/src/rqtypes.rs index 0ceeceb9..ff8d3057 100644 --- a/libsysproto/src/rqtypes.rs +++ b/libsysproto/src/rqtypes.rs @@ -120,4 +120,10 @@ pub enum RequestType { /// Model notice. This is called at the end of the model cycle #[serde(rename = "mvt")] ModelEvent, + + #[serde(rename = "ssr")] + SensorsSyncRequest, + + #[serde(rename = "ssp")] + SensorsSyncResponse, } diff --git a/libwebapi/src/api/v1/model.rs b/libwebapi/src/api/v1/model.rs index ffd7a1db..76de0b7f 100644 --- a/libwebapi/src/api/v1/model.rs +++ b/libwebapi/src/api/v1/model.rs @@ -130,7 +130,7 @@ pub async fn model_descr_handler(master: Data, query: Query return Ok(HttpResponse::NotFound().json(ModelResponseError { error: format!("Model '{}' not found", mid) })); } - let root = cfg.fileserver_mdl_root(false); + let root = cfg.fileserver_models_root(false); match mspec::load(Arc::new(MinionConfig::default()), &format!("{}/{}", root.to_str().unwrap_or_default(), mid), None, None) { Err(e) => { diff --git a/sysmaster/src/dataserv/fls.rs b/sysmaster/src/dataserv/fls.rs index 0a5d81a9..2a61fc45 100644 --- a/sysmaster/src/dataserv/fls.rs +++ b/sysmaster/src/dataserv/fls.rs @@ -1,26 +1,48 @@ use actix_web::{App, HttpResponse, HttpServer, Responder, rt::System, web}; +use colored::Colorize; use libcommon::SysinspectError; -use libsysinspect::cfg::mmconf::{CFG_FILESERVER_ROOT, DEFAULT_SYSINSPECT_ROOT, MasterConfig}; -use std::{ - fs, - path::{Path, PathBuf}, - thread, +use libsysinspect::cfg::mmconf::{ + CFG_FILESERVER_ROOT, CFG_MODELS_ROOT, CFG_MODREPO_ROOT, CFG_SENSORS_ROOT, CFG_TRAIT_FUNCTIONS_ROOT, CFG_TRAITS_ROOT, MasterConfig, }; +use std::{fs, path::PathBuf, thread}; + +/// Initialize the file server environment by creating necessary directories if they do not exist. +fn init_fs_env(cfg: &MasterConfig) -> Result<(), SysinspectError> { + let root = cfg.root_dir().join(CFG_FILESERVER_ROOT); + if !root.exists() { + log::info!("Created file server root directory at {}", root.display().to_string().bright_yellow()); + fs::create_dir_all(&root)?; + } + + for sub in [CFG_TRAIT_FUNCTIONS_ROOT, CFG_MODELS_ROOT, CFG_MODREPO_ROOT, CFG_TRAITS_ROOT, CFG_SENSORS_ROOT] { + let subdir = root.join(sub); + if !subdir.exists() { + log::info!("Created file server subdirectory at {}", subdir.display().to_string().bright_yellow()); + fs::create_dir_all(&subdir)?; + } + } + + Ok(()) +} // Separate handler on every HTTP call -async fn serve_file(path: web::Path, _cfg: web::Data) -> impl Responder { - let pth = Path::new(DEFAULT_SYSINSPECT_ROOT).join(CFG_FILESERVER_ROOT).join(path.into_inner()); - log::debug!("Requested local file: {pth:?}"); +async fn serve_file(path: web::Path, cfg: web::Data) -> impl Responder { + let pth = cfg.root_dir().join(CFG_FILESERVER_ROOT).join(path.into_inner()); + log::debug!("Requested local file: {}", pth.display().to_string().bright_yellow()); if pth.is_file() { return HttpResponse::Ok().body(fs::read(pth).unwrap()); } - log::error!("File {pth:?} was not found"); + log::error!("File {} was not found", pth.display().to_string().bright_red()); HttpResponse::NotFound().body("File not found") } /// Start fileserver pub async fn start(cfg: MasterConfig) -> Result<(), SysinspectError> { + log::info!("Starting file server"); + let cfg_clone = cfg.clone(); + init_fs_env(&cfg)?; + thread::spawn(move || { let c_cfg = cfg_clone.clone(); System::new().block_on(async move { diff --git a/sysmaster/src/master.rs b/sysmaster/src/master.rs index 5bc669d5..6dbf5baf 100644 --- a/sysmaster/src/master.rs +++ b/sysmaster/src/master.rs @@ -272,8 +272,8 @@ impl SysMaster { let mut out: IndexMap = IndexMap::default(); for em in self.cfg.fileserver_models() { - for (n, cs) in scan_files_sha256(self.cfg.fileserver_mdl_root(false).join(em), Some(MODEL_FILE_EXT)) { - out.insert(format!("/{}/{em}/{n}", self.cfg.fileserver_mdl_root(false).file_name().unwrap().to_str().unwrap()), cs); + for (n, cs) in scan_files_sha256(self.cfg.fileserver_models_root(false).join(em), Some(MODEL_FILE_EXT)) { + out.insert(format!("/{}/{em}/{n}", self.cfg.fileserver_models_root(false).file_name().unwrap().to_str().unwrap()), cs); } } @@ -288,7 +288,7 @@ impl SysMaster { ModStatePayload::new(payload) .set_uri(querypath.to_string()) .add_files(out) - .set_models_root(self.cfg.fileserver_mdl_root(true).to_str().unwrap_or_default()) + .set_models_root(self.cfg.fileserver_models_root(true).to_str().unwrap_or_default()) ), // TODO: SID part ); msg.set_target(tgt); @@ -302,6 +302,24 @@ impl SysMaster { None } + fn msg_sensors_files(&mut self) -> MasterMessage { + let mut out: IndexMap = IndexMap::default(); + for es in self.cfg.fileserver_sensors() { + for (n, cs) in scan_files_sha256(self.cfg.fileserver_sensors_root().join(&es), None) { + out.insert(format!("/{}/{es}/{n}", self.cfg.fileserver_sensors_root().file_name().unwrap().to_str().unwrap()), cs); + } + } + + MasterMessage::new( + RequestType::SensorsSyncResponse, + json!( + ModStatePayload::new(String::from("")) + .add_files(out) + .set_sensors_root(self.cfg.fileserver_sensors_root().file_name().unwrap_or_default().to_str().unwrap_or_default()) + ), + ) + } + /// Request minion to sync its traits fn msg_request_traits(&mut self, mid: String, sid: String) -> MasterMessage { let mut m = MasterMessage::new(RequestType::Traits, json!(sid)); @@ -413,9 +431,11 @@ impl SysMaster { _ = c_bcast.send(guard.msg_registered(req.id().to_string(), resp_msg).sendable().unwrap()); }); } + RequestType::Response => { log::info!("Response"); } + RequestType::Ehlo => { log::info!("EHLO from {}", req.id()); @@ -605,6 +625,15 @@ impl SysMaster { }); } + RequestType::SensorsSyncRequest => { + let c_master = Arc::clone(&master); + let c_bcast = bcast.clone(); + tokio::spawn(async move { + let mut guard = c_master.lock().await; + _ = c_bcast.send(guard.msg_sensors_files().sendable().unwrap()); + }); + } + _ => { log::error!("Minion sends unknown request type"); } diff --git a/sysminion/Cargo.toml b/sysminion/Cargo.toml index b6e67792..665b26c5 100644 --- a/sysminion/Cargo.toml +++ b/sysminion/Cargo.toml @@ -27,6 +27,7 @@ libmodpak = { path = "../libmodpak" } libdpq = { path = "../libdpq" } libsysproto = { path = "../libsysproto" } libcommon = { path = "../libcommon" } +libsensors = { path = "../libsensors" } log = "0.4.29" serde_yaml = "0.9.34" serde = { version = "1.0.228", features = ["derive"] } diff --git a/sysminion/src/filedata.rs b/sysminion/src/filedata.rs index 1b7b4106..b272393f 100644 --- a/sysminion/src/filedata.rs +++ b/sysminion/src/filedata.rs @@ -3,7 +3,11 @@ Filedata manager */ use libcommon::SysinspectError; -use libsysinspect::{mdescr::mspec::MODEL_FILE_EXT, util::iofs::scan_files_sha256}; +use libsysinspect::{ + mdescr::mspec::MODEL_FILE_EXT, + util::iofs::{get_file_sha256, scan_files_sha256}, +}; +use serde::Deserialize; use std::{collections::HashMap, path::PathBuf}; #[derive(Debug, Default, Clone)] @@ -50,3 +54,65 @@ impl MinionFiledata { false } } + +#[derive(Debug, Default, Clone, Deserialize)] +pub struct SensorsFiledata { + files: HashMap, + sensors_root: String, + + #[serde(skip)] + stack: HashMap, + + #[serde(skip)] + spth: PathBuf, +} + +impl SensorsFiledata { + pub fn from_payload(payload: serde_json::Value, spth: PathBuf) -> Result { + match serde_json::from_value::(payload) { + Ok(mut sfd) => Ok(sfd.init(spth)), + Err(err) => Err(SysinspectError::ProtoError(format!("unable to parse sensors filedata: {err}"))), + } + } + + /// Path on the server is prefixed with the sensors root, so we need to unprefix it to get the actual path on the minion. + pub fn unprefix_path(&self, pth: &str) -> String { + if self.sensors_root.is_empty() { + return pth.to_string(); + } + + pth.trim_start_matches('/').strip_prefix(self.sensors_root.trim_start_matches('/')).unwrap_or(pth).trim_start_matches('/').to_string() + } + + fn init(&mut self, spth: PathBuf) -> Self { + self.spth = spth; + self.stack = scan_files_sha256(self.spth.to_owned(), None) + .iter() + .map(|(f, cs)| (cs.to_owned(), PathBuf::from(f.to_owned()))) + .collect::>(); + + for (pth, cs) in self.files.iter() { + let pth = self.unprefix_path(pth); + let p = self.spth.join(&pth); + if p.exists() { + get_file_sha256(p.clone()) + .map(|c| { + if c != *cs { + log::warn!("Checksum mismatch for sensor file '{}': expected {}, got {}", pth, cs, c); + } + }) + .unwrap_or_else(|e| log::error!("Failed to calculate checksum for '{}': {e}", &p.display())); + } + } + self.files.retain(|_, cs| !self.stack.contains_key(cs)); + self.clone() + } + + pub fn files(&self) -> &HashMap { + &self.files + } + + pub fn sensors_root(&self) -> &str { + &self.sensors_root + } +} diff --git a/sysminion/src/main.rs b/sysminion/src/main.rs index 342e9147..fdfd1b25 100644 --- a/sysminion/src/main.rs +++ b/sysminion/src/main.rs @@ -25,7 +25,13 @@ static VERSION: &str = "0.4.0"; static LOGGER: OnceLock = OnceLock::new(); fn start_minion(cfg: MinionConfig, fp: Option) -> Result<(), SysinspectError> { - let runtime = tokio::runtime::Runtime::new().map_err(|e| SysinspectError::DynError(Box::new(e)))?; + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .max_blocking_threads(4) + .enable_all() + .build() + .map_err(|e| SysinspectError::DynError(Box::new(e)))?; + runtime.block_on(async { loop { let c_cfg = cfg.clone(); diff --git a/sysminion/src/minion.rs b/sysminion/src/minion.rs index 8af3d65d..5b6ff406 100644 --- a/sysminion/src/minion.rs +++ b/sysminion/src/minion.rs @@ -1,6 +1,6 @@ use crate::{ callbacks::{ActionResponseCallback, ModelResponseCallback}, - filedata::MinionFiledata, + filedata::{MinionFiledata, SensorsFiledata}, proto::{ self, msg::{CONNECTION_TX, ExitState}, @@ -13,7 +13,8 @@ use colored::Colorize; use indexmap::IndexMap; use libcommon::SysinspectError; use libdpq::{DiskPersistentQueue, WorkItem}; -use libmodpak::MODPAK_SYNC_STATE; +use libmodpak::{MODPAK_SYNC_STATE, SysInspectModPakMinion}; +use libsensors::service::SensorService; use libsetup::get_ssh_client_ip; use libsysinspect::{ cfg::{ @@ -27,7 +28,10 @@ use libsysinspect::{ inspector::SysInspector, }, mdescr::mspecdef::ModelSpec, - reactor::fmt::{formatter::StringFormatter, kvfmt::KeyValueFormatter}, + reactor::{ + evtproc::EventProcessor, + fmt::{formatter::StringFormatter, kvfmt::KeyValueFormatter}, + }, rsa, traits::{self}, util::{self, dataconv}, @@ -53,10 +57,13 @@ use std::{ time::{Duration, Instant}, vec, }; -use tokio::net::{TcpStream, tcp::OwnedReadHalf}; use tokio::sync::Mutex; use tokio::{io::AsyncReadExt, time::sleep}; use tokio::{io::AsyncWriteExt, net::tcp::OwnedWriteHalf}; +use tokio::{ + net::{TcpStream, tcp::OwnedReadHalf}, + task::JoinHandle, +}; use uuid::Uuid; /// Session Id of the minion @@ -78,6 +85,14 @@ pub struct SysMinion { pt_counter: Mutex, dpq: Arc, connected: AtomicBool, + + minion_id: String, + + sensors_task: Mutex>>, + sensors_pump: Mutex>>, + ping_task: Mutex>>, + proto_task: Mutex>>, + stats_task: Mutex>>, } impl SysMinion { @@ -100,6 +115,12 @@ impl SysMinion { pt_counter: Mutex::new(PTCounter::new()), dpq, connected: AtomicBool::new(false), + minion_id: dataconv::as_str(traits::get_minion_traits(None).get(traits::SYS_ID)), + sensors_task: Mutex::new(None), + sensors_pump: Mutex::new(None), + ping_task: Mutex::new(None), + proto_task: Mutex::new(None), + stats_task: Mutex::new(None), }; log::debug!("Instance set up with root directory at {}", cfg.root_dir().to_str().unwrap_or_default()); instance.init()?; @@ -136,6 +157,12 @@ impl SysMinion { fs::create_dir_all(self.cfg.functions_dir())?; } + // Place for sensors config + if !self.cfg.sensors_dir().exists() { + log::debug!("Creating directory for the sensors config at {}", self.cfg.sensors_dir().as_os_str().to_str().unwrap_or_default()); + fs::create_dir_all(self.cfg.sensors_dir())?; + } + let mut out: Vec = vec![]; for t in traits::get_minion_traits(Some(&self.cfg)).trait_keys() { out.push(format!("{}: {}", t.to_owned(), dataconv::to_string(traits::get_minion_traits(None).get(&t)).unwrap_or_default())); @@ -145,6 +172,34 @@ impl SysMinion { Ok(()) } + /// Stop sensors by aborting their tasks. This is used when the master + /// disconnects or becomes unresponsive, to stop the sensors and prepare for reconnection. + async fn stop_sensors(&self) { + if let Some(h) = self.sensors_pump.lock().await.take() { + h.abort(); + let _ = h.await; + } + if let Some(h) = self.sensors_task.lock().await.take() { + h.abort(); + let _ = h.await; + } + } + + async fn stop_background(&self) { + if let Some(h) = self.ping_task.lock().await.take() { + h.abort(); + let _ = h.await; + } + if let Some(h) = self.proto_task.lock().await.take() { + h.abort(); + let _ = h.await; + } + if let Some(h) = self.stats_task.lock().await.take() { + h.abort(); + let _ = h.await; + } + } + /// Display minion info pub fn print_info(cfg: &MinionConfig) { let mut out: IndexMap = IndexMap::new(); @@ -177,8 +232,8 @@ impl SysMinion { } /// Get current minion Id - fn get_minion_id(&self) -> String { - dataconv::as_str(traits::get_minion_traits(None).get(traits::SYS_ID)) + fn get_minion_id(&self) -> &str { + &self.minion_id } /// Talk-back to the master @@ -207,17 +262,22 @@ impl SysMinion { /// That should kick Minion to start reconnecting. pub async fn do_ping_update(self: Arc, state: Arc) -> Result<(), SysinspectError> { let reconnect_sender = CONNECTION_TX.clone(); - tokio::spawn(async move { - loop { - sleep(tokio::time::Duration::from_secs(1)).await; - if self.as_ptr().last_ping.lock().await.elapsed() > self.as_ptr().ping_timeout { - log::warn!("Master seems unresponsive, triggering reconnect."); - let _ = reconnect_sender.send(()); - state.exit.store(true, std::sync::atomic::Ordering::Relaxed); - break; + let h = tokio::spawn({ + let this = self.clone(); + async move { + loop { + sleep(Duration::from_secs(1)).await; + if this.last_ping.lock().await.elapsed() > this.ping_timeout { + log::warn!("Master seems unresponsive, triggering reconnect."); + let _ = reconnect_sender.send(()); + state.exit.store(true, Ordering::Relaxed); + break; + } } } }); + + *self.ping_task.lock().await = Some(h); Ok(()) } @@ -232,33 +292,114 @@ impl SysMinion { self.connected.load(Ordering::Relaxed) } + /// Start sensors based on the configuration, and pump their events to the reactor. + /// This is a separate async task that runs in parallel with the main loop, and is responsible + /// for keeping the sensors running and their events flowing to the reactor. + pub async fn do_sensors(self: Arc, sensors: SensorsFiledata) -> Result<(), SysinspectError> { + // download + for f in sensors.files().keys() { + log::info!("Sensor file '{}' with checksum {} needs to be downloaded or updated.", f, sensors.files().get(f).unwrap_or(&"".to_string())); + + match self.as_ptr().download_file(f).await { + Ok(content) => { + let pth = self.cfg.sensors_dir().join(sensors.unprefix_path(f.trim_start_matches('/'))); + + if let Some(parent) = pth.parent() { + if !parent.exists() + && let Err(e) = fs::create_dir_all(parent) { + log::error!("Failed to create directories for '{}': {e}", pth.display()); + continue; + } + if let Err(e) = fs::write(&pth, content) { + log::error!("Failed to write sensor file '{}': {e}", pth.display()); + } + } + } + Err(e) => log::error!("Failed to download sensor file '{}': {e}", f), + } + } + + self.stop_sensors().await; + log::info!("Restarting sensors service"); + + // Load spec before spawning anything + let spec = match libsensors::load(self.cfg.sensors_dir().as_path()) { + Ok(spec) => spec, + Err(e) => { + log::error!("Failed to load sensors spec: {e}"); + return Ok(()); + } + }; + + libsysinspect::reactor::handlers::registry::init_handlers(); + + let mut events = EventProcessor::new(); + if let Some(cfg) = spec.events_config() { + events = events.set_config(Arc::new(cfg.clone()), None); + } + + let events = Arc::new(Mutex::new(events)); + + // Spawn pump and store handle immediately + let events_pump = events.clone(); + let pump_handle = tokio::spawn(async move { + loop { + { + let mut ep = events_pump.lock().await; + ep.process(true).await; + } + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + } + }); + *self.sensors_pump.lock().await = Some(pump_handle); + + // Spawn service task and store it + let sensors_task = tokio::spawn({ + let events = events.clone(); + async move { + let mut service = SensorService::new(spec); + service.set_event_processor(events); + service.start(); + } + }); + *self.sensors_task.lock().await = Some(sensors_task); + + Ok(()) + } + pub async fn do_stats_update(self: Arc) -> Result<(), SysinspectError> { - tokio::spawn(async move { + let this = self.clone(); + let handle = tokio::spawn(async move { loop { - sleep(tokio::time::Duration::from_secs(5)).await; - let this = self.as_ptr(); + sleep(Duration::from_secs(5)).await; let mut ptc = this.pt_counter.lock().await; ptc.update_stats(); } }); + + *self.stats_task.lock().await = Some(handle); Ok(()) } pub async fn do_proto(self: Arc) -> Result<(), SysinspectError> { let rstm = Arc::clone(&self.rstm); + let this = self.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { loop { let mut buff = [0u8; 4]; if let Err(e) = rstm.lock().await.read_exact(&mut buff).await { - log::trace!("Unknown message length from the master: {e}"); + log::warn!("Proto read failed (len): {e}; triggering reconnect"); + let _ = CONNECTION_TX.send(()); break; } - let msg_len = u32::from_be_bytes(buff) as usize; + let msg_len = u32::from_be_bytes(buff) as usize; let mut msg = vec![0u8; msg_len]; + if let Err(e) = rstm.lock().await.read_exact(&mut msg).await { - log::error!("Invalid message from the master: {e}"); + log::warn!("Proto read failed (msg): {e}; triggering reconnect"); + let _ = CONNECTION_TX.send(()); break; } @@ -273,90 +414,105 @@ impl SysMinion { log::trace!("Received Master message: {msg:#?}"); match msg.req_type() { - RequestType::Add => { - log::debug!("Master accepts registration"); - } + RequestType::Add => log::debug!("Master accepts registration"), RequestType::Reconnect => { - log::debug!("Master requires reconnection"); - log::info!("{}", msg.payload()); - std::process::exit(0); + log::warn!("Master requires reconnection: {}", msg.payload()); + let _ = CONNECTION_TX.send(()); + break; } RequestType::Remove => { log::debug!("Master asks to unregister"); + let _ = CONNECTION_TX.send(()); + break; } + RequestType::Command => { log::debug!("Master sends a command"); match msg.get_retcode() { ProtoErrorCode::Success => { let scheme = msg.target().scheme().to_string(); + if scheme.starts_with(SCHEME_COMMAND) { - self.as_ptr().call_internal_command(&scheme).await; + this.as_ptr().call_internal_command(&scheme).await; continue; } - if let Err(e) = self.as_ptr().dpq.add(WorkItem::MasterCommand(msg.to_owned())) { + + if let Err(e) = this.as_ptr().dpq.add(WorkItem::MasterCommand(msg.to_owned())) { log::error!("Failed to enqueue master command: {e}"); } else { log::info!("Scheduled master command: {}", msg.target().scheme()); } } + ProtoErrorCode::AlreadyConnected => { - if !self.is_connected() { + if !this.is_connected() { log::error!("Another minion from this machine is already connected"); - std::process::exit(1); + let _ = CONNECTION_TX.send(()); + break; } } - ret => { - log::debug!("Return code {ret:?} not yet implemented"); - } + + ret => log::debug!("Return code {ret:?} not yet implemented"), } } + RequestType::Traits => { - if self.as_ptr().get_minion_id().eq(msg.target().id()) - && let Err(err) = self.as_ptr().send_traits().await - { - log::error!("Unable to send traits: {err}"); - } else { - log::info!("Connected"); - self.as_ptr().set_connected(true); + if this.get_minion_id() != msg.target().id() { + log::debug!("Traits request for {}, not me; dropping", msg.target().id()); + continue; + } + + match this.as_ptr().send_traits().await { + Ok(_) => { + log::info!("Connected"); + this.set_connected(true); + } + Err(err) => log::error!("Unable to send traits: {err}"), } } + RequestType::AgentUnknown => { - let pbk_pem = dataconv::as_str(Some(msg.payload()).cloned()); // Expected PEM RSA pub key + let pbk_pem = dataconv::as_str(Some(msg.payload()).cloned()); let (_, pbk) = match rsa::keys::from_pem(None, Some(&pbk_pem)) { Ok(val) => val, Err(e) => { log::error!("Failed to parse PEM: {e}"); - std::process::exit(1); + let _ = CONNECTION_TX.send(()); + break; } }; + let pbk = match pbk { Some(key) => key, None => { log::error!("No public key found in PEM"); - std::process::exit(1); + let _ = CONNECTION_TX.send(()); + break; } }; + let fpt = match rsa::keys::get_fingerprint(&pbk) { Ok(fp) => fp, Err(e) => { log::error!("Failed to get fingerprint: {e}"); - std::process::exit(1); + let _ = CONNECTION_TX.send(()); + break; } }; log::error!("Minion is not registered"); log::info!("Master fingerprint: {fpt}"); - std::process::exit(1); + let _ = CONNECTION_TX.send(()); + break; } + RequestType::Ping => { let p = msg.payload(); - match serde_json::from_value::(p.clone()) { Ok(ProtoValue::PingTypeGeneral) => { let (loadavg, is_done, doneids, io_bps, cpu_usage) = { - let this = self.as_ptr(); let mut ptc = this.pt_counter.lock().await; let (l, d, i, io, cpu) = (ptc.get_loadaverage(), ptc.is_done(), ptc.get_done(), ptc.get_io_bps(), ptc.get_cpu_usage()); @@ -373,34 +529,49 @@ impl SysMinion { "cpu": cpu_usage, }); - self.request(proto::msg::get_pong(ProtoValue::PingTypeGeneral, Some(pl))).await; - self.update_ping().await; + this.request(proto::msg::get_pong(ProtoValue::PingTypeGeneral, Some(pl))).await; + this.update_ping().await; } + Ok(ProtoValue::PingTypeDiscovery) => { - // XXX: On Discovery ping, we also send our traits and all the info for cluster log::debug!("Received discovery ping from master"); - self.request(proto::msg::get_pong(ProtoValue::PingTypeDiscovery, None)).await; - } - Err(e) => { - log::warn!("Invalid ping payload `{}`: {}", p, e); + this.request(proto::msg::get_pong(ProtoValue::PingTypeDiscovery, None)).await; + this.update_ping().await; } + + Err(e) => log::warn!("Invalid ping payload `{}`: {}", p, e), } } + RequestType::ByeAck => { - log::info!("Master confirmed shutdown, terminating"); - std::process::exit(0); + log::info!("Master confirmed shutdown"); + let _ = CONNECTION_TX.send(()); + break; } - _ => { - log::error!("Unknown request type"); + + RequestType::SensorsSyncResponse => { + log::info!("Received sensors sync response from master"); + let sensors = SensorsFiledata::from_payload(msg.payload().clone(), this.cfg.sensors_dir()).unwrap_or_else(|e| { + log::error!("Failed to parse sensors payload: {e}"); + SensorsFiledata::default() + }); + + if let Err(e) = this.as_ptr().do_sensors(sensors).await { + log::error!("Failed to start sensors: {e}"); + } } + + _ => log::error!("Unknown request type"), } } }); + + *self.proto_task.lock().await = Some(handle); Ok(()) } pub async fn send_traits(self: Arc) -> Result<(), SysinspectError> { - let mut r = MinionMessage::new(self.get_minion_id(), RequestType::Traits, traits::get_minion_traits(None).to_json_value()?); + let mut r = MinionMessage::new(self.get_minion_id().to_string(), RequestType::Traits, traits::get_minion_traits(None).to_json_value()?); r.set_sid(MINION_SID.to_string()); self.request(r.sendable().map_err(|e| { log::error!("Error preparing traits message: {e}"); @@ -437,14 +608,20 @@ impl SysMinion { pub async fn send_callback(self: Arc, ar: ActionResponse) -> Result<(), SysinspectError> { log::debug!("Sending sync callback on {}", ar.aid()); log::debug!("Callback: {ar:#?}"); - self.request(MinionMessage::new(self.get_minion_id(), RequestType::Event, json!(ar)).sendable()?).await; + self.request(MinionMessage::new(self.get_minion_id().to_string(), RequestType::Event, json!(ar)).sendable()?).await; Ok(()) } /// Send finalisation marker callback to the master on the results pub async fn send_fin_callback(self: Arc, ar: ActionResponse) -> Result<(), SysinspectError> { log::debug!("Sending fin sync callback on {}", ar.aid()); - self.request(MinionMessage::new(self.get_minion_id(), RequestType::ModelEvent, json!(ar)).sendable()?).await; + self.request(MinionMessage::new(self.get_minion_id().to_string(), RequestType::ModelEvent, json!(ar)).sendable()?).await; + Ok(()) + } + + pub async fn send_sensors_sync(self: Arc) -> Result<(), SysinspectError> { + log::info!("Sending sensors sync callback for cycle"); + self.request(MinionMessage::new(self.get_minion_id().to_string(), RequestType::SensorsSyncRequest, json!({})).sendable()?).await; Ok(()) } @@ -614,9 +791,10 @@ impl SysMinion { } CLUSTER_SYNC => { log::info!("Syncing the minion with the master"); - if let Err(e) = libmodpak::SysInspectModPakMinion::new(self.cfg.clone()).sync().await { + if let Err(e) = SysInspectModPakMinion::new(self.cfg.clone()).sync().await { log::error!("Failed to sync minion with master: {e}"); } + let _ = self.as_ptr().send_sensors_sync().await; } _ => { log::warn!("Unknown command: {cmd}"); @@ -707,7 +885,7 @@ impl SysMinion { /// Constructs and starts an actual minion async fn _minion_instance(cfg: MinionConfig, fingerprint: Option, dpq: Arc) -> Result<(), SysinspectError> { let state = Arc::new(ExitState::new()); - let modpak = libmodpak::SysInspectModPakMinion::new(cfg.clone()); + let modpak = SysInspectModPakMinion::new(cfg.clone()); let minion = SysMinion::new(cfg.clone(), fingerprint, dpq).await?; let m = minion.as_ptr(); @@ -718,7 +896,7 @@ async fn _minion_instance(cfg: MinionConfig, fingerprint: Option, dpq: A let m = m.clone(); async move { match item { - WorkItem::MasterCommand(cmd) => { + WorkItem::MasterCommand(cmd) | WorkItem::EventCommand(cmd) => { while MODPAK_SYNC_STATE.is_syncing().await { tokio::time::sleep(Duration::from_millis(200)).await; } @@ -745,6 +923,9 @@ async fn _minion_instance(cfg: MinionConfig, fingerprint: Option, dpq: A } } + // Send sensors sync request + minion.as_ptr().send_sensors_sync().await?; + minion.as_ptr().do_ping_update(state.clone()).await?; minion.as_ptr().do_stats_update().await?; @@ -753,6 +934,9 @@ async fn _minion_instance(cfg: MinionConfig, fingerprint: Option, dpq: A sleep(tokio::time::Duration::from_millis(200)).await; } + minion.as_ptr().stop_sensors().await; + minion.as_ptr().stop_background().await; + runner.abort(); let _ = runner.await; diff --git a/sysminion/src/ptcounter.rs b/sysminion/src/ptcounter.rs index 81cff8d8..917623ab 100644 --- a/sysminion/src/ptcounter.rs +++ b/sysminion/src/ptcounter.rs @@ -199,7 +199,7 @@ impl PTCounter { let mut top: Vec<&DiskStats> = self.disk_stats.iter().collect(); top.sort_by(|a, b| b.write_bps.partial_cmp(&a.write_bps).unwrap_or(std::cmp::Ordering::Equal)); - log::debug!( + log::trace!( "Stats: loadavg(5m)={:.2}, cpu={:.1}%, procs={}, top_writers={:#?}", self.loadaverage, self.cpu_usage,