diff --git a/env.example b/env.example
index 3f9cb6ee..6ec37a0f 100644
--- a/env.example
+++ b/env.example
@@ -216,6 +216,10 @@ QDRANT_URL=http://localhost:6333
 
 ### Redis
 REDIS_URI=redis://localhost:6379
+REDIS_SOCKET_TIMEOUT=30
+REDIS_CONNECT_TIMEOUT=10
+REDIS_MAX_CONNECTIONS=100
+REDIS_RETRY_ATTEMPTS=3
 # REDIS_WORKSPACE=forced_workspace_name
 
 ### Memgraph Configuration
diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py
index 0ac0cb44..31752d69 100644
--- a/lightrag/kg/redis_impl.py
+++ b/lightrag/kg/redis_impl.py
@@ -11,7 +11,7 @@ if not pm.is_installed("redis"):
 
 # aioredis is a depricated library, replaced with redis
 from redis.asyncio import Redis, ConnectionPool  # type: ignore
-from redis.exceptions import RedisError, ConnectionError  # type: ignore
+from redis.exceptions import RedisError, ConnectionError, TimeoutError  # type: ignore
 
 from lightrag.utils import logger
 from lightrag.base import (
@@ -22,14 +22,35 @@ from lightrag.base import (
 )
 import json
 
+# Import tenacity for retry logic
+from tenacity import (
+    retry,
+    stop_after_attempt,
+    wait_exponential,
+    retry_if_exception_type,
+    before_sleep_log,
+)
 
 config = configparser.ConfigParser()
 config.read("config.ini", "utf-8")
 
-# Constants for Redis connection pool
-MAX_CONNECTIONS = 100
-SOCKET_TIMEOUT = 5.0
-SOCKET_CONNECT_TIMEOUT = 3.0
+# Constants for Redis connection pool with environment variable support
+MAX_CONNECTIONS = int(os.getenv("REDIS_MAX_CONNECTIONS", "200"))
+SOCKET_TIMEOUT = float(os.getenv("REDIS_SOCKET_TIMEOUT", "30.0"))
+SOCKET_CONNECT_TIMEOUT = float(os.getenv("REDIS_CONNECT_TIMEOUT", "10.0"))
+RETRY_ATTEMPTS = int(os.getenv("REDIS_RETRY_ATTEMPTS", "3"))
+
+# Tenacity retry decorator for Redis operations
+redis_retry = retry(
+    stop=stop_after_attempt(RETRY_ATTEMPTS),
+    wait=wait_exponential(multiplier=1, min=1, max=8),
+    retry=(
+        retry_if_exception_type(ConnectionError)
+        | retry_if_exception_type(TimeoutError)
+        | retry_if_exception_type(RedisError)
+    ),
+    before_sleep=before_sleep_log(logger, "WARNING"),
+)
 
 
 class RedisConnectionManager:
@@ -220,6 +241,7 @@ class RedisKVStorage(BaseKVStorage):
         """Ensure Redis resources are cleaned up when exiting context."""
         await self.close()
 
+    @redis_retry
     async def get_by_id(self, id: str) -> dict[str, Any] | None:
         async with self._get_redis_connection() as redis:
             try:
@@ -235,6 +257,7 @@ class RedisKVStorage(BaseKVStorage):
                 logger.error(f"JSON decode error for id {id}: {e}")
                 return None
 
+    @redis_retry
    async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
        async with self._get_redis_connection() as redis:
            try:
@@ -311,6 +334,7 @@ class RedisKVStorage(BaseKVStorage):
             existing_ids = {keys_list[i] for i, exists in enumerate(results) if exists}
             return set(keys) - existing_ids
 
+    @redis_retry
     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
         if not data:
             return
@@ -790,6 +814,7 @@ class RedisDocStatusStorage(DocStatusStorage):
         """Redis handles persistence automatically"""
         pass
 
+    @redis_retry
     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
         """Insert or update document status data"""
         if not data:
             return
@@ -811,6 +836,7 @@ class RedisDocStatusStorage(DocStatusStorage):
                 logger.error(f"JSON decode error during upsert: {e}")
                 raise
 
+    @redis_retry
     async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
         async with self._get_redis_connection() as redis:
             try:
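
For reference, below is a minimal standalone sketch (not part of the patch) of how a tenacity decorator configured like the redis_retry above behaves: a transient Redis error triggers up to three attempts with exponential backoff capped at 8 seconds, so a brief broker hiccup no longer fails a whole read or upsert. The flaky coroutine and demo logger are hypothetical stand-ins; note that tenacity's before_sleep_log takes a numeric logging level such as logging.WARNING.

# Standalone sketch: same retry shape as the patch, exercised against a
# simulated flaky Redis call. Requires the `redis` and `tenacity` packages.
import asyncio
import logging

from redis.exceptions import ConnectionError, RedisError, TimeoutError
from tenacity import (
    before_sleep_log,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

logger = logging.getLogger("redis_retry_demo")
logging.basicConfig(level=logging.INFO)

# Up to 3 attempts, exponential backoff between 1s and 8s, retrying only on
# Redis exception types. before_sleep_log expects an int level (logging.WARNING).
redis_retry = retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=8),
    retry=(
        retry_if_exception_type(ConnectionError)
        | retry_if_exception_type(TimeoutError)
        | retry_if_exception_type(RedisError)
    ),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)

_calls = 0


@redis_retry
async def flaky_get(key: str) -> str:
    """Hypothetical Redis read: fails twice, then succeeds on the third attempt."""
    global _calls
    _calls += 1
    if _calls < 3:
        raise ConnectionError("simulated transient connection failure")
    return f"value-for-{key}"


if __name__ == "__main__":
    # Two warnings are logged while backing off, then the value is returned.
    print(asyncio.run(flaky_get("doc-1")))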