From 61c6c930e8a35d29e802d38687f3b2159bccbb5d Mon Sep 17 00:00:00 2001 From: jim <303600370@qq.com> Date: Thu, 11 Dec 2025 08:05:07 +0000 Subject: [PATCH 1/5] feat(script): introduce Rhai script module for request/response handling This commit adds a new `ferron-script` module that allows users to define and execute Rhai scripts within the Ferron server. The module supports various triggers such as `on_request_start` and `on_response_ready`, enabling dynamic request handling and response modification. Additionally, it includes configuration options for script limits and failure policies, enhancing the server's extensibility and flexibility. New files include: - `ferron-script` crate with core functionality - Example scripts for authentication and response modification - Documentation for the script module and testing guidelines Also updated `.gitignore` to exclude build artifacts and added `AGENTS.md` for repository guidelines. --- .gitignore | 3 + Cargo.lock | 140 ++- Cargo.toml | 1 + docs/rhai-script-module.md | 132 +++ ferron-build.yaml | 3 + ferron-common/src/config.rs | 11 + ferron-common/src/util/module_cache.rs | 6 + ferron-load-modules/Cargo.toml | 1 + ferron-modules-builtin/Cargo.toml | 4 + ferron-modules-builtin/src/lib.rs | 3 + ferron-script/Cargo.toml | 21 + ferron-script/TESTING.md | 256 +++++ ferron-script/src/config.rs | 326 +++++++ ferron-script/src/context.rs | 543 +++++++++++ ferron-script/src/engine.rs | 65 ++ ferron-script/src/lib.rs | 9 + ferron-script/src/runtime.rs | 1075 +++++++++++++++++++++ ferron/src/config/adapters/docker_auto.rs | 1 + ferron/src/config/adapters/kdl.rs | 18 +- ferron/src/config/adapters/yaml_legacy.rs | 18 +- ferron/src/config/processing.rs | 2 + script_module_wrk.sh | 342 +++++++ scripts/auth.rhai | 1 + scripts/example.rhai | 7 + simple-module.kdl | 4 + test-script.kdl | 22 + 26 files changed, 3011 insertions(+), 3 deletions(-) create mode 100644 docs/rhai-script-module.md create mode 100644 ferron-script/Cargo.toml 
create mode 100644 ferron-script/TESTING.md create mode 100644 ferron-script/src/config.rs create mode 100644 ferron-script/src/context.rs create mode 100644 ferron-script/src/engine.rs create mode 100644 ferron-script/src/lib.rs create mode 100644 ferron-script/src/runtime.rs create mode 100755 script_module_wrk.sh create mode 100644 scripts/auth.rhai create mode 100644 scripts/example.rhai create mode 100644 simple-module.kdl create mode 100644 test-script.kdl diff --git a/.gitignore b/.gitignore index 190c0916..874e69ea 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,6 @@ Thumbs.db /packaging/rpm/data/ /packaging/rpm/rpm/ /packaging/rpm/ferron.spec + +.cargo_home +.target \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 85809ba7..65b6bfcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" dependencies = [ "cfg-if", + "const-random", "getrandom 0.3.4", "once_cell", "version_check", @@ -1085,6 +1086,26 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.16", + "once_cell", + "tiny-keccak", +] + [[package]] name = "const-hex" version = "1.17.0" @@ -1220,6 +1241,12 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crunchy" +version = "0.2.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + [[package]] name = "crypto-common" version = "0.1.7" @@ -1598,6 +1625,7 @@ dependencies = [ "fancy-regex", "fastrand", "ferron-common", + "ferron-script", "futures-util", "hashlink 0.11.0", "http-body-util", @@ -1676,6 +1704,25 @@ dependencies = [ "zstd", ] +[[package]] +name = "ferron-script" +version = "2.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "ferron-common", + "hex", + "http-body-util", + "humantime", + "hyper", + "parking_lot", + "rhai", + "sha2", + "tokio", + "tokio-test", +] + [[package]] name = "ferron-yaml2kdl" version = "2.2.1" @@ -1724,7 +1771,7 @@ dependencies = [ "futures-core", "futures-sink", "nanorand", - "spin", + "spin 0.9.8", ] [[package]] @@ -2203,6 +2250,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + [[package]] name = "hyper" version = "1.8.1" @@ -2429,6 +2482,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "instant-acme" version = "0.8.4" @@ -2887,6 +2949,15 @@ dependencies = [ "libc", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" +dependencies = [ + "spin 0.5.2", +] + [[package]] name = "nom" version = "7.1.3" @@ -3993,6 +4064,35 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e0e61cd21fbddd85fbd9367b775660a01d388c08a61c6d2824af480b0309bb9" +[[package]] 
+name = "rhai" +version = "1.23.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4e35aaaa439a5bda2f8d15251bc375e4edfac75f9865734644782c9701b5709" +dependencies = [ + "ahash", + "bitflags 2.10.0", + "instant", + "no-std-compat", + "num-traits", + "once_cell", + "rhai_codegen", + "smallvec", + "smartstring", + "thin-vec", +] + +[[package]] +name = "rhai_codegen" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4322a2a4e8cf30771dd9f27f7f37ca9ac8fe812dddd811096a98483080dabe6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "ring" version = "0.17.14" @@ -4446,6 +4546,17 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "smartstring" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" +dependencies = [ + "autocfg", + "static_assertions", + "version_check", +] + [[package]] name = "snafu" version = "0.7.5" @@ -4489,6 +4600,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "spin" version = "0.9.8" @@ -4504,6 +4621,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.11.1" @@ -4632,6 +4755,12 @@ dependencies = [ "unicode-width 0.2.2", ] +[[package]] 
+name = "thin-vec" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "144f754d318415ac792f9d69fc87abbbfc043ce2ef041c60f16ad828f638717d" + [[package]] name = "thiserror" version = "1.0.69" @@ -4714,6 +4843,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 20988650..6e3fb137 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "ferron-dns-builtin", "ferron-load-modules", "ferron-modules-builtin", + "ferron-script", "ferron-passwd", "ferron-yaml2kdl-core", "ferron-yaml2kdl", "ferron-precompress", "ferron-observability-builtin", diff --git a/docs/rhai-script-module.md b/docs/rhai-script-module.md new file mode 100644 index 00000000..6570f743 --- /dev/null +++ b/docs/rhai-script-module.md @@ -0,0 +1,132 @@ +--- +title: Rhai script module +--- + +The **script-exec** module lets you attach [Rhai](https://rhai.rs) programs to Ferron’s request pipeline. A script can inspect or mutate the inbound request, adjust the generated response, maintain shared state across workers, or even schedule background work, making it possible to express lots of “edge logic” without recompiling Ferron. + +This page describes how script execution is wired into the server, how to configure scripts in `ferron.kdl`, and which APIs are available from Rhai. + +## When scripts run + +Every script declares one or more *triggers*. 
Triggers map to server phases: + +| Trigger | Script phase | When it runs | +|------------------------|--------------------|-----------------------------------------------------------------------------------------------------------------| +| `on_request_start` | `RequestStart` | Before Ferron passes the request to the rest of the module stack. You can rewrite method/URI/headers/body here. | +| `on_request_body` | `RequestBody` | After the body is buffered. Only scripts that request this trigger force buffering. | +| `on_response_ready` | `Response` | After Ferron has received the response from the downstream module but before bytes are sent to the client. | +| `on_tick` | `Tick` | Fired periodically (default `1s`, configurable per script via `tick_interval`). | +| `spawn_task(...)` | `BackgroundTask` | Runs asynchronously when a script explicitly spawns a task. | + +Each trigger produces a [`ScriptPhase`](../ferron-script/src/context.rs) under the hood, so host helpers such as `set_header` automatically act on either the request or the response depending on where the script is running. + +## Quick start configuration + +Enable the module inside a host block and add one or more `script` entries: + +```kdl +:8080 { + module "script-exec" { + script "auth-check" { + file "scripts/auth.rhai" + trigger "on_request_start" + trigger "on_response_ready" + env { + shared_secret "super-secret" + } + allow "spawn_task" + limits { + max_operations 300000 // default 200_000 + max_exec_time "75ms" // default 50ms + } + tick_interval "5s" + failure_policy "block" + } + } + + root "wwwroot" +} +``` + +You may list multiple `script` blocks; each block’s first value (`"auth-check"` above) is the script identifier that appears in logs. + +### Script block reference + +| Directive | Description | +|----------------------|-------------| +| `file ""` | Required. Path to the Rhai file. 
File-based scripts automatically reload when the source file changes unless you set `reload_on_change false`. | +| `trigger ""` | Required at least once. One of `on_request_start`, `on_request_body`, `on_response_ready`, or `on_tick`. Multiple triggers are allowed. | +| `tick_interval ""` | Optional. Overrides the default `1s` tick cadence when `on_tick` is enabled. Uses `humantime` durations (`"250ms"`, `"5s"`, etc.). | +| `env { ... }` | Optional key/value map injected into the script as the `env` object. | +| `allow "spawn_task"` | Grants access to `spawn_task`. Omit to disable background work for this script. | +| `reload_on_change ` | Optional. Defaults to `true` for file-backed scripts; set to `false` to pin to the compiled AST until Ferron restarts. | +| `limits { ... }` | Optional guardrails. Supports `max_operations` (default `200_000`), `max_call_depth` (default `32`), and `max_exec_time` (default `50ms`). | +| `failure_policy ""` | `block` (default) propagates an error to the HTTP stack, `skip` converts failures into “continue” decisions. | +| `allow { ... }` | Helper block to group permission entries (currently only `spawn_task`). | + +> **Note** +> Scripts that register `on_request_body` force Ferron to buffer the entire request body so Rhai can read or modify it. Avoid adding that trigger unless the script truly needs the body—streaming throughput drops if every request must be buffered. + +## Runtime APIs available to Rhai + +Each script sees a pre-populated scope: + +| Name | Type | Purpose | +|-------------|------------------------|---------| +| `request` | `Request` handle | Allows you to read or mutate `method`, `uri`, `body`, and individual headers during request-side phases. | +| `response` | `Response` handle | When running during `on_response_ready`, represents the downstream response. Scripts can edit `status`, `headers`, or `body`. | +| `env` | `Map` | The immutable key/value map from the configuration’s `env` block. 
| +| `state` | `StateStore` | A synchronized map shared across all scripts and workers (`get`, `set`, `remove`, `clear`, `keys`). Useful for counters or caches. | + +Helpers registered via `rhai::Engine` provide the rest of the integration surface: + +- `log(level, message)` queues log lines that Ferron writes once the script finishes (`level` is an arbitrary string such as `"info"` or `"debug"`). +- `set_header(name, value)` and `remove_header(name)` adjust headers on the *current* phase (request vs. response) without manually switching handles. +- `deny(status, body)` stops the pipeline and instructs Ferron to return `status`/`body` immediately. The script’s `failure_policy` isn’t consulted because this is an explicit decision, not an error. +- `spawn_task(name, fn_ptr)` schedules background work. The function pointer must reference another Rhai function; the background worker inherits the script’s `env`, `state`, and failure policy. Tasks obey the same execution limits and log forwarding as the main script. + +In addition, the `request` and `response` handles expose idiomatic getters/setters for headers and bodies, so you can write expressive Rhai like: + +```rhai +if request.header("x-api-key").is_none() { + deny(401, "missing key"); +} + +response.set_header("x-script-version", "1.0"); +log("debug", `handled ${request.uri}`); +``` + +## Background work and scheduled tasks + +`on_tick` triggers are tied to Ferron’s scheduler: when configured, the script executes on every tick interval even if no requests arrive. This is helpful for cache warm-up or periodic housekeeping. The tick handler uses the same Rhai file and scope setup as other triggers, but the `request` handle is absent because no HTTP exchange caused the invocation. + +`spawn_task` allows scripts to dispatch extra async work (for example, to refresh an external ACL) without blocking the request thread. Tasks run through the same sandbox: + +- They observe the script’s `limits`. 
+- Timeouts or panics count toward the script’s failure counter. +- Logs emitted inside the task flow into the main error logger along with the task name. + +Because background work also runs on the Tokio runtime via `block_in_place`, you should keep those functions short and avoid long sleeps. Prefer scheduling periodic logic via `on_tick` if you don’t need ad-hoc spawns. + +## Failure handling, throttling, and logging + +Each script instance tracks its own `failure_state`. Consecutive runtime errors (panics, exceeded limits, compile failures) trip a breaker after **five** failures. Once tripped, the script is temporarily disabled; Ferron logs a message and continues to skip that script until it successfully runs again. + +- `failure_policy "block"` propagates the error back to the module chain (usually returning a 500 unless another module overrides the response). +- `failure_policy "skip"` treats failures as “continue” decisions so the HTTP request proceeds, but the incident is still logged and the breaker counter increments. + +Use the `limits` block to prevent runaway scripts: `max_operations` guards Rhai’s internal operation counter, `max_call_depth` protects the call stack, and `max_exec_time` wraps the whole script in a Tokio timeout. Choose tighter values in production to minimize blast radius. + +Ferron buffers all log messages produced via `log()` and flushes them even if the script eventually fails. Logs are labeled with the script ID so you can correlate them with `script_module_wrk.sh` benchmarks or regular server logs. + +## Hot reload and testing + +File-backed scripts watch their source files by default. Whenever the file’s mtime changes, Ferron recompiles the script and swaps in the new AST on the next execution. Set `reload_on_change false` only when you need deterministic behavior (for example, in production with a read-only deploy). 
+ +The repository ships with `test-script.kdl`, two sample scripts under `scripts/`, and the helper `./script_module_wrk.sh` that runs a battery of `wrk` scenarios. That script is the quickest way to validate new handlers locally: + +```bash +bash ./script_module_wrk.sh -t8 -c64 -d30s http://127.0.0.1:8080/ +``` + +For a complete walkthrough of the benchmark scenarios and tuning tips, refer to `README.md` (section “script_module_wrk.sh”). diff --git a/ferron-build.yaml b/ferron-build.yaml index c8b4e0d1..57b7c01d 100644 --- a/ferron-build.yaml +++ b/ferron-build.yaml @@ -16,6 +16,9 @@ modules: loader: BufferModuleLoader - builtin: true loader: RewriteModuleLoader + - builtin: true + cargo_feature: script + loader: ScriptExecModuleLoader - builtin: true loader: StatusCodesModuleLoader - builtin: true diff --git a/ferron-common/src/config.rs b/ferron-common/src/config.rs index 018d3ba6..6bf33043 100644 --- a/ferron-common/src/config.rs +++ b/ferron-common/src/config.rs @@ -341,6 +341,9 @@ pub struct ServerConfigurationEntry { /// Props for the entry pub props: HashMap, + + /// Child nodes for the entry, keyed by node name + pub children: HashMap, } impl std::hash::Hash for ServerConfigurationEntry { @@ -359,6 +362,14 @@ impl std::hash::Hash for ServerConfigurationEntry { key.hash(state); value.hash(state); } + + let mut children_vec: Vec<_> = self.children.iter().collect(); + children_vec.sort_by(|a, b| a.0.cmp(b.0)); + children_vec.len().hash(state); + for (key, entries) in children_vec { + key.hash(state); + entries.hash(state); + } } } diff --git a/ferron-common/src/util/module_cache.rs b/ferron-common/src/util/module_cache.rs index 8c5fdd8a..2f5e1dd0 100644 --- a/ferron-common/src/util/module_cache.rs +++ b/ferron-common/src/util/module_cache.rs @@ -150,6 +150,7 @@ mod test { inner: vec![ServerConfigurationEntry { values: vec![ServerConfigurationValue::String("something".to_string())], props: HashMap::new(), + children: HashMap::new(), }], }, ); @@ -174,6 +175,7 
@@ mod test { inner: vec![ServerConfigurationEntry { values: vec![ServerConfigurationValue::String("something".to_string())], props: HashMap::new(), + children: HashMap::new(), }], }, ); @@ -183,6 +185,7 @@ mod test { inner: vec![ServerConfigurationEntry { values: vec![ServerConfigurationValue::String("something else".to_string())], props: HashMap::new(), + children: HashMap::new(), }], }, ); @@ -229,6 +232,7 @@ mod test { inner: vec![ServerConfigurationEntry { values: vec![ServerConfigurationValue::String("something".to_string())], props: HashMap::new(), + children: HashMap::new(), }], }, ); @@ -253,6 +257,7 @@ mod test { inner: vec![ServerConfigurationEntry { values: vec![ServerConfigurationValue::String("something".to_string())], props: HashMap::new(), + children: HashMap::new(), }], }, ); @@ -262,6 +267,7 @@ mod test { inner: vec![ServerConfigurationEntry { values: vec![ServerConfigurationValue::String("something else".to_string())], props: HashMap::new(), + children: HashMap::new(), }], }, ); diff --git a/ferron-load-modules/Cargo.toml b/ferron-load-modules/Cargo.toml index 8ffaba04..d76a41ad 100644 --- a/ferron-load-modules/Cargo.toml +++ b/ferron-load-modules/Cargo.toml @@ -17,6 +17,7 @@ ferron-modules-builtin = { workspace = true, features = [ "rproxy", "scgi", "static", + "script", ] } ferron-dns-builtin = { workspace = true, features = [ "cloudflare", diff --git a/ferron-modules-builtin/Cargo.toml b/ferron-modules-builtin/Cargo.toml index cc25c2f1..d5bc0e52 100644 --- a/ferron-modules-builtin/Cargo.toml +++ b/ferron-modules-builtin/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] # Ferron internal dependencies ferron-common = { workspace = true } +ferron-script = { path = "../ferron-script", optional = true } # Error Handling & Utilities anyhow = "1.0.98" @@ -112,6 +113,7 @@ default = [ "runtime-monoio", "scgi", "static", + "script", ] default-tokio = [ "cache", @@ -126,6 +128,7 @@ default-tokio = [ "runtime-monoio", "scgi", "static", + 
"script", ] cache = ["cache_control", "papaya", "ahash"] cgi = ["async-process", "httparse", "memchr"] @@ -140,3 +143,4 @@ runtime-monoio = ["monoio", "monoio-compat"] runtime-tokio = ["hyper-util/tokio"] scgi = ["httparse", "memchr"] static = ["new_mime_guess", "async-compression", "sha2"] +script = ["ferron-script"] diff --git a/ferron-modules-builtin/src/lib.rs b/ferron-modules-builtin/src/lib.rs index a2b398e5..2917290c 100644 --- a/ferron-modules-builtin/src/lib.rs +++ b/ferron-modules-builtin/src/lib.rs @@ -8,6 +8,9 @@ mod status_codes; mod trailing; mod util; +#[cfg(feature = "script")] +pub use ferron_script::ScriptExecModuleLoader; + pub use blocklist::*; pub use buffer::*; pub use core::*; diff --git a/ferron-script/Cargo.toml b/ferron-script/Cargo.toml new file mode 100644 index 00000000..4d72ee06 --- /dev/null +++ b/ferron-script/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "ferron-script" +version = "2.1.0" +edition = "2021" + +[dependencies] +ferron-common = { workspace = true } +anyhow = "1.0.98" +async-trait = "0.1.88" +bytes = { version = "1.10.1" } +http-body-util = "0.1.3" +hyper = { version = "1.6.0", features = ["full"] } +rhai = { version = "1.17.1", features = ["sync"] } +tokio = { version = "1.45.0", features = ["fs", "rt", "rt-multi-thread", "sync", "time"] } +sha2 = "0.10.9" +humantime = "2.1.0" +parking_lot = "0.12.3" +hex = "0.4.3" + +[dev-dependencies] +tokio-test = "0.4.4" diff --git a/ferron-script/TESTING.md b/ferron-script/TESTING.md new file mode 100644 index 00000000..da65d8a0 --- /dev/null +++ b/ferron-script/TESTING.md @@ -0,0 +1,256 @@ +# Script 模块测试指南 + +目标:提供可直接运行的用例,覆盖请求拦截、请求体修改、响应改写、定时任务、后台任务、状态存储、环境注入与热重载。 + +## 快速验证:文件脚本改写响应 + +1) 配置 `test-script.kdl`(已随仓库提供,示例放在 `on_response_ready` 阶段): +```kdl +globals { + log "access.log" + error_log "error.log" +} + +:8080 { + module "script-exec" { + script "test-script" { + file "scripts/example.rhai" + trigger "on_response_ready" + reload_on_change #true + limits { + 
max_operations 200_000 + max_call_depth 32 + max_exec_time "50ms" + } + failure_policy "skip" + } + } + + root "wwwroot" +} +``` + +2) 脚本 `scripts/example.rhai` 内容(会覆盖响应并加头): +```rhai +log("info", "Executing script from file"); +log("info", "Request method: " + request.method); +log("info", "Request URI: " + request.uri); + +response.set_header("X-Script-Source", "file"); +response.set_header("X-Script-Version", "1.0"); +response.set_status(200); +response.set_body("Hello from file script!"); +``` + +3) 运行并验证: +```bash +./target/debug/ferron --config test-script.kdl +curl -i http://localhost:8080/ +# 预期:响应体为 Hello from file script!,头含 X-Script-Source/X-Script-Version +``` + +> 注意:修改响应体后,运行时会自动重算 Content-Length。 + +## 组合测试场景 + +可将以下脚本块逐个加入 `module "script-exec"`,或拆分为多个 script。 + +### 1. 请求拦截 deny +```kdl +script "auth-check" { + inline " + if request.header('Authorization') == () { + deny(403, 'Access denied'); + } + " + trigger "on_request_start" +} +``` +验证: +```bash +curl -i http://localhost:8080/ # 403 +curl -i -H "Authorization: Bearer token" http://localhost:8080/ # 继续 +``` + +### 2. 修改请求头/方法/URI(请求阶段) +```kdl +script "rewrite-request" { + inline " + request.set_header('X-Debug', '1'); + request.method = 'POST'; + request.uri = '/rewritten'; + " + trigger "on_request_start" +} +``` + +### 3. 修改请求体(需要 on_request_body,会自动缓冲请求体) +```kdl +script "body-rewrite" { + inline " + let body = request.body; + let new_body = body + '#patched'; + request.body = new_body; + " + trigger "on_request_body" +} +``` + +### 4. 响应改写(头 + 体) +```kdl +script "resp-rewrite" { + inline " + response.set_header('X-Custom', 'yes'); + response.set_body('patched by script'); + response.set_status(202); + " + trigger "on_response_ready" +} +``` + +### 5. 定时任务 on_tick +```kdl +script "ticker" { + inline " + log('info', 'tick fired'); + " + trigger "on_tick" + tick_interval "2s" +} +``` + +### 6. 
后台任务(需允许 spawn_task) +```kdl +script "bg-task" { + inline " + spawn_task('cleanup', || { + log('info', 'background task executed'); + }); + " + trigger "on_request_start" + allow ["spawn_task"] +} +``` + +### 7. 模块级状态存储 +```kdl +script "counter" { + inline " + let count = state.get('count'); + if count == () { count = 0; } + count = count + 1; + state.set('count', count); + response.set_header('X-Count', count.to_string()); + " + trigger "on_response_ready" +} +``` + +### 8. 环境变量注入 +```kdl +script "env-test" { + inline " + log('info', 'API key = ' + env.api_key); + " + trigger "on_request_start" + env { api_key "secret-key-123" } +} +``` + +### 9. 文件脚本热重载 +```kdl +script "file-reload" { + file "scripts/hot.rhai" + trigger "on_response_ready" + reload_on_change true +} +``` +修改 `scripts/hot.rhai` 后再次请求即可看到生效,无需重启。 + +## 运行与调试提示 + +- 运行:`./target/debug/ferron --config test-script.kdl` 或 `make run-dev`(确保复制配置为 `ferron.kdl`)。 +- 日志:`tail -f access.log`、`tail -f error.log`,脚本内 `log(level, msg)` 会落在错误日志。 +- 失败策略:`failure_policy "block"` 会中断请求;`"skip"` 仅跳过脚本。 +- 超时与限额:`limits.max_exec_time`/`max_operations`/`max_call_depth` 可按需调高测试。 + +## 自动化测试建议 + +- 单元测试:`cargo test -p ferron-script`。 +- Smoketest:在 `smoketest/` 复制 `ferron.kdl`,添加 `module "script-exec"` 配置,运行 `smoketest/smoketest.sh`,验证请求/响应改写和 deny 场景。 + +### 1. 查看日志 + +```bash +# 查看访问日志 +tail -f access.log + +# 查看错误日志 +tail -f error.log +``` + +### 2. 脚本错误处理 + +如果脚本有错误,根据 `failure_policy` 设置: +- `"block"`: 请求会被阻止 +- `"skip"`: 请求会继续,但脚本不会执行 + +### 3. 检查脚本编译 + +脚本会在首次加载时编译。如果编译失败,会在错误日志中显示。 + +### 4. 
热重载测试 + +对于文件脚本,设置 `reload_on_change true`: + +```kdl +script "reload-test" { + file "scripts/test.rhai" + trigger ["on_request_start"] + reload_on_change true +} +``` + +修改文件后,脚本会自动重新加载。 + +## 单元测试 + +运行单元测试: + +```bash +cargo test -p ferron-script +``` + +## 集成测试 + +在 `smoketest` 目录创建测试: + +```bash +# 创建测试配置 +cp smoketest/ferron.kdl smoketest/ferron-script.kdl + +# 添加 script 模块配置到 smoketest/ferron-script.kdl + +# 运行测试 +cd smoketest +./smoketest.sh +``` + +## 常见问题 + +### Q: 脚本没有执行? + +- 检查 `trigger` 配置是否正确 +- 检查 `failure_policy` 是否为 `"block"` 且脚本有错误 +- 查看错误日志 + +### Q: 脚本执行超时? + +- 增加 `max_exec_time` 限制 +- 检查脚本逻辑是否有死循环 + +### Q: 如何调试脚本? + +- 使用 `log()` 函数输出调试信息 +- 查看错误日志 +- 使用 `failure_policy "skip"` 避免阻塞请求 diff --git a/ferron-script/src/config.rs b/ferron-script/src/config.rs new file mode 100644 index 00000000..e607b24a --- /dev/null +++ b/ferron-script/src/config.rs @@ -0,0 +1,326 @@ +use std::collections::HashMap; +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::{anyhow, Result}; +use ferron_common::config::{ServerConfiguration, ServerConfigurationEntries, ServerConfigurationEntry}; +use ferron_common::get_entries; +use humantime::parse_duration; + +const DEFAULT_MAX_OPERATIONS: u64 = 200_000; +const DEFAULT_MAX_CALL_DEPTH: u64 = 32; +const DEFAULT_MAX_EXEC_TIME: Duration = Duration::from_millis(50); +const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1); + +#[derive(Clone)] +pub struct ScriptModuleConfig { + pub scripts: Vec, +} + +impl ScriptModuleConfig { + pub fn from_server_config(config: &ServerConfiguration) -> Result { + let mut scripts = Vec::new(); + if let Some(modules) = get_entries!("module", config) { + for module_entry in &modules.inner { + let Some(name) = module_entry.values.first().and_then(|v| v.as_str()) else { + continue; + }; + if name != "script-exec" { + continue; + } + if let Some(script_entries) = module_entry.children.get("script") { + for script_entry in &script_entries.inner { + 
scripts.push(parse_script(script_entry)?); + } + } + } + } + Ok(Self { scripts }) + } + + #[allow(dead_code)] + pub fn is_empty(&self) -> bool { + self.scripts.is_empty() + } +} + +#[derive(Clone, Debug)] +pub struct ScriptDefinition { + pub id: String, + pub source: ScriptSource, + pub triggers: Vec, + pub env: HashMap, + pub permissions: ScriptPermissions, + pub reload_on_change: bool, + pub limits: ScriptLimits, + pub failure_policy: FailurePolicy, +} + +#[derive(Clone, Debug)] +pub enum ScriptSource { + File(PathBuf), +} + +#[derive(Clone, Debug)] +pub enum ScriptTrigger { + RequestStart, + RequestBody, + ResponseReady, + Tick(Duration), +} + +#[derive(Clone, Debug)] +pub struct ScriptLimits { + pub max_operations: u64, + pub max_call_depth: u64, + pub max_exec_time: Duration, +} + +impl Default for ScriptLimits { + fn default() -> Self { + Self { + max_operations: DEFAULT_MAX_OPERATIONS, + max_call_depth: DEFAULT_MAX_CALL_DEPTH, + max_exec_time: DEFAULT_MAX_EXEC_TIME, + } + } +} + +#[derive(Clone, Debug)] +pub struct ScriptPermissions { + pub allow_spawn_task: bool, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum FailurePolicy { + Block, + Skip, +} + +fn parse_script(entry: &ServerConfigurationEntry) -> Result { + let id = entry + .values + .first() + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow!("script block is missing an identifier"))?; + let source = parse_source(entry)?; + let env = parse_env(entry); + let permissions = parse_permissions(entry); + let reload_on_change = match parse_flag(entry, "reload_on_change")? 
{
    // A `file`-backed script may be hot-reloaded; default to reloading
    // whenever the source is a file on disk.
    Some(value) => value && matches!(source, ScriptSource::File(_)),
    None => matches!(source, ScriptSource::File(_)),
  };
  let limits = parse_limits(entry)?;
  let failure_policy = parse_failure_policy(entry)?;
  let tick_interval = parse_duration_field(entry, "tick_interval")?.unwrap_or(DEFAULT_TICK_INTERVAL);
  let triggers = parse_triggers(entry, tick_interval)?;
  if triggers.is_empty() {
    return Err(anyhow!("script '{id}' must specify at least one trigger"));
  }

  Ok(ScriptDefinition {
    id: id.to_string(),
    source,
    triggers,
    env,
    permissions,
    reload_on_change,
    limits,
    failure_policy,
  })
}

