分布式系统原理:构建可扩展的架构 分布式系统是现代云计算和微服务架构的基础。本文将深入探讨分布式系统的核心原理、设计模式和最佳实践。 分布式系统基础 CAP定理再认识 在分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition Tolerance)三者无法同时满足;由于网络分区在实际系统中无法避免,真正的取舍发生在分区出现时:要么保证一致性(CP),要么保证可用性(AP)。 实际应用权衡: 分布式一致性算法 Raft算法实现 分布式协调 分布式锁 分布式事务 服务发现与注册 Consul服务注册 负载均衡 一致性哈希负载均衡 容错与恢复 断路器模式 总结 分布式系统设计需要综合考虑一致性、可用性、分区容错性等多个方面。通过合理使用一致性算法、分布式协调、服务发现和负载均衡等技术,可以构建高性能、高可用的分布式系统架构。
分布式系统是现代云计算和微服务架构的基础。本文将深入探讨分布式系统的核心原理、设计模式和最佳实践。
在分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition Tolerance)三者无法同时满足。由于网络分区在实际系统中无法避免,真正的取舍发生在分区出现时:要么拒绝部分请求以保持一致性(CP),要么继续响应但可能返回过期数据(AP);没有分区时,系统可以同时提供一致性和可用性。
实际应用权衡:
class ConsistencyLevel:
    """Catalogue of consistency levels and the mechanisms each one enables."""

    @staticmethod
    def strong_consistency():
        """Describe strong consistency.

        Pros: data is always consistent.
        Cons: lower performance and reduced availability.
        Typical use: financial transactions, inventory management.

        Returns:
            A fresh dict mapping each mechanism name to True.
        """
        mechanisms = ('read_after_write', 'causal_consistency', 'atomic_broadcast')
        return dict.fromkeys(mechanisms, True)

    @staticmethod
    def eventual_consistency():
        """Describe eventual consistency.

        Pros: high performance and high availability.
        Cons: reads may return stale data.
        Typical use: social networks, content distribution.

        Returns:
            A fresh dict mapping each mechanism name to True.
        """
        mechanisms = ('read_repair', 'quorum_reads', 'hinted_handoff')
        return dict.fromkeys(mechanisms, True)
