diff --git a/cognee/infrastructure/databases/graph/kuzu/adapter.py b/cognee/infrastructure/databases/graph/kuzu/adapter.py index b67502cd3..0417ce0e3 100644 --- a/cognee/infrastructure/databases/graph/kuzu/adapter.py +++ b/cognee/infrastructure/databases/graph/kuzu/adapter.py @@ -28,7 +28,7 @@ from cognee.infrastructure.databases.cache.config import get_cache_config logger = get_logger() cache_config = get_cache_config() -if not cache_config.caching: +if cache_config.caching: from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine @@ -49,8 +49,7 @@ class KuzuAdapter(GraphDBInterface): self.db_path = db_path # Path for the database directory self.db: Optional[Database] = None self.connection: Optional[Connection] = None - if not cache_config.caching: - self.executor = ThreadPoolExecutor(max_workers=1) + if cache_config.caching: self.redis_lock = get_cache_engine( lock_key="kuzu-lock-" + str(uuid5(NAMESPACE_OID, db_path)) ) @@ -223,9 +222,13 @@ class KuzuAdapter(GraphDBInterface): params = params or {} def blocking_query(): + lock_acquired = False try: + if cache_config.caching: + self.redis_lock.acquire() + lock_acquired = True if not self.connection: - logger.debug("Reconnecting to Kuzu database...") + logger.info("Reconnecting to Kuzu database...") self._initialize_connection() result = self.connection.execute(query, params) @@ -239,25 +242,27 @@ class KuzuAdapter(GraphDBInterface): val = val.as_py() processed_rows.append(val) rows.append(tuple(processed_rows)) + return rows except Exception as e: logger.error(f"Query execution failed: {str(e)}") raise + finally: + if cache_config.caching and lock_acquired: + try: + self.close() + finally: + self.redis_lock.release() - if not cache_config.caching: + if cache_config.caching: async with self._connection_change_lock: self.open_connections += 1 - if self._is_closed: - self.reopen() logger.info(f"Open connections after open: {self.open_connections}") - - result = await loop.run_in_executor(self.executor, blocking_query) - - self.open_connections -= 1 - logger.info(f"Opened connections after closing {self.open_connections}") - if self.open_connections == 0: - self.close() - + try: + result = blocking_query() + finally: + self.open_connections -= 1 + logger.info(f"Open connections after close: {self.open_connections}") return result else: result = await loop.run_in_executor(self.executor, blocking_query)