分布式缓存架构:Redis集群与缓存策略 引言 在现代分布式系统中,缓存是提升性能的关键手段。本文将深入探讨分布式缓存的架构设计、Redis集群方案以及实用的缓存策略,帮助你构建高性能的缓存系统。 一、分布式缓存的核心价值 1.1 为什么需要分布式缓存? 单机缓存的局限性: 容量受限:单机内存有限 可用性差:单点故障风险 扩展困难:无法水平扩展 分布式缓存的优势: 海量容量:集群总内存可达TB级 高可用性:节点故障自动故障转移 弹性扩展:动态增减节点 1.2 典型应用场景 二、Redis集群方案 2.1 主从复制(Master-Slave) 架构模式: 配置示例: 特点: ✅ 读写分离,提升读性能 ✅ 故障转移需要手动干预 ❌ 写操作无法扩展 2.
在现代分布式系统中,缓存是提升性能的关键手段。本文将深入探讨分布式缓存的架构设计、Redis集群方案以及实用的缓存策略,帮助你构建高性能的缓存系统。
单机缓存的局限性:
分布式缓存的优势:
# 场景1:热点数据缓存 def get_user_info(user_id): # 先查缓存 cache_key = f"user:{user_id}" user_info = redis.get(cache_key) if user_info: return json.loads(user_info) # 缓存未命中,查数据库 user_info = db.query("SELECT * FROM users WHERE id = %s", user_id) # 写入缓存 redis.setex(cache_key, 3600, json.dumps(user_info)) return user_info # 场景2:排行榜 def update_leaderboard(user_id, score): redis.zadd("leaderboard", {user_id: score}) def get_top_users(limit=10): return redis.zrevrange("leaderboard", 0, limit-1, withscores=True) # 场景3:计数器 def incr_page_views(page_id): return redis.incr(f"page_views:{page_id}")
架构模式:
应用 | Master(写) / | \ Slave1 Slave2 Slave3(读)
配置示例:
# master配置 port 6379 bind 0.0.0.0 # slave配置 port 6380 bind 0.0.0.0 slaveof 192.168.1.100 6379
特点:
自动故障转移:
应用 | Sentinel监控 / | \ Master Master Master | | | Slave Slave Slave
配置示例:
# sentinel.conf port 26379 sentinel monitor mymaster 192.168.1.100 6379 2 sentinel down-after-milliseconds mymaster 5000 sentinel failover-timeout mymaster 10000
工作流程:
分片架构:
应用 | 16384个槽位 / | \ 节点A 节点B 节点C 槽位 槽位 槽位 0-5460 5461-10922 10923-16383
创建集群:
# 创建6个节点实例(3主3从) for port in 7000 7001 7002 7003 7004 7005; do mkdir -p cluster/$port cat > cluster/$port/redis.conf <<EOF port $port cluster-enabled yes cluster-config-file nodes-$port.conf cluster-node-timeout 5000 appendonly yes EOF redis-server cluster/$port/redis.conf & done # 创建集群 redis-cli --cluster create \ 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \ 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \ --cluster-replicas 1
特点:
最常用的缓存模式:
def get_user(user_id): # 1. 先查缓存 user = cache.get(f"user:{user_id}") if user: return user # 2. 缓存未命中,查数据库 user = db.get_user(user_id) if not user: return None # 3. 写入缓存 cache.set(f"user:{user_id}", user, ttl=3600) return user def update_user(user_id, data): # 1. 更新数据库 db.update_user(user_id, data) # 2. 删除缓存 cache.delete(f"user:{user_id}")
优缺点:
写入时同步更新缓存:
def update_user(user_id, data): # 1. 同时更新缓存和数据库 cache.set(f"user:{user_id}", data, ttl=3600) db.update_user(user_id, data) # 两者都成功才返回 def get_user(user_id): # 缓存一定存在,直接返回 return cache.get(f"user:{user_id}")
适用场景:
异步写入数据库:
class WriteBehindCache: def __init__(self): self.cache = {} self.write_queue = Queue() self.writer_thread = Thread(target=self._write_worker) self.writer_thread.start() def update(self, key, value): # 1. 立即更新缓存 self.cache[key] = value # 2. 加入写队列 self.write_queue.put((key, value)) def _write_worker(self): while True: key, value = self.write_queue.get() try: # 批量写入数据库 db.update(key, value) except Exception as e: # 失败重试 self.write_queue.put((key, value))
优缺点:
**问题:**查询不存在的数据,缓存和DB都没有,每次都打到DB
解决方案:
def get_user(user_id): user = cache.get(f"user:{user_id}") if user == "NULL": return None # 缓存空值 if user: return user user = db.get_user(user_id) if not user: # 缓存空值,防止穿透 cache.setex(f"user:{user_id}", 300, "NULL") return None cache.setex(f"user:{user_id}", 3600, user) return user
**问题:**热点key过期,大量请求同时打到DB
解决方案1:互斥锁
import threading def get_product(product_id): product = cache.get(f"product:{product_id}") if product: return product lock = locks.acquire(f"lock:{product_id}", timeout=10) if not lock: # 获取锁失败,等待其他线程重建缓存 time.sleep(0.1) return cache.get(f"product:{product_id}") try: # 双重检查 product = cache.get(f"product:{product_id}") if product: return product # 查询数据库 product = db.get_product(product_id) cache.setex(f"product:{product_id}", 3600, product) return product finally: lock.release()
解决方案2:永不过期
# 不设置TTL,后台异步更新 def get_product(product_id): product = cache.get(f"product:{product_id}") if product: return product # 缓存不存在,重建缓存 product = db.get_product(product_id) cache.set(f"product:{product_id}", product) # 不设置过期 # 后台定时刷新 schedule_refresh(product_id) return product
**问题:**大量key同时过期,DB压力激增
解决方案:TTL加随机值
import random def set_cache_with_jitter(key, value, base_ttl): # TTL = 基础TTL + 随机值(0-300秒) ttl = base_ttl + random.randint(0, 300) cache.setex(key, ttl, value) # 批量设置缓存 for item in items: set_cache_with_jitter(f"item:{item.id}", item, base_ttl=3600)
策略1:延迟双删
def update_user(user_id, data): # 1. 删除缓存 cache.delete(f"user:{user_id}") # 2. 更新数据库 db.update_user(user_id, data) # 3. 延迟后再次删除缓存(防止读写并发) time.sleep(1) cache.delete(f"user:{user_id}")
策略2:订阅Binlog
# 监听MySQL Binlog,异步更新缓存 def binlog_listener(event): if event.table == "users": if event.operation == "UPDATE": cache.delete(f"user:{event.data['id']}") elif event.operation == "DELETE": cache.delete(f"user:{event.data['id']}")
# 不好的做法:N次网络请求 for user_id in user_ids: user = cache.get(f"user:{user_id}") # 好的做法:1次网络请求 pipe = cache.pipeline() for user_id in user_ids: pipe.get(f"user:{user_id}") users = pipe.execute()
pool = redis.ConnectionPool( host='localhost', port=6379, max_connections=50, socket_keepalive=True, socket_connect_timeout=5, socket_timeout=5, retry_on_timeout=True ) r = redis.Redis(connection_pool=pool)
# redis.conf优化 maxmemory 2gb maxmemory-policy allkeys-lru # LRU淘汰 save "" # 禁用RDB,使用AOF appendonly yes appendfsync everysec
# 关键指标监控 def monitor_redis(): info = redis.info() metrics = { 'memory_used': info['used_memory'], 'memory_peak': info['used_memory_peak'], 'hit_rate': info['keyspace_hits'] / (info['keyspace_hits'] + info['keyspace_misses']), 'connections': info['connected_clients'], 'commands_per_sec': info['instantaneous_ops_per_sec'], 'expired_keys': info['expired_keys'], 'evicted_keys': info['evicted_keys'] } # 告警条件 if metrics['hit_rate'] < 0.8: alert("缓存命中率过低") if metrics['memory_used'] / info['maxmemory'] > 0.9: alert("内存使用率过高")
构建高性能分布式缓存系统需要考虑:
掌握这些技能,你就能设计出高可用、高性能的缓存系统。