diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py
index 06e51384..17b0d4fb 100644
--- a/lightrag/kg/redis_impl.py
+++ b/lightrag/kg/redis_impl.py
@@ -27,7 +27,7 @@ config = configparser.ConfigParser()
 config.read("config.ini", "utf-8")
 
 # Constants for Redis connection pool
-MAX_CONNECTIONS = 50
+MAX_CONNECTIONS = 100
 SOCKET_TIMEOUT = 5.0
 SOCKET_CONNECT_TIMEOUT = 3.0
 
@@ -36,24 +36,58 @@ class RedisConnectionManager:
     """Shared Redis connection pool manager to avoid creating multiple pools for the same Redis URI"""
 
     _pools = {}
+    _pool_refs = {}  # Track reference count for each pool
     _lock = threading.Lock()
 
     @classmethod
     def get_pool(cls, redis_url: str) -> ConnectionPool:
         """Get or create a connection pool for the given Redis URL"""
-        if redis_url not in cls._pools:
-            with cls._lock:
-                if redis_url not in cls._pools:
-                    cls._pools[redis_url] = ConnectionPool.from_url(
-                        redis_url,
-                        max_connections=MAX_CONNECTIONS,
-                        decode_responses=True,
-                        socket_timeout=SOCKET_TIMEOUT,
-                        socket_connect_timeout=SOCKET_CONNECT_TIMEOUT,
-                    )
-                    logger.info(f"Created shared Redis connection pool for {redis_url}")
+        with cls._lock:
+            if redis_url not in cls._pools:
+                cls._pools[redis_url] = ConnectionPool.from_url(
+                    redis_url,
+                    max_connections=MAX_CONNECTIONS,
+                    decode_responses=True,
+                    socket_timeout=SOCKET_TIMEOUT,
+                    socket_connect_timeout=SOCKET_CONNECT_TIMEOUT,
+                )
+                cls._pool_refs[redis_url] = 0
+                logger.info(f"Created shared Redis connection pool for {redis_url}")
+
+            # Increment reference count
+            cls._pool_refs[redis_url] += 1
+            logger.debug(
+                f"Redis pool {redis_url} reference count: {cls._pool_refs[redis_url]}"
+            )
+
         return cls._pools[redis_url]
 
+    @classmethod
+    def release_pool(cls, redis_url: str):
+        """Release a reference to the connection pool"""
+        with cls._lock:
+            if redis_url in cls._pool_refs:
+                cls._pool_refs[redis_url] -= 1
+                logger.debug(
+                    f"Redis pool {redis_url} reference count: {cls._pool_refs[redis_url]}"
+                )
+
+                # If no more references, close the pool
+                if cls._pool_refs[redis_url] <= 0:
+                    try:
+                        # NOTE(review): close() used to `await self._pool.disconnect()`, so
+                        # this looks like a coroutine function; un-awaited here it may never
+                        # actually disconnect -- confirm, then await or schedule it.
+                        cls._pools[redis_url].disconnect()
+                        logger.info(
+                            f"Closed Redis connection pool for {redis_url} (no more references)"
+                        )
+                    except Exception as e:
+                        logger.error(f"Error closing Redis pool for {redis_url}: {e}")
+                    finally:
+                        del cls._pools[redis_url]
+                        del cls._pool_refs[redis_url]
+
     @classmethod
     def close_all_pools(cls):
         """Close all connection pools (for cleanup)"""
@@ -65,6 +99,7 @@ class RedisConnectionManager:
                 except Exception as e:
                     logger.error(f"Error closing Redis pool for {url}: {e}")
             cls._pools.clear()
+            cls._pool_refs.clear()
 
 
 @final
@@ -94,35 +129,62 @@ class RedisKVStorage(BaseKVStorage):
             logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
         # When workspace is empty, keep the original namespace unchanged
 
