We design a rate limiter inspired by bacterial quorum sensing. The limiter tracks an "autoinducer concentration": a decaying count of recent requests that approximates the request density over a sliding window. When the concentration exceeds a quorum threshold, the limiter "downregulates" by rejecting requests. The threshold itself adapts to long-term traffic, so the limiter tolerates short bursts above the usual load while still rejecting sustained spikes. This mimics how bacteria regulate group behavior based on population density.
Let \( C(t) \) be the autoinducer concentration at time \( t \). Each request contributes \( +1 \) to \( C \), and \( C \) decays exponentially with rate \( \alpha \) (autoinducer degradation). The concentration evolves as:
\[ \frac{dC}{dt} = R(t) - \alpha C \]
where \( R(t) \) is the request arrival rate. In discrete time (with sampling interval \( \Delta t \)):
\[ C_{n+1} = C_n \cdot e^{-\alpha \Delta t} + R_{\text{new}} \]
where \( R_{\text{new}} \) is the number of new requests in the interval.
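To make the discrete update concrete, the sketch below iterates it under a constant number of arrivals per interval and compares the result with the fixed point \( C^* = R_{\text{new}} / (1 - e^{-\alpha \Delta t}) \), which follows from setting \( C_{n+1} = C_n \). The function names and sample parameters are illustrative assumptions, not part of the limiter's API.

```python
import math


def step(c: float, r_new: float, alpha: float, dt: float) -> float:
    """One discrete update: decay the old concentration, then add new requests."""
    return c * math.exp(-alpha * dt) + r_new


def steady_state(r_new: float, alpha: float, dt: float) -> float:
    """Fixed point of the update when r_new is the same in every interval."""
    return r_new / (1.0 - math.exp(-alpha * dt))


def simulate_constant_load(r_new: float, alpha: float, dt: float, n_steps: int) -> float:
    """Iterate the update from C = 0 and return the final concentration."""
    c = 0.0
    for _ in range(n_steps):
        c = step(c, r_new, alpha, dt)
    return c


if __name__ == "__main__":
    # Hypothetical parameters: 10 requests per 1-second interval, alpha = 0.2 per second
    final = simulate_constant_load(r_new=10, alpha=0.2, dt=1.0, n_steps=200)
    print(f"simulated: {final:.3f}, closed form: {steady_state(10, 0.2, 1.0):.3f}")
```

Under these assumptions, a fixed threshold set below the fixed point would eventually be exceeded by that steady load, which gives a rough lower bound when choosing a threshold by hand.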
The quorum threshold \( Q \) is a target concentration. If \( C > Q \), requests are rejected. The actual rate limit \( L \) can be a function of \( C \), for example \( L = \max(1, \text{base\_limit} \cdot (1 - C/Q)) \), or a simple binary accept/reject.
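The two decision styles can be sketched as small pure functions. This is a minimal illustration of the formula above; `graded_limit` and `binary_decision` are hypothetical names, and `base_limit` is assumed to be the limit granted when the concentration is zero.

```python
def graded_limit(concentration: float, threshold: float, base_limit: float) -> float:
    """Return L = max(1, base_limit * (1 - C / Q)).

    The limit shrinks linearly as the concentration C approaches the quorum
    threshold Q and is floored at 1 once C reaches or exceeds Q.
    """
    if threshold <= 0:
        raise ValueError("threshold must be positive")
    return max(1.0, base_limit * (1.0 - concentration / threshold))


def binary_decision(concentration: float, threshold: float) -> bool:
    """The simpler alternative: accept only while C <= Q."""
    return concentration <= threshold


if __name__ == "__main__":
    for c in (0.0, 50.0, 100.0, 150.0):
        print(c, graded_limit(c, threshold=100.0, base_limit=50.0), binary_decision(c, 100.0))
```

Note that the floor of 1 means the graded form never closes completely, whereas the binary rule rejects everything once the concentration is above the threshold.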
We implement an adaptive threshold: the threshold \( Q \) itself adjusts based on long-term average load, allowing the system to learn the normal traffic pattern.
A first implementation:

    import math
    import threading
    import time


    class QuorumRateLimiter:
        def __init__(self, decay_rate=0.1, sample_interval=1.0, base_threshold=100.0, adapt_rate=0.01):
            """
            decay_rate: exponential decay rate per second (α)
            sample_interval: how often to update concentration (seconds)
            base_threshold: initial quorum threshold (same units as concentration)
            adapt_rate: how quickly the threshold adapts toward the observed concentration
            """
            self.decay_rate = decay_rate
            self.sample_interval = sample_interval
            self.threshold = base_threshold
            self.adapt_rate = adapt_rate
            self.concentration = 0.0
            self.lock = threading.Lock()
            self._stop = False
            # Background thread that decays the concentration periodically
            self._thread = threading.Thread(target=self._update_loop, daemon=True)
            self._thread.start()

        def _update_loop(self):
            while not self._stop:
                time.sleep(self.sample_interval)
                with self.lock:
                    # Exponential decay over one sampling interval
                    self.concentration *= math.exp(-self.decay_rate * self.sample_interval)
                    # Move the threshold a small step toward the current concentration
                    self.threshold += self.adapt_rate * (self.concentration - self.threshold)
                    # Keep the threshold at or above 1
                    self.threshold = max(1.0, self.threshold)

        def allow_request(self) -> bool:
            """Check if a request is allowed. Returns True if allowed, False if rate limited."""
            with self.lock:
                # Each request adds 1 to the concentration immediately;
                # the decay happens in the background thread.
                # Rejected requests are counted too: in the bacterial analogy
                # they are still "cells" that contribute autoinducer.
                self.concentration += 1.0
                return self.concentration <= self.threshold

        def get_stats(self):
            with self.lock:
                return {
                    "concentration": self.concentration,
                    "threshold": self.threshold,
                    "utilization": self.concentration / self.threshold if self.threshold > 0 else 0,
                }

        def stop(self):
            # The loop notices the flag after its current sleep, so join() may take up to sample_interval
            self._stop = True
            self._thread.join()

    # ----------------------------------------------------------------------
    # Simulation: test the rate limiter with varying request rates
    # ----------------------------------------------------------------------
    def simulate():
        limiter = QuorumRateLimiter(decay_rate=0.2, sample_interval=0.5, base_threshold=50.0, adapt_rate=0.05)
        total_accepted = 0
        total_rejected = 0
        # Simulate 200 seconds of requests, one loop iteration per second
        for i in range(200):
            # Bursty workload: high at first, then low, then medium
            if i < 30:
                rate = 20  # high burst
            elif i < 60:
                rate = 5  # low
            else:
                rate = 10  # medium
            # In each second, generate `rate` requests and count this second's outcomes
            accepted = 0
            rejected = 0
            for _ in range(rate):
                if limiter.allow_request():
                    accepted += 1
                else:
                    rejected += 1
            total_accepted += accepted
            total_rejected += rejected
            time.sleep(1)  # simulate one second per iteration
            stats = limiter.get_stats()
            print(f"Second {i}: rate={rate}, accepted={accepted}, rejected={rejected}, "
                  f"conc={stats['concentration']:.1f}, thresh={stats['threshold']:.1f}")
        limiter.stop()
        total = total_accepted + total_rejected
        print(f"\nTotal accepted: {total_accepted}, rejected: {total_rejected}, "
              f"acceptance rate: {total_accepted / total * 100:.1f}%")


    if __name__ == "__main__":
        simulate()

How it works:
- Concentration \( C \) grows with each request and decays exponentially over time (autoinducer degradation).
- Quorum threshold \( Q \) starts at a base value but adapts slowly toward the observed concentration (like bacteria adjusting their sensitivity). This means the limiter learns the normal traffic level.
- Decision: a request is allowed if \( C \le Q \). Otherwise, it is rejected (HTTP 429).
- Adaptation: each update moves the threshold a small step (set by adapt_rate) toward the current concentration, which is intended to prevent starvation or over-limiting.
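The concentration and decision rules in the list above do not strictly require a background thread. As an alternative sketch (not the implementation shown earlier), decay can be applied lazily from the elapsed time on each call, which makes the behavior deterministic under a fake clock. The class name `LazyQuorumLimiter`, the `clock` parameter, the `FakeClock` helper, and the fixed (non-adaptive) threshold are assumptions made for this illustration.

```python
import math
import threading
import time
from typing import Callable


class LazyQuorumLimiter:
    """Fixed-threshold limiter that decays the concentration on demand."""

    def __init__(self, decay_rate: float = 0.1, threshold: float = 100.0,
                 clock: Callable[[], float] = time.monotonic):
        self.decay_rate = decay_rate
        self.threshold = threshold
        self.concentration = 0.0
        self._clock = clock
        self._last = clock()
        self._lock = threading.Lock()

    def _decay(self) -> None:
        # Apply exp(-alpha * elapsed) for the time since the previous call
        now = self._clock()
        self.concentration *= math.exp(-self.decay_rate * (now - self._last))
        self._last = now

    def allow_request(self) -> bool:
        with self._lock:
            self._decay()
            # Rejected requests still count, as in the threaded version
            self.concentration += 1.0
            return self.concentration <= self.threshold


class FakeClock:
    """Manually advanced clock for deterministic experiments."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


if __name__ == "__main__":
    clock = FakeClock()
    limiter = LazyQuorumLimiter(decay_rate=0.5, threshold=3.0, clock=clock)
    print([limiter.allow_request() for _ in range(5)])  # burst at t = 0
    clock.now += 10.0  # let the concentration decay
    print(limiter.allow_request())
```

The trade-off in this variant is that the threshold cannot adapt on a timer; any adaptation would have to be folded into the same lazy update.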
Illustrative output (exact values depend on thread timing):

    Second 0: rate=20, accepted=20, rejected=0, conc=20.0, thresh=50.0
    Second 1: rate=20, accepted=20, rejected=0, conc=33.9, thresh=48.3
    Second 2: rate=20, accepted=20, rejected=0, conc=44.9, thresh=46.4
    Second 3: rate=20, accepted=20, rejected=0, conc=53.8, thresh=44.8  # concentration > threshold, next requests will be rejected
    Second 4: rate=20, accepted=9, rejected=11, conc=52.3, thresh=43.8
    ...

After the burst, concentration decays and threshold adapts downward.
The limiter allows bursts initially but then starts rejecting when the concentration exceeds the threshold. As traffic subsides, the threshold adapts and more requests are accepted.
Properties of this first version:
- Adaptive: no hard-coded limit; the threshold learns the normal load.
- Burst-aware: allows short bursts because concentration increases gradually.
- Self-regulating, with a flaw: the intent is for the threshold to drop when the system is overloaded, but the update threshold += adapt_rate * (concentration - threshold) moves the threshold toward the concentration. Under persistently high load the threshold therefore rises, which makes requests easier to accept. That is the opposite of the intended overload response. In bacteria the quorum threshold is fixed; the adaptation here is only meant to learn normal traffic, so the rule needs refining.
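The effect of this update rule under sustained load can be checked with a deterministic, thread-free sketch. The function below replays the same decay and adaptation steps tick by tick; `chase_threshold` and the sample parameters are hypothetical and chosen only to illustrate the drift.

```python
import math


def chase_threshold(requests_per_tick: int, ticks: int, decay_rate: float = 0.2,
                    dt: float = 1.0, threshold: float = 50.0, adapt_rate: float = 0.05):
    """Replay threshold += adapt_rate * (concentration - threshold) at a constant load.

    Returns the final (concentration, threshold) pair. Every request counts
    toward the concentration, whether or not it would have been accepted.
    """
    concentration = 0.0
    for _ in range(ticks):
        concentration += requests_per_tick           # arrivals during this tick
        concentration *= math.exp(-decay_rate * dt)  # exponential decay
        threshold += adapt_rate * (concentration - threshold)
        threshold = max(1.0, threshold)              # same floor as the limiter
    return concentration, threshold


if __name__ == "__main__":
    # Hypothetical overload: 40 requests per 1-second tick for 300 ticks
    conc, thresh = chase_threshold(requests_per_tick=40, ticks=300)
    print(f"concentration={conc:.1f}, threshold={thresh:.1f} (started at 50.0)")
```

With these numbers the threshold ends close to the steady concentration, well above its starting value of 50, so the limiter relaxes exactly when the load is highest.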
Improved Adaptation: Instead of moving threshold toward concentration, we maintain a long‑term average of concentration (say, over minutes) and set threshold to that average times a safety factor. This ensures the limiter adapts to baseline traffic.
We'll modify the _update_loop to maintain an exponential moving average of concentration and set threshold = average * 1.5 (allowing 50% burst).

    # Inside __init__:
    self.long_term_avg = 0.0

    # Inside _update_loop:
    self.long_term_avg = 0.99 * self.long_term_avg + 0.01 * self.concentration
    self.threshold = self.long_term_avg * 1.5

This way, the limiter learns the typical load and allows bursts up to 50% above average.
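How quickly this moving average reacts depends on its smoothing weight. The sketch below, which assumes a constant concentration and uses hypothetical helper names, compares the iterated average with its closed form \( \bar{C}_n = C \, (1 - (1 - w)^n) \) for weight \( w \), starting from zero.

```python
def ema_after(n_ticks: int, concentration: float, weight: float = 0.01) -> float:
    """Iterate avg = (1 - w) * avg + w * concentration from avg = 0."""
    avg = 0.0
    for _ in range(n_ticks):
        avg = (1.0 - weight) * avg + weight * concentration
    return avg


def ema_closed_form(n_ticks: int, concentration: float, weight: float = 0.01) -> float:
    """Closed form of the same recurrence for a constant concentration."""
    return concentration * (1.0 - (1.0 - weight) ** n_ticks)


def threshold_from_avg(avg: float, burst_factor: float = 1.5) -> float:
    """Threshold as a multiple of the long-term average."""
    return avg * burst_factor


if __name__ == "__main__":
    for n in (10, 100, 500):
        avg = ema_after(n, concentration=40.0)
        print(n, round(avg, 2), round(threshold_from_avg(avg), 2))
```

With a weight of 0.01 the average needs roughly 100 ticks to close about 63% of the gap to a new load level. During that warm-up the threshold sits well below 1.5 times the actual load, so a cold start is likely to reject heavily until the average catches up.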
The complete revised limiter:

    import math
    import threading
    import time


    class AdaptiveQuorumLimiter:
        def __init__(self, decay_rate=0.1, sample_interval=1.0, burst_factor=1.5):
            self.decay_rate = decay_rate
            self.sample_interval = sample_interval
            self.burst_factor = burst_factor
            self.concentration = 0.0
            self.long_term_avg = 0.0
            self.threshold = 10.0  # initial guess
            self.lock = threading.Lock()
            self._stop = False
            self._thread = threading.Thread(target=self._update_loop, daemon=True)
            self._thread.start()

        def _update_loop(self):
            while not self._stop:
                time.sleep(self.sample_interval)
                with self.lock:
                    # Exponential decay over one sampling interval
                    self.concentration *= math.exp(-self.decay_rate * self.sample_interval)
                    # Update long-term average (exponential moving average)
                    self.long_term_avg = 0.95 * self.long_term_avg + 0.05 * self.concentration
                    # Set threshold as a multiple of the long-term average
                    self.threshold = max(1.0, self.long_term_avg * self.burst_factor)

        def allow_request(self) -> bool:
            with self.lock:
                # Every request counts toward the concentration, accepted or not
                self.concentration += 1.0
                return self.concentration <= self.threshold

        def get_stats(self):
            with self.lock:
                return {
                    "concentration": self.concentration,
                    "threshold": self.threshold,
                    "long_term_avg": self.long_term_avg,
                }

        def stop(self):
            self._stop = True
            self._thread.join()

This colony-inspired rate limiter is now self-tuning, burst-aware, and easy to integrate into web services (e.g., as a FastAPI middleware). The bacterial quorum sensing principle of measuring density and downregulating when it is too high translates directly to an adaptive, fair, and efficient API rate limiter.
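As a framework-neutral illustration of such an integration, the sketch below wraps a request handler so that it returns a 429 response when the limiter rejects a call. The `rate_limited` decorator, the `(status, body)` return convention, and the `FixedBudgetLimiter` stand-in are all assumptions for this example; any object with an `allow_request()` method, such as the limiter above, could be passed instead.

```python
from functools import wraps
from typing import Callable, Protocol, Tuple


class Limiter(Protocol):
    def allow_request(self) -> bool: ...


Response = Tuple[int, str]


def rate_limited(limiter: Limiter) -> Callable[[Callable[..., Response]], Callable[..., Response]]:
    """Decorator that short-circuits a handler with HTTP 429 when rejected."""
    def decorator(handler: Callable[..., Response]) -> Callable[..., Response]:
        @wraps(handler)
        def wrapper(*args, **kwargs) -> Response:
            if not limiter.allow_request():
                return 429, "Too Many Requests"
            return handler(*args, **kwargs)
        return wrapper
    return decorator


class FixedBudgetLimiter:
    """Stand-in limiter for the demo: accepts the first `budget` calls only."""

    def __init__(self, budget: int) -> None:
        self.remaining = budget

    def allow_request(self) -> bool:
        self.remaining -= 1
        return self.remaining >= 0


demo_limiter = FixedBudgetLimiter(budget=2)


@rate_limited(demo_limiter)
def hello(name: str) -> Response:
    return 200, f"hello {name}"


if __name__ == "__main__":
    print([hello("a"), hello("b"), hello("c")])
```

A real middleware would follow the same shape: consult the limiter first, and only call the downstream handler when the request is allowed.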