feat: add error handling and logging to Redis cache

This commit is contained in:
hajdul88 2025-10-16 16:12:50 +02:00
parent 96f2a2f22b
commit fe1b02d937
4 changed files with 130 additions and 34 deletions

View file

@ -3,9 +3,13 @@ import redis
import redis.asyncio as aioredis
from contextlib import contextmanager
from cognee.infrastructure.databases.cache.cache_db_interface import CacheDBInterface
from cognee.infrastructure.databases.exceptions import CacheConnectionError
from cognee.shared.logging_utils import get_logger
from datetime import datetime
import json
logger = get_logger("RedisAdapter")
class RedisAdapter(CacheDBInterface):
def __init__(
@ -17,15 +21,53 @@ class RedisAdapter(CacheDBInterface):
password=None,
timeout=240,
blocking_timeout=300,
connection_timeout=30,
):
super().__init__(host, port, lock_name)
self.sync_redis = redis.Redis(host=host, port=port, username=username, password=password)
self.async_redis = aioredis.Redis(
host=host, port=port, username=username, password=password, decode_responses=True
)
self.timeout = timeout
self.blocking_timeout = blocking_timeout
self.host = host
self.port = port
self.connection_timeout = connection_timeout
try:
self.sync_redis = redis.Redis(
host=host,
port=port,
username=username,
password=password,
socket_connect_timeout=connection_timeout,
socket_timeout=connection_timeout,
)
self.async_redis = aioredis.Redis(
host=host,
port=port,
username=username,
password=password,
decode_responses=True,
socket_connect_timeout=connection_timeout,
)
self.timeout = timeout
self.blocking_timeout = blocking_timeout
# Validate connection on initialization
self._validate_connection()
logger.info(f"Successfully connected to Redis at {host}:{port}")
except (redis.ConnectionError, redis.TimeoutError) as e:
error_msg = f"Failed to connect to Redis at {host}:{port}: {str(e)}"
logger.error(error_msg)
raise CacheConnectionError(error_msg) from e
except Exception as e:
error_msg = f"Unexpected error initializing Redis adapter: {str(e)}"
logger.error(error_msg)
raise CacheConnectionError(error_msg) from e
def _validate_connection(self):
    """Confirm the Redis server is reachable by issuing a PING.

    Raises:
        CacheConnectionError: If the server cannot be reached or the
            request times out, chained from the underlying redis error.
    """
    try:
        self.sync_redis.ping()
    except (redis.ConnectionError, redis.TimeoutError) as exc:
        # Wrap the driver-level error in the project's cache exception so
        # callers only need to catch CacheConnectionError.
        message = f"Cannot connect to Redis at {self.host}:{self.port}: {str(exc)}"
        raise CacheConnectionError(message) from exc
def acquire_lock(self):
"""
@ -85,20 +127,33 @@ class RedisAdapter(CacheDBInterface):
context: Context used to answer.
answer: Assistant answer text.
ttl: Optional time-to-live (seconds). If provided, the session expires after this time.
Raises:
CacheConnectionError: If Redis connection fails or times out.
"""
session_key = f"agent_sessions:{user_id}:{session_id}"
try:
session_key = f"agent_sessions:{user_id}:{session_id}"
qa_entry = {
"time": datetime.utcnow().isoformat(),
"question": question,
"context": context,
"answer": answer,
}
qa_entry = {
"time": datetime.utcnow().isoformat(),
"question": question,
"context": context,
"answer": answer,
}
await self.async_redis.rpush(session_key, json.dumps(qa_entry))
await self.async_redis.rpush(session_key, json.dumps(qa_entry))
if ttl is not None:
await self.async_redis.expire(session_key, ttl)
if ttl is not None:
await self.async_redis.expire(session_key, ttl)
except (redis.ConnectionError, redis.TimeoutError) as e:
error_msg = f"Redis connection error while adding Q&A: {str(e)}"
logger.error(error_msg)
raise CacheConnectionError(error_msg) from e
except Exception as e:
error_msg = f"Unexpected error while adding Q&A to Redis: {str(e)}"
logger.error(error_msg)
raise CacheConnectionError(error_msg) from e
async def get_latest_qa(self, user_id: str, session_id: str, last_n: int = 10):
"""

View file

@ -11,4 +11,5 @@ from .exceptions import (
EmbeddingException,
MissingQueryParameterError,
MutuallyExclusiveQueryParametersError,
CacheConnectionError,
)

View file

@ -132,3 +132,19 @@ class MutuallyExclusiveQueryParametersError(CogneeValidationError):
):
message = "The search function accepts either text or embedding as input, but not both."
super().__init__(message, name, status_code)
class CacheConnectionError(CogneeConfigurationError):
    """
    Signals that the cache database (e.g., Redis) could not be reached.

    Raised when the cache service is unavailable or its connection
    settings are misconfigured; maps to HTTP 503 for API responses.
    """

    def __init__(
        self,
        message: str = "Failed to connect to cache database. Please check your cache configuration.",
        name: str = "CacheConnectionError",
        status_code: int = status.HTTP_503_SERVICE_UNAVAILABLE,
    ):
        # Delegate to the configuration-error base so logging/serialization
        # stay consistent with the rest of the exception hierarchy.
        super().__init__(message, name, status_code)

View file

@ -1,6 +1,10 @@
from typing import Optional
from cognee.context_global_variables import session_user
from cognee.infrastructure.databases.cache.config import CacheConfig
from cognee.infrastructure.databases.exceptions import CacheConnectionError
from cognee.shared.logging_utils import get_logger
logger = get_logger("session_cache")
async def save_to_session_cache(
@ -8,9 +12,11 @@ async def save_to_session_cache(
context_summary: str,
answer: str,
session_id: Optional[str] = None,
) -> None:
) -> bool:
"""
Saves Q&A interaction to the session cache if user is authenticated and caching is enabled.
Handles cache unavailability gracefully by logging warnings instead of failing.
Parameters:
-----------
@ -23,25 +29,43 @@ async def save_to_session_cache(
Returns:
--------
- None: This function performs a side effect (saving to cache) and returns nothing.
- bool: True if successfully saved to cache, False otherwise.
"""
cache_config = CacheConfig()
user = session_user.get()
user_id = getattr(user, "id", None)
try:
cache_config = CacheConfig()
user = session_user.get()
user_id = getattr(user, "id", None)
if not (user_id and cache_config.caching):
return
if not (user_id and cache_config.caching):
logger.debug("Session caching disabled or user not authenticated")
return False
from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine
if session_id is None:
session_id = "default_session"
cache_engine = get_cache_engine()
if session_id is None:
session_id = "default_session"
from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine
await cache_engine.add_qa(
str(user_id),
session_id=session_id,
question=query,
context=context_summary,
answer=answer,
)
cache_engine = get_cache_engine()
if cache_engine is None:
logger.warning("Cache engine not available, skipping session save")
return False
await cache_engine.add_qa(
str(user_id),
session_id=session_id,
question=query,
context=context_summary,
answer=answer,
)
logger.info(f"Successfully saved Q&A to session cache: user_id={user_id}, session_id={session_id}")
return True
except CacheConnectionError as e:
logger.warning(f"Cache unavailable, continuing without session save: {e.message}")
return False
except Exception as e:
logger.error(f"Unexpected error saving to session cache: {type(e).__name__}: {str(e)}. Continuing without caching.")
return False