URL Shortener System Design (Part 2): Production Implementation Deep Dive
In Part 1, we designed the high-level architecture for a URL shortener: requirements, capacity planning, distributed ID generation, database design, and caching strategy.
Now it’s time for the implementation details that make this design production-ready. Part 2 covers:
- Analytics Pipeline: Kafka streaming, batching, handling 20k events/sec without blocking redirects
- Security & Abuse Prevention: Rate limiting, URL validation, behavioral analysis, incident response
- Observability: Metrics, dashboards, alerting, distributed tracing, cost monitoring
These are the operational concerns that separate hobby projects from production systems at Big Tech scale. Let’s dive in.
Analytics Pipeline: Never Block Redirects
The golden rule: Never block redirects for analytics. Analytics must be completely asynchronous.
Async Event Pipeline
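```python
import asyncio
import logging
import time

from aiokafka.errors import KafkaTimeoutError
from fastapi import Request
from fastapi.responses import RedirectResponse, Response

logger = logging.getLogger(__name__)

# The event loop keeps only weak references to tasks, so hold strong ones;
# otherwise a fire-and-forget task can be garbage-collected mid-flight
_background_tasks: set[asyncio.Task] = set()

async def redirect_handler(short_code: str, request: Request):
    """
    Redirect handler: optimized for <50ms response time
    """
    # 1. Fetch URL (cache-first)
    long_url = await get_url_cached(short_code)
    if not long_url:
        return Response(status_code=404)
    # 2. Fire analytics event (non-blocking, fire-and-forget)
    # This runs in the background and doesn't block the response
    task = asyncio.create_task(track_click_async(short_code, request))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    # 3. Immediately return redirect (don't wait for analytics)
    return RedirectResponse(url=long_url, status_code=302)

async def track_click_async(short_code: str, request: Request):
    """
    Async analytics tracking with fallback buffer
    """
    # Build the event before the try block so the fallback path can always use it
    event = {
        'short_code': short_code,
        'timestamp': int(time.time() * 1000),  # milliseconds
        'ip': request.client.host,
        'user_agent': request.headers.get('user-agent', ''),
        'referrer': request.headers.get('referer', ''),
        'country': get_country_from_ip(request.client.host),  # GeoIP lookup
    }
    try:
        # Primary path: Kafka (aiokafka producer with a JSON value_serializer),
        # capped at 500ms so a slow broker can't pile up background tasks
        await asyncio.wait_for(
            kafka_producer.send_and_wait('clicks', value=event),
            timeout=0.5,
        )
    except (KafkaTimeoutError, asyncio.TimeoutError, ConnectionError):
        # Kafka unavailable: write to local buffer
        # A separate process flushes the buffer to Kafka when it recovers
        await write_to_local_buffer(event)
        logger.warning(f"Kafka unavailable, buffered event for {short_code}")
```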
Key insight: If Kafka is down, buffer events locally and flush later. Analytics can be degraded without affecting redirects. The system fails open, not closed.
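The `write_to_local_buffer` helper and the flusher process are not shown above. Below is a minimal sketch, assuming the buffer is a newline-delimited JSON file on local disk. `LocalEventBuffer` and the `send` callable, which stands in for a synchronous Kafka send, are hypothetical names. The flusher renames the file before draining it, so new events keep landing in a fresh file while the old one is replayed.

```python
import json
import os
from typing import Callable

class LocalEventBuffer:
    """Hypothetical append-only JSONL buffer for events that could not reach Kafka."""

    def __init__(self, path: str):
        self.path = path

    def append(self, event: dict) -> None:
        # One JSON document per line; append mode never rewrites earlier events
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")

    def drain(self, send: Callable[[dict], None]) -> int:
        """Replay buffered events through `send`. Returns how many were delivered."""
        if not os.path.exists(self.path):
            return 0
        draining = self.path + ".draining"
        os.replace(self.path, draining)  # new appends now go to a fresh file
        with open(draining, encoding="utf-8") as f:
            events = [json.loads(line) for line in f if line.strip()]
        sent = 0
        try:
            for event in events:
                send(event)
                sent += 1
        except Exception:
            # Kafka still unavailable: put the unsent tail back in the live buffer
            for event in events[sent:]:
                self.append(event)
        os.remove(draining)
        return sent
```

Events that fail to send go back into the live buffer, so a flush attempt during a partial outage loses nothing. A crash in the middle of a drain can still drop the events being replayed, which fits the "analytics can be degraded" stance.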
Batch Processing Workers
On the consumer side, batch writes to ClickHouse to cut the number of INSERT statements by three to four orders of magnitude:
```python
import json
import logging
import time

logger = logging.getLogger(__name__)

class AnalyticsConsumer:
    """
    Consumes from Kafka, batches, and writes to ClickHouse
    """
    def __init__(self):
        self.batch = []
        self.batch_size = 10_000  # 10k events per batch
        self.flush_interval = 5   # seconds
        self.last_flush = time.time()

    async def consume_and_process(self):
        """Main consumer loop (kafka_consumer: aiokafka consumer on 'clicks', auto-commit off)"""
        async for msg in kafka_consumer:
            self.batch.append(json.loads(msg.value))
            # Flush on size threshold OR time threshold.
            # The time check only runs when a message arrives, so on a quiet
            # topic a partial batch waits until the next message.
            if len(self.batch) >= self.batch_size or \
               time.time() - self.last_flush > self.flush_interval:
                await self.flush_batch()
                # Commit offsets only after the batch is stored (at-least-once)
                await kafka_consumer.commit()

    async def flush_batch(self):
        """Batch insert to ClickHouse"""
        if not self.batch:
            return
        try:
            # ClickHouse bulk insert (up to 10k rows in one query)
            await clickhouse.execute(
                "INSERT INTO clicks (short_code, timestamp, ip, ...) VALUES",
                self.batch
            )
            logger.info(f"Flushed {len(self.batch)} events to ClickHouse")
        except Exception as e:
            logger.error(f"Failed to flush batch: {e}")
            # Retry logic: write to DLQ (Dead Letter Queue)
            await write_to_dlq(self.batch)
        finally:
            # Reset on both paths; otherwise a failed batch would be re-sent
            # (and re-DLQ'd) on every later flush
            self.batch = []
            self.last_flush = time.time()
```
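Load reduction math:
Without batching:
- 20k events/sec × 1 INSERT per event = 20k INSERTs/sec
- ClickHouse writes a new data part for every INSERT, so thousands of single-row inserts per second create parts faster than background merges can combine them, and it eventually rejects inserts with "too many parts"
With batching (10k batch size, 5s flush), single consumer:
- 20k events/sec ÷ 10k events per batch = 2 INSERTs/sec
- 10,000x fewer INSERTs
With 32 consumers (one per partition), each sees ~625 events/sec, so the 5s timer fires before a batch fills:
- each consumer flushes ~3,125 events every 5s, so 32 ÷ 5 ≈ 6.4 INSERTs/sec
- still ~3,000x fewer INSERTs, which ClickHouse handles easily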
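This arithmetic generalizes to a small helper. The sketch below assumes events are spread evenly across consumers, and `inserts_per_second` is a hypothetical name. Each consumer flushes on whichever threshold it hits first: a full batch or the timer.

```python
def inserts_per_second(event_rate: float, consumers: int,
                       batch_size: int, flush_interval: float) -> float:
    """Estimate INSERT statements/sec for size-or-time batching (even load assumed)."""
    per_consumer_rate = event_rate / consumers        # events/sec each consumer sees
    seconds_to_fill = batch_size / per_consumer_rate  # time for a full batch
    # Each consumer flushes on the earlier of: a full batch, or the timer
    flush_period = min(seconds_to_fill, flush_interval)
    return consumers / flush_period
```

`inserts_per_second(20_000, 1, 10_000, 5)` gives 2.0. `inserts_per_second(20_000, 32, 10_000, 5)` gives 6.4, because at 625 events/sec per consumer the 5s timer fires long before a 10k batch fills.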
Handling Backpressure
When consumers can’t keep up with Kafka, implement autoscaling based on lag:
```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class BackpressureHandler:
    """
    Monitor Kafka consumer lag and scale consumers up when they fall behind.
    kafka_consumer.get_lag() and k8s.scale_deployment() are hypothetical
    wrappers around the Kafka admin API and the Kubernetes API.
    """
    async def monitor_lag(self):
        """Check Kafka consumer lag every 30 seconds"""
        while True:
            lag = await kafka_consumer.get_lag()  # Messages behind
            if lag > 1_000_000:  # 1M messages behind
                logger.critical(f"Consumer lag critical: {lag}")
                # Scale up consumers (stay at or below the partition count;
                # consumers beyond it get no partitions and sit idle)
                await k8s.scale_deployment("analytics-consumer", replicas=20)
            elif lag > 100_000:  # 100k messages behind
                logger.warning(f"Consumer lag high: {lag}")
                await k8s.scale_deployment("analytics-consumer", replicas=10)
            else:
                # Lag is healthy, scale down to baseline
                await k8s.scale_deployment("analytics-consumer", replicas=3)
            await asyncio.sleep(30)
```
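The fixed tiers above can flap. A single healthy reading drops the deployment straight back to 3 replicas, and the next spike scales it up again. The sketch below uses a lag-proportional target with a scale-down cooldown instead. The 50k-messages-per-replica target and the five-reading cooldown are hypothetical tuning values. The 32-replica ceiling assumes a 32-partition topic, because extra consumers in a group get no partitions. In Kubernetes, KEDA's Kafka scaler provides lag-based autoscaling without hand-written scaling code.

```python
import math

def desired_replicas(lag: int, lag_per_replica: int = 50_000,
                     min_replicas: int = 3, max_replicas: int = 32) -> int:
    """Replicas needed to keep lag per replica under target, clamped to [min, max]."""
    wanted = math.ceil(lag / lag_per_replica)
    # max_replicas = partition count: more consumers than partitions would idle
    return max(min_replicas, min(max_replicas, wanted))

class ScaleDecider:
    """Scale up immediately; scale down only after several consecutive low readings."""

    def __init__(self, current: int, scale_down_after: int = 5):
        self.current = current
        self.scale_down_after = scale_down_after
        self._low_readings = 0

    def decide(self, lag: int) -> int:
        target = desired_replicas(lag)
        if target > self.current:
            self.current = target       # falling behind: react right away
            self._low_readings = 0
        elif target < self.current:
            self._low_readings += 1     # ride out short dips before shrinking
            if self._low_readings >= self.scale_down_after:
                self.current = target
                self._low_readings = 0
        else:
            self._low_readings = 0
        return self.current
```

With a 30-second check interval, five low readings means lag has to stay healthy for about 2.5 minutes before capacity is removed.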
Partition Kafka by short_code hash to parallelize processing:
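```python
# Producer: key by short_code. Kafka's default partitioner hashes the key, so
# all events for one code land on the same partition (ordered per code)
await kafka_producer.send(
    'clicks',
    key=short_code.encode(),  # Kafka routes by key hash
    value=event
)
# With 32 partitions, up to 32 consumers process in parallel
# Average throughput: 20k events/sec ÷ 32 partitions = 625 events/sec per consumer
# Caveat: a single viral short_code still maps to one partition (hot partition)
```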
Security & Abuse Prevention
URL shorteners are notorious for spam, phishing, and malware distribution. Implement multi-layered defense:
Layer 1: Rate Limiting (DDoS & Spam Prevention)
```python
import logging
from typing import Optional

from fastapi import HTTPException, Request

logger = logging.getLogger(__name__)

class RateLimiter:
    """
    Multi-tier fixed-window rate limiting with Redis (redis.asyncio client)
    """
    async def check_rate_limit(self, user_id: Optional[int], ip: str) -> bool:
        """
        Enforce hierarchical rate limits
        """
        limits = [
            # (key, max_requests, window_seconds)
            (f"ratelimit:ip:{ip}", 100, 3600),  # 100/hour per IP
            (f"ratelimit:user:{user_id}", 1000, 3600) if user_id else None,  # 1000/hour per user
            ("ratelimit:global", 50_000, 60),  # 50k/min globally (DDoS protection)
        ]
        for key, max_req, window in filter(None, limits):
            # Create the key with its TTL and increment it in one MULTI/EXEC.
            # INCR followed by a separate EXPIRE can leave a key with no TTL if
            # the process dies in between, blocking that client forever.
            pipe = redis.pipeline(transaction=True)
            pipe.set(key, 0, ex=window, nx=True)
            pipe.incr(key)
            _, current = await pipe.execute()
            if current > max_req:
                logger.warning(f"Rate limit exceeded: {key}")
                return False  # Reject
        return True  # Allow

# Usage in endpoint
@app.post("/api/shorten")
async def shorten_url(request: Request):
    user_id = get_current_user_id(request)  # hypothetical auth helper; None if anonymous
    if not await rate_limiter.check_rate_limit(user_id, request.client.host):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    # ... proceed
```
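The limiter above uses fixed windows. A client can therefore send 100 requests at 12:59:59 and 100 more at 13:00:00, which is 200 in two seconds, without tripping the per-IP limit. A common refinement is the sliding-window counter. It weights the previous window's count by the fraction of it that still overlaps the trailing window. Below is an in-memory sketch with an injectable clock. `SlidingWindowCounter` is a hypothetical name; in production the two counters per key would live in Redis with TTLs, as in the original.

```python
import time
from typing import Callable, Dict, Tuple

class SlidingWindowCounter:
    """Approximate sliding-window rate limiter (in-memory sketch)."""

    def __init__(self, limit: int, window: float,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self.counts: Dict[Tuple[str, int], int] = {}

    def allow(self, key: str) -> bool:
        now = self.clock()
        window_id = int(now // self.window)
        elapsed = now - window_id * self.window
        previous = self.counts.get((key, window_id - 1), 0)
        current = self.counts.get((key, window_id), 0)
        # Weight the previous window by the fraction that still overlaps
        # the trailing `window` seconds ending now
        estimate = previous * (1 - elapsed / self.window) + current
        if estimate >= self.limit:
            return False
        self.counts[(key, window_id)] = current + 1
        # Forget windows that can no longer contribute (Redis would use a TTL)
        self.counts.pop((key, window_id - 2), None)
        return True
```

A burst of 100 requests just before the hour boundary still counts fully against the first requests of the next hour. Capacity then frees up gradually as the old window slides out.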
Layer 2: URL Validation & Threat Detection
```python
import asyncio
import logging
from typing import Optional
from urllib.parse import urlparse

import httpx

logger = logging.getLogger(__name__)

class URLValidator:
    """
    Multi-step URL validation pipeline
    """
    RESERVED_CODES = {'admin', 'api', 'www', 'cdn', 'assets', 'health', 'metrics'}
    BLOCKED_TLDS = {'.tk', '.ml', '.ga'}  # Free TLDs abused by spammers

    async def validate_url(self, long_url: str) -> tuple[bool, Optional[str]]:
        """
        Returns (is_valid, error_message)
        """
        # Step 1: Format validation (http/https with a hostname)
        parsed = urlparse(long_url)
        hostname = (parsed.hostname or '').lower()
        if parsed.scheme not in ('http', 'https') or not hostname:
            return False, "Invalid URL format"
        # Step 2: Block known bad TLDs. Check the hostname, not the raw string,
        # so "http://evil.tk/page" is caught and "http://ok.com/x.tk" is not
        if any(hostname.endswith(tld) for tld in self.BLOCKED_TLDS):
            return False, "Blocked domain TLD"
        # Step 3: Check URL reputation (async, 100ms timeout)
        try:
            is_safe = await asyncio.wait_for(
                self._check_google_safe_browsing(long_url),
                timeout=0.1
            )
            if not is_safe:
                return False, "URL flagged as malicious"
        except (asyncio.TimeoutError, httpx.HTTPError):
            # If Safe Browsing is slow or erroring, allow (fail open)
            logger.warning(f"Safe Browsing unavailable for {long_url}")
        # Step 4: Check domain age (new domains = higher phishing risk)
        domain_age_days = await self._get_domain_age(long_url)  # WHOIS/RDAP; cache per domain
        if domain_age_days is not None and domain_age_days < 7:
            # Flag for manual review (don't auto-reject)
            await self._flag_for_review(long_url, "New domain (<7 days)")
        return True, None

    async def _check_google_safe_browsing(self, url: str) -> bool:
        """Query the Google Safe Browsing v4 Lookup API (http_client: httpx.AsyncClient)"""
        # Google limits the v4 Safe Browsing APIs to non-commercial use;
        # commercial products are directed to the Web Risk API instead
        response = await http_client.post(
            'https://safebrowsing.googleapis.com/v4/threatMatches:find',
            params={'key': Config.GOOGLE_SAFE_BROWSING_KEY},
            json={
                'client': {'clientId': 'url-shortener', 'clientVersion': '1.0'},
                'threatInfo': {
                    'threatTypes': ['MALWARE', 'SOCIAL_ENGINEERING', 'UNWANTED_SOFTWARE'],
                    'platformTypes': ['ANY_PLATFORM'],
                    'threatEntryTypes': ['URL'],
                    'threatEntries': [{'url': url}]
                }
            }
        )
        # Without this, an error response (bad key, quota) has no 'matches'
        # and would be misread as "safe"
        response.raise_for_status()
        matches = response.json().get('matches', [])
        if matches:
            logger.warning(f"Malicious URL detected: {url}, threats: {matches}")
            await self._record_threat(url, matches)
            return False
        return True
```
Layer 3: Custom Alias Abuse Prevention
```python
import re
from typing import Optional

class AliasValidator:
    """
    Prevent abuse of custom aliases
    """
    # Superset of URLValidator.RESERVED_CODES, so an alias can never shadow a
    # route such as /health or /metrics
    RESERVED_KEYWORDS = {'admin', 'api', 'www', 'cdn', 'assets', 'health', 'metrics',
                         'app', 'dashboard', 'login'}
    PROFANITY_LIST = load_profanity_list()  # Load from external wordlist

    async def validate_custom_alias(self, alias: str) -> tuple[bool, Optional[str]]:
        """
        Enforce strict custom alias rules
        """
        # Length check
        if not (4 <= len(alias) <= 20):
            return False, "Alias must be 4-20 characters"
        # Character whitelist (alphanumeric + dash/underscore). fullmatch, because
        # '$' in re.match also matches before a trailing newline ("abcd\n")
        if not re.fullmatch(r'[a-zA-Z0-9_-]+', alias):
            return False, "Only alphanumeric, dash, and underscore allowed"
        # Reserved keywords
        if alias.lower() in self.RESERVED_KEYWORDS:
            return False, "Alias is reserved"
        # Profanity filter
        if self._contains_profanity(alias):
            return False, "Alias contains inappropriate content"
        # Similarity check (prevent typosquatting popular links)
        if await self._is_typosquatting(alias):
            return False, "Alias too similar to existing popular link"
        return True, None

    async def _is_typosquatting(self, alias: str) -> bool:
        """
        Check if alias is suspiciously similar to popular links
        (Levenshtein distance <= 1)
        """
        # Top 1000 aliases (redis.asyncio client with decode_responses=True)
        popular_aliases = await redis.smembers("popular_aliases")
        for popular in popular_aliases:
            if levenshtein_distance(alias.lower(), popular.lower()) <= 1:
                return True
        return False
```
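`_is_typosquatting` calls `levenshtein_distance`, which is not defined above. The sketch below is the standard dynamic-programming edit distance, keeping only two rows of the table. A library such as rapidfuzz ships an optimized implementation if you would rather not maintain your own.

```python
def levenshtein_distance(a: str, b: str) -> int:
    """Minimum single-character insertions, deletions, or substitutions turning a into b."""
    if len(a) < len(b):
        a, b = b, a  # distance is symmetric; keep the row length minimal
    previous = list(range(len(b) + 1))  # distances from "" to each prefix of b
    for i, ca in enumerate(a, start=1):
        current = [i]
        for j, cb in enumerate(b, start=1):
            current.append(min(
                previous[j] + 1,               # delete ca
                current[j - 1] + 1,            # insert cb
                previous[j - 1] + (ca != cb),  # substitute (free if equal)
            ))
        previous = current
    return previous[-1]
```

With the `<= 1` threshold, one-edit variants such as "pr0mo" for a popular "promo" are rejected.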
Layer 4: Behavioral Analysis & Anomaly Detection
```python
from urllib.parse import urlparse

class AbuseDetector:
    """
    Detect abuse patterns with heuristics (the same signals can feed an ML model)
    """
    async def analyze_user_behavior(self, user_id: int) -> dict:
        """
        Flag suspicious patterns
        """
        recent_urls = await db.query("""
            SELECT short_code, long_url, created_at
            FROM urls
            WHERE user_id = ?
              AND created_at > NOW() - INTERVAL '1 hour'
        """, user_id)
        signals = {
            'url_creation_rate': len(recent_urls),
            'unique_domains': len({urlparse(url.long_url).hostname for url in recent_urls}),
            'newly_registered_domains': sum(1 for url in recent_urls if is_new_domain(url)),
            # default=0: max() of an empty sequence raises ValueError.
            # Following redirect chains is network I/O, so cache or sample it.
            'redirect_chain_length': max(
                (get_redirect_chain_length(url) for url in recent_urls), default=0
            ),
        }
        # Heuristic: Spam pattern detection
        if signals['url_creation_rate'] > 50:  # More than 50 URLs in 1 hour
            if signals['unique_domains'] < 3:  # ...all pointing at one or two domains
                await self._flag_as_spam(user_id, "High volume, few domains")
                return {'action': 'block', 'reason': 'Spam pattern detected'}
        # Check redirect chains (cloaking detection)
        if signals['redirect_chain_length'] > 3:
            await self._flag_for_review(user_id, "Long redirect chain (possible cloaking)")
        return {'action': 'allow'}
```
Layer 5: Incident Response & Takedown
```python
@app.post("/api/report")
async def report_url(short_code: str, reason: str, reporter_email: str):
    """
    User-reported abuse endpoint (rate-limit it like /api/shorten)
    """
    # Log report
    await db.execute("""
        INSERT INTO abuse_reports (short_code, reason, reporter_email, reported_at)
        VALUES (?, ?, ?, NOW())
    """, short_code, reason, reporter_email)
    # Auto-takedown for high-confidence cases. Count distinct reporters, not
    # rows, so one person can't take a link down by reporting it 5 times
    rows = await db.query(
        "SELECT COUNT(DISTINCT reporter_email) FROM abuse_reports "
        "WHERE short_code = ? AND reported_at > NOW() - INTERVAL '1 hour'",
        short_code
    )
    report_count = rows[0][0]  # db.query returns rows; take the single count
    if report_count >= 5:  # 5 distinct reporters in 1 hour = auto-takedown
        # deactivate_url must also purge the Redis entry and the CDN cache,
        # or cached redirects keep serving the reported URL
        await deactivate_url(short_code)
        logger.critical(f"Auto-takedown: {short_code} (5+ reporters)")
        return {"status": "URL taken down immediately"}
    # Queue for manual review
    await abuse_queue.enqueue(short_code, reason, reporter_email)
    return {"status": "Report received, queued for review"}
```
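Counting raw report rows would let a single person take down any link by reporting it five times. The sketch below applies the same auto-takedown rule but counts distinct reporters within a sliding one-hour window. `ReportTracker` is a hypothetical in-memory stand-in for the `abuse_reports` query, with an injectable clock for testing.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple

class ReportTracker:
    """Flags a short code once `threshold` distinct reporters report it within the window."""

    def __init__(self, threshold: int = 5, window_seconds: float = 3600,
                 clock: Callable[[], float] = time.time):
        self.threshold = threshold
        self.window = window_seconds
        self.clock = clock
        self._reports: Dict[str, Deque[Tuple[float, str]]] = defaultdict(deque)

    def report(self, short_code: str, reporter: str) -> bool:
        """Record a report; return True if the short code should be auto-taken down."""
        now = self.clock()
        reports = self._reports[short_code]
        reports.append((now, reporter.strip().lower()))
        # Drop reports that have aged out of the window
        while reports and reports[0][0] <= now - self.window:
            reports.popleft()
        distinct_reporters = {who for _, who in reports}
        return len(distinct_reporters) >= self.threshold
```

Reporter identities are still cheap to fake (throwaway email addresses), so rate limiting `/api/report` per IP remains necessary.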
The key is defense in depth. No single layer is perfect, but together they make abuse economically infeasible.
Observability & Production Monitoring
At this scale, you’re blind without metrics, traces, and logs. Monitor these key aspects:
The Four Golden Signals (Google SRE)
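```python
# `metrics` is a StatsD-style client. "a|b|c" in a tag lists its possible
# values; each call sends one of them. latency_ms is the measured duration.
# 1. Latency - Request duration
metrics.histogram('redirect.latency', latency_ms, tags=['cache_layer:redis|db'])  # CDN hits never reach origin; read them from CDN logs
metrics.histogram('shorten.latency', latency_ms, tags=['user_tier:free|paid'])
# 2. Traffic - Request rate
metrics.increment('redirect.requests', tags=['status_code', 'region'])
metrics.increment('shorten.requests', tags=['status_code', 'user_tier'])
# 3. Errors - Failure rate
metrics.increment('redirect.errors', tags=['error_type:404|500|timeout'])
metrics.increment('shorten.errors', tags=['error_type:validation|db|ratelimit'])
# 4. Saturation - Resource utilization
mem = await redis.info('memory')  # INFO has no used_memory_pct field; derive it
if mem['maxmemory']:  # 0 means no memory limit is configured
    metrics.gauge('redis.memory_used_pct', 100 * mem['used_memory'] / mem['maxmemory'])
metrics.gauge('postgres.connection_pool_pct', db.pool.utilization())
metrics.gauge('kafka.consumer_lag', kafka_consumer.lag())
```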
Critical Dashboards
1. Real-Time Health Dashboard
[SLO: 99.99% uptime, <100ms p99 latency]
┌─────────────────────────────────────┐
│ Redirect Success Rate (Last 5 min) │
│ 99.97% ⚠️ (target: 99.99%) │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ Latency Percentiles (Last 5 min) │
│ p50: 18ms ✅ │
│ p95: 65ms ✅ │
│ p99: 142ms ⚠️ (target: <100ms) │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ Cache Hit Rates │
│ CDN: 82% ✅ │
│ Redis: 91% ✅ │
│ DB: 2% (expected) │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ Kafka Consumer Lag │
│ analytics-consumer: 450 msgs ✅ │
│ (< 1000 = healthy) │
└─────────────────────────────────────┘
2. Business Metrics Dashboard
```sql
-- Top performing URLs (for cache warming)
SELECT short_code, COUNT(*) as clicks
FROM clicks
WHERE clicked_at > NOW() - INTERVAL '1 hour'
GROUP BY short_code
ORDER BY clicks DESC
LIMIT 100;

-- Geographic distribution (for PoP optimization)
SELECT country_code, COUNT(*) as requests
FROM clicks
WHERE clicked_at > NOW() - INTERVAL '24 hours'
GROUP BY country_code
ORDER BY requests DESC;

-- Abuse detection metrics
SELECT COUNT(*) as flagged_urls
FROM abuse_reports
WHERE reported_at > NOW() - INTERVAL '1 hour';
```
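The p50/p95/p99 figures on the health dashboard summarize the redirect latency histogram. Metrics backends usually estimate percentiles from histogram buckets. To show what the numbers mean, here is the nearest-rank percentile over raw samples; `percentile` is a hypothetical helper.

```python
def percentile(samples: list[float], p: int) -> float:
    """Nearest-rank percentile: smallest sample with at least p% of samples <= it."""
    if not samples:
        raise ValueError("no samples")
    if not 0 <= p <= 100:
        raise ValueError("p must be in [0, 100]")
    ordered = sorted(samples)
    # rank = ceil(p/100 * n), computed with integers to avoid float rounding
    rank = max(1, (p * len(ordered) + 99) // 100)
    return ordered[rank - 1]
```

Two slow requests out of a hundred leave p95 untouched but set p99 to the slow value. This is why the p99 panel surfaces tail problems that p50 and p95 hide.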
Alerting Strategy (PagerDuty/Opsgenie)
```python
class AlertManager:
    """
    Define alert thresholds and escalation policies
    """
    ALERTS = {
        # P0: Wake up on-call immediately
        'redirect_error_rate_critical': {
            'condition': 'error_rate > 1% for 2 minutes',
            'severity': 'P0',
            'action': 'page_oncall',
            'runbook': 'https://wiki/runbooks/redirect-errors'
        },
        'database_down': {
            'condition': 'postgres.health == 0',
            'severity': 'P0',
            'action': 'page_oncall + auto_failover',
        },
        # P1: Alert during business hours
        'cache_hit_rate_low': {
            'condition': 'redis.hit_rate < 80% for 10 minutes',
            'severity': 'P1',
            'action': 'slack_alert',
            'impact': 'Higher DB load, increased latency'
        },
        'kafka_lag_high': {
            'condition': 'kafka.consumer_lag > 100k for 5 minutes',
            'severity': 'P1',
            'action': 'slack_alert + auto_scale_consumers',
        },
        # P2: Investigate during business hours
        'abuse_spike': {
            'condition': 'abuse_reports > 50 in 1 hour',
            'severity': 'P2',
            'action': 'ticket',
        }
    }
```
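The "for 2 minutes" and "for 10 minutes" clauses mean a condition must hold continuously for that long before anything fires, so one noisy sample does not page anyone. Prometheus's `for:` field on alerting rules behaves this way. Below is a minimal sketch of that evaluation, assuming the metric is sampled periodically; `SustainedCondition` is a hypothetical name.

```python
from typing import Optional

class SustainedCondition:
    """Fires only after a condition has held continuously for `duration` seconds."""

    def __init__(self, duration: float):
        self.duration = duration
        self._since: Optional[float] = None  # when the current bad streak began

    def update(self, now: float, condition_true: bool) -> bool:
        if not condition_true:
            self._since = None  # any healthy sample resets the streak
            return False
        if self._since is None:
            self._since = now
        return now - self._since >= self.duration
```

For `error_rate > 1% for 2 minutes` sampled every 30 seconds, a single healthy sample in the middle of a bad stretch restarts the two-minute clock.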
Distributed Tracing (OpenTelemetry)
For debugging production issues, use distributed tracing to follow a request across services:
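```python
import asyncio

from fastapi import Request
from fastapi.responses import RedirectResponse, Response
from opentelemetry import trace

tracer = trace.get_tracer(__name__)  # create once at import time, not per request
background_tasks: set[asyncio.Task] = set()  # strong refs so tasks aren't GC'd

@app.get("/{short_code}")
async def redirect(short_code: str, request: Request):
    with tracer.start_as_current_span("redirect_request") as span:
        span.set_attribute("short_code", short_code)
        # Trace cache lookup (redis client with decode_responses=True)
        with tracer.start_as_current_span("redis_get"):
            url = await redis.get(f"url:{short_code}")
        # Record the hit before the DB fallback fills in `url`
        span.set_attribute("cache_hit", url is not None)
        if not url:
            # Trace database query
            with tracer.start_as_current_span("postgres_query"):
                rows = await db.query(
                    "SELECT long_url FROM urls WHERE short_code = ?",
                    short_code
                )
            url = rows[0][0] if rows else None
        if not url:
            return Response(status_code=404)
        # Analytics stays fire-and-forget. If track_click_async opens its own
        # "kafka_produce" span, it still joins this trace, because create_task
        # copies the current context (including the active span)
        task = asyncio.create_task(track_click_async(short_code, request))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        return RedirectResponse(url, status_code=302)

# Example trace output:
# Trace ID: 7f8a9b2c3d4e5f6
# ├─ redirect_request (145ms)
# │ ├─ redis_get (2ms) ✅
# │ ├─ kafka_produce (8ms) ✅
# Total: 145ms (p99 breach!). The child spans cover only ~10ms, so ~135ms is
# untraced work inside the handler: add spans around it before blaming Kafka
```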
Capacity Planning & Cost Monitoring
```python
# Monthly cost breakdown (approximate AWS rates; actual prices vary by
# region and purchase option)
COSTS = {
    'compute': {
        'app_servers': 20 * 730 * 0.15,        # 20 × c5.2xlarge = $2,190/mo
        'analytics_workers': 10 * 730 * 0.10,  # 10 × t3.large = $730/mo
    },
    'storage': {
        'postgres': 100 * 0.10 * 1024,   # 100TB = $10,240/mo
        'clickhouse': 30 * 0.10 * 1024,  # 30TB = $3,072/mo
        's3_cold': 200 * 0.023 * 1024,   # 200TB S3 = $4,710/mo
    },
    'cache': {
        'redis': 1024 * 0.50 * 3,  # 1TB × 3 regions = $1,536/mo
    },
    'network': {
        'cdn': 10_000 * 0.085,          # 10TB CDN egress = $850/mo
        'data_transfer': 5_000 * 0.09,  # 5TB inter-region = $450/mo
    },
    'message_queue': {
        'kafka': 32 * 0.25 * 730,  # 32 partitions = $5,840/mo
    }
}
# Total: ~$29.6k/month for 10B redirects/month
# Cost per redirect: ~$0.000003 (0.0003 cents), i.e. about $3 per million redirects
```
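Totals like these drift as line items change, so it helps to compute them rather than maintain them by hand. Below is a small sketch that sums a `{category: {item: monthly_usd}}` dict shaped like `COSTS` and derives unit costs; `summarize_costs` is a hypothetical helper.

```python
def summarize_costs(costs: dict[str, dict[str, float]],
                    redirects_per_month: int) -> dict[str, float]:
    """Total a {category: {item: monthly_usd}} dict and derive per-redirect cost."""
    by_category = {category: sum(items.values()) for category, items in costs.items()}
    total = sum(by_category.values())
    return {
        **by_category,
        'total': total,
        'cost_per_redirect': total / redirects_per_month,
        'cost_per_million': total / redirects_per_month * 1_000_000,
    }
```

For the breakdown above, `summarize_costs(COSTS, 10_000_000_000)` gives a total of about $29,618/month, storage at about $18,022, and roughly $2.96 per million redirects. The per-million figure is a convenient number to alert on.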
At this scale, monitor cost per redirect and alert when it spikes. A spike usually means the cache hit rate dropped, so more redirects reach the database, or traffic fell while capacity stayed fixed (over-provisioning).
Conclusion
Key Principles for Production URL Shorteners
- Optimize the read path obsessively: 99% of traffic is redirects, so cache aggressively, use a CDN, and fail fast.
- Availability > Consistency: Stale redirects are annoying, but unavailable redirects are catastrophic. In CAP terms, choose AP.
- Analytics must never block redirects: Fire-and-forget to Kafka, batch writes, tolerate some data loss.
- Security is a product feature, not an afterthought: Multi-layer defense (rate limiting, URL scanning, behavioral analysis) prevents abuse that could tank your reputation.
- Range-based ID allocation eliminates SPOFs: Don’t depend on a central service for ID generation. Pre-allocate ranges instead.
- Monitor cost, not just performance: A few points of cache hit rate decide your database load. At a ~98% combined hit rate, a 5-point drop sends 3.5x as many reads to the database (2% → 7%).
- TTL is a cost optimization: If half your URLs expire within 30 days, cleanup can roughly halve the $10,240/month Postgres storage line, about $60k/year at this scale.
- Observability isn’t optional: You can’t debug production issues without traces, metrics, and logs. Invest early.
Full Series Summary
Together with Part 1, we’ve covered the complete journey from high-level architecture to production implementation:
Part 1 (HLD): Requirements, capacity planning, distributed ID generation, database design, architecture, caching strategy
Part 2 (LLD): Analytics pipeline, security layers, observability, operational concerns
This design handles 10 billion redirects/month (4k QPS average, 20k peak) with <100ms p99 latency globally. It costs ~$29k/month to run, scales horizontally, and has no single points of failure.
Start simpler (single Postgres + Redis + CDN) and evolve as you hit scale. But always design with these principles in mind. They will guide you through the inevitable trade-offs you will face in system design interviews and production systems alike.
Want to discuss system design? Reach out on Twitter or LinkedIn. I love talking about distributed systems, caching strategies, and building for scale.