Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions diagnostic/build-28f76ec5.json

Large diffs are not rendered by default.

Binary file added diagnostic/build-28f76ec5.logd
Binary file not shown.
134 changes: 100 additions & 34 deletions tools/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@
suitable for a development environment but should be adjusted for staging
or production benchmarks.

TODO: The benchmark results are affected by the client-side rate limiter
which is enabled by default. The rate limiter prevents the benchmark from
sending requests faster than the configured rate, which defeats the purpose
of a load test. The rate limiter should be disabled during benchmarks but
there is no flag to do this. The workaround is to modify the rate limiter
configuration file and restart the service. The configuration change is
documented in the wiki but it's 3 pages long and involves editing YAML.
The --disable-rate-limiter flag bypasses the client-side request pacing used
by throughput, stress, soak, and spike benchmarks so load tests can push as
fast as the workers and target endpoint allow without editing YAML.

Usage:
python3 bench.py latency --endpoint http://localhost:8080 --requests 1000
Expand Down Expand Up @@ -80,6 +76,10 @@ class LatencySample:
success: bool
error: Optional[str] = None


class RateLimiterBypassError(ValueError):
"""Raised when benchmark request pacing cannot be configured safely."""

# ---------------------------------------------------------------------------
# HTTP CLIENT
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -108,6 +108,33 @@ def make_request(url: str, method: str = "GET", timeout: float = 30.0,
# BENCHMARK WORKERS
# ---------------------------------------------------------------------------

def calculate_worker_rps(total_rps: float, concurrency: int,
disable_rate_limiter: bool) -> float:
if concurrency < 1:
raise RateLimiterBypassError("concurrency must be at least 1")

if disable_rate_limiter:
worker_rps = float("inf")
if math.isfinite(worker_rps):
raise RateLimiterBypassError("failed to disable client-side rate limiter")
return worker_rps

if not math.isfinite(total_rps):
return float("inf")
if total_rps <= 0:
raise RateLimiterBypassError("target RPS must be greater than 0")

return total_rps / concurrency


def interval_for_worker_rps(worker_rps: float) -> float:
if not math.isfinite(worker_rps):
return 0
if worker_rps <= 0:
raise RateLimiterBypassError("worker RPS must be greater than 0")
return 1.0 / worker_rps


def run_worker(url: str, request_count: int, results: List[LatencySample],
stop_flag: threading.Event, timeout: float,
delay_between_requests: float = 0):
Expand All @@ -130,7 +157,7 @@ def run_worker_duration(url: str, duration_seconds: float, results: List[Latency
requests_per_second: float = float('inf')):
start = time.time()
request_count = 0
min_interval = 1.0 / requests_per_second if requests_per_second < float('inf') else 0
min_interval = interval_for_worker_rps(requests_per_second)

while time.time() - start < duration_seconds and not stop_flag.is_set():
status, duration, error = make_request(url, timeout=timeout)
Expand Down Expand Up @@ -160,7 +187,7 @@ def run_worker_spike(url: str, spike_start: float, spike_duration: float,
current_time = time.time() - start
is_spike = spike_start <= current_time < (spike_start + spike_duration)
target_rps = spike_rps if is_spike else normal_rps
interval = 1.0 / max(target_rps, 1)
interval = interval_for_worker_rps(target_rps)

status, duration, error = make_request(url, timeout=timeout)
results.append(LatencySample(
Expand Down Expand Up @@ -256,13 +283,16 @@ def run_latency_benchmark(url: str, concurrency: int, request_count: int,
return aggregate_results(results, "latency", url, concurrency)

def run_throughput_benchmark(url: str, concurrency: int, duration: float,
target_rps: float, timeout: float) -> BenchmarkResult:
target_rps: float, timeout: float,
disable_rate_limiter: bool = False) -> BenchmarkResult:
print(f"Running throughput benchmark: {duration}s, {concurrency} concurrent, target {target_rps} RPS")
results: List[LatencySample] = []
stop_flag = threading.Event()
threads = []

rps_per_worker = target_rps / concurrency if target_rps < float('inf') else float('inf')
rps_per_worker = calculate_worker_rps(target_rps, concurrency, disable_rate_limiter)
if disable_rate_limiter:
print(" Client-side rate limiter disabled; workers will not pace requests.")

for _ in range(concurrency):
t = threading.Thread(target=run_worker_duration,
Expand All @@ -280,8 +310,11 @@ def run_throughput_benchmark(url: str, concurrency: int, duration: float,

def run_stress_benchmark(url: str, concurrency: int, max_rps: float,
step_rps: float, step_duration: float,
error_threshold: float, timeout: float) -> BenchmarkResult:
error_threshold: float, timeout: float,
disable_rate_limiter: bool = False) -> BenchmarkResult:
print(f"Running stress benchmark: max {max_rps} RPS, step {step_rps}, {concurrency} concurrent")
if disable_rate_limiter:
print(" Client-side rate limiter disabled; each step runs unpaced.")
all_results: List[LatencySample] = []
current_rps = step_rps

Expand All @@ -290,7 +323,7 @@ def run_stress_benchmark(url: str, concurrency: int, max_rps: float,
results: List[LatencySample] = []
stop_flag = threading.Event()
threads = []
rps_per_worker = current_rps / concurrency
rps_per_worker = calculate_worker_rps(current_rps, concurrency, disable_rate_limiter)

for _ in range(concurrency):
t = threading.Thread(target=run_worker_duration,
Expand Down Expand Up @@ -321,12 +354,15 @@ def run_stress_benchmark(url: str, concurrency: int, max_rps: float,
return aggregate_results(all_results, "stress", url, concurrency)

def run_soak_benchmark(url: str, concurrency: int, duration: float,
target_rps: float, timeout: float) -> BenchmarkResult:
target_rps: float, timeout: float,
disable_rate_limiter: bool = False) -> BenchmarkResult:
print(f"Running soak benchmark: {duration}s, {concurrency} concurrent, {target_rps} RPS")
results: List[LatencySample] = []
stop_flag = threading.Event()
threads = []
rps_per_worker = target_rps / concurrency if target_rps < float('inf') else float('inf')
rps_per_worker = calculate_worker_rps(target_rps, concurrency, disable_rate_limiter)
if disable_rate_limiter:
print(" Client-side rate limiter disabled; workers will not pace requests.")

print(f" This will take {duration} seconds. Progress reports every 60 seconds.")
progress_thread = threading.Thread(target=lambda: (
Expand Down Expand Up @@ -355,13 +391,15 @@ def run_soak_benchmark(url: str, concurrency: int, duration: float,
def run_spike_benchmark(url: str, concurrency: int, duration: float,
spike_start: float, spike_duration: float,
normal_rps: float, spike_rps: float,
timeout: float) -> BenchmarkResult:
timeout: float, disable_rate_limiter: bool = False) -> BenchmarkResult:
print(f"Running spike benchmark: {duration}s, spike at {spike_start}s for {spike_duration}s")
results: List[LatencySample] = []
stop_flag = threading.Event()
threads = []
rps_per_worker_normal = normal_rps / concurrency
rps_per_worker_spike = spike_rps / concurrency
rps_per_worker_normal = calculate_worker_rps(normal_rps, concurrency, disable_rate_limiter)
rps_per_worker_spike = calculate_worker_rps(spike_rps, concurrency, disable_rate_limiter)
if disable_rate_limiter:
print(" Client-side rate limiter disabled; normal and spike phases run unpaced.")

for _ in range(concurrency):
t = threading.Thread(target=run_worker_spike,
Expand Down Expand Up @@ -419,32 +457,44 @@ def main():
parser.add_argument("--timeout", "-t", type=float, default=30.0,
help="Request timeout in seconds")
parser.add_argument("--output", "-o", help="Save results to JSON file")
parser.add_argument("--disable-rate-limiter", action="store_true",
help="Bypass client-side request pacing during load benchmarks")

rate_limit_parent = argparse.ArgumentParser(add_help=False)
rate_limit_parent.add_argument("--disable-rate-limiter", action="store_true",
default=argparse.SUPPRESS,
help="Bypass client-side request pacing during load benchmarks")

subparsers = parser.add_subparsers(dest="mode", help="Benchmark mode")

# Latency
lat_p = subparsers.add_parser("latency", help="Measure request latency")
lat_p = subparsers.add_parser("latency", parents=[rate_limit_parent],
help="Measure request latency")
lat_p.add_argument("--requests", type=int, default=1000, help="Number of requests")

# Throughput
thr_p = subparsers.add_parser("throughput", help="Measure throughput")
thr_p = subparsers.add_parser("throughput", parents=[rate_limit_parent],
help="Measure throughput")
thr_p.add_argument("--duration", type=float, default=30, help="Test duration in seconds")
thr_p.add_argument("--target-rps", type=float, default=100, help="Target requests per second")

# Stress
str_p = subparsers.add_parser("stress", help="Stress test with ramp-up")
str_p = subparsers.add_parser("stress", parents=[rate_limit_parent],
help="Stress test with ramp-up")
str_p.add_argument("--max-rps", type=float, default=1000, help="Maximum RPS")
str_p.add_argument("--step-rps", type=float, default=50, help="RPS increment per step")
str_p.add_argument("--step-duration", type=float, default=10, help="Duration per step in seconds")
str_p.add_argument("--error-threshold", type=float, default=10, help="Max error rate percentage")

# Soak
soak_p = subparsers.add_parser("soak", help="Soak test for memory leaks")
soak_p = subparsers.add_parser("soak", parents=[rate_limit_parent],
help="Soak test for memory leaks")
soak_p.add_argument("--duration", type=float, default=3600, help="Test duration in seconds")
soak_p.add_argument("--target-rps", type=float, default=50, help="Target requests per second")

# Spike
spike_p = subparsers.add_parser("spike", help="Spike test for auto-scaling")
spike_p = subparsers.add_parser("spike", parents=[rate_limit_parent],
help="Spike test for auto-scaling")
spike_p.add_argument("--duration", type=float, default=120, help="Total test duration")
spike_p.add_argument("--spike-start", type=float, default=30, help="Spike start time")
spike_p.add_argument("--spike-duration", type=float, default=10, help="Spike duration")
Expand All @@ -459,16 +509,32 @@ def main():
signal.signal(signal.SIGINT, lambda s, f: sys.exit(1))

result = None
if args.mode == "latency":
result = run_latency_benchmark(args.endpoint, args.concurrency, args.requests, args.timeout)
elif args.mode == "throughput":
result = run_throughput_benchmark(args.endpoint, args.concurrency, args.duration, args.target_rps, args.timeout)
elif args.mode == "stress":
result = run_stress_benchmark(args.endpoint, args.concurrency, args.max_rps, args.step_rps, args.step_duration, args.error_threshold, args.timeout)
elif args.mode == "soak":
result = run_soak_benchmark(args.endpoint, args.concurrency, args.duration, args.target_rps, args.timeout)
elif args.mode == "spike":
result = run_spike_benchmark(args.endpoint, args.concurrency, args.duration, args.spike_start, args.spike_duration, args.normal_rps, args.spike_rps, args.timeout)
try:
if args.mode == "latency":
result = run_latency_benchmark(args.endpoint, args.concurrency,
args.requests, args.timeout)
elif args.mode == "throughput":
result = run_throughput_benchmark(args.endpoint, args.concurrency,
args.duration, args.target_rps,
args.timeout, args.disable_rate_limiter)
elif args.mode == "stress":
result = run_stress_benchmark(args.endpoint, args.concurrency,
args.max_rps, args.step_rps,
args.step_duration, args.error_threshold,
args.timeout, args.disable_rate_limiter)
elif args.mode == "soak":
result = run_soak_benchmark(args.endpoint, args.concurrency,
args.duration, args.target_rps,
args.timeout, args.disable_rate_limiter)
elif args.mode == "spike":
result = run_spike_benchmark(args.endpoint, args.concurrency,
args.duration, args.spike_start,
args.spike_duration, args.normal_rps,
args.spike_rps, args.timeout,
args.disable_rate_limiter)
except RateLimiterBypassError as e:
print(f"Rate limiter configuration error: {e}", file=sys.stderr)
return 2

if result:
print_results(result)
Expand All @@ -495,4 +561,4 @@ def main():


if __name__ == "__main__":
main()
sys.exit(main())
Loading