-        redis_url = os.environ.get(
+        self._redis_url = os.environ.get(
             "REDIS_URI", config.get("redis", "uri", fallback="redis://localhost:6379")
         )
-        # Use shared connection pool
-        self._pool = RedisConnectionManager.get_pool(redis_url)
-        self._redis = Redis(connection_pool=self._pool)
-        logger.info(
-            f"Initialized Redis KV storage for {self.namespace} using shared connection pool"
-        )
+        self._pool = None
+        self._redis = None
+        self._initialized = False
+
+        try:
+            # Use shared connection pool
+            self._pool = RedisConnectionManager.get_pool(self._redis_url)
+            self._redis = Redis(connection_pool=self._pool)
+            logger.info(
+                f"Initialized Redis KV storage for {self.namespace} using shared connection pool"
+            )
+        except Exception as e:
+            # Clean up on initialization failure: give the reference back only
+            # if get_pool() actually handed one out to this instance
+            if self._pool is not None:
+                RedisConnectionManager.release_pool(self._redis_url)
+                self._pool = None
+            logger.error(f"Failed to initialize Redis KV storage: {e}")
+            raise
 
     async def initialize(self):
         """Initialize Redis connection and migrate legacy cache structure if needed"""
+        if self._initialized:
+            return
+
         # Test connection
         try:
             async with self._get_redis_connection() as redis:
                 await redis.ping()
                 logger.info(f"Connected to Redis for namespace {self.namespace}")
+                self._initialized = True
         except Exception as e:
             logger.error(f"Failed to connect to Redis: {e}")
+            # Clean up on connection failure
+            await self.close()
             raise
 
         # Migrate legacy cache structure if this is a cache namespace
         if self.namespace.endswith("_cache"):
-            await self._migrate_legacy_cache_structure()
+            try:
+                await self._migrate_legacy_cache_structure()
+            except Exception as e:
+                logger.error(f"Failed to migrate legacy cache structure: {e}")
+                # Don't fail initialization for migration errors, just log them
 
     @asynccontextmanager
     async def _get_redis_connection(self):
         """Safe context manager for Redis operations."""
+        if not self._redis:
+            raise ConnectionError("Redis connection not initialized")
+
         try:
+            # Use the existing Redis instance with shared pool
             yield self._redis
         except ConnectionError as e:
             logger.error(f"Redis connection error in {self.namespace}: {e}")
@@ -137,11 +199,25 @@ class RedisKVStorage(BaseKVStorage):
             raise
 
     async def close(self):
-        """Close the Redis connection pool to prevent resource leaks."""
+        """Close the Redis connection and release pool reference to prevent resource leaks."""
         if hasattr(self, "_redis") and self._redis:
-            await self._redis.close()
-            await self._pool.disconnect()
-            logger.debug(f"Closed Redis connection pool for {self.namespace}")
+            try:
+                await self._redis.close()
+                logger.debug(f"Closed Redis connection for {self.namespace}")
+            except Exception as e:
+                logger.error(f"Error closing Redis connection: {e}")
+            finally:
+                self._redis = None
+
+        # Release the pool reference (will auto-close pool if no more references).
+        # Guard on _pool so the reference is given back exactly once: close() can
+        # run again after initialize() already called it on a failed ping.
+        if getattr(self, "_pool", None) is not None:
+            RedisConnectionManager.release_pool(self._redis_url)
+            self._pool = None
+            logger.debug(
+                f"Released Redis connection pool reference for {self.namespace}"
+            )
 
     async def __aenter__(self):
         """Support for async context manager."""
@@ -507,32 +583,55 @@ class RedisDocStatusStorage(DocStatusStorage):
             logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
         # When workspace is empty, keep the original namespace unchanged
 
-        redis_url = os.environ.get(
+        self._redis_url = os.environ.get(
             "REDIS_URI", config.get("redis", "uri", fallback="redis://localhost:6379")
         )
-        # Use shared connection pool
-        self._pool = RedisConnectionManager.get_pool(redis_url)
-        self._redis = Redis(connection_pool=self._pool)
-        logger.info(
-            f"Initialized Redis doc status storage for {self.namespace} using shared connection pool"
-        )
+        self._pool = None
+        self._redis = None
+        self._initialized = False
+
+        try:
+            # Use shared connection pool
+            self._pool = RedisConnectionManager.get_pool(self._redis_url)
+            self._redis = Redis(connection_pool=self._pool)
+            logger.info(
+                f"Initialized Redis doc status storage for {self.namespace} using shared connection pool"
+            )
+        except Exception as e:
+            # Clean up on initialization failure: give the reference back only
+            # if get_pool() actually handed one out to this instance
+            if self._pool is not None:
+                RedisConnectionManager.release_pool(self._redis_url)
+                self._pool = None
+            logger.error(f"Failed to initialize Redis doc status storage: {e}")
+            raise
 
     async def initialize(self):
         """Initialize Redis connection"""
+        if self._initialized:
+            return
+
         try:
             async with self._get_redis_connection() as redis:
                 await redis.ping()
                 logger.info(
                     f"Connected to Redis for doc status namespace {self.namespace}"
                 )
+                self._initialized = True
         except Exception as e:
             logger.error(f"Failed to connect to Redis for doc status: {e}")
+            # Clean up on connection failure
+            await self.close()
             raise
 
     @asynccontextmanager
     async def _get_redis_connection(self):
         """Safe context manager for Redis operations."""
+        if not self._redis:
+            raise ConnectionError("Redis connection not initialized")
+
         try:
+            # Use the existing Redis instance with shared pool
             yield self._redis
         except ConnectionError as e:
             logger.error(f"Redis connection error in doc status {self.namespace}: {e}")
@@ -547,10 +646,25 @@ class RedisDocStatusStorage(DocStatusStorage):
             raise
 
     async def close(self):
-        """Close the Redis connection."""
+        """Close the Redis connection and release pool reference to prevent resource leaks."""
         if hasattr(self, "_redis") and self._redis:
-            await self._redis.close()
-            logger.debug(f"Closed Redis connection for doc status {self.namespace}")
+            try:
+                await self._redis.close()
+                logger.debug(f"Closed Redis connection for doc status {self.namespace}")
+            except Exception as e:
+                logger.error(f"Error closing Redis connection: {e}")
+            finally:
+                self._redis = None
+
+        # Release the pool reference (will auto-close pool if no more references).
+        # Guard on _pool so the reference is given back exactly once: close() can
+        # run again after initialize() already called it on a failed ping.
+        if getattr(self, "_pool", None) is not None:
+            RedisConnectionManager.release_pool(self._redis_url)
+            self._pool = None
+            logger.debug(
+                f"Released Redis connection pool reference for doc status {self.namespace}"
+            )
 
     async def __aenter__(self):
         """Support for async context manager."""