Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
660 changes: 660 additions & 0 deletions .benchmarks/Linux-CPython-3.14-64bit/0001_baseline.json

Large diffs are not rendered by default.

87 changes: 85 additions & 2 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,30 @@
"problemMatcher": []
},
{
"label": "Run App (instrumented)",
"label": "Run App (instrumented/console exporter)",
"type": "shell",
"command": "${workspaceFolder}${/}.venv${/}Scripts${/}opentelemetry-instrument ${command:python.interpreterPath} -m app 2>&1",
"command": "${workspaceFolder}${/}.venv${/}bin${/}opentelemetry-instrument ${workspaceFolder}${/}.venv${/}bin${/}python -m app 2>&1",
"options": {
"env": {
"APP_REDIS_HOSTNAME": "localhost",
"APP_REDIS_PORT": "6379",
"APP_MQTT_HOSTNAME": "localhost",
"APP_MQTT_PORT": "1883",
"APP_MQTT_IDENTIFIER": "app",
"APP_MQTT_CONNECT_TIMEOUT": "10",
"APP_MQTT_PUBLISH_TIMEOUT": "5",
//"APP_MQTT_SAT_TOKEN_PATH": "/var/run/secrets/tokens/broker-sat",
//"APP_MQTT_TLS_CERT_PATH": "/var/run/certs/ca.crt",
"APP_PROCESSING_MESSAGE_EXPIRY_INTERVAL": "10",
"APP_PROCESSING_SHARED_SUBSCRIPTION_NAME": "appsub",
"APP_PROCESSING_TOPIC_PREFIXES": "greetings:app/greetings,machinery-jobs:app/jobs",
"APP_PROCESSING_TOPIC_WILDCARD_LEVELS": "greetings:0,machinery-jobs:3",
"APP_PROCESSING_RESPONSE_TOPICS": "greetings:app/greetings/responses,machinery-jobs:app/jobs/responses",
"APP_PROCESSING_CLOUDEVENT_SOURCE": "https://aschamberger.github.com/microdcs/app",
"APP_LOGGING_LEVEL": "DEBUG",
"APP_MSGPACK_HOSTNAME": "localhost",
"APP_MSGPACK_PORT": "8888",
"KUBECONFIG": "${userHome}${/}.kube${/}config.yaml",
"APP_PROCESSING_OTEL_INSTRUMENTATION_ENABLED": "true",
"OTEL_SERVICE_NAME": "microdcs.app",
"OTEL_TRACES_EXPORTER": "console", // console, otlp, none
Expand All @@ -64,6 +83,70 @@
"dependsOrder": "parallel",
"problemMatcher": []
},
{
"label": "Run App (instrumented/Aspire dashboard)",
"type": "shell",
"command": "${workspaceFolder}${/}.venv${/}bin${/}opentelemetry-instrument ${workspaceFolder}${/}.venv${/}bin${/}python -m app 2>&1",
"options": {
"env": {
"APP_REDIS_HOSTNAME": "localhost",
"APP_REDIS_PORT": "6379",
"APP_MQTT_HOSTNAME": "localhost",
"APP_MQTT_PORT": "1883",
"APP_MQTT_IDENTIFIER": "app",
"APP_MQTT_CONNECT_TIMEOUT": "10",
"APP_MQTT_PUBLISH_TIMEOUT": "5",
//"APP_MQTT_SAT_TOKEN_PATH": "/var/run/secrets/tokens/broker-sat",
//"APP_MQTT_TLS_CERT_PATH": "/var/run/certs/ca.crt",
"APP_PROCESSING_MESSAGE_EXPIRY_INTERVAL": "10",
"APP_PROCESSING_SHARED_SUBSCRIPTION_NAME": "appsub",
"APP_PROCESSING_TOPIC_PREFIXES": "greetings:app/greetings,machinery-jobs:app/jobs",
"APP_PROCESSING_TOPIC_WILDCARD_LEVELS": "greetings:0,machinery-jobs:3",
"APP_PROCESSING_RESPONSE_TOPICS": "greetings:app/greetings/responses,machinery-jobs:app/jobs/responses",
"APP_PROCESSING_CLOUDEVENT_SOURCE": "https://aschamberger.github.com/microdcs/app",
"APP_LOGGING_LEVEL": "DEBUG",
"APP_MSGPACK_HOSTNAME": "localhost",
"APP_MSGPACK_PORT": "8888",
"KUBECONFIG": "${userHome}${/}.kube${/}config.yaml",
"APP_PROCESSING_OTEL_INSTRUMENTATION_ENABLED": "true",
"OTEL_SERVICE_NAME": "microdcs.app",
"OTEL_TRACES_EXPORTER": "otlp",
"OTEL_METRICS_EXPORTER": "otlp",
"OTEL_LOGS_EXPORTER": "otlp",
"OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4317",
"OTEL_EXPORTER_OTLP_PROTOCOL": "grpc",
"OTEL_PYTHON_LOG_CORRELATION": "true",
"OTEL_PYTHON_LOG_LEVEL": "debug",
"OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED": "true",
"OTEL_PYTHON_DISABLED_INSTRUMENTATIONS": "system_metrics,threading",
"OTEL_SEMCONV_STABILITY_OPT_IN": "http,database,messaging"
}
},
"group": "test",
"dependsOn": ["Start local MQTT broker", "Start local redis server", "Start Aspire dashboard"],
"dependsOrder": "parallel",
"problemMatcher": []
},
{
"label": "Start Aspire dashboard",
"type": "shell",
"command": "docker start aspire-dashboard 2>/dev/null || docker run --rm -p 18888:18888 -p 4317:18889 -p 4318:18890 -d --name aspire-dashboard mcr.microsoft.com/dotnet/aspire-dashboard:latest",
"group": "test",
"isBackground": true,
"problemMatcher": {
"pattern": {
"regexp": ".",
"file": 1,
"location": 2,
"message": 3
},
"background": {
"activeOnStart": true,
"beginsPattern": ".",
"endsPattern": "."
}
}
},
{
"label": "Start local MQTT broker",
"type": "shell",
Expand Down
7 changes: 5 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.14-trixie-slim AS builder

# Add git for installing dependencies from git repositories
RUN apt-get update && apt-get install -y --no-install-recommends git && rm -rf /var/lib/apt/lists/*

# Install the project into `/app`
WORKDIR /app

Expand Down Expand Up @@ -62,5 +65,5 @@ ENV PYTHONUNBUFFERED=1
ENTRYPOINT []

# Run the application
#CMD ["opentelemetry-instrument", "python3", "-m", "app", "2>&1"]
CMD ["python3", "-m", "app"]
CMD ["opentelemetry-instrument", "python3", "-m", "app"]
#CMD ["python3", "-m", "app"]
66 changes: 66 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,31 @@ Run test coverage:
uv run pytest --cov=microdcs --cov-report=term-missing tests/ --ignore=tests/test_mqtt_integration.py --ignore=tests/test_msgpack_integration.py
```

Run benchmarks and save results:

```bash
uv run pytest --benchmark-only --benchmark-autosave
```

Save a named baseline, then compare later runs against it:

```bash
# Save current results as the named baseline
uv run pytest --benchmark-only --benchmark-save=baseline

# Later, compare against that baseline
uv run pytest --benchmark-only --benchmark-compare=baseline
```

Benchmarks are automatically disabled during normal test runs. They only execute when `--benchmark-only` is passed, or when running the full suite with `--benchmark-autosave` (which runs them alongside regular tests and persists results under `.benchmarks/`).

The benchmarked tests cover the per-message hot paths:

* **`test_common.py`** — `CloudEvent` JSON and msgpack round-trips; payload serialization/deserialization
* **`test_dataclass.py`** — `DataClassMixin` JSON and msgpack round-trips
* **`test_sfc_recipe.py`** — `SfcStep` and full `SfcRecipe` JSON and msgpack round-trips
* **`statemachine_test.py`** — ISA95 job state machine trigger dispatch

Run the example application:

```bash
Expand Down Expand Up @@ -213,6 +238,47 @@ Generate the SFC recipe dataclasses:
uv run microdcs dataclassgen dataclasses sfc_recipe.schema.json
```

## OpenTelemetry Analysis with Aspire Dashboard

The [Aspire dashboard](https://aspire.dev/dashboard/standalone/) can be run as a standalone container to visualise traces, metrics, and structured logs from any OpenTelemetry-enabled app — no .NET or full Aspire installation required.

Start the dashboard container:

```bash
docker start aspire-dashboard 2>/dev/null || docker run --rm -p 18888:18888 -p 4317:18889 -p 4318:18890 -d --name aspire-dashboard mcr.microsoft.com/dotnet/aspire-dashboard:latest
```

Port mapping:

| Host port | Container port | Purpose |
|---|---|---|
| `18888` | `18888` | Dashboard UI |
| `4317` | `18889` | OTLP/gRPC endpoint |
| `4318` | `18890` | OTLP/HTTP endpoint |

Open `http://localhost:18888` in the browser. The dashboard is secured with a login token by default — retrieve it from the container logs:

```bash
docker logs aspire-dashboard 2>&1 | grep "login?t="
```

To run the app and send telemetry to the dashboard, configure the OTLP exporter and start via `opentelemetry-instrument`:

```bash
APP_PROCESSING_OTEL_INSTRUMENTATION_ENABLED=true \
OTEL_SERVICE_NAME=microdcs.app \
OTEL_TRACES_EXPORTER=otlp \
OTEL_METRICS_EXPORTER=otlp \
OTEL_LOGS_EXPORTER=otlp \
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 \
OTEL_EXPORTER_OTLP_PROTOCOL=grpc \
OTEL_PYTHON_LOG_CORRELATION=true \
OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true \
.venv/bin/opentelemetry-instrument .venv/bin/python -m app
```

Alternatively, use the VS Code task **Run App (instrumented/Aspire dashboard)**, which starts the local MQTT broker, the Redis server, and the Aspire dashboard automatically before launching the app.

## Documentation

The documentation site is built with Zensical.
Expand Down
19 changes: 1 addition & 18 deletions docs/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,12 @@ structure `APP_{SECTION}_{FIELD}`.
| `APP_MQTT_HOSTNAME` | `str` | `localhost` | MQTT broker hostname |
| `APP_MQTT_PORT` | `int` | `1883` | MQTT broker port |
| `APP_MQTT_IDENTIFIER` | `str` | `app_client` | MQTT client identifier |
| `APP_MQTT_CONNECT_TIMEOUT` | `int` | `10` | Broker connection timeout (seconds) |
| `APP_MQTT_PUBLISH_TIMEOUT` | `int` | `5` | Publish confirmation timeout (seconds) |
| `APP_MQTT_SAT_TOKEN_PATH` | `Path` | `/var/run/secrets/tokens/broker-sat` | Path to SAT token for broker auth |
| `APP_MQTT_TLS_CERT_PATH` | `Path` | `/var/run/certs/ca.crt` | CA certificate for TLS connections |
| `APP_MQTT_INCOMING_QUEUE_SIZE` | `int` | `0` (unbounded) | Max queued incoming messages in aiomqtt client |
| `APP_MQTT_OUTGOING_QUEUE_SIZE` | `int` | `0` (unbounded) | Max queued outgoing messages in aiomqtt client |
| `APP_MQTT_MESSAGE_WORKERS` | `int` | `5` | Concurrent tasks processing incoming messages |
| `APP_MQTT_DEDUPE_TTL_SECONDS` | `int` | `600` | TTL for Redis deduplication keys (seconds) |
| `APP_MQTT_BINDING_OUTGOING_QUEUE_SIZE` | `int` | `5` | Per-binding outgoing queue capacity |
| `APP_MQTT_SESSION_EXPIRY_INTERVAL` | `int` | `4294967295` | MQTT v5 session expiry interval in seconds (`2³²−1` = never expire) |

### MessagePack RPC (`APP_MSGPACK_*`)

Expand Down Expand Up @@ -255,8 +252,6 @@ Each queue has a distinct fill condition and a distinct consequence when full.

| Queue | Config variable | Default | Fills when | Producer behaviour when full |
|---|---|---|---|---|
| MQTT incoming | `APP_MQTT_INCOMING_QUEUE_SIZE` | `0` (unbounded) | Messages arrive faster than `message_workers` can dispatch | With default `0`: unbounded growth until OOM. With a finite value: backpressure to aiomqtt receive loop — new messages are not read from the socket until space is available |
| MQTT outgoing | `APP_MQTT_OUTGOING_QUEUE_SIZE` | `0` (unbounded) | Processors produce outgoing events faster than the MQTT handler can publish | With default `0`: unbounded growth until OOM. With a finite value: paho client blocks publish calls until space is available |
| MQTT binding outgoing | `APP_MQTT_BINDING_OUTGOING_QUEUE_SIZE` | `5` | A single binding's outgoing events accumulate faster than the handler drains them | **Raises `RuntimeError`** — the producer is not blocked, the error propagates to the caller |
| MessagePack binding outgoing | `APP_MSGPACK_BINDING_OUTGOING_QUEUE_SIZE` | `5` | Outgoing notification frames queue faster than connected clients consume them | **Raises `RuntimeError`** — same behaviour as MQTT binding queues |
| MessagePack concurrent requests | `APP_MSGPACK_MAX_CONCURRENT_REQUESTS` | `10` | More than N simultaneous `publish` RPC calls arrive from the same client | Server stops reading from the socket — TCP-level backpressure to the client |
Expand All @@ -267,25 +262,13 @@ Each queue has a distinct fill condition and a distinct consequence when full.
to the cap value with a warning log. If the protocol-level setting is `0`, the cap value
is used as the effective size.

!!! warning "Unbounded MQTT queues by default"
The MQTT incoming and outgoing queues default to `0` (unbounded). This means they will
never apply backpressure — instead, memory grows without bound under sustained overload.
For production deployments, set explicit finite values for `APP_MQTT_INCOMING_QUEUE_SIZE`
and `APP_MQTT_OUTGOING_QUEUE_SIZE` based on your expected burst profile.

### Isolation

The binding-level queues (`APP_MQTT_BINDING_OUTGOING_QUEUE_SIZE`,
`APP_MSGPACK_BINDING_OUTGOING_QUEUE_SIZE`) are per-binding instances. A slow or
saturated binding does not affect other bindings — a tightening controller processor
that falls behind does not block a QA camera processor from publishing its results.

The shared queues (`APP_MQTT_INCOMING_QUEUE_SIZE`, `APP_MQTT_OUTGOING_QUEUE_SIZE`)
sit at the handler level and are shared across all bindings registered with that
handler. A sustained fill on either of these affects the whole handler and therefore
all processors attached to it. Sizing these queues appropriately for the expected burst
profile is the primary tuning lever for the shared transport layer.

### Tuning Signals

Before a queue fills, it will show up as latency — the time between an MQTT message
Expand Down
16 changes: 15 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ authors = [
]
requires-python = ">=3.14"
dependencies = [
"aiomqtt>=2.4.0",
"aiomqtt @ git+https://github.com/empicano/aiomqtt.git@main",
"mashumaro[orjson]>=3.21",
"msgpack>=1.1.2",
"opentelemetry-distro>=0.60b1",
"opentelemetry-exporter-otlp-proto-grpc>=1.41.1",
"opentelemetry-exporter-otlp-proto-http>=1.41.1",
"opentelemetry-instrumentation-aiomqtt",
"opentelemetry-instrumentation-redis>=0.62b0",
"redis[hiredis]>=7.4.0",
"transitions>=0.9.3",
Expand All @@ -31,6 +34,7 @@ dev = [
"pytest>=9.0.2",
"pytest-asyncio>=1.3.0",
"pytest-cov>=7.0.0",
"pytest-benchmark>=5.2.3",
]

[project.scripts]
Expand All @@ -46,5 +50,15 @@ markers = [
"integration: integration tests requiring external services (MQTT, Redis)",
]

[tool.uv]
override-dependencies = [
"opentelemetry-api==1.41.1",
"opentelemetry-instrumentation==0.62b1",
"opentelemetry-semantic-conventions==0.62b1",
]

[tool.uv.sources]
opentelemetry-instrumentation-aiomqtt = { git = "https://github.com/aschamberger/opentelemetry-python-contrib", subdirectory = "instrumentation/opentelemetry-instrumentation-aiomqtt", branch = "feat/aiomqtt-instrumentation" }

[tool.ruff]
format.preview = true
9 changes: 1 addition & 8 deletions src/microdcs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,12 @@ class MQTTConfig:
hostname: str = "localhost"
port: int = 1883
identifier: str = "app_client"
connect_timeout: int = 10
publish_timeout: int = 5
sat_token_path: Path = Path("/var/run/secrets/tokens/broker-sat")
tls_cert_path: Path = Path("/var/run/certs/ca.crt")
incoming_queue_size: int = 0
outgoing_queue_size: int = 0
message_workers: int = 5
dedupe_ttl_seconds: int = 60 * 10 # 10 minutes
binding_outgoing_queue_size: int = 5
session_expiry_interval: int = 2**32 - 1 # never expire


@dataclass
Expand Down Expand Up @@ -321,10 +318,6 @@ def require_positive(value: int, field_name: str) -> None:
require_port(self.mqtt.port, "mqtt.port")
require_port(self.msgpack.port, "msgpack.port")

require_positive(self.mqtt.connect_timeout, "mqtt.connect_timeout")
require_positive(self.mqtt.publish_timeout, "mqtt.publish_timeout")
require_non_negative(self.mqtt.incoming_queue_size, "mqtt.incoming_queue_size")
require_non_negative(self.mqtt.outgoing_queue_size, "mqtt.outgoing_queue_size")
require_positive(self.mqtt.message_workers, "mqtt.message_workers")
require_non_negative(self.mqtt.dedupe_ttl_seconds, "mqtt.dedupe_ttl_seconds")
require_non_negative(
Expand Down
Loading