Merge pull request #1775 from danielaskdd/fix-redisk-conn-pool

Hotfix: Resolves connection pool bugs for Redis
This commit is contained in:
Daniel.y 2025-07-13 22:57:18 +08:00 committed by GitHub
commit 77b8fb9b77
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -27,7 +27,7 @@ config = configparser.ConfigParser()
config.read("config.ini", "utf-8")
# Constants for Redis connection pool
MAX_CONNECTIONS = 50
MAX_CONNECTIONS = 100
SOCKET_TIMEOUT = 5.0
SOCKET_CONNECT_TIMEOUT = 3.0
@ -36,24 +36,55 @@ class RedisConnectionManager:
"""Shared Redis connection pool manager to avoid creating multiple pools for the same Redis URI"""
_pools = {}
_pool_refs = {} # Track reference count for each pool
_lock = threading.Lock()
@classmethod
def get_pool(cls, redis_url: str) -> ConnectionPool:
"""Get or create a connection pool for the given Redis URL"""
if redis_url not in cls._pools:
with cls._lock:
if redis_url not in cls._pools:
cls._pools[redis_url] = ConnectionPool.from_url(
redis_url,
max_connections=MAX_CONNECTIONS,
decode_responses=True,
socket_timeout=SOCKET_TIMEOUT,
socket_connect_timeout=SOCKET_CONNECT_TIMEOUT,
)
logger.info(f"Created shared Redis connection pool for {redis_url}")
with cls._lock:
if redis_url not in cls._pools:
cls._pools[redis_url] = ConnectionPool.from_url(
redis_url,
max_connections=MAX_CONNECTIONS,
decode_responses=True,
socket_timeout=SOCKET_TIMEOUT,
socket_connect_timeout=SOCKET_CONNECT_TIMEOUT,
)
cls._pool_refs[redis_url] = 0
logger.info(f"Created shared Redis connection pool for {redis_url}")
# Increment reference count
cls._pool_refs[redis_url] += 1
logger.debug(
f"Redis pool {redis_url} reference count: {cls._pool_refs[redis_url]}"
)
return cls._pools[redis_url]
@classmethod
def release_pool(cls, redis_url: str):
"""Release a reference to the connection pool"""
with cls._lock:
if redis_url in cls._pool_refs:
cls._pool_refs[redis_url] -= 1
logger.debug(
f"Redis pool {redis_url} reference count: {cls._pool_refs[redis_url]}"
)
# If no more references, close the pool
if cls._pool_refs[redis_url] <= 0:
try:
cls._pools[redis_url].disconnect()
logger.info(
f"Closed Redis connection pool for {redis_url} (no more references)"
)
except Exception as e:
logger.error(f"Error closing Redis pool for {redis_url}: {e}")
finally:
del cls._pools[redis_url]
del cls._pool_refs[redis_url]
@classmethod
def close_all_pools(cls):
"""Close all connection pools (for cleanup)"""
@ -65,6 +96,7 @@ class RedisConnectionManager:
except Exception as e:
logger.error(f"Error closing Redis pool for {url}: {e}")
cls._pools.clear()
cls._pool_refs.clear()
@final
@ -94,35 +126,60 @@ class RedisKVStorage(BaseKVStorage):
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
redis_url = os.environ.get(
self._redis_url = os.environ.get(
"REDIS_URI", config.get("redis", "uri", fallback="redis://localhost:6379")
)
# Use shared connection pool
self._pool = RedisConnectionManager.get_pool(redis_url)
self._redis = Redis(connection_pool=self._pool)
logger.info(
f"Initialized Redis KV storage for {self.namespace} using shared connection pool"
)
self._pool = None
self._redis = None
self._initialized = False
try:
# Use shared connection pool
self._pool = RedisConnectionManager.get_pool(self._redis_url)
self._redis = Redis(connection_pool=self._pool)
logger.info(
f"Initialized Redis KV storage for {self.namespace} using shared connection pool"
)
except Exception as e:
# Clean up on initialization failure
if self._redis_url:
RedisConnectionManager.release_pool(self._redis_url)
logger.error(f"Failed to initialize Redis KV storage: {e}")
raise
async def initialize(self):
"""Initialize Redis connection and migrate legacy cache structure if needed"""
if self._initialized:
return
# Test connection
try:
async with self._get_redis_connection() as redis:
await redis.ping()
logger.info(f"Connected to Redis for namespace {self.namespace}")
self._initialized = True
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
# Clean up on connection failure
await self.close()
raise
# Migrate legacy cache structure if this is a cache namespace
if self.namespace.endswith("_cache"):
await self._migrate_legacy_cache_structure()
try:
await self._migrate_legacy_cache_structure()
except Exception as e:
logger.error(f"Failed to migrate legacy cache structure: {e}")
# Don't fail initialization for migration errors, just log them
@asynccontextmanager
async def _get_redis_connection(self):
"""Safe context manager for Redis operations."""
if not self._redis:
raise ConnectionError("Redis connection not initialized")
try:
# Use the existing Redis instance with shared pool
yield self._redis
except ConnectionError as e:
logger.error(f"Redis connection error in {self.namespace}: {e}")
@ -137,11 +194,23 @@ class RedisKVStorage(BaseKVStorage):
raise
async def close(self):
"""Close the Redis connection pool to prevent resource leaks."""
"""Close the Redis connection and release pool reference to prevent resource leaks."""
if hasattr(self, "_redis") and self._redis:
await self._redis.close()
await self._pool.disconnect()
logger.debug(f"Closed Redis connection pool for {self.namespace}")
try:
await self._redis.close()
logger.debug(f"Closed Redis connection for {self.namespace}")
except Exception as e:
logger.error(f"Error closing Redis connection: {e}")
finally:
self._redis = None
# Release the pool reference (will auto-close pool if no more references)
if hasattr(self, "_redis_url") and self._redis_url:
RedisConnectionManager.release_pool(self._redis_url)
self._pool = None
logger.debug(
f"Released Redis connection pool reference for {self.namespace}"
)
async def __aenter__(self):
"""Support for async context manager."""
@ -507,32 +576,53 @@ class RedisDocStatusStorage(DocStatusStorage):
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
redis_url = os.environ.get(
self._redis_url = os.environ.get(
"REDIS_URI", config.get("redis", "uri", fallback="redis://localhost:6379")
)
# Use shared connection pool
self._pool = RedisConnectionManager.get_pool(redis_url)
self._redis = Redis(connection_pool=self._pool)
logger.info(
f"Initialized Redis doc status storage for {self.namespace} using shared connection pool"
)
self._pool = None
self._redis = None
self._initialized = False
try:
# Use shared connection pool
self._pool = RedisConnectionManager.get_pool(self._redis_url)
self._redis = Redis(connection_pool=self._pool)
logger.info(
f"Initialized Redis doc status storage for {self.namespace} using shared connection pool"
)
except Exception as e:
# Clean up on initialization failure
if self._redis_url:
RedisConnectionManager.release_pool(self._redis_url)
logger.error(f"Failed to initialize Redis doc status storage: {e}")
raise
async def initialize(self):
"""Initialize Redis connection"""
if self._initialized:
return
try:
async with self._get_redis_connection() as redis:
await redis.ping()
logger.info(
f"Connected to Redis for doc status namespace {self.namespace}"
)
self._initialized = True
except Exception as e:
logger.error(f"Failed to connect to Redis for doc status: {e}")
# Clean up on connection failure
await self.close()
raise
@asynccontextmanager
async def _get_redis_connection(self):
"""Safe context manager for Redis operations."""
if not self._redis:
raise ConnectionError("Redis connection not initialized")
try:
# Use the existing Redis instance with shared pool
yield self._redis
except ConnectionError as e:
logger.error(f"Redis connection error in doc status {self.namespace}: {e}")
@ -547,10 +637,23 @@ class RedisDocStatusStorage(DocStatusStorage):
raise
async def close(self):
"""Close the Redis connection."""
"""Close the Redis connection and release pool reference to prevent resource leaks."""
if hasattr(self, "_redis") and self._redis:
await self._redis.close()
logger.debug(f"Closed Redis connection for doc status {self.namespace}")
try:
await self._redis.close()
logger.debug(f"Closed Redis connection for doc status {self.namespace}")
except Exception as e:
logger.error(f"Error closing Redis connection: {e}")
finally:
self._redis = None
# Release the pool reference (will auto-close pool if no more references)
if hasattr(self, "_redis_url") and self._redis_url:
RedisConnectionManager.release_pool(self._redis_url)
self._pool = None
logger.debug(
f"Released Redis connection pool reference for doc status {self.namespace}"
)
async def __aenter__(self):
"""Support for async context manager."""