分布式数据库设计:分库分表与读写分离


文档摘要

分布式数据库设计:分库分表与读写分离 分库分表基础 为什么需要分库分表? 单表数据量过大: 查询性能下降 索引效率降低 锁竞争严重 单库连接数有限: MySQL 默认 maxconnections=151 高并发下连接池耗尽 磁盘空间限制: 单表文件过大 备份恢复时间长 分库分表策略 垂直拆分 垂直分库:按业务拆分 垂直分表:按字段拆分 水平拆分 水平分库:数据分散到多个库 水平分表:数据分散到多个表 分片算法 哈希分片 优点: 数据均匀分布 扩容方便 实现: 范围分片 优点: 查询范围数据高效 便于数据归档 实现: 地理位置 优点: 数据就近访问 降低延迟 实现: 读写分离 架构设计 MySQL 主从复制 配置主库 配置从库 读写分离中间件 ShardingSphere 配置示例:

分布式数据库设计:分库分表与读写分离

分库分表基础

为什么需要分库分表?

单表数据量过大

  • 查询性能下降
  • 索引效率降低
  • 锁竞争严重

单库连接数有限

  • MySQL 默认 max_connections=151
  • 高并发下连接池耗尽

磁盘空间限制

  • 单表文件过大
  • 备份恢复时间长

分库分表策略

1. 垂直拆分

垂直分库:按业务拆分

单体应用数据库: - 用户表 - 订单表 - 商品表 - 支付表 拆分为多个数据库: - 用户库(user_db) - 订单库(order_db) - 商品库(product_db) - 支付库(payment_db)

垂直分表:按字段拆分

用户表(users): - id - username - email - password - bio(简介,大文本) - avatar(头像,BLOB) 拆分为: - 用户基础表(user_base) - 用户扩展表(user_ext)

2. 水平拆分

水平分库:数据分散到多个库

订单表(orders): - 按 user_id % 2 分库 - orders_0 库:user_id 为偶数 - orders_1 库:user_id 为奇数

水平分表:数据分散到多个表

订单表(orders): - 按 id % 4 分表 - orders_0、orders_1、orders_2、orders_3

分片算法

1. 哈希分片

优点

  • 数据均匀分布
  • 扩容方便

实现

def hash_sharding(key, num_shards): # 使用 MD5 哈希 import hashlib hash_value = int(hashlib.md5(str(key).encode()).hexdigest(), 16) return hash_value % num_shards # 示例 shard = hash_sharding("user_123", 4) print(f"Shard: {shard}") # 输出: Shard: 2

2. 范围分片

优点

  • 查询范围数据高效
  • 便于数据归档

实现

def range_sharding(key, ranges): """ ranges = [(0, 1000000), (1000001, 2000000), ...] """ for i, (start, end) in enumerate(ranges): if start <= key <= end: return i return len(ranges) - 1 # 示例 ranges = [(0, 1000000), (1000001, 2000000), (2000001, 3000000)] shard = range_sharding(1500000, ranges) print(f"Shard: {shard}") # 输出: Shard: 1

3. 地理位置

优点

  • 数据就近访问
  • 降低延迟

实现

def geo_sharding(location, shard_map): """ shard_map = { 'beijing': 0, 'shanghai': 1, 'guangzhou': 2, 'default': 3 } ""` return shard_map.get(location, shard_map['default']) # 示例 shard = geo_sharding('beijing', shard_map) print(f"Shard: {shard}") # 输出: Shard: 0

读写分离

架构设计

应用层 ↓ 主库(Master):处理写操作 ↓ (复制) 从库(Slave 1):处理读操作 从库(Slave 2):处理读操作 从库(Slave 3):处理读操作

MySQL 主从复制

配置主库

# /etc/my.cnf (Master) [mysqld] server-id = 1 log-bin = mysql-bin binlog-format = ROW expire_logs_days = 7 max_binlog_size = 100M # 创建复制用户 CREATE USER 'repl'@'%' IDENTIFIED BY 'password'; GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%'; FLUSH PRIVILEGES; # 查看主库状态 SHOW MASTER STATUS;

配置从库

# /etc/my.cnf (Slave) [mysqld] server-id = 2 relay-log = mysql-relay-bin read-only = 1 # 配置复制 CHANGE MASTER TO MASTER_HOST='master_ip', MASTER_USER='repl', MASTER_PASSWORD='password', MASTER_LOG_FILE='mysql-bin.000001', MASTER_LOG_POS=154; # 启动复制 START SLAVE; # 查看从库状态 SHOW SLAVE STATUS\G

读写分离中间件

ShardingSphere

配置示例

# config-sharding.yaml mode: type: Standalone dataSources: master: dataSourceClassName: com.zaxxer.hikari.HikariDataSource driverClassName: com.mysql.cj.jdbc.Driver jdbcUrl: jdbc:mysql://master:3306/mydb username: root password: password slave0: dataSourceClassName: com.zaxxer.hikari.HikariDataSource driverClassName: com.mysql.cj.jdbc.Driver jdbcUrl: jdbc:mysql://slave0:3306/mydb username: root password: password rules: - !READWRITE_SPLITTING_DATA_SOURCE name: rw_ds readDataSourceNames: - slave0 writeDataSourceName: master

ProxySQL

配置示例

-- 配置后端服务器 INSERT INTO mysql_servers (hostgroup_id, hostname, port) VALUES (10, 'master', 3306), (20, 'slave0', 3306), (20, 'slave1', 3306); -- 配置读写分离规则 INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply) VALUES (1, 1, '^SELECT.*', 20, 1), (2, 1, '^INSERT.*|^UPDATE.*|^DELETE.*', 10, 1); -- 加载配置到运行时 LOAD MYSQL SERVERS TO RUNTIME; LOAD MYSQL QUERY RULES TO RUNTIME;

分布式事务

1. 两阶段提交(2PC)

缺点

  • 阻塞协议
  • 单点故障
  • 数据锁定时间长

实现

// 使用 Seata 实现 2PC @GlobalTransactional public void transferMoney(String fromAccount, String toAccount, double amount) { // 阶段 1:准备阶段 accountDao.debit(fromAccount, amount); accountDao.credit(toAccount, amount); // 阶段 2:提交阶段(Seata 自动处理) }

2. Saga 模式

原理:长事务拆分为多个本地事务,通过补偿处理失败。

// 使用 Seata Saga @SagaTask(name = "debit", sagaCode = "transferSaga") public boolean debit(TransferDTO transfer) { accountDao.debit(transfer.getFromAccount(), transfer.getAmount()); return true; } @SagaTask(name = "credit", sagaCode = "transferSaga", concurrentNum = 5) public boolean credit(TransferDTO transfer) { accountDao.credit(transfer.getToAccount(), transfer.getAmount()); return true; } @SagaCompensateTask(name = "debitCompensate", sagaCode = "transferSaga") public boolean debitCompensate(TransferDTO transfer) { accountDao.credit(transfer.getFromAccount(), transfer.getAmount()); return true; }

3. TCC(Try-Confirm-Cancel)

// Try 阶段:预留资源 public boolean tryDebit(String accountId, double amount) { Account account = accountDao.findById(accountId); account.setFrozenAmount(account.getFrozenAmount() + amount); accountDao.update(account); return true; } // Confirm 阶段:确认扣款 public boolean confirmDebit(String accountId, double amount) { Account account = accountDao.findById(accountId); account.setBalance(account.getBalance() - amount); account.setFrozenAmount(account.getFrozenAmount() - amount); accountDao.update(account); return true; } // Cancel 阶段:取消扣款 public boolean cancelDebit(String accountId, double amount) { Account account = accountDao.findById(accountId); account.setFrozenAmount(account.getFrozenAmount() - amount); accountDao.update(account); return true; }

数据一致性

1. 最终一致性

实现

// 使用 MQ 实现最终一致性 @Service public class OrderService { @Transactional public void createOrder(Order order) { // 1. 创建订单 orderDao.save(order); // 2. 发送消息到 MQ Message message = MessageBuilder .withPayload(order) .setHeader("orderId", order.getId()) .build(); mq.send("order-created", message); } } @Service public class InventoryService { @JmsListener(destination = "order-created") public void handleOrderCreated(Message<Order> message) { Order order = message.getPayload(); // 扣减库存 inventoryDao.deduct(order.getProductId(), order.getQuantity()); } }

2. 分布式锁

Redis 实现

public class DistributedLock { private final RedisTemplate<String, String> redisTemplate; public boolean tryLock(String key, String value, long expireTime) { Boolean result = redisTemplate.opsForValue() .setIfAbsent(key, value, expireTime, TimeUnit.SECONDS); return Boolean.TRUE.equals(result); } public void unlock(String key, String value) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Collections.singletonList(key), value); } }

跨分片查询

1. 分布式查询

// 查询所有分片的数据 public List<Order> findOrdersByUserId(Long userId) { List<Order> allOrders = new ArrayList<>(); // 查询所有分片 for (int i = 0; i < shardCount; i++) { List<Order> orders = orderDao.findByShardAndUserId(i, userId); allOrders.addAll(orders); } return allOrders; }

2. 聚合查询

// 聚合所有分片的统计信息 public OrderStatistics getStatistics() { OrderStatistics total = new OrderStatistics(); for (int i = 0; i < shardCount; i++) { OrderStatistics shardStats = orderDao.getStatisticsByShard(i); total.add(shardStats); } return total; }

3. 分页查询

// 跨分片分页 public Page<Order> findByUserId(Long userId, Pageable pageable) { // 1. 查询每个分片的数据 List<Order> allOrders = new ArrayList<>(); for (int i = 0; i < shardCount; i++) { List<Order> shardOrders = orderDao.findByShardAndUserId(i, userId); allOrders.addAll(shardOrders); } // 2. 内存中排序和分页 allOrders.sort(Comparator.comparing(Order::getCreatedAt).reversed()); int start = pageable.getPageNumber() * pageable.getPageSize(); int end = Math.min(start + pageable.getPageSize(), allOrders.size()); List<Order> pageData = allOrders.subList(start, end); return new PageImpl<>(pageData, pageable, allOrders.size()); }

数据迁移

1. 在线迁移

# 双写方案 # 1. 同时写入老库和新库 # 2. 定期从老库同步数据到新库 # 3. 验证数据一致性 # 4. 逐步切读流量到新库 # 5. 完全切换到新库

2. 停机迁移

# 停机迁移方案 # 1. 停止应用 # 2. 导出老库数据 # 3. 数据转换 # 4. 导入到新库 # 5. 验证数据 # 6. 切换应用配置 # 7. 启动应用

监控与运维

1. 分片监控

@Component public class ShardMonitor { @Scheduled(cron = "0 */5 * * * *") # 每 5 分钟执行 public void monitorShards() { for (int i = 0; i < shardCount; i++) { ShardStatus status = checkShardStatus(i); if (!status.isHealthy()) { alertService.sendAlert("Shard " + i + " is unhealthy: " + status.getMessage()); } } } private ShardStatus checkShardStatus(int shardId) { // 检查连接数 // 检查查询延迟 // 检查错误率 // 检查数据量 return new ShardStatus(); } }

2. 扩容缩容

# 扩容步骤 # 1. 新增分片 # 2. 配置数据迁移规则 # 3. 执行数据迁移 # 4. 验证数据一致性 # 5. 更新路由配置 # 6. 下线老分片

总结

分布式数据库设计需要综合考虑:

  1. 分片策略:选择合适的分片算法
  2. 读写分离:提升查询性能
  3. 数据一致性:选择合适的一致性方案
  4. 分布式事务:根据场景选择 2PC/Saga/TCC
  5. 监控运维:建立完善的监控体系

记住:没有银弹,根据业务场景选择合适的方案!


发布者: 作者: 转发
评论区 (0)
U