数据库连接池原理与实现 技术原理 数据库连接池是一种创建和管理数据库连接的缓冲池技术,其核心思想是在系统初始化时预先创建一定数量的数据库连接,并将这些连接保存在连接池中。当应用程序需要访问数据库时,直接从连接池中获取空闲连接,使用完毕后再将连接归还给连接池,而不是真正关闭连接。 为什么需要连接池 资源重用:避免频繁创建和销毁连接,减少系统开销 响应速度:连接已预先创建,获取连接几乎无延迟 连接管理:统一管理连接数量,避免数据库因连接数过多而崩溃 系统监控:可以监控连接的使用情况,及时发现系统瓶颈 连接池核心参数 initialSize:初始连接数 maxActive:最大活跃连接数 maxIdle:最大空闲连接数 minIdle:最小空闲连接数 maxWait:获取连接的最大等待时间
数据库连接池是一种创建和管理数据库连接的缓冲池技术,其核心思想是在系统初始化时预先创建一定数量的数据库连接,并将这些连接保存在连接池中。当应用程序需要访问数据库时,直接从连接池中获取空闲连接,使用完毕后再将连接归还给连接池,而不是真正关闭连接。
连接池的核心实现包括以下几个关键部分:
import threading import queue import psycopg2 from contextlib import contextmanager class DatabaseConnectionPool: def __init__(self, max_connections=10, idle_connections=5): self.max_connections = max_connections self.idle_connections = idle_connections self.pool = queue.Queue(maxsize=max_connections) self.lock = threading.Lock() self.active_count = 0 # 初始化连接池 self._initialize_pool() def _initialize_pool(self): """初始化连接池,创建最小空闲连接""" for _ in range(self.idle_connections): conn = self._create_connection() if conn: self.pool.put(conn) def _create_connection(self): """创建新的数据库连接""" try: conn = psycopg2.connect( host="localhost", database="mydb", user="user", password="password" ) return conn except Exception as e: print(f"创建连接失败: {e}") return None
def get_connection(self, timeout=5): """从连接池获取连接""" try: # 尝试从池中获取空闲连接 conn = self.pool.get(timeout=timeout) with self.lock: self.active_count += 1 # 检查连接是否有效 if not self._is_connection_valid(conn): conn.close() conn = self._create_connection() return conn except queue.Empty: # 池中无可用连接 if self.active_count < self.max_connections: # 可以创建新连接 conn = self._create_connection() if conn: with self.lock: self.active_count += 1 return conn else: # 已达到最大连接数,等待或抛出异常 raise Exception("连接池已满,无法获取新连接") def _is_connection_valid(self, conn): """检查连接是否有效""" try: with conn.cursor() as cursor: cursor.execute("SELECT 1") return True except: return False
def return_connection(self, conn): """将连接归还给连接池""" if conn: try: # 重置连接状态 conn.rollback() # 回滚未提交的事务 with self.lock: self.active_count -= 1 # 将连接放回池中 self.pool.put(conn, block=False) except queue.Full: # 池已满,关闭连接 conn.close() @contextmanager def get_connection_context(self): """上下文管理器,自动管理连接的获取和归还""" conn = self.get_connection() try: yield conn finally: self.return_connection(conn)
def close_all(self): """关闭连接池中的所有连接""" while not self.pool.empty(): try: conn = self.pool.get_nowait() conn.close() except queue.Empty: break
from flask import Flask app = Flask(__name__) pool = DatabaseConnectionPool(max_connections=20, idle_connections=5) @app.route('/users') def get_users(): with pool.get_connection_context() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM users LIMIT 10") users = cursor.fetchall() return {"users": users} if __name__ == '__main__': app.run()
def batch_process_data(data_list, batch_size=100): """使用连接池批量处理数据""" results = [] for i in range(0, len(data_list), batch_size): batch = data_list[i:i + batch_size] with pool.get_connection_context() as conn: cursor = conn.cursor() # 批量插入 cursor.executemany( "INSERT INTO data (name, value) VALUES (%s, %s)", [(item['name'], item['value']) for item in batch] ) conn.commit() results.append(cursor.rowcount) return results
连接池是数据库应用中不可或缺的基础设施,合理使用连接池可以显著提升应用性能和稳定性。