diff --git a/cognee/infrastructure/databases/graph/kuzu/adapter.py b/cognee/infrastructure/databases/graph/kuzu/adapter.py index 995edf9ed..1ae816488 100644 --- a/cognee/infrastructure/databases/graph/kuzu/adapter.py +++ b/cognee/infrastructure/databases/graph/kuzu/adapter.py @@ -49,12 +49,13 @@ class KuzuAdapter(GraphDBInterface): self.db_path = db_path # Path for the database directory self.db: Optional[Database] = None self.connection: Optional[Connection] = None - self.executor = ThreadPoolExecutor() if cache_config.caching: + self.executor = ThreadPoolExecutor(max_workers=1) self.redis_lock = get_cache_engine( lock_key="kuzu-lock-" + str(uuid5(NAMESPACE_OID, db_path)) ) else: + self.executor = ThreadPoolExecutor() self._initialize_connection() self.KUZU_ASYNC_LOCK = asyncio.Lock() self._connection_change_lock = asyncio.Lock() @@ -229,20 +230,15 @@ class KuzuAdapter(GraphDBInterface): async with self._connection_change_lock: self.open_connections += 1 if self._is_closed: - self.redis_lock.acquire() self.reopen() - logger.info(f"Open connections after open: {self.open_connections}") + logger.info(f"Open connections after open: {self.open_connections}") - result = await loop.run_in_executor(self.executor, blocking_query) + result = await loop.run_in_executor(self.executor, blocking_query) - if cache_config.caching: - async with self._connection_change_lock: self.open_connections -= 1 logger.info(f"Opened connections after closing {self.open_connections}") if self.open_connections == 0: - self.connection.execute("CHECKPOINT;") self.close() - self.redis_lock.release() return result