Hotfix: Resolves connection pool bugs for Redis
- The previous implementation of the shared Redis connection pool had a critical issue where any Redis storage instance would disconnect the global shared pool upon closing. This caused `ConnectionError` exceptions for other instances still using the pool. - This commit resolves the issue by introducing a reference counting mechanism in `RedisConnectionManager`.
This commit is contained in:
parent
f185b3fb38
commit
6730a89d7c
1 changed files with 137 additions and 34 deletions
|
|
@ -27,7 +27,7 @@ config = configparser.ConfigParser()
|
|||
config.read("config.ini", "utf-8")
|
||||
|
||||
# Constants for Redis connection pool
|
||||
MAX_CONNECTIONS = 50
|
||||
MAX_CONNECTIONS = 100
|
||||
SOCKET_TIMEOUT = 5.0
|
||||
SOCKET_CONNECT_TIMEOUT = 3.0
|
||||
|
||||
|
|
@ -36,24 +36,55 @@ class RedisConnectionManager:
|
|||
"""Shared Redis connection pool manager to avoid creating multiple pools for the same Redis URI"""
|
||||
|
||||
_pools = {}
|
||||
_pool_refs = {} # Track reference count for each pool
|
||||
_lock = threading.Lock()
|
||||
|
||||
@classmethod
|
||||
def get_pool(cls, redis_url: str) -> ConnectionPool:
|
||||
"""Get or create a connection pool for the given Redis URL"""
|
||||
if redis_url not in cls._pools:
|
||||
with cls._lock:
|
||||
if redis_url not in cls._pools:
|
||||
cls._pools[redis_url] = ConnectionPool.from_url(
|
||||
redis_url,
|
||||
max_connections=MAX_CONNECTIONS,
|
||||
decode_responses=True,
|
||||
socket_timeout=SOCKET_TIMEOUT,
|
||||
socket_connect_timeout=SOCKET_CONNECT_TIMEOUT,
|
||||
)
|
||||
logger.info(f"Created shared Redis connection pool for {redis_url}")
|
||||
with cls._lock:
|
||||
if redis_url not in cls._pools:
|
||||
cls._pools[redis_url] = ConnectionPool.from_url(
|
||||
redis_url,
|
||||
max_connections=MAX_CONNECTIONS,
|
||||
decode_responses=True,
|
||||
socket_timeout=SOCKET_TIMEOUT,
|
||||
socket_connect_timeout=SOCKET_CONNECT_TIMEOUT,
|
||||
)
|
||||
cls._pool_refs[redis_url] = 0
|
||||
logger.info(f"Created shared Redis connection pool for {redis_url}")
|
||||
|
||||
# Increment reference count
|
||||
cls._pool_refs[redis_url] += 1
|
||||
logger.debug(
|
||||
f"Redis pool {redis_url} reference count: {cls._pool_refs[redis_url]}"
|
||||
)
|
||||
|
||||
return cls._pools[redis_url]
|
||||
|
||||
@classmethod
|
||||
def release_pool(cls, redis_url: str):
|
||||
"""Release a reference to the connection pool"""
|
||||
with cls._lock:
|
||||
if redis_url in cls._pool_refs:
|
||||
cls._pool_refs[redis_url] -= 1
|
||||
logger.debug(
|
||||
f"Redis pool {redis_url} reference count: {cls._pool_refs[redis_url]}"
|
||||
)
|
||||
|
||||
# If no more references, close the pool
|
||||
if cls._pool_refs[redis_url] <= 0:
|
||||
try:
|
||||
cls._pools[redis_url].disconnect()
|
||||
logger.info(
|
||||
f"Closed Redis connection pool for {redis_url} (no more references)"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing Redis pool for {redis_url}: {e}")
|
||||
finally:
|
||||
del cls._pools[redis_url]
|
||||
del cls._pool_refs[redis_url]
|
||||
|
||||
@classmethod
|
||||
def close_all_pools(cls):
|
||||
"""Close all connection pools (for cleanup)"""
|
||||
|
|
@ -65,6 +96,7 @@ class RedisConnectionManager:
|
|||
except Exception as e:
|
||||
logger.error(f"Error closing Redis pool for {url}: {e}")
|
||||
cls._pools.clear()
|
||||
cls._pool_refs.clear()
|
||||
|
||||
|
||||
@final
|
||||
|
|
@ -94,35 +126,60 @@ class RedisKVStorage(BaseKVStorage):
|
|||
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
|
||||
# When workspace is empty, keep the original namespace unchanged
|
||||
|
||||
redis_url = os.environ.get(
|
||||
self._redis_url = os.environ.get(
|
||||
"REDIS_URI", config.get("redis", "uri", fallback="redis://localhost:6379")
|
||||
)
|
||||
# Use shared connection pool
|
||||
self._pool = RedisConnectionManager.get_pool(redis_url)
|
||||
self._redis = Redis(connection_pool=self._pool)
|
||||
logger.info(
|
||||
f"Initialized Redis KV storage for {self.namespace} using shared connection pool"
|
||||
)
|
||||
self._pool = None
|
||||
self._redis = None
|
||||
self._initialized = False
|
||||
|
||||
try:
|
||||
# Use shared connection pool
|
||||
self._pool = RedisConnectionManager.get_pool(self._redis_url)
|
||||
self._redis = Redis(connection_pool=self._pool)
|
||||
logger.info(
|
||||
f"Initialized Redis KV storage for {self.namespace} using shared connection pool"
|
||||
)
|
||||
except Exception as e:
|
||||
# Clean up on initialization failure
|
||||
if self._redis_url:
|
||||
RedisConnectionManager.release_pool(self._redis_url)
|
||||
logger.error(f"Failed to initialize Redis KV storage: {e}")
|
||||
raise
|
||||
|
||||
async def initialize(self):
|
||||
"""Initialize Redis connection and migrate legacy cache structure if needed"""
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
# Test connection
|
||||
try:
|
||||
async with self._get_redis_connection() as redis:
|
||||
await redis.ping()
|
||||
logger.info(f"Connected to Redis for namespace {self.namespace}")
|
||||
self._initialized = True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to Redis: {e}")
|
||||
# Clean up on connection failure
|
||||
await self.close()
|
||||
raise
|
||||
|
||||
# Migrate legacy cache structure if this is a cache namespace
|
||||
if self.namespace.endswith("_cache"):
|
||||
await self._migrate_legacy_cache_structure()
|
||||
try:
|
||||
await self._migrate_legacy_cache_structure()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to migrate legacy cache structure: {e}")
|
||||
# Don't fail initialization for migration errors, just log them
|
||||
|
||||
@asynccontextmanager
|
||||
async def _get_redis_connection(self):
|
||||
"""Safe context manager for Redis operations."""
|
||||
if not self._redis:
|
||||
raise ConnectionError("Redis connection not initialized")
|
||||
|
||||
try:
|
||||
# Use the existing Redis instance with shared pool
|
||||
yield self._redis
|
||||
except ConnectionError as e:
|
||||
logger.error(f"Redis connection error in {self.namespace}: {e}")
|
||||
|
|
@ -137,11 +194,23 @@ class RedisKVStorage(BaseKVStorage):
|
|||
raise
|
||||
|
||||
async def close(self):
|
||||
"""Close the Redis connection pool to prevent resource leaks."""
|
||||
"""Close the Redis connection and release pool reference to prevent resource leaks."""
|
||||
if hasattr(self, "_redis") and self._redis:
|
||||
await self._redis.close()
|
||||
await self._pool.disconnect()
|
||||
logger.debug(f"Closed Redis connection pool for {self.namespace}")
|
||||
try:
|
||||
await self._redis.close()
|
||||
logger.debug(f"Closed Redis connection for {self.namespace}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing Redis connection: {e}")
|
||||
finally:
|
||||
self._redis = None
|
||||
|
||||
# Release the pool reference (will auto-close pool if no more references)
|
||||
if hasattr(self, "_redis_url") and self._redis_url:
|
||||
RedisConnectionManager.release_pool(self._redis_url)
|
||||
self._pool = None
|
||||
logger.debug(
|
||||
f"Released Redis connection pool reference for {self.namespace}"
|
||||
)
|
||||
|
||||
async def __aenter__(self):
|
||||
"""Support for async context manager."""
|
||||
|
|
@ -507,32 +576,53 @@ class RedisDocStatusStorage(DocStatusStorage):
|
|||
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
|
||||
# When workspace is empty, keep the original namespace unchanged
|
||||
|
||||
redis_url = os.environ.get(
|
||||
self._redis_url = os.environ.get(
|
||||
"REDIS_URI", config.get("redis", "uri", fallback="redis://localhost:6379")
|
||||
)
|
||||
# Use shared connection pool
|
||||
self._pool = RedisConnectionManager.get_pool(redis_url)
|
||||
self._redis = Redis(connection_pool=self._pool)
|
||||
logger.info(
|
||||
f"Initialized Redis doc status storage for {self.namespace} using shared connection pool"
|
||||
)
|
||||
self._pool = None
|
||||
self._redis = None
|
||||
self._initialized = False
|
||||
|
||||
try:
|
||||
# Use shared connection pool
|
||||
self._pool = RedisConnectionManager.get_pool(self._redis_url)
|
||||
self._redis = Redis(connection_pool=self._pool)
|
||||
logger.info(
|
||||
f"Initialized Redis doc status storage for {self.namespace} using shared connection pool"
|
||||
)
|
||||
except Exception as e:
|
||||
# Clean up on initialization failure
|
||||
if self._redis_url:
|
||||
RedisConnectionManager.release_pool(self._redis_url)
|
||||
logger.error(f"Failed to initialize Redis doc status storage: {e}")
|
||||
raise
|
||||
|
||||
async def initialize(self):
|
||||
"""Initialize Redis connection"""
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
try:
|
||||
async with self._get_redis_connection() as redis:
|
||||
await redis.ping()
|
||||
logger.info(
|
||||
f"Connected to Redis for doc status namespace {self.namespace}"
|
||||
)
|
||||
self._initialized = True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to Redis for doc status: {e}")
|
||||
# Clean up on connection failure
|
||||
await self.close()
|
||||
raise
|
||||
|
||||
@asynccontextmanager
|
||||
async def _get_redis_connection(self):
|
||||
"""Safe context manager for Redis operations."""
|
||||
if not self._redis:
|
||||
raise ConnectionError("Redis connection not initialized")
|
||||
|
||||
try:
|
||||
# Use the existing Redis instance with shared pool
|
||||
yield self._redis
|
||||
except ConnectionError as e:
|
||||
logger.error(f"Redis connection error in doc status {self.namespace}: {e}")
|
||||
|
|
@ -547,10 +637,23 @@ class RedisDocStatusStorage(DocStatusStorage):
|
|||
raise
|
||||
|
||||
async def close(self):
|
||||
"""Close the Redis connection."""
|
||||
"""Close the Redis connection and release pool reference to prevent resource leaks."""
|
||||
if hasattr(self, "_redis") and self._redis:
|
||||
await self._redis.close()
|
||||
logger.debug(f"Closed Redis connection for doc status {self.namespace}")
|
||||
try:
|
||||
await self._redis.close()
|
||||
logger.debug(f"Closed Redis connection for doc status {self.namespace}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing Redis connection: {e}")
|
||||
finally:
|
||||
self._redis = None
|
||||
|
||||
# Release the pool reference (will auto-close pool if no more references)
|
||||
if hasattr(self, "_redis_url") and self._redis_url:
|
||||
RedisConnectionManager.release_pool(self._redis_url)
|
||||
self._pool = None
|
||||
logger.debug(
|
||||
f"Released Redis connection pool reference for doc status {self.namespace}"
|
||||
)
|
||||
|
||||
async def __aenter__(self):
|
||||
"""Support for async context manager."""
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue