From 21c203d1eba95585d6e808cf2e29fa6760107a5d Mon Sep 17 00:00:00 2001
From: hajdul88 <52442977+hajdul88@users.noreply.github.com>
Date: Tue, 7 Oct 2025 10:29:31 +0200
Subject: [PATCH] feat: adds pydantic settings to set caching + some locks to
 Kuzu adapter

---
 .../infrastructure/databases/cache/config.py | 40 +++++++++++++++++++
 .../databases/graph/kuzu/adapter.py          | 11 +++--
 2 files changed, 47 insertions(+), 4 deletions(-)
 create mode 100644 cognee/infrastructure/databases/cache/config.py

diff --git a/cognee/infrastructure/databases/cache/config.py b/cognee/infrastructure/databases/cache/config.py
new file mode 100644
index 000000000..982b0822f
--- /dev/null
+++ b/cognee/infrastructure/databases/cache/config.py
@@ -0,0 +1,40 @@
+import os
+import pydantic
+from pydantic_settings import BaseSettings, SettingsConfigDict
+from functools import lru_cache
+
+
+class CacheConfig(BaseSettings):
+    """
+    Configuration for distributed cache systems (e.g., Redis), used for locking or coordination.
+
+    Attributes:
+    - caching: Caching logic on/off.
+    - cache_host: Hostname of the cache service.
+    - cache_port: Port number for the cache service.
+    - agentic_lock_expire: Automatic lock expiration time (in seconds).
+    - agentic_lock_timeout: Maximum time (in seconds) to wait for the lock release.
+    """
+
+    caching: bool = False
+    cache_host: str = "localhost"
+    cache_port: int = 6379
+    agentic_lock_expire: int = 240
+    agentic_lock_timeout: int = 300
+
+    model_config = SettingsConfigDict(env_file=".env", extra="allow")
+
+    def to_dict(self) -> dict:
+        return {
+            "caching": self.caching,
+            "cache_host": self.cache_host,
+            "cache_port": self.cache_port,
+            "agentic_lock_expire": self.agentic_lock_expire,
+            "agentic_lock_timeout": self.agentic_lock_timeout,
+        }
+
+
+@lru_cache
+def get_cache_config():
+    return CacheConfig()
+
diff --git a/cognee/infrastructure/databases/graph/kuzu/adapter.py b/cognee/infrastructure/databases/graph/kuzu/adapter.py
index 3f4e9e7eb..55ddc0818 100644
--- a/cognee/infrastructure/databases/graph/kuzu/adapter.py
+++ b/cognee/infrastructure/databases/graph/kuzu/adapter.py
@@ -175,7 +175,9 @@ class KuzuAdapter(GraphDBInterface):
 
         if self.connection:
             async with self.KUZU_ASYNC_LOCK:
-                self.connection.execute("CHECKPOINT;")
+                self.redis_lock.acquire()
+                await self.query("CHECKPOINT;")
+                self.redis_lock.release()
 
         s3_file_storage.s3.put(self.temp_graph_file, self.db_path, recursive=True)
 
@@ -213,9 +215,8 @@ class KuzuAdapter(GraphDBInterface):
 
         def blocking_query():
             try:
-                if not self.connection:
-                    logger.debug("Reconnecting to Kuzu database...")
-                    self._initialize_connection()
+                if self._is_closed:
+                    self.reopen()
 
                 result = self.connection.execute(query, params)
                 rows = []
@@ -1590,6 +1591,7 @@ class KuzuAdapter(GraphDBInterface):
                 logger.info(f"Deleted Kuzu database files at {self.db_path}")
 
                 # Reinitialize the database
+                self.redis_lock.acquire()
                 self._initialize_connection()
                 # Verify the database is empty
                 result = self.connection.execute("MATCH (n:Node) RETURN COUNT(n)")
@@ -1600,6 +1602,7 @@ class KuzuAdapter(GraphDBInterface):
                 )
                 self.connection.execute("MATCH (n:Node) DETACH DELETE n")
                 logger.info("Database cleared successfully")
+                self.redis_lock.release()
             except Exception as e:
                 logger.error(f"Error during database clearing: {e}")
                 raise
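
Usage sketch for the new settings (not part of the diff itself): CacheConfig uses pydantic-settings' default field-to-environment-variable mapping, so values can come from a .env file or from CACHING / CACHE_HOST / CACHE_PORT / AGENTIC_LOCK_EXPIRE / AGENTIC_LOCK_TIMEOUT environment variables. Everything below other than CacheConfig, get_cache_config and to_dict is illustrative:

# Sketch only: consuming the settings added in this patch.
import os

# pydantic-settings matches env vars to field names case-insensitively,
# so these map onto CacheConfig.caching / cache_host / cache_port.
os.environ["CACHING"] = "true"
os.environ["CACHE_HOST"] = "redis.internal"   # hypothetical host
os.environ["CACHE_PORT"] = "6379"

from cognee.infrastructure.databases.cache.config import get_cache_config

config = get_cache_config()  # lru_cache: later callers share this instance, so
                             # env changes after the first call are not picked up
if config.caching:
    print(config.to_dict())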
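
The adapter side of the diff only calls self.redis_lock.acquire()/release(); where that lock object comes from is not shown here. As a hedged sketch, one way such a lock could be built from these settings with the redis client is shown below; the key name and the wiring into the adapter are assumptions, only the CacheConfig fields come from the patch:

# Sketch only: constructing a distributed lock from the CacheConfig fields.
import redis

from cognee.infrastructure.databases.cache.config import get_cache_config

config = get_cache_config()
client = redis.Redis(host=config.cache_host, port=config.cache_port)

# timeout = auto-expiry (agentic_lock_expire), so a crashed holder cannot keep
# the lock forever; blocking_timeout = how long acquire() may wait for it.
redis_lock = client.lock(
    "kuzu-graph-lock",  # hypothetical key name
    timeout=config.agentic_lock_expire,
    blocking_timeout=config.agentic_lock_timeout,
)

# The context-manager form releases the lock even if the guarded block raises,
# which manual acquire()/release() pairs do not guarantee on their own.
with redis_lock:
    pass  # e.g. run the CHECKPOINT query or the clear-database sequence here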