From fe1b02d937e7dc4a975b599316e3364e642fc23c Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 16 Oct 2025 16:12:50 +0200 Subject: [PATCH] feat: adds error handling and logging to redis cache --- .../databases/cache/redis/RedisAdapter.py | 87 +++++++++++++++---- .../databases/exceptions/__init__.py | 1 + .../databases/exceptions/exceptions.py | 16 ++++ .../modules/retrieval/utils/session_cache.py | 60 +++++++++---- 4 files changed, 130 insertions(+), 34 deletions(-) diff --git a/cognee/infrastructure/databases/cache/redis/RedisAdapter.py b/cognee/infrastructure/databases/cache/redis/RedisAdapter.py index 99c8ebdd9..cc7984b32 100644 --- a/cognee/infrastructure/databases/cache/redis/RedisAdapter.py +++ b/cognee/infrastructure/databases/cache/redis/RedisAdapter.py @@ -3,9 +3,13 @@ import redis import redis.asyncio as aioredis from contextlib import contextmanager from cognee.infrastructure.databases.cache.cache_db_interface import CacheDBInterface +from cognee.infrastructure.databases.exceptions import CacheConnectionError +from cognee.shared.logging_utils import get_logger from datetime import datetime import json +logger = get_logger("RedisAdapter") + class RedisAdapter(CacheDBInterface): def __init__( @@ -17,15 +21,53 @@ class RedisAdapter(CacheDBInterface): password=None, timeout=240, blocking_timeout=300, + connection_timeout=30, ): super().__init__(host, port, lock_name) - self.sync_redis = redis.Redis(host=host, port=port, username=username, password=password) - self.async_redis = aioredis.Redis( - host=host, port=port, username=username, password=password, decode_responses=True - ) - self.timeout = timeout - self.blocking_timeout = blocking_timeout + self.host = host + self.port = port + self.connection_timeout = connection_timeout + + try: + self.sync_redis = redis.Redis( + host=host, + port=port, + username=username, + password=password, + socket_connect_timeout=connection_timeout, + socket_timeout=connection_timeout, + ) + self.async_redis = aioredis.Redis( + host=host, + port=port, + username=username, + password=password, + decode_responses=True, + socket_connect_timeout=connection_timeout, + ) + self.timeout = timeout + self.blocking_timeout = blocking_timeout + + # Validate connection on initialization + self._validate_connection() + logger.info(f"Successfully connected to Redis at {host}:{port}") + + except (redis.ConnectionError, redis.TimeoutError) as e: + error_msg = f"Failed to connect to Redis at {host}:{port}: {str(e)}" + logger.error(error_msg) + raise CacheConnectionError(error_msg) from e + except Exception as e: + error_msg = f"Unexpected error initializing Redis adapter: {str(e)}" + logger.error(error_msg) + raise CacheConnectionError(error_msg) from e + + def _validate_connection(self): + """Validate Redis connection is available.""" + try: + self.sync_redis.ping() + except (redis.ConnectionError, redis.TimeoutError) as e: + raise CacheConnectionError(f"Cannot connect to Redis at {self.host}:{self.port}: {str(e)}") from e def acquire_lock(self): """ @@ -85,20 +127,33 @@ class RedisAdapter(CacheDBInterface): context: Context used to answer. answer: Assistant answer text. ttl: Optional time-to-live (seconds). If provided, the session expires after this time. + + Raises: + CacheConnectionError: If Redis connection fails or times out. """ - session_key = f"agent_sessions:{user_id}:{session_id}" + try: + session_key = f"agent_sessions:{user_id}:{session_id}" - qa_entry = { - "time": datetime.utcnow().isoformat(), - "question": question, - "context": context, - "answer": answer, - } + qa_entry = { + "time": datetime.utcnow().isoformat(), + "question": question, + "context": context, + "answer": answer, + } - await self.async_redis.rpush(session_key, json.dumps(qa_entry)) + await self.async_redis.rpush(session_key, json.dumps(qa_entry)) - if ttl is not None: - await self.async_redis.expire(session_key, ttl) + if ttl is not None: + await self.async_redis.expire(session_key, ttl) + + except (redis.ConnectionError, redis.TimeoutError) as e: + error_msg = f"Redis connection error while adding Q&A: {str(e)}" + logger.error(error_msg) + raise CacheConnectionError(error_msg) from e + except Exception as e: + error_msg = f"Unexpected error while adding Q&A to Redis: {str(e)}" + logger.error(error_msg) + raise CacheConnectionError(error_msg) from e async def get_latest_qa(self, user_id: str, session_id: str, last_n: int = 10): """ diff --git a/cognee/infrastructure/databases/exceptions/__init__.py b/cognee/infrastructure/databases/exceptions/__init__.py index 2969b1c59..fa6b6bfc5 100644 --- a/cognee/infrastructure/databases/exceptions/__init__.py +++ b/cognee/infrastructure/databases/exceptions/__init__.py @@ -11,4 +11,5 @@ from .exceptions import ( EmbeddingException, MissingQueryParameterError, MutuallyExclusiveQueryParametersError, + CacheConnectionError, ) diff --git a/cognee/infrastructure/databases/exceptions/exceptions.py b/cognee/infrastructure/databases/exceptions/exceptions.py index c240d3133..dec3172a4 100644 --- a/cognee/infrastructure/databases/exceptions/exceptions.py +++ b/cognee/infrastructure/databases/exceptions/exceptions.py @@ -132,3 +132,19 @@ class MutuallyExclusiveQueryParametersError(CogneeValidationError): ): message = "The search function accepts either text or embedding as input, but not both." super().__init__(message, name, status_code) + + +class CacheConnectionError(CogneeConfigurationError): + """ + Raised when connection to the cache database (e.g., Redis) fails. + + This error indicates that the cache service is unavailable or misconfigured. + """ + + def __init__( + self, + message: str = "Failed to connect to cache database. Please check your cache configuration.", + name: str = "CacheConnectionError", + status_code: int = status.HTTP_503_SERVICE_UNAVAILABLE, + ): + super().__init__(message, name, status_code) diff --git a/cognee/modules/retrieval/utils/session_cache.py b/cognee/modules/retrieval/utils/session_cache.py index 5ffacb1ab..d0fe2da08 100644 --- a/cognee/modules/retrieval/utils/session_cache.py +++ b/cognee/modules/retrieval/utils/session_cache.py @@ -1,6 +1,10 @@ from typing import Optional from cognee.context_global_variables import session_user from cognee.infrastructure.databases.cache.config import CacheConfig +from cognee.infrastructure.databases.exceptions import CacheConnectionError +from cognee.shared.logging_utils import get_logger + +logger = get_logger("session_cache") async def save_to_session_cache( @@ -8,9 +12,11 @@ async def save_to_session_cache( context_summary: str, answer: str, session_id: Optional[str] = None, -) -> None: +) -> bool: """ Saves Q&A interaction to the session cache if user is authenticated and caching is enabled. + + Handles cache unavailability gracefully by logging warnings instead of failing. Parameters: ----------- @@ -23,25 +29,43 @@ async def save_to_session_cache( Returns: -------- - - None: This function performs a side effect (saving to cache) and returns nothing. + - bool: True if successfully saved to cache, False otherwise. """ - cache_config = CacheConfig() - user = session_user.get() - user_id = getattr(user, "id", None) + try: + cache_config = CacheConfig() + user = session_user.get() + user_id = getattr(user, "id", None) - if not (user_id and cache_config.caching): - return + if not (user_id and cache_config.caching): + logger.debug("Session caching disabled or user not authenticated") + return False - from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine + if session_id is None: + session_id = "default_session" - cache_engine = get_cache_engine() - if session_id is None: - session_id = "default_session" + from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine - await cache_engine.add_qa( - str(user_id), - session_id=session_id, - question=query, - context=context_summary, - answer=answer, - ) + cache_engine = get_cache_engine() + + if cache_engine is None: + logger.warning("Cache engine not available, skipping session save") + return False + + await cache_engine.add_qa( + str(user_id), + session_id=session_id, + question=query, + context=context_summary, + answer=answer, + ) + + logger.info(f"Successfully saved Q&A to session cache: user_id={user_id}, session_id={session_id}") + return True + + except CacheConnectionError as e: + logger.warning(f"Cache unavailable, continuing without session save: {e.message}") + return False + + except Exception as e: + logger.error(f"Unexpected error saving to session cache: {type(e).__name__}: {str(e)}. Continuing without caching.") + return False