feat: adds redis adapter and factory

This commit is contained in:
hajdul88 2025-10-07 13:49:09 +02:00
parent 21c203d1eb
commit ac4dd8776a
5 changed files with 111 additions and 13 deletions

View file

@ -1 +1,2 @@
from .RedisAdapter import RedisAdapter
from .get_cache_engine import get_cache_engine
from .config import get_cache_config

View file

@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
class CacheDBInterface(ABC):
"""
Abstract base class for distributed cache coordination systems (e.g., Redis, Memcached).
Provides a common interface for lock acquisition, release, and context-managed locking.
"""
def __init__(self, host: str, port: int, lock_key: str):
self.host = host
self.port = port
self.lock_key = lock_key
self.lock = None
@abstractmethod
def acquire(self):
"""
Acquire a lock on the given key.
Must be implemented by subclasses.
"""
pass
@abstractmethod
def release(self):
"""
Release the lock if it is held.
Must be implemented by subclasses.
"""
pass
@contextmanager
def hold(self):
"""
Context manager for safely acquiring and releasing the lock.
"""
self.acquire()
try:
yield
finally:
self.release()

View file

@ -37,4 +37,3 @@ class CacheConfig(BaseSettings):
@lru_cache
def get_cache_config():
return CacheConfig()

View file

@ -0,0 +1,54 @@
"""Factory to get the appropriate cache coordination engine (e.g., Redis)."""
from functools import lru_cache
from cognee.infrastructure.databases.cache.config import get_cache_config
from cognee.infrastructure.databases.cache.redis.RedisAdapter import RedisAdapter
from cognee.infrastructure.databases.cache.cache_db_interface import CacheDBInterface
@lru_cache
def create_cache_engine(
cache_host: str,
cache_port: int,
lock_key: str,
agentic_lock_expire: int = 240,
agentic_lock_timeout: int = 300,
) -> CacheDBInterface:
"""
Factory function to instantiate a cache coordination backend (currently Redis).
Parameters:
-----------
- cache_host: Hostname or IP of the cache server.
- cache_port: Port number to connect to.
- lock_key: Identifier used for the locking resource.
- agentic_lock_expire: Duration to hold the lock after acquisition.
- agentic_lock_timeout: Max time to wait for the lock before failing.
Returns:
--------
- CacheDBInterface: An instance of the appropriate cache adapter. :TODO: Now we support only Redis. later if we add more here we can split the logic
"""
return RedisAdapter(
host=cache_host,
port=cache_port,
lock_name=lock_key,
timeout=agentic_lock_expire,
blocking_timeout=agentic_lock_timeout,
)
def get_cache_engine() -> CacheDBInterface:
"""
Returns a cache adapter instance using current context configuration.
"""
config = get_cache_config()
return create_cache_engine(
cache_host=config.cache_host,
cache_port=config.cache_port,
lock_key=config.lock_key,
agentic_lock_expire=config.agentic_lock_expire,
agentic_lock_timeout=config.agentic_lock_timeout,
)

View file

@ -1,26 +1,28 @@
import redis
from contextlib import contextmanager
from cognee.infrastructure.databases.cache.cache_db_interface import CacheDBInterface
class RedisAdapter:
def __init__(self, host, port, lock_name):
class RedisAdapter(CacheDBInterface):
def __init__(self, host, port, lock_name, timeout=240, blocking_timeout=300):
super().__init__(host, port, lock_name)
self.redis = redis.Redis(host=host, port=port)
self.lock_name = lock_name
self.lock = None
self.timeout = timeout
self.blocking_timeout = blocking_timeout
def acquire(self, timeout=240, blocking_timeout=300):
def acquire(self):
"""
Acquire the Redis lock manually. Raises if acquisition fails.
"""
self.lock = self.redis.lock(
name=self.lock_name,
timeout=timeout,
blocking_timeout=blocking_timeout,
name=self.lock_key,
timeout=self.timeout,
blocking_timeout=self.blocking_timeout,
)
acquired = self.lock.acquire()
if not acquired:
raise RuntimeError(f"Could not acquire Redis lock: {self.lock_name}")
raise RuntimeError(f"Could not acquire Redis lock: {self.lock_key}")
return self.lock
@ -36,11 +38,11 @@ class RedisAdapter:
pass
@contextmanager
def hold(self, timeout=60, blocking_timeout=300):
def hold(self):
"""
Context manager for acquiring and releasing the Redis lock automatically.
"""
self.acquire(timeout=timeout, blocking_timeout=blocking_timeout)
self.acquire()
try:
yield
finally: