pattern · python · Major · pending
Pattern: Rate limiter with token bucket algorithm
Viewed 0 times
Tags: rate-limiter, token-bucket, throttle, 429, api-limit, burst
Problem
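We need to limit API request rates without being too rigid. Fixed-window counters have a burst problem at window boundaries: a client can spend a full quota at the end of one window and another full quota at the start of the next.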
Solution
Token bucket algorithm allows controlled bursts:
import time, threading
class TokenBucket:
    """Thread-safe token-bucket rate limiter.

    The bucket starts full with ``capacity`` tokens and is topped up
    continuously at ``rate`` tokens per second, never exceeding
    ``capacity``. Each request removes tokens; when too few remain the
    request is refused (``consume``) or delayed (``wait``).

    Args:
        rate: Refill speed in tokens per second. Must be positive.
        capacity: Maximum number of stored tokens, i.e. the largest
            burst that can be served at once. Must be positive.

    Raises:
        ValueError: If ``rate`` or ``capacity`` is not positive.
    """

    def __init__(self, rate, capacity):
        # A non-positive rate never refills (and would divide by zero in
        # wait()); a non-positive capacity can never serve a request.
        if rate <= 0:
            raise ValueError("rate must be positive")
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.rate = rate  # tokens per second
        self.capacity = capacity  # max burst size
        self.tokens = capacity  # start full
        # monotonic() is immune to wall-clock adjustments.
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self):
        """Credit tokens earned since the last refill.

        Callers must hold ``self.lock``.
        """
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

    def consume(self, tokens=1):
        """Try to take ``tokens`` from the bucket without blocking.

        Returns:
            True if the tokens were available and have been removed,
            False otherwise (the bucket is left unchanged apart from
            the refill).
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait(self, tokens=1):
        """Block until ``tokens`` have been taken from the bucket.

        Raises:
            ValueError: If ``tokens`` exceeds ``capacity``; the bucket
                can never hold that many, so waiting would never end.
        """
        if tokens > self.capacity:
            raise ValueError("tokens exceeds bucket capacity")
        while True:
            with self.lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                # Measure the shortfall while still holding the lock so
                # the value is consistent with the refill just done.
                deficit = tokens - self.tokens
            # Sleep outside the lock, for exactly as long as the refill
            # needs to cover the shortfall; loop again in case another
            # thread took the tokens first.
            time.sleep(deficit / self.rate)
# Usage: one shared bucket for the whole service.
limiter = TokenBucket(rate=10, capacity=20)  # 10/sec, burst of 20


def handle_request(request):
    """Serve ``request`` if the shared limiter allows it, else answer 429."""
    allowed = limiter.consume()
    if allowed:
        return process_request(request)
    # Out of tokens: reject immediately instead of queueing the caller.
    return Response(status=429, body='Rate limit exceeded')
# Per-user limiting:
user_limiters = {}  # user_id -> TokenBucket
_user_limiters_lock = threading.Lock()  # guards user_limiters


def get_limiter(user_id):
    """Return the bucket for ``user_id``, creating it on first use.

    Lookup and insertion happen under a lock so that two threads
    handling the same new user cannot each create a bucket, which would
    leave one of them consuming from a bucket that is then discarded.
    """
    with _user_limiters_lock:
        bucket = user_limiters.get(user_id)
        if bucket is None:
            bucket = TokenBucket(rate=5, capacity=10)
            user_limiters[user_id] = bucket
        return bucket
import time, threading
class TokenBucket:
    """Thread-safe token-bucket rate limiter.

    The bucket starts full with ``capacity`` tokens and is topped up
    continuously at ``rate`` tokens per second, never exceeding
    ``capacity``. Each request removes tokens; when too few remain the
    request is refused (``consume``) or delayed (``wait``).

    Args:
        rate: Refill speed in tokens per second. Must be positive.
        capacity: Maximum number of stored tokens, i.e. the largest
            burst that can be served at once. Must be positive.

    Raises:
        ValueError: If ``rate`` or ``capacity`` is not positive.
    """

    def __init__(self, rate, capacity):
        # A non-positive rate never refills (and would divide by zero in
        # wait()); a non-positive capacity can never serve a request.
        if rate <= 0:
            raise ValueError("rate must be positive")
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.rate = rate  # tokens per second
        self.capacity = capacity  # max burst size
        self.tokens = capacity  # start full
        # monotonic() is immune to wall-clock adjustments.
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self):
        """Credit tokens earned since the last refill.

        Callers must hold ``self.lock``.
        """
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

    def consume(self, tokens=1):
        """Try to take ``tokens`` from the bucket without blocking.

        Returns:
            True if the tokens were available and have been removed,
            False otherwise (the bucket is left unchanged apart from
            the refill).
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait(self, tokens=1):
        """Block until ``tokens`` have been taken from the bucket.

        Raises:
            ValueError: If ``tokens`` exceeds ``capacity``; the bucket
                can never hold that many, so waiting would never end.
        """
        if tokens > self.capacity:
            raise ValueError("tokens exceeds bucket capacity")
        while True:
            with self.lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                # Measure the shortfall while still holding the lock so
                # the value is consistent with the refill just done.
                deficit = tokens - self.tokens
            # Sleep outside the lock, for exactly as long as the refill
            # needs to cover the shortfall; loop again in case another
            # thread took the tokens first.
            time.sleep(deficit / self.rate)
# Usage: one shared bucket for the whole service.
limiter = TokenBucket(rate=10, capacity=20)  # 10/sec, burst of 20


def handle_request(request):
    """Serve ``request`` if the shared limiter allows it, else answer 429."""
    allowed = limiter.consume()
    if allowed:
        return process_request(request)
    # Out of tokens: reject immediately instead of queueing the caller.
    return Response(status=429, body='Rate limit exceeded')
# Per-user limiting:
user_limiters = {}  # user_id -> TokenBucket
_user_limiters_lock = threading.Lock()  # guards user_limiters


def get_limiter(user_id):
    """Return the bucket for ``user_id``, creating it on first use.

    Lookup and insertion happen under a lock so that two threads
    handling the same new user cannot each create a bucket, which would
    leave one of them consuming from a bucket that is then discarded.
    """
    with _user_limiters_lock:
        bucket = user_limiters.get(user_id)
        if bucket is None:
            bucket = TokenBucket(rate=5, capacity=10)
            user_limiters[user_id] = bucket
        return bucket
Why
Token bucket is smooth (no window boundary bursts), allows controlled bursts up to capacity, and is simple to implement.
Revisions (0)
No revisions yet.