/// Reads the script source from the entry's `file` child.
///
/// Only file-backed sources are currently supported; a missing `file`
/// child is a configuration error.
fn parse_source(entry: &ServerConfigurationEntry) -> Result<ScriptSource> {
  let file = entry
    .children
    .get("file")
    .and_then(|entries| entries.inner.first())
    .and_then(|e| e.values.first())
    .and_then(|v| v.as_str());

  match file {
    Some(path) => Ok(ScriptSource::File(PathBuf::from(path))),
    None => Err(anyhow!("script '{}' must specify a source", entry_name(entry))),
  }
}

/// Returns the entry's first value (the script id) for error messages,
/// or an empty string when absent.
fn entry_name(entry: &ServerConfigurationEntry) -> String {
  entry
    .values
    .first()
    .and_then(|v| v.as_str())
    .unwrap_or("")
    .to_string()
}

/// Collects key/value pairs from all `env` child blocks.
///
/// When the same key appears more than once, the last occurrence wins
/// (`HashMap::insert` overwrites). Non-string values are ignored.
fn parse_env(entry: &ServerConfigurationEntry) -> HashMap<String, String> {
  let mut env = HashMap::new();
  if let Some(env_blocks) = entry.children.get("env") {
    for env_block in &env_blocks.inner {
      for (key, values) in &env_block.children {
        for value_entry in &values.inner {
          if let Some(value) = value_entry.values.first().and_then(|v| v.as_str()) {
            env.insert(key.to_string(), value.to_string());
          }
        }
      }
    }
  }
  env
}

