""" Connection pooling utilities for database and Redis connections. This module provides optimized connection pooling for both PostgreSQL and Redis to improve performance under concurrent load. """ import os import logging from typing import Optional, Dict, Any from contextlib import contextmanager import redis from redis.connection import ConnectionPool from sqlalchemy import create_engine, event from sqlalchemy.engine import Engine from sqlalchemy.pool import QueuePool, StaticPool logger = logging.getLogger(__name__) class DatabaseConnectionPool: """Manages optimized database connection pooling.""" def __init__(self, database_url: str, **kwargs): """ Initialize database connection pool. Args: database_url: Database connection URL **kwargs: Additional engine options """ self.database_url = database_url # Default pool configuration optimized for chat workload default_config = { 'pool_size': int(os.getenv('DB_POOL_SIZE', '10')), 'max_overflow': int(os.getenv('DB_MAX_OVERFLOW', '20')), 'pool_recycle': int(os.getenv('DB_POOL_RECYCLE', '3600')), # 1 hour 'pool_pre_ping': True, # Validate connections before use 'pool_timeout': int(os.getenv('DB_POOL_TIMEOUT', '30')), 'echo': os.getenv('SQLALCHEMY_ECHO', 'False').lower() == 'true' } # Override with provided kwargs default_config.update(kwargs) # Create engine with optimized settings self.engine = create_engine( database_url, poolclass=QueuePool, **default_config ) # Add connection event listeners for monitoring self._setup_connection_events() logger.info(f"Database connection pool initialized", extra={ 'pool_size': default_config['pool_size'], 'max_overflow': default_config['max_overflow'], 'pool_recycle': default_config['pool_recycle'] }) def _setup_connection_events(self): """Setup SQLAlchemy event listeners for connection monitoring.""" @event.listens_for(self.engine, "connect") def set_sqlite_pragma(dbapi_connection, connection_record): """Set SQLite pragmas for better performance (if using SQLite).""" if 'sqlite' in self.database_url.lower(): cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") cursor.execute("PRAGMA journal_mode=WAL") cursor.execute("PRAGMA synchronous=NORMAL") cursor.execute("PRAGMA cache_size=10000") cursor.execute("PRAGMA temp_store=MEMORY") cursor.close() @event.listens_for(self.engine, "checkout") def receive_checkout(dbapi_connection, connection_record, connection_proxy): """Log connection checkout for monitoring.""" logger.debug("Database connection checked out from pool") @event.listens_for(self.engine, "checkin") def receive_checkin(dbapi_connection, connection_record): """Log connection checkin for monitoring.""" logger.debug("Database connection returned to pool") def get_pool_status(self) -> Dict[str, Any]: """ Get current pool status for monitoring. Returns: Dictionary with pool statistics """ pool = self.engine.pool status = { 'pool_size': pool.size(), 'checked_in': pool.checkedin(), 'checked_out': pool.checkedout(), 'overflow': pool.overflow() } # Add invalid count if available (not all pool types have this) if hasattr(pool, 'invalid'): status['invalid'] = pool.invalid() else: status['invalid'] = 0 return status @contextmanager def get_connection(self): """ Context manager for getting database connections. Yields: Database connection from the pool """ connection = self.engine.connect() try: yield connection finally: connection.close() class RedisConnectionPool: """Manages optimized Redis connection pooling.""" def __init__(self, redis_url: str, **kwargs): """ Initialize Redis connection pool. Args: redis_url: Redis connection URL **kwargs: Additional pool options """ self.redis_url = redis_url # Default pool configuration optimized for chat workload default_config = { 'max_connections': int(os.getenv('REDIS_MAX_CONNECTIONS', '20')), 'retry_on_timeout': True, 'socket_timeout': int(os.getenv('REDIS_SOCKET_TIMEOUT', '5')), 'socket_connect_timeout': int(os.getenv('REDIS_CONNECT_TIMEOUT', '5')), 'socket_keepalive': True, 'socket_keepalive_options': {}, 'health_check_interval': int(os.getenv('REDIS_HEALTH_CHECK_INTERVAL', '30')) } # Override with provided kwargs default_config.update(kwargs) # Create connection pool self.connection_pool = ConnectionPool.from_url( redis_url, **default_config ) # Create Redis client with the pool self.redis_client = redis.Redis(connection_pool=self.connection_pool) # Test connection try: self.redis_client.ping() logger.info(f"Redis connection pool initialized", extra={ 'max_connections': default_config['max_connections'], 'socket_timeout': default_config['socket_timeout'] }) except redis.RedisError as e: logger.error(f"Failed to initialize Redis connection pool: {e}") raise def get_client(self) -> redis.Redis: """ Get Redis client with connection pooling. Returns: Redis client instance """ return self.redis_client def get_pool_status(self) -> Dict[str, Any]: """ Get current pool status for monitoring. Returns: Dictionary with pool statistics """ pool = self.connection_pool status = { 'max_connections': getattr(pool, 'max_connections', 0), } # Add connection counts if available (attributes may vary by Redis version) if hasattr(pool, '_created_connections'): status['created_connections'] = pool._created_connections elif hasattr(pool, 'created_connections'): status['created_connections'] = pool.created_connections else: status['created_connections'] = 0 if hasattr(pool, '_available_connections'): status['available_connections'] = len(pool._available_connections) else: status['available_connections'] = 0 if hasattr(pool, '_in_use_connections'): status['in_use_connections'] = len(pool._in_use_connections) else: status['in_use_connections'] = 0 return status def health_check(self) -> bool: """ Perform health check on Redis connection. Returns: True if Redis is healthy, False otherwise """ try: self.redis_client.ping() return True except redis.RedisError as e: logger.warning(f"Redis health check failed: {e}") return False def close(self): """Close all connections in the pool.""" try: self.connection_pool.disconnect() logger.info("Redis connection pool closed") except Exception as e: logger.error(f"Error closing Redis connection pool: {e}") class ConnectionPoolManager: """Manages both database and Redis connection pools.""" def __init__(self, database_url: str, redis_url: Optional[str] = None): """ Initialize connection pool manager. Args: database_url: Database connection URL redis_url: Redis connection URL (optional) """ self.database_pool = DatabaseConnectionPool(database_url) self.redis_pool = None if redis_url and redis_url != 'None': try: self.redis_pool = RedisConnectionPool(redis_url) except Exception as e: logger.warning(f"Failed to initialize Redis pool: {e}") logger.info("Connection pool manager initialized") def get_database_engine(self) -> Engine: """Get database engine with connection pooling.""" return self.database_pool.engine def get_redis_client(self) -> Optional[redis.Redis]: """Get Redis client with connection pooling.""" return self.redis_pool.get_client() if self.redis_pool else None def get_status(self) -> Dict[str, Any]: """ Get status of all connection pools. Returns: Dictionary with pool status information """ status = { 'database': self.database_pool.get_pool_status(), 'redis': None } if self.redis_pool: status['redis'] = self.redis_pool.get_pool_status() status['redis']['healthy'] = self.redis_pool.health_check() return status def close_all(self): """Close all connection pools.""" try: self.database_pool.engine.dispose() logger.info("Database connection pool closed") except Exception as e: logger.error(f"Error closing database pool: {e}") if self.redis_pool: self.redis_pool.close() # Global connection pool manager instance _connection_pool_manager: Optional[ConnectionPoolManager] = None def initialize_connection_pools(database_url: str, redis_url: Optional[str] = None) -> ConnectionPoolManager: """ Initialize global connection pool manager. Args: database_url: Database connection URL redis_url: Redis connection URL (optional) Returns: ConnectionPoolManager instance """ global _connection_pool_manager if _connection_pool_manager is None: _connection_pool_manager = ConnectionPoolManager(database_url, redis_url) return _connection_pool_manager def get_connection_pool_manager() -> Optional[ConnectionPoolManager]: """ Get the global connection pool manager. Returns: ConnectionPoolManager instance or None if not initialized """ return _connection_pool_manager def cleanup_connection_pools(): """Cleanup all connection pools.""" global _connection_pool_manager if _connection_pool_manager: _connection_pool_manager.close_all() _connection_pool_manager = None