分布式缓存一致性方案 技术原理 在分布式系统中使用缓存(Cache-Aside模式)时,需要保证数据库与缓存的数据一致性。常见挑战包括: 并发更新: 多个请求同时更新数据库和缓存,导致数据不一致 缓存雪崩: 大量缓存同时失效,数据库压力激增 缓存穿透: 查询不存在的数据,请求直接打到数据库 缓存击穿: 热点key过期,大量请求同时查询数据库 更新顺序: 先更新数据库还是先删除缓存,哪种顺序更可靠 核心方案: Cache-Aside: 应用负责缓存读写,最常用 Read-Through/Write-Through: 缓存层负责同步 Write-Behind: 异步写入数据库,性能高但可能丢数据 延迟双删: 删除缓存->更新数据库->延迟后再次删除缓存 订阅Binlog:
在分布式系统中使用缓存(Cache-Aside模式)时,需要保证数据库与缓存的数据一致性。常见挑战包括:
核心方案:
Cache-Aside基础实现:
import redis import json from typing import Optional class CacheAside: def __init__(self, redis_client: redis.Redis): self.redis = redis_client self.cache_ttl = 3600 # 缓存过期时间(秒) def get_user(self, user_id: int) -> Optional[dict]: """获取用户数据""" # 1. 先查缓存 cache_key = f"user:{user_id}" cached_data = self.redis.get(cache_key) if cached_data: print(f"Cache HIT: {cache_key}") return json.loads(cached_data) print(f"Cache MISS: {cache_key}") # 2. 缓存未命中,查询数据库 user = self._get_user_from_db(user_id) if user: # 3. 写入缓存 self.redis.setex( cache_key, self.cache_ttl, json.dumps(user) ) return user def update_user(self, user_id: int, data: dict): """更新用户数据""" # 1. 先更新数据库 self._update_user_to_db(user_id, data) # 2. 删除缓存(而不是更新缓存) cache_key = f"user:{user_id}" self.redis.delete(cache_key) # 为什么是删除而不是更新? # - 避免并发更新时缓存与数据库不一致 # - 下次查询时自动加载最新数据 # - 删除比更新更简单,避免复杂的序列化逻辑 def _get_user_from_db(self, user_id: int) -> Optional[dict]: """从数据库查询用户""" # 模拟数据库查询 # 实际项目中使用ORM或SQL查询 return { "id": user_id, "name": f"User{user_id}", "email": f"user{user_id}@example.com" } def _update_user_to_db(self, user_id: int, data: dict): """更新数据库""" # 实际项目中使用UPDATE SQL print(f"Updated user {user_id} in database: {data}")
延迟双删方案:
import time import redis import json from typing import Optional class DelayedDoubleDelete: def __init__(self, redis_client: redis.Redis): self.redis = redis_client self.delete_delay = 1.0 # 延迟删除时间(秒) def update_user_with_double_delete(self, user_id: int, data: dict): """延迟双删更新""" cache_key = f"user:{user_id}" # 1. 第一次删除缓存 print(f"First delete cache: {cache_key}") self.redis.delete(cache_key) # 2. 更新数据库 print(f"Update database: user {user_id}") self._update_user_to_db(user_id, data) # 3. 延迟后第二次删除缓存 # 为什么要延迟? # - 确保数据库更新完成 # - 在延迟期间,其他线程读取数据库并写入缓存 # - 第二次删除可以清除这些过期缓存 time.sleep(self.delete_delay) print(f"Second delete cache: {cache_key}") self.redis.delete(cache_key) def _update_user_to_db(self, user_id: int, data: dict): """更新数据库""" # 实际数据库更新逻辑 pass # 使用示例 redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True) strategy = DelayedDoubleDelete(redis_client) # 更新用户 strategy.update_user_with_double_delete(123, {"name": "Updated Name"})
订阅Binlog异步更新缓存:
# Canal客户端监听MySQL Binlog from canal.client import Client from canal.protocol import EntryProtocol import redis import json class BinlogCacheSync: def __init__(self, canal_host: str, canal_port: int, redis_client: redis.Redis): self.canal_client = Client() self.canal_client.connect(host=canal_host, port=canal_port) self.canal_client.subscribe(client_id="cache_sync", filter="mydb.users") self.redis = redis_client def start(self): """启动binlog监听""" while True: message = self.canal_client.get(100) if not message: continue for entry in message.entries: if entry.entry_type == EntryProtocol.ENTRYTYPE.ROWDATA: self._process_row_data(entry) def _process_row_data(self, entry): """处理binlog行数据""" # 解析binlog数据 row_change = self._parse_row_change(entry) if row_change.event == "UPDATE" or row_change.event == "DELETE": # 删除缓存 user_id = row_change.primary_key cache_key = f"user:{user_id}" print(f"Binlog: Delete cache {cache_key}") self.redis.delete(cache_key) # 可选: 预热缓存 if row_change.event == "UPDATE": latest_data = self._get_user_from_db(user_id) if latest_data: self.redis.setex( cache_key, 3600, json.dumps(latest_data) ) def _parse_row_data(self, entry): """解析binlog行数据""" # 使用canal协议解析 pass # 优势: # - 完全解耦,应用代码无需关心缓存更新 # - 最终一致,binlog保证不丢失更新 # - 性能好,异步处理不阻塞主流程
缓存雪崩防护:
import redis import random import time from functools import wraps class CacheAvalancheProtection: def __init__(self, redis_client: redis.Redis): self.redis = redis_client def get_with_hot_cache(self, key: str, load_func, ttl: int = 3600): """热点数据永不过期""" # 1. 尝试获取缓存 cached_data = self.redis.get(key) if cached_data: return json.loads(cached_data) # 2. 缓存未命中,加锁防止缓存击穿 lock_key = f"lock:{key}" lock_acquired = self.redis.set( lock_key, "1", nx=True, # 仅不存在时设置 ex=10 # 锁过期时间10秒 ) if lock_acquired: try: # 3. 获取锁的线程加载数据 data = load_func() # 4. 设置永不过期的逻辑过期时间 cache_data = { "data": data, "expire_time": int(time.time()) + ttl } self.redis.set(key, json.dumps(cache_data)) return data finally: self.redis.delete(lock_key) else: # 5. 未获取锁的线程等待并重试 time.sleep(0.1) return self.get_with_hot_cache(key, load_func, ttl) def set_with_random_ttl(self, key: str, value: dict, base_ttl: int = 3600): """设置随机TTL防止缓存雪崩""" # TTL = 基础TTL + 随机0-300秒 # 避免大量缓存同时过期 random_ttl = base_ttl + random.randint(0, 300) self.redis.setex(key, random_ttl, json.dumps(value)) # 使用示例 cache = CacheAvalancheProtection(redis.Redis()) def load_user_from_db(user_id: int): # 从数据库加载用户 return {"id": user_id, "name": f"User{user_id}"} user = cache.get_with_hot_cache( f"user:123", lambda: load_user_from_db(123), ttl=3600 )
分布式锁保证一致性:
import redis import uuid class DistributedLock: def __init__(self, redis_client: redis.Redis): self.redis = redis_client def acquire(self, lock_key: str, expire_time: int = 10) -> str: """获取分布式锁""" lock_value = str(uuid.uuid4()) acquired = self.redis.set( lock_key, lock_value, nx=True, ex=expire_time ) if acquired: return lock_value return None def release(self, lock_key: str, lock_value: str): """释放锁(使用Lua保证原子性)""" lua_script = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ self.redis.eval(lua_script, 1, lock_key, lock_value) # 使用分布式锁保证缓存更新顺序 def update_user_safe(user_id: int, data: dict): lock = DistributedLock(redis.Redis()) lock_key = f"lock:user:{user_id}" # 获取锁 lock_value = lock.acquire(lock_key) if not lock_value: raise Exception("Failed to acquire lock") try: # 更新数据库 update_database(user_id, data) # 删除缓存 redis_client.delete(f"user:{user_id}") finally: # 释放锁 lock.release(lock_key, lock_value)