Design a Rate Limiter: Machine Coding Walkthrough
I wrote a companion post on the distributed systems side of rate limiting, covering how you coordinate limits across multiple servers using Redis and atomic Lua scripts. If that’s the problem you’re thinking about, start there. This post is about something different: before you add Redis, before you think about sharding, how do you structure the classes in a single process so that swapping algorithms does not break callers?
That sounds like a smaller question. It is not. The class design you choose here determines whether your rate limiter is testable in isolation, whether you can reconfigure it at runtime, and whether the algorithm selection is a detail buried in one file or a concern that leaks across your codebase.
Let me reason through the design from the ground up.
Requirements
Functional:
- Decide whether a request from a given key (user ID, IP, API key) is allowed or denied
- Support multiple algorithms: token bucket, sliding window counter, fixed window counter
- Allow per-key rules (different limits for free vs. paid users, different endpoints)
- The algorithm must be swappable without changing the calling code
Non-functional:
- Thread-safe in a single-process Python context (the distributed case is a separate concern)
- Easy to unit-test each algorithm independently of the others
- Adding a new algorithm should require adding one new class, not modifying existing ones
Core Entities
RateLimiter is an abstract base class with a single method: given a key, is this request allowed? Every algorithm implements this interface. Callers only ever talk to RateLimiter, never to a concrete class directly.
TokenBucket maintains a count of available tokens per key and refills at a fixed rate. It handles bursts gracefully because a key with no recent traffic accumulates tokens up to a capacity ceiling.
FixedWindowCounter divides time into discrete windows (one-minute buckets, say) and counts requests per key per window. Simple to understand but has an edge case at window boundaries that matters in practice.
SlidingWindowCounter approximates a true sliding window by combining the current window count with a weighted fraction of the previous window. It smooths the boundary problem without the memory overhead of tracking individual timestamps.
RateLimitRule is a configuration value object: a key pattern, a limit, and a window duration. It does not contain algorithm logic. It just describes the policy.
RateLimiterFactory constructs the right RateLimiter implementation given a rule and an algorithm name. It centralizes the wiring so callers do not need to import concrete classes.
Class Design
+----------------------+
| RateLimiter | (abstract)
|----------------------|
| + is_allowed(key) |
| -> bool |
+----------+-----------+
|
+-----+--------+----------+
| | |
+----+----+ +------+---+ +---+------+
|TokenBucket| |FixedWindow| |SlidingWin|
| | |Counter | |Counter |
+---------+ +----------+ +----------+
+----------------------+ +---------------------+
| RateLimitRule | | RateLimiterFactory |
|----------------------| |---------------------|
| - limit: int | | + create(rule, algo) |
| - window_seconds: int| | -> RateLimiter |
| - key_prefix: str | +---------------------+
+----------------------+
The dependency arrow matters here. RateLimiterFactory depends on RateLimitRule and the concrete implementations. The rest of your application depends only on RateLimiter (the abstract base) and RateLimitRule (the config). The concrete classes are a detail that only the factory knows about.
Key Implementation
from __future__ import annotations
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class RateLimitRule:
key_prefix: str # e.g. "user:", "ip:", "endpoint:/api/v1/search"
limit: int # max requests allowed in the window
window_seconds: int # length of the time window
class RateLimiter(ABC):
"""
Strategy interface for rate limiting algorithms.
All concrete implementations must be thread-safe.
"""
@abstractmethod
def is_allowed(self, key: str) -> bool:
pass
class TokenBucket(RateLimiter):
"""
Each key gets a bucket that holds up to `capacity` tokens.
Tokens refill at `refill_rate` tokens per second.
One request consumes one token.
Burst-friendly: a key that has been quiet accumulates tokens
and can handle a sudden spike up to the capacity ceiling.
"""
def __init__(self, capacity: int, refill_rate: float) -> None:
self._capacity = capacity
self._refill_rate = refill_rate
self._buckets: dict[str, tuple[float, float]] = {} # key -> (tokens, last_refill)
self._lock = threading.Lock()
def is_allowed(self, key: str) -> bool:
now = time.monotonic()
with self._lock:
tokens, last_refill = self._buckets.get(key, (self._capacity, now))
elapsed = now - last_refill
tokens = min(self._capacity, tokens + elapsed * self._refill_rate)
if tokens >= 1:
self._buckets[key] = (tokens - 1, now)
return True
self._buckets[key] = (tokens, now)
return False
class FixedWindowCounter(RateLimiter):
"""
Counts requests per key within fixed time windows.
Known limitation: a burst straddling a window boundary can pass through
at up to 2x the limit. Example: 100 requests in the last second of
window N and 100 in the first second of window N+1 both pass, even
though 200 requests arrived in a two-second span.
"""
def __init__(self, limit: int, window_seconds: int) -> None:
self._limit = limit
self._window = window_seconds
self._counts: dict[str, tuple[int, int]] = {} # key -> (count, window_id)
self._lock = threading.Lock()
def _window_id(self, now: float) -> int:
return int(now) // self._window
def is_allowed(self, key: str) -> bool:
now = time.time()
current_window = self._window_id(now)
with self._lock:
count, window = self._counts.get(key, (0, current_window))
if window != current_window:
count = 0
if count < self._limit:
self._counts[key] = (count + 1, current_window)
return True
self._counts[key] = (count, current_window)
return False
class SlidingWindowCounter(RateLimiter):
"""
Approximates a sliding window by blending the current window count
with a weighted fraction of the previous window count.
If the current window is 30% complete, the effective count is:
0.30 * current_count + 0.70 * previous_count
This smooths the boundary spike that FixedWindowCounter suffers from,
without the memory cost of storing individual request timestamps.
"""
def __init__(self, limit: int, window_seconds: int) -> None:
self._limit = limit
self._window = window_seconds
# key -> (prev_count, curr_count, curr_window_id)
self._state: dict[str, tuple[int, int, int]] = {}
self._lock = threading.Lock()
def _window_id(self, now: float) -> int:
return int(now) // self._window
def is_allowed(self, key: str) -> bool:
now = time.time()
current_window = self._window_id(now)
elapsed_fraction = (now % self._window) / self._window
with self._lock:
prev_count, curr_count, stored_window = self._state.get(
key, (0, 0, current_window)
)
if stored_window != current_window:
# Rolled into a new window: previous window becomes "prev"
if stored_window == current_window - 1:
prev_count = curr_count
else:
prev_count = 0
curr_count = 0
effective = (1 - elapsed_fraction) * prev_count + curr_count
if effective < self._limit:
self._state[key] = (prev_count, curr_count + 1, current_window)
return True
self._state[key] = (prev_count, curr_count, current_window)
return False
class RateLimiterFactory:
"""
Centralizes construction of RateLimiter instances.
Callers request an algorithm by name; they never import concrete classes.
"""
_ALGORITHMS = {"token_bucket", "fixed_window", "sliding_window"}
@staticmethod
def create(rule: RateLimitRule, algorithm: str = "sliding_window") -> RateLimiter:
if algorithm == "token_bucket":
# Treat window_seconds as the refill period: limit tokens per window.
refill_rate = rule.limit / rule.window_seconds
return TokenBucket(capacity=rule.limit, refill_rate=refill_rate)
if algorithm == "fixed_window":
return FixedWindowCounter(
limit=rule.limit, window_seconds=rule.window_seconds
)
if algorithm == "sliding_window":
return SlidingWindowCounter(
limit=rule.limit, window_seconds=rule.window_seconds
)
raise ValueError(f"Unknown algorithm: {algorithm!r}. "
f"Choose from {RateLimiterFactory._ALGORITHMS}")
Design Decisions and Trade-offs
Why an abstract base class instead of just duck typing?
Python is dynamic enough that you could skip the ABC entirely and just rely on each class having an is_allowed method. The problem is that duck typing gives you no signal until runtime when something is wired up wrong. The abstract base class catches the mistake at class definition time, documents the contract explicitly, and lets type checkers (mypy, pyright) verify that callers only use the interface. It also makes the Strategy pattern explicit to anyone reading the code, which matters for maintainability.
The boundary spike in FixedWindowCounter.
This is the reason fixed windows are a poor fit for accuracy-sensitive limits. If your limit is 100 requests per minute and a user sends 100 requests in the last second of minute N, then 100 more in the first second of minute N+1, they have sent 200 requests in a two-second span and both batches pass. SlidingWindowCounter smooths this by blending the two window counts. It is still an approximation (a true sliding window would track every individual timestamp), but it is a much better one and it uses O(1) memory per key rather than O(requests) per key.
Why time.monotonic() in TokenBucket but time.time() in the window counters?
time.monotonic() is the right choice for elapsed-time calculations like token refill because it never goes backward (which time.time() can do across NTP adjustments or DST changes). The window counters use time.time() because they need to derive a wall-clock window ID, and time.monotonic() has no fixed reference to calendar time. Using the wrong one in either place would introduce subtle bugs that only show up in edge cases.
Thread safety in a single process.
Every concrete implementation holds a threading.Lock and acquires it around the read-modify-write on per-key state. This is the correct approach in a CPython context. An alternative is to use collections.Counter and atomic increments, but Python’s Counter is not thread-safe without a lock anyway. The lock granularity here is per-limiter (one lock shared across all keys), which is fine for a single-process use case. If contention becomes a problem at very high request rates, you could shard into N buckets of keys with N locks. For the distributed case, this all goes out the window and you hand the atomicity problem to Redis.
RateLimiterFactory as the seam.
The factory is the only class that knows about concrete implementations. Everything else in the system depends only on RateLimiter (the abstract type) and RateLimitRule (the config). This means you can add a new algorithm, say a leaky bucket, by adding one new class and one new branch in the factory. No other file changes. That is the open-closed principle working as intended.
What I’d Add Next
A few natural extensions: a RateLimiterMiddleware that wraps any callable and checks the limiter before invoking it, a MultiKeyLimiter that checks several rules (per-user AND per-endpoint) and denies if any fails, and a TTL-based eviction for the in-memory state so keys that have been quiet for a long time do not hold memory forever. The factory pattern makes all of these composable without touching the algorithm classes themselves.
If you’d approach the sliding window approximation differently or want to argue about where the lock granularity should sit, I’d genuinely like to hear it. Reach out on Twitter or LinkedIn.
Tags:
Related Posts
Design a Vending Machine: State Machines in Practice
A first-principles LLD walkthrough of a vending machine using the State pattern. Covers state transitions, inventory management, payment handling, and why State beats if/else chains when behavior must vary by state.
Design an ATM Machine: Transactions, State, and Security in LLD
A first-principles LLD walkthrough of an ATM covering state management, transaction types, card and account entities, cash dispensing logic, and why atomicity matters even in a machine-coding exercise.
Design a Logging Library Like log4j: Filters, Appenders, and Formatters
A first-principles LLD walkthrough of a logging library covering logger hierarchy, log levels, appenders, formatters, and the Chain of Responsibility pattern for filter chains. Why appenders must be independent of formatters.