/// Parses the `allow` children; only the `spawn_task` capability
/// (case-insensitive) is recognized at present.
fn parse_permissions(entry: &ServerConfigurationEntry) -> ScriptPermissions {
  let mut allow_spawn_task = false;
  if let Some(allows) = entry.children.get("allow") {
    for allow_entry in &allows.inner {
      for value in &allow_entry.values {
        if value.as_str().is_some_and(|v| v.eq_ignore_ascii_case("spawn_task")) {
          allow_spawn_task = true;
        }
      }
    }
  }
  ScriptPermissions { allow_spawn_task }
}

/// Reads an optional boolean child entry; `None` when the child is absent
/// or its first value is not a boolean.
// NOTE(review): the `Result` wrapper never carries an error today; it is kept
// so callers can uniformly use `?` alongside the other parse helpers.
fn parse_flag(entry: &ServerConfigurationEntry, name: &str) -> Result<Option<bool>> {
  Ok(
    entry
      .children
      .get(name)
      .and_then(|entries| entries.inner.first())
      .and_then(|e| e.values.first())
      .and_then(|v| v.as_bool()),
  )
}

/// Parses the optional `limits` block (max_operations, max_call_depth,
/// max_exec_time), falling back to `ScriptLimits::default()` for any
/// field not present.
fn parse_limits(entry: &ServerConfigurationEntry) -> Result<ScriptLimits> {
  let mut limits = ScriptLimits::default();
  if let Some(limit_blocks) = entry.children.get("limits") {
    for block in &limit_blocks.inner {
      if let Some(max_ops) = extract_integer(block.children.get("max_operations")) {
        limits.max_operations = validate_positive_i128(max_ops, "max_operations", entry)?;
      }
      if let Some(call_depth) = extract_integer(block.children.get("max_call_depth")) {
        limits.max_call_depth = validate_positive_i128(call_depth, "max_call_depth", entry)?;
      }
      if let Some(duration) = extract_duration(block.children.get("max_exec_time"))? {
        ensure_positive_duration(duration, "max_exec_time", entry)?;
        limits.max_exec_time = duration;
      }
    }
  }
  Ok(limits)
}

/// Validates that a configured limit is strictly positive and narrows it
/// to `u64` (the type the Rhai engine limits expect).
fn validate_positive_i128(value: i128, field: &str, entry: &ServerConfigurationEntry) -> Result<u64> {
  if value <= 0 {
    Err(anyhow!(
      "script '{}' has invalid {field} (must be > 0)",
      entry_name(entry)
    ))
  } else {
    Ok(value as u64)
  }
}

/// Rejects zero durations for time-based limits.
fn ensure_positive_duration(duration: Duration, field: &str, entry: &ServerConfigurationEntry) -> Result<()> {
  if duration.is_zero() {
    Err(anyhow!(
      "script '{}' has invalid {field} duration (must be > 0)",
      entry_name(entry)
    ))
  } else {
    Ok(())
  }
}

/// Extracts the first integer value of the first entry, if any.
fn extract_integer(entries: Option<&ServerConfigurationEntries>) -> Option<i128> {
  entries
    .and_then(|entries| entries.inner.first())
    .and_then(|entry| entry.values.first())
    .and_then(|v| v.as_i128())
}

/// Extracts and parses the first string value as a duration, if any.
fn extract_duration(entries: Option<&ServerConfigurationEntries>) -> Result<Option<Duration>> {
  if let Some(value) = entries
    .and_then(|entries| entries.inner.first())
    .and_then(|entry| entry.values.first())
    .and_then(|v| v.as_str())
  {
    Ok(Some(parse_duration(value)?))
  } else {
    Ok(None)
  }
}

/// Parses the `failure_policy` child; defaults to "block" when absent.
/// Values are matched case-insensitively.
fn parse_failure_policy(entry: &ServerConfigurationEntry) -> Result<FailurePolicy> {
  let value = entry
    .children
    .get("failure_policy")
    .and_then(|entries| entries.inner.first())
    .and_then(|entry| entry.values.first())
    .and_then(|v| v.as_str())
    .unwrap_or("block");
  match value.to_ascii_lowercase().as_str() {
    "block" => Ok(FailurePolicy::Block),
    "skip" => Ok(FailurePolicy::Skip),
    other => Err(anyhow!("unsupported failure_policy '{other}'")),
  }
}

/// Reads an optional child entry whose value is a duration string.
fn parse_duration_field(entry: &ServerConfigurationEntry, name: &str) -> Result<Option<Duration>> {
  let duration = entry
    .children
    .get(name)
    .and_then(|entries| entries.inner.first())
    .and_then(|entry| entry.values.first())
    .and_then(|v| v.as_str())
    .map(parse_duration)
    .transpose()?;
  Ok(duration)
}

/// Trigger names accepted in configuration, before being resolved into
/// `ScriptTrigger` values (which may carry the tick interval).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ScriptTriggerKind {
  RequestStart,
  RequestBody,
  ResponseReady,
  Tick,
}

impl ScriptTriggerKind {
  /// Maps a configuration trigger name to its kind; unknown names are a
  /// configuration error.
  fn from_str(name: &str) -> Result<Self> {
    match name {
      "on_request_start" => Ok(Self::RequestStart),
      "on_request_body" => Ok(Self::RequestBody),
      "on_response_ready" => Ok(Self::ResponseReady),
      "on_tick" => Ok(Self::Tick),
      other => Err(anyhow!("unsupported trigger '{other}'")),
    }
  }
}

/// Parses every `trigger` child into a `ScriptTrigger`, attaching the
/// configured tick interval to `on_tick` triggers.
fn parse_triggers(entry: &ServerConfigurationEntry, tick_interval: Duration) -> Result<Vec<ScriptTrigger>> {
  let mut triggers = Vec::new();
  if let Some(trigger_entries) = entry.children.get("trigger") {
    for trigger_entry in &trigger_entries.inner {
      for value in &trigger_entry.values {
        let Some(name) = value.as_str() else {
          return Err(anyhow!(
            "script '{}' has an invalid trigger value; use `trigger \"on_request_start\"` style entries",
            entry_name(entry)
          ));
        };
        let trigger = match ScriptTriggerKind::from_str(name)?
{ + ScriptTriggerKind::RequestStart => ScriptTrigger::RequestStart, + ScriptTriggerKind::RequestBody => ScriptTrigger::RequestBody, + ScriptTriggerKind::ResponseReady => ScriptTrigger::ResponseReady, + ScriptTriggerKind::Tick => ScriptTrigger::Tick(tick_interval), + }; + triggers.push(trigger); + } + } + } + Ok(triggers) +} diff --git a/ferron-script/src/context.rs b/ferron-script/src/context.rs new file mode 100644 index 00000000..e6fb309d --- /dev/null +++ b/ferron-script/src/context.rs @@ -0,0 +1,543 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use ferron_common::logging::ErrorLogger; +use hyper::header::{HeaderName, HeaderValue}; +use hyper::{HeaderMap, StatusCode}; +use parking_lot::RwLock; +use rhai::{Dynamic, FnPtr, ImmutableString}; + +/// Identifies the context in which a script executes. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ScriptPhase { + RequestStart, + RequestBody, + Response, + Tick, + BackgroundTask, +} + +/// Represents the result requested by the script. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ScriptDecision { + Continue, + Deny { status: StatusCode, body: Vec }, +} + +impl ScriptDecision { + #[allow(dead_code)] + pub fn is_terminating(&self) -> bool { + !matches!(self, ScriptDecision::Continue) + } +} + +/// A mutable state store shared between scripts. 
+#[derive(Clone, Default)] +pub struct ScriptStateHandle { + inner: Arc>>, +} + +impl ScriptStateHandle { + pub fn new() -> Self { + Self::default() + } + + pub fn get(&self, key: &str) -> Dynamic { + self.inner.read().get(key).cloned().unwrap_or(Dynamic::UNIT) + } + + pub fn set(&self, key: &str, value: Dynamic) { + self.inner.write().insert(key.to_string(), value); + } + + pub fn remove(&self, key: &str) { + self.inner.write().remove(key); + } + + pub fn clear(&self) { + self.inner.write().clear(); + } + + pub fn keys(&self) -> Vec { + self.inner.read().keys().cloned().collect() + } +} + +fn header_map_to_vec(headers: &HeaderMap) -> Vec<(String, String)> { + headers + .iter() + .map(|(name, value)| { + ( + name.as_str().to_string(), + value.to_str().map(ToString::to_string).unwrap_or_default(), + ) + }) + .collect() +} + +fn headers_vec_to_map(headers: &[(String, String)]) -> HeaderMap { + let mut map = HeaderMap::new(); + for (name, value) in headers { + if let (Ok(header_name), Ok(header_value)) = (HeaderName::try_from(name.as_str()), HeaderValue::from_str(value)) { + map.append(header_name, header_value); + } + } + map +} + +fn names_equal(left: &str, right: &str) -> bool { + left.eq_ignore_ascii_case(right) +} + +#[derive(Clone)] +pub struct ScriptRequestHandle { + inner: Arc>, +} + +#[derive(Clone, Debug)] +pub struct ScriptRequestSnapshot { + pub method: String, + pub uri: String, + pub headers: Vec<(String, String)>, + pub body: Vec, + pub body_modified: bool, +} + +#[derive(Clone, Debug)] +struct ScriptRequestData { + method: String, + uri: String, + headers: Vec<(String, String)>, + body: Vec, + body_modified: bool, +} + +impl ScriptRequestHandle { + pub fn from_parts(method: &str, uri: &str, headers: &HeaderMap, body: Vec) -> Self { + Self { + inner: Arc::new(RwLock::new(ScriptRequestData { + method: method.to_string(), + uri: uri.to_string(), + headers: header_map_to_vec(headers), + body, + body_modified: false, + })), + } + } + + pub fn 
snapshot(&self) -> ScriptRequestSnapshot { + let guard = self.inner.read(); + ScriptRequestSnapshot { + method: guard.method.clone(), + uri: guard.uri.clone(), + headers: guard.headers.clone(), + body: guard.body.clone(), + body_modified: guard.body_modified, + } + } + + pub fn get_method(&self) -> String { + self.inner.read().method.clone() + } + + pub fn set_method(&self, method: &str) { + self.inner.write().method = method.to_string(); + } + + pub fn get_uri(&self) -> String { + self.inner.read().uri.clone() + } + + pub fn set_uri(&self, uri: &str) { + self.inner.write().uri = uri.to_string(); + } + + pub fn get_body(&self) -> Vec { + self.inner.read().body.clone() + } + + pub fn set_body(&self, body: Vec) { + let mut guard = self.inner.write(); + guard.body = body; + guard.body_modified = true; + } + + pub fn get_header(&self, name: &str) -> Option { + self + .inner + .read() + .headers + .iter() + .rfind(|(n, _)| names_equal(n, name)) + .map(|(_, v)| v.clone()) + } + + pub fn set_header(&self, name: &str, value: &str) { + let mut guard = self.inner.write(); + guard.headers.retain(|(n, _)| !names_equal(n, name)); + guard.headers.push((name.to_ascii_lowercase(), value.to_string())); + } + + pub fn remove_header(&self, name: &str) { + self.inner.write().headers.retain(|(n, _)| !names_equal(n, name)); + } +} + +#[derive(Clone)] +pub struct ScriptResponseHandle { + inner: Arc>, +} + +#[derive(Clone, Debug, Default)] +pub struct ScriptResponseSnapshot { + pub status: Option, + pub headers: Vec<(String, String)>, + pub body: Vec, +} + +#[derive(Clone, Debug, Default)] +struct ScriptResponseData { + status: Option, + headers: Vec<(String, String)>, + body: Vec, +} + +impl ScriptResponseHandle { + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(ScriptResponseData::default())), + } + } + + pub fn snapshot(&self) -> ScriptResponseSnapshot { + let guard = self.inner.read(); + ScriptResponseSnapshot { + status: guard.status, + headers: guard.headers.clone(), 
+ body: guard.body.clone(), + } + } + + pub fn set_status(&self, status: u16) { + self.inner.write().status = Some(status); + } + + pub fn get_status(&self) -> Option { + self.inner.read().status.map(|s| s as i64) + } + + pub fn get_body(&self) -> Vec { + self.inner.read().body.clone() + } + + pub fn set_body(&self, body: Vec) { + self.inner.write().body = body; + } + + pub fn get_header(&self, name: &str) -> Option { + self + .inner + .read() + .headers + .iter() + .rfind(|(n, _)| names_equal(n, name)) + .map(|(_, v)| v.clone()) + } + + pub fn set_header(&self, name: &str, value: &str) { + let mut guard = self.inner.write(); + guard.headers.retain(|(n, _)| !names_equal(n, name)); + guard.headers.push((name.to_ascii_lowercase(), value.to_string())); + } + + pub fn remove_header(&self, name: &str) { + self.inner.write().headers.retain(|(n, _)| !names_equal(n, name)); + } +} + +/// Shared context for host APIs executed during script evaluation. +pub struct ScriptExecutionContext<'a> { + pub script_id: &'a str, + pub phase: ScriptPhase, + pub request: Option, + pub response: ScriptResponseHandle, + #[allow(dead_code)] + pub env: Arc>, + #[allow(dead_code)] + pub state: ScriptStateHandle, + pub decision: ScriptDecision, + #[allow(dead_code)] + pub logger: &'a ErrorLogger, + pub allow_spawn_task: bool, + pub spawn_callback: Option>, + pub pending_logs: Vec, +} + +impl<'a> ScriptExecutionContext<'a> { + #[allow(clippy::too_many_arguments)] + pub fn new( + script_id: &'a str, + phase: ScriptPhase, + request: Option, + response: ScriptResponseHandle, + env: Arc>, + state: ScriptStateHandle, + logger: &'a ErrorLogger, + allow_spawn_task: bool, + spawn_callback: Option>, + ) -> Self { + Self { + script_id, + phase, + request, + response, + env, + state, + decision: ScriptDecision::Continue, + logger, + allow_spawn_task, + spawn_callback, + pending_logs: Vec::new(), + } + } + + pub fn log(&mut self, level: &str, message: &str) { + self + .pending_logs + 
.push(format!("[script:{}][{}] {}", self.script_id, level, message)); + } + + pub fn drain_logs(&mut self) -> Vec { + std::mem::take(&mut self.pending_logs) + } +} + +thread_local! { + static ACTIVE_CONTEXT: std::cell::RefCell>> = + const { std::cell::RefCell::new(None) }; +} + +pub struct ContextGuard; + +impl ContextGuard { + pub fn activate(ctx: *mut ScriptExecutionContext<'_>) -> Self { + ACTIVE_CONTEXT.with(|slot| { + slot.replace(Some(ctx.cast::>())); + }); + Self + } +} + +impl Drop for ContextGuard { + fn drop(&mut self) { + ACTIVE_CONTEXT.with(|slot| { + slot.replace(None); + }); + } +} + +fn with_context(func: F) -> Option +where + F: FnOnce(&mut ScriptExecutionContext<'_>) -> R, +{ + ACTIVE_CONTEXT.with(|slot| { + slot + .borrow() + .map(|ctx| unsafe { func(&mut *ctx.cast::>()) }) + }) +} + +/// Converts the script environment into a `Dynamic` value for the Rhai scope. +pub fn env_to_dynamic(env: &HashMap) -> Dynamic { + let mut map = rhai::Map::new(); + for (key, value) in env { + let owned_key = key.clone(); + let owned_value = value.clone(); + map.insert(owned_key.into(), Dynamic::from(owned_value)); + } + Dynamic::from_map(map) +} + +/// Registers script-visible types and helpers on the provided engine. 
+pub fn register_types(engine: &mut rhai::Engine) { + engine.register_type_with_name::("Request"); + engine.register_type_with_name::("Response"); + engine.register_type_with_name::("StateStore"); + + engine.register_get("method", |req: &mut ScriptRequestHandle| req.get_method()); + engine.register_set("method", |req: &mut ScriptRequestHandle, value: ImmutableString| { + req.set_method(&value); + }); + engine.register_get("uri", |req: &mut ScriptRequestHandle| req.get_uri()); + engine.register_set("uri", |req: &mut ScriptRequestHandle, value: ImmutableString| { + req.set_uri(&value); + }); + engine.register_fn("get_header", |req: &mut ScriptRequestHandle, name: ImmutableString| { + req.get_header(&name) + }); + engine.register_fn("header", |req: &mut ScriptRequestHandle, name: ImmutableString| { + req.get_header(&name).map(Dynamic::from).unwrap_or(Dynamic::UNIT) + }); + engine.register_fn( + "set_header", + |req: &mut ScriptRequestHandle, name: ImmutableString, value: ImmutableString| { + req.set_header(&name, &value); + }, + ); + engine.register_fn( + "remove_header", + |req: &mut ScriptRequestHandle, name: ImmutableString| { + req.remove_header(&name); + }, + ); + engine.register_get("body", |req: &mut ScriptRequestHandle| req.get_body()); + engine.register_set("body", |req: &mut ScriptRequestHandle, value: Vec| { + req.set_body(value); + }); + + engine.register_get("status", |resp: &mut ScriptResponseHandle| { + resp.get_status().unwrap_or_default() + }); + engine.register_set("status", |resp: &mut ScriptResponseHandle, value: rhai::INT| { + resp.set_status(value as u16); + }); + engine.register_fn("set_status", |resp: &mut ScriptResponseHandle, value: rhai::INT| { + resp.set_status(value as u16); + }); + engine.register_fn( + "get_header", + |resp: &mut ScriptResponseHandle, name: ImmutableString| resp.get_header(&name), + ); + engine.register_fn( + "set_header", + |resp: &mut ScriptResponseHandle, name: ImmutableString, value: ImmutableString| { + 
resp.set_header(&name, &value); + }, + ); + engine.register_fn( + "remove_header", + |resp: &mut ScriptResponseHandle, name: ImmutableString| { + resp.remove_header(&name); + }, + ); + engine.register_get("body", |resp: &mut ScriptResponseHandle| resp.get_body()); + engine.register_set("body", |resp: &mut ScriptResponseHandle, value: Vec| { + resp.set_body(value); + }); + engine.register_fn("set_body", |resp: &mut ScriptResponseHandle, value: Vec| { + resp.set_body(value); + }); + engine.register_fn("set_body", |resp: &mut ScriptResponseHandle, value: ImmutableString| { + resp.set_body(value.as_str().as_bytes().to_vec()); + }); + + engine.register_fn("get", |state: &mut ScriptStateHandle, key: ImmutableString| { + state.get(&key) + }); + engine.register_fn( + "set", + |state: &mut ScriptStateHandle, key: ImmutableString, value: Dynamic| { + state.set(&key, value); + }, + ); + engine.register_fn("remove", |state: &mut ScriptStateHandle, key: ImmutableString| { + state.remove(&key); + }); + engine.register_fn("clear", |state: &mut ScriptStateHandle| state.clear()); + engine.register_fn("keys", |state: &mut ScriptStateHandle| state.keys()); + + engine.register_fn("log", host_log); + engine.register_fn("set_header", host_set_header); + engine.register_fn("remove_header", host_remove_header); + engine.register_fn("deny", host_deny); + engine.register_fn("spawn_task", host_spawn_task); +} + +fn host_log(level: ImmutableString, message: ImmutableString) { + let level = level.to_string(); + let message = message.to_string(); + let _ = with_context(|ctx| { + ctx.log(&level, &message); + }); +} + +fn update_headers_for_phase(name: &str, value: Option<&str>) { + with_context(|ctx| match ctx.phase { + ScriptPhase::RequestStart | ScriptPhase::RequestBody => { + if let Some(request) = &ctx.request { + match value { + Some(value) => request.set_header(name, value), + None => request.remove_header(name), + } + } + } + ScriptPhase::Response | ScriptPhase::Tick | 
ScriptPhase::BackgroundTask => match value { + Some(value) => ctx.response.set_header(name, value), + None => ctx.response.remove_header(name), + }, + }); +} + +fn host_set_header(name: ImmutableString, value: ImmutableString) { + update_headers_for_phase(&name, Some(&value)); +} + +fn host_remove_header(name: ImmutableString) { + update_headers_for_phase(&name, None); +} + +fn host_deny(status: rhai::INT, body: ImmutableString) { + with_context(|ctx| { + let status_code = StatusCode::from_u16(status as u16).unwrap_or(StatusCode::FORBIDDEN); + let bytes = body.as_str().as_bytes().to_vec(); + ctx.decision = ScriptDecision::Deny { + status: status_code, + body: bytes.clone(), + }; + ctx.response.set_status(status_code.as_u16()); + ctx.response.set_body(bytes); + }); +} + +fn host_spawn_task(name: ImmutableString, callback: FnPtr) -> Result<(), Dynamic> { + with_context(|ctx| { + if !ctx.allow_spawn_task { + return Err(Dynamic::from("spawn_task not allowed")); + } + let spawner = ctx + .spawn_callback + .as_ref() + .ok_or_else(|| Dynamic::from("task spawner unavailable"))?; + spawner(name.to_string(), callback); + Ok(()) + }) + .unwrap_or_else(|| Err(Dynamic::from("no active script context"))) +} + +/// Applies request updates back into the Hyper request parts. +pub fn apply_request_snapshot(snapshot: ScriptRequestSnapshot, parts: &mut hyper::http::request::Parts) -> Result<()> { + parts.method = snapshot.method.parse().context("invalid HTTP method")?; + parts.uri = snapshot.uri.parse().context("invalid URI")?; + parts.headers = headers_vec_to_map(&snapshot.headers); + Ok(()) +} + +/// Indicates whether the request body was modified by the script. +pub fn request_body_modified(snapshot: &ScriptRequestSnapshot) -> bool { + snapshot.body_modified +} + +/// Applies response snapshot changes onto response parts. 
+pub fn apply_response_snapshot( + snapshot: &ScriptResponseSnapshot, + parts: &mut hyper::http::response::Parts, +) -> Result<()> { + if let Some(status) = snapshot.status { + parts.status = StatusCode::from_u16(status)?; + } + if !snapshot.headers.is_empty() { + parts.headers = headers_vec_to_map(&snapshot.headers); + } + Ok(()) +} diff --git a/ferron-script/src/engine.rs b/ferron-script/src/engine.rs new file mode 100644 index 00000000..9e5a1dc1 --- /dev/null +++ b/ferron-script/src/engine.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +use parking_lot::Mutex; +use rhai::Engine; + +use crate::context; + +/// Wraps the configured Rhai engine used by the script runtime. +pub struct ScriptEngine { + pool: Arc>>, +} + +impl ScriptEngine { + pub fn new() -> Self { + Self { + pool: Arc::new(Mutex::new(Vec::new())), + } + } + + fn build_engine() -> Engine { + let mut engine = Engine::new(); + engine.set_strict_variables(true); + engine.disable_symbol("eval"); + engine.disable_symbol("import"); + context::register_types(&mut engine); + engine + } + + /// Creates a new engine instance for script execution. 
+ pub fn instantiate(&self) -> EngineHandle { + let engine = self.pool.lock().pop().unwrap_or_else(Self::build_engine); + EngineHandle { + engine: Some(engine), + pool: Arc::clone(&self.pool), + } + } +} + +pub struct EngineHandle { + engine: Option, + pool: Arc>>, +} + +impl std::ops::Deref for EngineHandle { + type Target = Engine; + + fn deref(&self) -> &Self::Target { + self.engine.as_ref().expect("engine available") + } +} + +impl std::ops::DerefMut for EngineHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + self.engine.as_mut().expect("engine available") + } +} + +impl Drop for EngineHandle { + fn drop(&mut self) { + if let Some(engine) = self.engine.take() { + let mut pool = self.pool.lock(); + pool.push(engine); + } + } +} diff --git a/ferron-script/src/lib.rs b/ferron-script/src/lib.rs new file mode 100644 index 00000000..c274dbe0 --- /dev/null +++ b/ferron-script/src/lib.rs @@ -0,0 +1,9 @@ +mod config; +mod context; +mod engine; +mod runtime; + +pub use runtime::ScriptExecModuleLoader; + +#[cfg(test)] +pub use config::ScriptModuleConfig; diff --git a/ferron-script/src/runtime.rs b/ferron-script/src/runtime.rs new file mode 100644 index 00000000..8d5c5bbb --- /dev/null +++ b/ferron-script/src/runtime.rs @@ -0,0 +1,1075 @@ +use std::any::Any; +use std::collections::{HashMap, HashSet}; +use std::env; +use std::error::Error; +use std::fs; +use std::panic::{self, AssertUnwindSafe}; +use std::path::Path; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, OnceLock}; +use std::time::{Duration, SystemTime}; + +use anyhow::{anyhow, Context, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use ferron_common::config::ServerConfiguration; +use ferron_common::get_entries_for_validation; +use ferron_common::logging::ErrorLogger; +use ferron_common::modules::{Module, ModuleHandlers, ModuleLoader, ResponseData, SocketData}; +use ferron_common::util::ModuleCache; +use http_body_util::combinators::BoxBody; +use 
http_body_util::{BodyExt, Empty, Full}; +use hyper::{HeaderMap, Request, Response, StatusCode}; +use parking_lot::Mutex; +use rhai::{FnPtr, Scope}; +use sha2::{Digest, Sha256}; +use tokio::runtime::Handle; +use tokio::sync::RwLock; +use tokio::task::{block_in_place, JoinHandle}; +use tokio::time::{interval, MissedTickBehavior}; + +use crate::config::{FailurePolicy, ScriptDefinition, ScriptModuleConfig, ScriptSource, ScriptTrigger}; +use crate::context::{ + apply_request_snapshot, apply_response_snapshot, env_to_dynamic, request_body_modified, ContextGuard, ScriptDecision, + ScriptExecutionContext, ScriptPhase, ScriptRequestHandle, ScriptResponseHandle, ScriptStateHandle, +}; + +#[derive(Clone, Copy)] +struct SendableContextPtr(*mut ScriptExecutionContext<'static>); + +impl SendableContextPtr { + #[allow(clippy::unnecessary_cast)] + fn new(ctx: &mut ScriptExecutionContext<'_>) -> Self { + Self(ctx as *mut _ as *mut ScriptExecutionContext<'static>) + } + + fn as_ptr(self) -> *mut ScriptExecutionContext<'static> { + self.0 + } +} + +unsafe impl Send for SendableContextPtr {} +use crate::engine::ScriptEngine; + +const FAILURE_THRESHOLD: usize = 5; + +fn script_debug_enabled() -> bool { + static ENABLED: OnceLock = OnceLock::new(); + *ENABLED.get_or_init(|| env::var("FERRON_SCRIPT_DEBUG").is_ok()) +} + +macro_rules! 
script_debug { + ($($arg:tt)*) => { + if $crate::runtime::script_debug_enabled() { + eprintln!($($arg)*); + } + }; +} + +pub struct ScriptExecModuleLoader { + cache: ModuleCache, +} + +impl Default for ScriptExecModuleLoader { + fn default() -> Self { + Self::new() + } +} + +impl ScriptExecModuleLoader { + pub fn new() -> Self { + Self { + cache: ModuleCache::new(vec!["module"]), + } + } +} + +impl ModuleLoader for ScriptExecModuleLoader { + fn load_module( + &mut self, + config: &ServerConfiguration, + _global_config: Option<&ServerConfiguration>, + secondary_runtime: &tokio::runtime::Runtime, + ) -> Result, Box> { + Ok( + self + .cache + .get_or_init::<_, Box>(config, |cfg| { + let module_config = ScriptModuleConfig::from_server_config(cfg)?; + let runtime = ScriptRuntime::new(module_config, secondary_runtime.handle()); + Ok(Arc::new(ScriptExecModule { runtime })) + })?, + ) + } + + fn get_requirements(&self) -> Vec<&'static str> { + vec!["module"] + } + + fn validate_configuration( + &self, + config: &ServerConfiguration, + used_properties: &mut HashSet, + ) -> Result<(), Box> { + if get_entries_for_validation!("module", config, used_properties).is_some() { + ScriptModuleConfig::from_server_config(config)?; + } + Ok(()) + } +} + +struct ScriptExecModule { + runtime: Arc, +} + +impl Module for ScriptExecModule { + fn get_module_handlers(&self) -> Box { + Box::new(ScriptModuleHandlers::new(self.runtime.clone())) + } +} + +struct ScriptRuntime { + engine: Arc, + state: ScriptStateHandle, + scripts: Vec>, + request_start: Vec>, + request_body: Vec>, + response_ready: Vec>, + requires_body: bool, + tokio_handle: Handle, + background_tasks: Mutex>>, +} + +impl ScriptRuntime { + fn new(config: ScriptModuleConfig, handle: &Handle) -> Arc { + let engine = Arc::new(ScriptEngine::new()); + let state = ScriptStateHandle::new(); + let mut scripts = Vec::new(); + let mut request_start = Vec::new(); + let mut request_body = Vec::new(); + let mut response_ready = Vec::new(); + 
+ for definition in config.scripts { + let script = Arc::new(ManagedScript::new(definition)); + script_debug!( + "DEBUG: Loading script '{}' with {} triggers", + script.id, + script.triggers.len() + ); + if script.has_trigger(|t| matches!(t, ScriptTrigger::RequestStart)) { + script_debug!("DEBUG: Script '{}' added to request_start", script.id); + request_start.push(script.clone()); + } + if script.has_trigger(|t| matches!(t, ScriptTrigger::RequestBody)) { + request_body.push(script.clone()); + } + if script.has_trigger(|t| matches!(t, ScriptTrigger::ResponseReady)) { + response_ready.push(script.clone()); + } + scripts.push(script); + } + script_debug!( + "DEBUG: ScriptRuntime initialized with {} request_start scripts", + request_start.len() + ); + + let requires_body = !request_body.is_empty(); + let runtime = Arc::new(ScriptRuntime { + engine, + state, + scripts, + request_start, + request_body, + response_ready, + requires_body, + tokio_handle: handle.clone(), + background_tasks: Mutex::new(Vec::new()), + }); + + runtime.spawn_tick_tasks(); + runtime + } + + fn spawn_tick_tasks(self: &Arc) { + for script in &self.scripts { + for trigger in &script.triggers { + if let ScriptTrigger::Tick(interval) = trigger { + self.spawn_tick_worker(script.clone(), *interval); + } + } + } + } + + fn spawn_tick_worker(self: &Arc, script: Arc, interval_duration: Duration) { + let runtime = self.clone(); + let mut ticker = interval(interval_duration); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + + let handle = self.tokio_handle.spawn(async move { + let logger = ErrorLogger::without_logger(); + let response_handle = ScriptResponseHandle::new(); + let mut ticker = ticker; + loop { + ticker.tick().await; + if let Err(err) = runtime + .execute_script(script.clone(), ScriptPhase::Tick, None, &response_handle, &logger) + .await + { + logger.log(&format!("Tick script failed: {err}")).await; + } + } + }); + + self.background_tasks.lock().push(handle); + } + + fn 
is_empty(&self) -> bool { + self.scripts.is_empty() + } + + fn requires_request_body(&self) -> bool { + self.requires_body + } + + fn request_start_scripts(&self) -> &[Arc] { + &self.request_start + } + + fn request_body_scripts(&self) -> &[Arc] { + &self.request_body + } + + fn response_ready_scripts(&self) -> &[Arc] { + &self.response_ready + } + + async fn run_scripts( + self: &Arc, + scripts: &[Arc], + phase: ScriptPhase, + request_handle: Option, + response_handle: &ScriptResponseHandle, + logger: &ErrorLogger, + ) -> Result>>> { + if scripts.is_empty() { + script_debug!("DEBUG: run_scripts called with empty scripts array"); + return Ok(None); + } + + script_debug!("DEBUG: run_scripts executing {} scripts", scripts.len()); + for script in scripts { + script_debug!("DEBUG: Executing script: {}", script.id); + match self + .execute_script(script.clone(), phase, request_handle.clone(), response_handle, logger) + .await + { + Ok(ScriptDecision::Continue) => {} + Ok(ScriptDecision::Deny { status, body }) => { + return Ok(Some(deny_response(status, body))); + } + Err(err) => Err(err)?, + } + } + + Ok(None) + } + + async fn execute_script( + self: &Arc, + script: Arc, + phase: ScriptPhase, + request_handle: Option, + response_handle: &ScriptResponseHandle, + logger: &ErrorLogger, + ) -> Result { + script_debug!( + "DEBUG: execute_script called for script '{}' in phase {:?}", + script.id, + phase + ); + if script.failure_state.is_tripped() { + script_debug!("DEBUG: Script '{}' is tripped, skipping", script.id); + logger + .log(&format!( + "Script '{}' skipped because it is temporarily disabled", + script.id + )) + .await; + return Ok(ScriptDecision::Continue); + } + + let (ast, version_hash) = match script.ast_cache.ensure_ast(&self.engine, &self.tokio_handle).await { + Ok(data) => data, + Err(err) => { + // Treat compilation errors the same way as runtime failures so `failure_policy` + // is applied and we log the root cause. 
+ return self + .handle_failure(&script, &None, logger, &format!("compile error: {err:#}")) + .await; + } + }; + + let mut scope = Scope::new(); + if let Some(request) = &request_handle { + scope.push_constant("request", request.clone()); + } + scope.push_constant("response", response_handle.clone()); + scope.push_constant("env", env_to_dynamic(&script.env)); + scope.push("state", self.state.clone()); + + let logger_clone = logger.clone(); + let spawn_callback: Option> = + if script.permissions.allow_spawn_task { + let runtime = Arc::downgrade(self); + let script_for_task = script.clone(); + Some(Arc::new(move |task_name: String, fn_ptr: FnPtr| { + if let Some(runtime) = runtime.upgrade() { + runtime.queue_spawn_task(script_for_task.clone(), task_name, fn_ptr, logger_clone.clone()); + } + }) as Arc) + } else { + None + }; + + let mut execution_context = ScriptExecutionContext::new( + &script.id, + phase, + request_handle, + response_handle.clone(), + script.env.clone(), + self.state.clone(), + logger, + script.permissions.allow_spawn_task, + spawn_callback, + ); + + let mut engine = self.engine.instantiate(); + let max_exec_time = script.limits.max_exec_time; + engine.set_max_operations(script.limits.max_operations); + engine.set_max_call_levels(script.limits.max_call_depth as usize); + + let ast_clone = ast.clone(); + let scope = scope; + let ctx_ptr = SendableContextPtr::new(&mut execution_context); + let timeout_handle = self.tokio_handle.clone(); + let result = timeout_handle + .spawn(async move { + tokio::time::timeout(max_exec_time, async move { + block_in_place(move || { + let mut scope = scope; + let engine = engine; + panic::catch_unwind(AssertUnwindSafe(|| { + let guard = ContextGuard::activate(ctx_ptr.as_ptr()); + let run_result = engine.run_ast_with_scope(&mut scope, &ast_clone); + drop(guard); + run_result + })) + }) + }) + .await + }) + .await; + + // Drain logs before handling result to ensure they are written even if script fails + let pending_logs 
= execution_context.drain_logs(); + + match result { + Ok(Ok(Ok(Ok(_)))) => { + script.failure_state.record_success(); + for log_line in pending_logs { + logger.log(&log_line).await; + } + script_debug!( + "DEBUG: Script '{}' executed successfully, decision: {:?}", + script.id, + execution_context.decision + ); + Ok(execution_context.decision) + } + Ok(Ok(Ok(Err(err)))) => { + for log_line in pending_logs { + logger.log(&log_line).await; + } + self + .handle_failure(&script, &version_hash, logger, &format!("execution error: {err}")) + .await + } + Ok(Ok(Err(panic_payload))) => { + for log_line in pending_logs { + logger.log(&log_line).await; + } + let panic_msg = describe_panic(panic_payload); + self + .handle_failure(&script, &version_hash, logger, &format!("panic: {panic_msg}")) + .await + } + Ok(Err(elapsed)) => { + for log_line in pending_logs { + logger.log(&log_line).await; + } + self + .handle_failure( + &script, + &version_hash, + logger, + &format!("timed out after {:?}", elapsed), + ) + .await + } + Err(join_err) => { + for log_line in pending_logs { + logger.log(&log_line).await; + } + self + .handle_failure(&script, &version_hash, logger, &format!("tokio join error: {join_err}")) + .await + } + } + } + + async fn handle_failure( + self: &Arc, + script: &Arc, + version_hash: &Option, + logger: &ErrorLogger, + reason: &str, + ) -> Result { + let hash_text = version_hash.as_deref().map(|h| format!(" @{}", h)).unwrap_or_default(); + logger + .log(&format!("Script '{}'{hash_text} failed: {reason}", script.id)) + .await; + + let tripped = script.failure_state.record_failure(); + if tripped { + logger + .log(&format!( + "Script '{}' has been disabled after {} consecutive failures", + script.id, FAILURE_THRESHOLD + )) + .await; + } + + match script.failure_policy { + FailurePolicy::Block => Err(anyhow!("script '{}' failed", script.id)), + FailurePolicy::Skip => Ok(ScriptDecision::Continue), + } + } + + fn queue_spawn_task( + self: &Arc, + script: Arc, + 
task_name: String, + fn_ptr: FnPtr, + logger: ErrorLogger, + ) { + self.cleanup_finished_tasks(); + let runtime = self.clone(); + let handle = self.tokio_handle.spawn(async move { + runtime.run_spawned_function(script, task_name, fn_ptr, logger).await; + }); + self.background_tasks.lock().push(handle); + } + + fn cleanup_finished_tasks(&self) { + let mut handles = self.background_tasks.lock(); + handles.retain(|handle| !handle.is_finished()); + } + + async fn run_spawned_function( + self: Arc, + script: Arc, + task_name: String, + fn_ptr: FnPtr, + logger: ErrorLogger, + ) { + let (ast, version_hash) = match script.ast_cache.ensure_ast(&self.engine, &self.tokio_handle).await { + Ok(data) => data, + Err(err) => { + logger + .log(&format!("Failed to compile background task '{}': {err}", task_name)) + .await; + return; + } + }; + + let mut scope = Scope::new(); + scope.push_constant("env", env_to_dynamic(&script.env)); + scope.push("state", self.state.clone()); + + let mut engine = self.engine.instantiate(); + let max_exec_time = script.limits.max_exec_time; + engine.set_max_operations(script.limits.max_operations); + engine.set_max_call_levels(script.limits.max_call_depth as usize); + + let spawn_logger = logger.clone(); + let runtime = Arc::downgrade(&self); + let script_for_task = script.clone(); + let callback: Option> = if script.permissions.allow_spawn_task { + Some(Arc::new(move |name: String, pointer: FnPtr| { + if let Some(runtime) = runtime.upgrade() { + runtime.queue_spawn_task(script_for_task.clone(), name, pointer, spawn_logger.clone()); + } + }) as Arc) + } else { + None + }; + + let mut exec_ctx = ScriptExecutionContext::new( + &script.id, + ScriptPhase::BackgroundTask, + None, + ScriptResponseHandle::new(), + script.env.clone(), + self.state.clone(), + &logger, + script.permissions.allow_spawn_task, + callback, + ); + + let args: Vec = fn_ptr.iter_curry().cloned().collect(); + let fn_name = fn_ptr.fn_name().to_string(); + let ast_clone = ast.clone(); + 
let scope = scope; + let ctx_ptr = SendableContextPtr::new(&mut exec_ctx); + let timeout_handle = self.tokio_handle.clone(); + let result = timeout_handle + .spawn(async move { + tokio::time::timeout(max_exec_time, async move { + block_in_place(move || { + let mut scope = scope; + let engine = engine; + let mut args = args; + #[allow(deprecated)] + panic::catch_unwind(AssertUnwindSafe(|| { + let guard = ContextGuard::activate(ctx_ptr.as_ptr()); + let call_result = engine.call_fn_raw( + &mut scope, + &ast_clone, + false, + true, + fn_name.as_str(), + None, + args.as_mut_slice(), + ); + drop(guard); + call_result + })) + }) + }) + .await + }) + .await; + + match result { + Ok(Ok(Ok(Ok(_)))) => { + script.failure_state.record_success(); + for log_line in exec_ctx.drain_logs() { + logger.log(&log_line).await; + } + } + Ok(Ok(Ok(Err(err)))) => { + for log_line in exec_ctx.drain_logs() { + logger.log(&log_line).await; + } + self + .handle_failure(&script, &version_hash, &logger, &format!("task error: {err}")) + .await + .ok(); + } + Ok(Ok(Err(panic_payload))) => { + for log_line in exec_ctx.drain_logs() { + logger.log(&log_line).await; + } + let panic_msg = describe_panic(panic_payload); + self + .handle_failure(&script, &version_hash, &logger, &format!("task panic: {panic_msg}")) + .await + .ok(); + } + Ok(Err(elapsed)) => { + for log_line in exec_ctx.drain_logs() { + logger.log(&log_line).await; + } + self + .handle_failure( + &script, + &version_hash, + &logger, + &format!("task timed out after {:?}", elapsed), + ) + .await + .ok(); + } + Err(join_err) => { + for log_line in exec_ctx.drain_logs() { + logger.log(&log_line).await; + } + self + .handle_failure( + &script, + &version_hash, + &logger, + &format!("task tokio join error: {join_err}"), + ) + .await + .ok(); + } + } + } +} + +impl Drop for ScriptRuntime { + fn drop(&mut self) { + for handle in self.background_tasks.lock().drain(..) 
{ + handle.abort(); + } + } +} + +#[derive(Clone)] +struct ManagedScript { + id: String, + env: Arc>, + permissions: crate::config::ScriptPermissions, + limits: crate::config::ScriptLimits, + failure_policy: FailurePolicy, + triggers: Vec, + ast_cache: ScriptAstCache, + failure_state: ScriptFailureState, +} + +impl ManagedScript { + fn new(definition: ScriptDefinition) -> Self { + Self { + id: definition.id.clone(), + env: Arc::new(definition.env.clone()), + permissions: definition.permissions.clone(), + limits: definition.limits.clone(), + failure_policy: definition.failure_policy, + triggers: definition.triggers, + ast_cache: ScriptAstCache::new(definition.source, definition.reload_on_change), + failure_state: ScriptFailureState::new(), + } + } + + fn has_trigger(&self, predicate: F) -> bool + where + F: Fn(&ScriptTrigger) -> bool, + { + self.triggers.iter().any(predicate) + } +} + +#[derive(Clone)] +struct ScriptFailureState { + consecutive: Arc, + tripped: Arc, +} + +impl ScriptFailureState { + fn new() -> Self { + Self { + consecutive: Arc::new(AtomicUsize::new(0)), + tripped: Arc::new(AtomicBool::new(false)), + } + } + + fn record_success(&self) { + self.consecutive.store(0, Ordering::Relaxed); + self.tripped.store(false, Ordering::Relaxed); + } + + fn record_failure(&self) -> bool { + let failures = self.consecutive.fetch_add(1, Ordering::Relaxed) + 1; + if failures >= FAILURE_THRESHOLD { + self.tripped.store(true, Ordering::Relaxed); + true + } else { + false + } + } + + fn is_tripped(&self) -> bool { + self.tripped.load(Ordering::Relaxed) + } +} + +#[derive(Clone)] +struct ScriptAstCache { + source: ScriptSource, + reload_on_change: bool, + compiled: Arc>>, +} + +#[derive(Clone)] +struct CompiledAst { + ast: Arc, + version_hash: String, + modified: Option, +} + +impl ScriptAstCache { + fn new(source: ScriptSource, reload_on_change: bool) -> Self { + Self { + source, + reload_on_change, + compiled: Arc::new(RwLock::new(None)), + } + } + + async fn 
ensure_ast(&self, engine: &ScriptEngine, handle: &Handle) -> Result<(Arc, Option)> { + loop { + let cached = { self.compiled.read().await.clone() }; + let cached_version = cached.as_ref().map(|c| c.version_hash.clone()); + let should_reload = self.should_reload(&cached, handle).await; + + if !should_reload { + if let Some(compiled) = cached { + return Ok((compiled.ast.clone(), Some(compiled.version_hash.clone()))); + } + } + + let mut guard = self.compiled.write().await; + if guard.as_ref().map(|c| c.version_hash.as_str()) != cached_version.as_deref() { + continue; + } + + let code = match &self.source { + ScriptSource::File(path) => read_to_string(handle, path).await?, + }; + + let ast = Arc::new(compile_script(engine, &code)?); + let version_hash = compute_hash(&code); + let modified = match &self.source { + ScriptSource::File(path) => read_metadata(handle, path).await.ok().and_then(|m| m.modified().ok()), + }; + let compiled = CompiledAst { + ast: ast.clone(), + version_hash, + modified, + }; + let version_hash = Some(compiled.version_hash.clone()); + *guard = Some(compiled); + return Ok((ast, version_hash)); + } + } + + async fn should_reload(&self, cached: &Option, handle: &Handle) -> bool { + match (&self.source, self.reload_on_change, cached) { + (_, _, None) => true, + (ScriptSource::File(path), true, Some(compiled)) => { + let metadata = read_metadata(handle, path).await.ok(); + metadata + .and_then(|m| m.modified().ok()) + .map(|mtime| compiled.modified.is_none_or(|prev| mtime > prev)) + .unwrap_or(false) + } + _ => false, + } + } +} + +fn compute_hash(code: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(code.as_bytes()); + hex::encode(hasher.finalize()) +} + +fn compile_script(engine: &ScriptEngine, code: &str) -> Result { + let engine = engine.instantiate(); + let scope = initial_compile_scope(); + engine + .compile_with_scope(&scope, code) + .with_context(|| "failed to compile script".to_string()) +} + +fn describe_panic(panic: 
Box) -> String { + match panic.downcast::() { + Ok(message) => *message, + Err(panic) => match panic.downcast::<&'static str>() { + Ok(message) => message.to_string(), + Err(_) => "unknown panic".to_string(), + }, + } +} + +fn initial_compile_scope() -> Scope<'static> { + let mut scope = Scope::new(); + let headers = HeaderMap::new(); + scope.push_constant( + "request", + ScriptRequestHandle::from_parts("GET", "/", &headers, Vec::new()), + ); + scope.push_constant("response", ScriptResponseHandle::new()); + scope.push_constant("env", env_to_dynamic(&HashMap::new())); + scope.push("state", ScriptStateHandle::new()); + scope +} + +async fn read_to_string(handle: &Handle, path: &Path) -> Result { + let owned = path.to_path_buf(); + let display = owned.display().to_string(); + handle + .spawn_blocking(move || fs::read_to_string(&owned)) + .await + .context("failed to join file read task")? + .with_context(|| format!("unable to read script at {display}")) +} + +async fn read_metadata(handle: &Handle, path: &Path) -> Result { + let owned = path.to_path_buf(); + let display = owned.display().to_string(); + handle + .spawn_blocking(move || fs::metadata(&owned)) + .await + .context("failed to join metadata read task")? 
+ .with_context(|| format!("unable to read metadata for {display}")) +} + +enum RequestBodyState { + Buffered(Vec), + Stream(BoxBody), +} + +fn deny_response(status: StatusCode, body: Vec) -> Response> { + Response::builder() + .status(status) + .body( + Full::new(Bytes::from(body)) + .map_err(|e: std::convert::Infallible| -> std::io::Error { match e {} }) + .boxed(), + ) + .unwrap_or_else(|_| { + Response::new( + Empty::new() + .map_err(|e: std::convert::Infallible| -> std::io::Error { match e {} }) + .boxed(), + ) + }) +} + +#[derive(Debug)] +struct ResponseHandlerError(anyhow::Error); + +impl std::fmt::Display for ResponseHandlerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for ResponseHandlerError {} + +pub struct ScriptModuleHandlers { + runtime: Arc, + request_handle: Option, + response_handle: ScriptResponseHandle, + request_parts: Option, + request_body: Option, + error_logger: Option, +} + +impl ScriptModuleHandlers { + fn new(runtime: Arc) -> Self { + Self { + runtime, + request_handle: None, + response_handle: ScriptResponseHandle::new(), + request_parts: None, + request_body: None, + error_logger: None, + } + } +} + +#[async_trait(?Send)] +impl ModuleHandlers for ScriptModuleHandlers { + async fn request_handler( + &mut self, + request: Request>, + _config: &ServerConfiguration, + _socket_data: &SocketData, + error_logger: &ErrorLogger, + ) -> Result> { + if self.runtime.is_empty() { + return Ok(ResponseData { + request: Some(request), + response: None, + response_status: None, + response_headers: None, + new_remote_address: None, + }); + } + + self.request_handle = None; + self.response_handle = ScriptResponseHandle::new(); + self.request_parts = None; + self.request_body = None; + self.error_logger = Some(error_logger.clone()); + + let (parts, body) = request.into_parts(); + self.request_parts = Some(parts); + self.request_body = Some(if 
self.runtime.requires_request_body() { + let collected = body.collect().await?; + RequestBodyState::Buffered(collected.to_bytes().to_vec()) + } else { + RequestBodyState::Stream(body) + }); + + if let Some(parts_ref) = self.request_parts.as_ref() { + let body_clone = match self.request_body.as_ref().unwrap() { + RequestBodyState::Buffered(bytes) => bytes.clone(), + RequestBodyState::Stream(_) => Vec::new(), + }; + let uri = parts_ref.uri.to_string(); + let handle = + ScriptRequestHandle::from_parts(parts_ref.method.as_str(), uri.as_str(), &parts_ref.headers, body_clone); + self.request_handle = Some(handle); + } + + let request_start_scripts = self.runtime.request_start_scripts(); + script_debug!("DEBUG: request_start_scripts count: {}", request_start_scripts.len()); + if let Some(response) = self + .runtime + .run_scripts( + request_start_scripts, + ScriptPhase::RequestStart, + self.request_handle.clone(), + &self.response_handle, + error_logger, + ) + .await + .map_err(|err| -> Box { err.into() })? + { + script_debug!("DEBUG: Script returned deny response"); + return Ok(ResponseData { + request: None, + response: Some(response), + response_status: None, + response_headers: None, + new_remote_address: None, + }); + } + script_debug!("DEBUG: No deny response from scripts, continuing"); + + if self.runtime.requires_request_body() { + if let Some(response) = self + .runtime + .run_scripts( + self.runtime.request_body_scripts(), + ScriptPhase::RequestBody, + self.request_handle.clone(), + &self.response_handle, + error_logger, + ) + .await + .map_err(|err| -> Box { err.into() })? 
+ { + return Ok(ResponseData { + request: None, + response: Some(response), + response_status: None, + response_headers: None, + new_remote_address: None, + }); + } + } + + if let (Some(handle), Some(parts)) = (&self.request_handle, &mut self.request_parts) { + let snapshot = handle.snapshot(); + apply_request_snapshot(snapshot.clone(), parts)?; + if request_body_modified(&snapshot) { + let new_body = snapshot.body.clone(); + match self.request_body.as_mut() { + Some(RequestBodyState::Buffered(buffer)) => { + *buffer = new_body; + } + Some(RequestBodyState::Stream(_)) | None => { + self.request_body = Some(RequestBodyState::Buffered(new_body)); + } + } + } + } + + let parts = self.request_parts.take().unwrap(); + let body = match self.request_body.take().unwrap() { + RequestBodyState::Buffered(bytes) => Full::new(Bytes::from(bytes)).map_err(|e| match e {}).boxed(), + RequestBodyState::Stream(body) => body, + }; + + Ok(ResponseData { + request: Some(Request::from_parts(parts, body)), + response: None, + response_status: None, + response_headers: None, + new_remote_address: None, + }) + } + + async fn response_modifying_handler( + &mut self, + response: Response>, + ) -> Result>, Box> { + if self.runtime.response_ready_scripts().is_empty() { + return Ok(response); + } + + let (mut parts, body) = response.into_parts(); + let collected = body.collect().await?; + let bytes = collected.to_bytes(); + self.response_handle.set_status(parts.status.as_u16()); + self.response_handle.set_body(bytes.to_vec()); + for (name, value) in parts.headers.iter() { + if let Ok(value_str) = value.to_str() { + self.response_handle.set_header(name.as_str(), value_str); + } + } + + let logger = match &self.error_logger { + Some(logger) => { + // Clone the logger to ensure it's available for script execution + logger.clone() + } + None => { + // If error_logger is None, create a dummy logger that does nothing + // This shouldn't happen, but we handle it gracefully + eprintln!("WARNING: 
error_logger is None in response_modifying_handler"); + ErrorLogger::without_logger() + } + }; + + if let Some(response) = self + .runtime + .run_scripts( + self.runtime.response_ready_scripts(), + ScriptPhase::Response, + self.request_handle.clone(), + &self.response_handle, + &logger, + ) + .await + .map_err(|err| -> Box { Box::new(ResponseHandlerError(err)) })? + { + self.request_handle = None; + self.error_logger = None; + return Ok(response); + } + + let mut snapshot = self.response_handle.snapshot(); + let body_len = snapshot.body.len(); + // 确保 Content-Length 与修改后的响应体一致,避免超/少报导致 hyper panic。 + snapshot + .headers + .retain(|(name, _)| !name.eq_ignore_ascii_case("content-length")); + snapshot + .headers + .push(("content-length".to_string(), body_len.to_string())); + + apply_response_snapshot(&snapshot, &mut parts)?; + let body = Full::new(Bytes::from(snapshot.body)).map_err(|e| match e {}).boxed(); + + self.request_handle = None; + self.error_logger = None; + + Ok(Response::from_parts(parts, body)) + } +} diff --git a/ferron/src/config/adapters/docker_auto.rs b/ferron/src/config/adapters/docker_auto.rs index 07d74568..a7d672f8 100644 --- a/ferron/src/config/adapters/docker_auto.rs +++ b/ferron/src/config/adapters/docker_auto.rs @@ -61,6 +61,7 @@ impl ConfigurationAdapter for DockerAutoConfigurationAdapter { inner: vec![ServerConfigurationEntry { values: vec![ServerConfigurationValue::String("/var/cache/ferron-acme".to_string())], props: HashMap::new(), + children: HashMap::new(), }], }, ); diff --git a/ferron/src/config/adapters/kdl.rs b/ferron/src/config/adapters/kdl.rs index 362000be..c972766a 100644 --- a/ferron/src/config/adapters/kdl.rs +++ b/ferron/src/config/adapters/kdl.rs @@ -21,6 +21,7 @@ use super::ConfigurationAdapter; fn kdl_node_to_configuration_entry(kdl_node: &KdlNode) -> ServerConfigurationEntry { let mut values = Vec::new(); let mut props = HashMap::new(); + let mut children = HashMap::new(); for kdl_entry in kdl_node.iter() { let value = 
match kdl_entry.value().to_owned() { KdlValue::String(value) => ServerConfigurationValue::String(value), @@ -39,7 +40,21 @@ fn kdl_node_to_configuration_entry(kdl_node: &KdlNode) -> ServerConfigurationEnt // If KDL node doesn't have any arguments, add the "#true" KDL value values.push(ServerConfigurationValue::Bool(true)); } - ServerConfigurationEntry { values, props } + if let Some(children_nodes) = kdl_node.children() { + for child in children_nodes.nodes() { + let entry = kdl_node_to_configuration_entry(child); + children + .entry(child.name().value().to_string()) + .or_insert_with(ServerConfigurationEntries::default) + .inner + .push(entry); + } + } + ServerConfigurationEntry { + values, + props, + children, + } } fn load_configuration_inner( @@ -243,6 +258,7 @@ fn load_configuration_inner( inner: vec![ServerConfigurationEntry { values: vec![ServerConfigurationValue::String(location_str.to_string())], props: HashMap::new(), + children: HashMap::new(), }], }, ); diff --git a/ferron/src/config/adapters/yaml_legacy.rs b/ferron/src/config/adapters/yaml_legacy.rs index 4f21488d..d31a426d 100644 --- a/ferron/src/config/adapters/yaml_legacy.rs +++ b/ferron/src/config/adapters/yaml_legacy.rs @@ -19,6 +19,7 @@ use super::ConfigurationAdapter; fn kdl_node_to_configuration_entry(kdl_node: &KdlNode) -> ServerConfigurationEntry { let mut values = Vec::new(); let mut props = HashMap::new(); + let mut children = HashMap::new(); for kdl_entry in kdl_node.iter() { let value = match kdl_entry.value().to_owned() { KdlValue::String(value) => ServerConfigurationValue::String(value), @@ -37,7 +38,21 @@ fn kdl_node_to_configuration_entry(kdl_node: &KdlNode) -> ServerConfigurationEnt // If KDL node doesn't have any arguments, add the "#true" KDL value values.push(ServerConfigurationValue::Bool(true)); } - ServerConfigurationEntry { values, props } + if let Some(children_nodes) = kdl_node.children() { + for child in children_nodes.nodes() { + let entry = 
kdl_node_to_configuration_entry(child); + children + .entry(child.name().value().to_string()) + .or_insert_with(ServerConfigurationEntries::default) + .inner + .push(entry); + } + } + ServerConfigurationEntry { + values, + props, + children, + } } /// A legacy YAML configuration adapter that utilizes `ferron-yaml2kdl-core` component @@ -223,6 +238,7 @@ impl ConfigurationAdapter for YamlLegacyConfigurationAdapter { inner: vec![ServerConfigurationEntry { values: vec![ServerConfigurationValue::String(location_str.to_string())], props: HashMap::new(), + children: HashMap::new(), }], }, ); diff --git a/ferron/src/config/processing.rs b/ferron/src/config/processing.rs index 9836f605..2aa44629 100644 --- a/ferron/src/config/processing.rs +++ b/ferron/src/config/processing.rs @@ -537,6 +537,7 @@ mod tests { inner: vec![ServerConfigurationEntry { values, props: HashMap::new(), + children: HashMap::new(), }], } } @@ -545,6 +546,7 @@ mod tests { let entry = ServerConfigurationEntry { values: vec![value], props: HashMap::new(), + children: HashMap::new(), }; (key.to_string(), ServerConfigurationEntries { inner: vec![entry] }) } diff --git a/script_module_wrk.sh b/script_module_wrk.sh new file mode 100755 index 00000000..0f5a8b95 --- /dev/null +++ b/script_module_wrk.sh @@ -0,0 +1,342 @@ +#!/usr/bin/env bash + +set -euo pipefail + +FERRON_BIN="${FERRON_BIN:-./target/debug/ferron}" +FERRON_CONFIG="${FERRON_CONFIG:-test-script.kdl}" +SCRIPT_FILE="${SCRIPT_FILE:-scripts/example.rhai}" +FERRON_URL="${FERRON_URL:-http://127.0.0.1:8080/}" +WRK_BIN="${WRK_BIN:-wrk}" +SERVER_LOG="${SERVER_LOG:-/tmp/ferron-script-module.log}" +WRK_THREADS="${WRK_THREADS:-4}" +WRK_CONNECTIONS="${WRK_CONNECTIONS:-32}" +WRK_DURATION="${WRK_DURATION:-10s}" + +require_cmd() { + if ! 
command -v "$1" >/dev/null 2>&1; then + echo "error: 未找到依赖命令 '$1'" >&2 + exit 1 + fi +} + +require_cmd curl +require_cmd "$WRK_BIN" + +CONFIG_BACKUP=$(mktemp) +SCRIPT_BACKUP=$(mktemp) +CONFIG_ORIG_EXISTS=0 +SCRIPT_ORIG_EXISTS=0 + +if [[ -f "$FERRON_CONFIG" ]]; then + CONFIG_ORIG_EXISTS=1 + cp "$FERRON_CONFIG" "$CONFIG_BACKUP" +fi + +if [[ -f "$SCRIPT_FILE" ]]; then + SCRIPT_ORIG_EXISTS=1 + cp "$SCRIPT_FILE" "$SCRIPT_BACKUP" +fi + +log() { + printf '[%s] %s\n' "$(date '+%H:%M:%S')" "$*" >&2 +} + +fail() { + echo "error: $*" >&2 + if [[ -s "$SERVER_LOG" ]]; then + echo "--- Ferron 日志 (最近 40 行) ---" >&2 + tail -n 40 "$SERVER_LOG" >&2 || true + fi + exit 1 +} + +SERVER_PID="" +WRK_PARAMS=() +if [[ $# -gt 0 ]]; then + WRK_PARAMS=("$@") +else + WRK_PARAMS=(-t"$WRK_THREADS" -c"$WRK_CONNECTIONS" -d"$WRK_DURATION" "$FERRON_URL") +fi + +cleanup() { + if [[ -n "$SERVER_PID" ]] && kill -0 "$SERVER_PID" >/dev/null 2>&1; then + kill "$SERVER_PID" >/dev/null 2>&1 || true + wait "$SERVER_PID" >/dev/null 2>&1 || true + fi + if [[ $CONFIG_ORIG_EXISTS -eq 1 ]]; then + mv "$CONFIG_BACKUP" "$FERRON_CONFIG" + else + rm -f "$FERRON_CONFIG" + fi + if [[ $SCRIPT_ORIG_EXISTS -eq 1 ]]; then + mv "$SCRIPT_BACKUP" "$SCRIPT_FILE" + else + rm -f "$SCRIPT_FILE" + fi + rm -f "$CONFIG_BACKUP" "$SCRIPT_BACKUP" +} +trap cleanup EXIT + +write_file() { + local path="$1" + local content="$2" + printf '%s\n' "$content" >"$path" +} + +stop_server() { + if [[ -n "$SERVER_PID" ]] && kill -0 "$SERVER_PID" >/dev/null 2>&1; then + kill "$SERVER_PID" >/dev/null 2>&1 || true + wait "$SERVER_PID" >/dev/null 2>&1 || true + fi + SERVER_PID="" +} + +wait_for_ready() { + for _ in $(seq 1 30); do + if curl -sS -o /dev/null "$FERRON_URL"; then + return 0 + fi + if ! 
kill -0 "$SERVER_PID" >/dev/null 2>&1; then + fail "Ferron 提前退出,请检查 $SERVER_LOG" + fi + sleep 1 + done + fail "等待 Ferron 启动超时 ($FERRON_URL)" +} + +start_server() { + local scenario="$1" + local config_content="$2" + local script_content="$3" + + stop_server + write_file "$FERRON_CONFIG" "$config_content" + write_file "$SCRIPT_FILE" "$script_content" + + log "启动 Ferron(场景:$scenario)" + "$FERRON_BIN" --config "$FERRON_CONFIG" >"$SERVER_LOG" 2>&1 & + SERVER_PID=$! + wait_for_ready +} + +RESULT_STATUS="" +RESULT_HEADERS="" +RESULT_BODY="" + +collect_response() { + RESULT_HEADERS=$(mktemp) + RESULT_BODY=$(mktemp) + RESULT_STATUS=$(curl -sS -D "$RESULT_HEADERS" -o "$RESULT_BODY" -w "%{http_code}" "$FERRON_URL") +} + +assert_status() { + local expected="$1" + if [[ "$RESULT_STATUS" != "$expected" ]]; then + fail "期望 HTTP $expected,实际 $RESULT_STATUS" + fi +} + +assert_status_ge() { + local expected="$1" + if (( RESULT_STATUS < expected )); then + fail "期望 HTTP >= $expected,实际 $RESULT_STATUS" + fi +} + +assert_header_equals() { + local name="$1" + local value="$2" + if ! tr -d '\r' <"$RESULT_HEADERS" | grep -qi "^$name: $value\$"; then + fail "响应头 $name 不符合预期(缺少 $value)" + fi +} + +assert_body_contains() { + local text="$1" + if ! 
grep -q "$text" "$RESULT_BODY"; then + fail "响应体未包含:$text" + fi +} + +WRK_LAST_OUTPUT="" +run_wrk() { + log "运行 wrk: ${WRK_PARAMS[*]}" + local tmp + tmp=$(mktemp) + set +e + "$WRK_BIN" "${WRK_PARAMS[@]}" | tee "$tmp" + local status=${PIPESTATUS[0]} + set -e + WRK_LAST_OUTPUT=$(cat "$tmp") + rm -f "$tmp" + if (( status != 0 )); then + fail "wrk 执行失败(状态 $status)" + fi +} + +response_ready_config() { + cat <<'KDL' +globals { + log "access.log" + error_log "error.log" +} + +:8080 { + module "script-exec" { + script "test-script" { + file "scripts/example.rhai" + trigger "on_response_ready" + reload_on_change #true + limits { + max_operations 200_000 + max_call_depth 32 + max_exec_time "50ms" + } + failure_policy "skip" + } + } + + root "wwwroot" +} +KDL +} + +response_script() { + local version="$1" + cat <