Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions cli/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,15 @@ fn get_start_command() -> Command {
))
.value_parser(value_parser!(u64)),
)
.arg(
arg!(--"rate-limit-table-cleanup-interval" <SECONDS>)
.help(concat!(
"Interval in seconds between sweeps of the outbound rate-limit ",
"table to remove expired entries (default: 60)"
))
.default_value("60")
.value_parser(value_parser!(u64)),
)
.arg(
arg!(--"inspect"[HOST_AND_PORT])
.help("Activate inspector on host:port")
Expand Down
10 changes: 9 additions & 1 deletion cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,13 @@ fn main() -> Result<ExitCode, anyhow::Error> {
.get_one::<u64>("request-buffer-size")
.copied()
.unwrap();

let rate_limit_cleanup_interval_sec = sub_matches
.get_one::<u64>("rate-limit-table-cleanup-interval")
.copied()
.unwrap_or(60);
if rate_limit_cleanup_interval_sec == 0 {
bail!("--rate-limit-table-cleanup-interval must be >= 1 second");
}
let flags = ServerFlags {
otel: if !enable_otel.is_empty() {
if enable_otel.len() > 1 {
Expand Down Expand Up @@ -264,6 +270,8 @@ fn main() -> Result<ExitCode, anyhow::Error> {
beforeunload_wall_clock_pct: maybe_beforeunload_wall_clock_pct,
beforeunload_cpu_pct: maybe_beforeunload_cpu_pct,
beforeunload_memory_pct: maybe_beforeunload_memory_pct,

rate_limit_cleanup_interval_sec,
};

let mut builder = Builder::new(addr, &main_service_path);
Expand Down
2 changes: 2 additions & 0 deletions crates/base/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ pub struct ServerFlags {
pub beforeunload_wall_clock_pct: Option<u8>,
pub beforeunload_cpu_pct: Option<u8>,
pub beforeunload_memory_pct: Option<u8>,

pub rate_limit_cleanup_interval_sec: u64,
}

#[derive(Debug)]
Expand Down
25 changes: 25 additions & 0 deletions crates/base/src/worker/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use either::Either::Left;
use enum_as_inner::EnumAsInner;
use ext_event_worker::events::WorkerEventWithMetadata;
use ext_runtime::SharedMetricSource;
use ext_runtime::SharedRateLimitTable;
use ext_runtime::TraceRateLimiterConfig;
use ext_workers::context::CreateUserWorkerResult;
use ext_workers::context::SendRequestResult;
use ext_workers::context::Timing;
Expand Down Expand Up @@ -235,6 +237,7 @@ pub struct WorkerPool {
pub flags: Arc<ServerFlags>,
pub policy: WorkerPoolPolicy,
pub metric_src: SharedMetricSource,
pub shared_rate_limit_table: SharedRateLimitTable,
pub user_workers: HashMap<Uuid, UserWorkerProfile>,
pub active_workers: HashMap<String, ActiveWorkerRegistry>,
pub worker_pool_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
Expand All @@ -253,11 +256,19 @@ impl WorkerPool {
worker_event_sender: Option<UnboundedSender<WorkerEventWithMetadata>>,
worker_pool_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
inspector: Option<Inspector>,
cancel: CancellationToken,
) -> Self {
let shared_rate_limit_table = SharedRateLimitTable::default();
shared_rate_limit_table.spawn_cleanup_task(
Duration::from_secs(flags.rate_limit_cleanup_interval_sec),
cancel,
);

Self {
flags,
policy,
metric_src,
shared_rate_limit_table,
worker_event_sender,
user_workers: HashMap::new(),
active_workers: HashMap::new(),
Expand Down Expand Up @@ -384,6 +395,7 @@ impl WorkerPool {
let worker_pool_msgs_tx = self.worker_pool_msgs_tx.clone();
let events_msg_tx = self.worker_event_sender.clone();
let supervisor_policy = self.policy.supervisor_policy;
let shared_rate_limit_table = self.shared_rate_limit_table.clone();

drop(tokio::spawn(async move {
let (permit, tx) = match wait_fence_fut.await {
Expand Down Expand Up @@ -462,6 +474,17 @@ impl WorkerPool {
user_worker_rt_opts.events_msg_tx = events_msg_tx;
user_worker_rt_opts.cancel = Some(cancel.clone());

if let ext_runtime::RateLimiterOpts::Rules { rules, global_key } =
std::mem::take(&mut user_worker_rt_opts.rate_limiter)
{
user_worker_rt_opts.rate_limiter =
ext_runtime::RateLimiterOpts::Configured(TraceRateLimiterConfig {
table: shared_rate_limit_table,
rules,
global_key: Some(global_key),
});
}

worker_options.timing = Some(Timing {
early_drop_rx,
status: status.clone(),
Expand Down Expand Up @@ -792,13 +815,15 @@ pub async fn create_user_worker_pool(
async move {
let token = termination_token.as_ref();
let mut termination_requested = false;
let cleanup_cancel = token.map(|t| t.inbound.clone()).unwrap_or_default();
let mut worker_pool = WorkerPool::new(
flags,
policy,
metric_src_inner,
worker_event_sender,
user_worker_msgs_tx_clone,
inspector,
cleanup_cancel,
);

// Note: Keep this loop non-blocking. Spawn a task to run blocking calls.
Expand Down
16 changes: 16 additions & 0 deletions crates/base/src/worker/worker_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use ext_event_worker::events::UncaughtExceptionEvent;
use ext_event_worker::events::WorkerEventWithMetadata;
use ext_event_worker::events::WorkerEvents;
use ext_runtime::MetricSource;
use ext_runtime::RateLimiterOpts;
use ext_runtime::RuntimeMetricSource;
use ext_runtime::TraceRateLimiter;
use ext_runtime::WorkerMetricSource;
use ext_workers::context::UserWorkerMsgs;
use ext_workers::context::WorkerContextInitOpts;
Expand Down Expand Up @@ -270,6 +272,20 @@ impl Worker {
state_mut.put(metric_src.clone());
MetricSource::Runtime(metric_src)
} else {
if let Some(opts) = new_runtime.conf.as_user_worker().cloned() {
if let RateLimiterOpts::Configured(config) = opts.rate_limiter {
match TraceRateLimiter::new(config) {
Ok(limiter) => {
let state = new_runtime.js_runtime.op_state();
let mut state_mut = state.borrow_mut();
state_mut.put(limiter);
}
Err(err) => {
error!("failed to compile rate limit rules: {err}");
}
}
}
}
MetricSource::Worker(metric_src)
}
};
Expand Down
81 changes: 81 additions & 0 deletions crates/base/test_cases/rate-limit-a/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Worker A: forwards to worker B. Supports two outbound HTTP modes selected
// via the x-http-mode header: "fetch" (default) or "node".
import * as http from "node:http";

/**
 * Issue a GET request over node:http and buffer the entire response body.
 *
 * @param url     Absolute URL to request.
 * @param headers Headers to forward verbatim on the outbound request.
 * @returns Resolves with the response status and UTF-8 body; rejects on
 *          connection-level errors.
 */
function requestViaNode(
  url: string,
  headers: Record<string, string>,
): Promise<{ status: number; body: string }> {
  return new Promise((resolve, reject) => {
    const parsed = new URL(url);
    const req = http.request(
      {
        hostname: parsed.hostname,
        // URL.port is "" when no explicit port is present; pass undefined so
        // node:http falls back to the protocol default instead of "".
        port: parsed.port || undefined,
        // Include the query string — pathname alone silently drops "?...".
        path: parsed.pathname + parsed.search,
        method: "GET",
        headers,
      },
      (res) => {
        let body = "";
        // Decode as UTF-8 stream-wise; naive Buffer += stringification can
        // split multibyte sequences at chunk boundaries.
        res.setEncoding("utf8");
        res.on("data", (chunk) => {
          body += chunk;
        });
        res.on("end", () => resolve({ status: res.statusCode ?? 500, body }));
      },
    );
    req.on("error", reject);
    req.end();
  });
}

// Request handler for worker A: validates the required headers, then forwards
// the request to worker B over the outbound mode named in x-http-mode
// ("fetch" by default, or "node" for node:http) and relays B's response.
Deno.serve(async (req: Request) => {
  const jsonHeaders = { "Content-Type": "application/json" };

  // A traceparent header is mandatory — the test exercises trace propagation.
  if (!req.headers.has("traceparent")) {
    return new Response(
      JSON.stringify({ msg: "missing traceparent header" }),
      { status: 400, headers: jsonHeaders },
    );
  }

  // The caller tells us where the edge server lives so we can call back in.
  const serverUrl = req.headers.get("x-test-server-url");
  if (!serverUrl) {
    return new Response(
      JSON.stringify({ msg: "missing x-test-server-url header" }),
      { status: 400, headers: jsonHeaders },
    );
  }

  const mode = req.headers.get("x-http-mode") ?? "fetch";
  const forwardHeaders: Record<string, string> = {
    "x-test-server-url": serverUrl,
    "x-http-mode": mode,
  };
  const target = `${serverUrl}/rate-limit-b`;

  try {
    let result: { status: number; body: string };

    if (mode === "node") {
      result = await requestViaNode(target, forwardHeaders);
    } else {
      const resp = await fetch(target, { headers: forwardHeaders });
      result = { status: resp.status, body: await resp.text() };
    }

    return new Response(result.body, {
      status: result.status,
      headers: jsonHeaders,
    });
  } catch (e) {
    // Surface connection-level failures as a JSON 500 so the test can assert.
    return new Response(
      JSON.stringify({ msg: e.toString() }),
      { status: 500, headers: jsonHeaders },
    );
  }
});
81 changes: 81 additions & 0 deletions crates/base/test_cases/rate-limit-b/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Worker B: forwards back to worker A. Supports two outbound HTTP modes
// selected via the x-http-mode header: "fetch" (default) or "node".
import * as http from "node:http";

/**
 * Issue a GET request over node:http and buffer the entire response body.
 *
 * @param url     Absolute URL to request.
 * @param headers Headers to forward verbatim on the outbound request.
 * @returns Resolves with the response status and UTF-8 body; rejects on
 *          connection-level errors.
 */
function requestViaNode(
  url: string,
  headers: Record<string, string>,
): Promise<{ status: number; body: string }> {
  return new Promise((resolve, reject) => {
    const parsed = new URL(url);
    const req = http.request(
      {
        hostname: parsed.hostname,
        // URL.port is "" when no explicit port is present; pass undefined so
        // node:http falls back to the protocol default instead of "".
        port: parsed.port || undefined,
        // Include the query string — pathname alone silently drops "?...".
        path: parsed.pathname + parsed.search,
        method: "GET",
        headers,
      },
      (res) => {
        let body = "";
        // Decode as UTF-8 stream-wise; naive Buffer += stringification can
        // split multibyte sequences at chunk boundaries.
        res.setEncoding("utf8");
        res.on("data", (chunk) => {
          body += chunk;
        });
        res.on("end", () => resolve({ status: res.statusCode ?? 500, body }));
      },
    );
    req.on("error", reject);
    req.end();
  });
}

// Request handler for worker B: mirror image of worker A — validates the
// required headers, then forwards back to worker A using the outbound mode
// named in x-http-mode ("fetch" by default, or "node" for node:http).
Deno.serve(async (req: Request) => {
  const jsonHeaders = { "Content-Type": "application/json" };

  // A traceparent header is mandatory — the test exercises trace propagation.
  if (!req.headers.has("traceparent")) {
    return new Response(
      JSON.stringify({ msg: "missing traceparent header" }),
      { status: 400, headers: jsonHeaders },
    );
  }

  // The caller tells us where the edge server lives so we can call back in.
  const serverUrl = req.headers.get("x-test-server-url");
  if (!serverUrl) {
    return new Response(
      JSON.stringify({ msg: "missing x-test-server-url header" }),
      { status: 400, headers: jsonHeaders },
    );
  }

  const mode = req.headers.get("x-http-mode") ?? "fetch";
  const forwardHeaders: Record<string, string> = {
    "x-test-server-url": serverUrl,
    "x-http-mode": mode,
  };
  const target = `${serverUrl}/rate-limit-a`;

  try {
    let result: { status: number; body: string };

    if (mode === "node") {
      result = await requestViaNode(target, forwardHeaders);
    } else {
      const resp = await fetch(target, { headers: forwardHeaders });
      result = { status: resp.status, body: await resp.text() };
    }

    return new Response(result.body, {
      status: result.status,
      headers: jsonHeaders,
    });
  } catch (e) {
    // Surface connection-level failures as a JSON 500 so the test can assert.
    return new Response(
      JSON.stringify({ msg: e.toString() }),
      { status: 500, headers: jsonHeaders },
    );
  }
});
15 changes: 15 additions & 0 deletions crates/base/test_cases/rate-limit-echo/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Worker that echoes back the trace ID from the AsyncVariable context.
// Exposed via globalThis.getRequestTraceId when exposeRequestTraceId context
// flag is set. Used to verify AsyncVariable isolation across concurrent
// requests.
// Handler: pause briefly so concurrent requests genuinely overlap in the
// event loop, then report the trace ID exposed by the runtime (or null when
// the getter is absent).
Deno.serve(async (_req: Request) => {
  // Deliberate 30 ms delay so overlapping requests interleave.
  const sleep = (ms: number) =>
    new Promise((resolve) => setTimeout(resolve, ms));
  await sleep(30);

  // getRequestTraceId is injected by the host when exposeRequestTraceId is on.
  const traceId = (globalThis as any).getRequestTraceId?.() ?? null;
  const payload = JSON.stringify({ traceId });

  return new Response(payload, {
    status: 200,
    headers: { "Content-Type": "application/json" },
  });
});
Loading