分布式系统一致性:从 CAP 到 Raft 分布式系统基础 什么是分布式系统? 分布式系统是多个独立计算机节点协同工作的系统,通过网络通信。 核心挑战 并发:多个操作同时执行 局部故障:部分节点可能失败 消息延迟:网络不可靠 时钟同步:节点时钟不一致 CAP 定理 CAP 三个属性 一致性(Consistency): 所有节点同时看到相同的数据 线性一致性(Linearizability) 可用性(Availability): 每个请求都能得到响应 响应成功或失败,但不超时 分区容错性(Partition Tolerance): 系统在网络分区时仍能继续运行 CAP 权衡 现实选择: CA:不考虑分区(单机系统) CP:保证一致性,牺牲可用性 AP:保证可用性,牺牲一致性 一致性级别
分布式系统是多个独立计算机节点协同工作的系统,通过网络通信。
一致性(Consistency):
可用性(Availability):
分区容错性(Partition Tolerance):
一致性(C) ┌────┐ │ │ │ │ 可用性(A)─────┴ 分区容错(P)
现实选择:
角色:
两阶段提交:
阶段 1:Prepare/Promise Proposer → Acceptor: Prepare(n) Acceptor → Proposer: Promise(n, accepted_value) 阶段 2:Accept/Accepted Proposer → Acceptor: Accept(n, value) Acceptor → Proposer: Accepted(n, value)
缺点:
核心思想:领导者选举 + 日志复制
领导者(Leader) ├── 处理所有写入请求 └── 管理日志复制 跟随者(Follower) ├── 接收 Leader 的请求 └── 转换为 Candidate(选举时) 候选人(Candidate) ├── 请求投票 └── 转换为 Leader 或 Follower
选举触发条件:
选举过程:
# 伪代码 def start_election(): current_term += 1 voted_for = self votes_received = 1 # 投给自己 # 向所有节点请求投票 for server in servers: send_request_vote(server, current_term, last_log_index, last_log_term) # 等待投票结果 while votes_received <= majority: if received_append_entries(): become_follower() return elif received_vote_response(): votes_received += 1 # 获得多数票,成为 Leader become_leader()
投票规则:
# Leader 处理客户端请求 def handle_client_request(command): # 添加到本地日志 log.append(LogEntry(current_term, command)) # 并行发送给所有 Follower for server in servers: send_append_entries(server, log) # 等待大多数节点复制 while commit_index < log.index: if replicated_on_majority(log.index): commit_index = log.index apply_to_state_machine(log[commit_index]) # 响应客户端 return success # Follower 处理 AppendEntries def handle_append_entries(leader_term, prev_log_index, prev_log_term, entries, leader_commit): # 检查 term if leader_term < current_term: return False # 检查日志一致性 if log[prev_log_index].term != prev_log_term: return False # 追加日志 log.append(entries) # 更新提交索引 if leader_commit > commit_index: commit_index = min(leader_commit, log.index) return True
特点:
阶段:
阶段 1:准备阶段 协调者 → 所有参与者:准备事务 参与者 → 协调者:同意/拒绝 阶段 2:提交阶段 协调者 → 所有参与者:提交/回滚 参与者 → 协调者:确认
缺点:
改进:
仍然有问题:
# Saga 实现 class Saga: def __init__(self): self.transactions = [] def add_transaction(self, transaction): self.transactions.append(transaction) def execute(self): completed = [] # 执行所有事务 for tx in self.transactions: try: tx.execute() completed.append(tx) except Exception as e: # 补偿已执行的事务 for tx in reversed(completed): tx.compensate() raise e # 使用示例 class OrderSaga(Saga): def __init__(self): super().__init__() self.add_transaction(CreateOrder()) self.add_transaction(ReserveInventory()) self.add_transaction(ProcessPayment()) self.add_transaction(ConfirmOrder())
在分布式缓存中,如何均匀分配数据到不同节点?
import hashlib class ConsistentHashing: def __init__(self, nodes=None, replicas=3): self.replicas = replicas self.ring = {} self.sorted_keys = [] if nodes: for node in nodes: self.add_node(node) def _hash(self, key): return int(hashlib.md5(key.encode()).hexdigest(), 16) def add_node(self, node): for i in range(self.replicas): virtual_node = f"{node}:{i}" hash_value = self._hash(virtual_node) self.ring[hash_value] = node self.sorted_keys.append(hash_value) self.sorted_keys.sort() def remove_node(self, node): for i in range(self.replicas): virtual_node = f"{node}:{i}" hash_value = self._hash(virtual_node) del self.ring[hash_value] self.sorted_keys.remove(hash_value) def get_node(self, key): if not self.sorted_keys: return None hash_value = self._hash(key) # 顺时针查找第一个节点 for ring_key in self.sorted_keys: if ring_key >= hash_value: return self.ring[ring_key] # 环绕到第一个节点 return self.ring[self.sorted_keys[0]]
优势:
import redis import uuid import time class DistributedLock: def __init__(self, redis_client, lock_name, expire_time=10): self.redis = redis_client self.lock_name = f"lock:{lock_name}" self.expire_time = expire_time self.identifier = str(uuid.uuid4()) def acquire(self): # 使用 SET NX EX 原子操作 end_time = time.time() + self.expire_time while time.time() < end_time: if self.redis.set(self.lock_name, self.identifier, nx=True, ex=self.expire_time): return True time.sleep(0.001) return False def release(self): # Lua 脚本确保只有锁的持有者才能释放 lua_script = ''' if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end ''' self.redis.eval(lua_script, 1, self.lock_name, self.identifier)
from kazoo.client import KazooClient class ZKLocket: def __init__(self, zk, lock_path): self.zk = zk self.lock_path = lock_path self.lock = zk.Lock(lock_path) def acquire(self): self.lock.acquire() def release(self): self.lock.release()
特点:
实现:
挑战:
解决方案:
分片策略:
一致性保证:
分布式系统是一致性和可用性的权衡:
记住:没有银弹,根据业务场景选择合适的方案!