package raft import ( "sync" "time" ) type NodeState int const ( Follower NodeState = iota Candidate Leader ) type LogEntry struct { Index int Term int Command interface{} } type RaftNode struct { id string state NodeState currentTerm int votedFor string log []LogEntry commitIndex int lastApplied int // Leader状态 nextIndex map[string]int matchIndex map[string]int // 网络和持久化 peers []string voteCount int electionTimer *time.Timer heartbeatTimer *time.Timer mu sync.Mutex // RPC通道 voteChannel chan VoteRequest appendChannel chan AppendEntriesRequest } func (rn *RaftNode) startElection() { rn.mu.Lock() defer rn.mu.Unlock() rn.state = Candidate rn.currentTerm++ rn.votedFor = rn.id rn.voteCount = 1 // 发送投票请求给所有节点 for _, peer := range rn.peers { request := VoteRequest{ Term: rn.currentTerm, CandidateId: rn.id, LastLogIndex: len(rn.log) - 1, LastLogTerm: rn.getLastLogTerm(), } go rn.sendRequestVote(peer, request) } // 重置选举超时 rn.resetElectionTimer() } func (rn *RaftNode) becomeLeader() { rn.mu.Lock() defer rn.mu.Unlock() rn.state = Leader // 初始化Leader状态 for _, peer := range rn.peers { rn.nextIndex[peer] = len(rn.log) rn.matchIndex[peer] = -1 } // 立即发送心跳 rn.sendHeartbeats() } func (rn *RaftNode) sendHeartbeats() { for _, peer := range rn.peers { prevLogIndex := rn.nextIndex[peer] - 1 prevLogTerm := 0 if prevLogIndex >= 0 { prevLogTerm = rn.log[prevLogIndex].Term } entries := rn.log[prevLogIndex+1:] request := AppendEntriesRequest{ Term: rn.currentTerm, LeaderId: rn.id, PrevLogIndex: prevLogIndex, PrevLogTerm: prevLogTerm, Entries: entries, LeaderCommit: rn.commitIndex, } go rn.sendAppendEntries(peer, request) } } func (rn *RaftNode) handleAppendEntries(request AppendEntriesRequest) AppendEntriesResponse { rn.mu.Lock() defer rn.mu.Unlock() // 如果term小于当前term,拒绝 if request.Term < rn.currentTerm { return AppendEntriesResponse{ Term: rn.currentTerm, Success: false, } } // 如果term更大,转换为follower if request.Term > rn.currentTerm { rn.currentTerm = request.Term rn.state = Follower 
rn.votedFor = "" } // 重置选举超时 rn.resetElectionTimer() // 检查日志一致性 if request.PrevLogIndex >= 0 { if len(rn.log) <= request.PrevLogIndex { return AppendEntriesResponse{ Term: rn.currentTerm, Success: false, } } if rn.log[request.PrevLogIndex].Term != request.PrevLogTerm { return AppendEntriesResponse{ Term: rn.currentTerm, Success: false, } } } // 追加新日志 if len(request.Entries) > 0 { rn.log = append(rn.log[:request.PrevLogIndex+1], request.Entries...) } // 更新commitIndex if request.LeaderCommit > rn.commitIndex { rn.commitIndex = min(request.LeaderCommit, len(rn.log)-1) } return AppendEntriesResponse{ Term: rn.currentTerm, Success: true, } } func (rn *RaftNode) commitLog() { // 找到大多数节点都复制的日志条目 for n := rn.commitIndex + 1; n < len(rn.log); n++ { count := 1 // Leader自己 for _, peer := range rn.peers { if rn.matchIndex[peer] >= n { count++ } } if count > len(rn.peers)/2 && rn.log[n].Term == rn.currentTerm { rn.commitIndex = n rn.applyLog(n) } } } func (rn *RaftNode) applyLog(index int) { if index > rn.lastApplied { entry := rn.log[index] // 应用日志到状态机 rn.stateMachine.Apply(entry.Command) rn.lastApplied = index } }
import redis
import uuid
import time


class DistributedLock:
    """Redis-based distributed lock.

    The lock is a single Redis key ``lock:<lock_name>`` whose value is a
    per-instance UUID, so only the instance that acquired it can release or
    extend it. The key is created with an expiry so a crashed holder cannot
    keep the lock forever.
    """

    def __init__(self, redis_client, lock_name, ttl=30):
        """
        Args:
            redis_client: redis-py client (this class only calls ``set`` and ``eval``).
            lock_name: logical lock name; stored under ``lock:<lock_name>``.
            ttl: lock expiry in seconds, applied on every acquire.
        """
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.ttl = ttl
        # Unique token for this holder; checked by release() and extend().
        self.identifier = str(uuid.uuid4())

    def acquire(self, blocking=True, timeout=None):
        """Try to take the lock.

        Args:
            blocking: if False, make exactly one attempt.
            timeout: when blocking, maximum seconds to keep retrying.
                ``None`` retries indefinitely; ``0`` makes a single attempt.

        Returns:
            True if the lock was acquired, False otherwise.
        """
        start_time = time.monotonic()  # monotonic: immune to wall-clock jumps
        while True:
            # SET NX EX creates the key only if absent, with an expiry, atomically.
            acquired = self.redis.set(
                self.lock_name, self.identifier, nx=True, ex=self.ttl
            )
            if acquired:
                return True
            if not blocking:
                return False
            # `is not None` so timeout=0 means "give up now", not "never time out".
            if timeout is not None and (time.monotonic() - start_time) >= timeout:
                return False
            time.sleep(0.01)  # brief back-off to avoid a hot spin

    def release(self):
        """Release the lock if this instance still holds it.

        Returns:
            True if the key was deleted, False if it had expired or is held
            by another instance.
        """
        # Compare-and-delete in one Lua script so we never delete a lock that
        # expired and was re-acquired by someone else in between.
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(
            lua_script, 1, self.lock_name, self.identifier
        )
        return result == 1

    def extend(self, additional_time=10):
        """Add ``additional_time`` seconds to the remaining TTL of a lock we hold.

        Returns:
            True if the lock was still held by this instance and was extended,
            False if it had expired or belongs to another instance.
        """
        # Ownership check and extension run atomically. The remaining TTL is
        # read first because a bare EXPIRE *replaces* the TTL and could shorten
        # the lock (e.g. 28s left -> 5s). TTL is negative only when the key has
        # no expiry, which acquire() never produces; treat that as 0.
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            local ttl = redis.call("ttl", KEYS[1])
            if ttl < 0 then
                ttl = 0
            end
            return redis.call("expire", KEYS[1], ttl + tonumber(ARGV[2]))
        else
            return 0
        end
        """
        result = self.redis.eval(
            lua_script, 1, self.lock_name, self.identifier, additional_time
        )
        return result == 1


# Usage example
def process_task(redis_client, task_id):
    """Process ``task_id`` while holding its distributed lock."""
    lock = DistributedLock(redis_client, f"task:{task_id}")
    if lock.acquire(timeout=5):
        try:
            # Process the task
            print(f"Processing task {task_id}")
            time.sleep(2)
            # Extend the lock before continuing; if that fails the lock has
            # expired or been taken over, and continuing would be unsafe.
            if not lock.extend(additional_time=5):
                print(f"Lost lock for task {task_id}")
                return
            # Continue processing
            time.sleep(2)
        finally:
            lock.release()
    else:
        print(f"Failed to acquire lock for task {task_id}")
import io.seata.spring.annotation.GlobalTransactional; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service public class OrderService { @Autowired private OrderRepository orderRepository; @Autowired private InventoryService inventoryService; @Autowired private AccountService accountService; @GlobalTransactional(name = "create-order", rollbackFor = Exception.class) public Order createOrder(OrderRequest request) { // 1. 创建订单 Order order = new Order(); order.setUserId(request.getUserId()); order.setProductId(request.getProductId()); order.setQuantity(request.getQuantity()); order.setAmount(request.getAmount()); order.setStatus(OrderStatus.CREATED); orderRepository.save(order); // 2. 扣减库存 inventoryService.deductInventory( request.getProductId(), request.getQuantity() ); // 3. 扣减账户余额 accountService.debitBalance( request.getUserId(), request.getAmount() ); // 4. 更新订单状态 order.setStatus(OrderStatus.CONFIRMED); orderRepository.save(order); return order; } } // 库存服务 @Service public class InventoryService { @Autowired private InventoryRepository inventoryRepository; @Transactional public void deductInventory(String productId, int quantity) { Inventory inventory = inventoryRepository.findById(productId) .orElseThrow(() -> new RuntimeException("Inventory not found")); if (inventory.getQuantity() < quantity) { throw new RuntimeException("Insufficient inventory"); } inventory.setQuantity(inventory.getQuantity() - quantity); inventoryRepository.save(inventory); } } // 账户服务 @Service public class AccountService { @Autowired private AccountRepository accountRepository; @Transactional public void debitBalance(String userId, BigDecimal amount) { Account account = accountRepository.findByUserId(userId) .orElseThrow(() -> new RuntimeException("Account not found")); if (account.getBalance().compareTo(amount) < 0) { throw new RuntimeException("Insufficient balance"); } 
account.setBalance(account.getBalance().subtract(amount)); accountRepository.save(account); } }
package main import ( "github.com/hashicorp/consul/api" "log" "time" ) type ServiceRegistry struct { client *api.Client config *ServiceConfig } type ServiceConfig struct { ID string Name string Address string Port int Tags []string Check *api.AgentServiceCheck } func NewServiceRegistry(consulAddr string) (*ServiceRegistry, error) { config := api.DefaultConfig() config.Address = consulAddr client, err := api.NewClient(config) if err != nil { return nil, err } return &ServiceRegistry{ client: client, }, nil } func (sr *ServiceRegistry) Register(config *ServiceConfig) error { registration := &api.AgentServiceRegistration{ ID: config.ID, Name: config.Name, Address: config.Address, Port: config.Port, Tags: config.Tags, Check: config.Check, } err := sr.client.Agent().ServiceRegister(registration) if err != nil { return err } // 启动健康检查 go sr.startHealthCheck(config) return nil } func (sr *ServiceRegistry) startHealthCheck(config *ServiceConfig) { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for range ticker.C { checks, err := sr.client.Health().Checks(config.Name, nil) if err != nil { log.Printf("Health check failed: %v", err) continue } for _, check := range checks { if check.Status != api.HealthPassing { log.Printf("Service %s health check failed: %s", config.ID, check.Output) // 可以在这里添加自动恢复逻辑 } } } } func (sr *ServiceRegistry) Deregister(serviceID string) error { return sr.client.Agent().ServiceDeregister(serviceID) } func (sr *ServiceRegistry) Discover(serviceName string) ([]string, error) { services, _, err := sr.client.Health().Service(serviceName, "", true, nil) if err != nil { return nil, err } var addresses []string for _, service := range services { address := fmt.Sprintf("%s:%d", service.Service.Address, service.Service.Port) addresses = append(addresses, address) } return addresses, nil } // 使用示例 func main() { registry, err := NewServiceRegistry("localhost:8500") if err != nil { log.Fatal(err) } config := &ServiceConfig{ ID: "user-service-1", 
Name: "user-service", Address: "localhost", Port: 8080, Tags: []string{"api", "rest"}, Check: &api.AgentServiceCheck{ HTTP: "http://localhost:8080/health", Interval: "10s", Timeout: "3s", DeregisterCriticalServiceAfter: "30s", }, } err = registry.Register(config) if err != nil { log.Fatal(err) } // 服务发现 services, err := registry.Discover("user-service") if err != nil { log.Fatal(err) } log.Printf("Discovered services: %v", services) }
import bisect
import hashlib
import random
from typing import List, Dict, Optional


class ConsistentHash:
    """Consistent-hash ring with virtual nodes.

    Each physical node is placed on the ring ``replicas`` times (as
    ``"<node>:<i>"``) so keys spread more evenly and removing a node only
    remaps the keys that pointed at it.
    """

    def __init__(self, replicas: int = 150):
        self.replicas = replicas
        self.ring = {}          # ring position (MD5 as int) -> physical node
        self.sorted_keys = []   # ring positions in ascending order

    def add_node(self, node: str):
        """Place ``replicas`` virtual nodes for ``node`` on the ring."""
        for i in range(self.replicas):
            virtual_node = f"{node}:{i}"
            key = self._hash(virtual_node)
            self.ring[key] = node
        self.sorted_keys = sorted(self.ring.keys())

    def remove_node(self, node: str):
        """Remove all virtual nodes of ``node`` from the ring."""
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            # Only delete positions still owned by this node, so a hash
            # collision cannot evict another node's slot.
            if self.ring.get(key) == node:
                del self.ring[key]
        self.sorted_keys = sorted(self.ring.keys())

    def get_node(self, key: str) -> Optional[str]:
        """Return the node owning ``key``, or None if the ring is empty.

        The owner is the first ring position >= hash(key), wrapping around to
        the smallest position.
        """
        if not self.ring:
            return None

        hash_key = self._hash(key)
        # Binary search for the first position >= hash_key (O(log n)).
        idx = bisect.bisect_left(self.sorted_keys, hash_key)
        if idx == len(self.sorted_keys):
            idx = 0  # past the last position: wrap around the ring
        return self.ring[self.sorted_keys[idx]]

    def _hash(self, key: str) -> int:
        """Map ``key`` to a ring position (its MD5 digest as an integer)."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)


class LoadBalancer:
    """Load balancer supporting several node-selection strategies.

    Strategies: ``"round_robin"``, ``"least_connections"``, ``"weighted"``,
    ``"consistent_hash"``; any other value selects uniformly at random.
    """

    def __init__(self, strategy: str = "consistent_hash"):
        self.strategy = strategy
        self.nodes = {}
        self.consistent_hash = ConsistentHash()
        # Round-robin cursor (total requests routed by round robin); it was
        # previously read without ever being initialised.
        self._total_requests = 0

    def add_node(self, node_id: str, address: str, weight: int = 1):
        """Register a node with its address and weight."""
        self.nodes[node_id] = {
            'address': address,
            'weight': weight,
            'active_connections': 0,
            'total_requests': 0
        }

        if self.strategy == "consistent_hash":
            self.consistent_hash.add_node(node_id)

    def get_node(self, key: str = None) -> Optional[str]:
        """Select a node ID according to the strategy; None if no nodes.

        ``key`` is only used by the consistent-hash strategy.
        """
        if not self.nodes:
            return None

        if self.strategy == "round_robin":
            return self._round_robin()
        elif self.strategy == "least_connections":
            return self._least_connections()
        elif self.strategy == "weighted":
            return self._weighted()
        elif self.strategy == "consistent_hash":
            return self.consistent_hash.get_node(key or "")
        else:
            return self._random()

    def release_node(self, node_id: str) -> None:
        """Mark one connection to ``node_id`` as finished.

        Pairs with the least-connections strategy, which increments
        ``active_connections`` on every selection; without this the counters
        only grow. Unknown IDs and zero counters are ignored.
        """
        node = self.nodes.get(node_id)
        if node is not None and node['active_connections'] > 0:
            node['active_connections'] -= 1

    def _round_robin(self) -> str:
        """Cycle through nodes in insertion order."""
        node_ids = list(self.nodes.keys())
        index = self._total_requests % len(node_ids)
        node_id = node_ids[index]

        self.nodes[node_id]['total_requests'] += 1
        self._total_requests += 1

        return node_id

    def _least_connections(self) -> str:
        """Pick the node with the fewest active connections (ties: first added)."""
        node_id = min(self.nodes.keys(),
                      key=lambda x: self.nodes[x]['active_connections'])
        self.nodes[node_id]['active_connections'] += 1
        return node_id

    def _weighted(self) -> str:
        """Pick a node at random with probability proportional to its weight."""
        total_weight = sum(node['weight'] for node in self.nodes.values())
        rand = random.uniform(0, total_weight)

        cumulative = 0
        for node_id, node in self.nodes.items():
            cumulative += node['weight']
            if rand <= cumulative:
                return node_id

        # Floating-point edge case: fall back to the last node.
        return list(self.nodes.keys())[-1]

    def _random(self) -> str:
        """Pick a node uniformly at random."""
        return random.choice(list(self.nodes.keys()))


# Usage example
lb = LoadBalancer(strategy="consistent_hash")

# Add nodes
lb.add_node("node1", "192.168.1.1", weight=3)
lb.add_node("node2", "192.168.1.2", weight=2)
lb.add_node("node3", "192.168.1.3", weight=1)

# Route requests
for i in range(10):
    node_id = lb.get_node(f"user_{i}")
    print(f"Request {i} routed to {node_id}")
import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; import org.springframework.stereotype.Service; @Service public class RemoteService { private final CircuitBreaker circuitBreaker; public RemoteService() { CircuitBreakerConfig config = CircuitBreakerConfig.custom() .failureRateThreshold(50) // 失败率阈值50% .waitDurationInOpenState(java.time.Duration.ofSeconds(30)) // 开启状态等待30秒 .permittedNumberOfCallsInHalfOpenState(5) // 半开状态允许5次调用 .slidingWindowSize(10) // 滑动窗口大小10 .build(); CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config); this.circuitBreaker = registry.circuitBreaker("remoteService"); } public String callRemoteService(String request) { return CircuitBreaker.decorateSupplier( circuitBreaker, () -> doRemoteCall(request) ).get(); } private String doRemoteCall(String request) { // 实际的远程调用 try { return restTemplate.postForObject( "http://remote-service/api", request, String.class ); } catch (Exception e) { throw new RuntimeException("Remote service failed", e); } } public CircuitBreaker.State getCircuitBreakerState() { return circuitBreaker.getState(); } }
分布式系统设计需要综合考虑一致性、可用性、分区容错性等多个方面。通过合理使用一致性算法、分布式协调、服务发现和负载均衡等技术,可以构建高性能、高可用的分布式系统架构。持续学习和实践是掌握分布式系统原理的关键。