From ac4dd8776a13f01d670e29d11c178bbd01c8907a Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Tue, 7 Oct 2025 13:49:09 +0200 Subject: [PATCH] feat: adds redis adapter and factory --- .../databases/cache/__init__.py | 3 +- .../databases/cache/cache_db_interface.py | 42 +++++++++++++++ .../infrastructure/databases/cache/config.py | 1 - .../databases/cache/get_cache_engine.py | 54 +++++++++++++++++++ .../cache/{ => redis}/RedisAdapter.py | 24 +++++---- 5 files changed, 111 insertions(+), 13 deletions(-) create mode 100644 cognee/infrastructure/databases/cache/cache_db_interface.py create mode 100644 cognee/infrastructure/databases/cache/get_cache_engine.py rename cognee/infrastructure/databases/cache/{ => redis}/RedisAdapter.py (62%) diff --git a/cognee/infrastructure/databases/cache/__init__.py b/cognee/infrastructure/databases/cache/__init__.py index c698f0800..d96c77658 100644 --- a/cognee/infrastructure/databases/cache/__init__.py +++ b/cognee/infrastructure/databases/cache/__init__.py @@ -1 +1,2 @@ -from .RedisAdapter import RedisAdapter +from .get_cache_engine import get_cache_engine +from .config import get_cache_config diff --git a/cognee/infrastructure/databases/cache/cache_db_interface.py b/cognee/infrastructure/databases/cache/cache_db_interface.py new file mode 100644 index 000000000..a70b3fc9c --- /dev/null +++ b/cognee/infrastructure/databases/cache/cache_db_interface.py @@ -0,0 +1,42 @@ +from abc import ABC, abstractmethod +from contextlib import contextmanager + + +class CacheDBInterface(ABC): + """ + Abstract base class for distributed cache coordination systems (e.g., Redis, Memcached). + Provides a common interface for lock acquisition, release, and context-managed locking. + """ + + def __init__(self, host: str, port: int, lock_key: str): + self.host = host + self.port = port + self.lock_key = lock_key + self.lock = None + + @abstractmethod + def acquire(self): + """ + Acquire a lock on the given key. + Must be implemented by subclasses. + """ + pass + + @abstractmethod + def release(self): + """ + Release the lock if it is held. + Must be implemented by subclasses. + """ + pass + + @contextmanager + def hold(self): + """ + Context manager for safely acquiring and releasing the lock. + """ + self.acquire() + try: + yield + finally: + self.release() diff --git a/cognee/infrastructure/databases/cache/config.py b/cognee/infrastructure/databases/cache/config.py index 982b0822f..c99b91b9a 100644 --- a/cognee/infrastructure/databases/cache/config.py +++ b/cognee/infrastructure/databases/cache/config.py @@ -37,4 +37,3 @@ class CacheConfig(BaseSettings): @lru_cache def get_cache_config(): return CacheConfig() - diff --git a/cognee/infrastructure/databases/cache/get_cache_engine.py b/cognee/infrastructure/databases/cache/get_cache_engine.py new file mode 100644 index 000000000..e22beb549 --- /dev/null +++ b/cognee/infrastructure/databases/cache/get_cache_engine.py @@ -0,0 +1,54 @@ +"""Factory to get the appropriate cache coordination engine (e.g., Redis).""" + +from functools import lru_cache +from cognee.infrastructure.databases.cache.config import get_cache_config + +from cognee.infrastructure.databases.cache.redis.RedisAdapter import RedisAdapter +from cognee.infrastructure.databases.cache.cache_db_interface import CacheDBInterface + + +@lru_cache +def create_cache_engine( + cache_host: str, + cache_port: int, + lock_key: str, + agentic_lock_expire: int = 240, + agentic_lock_timeout: int = 300, +) -> CacheDBInterface: + """ + Factory function to instantiate a cache coordination backend (currently Redis). + + Parameters: + ----------- + - cache_host: Hostname or IP of the cache server. + - cache_port: Port number to connect to. + - lock_key: Identifier used for the locking resource. + - agentic_lock_expire: Duration to hold the lock after acquisition. + - agentic_lock_timeout: Max time to wait for the lock before failing. + + Returns: + -------- + - CacheDBInterface: An instance of the appropriate cache adapter. :TODO: Now we support only Redis. later if we add more here we can split the logic + """ + + return RedisAdapter( + host=cache_host, + port=cache_port, + lock_name=lock_key, + timeout=agentic_lock_expire, + blocking_timeout=agentic_lock_timeout, + ) + + +def get_cache_engine() -> CacheDBInterface: + """ + Returns a cache adapter instance using current context configuration. + """ + config = get_cache_config() + return create_cache_engine( + cache_host=config.cache_host, + cache_port=config.cache_port, + lock_key=config.lock_key, + agentic_lock_expire=config.agentic_lock_expire, + agentic_lock_timeout=config.agentic_lock_timeout, + ) diff --git a/cognee/infrastructure/databases/cache/RedisAdapter.py b/cognee/infrastructure/databases/cache/redis/RedisAdapter.py similarity index 62% rename from cognee/infrastructure/databases/cache/RedisAdapter.py rename to cognee/infrastructure/databases/cache/redis/RedisAdapter.py index f486eae22..b0d0e6886 100644 --- a/cognee/infrastructure/databases/cache/RedisAdapter.py +++ b/cognee/infrastructure/databases/cache/redis/RedisAdapter.py @@ -1,26 +1,28 @@ import redis from contextlib import contextmanager +from cognee.infrastructure.databases.cache.cache_db_interface import CacheDBInterface -class RedisAdapter: - def __init__(self, host, port, lock_name): +class RedisAdapter(CacheDBInterface): + def __init__(self, host, port, lock_name, timeout=240, blocking_timeout=300): + super().__init__(host, port, lock_name) self.redis = redis.Redis(host=host, port=port) - self.lock_name = lock_name - self.lock = None + self.timeout = timeout + self.blocking_timeout = blocking_timeout - def acquire(self, timeout=240, blocking_timeout=300): + def acquire(self): """ Acquire the Redis lock manually. Raises if acquisition fails. """ self.lock = self.redis.lock( - name=self.lock_name, - timeout=timeout, - blocking_timeout=blocking_timeout, + name=self.lock_key, + timeout=self.timeout, + blocking_timeout=self.blocking_timeout, ) acquired = self.lock.acquire() if not acquired: - raise RuntimeError(f"Could not acquire Redis lock: {self.lock_name}") + raise RuntimeError(f"Could not acquire Redis lock: {self.lock_key}") return self.lock @@ -36,11 +38,11 @@ class RedisAdapter: pass @contextmanager - def hold(self, timeout=60, blocking_timeout=300): + def hold(self): """ Context manager for acquiring and releasing the Redis lock automatically. """ - self.acquire(timeout=timeout, blocking_timeout=blocking_timeout) + self.acquire() try: yield finally: