From 76c4a4bd4c86cb41ac42592e11f0c88d5efe57cb Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Tue, 7 Oct 2025 14:21:48 +0200 Subject: [PATCH] back to zero --- .../databases/graph/kuzu/adapter.py | 46 ++++--------------- 1 file changed, 8 insertions(+), 38 deletions(-) diff --git a/cognee/infrastructure/databases/graph/kuzu/adapter.py b/cognee/infrastructure/databases/graph/kuzu/adapter.py index 55ddc0818..015dcaa78 100644 --- a/cognee/infrastructure/databases/graph/kuzu/adapter.py +++ b/cognee/infrastructure/databases/graph/kuzu/adapter.py @@ -4,7 +4,7 @@ import os import json import asyncio import tempfile -from uuid import UUID, uuid5, NAMESPACE_OID +from uuid import UUID from kuzu import Connection from kuzu.database import Database from datetime import datetime, timezone @@ -23,7 +23,6 @@ from cognee.infrastructure.engine import DataPoint from cognee.modules.storage.utils import JSONEncoder from cognee.modules.engine.utils.generate_timestamp_datapoint import date_to_int from cognee.tasks.temporal_graph.models import Timestamp -from cognee.infrastructure.databases.cache.RedisAdapter import RedisAdapter logger = get_logger() @@ -42,11 +41,9 @@ class KuzuAdapter(GraphDBInterface): """Initialize Kuzu database connection and schema.""" self.db_path = db_path # Path for the database directory self.db: Optional[Database] = None - self._is_closed = True - redis_lock_name = "kuzu-lock" + (str)(uuid5(NAMESPACE_OID, db_path)) - self.redis_lock = RedisAdapter(host="localhost", port=6379, lock_name=redis_lock_name) self.connection: Optional[Connection] = None self.executor = ThreadPoolExecutor() + self._initialize_connection() self.KUZU_ASYNC_LOCK = asyncio.Lock() def _initialize_connection(self) -> None: @@ -149,24 +146,6 @@ class KuzuAdapter(GraphDBInterface): logger.error(f"Failed to initialize Kuzu database: {e}") raise e - def close(self): - if self.connection: - del self.connection - self.connection = None - if self.db: - del self.db - self.db = None - self._is_closed = True - self.redis_lock.release() - logger.info(f"Kuzu database closed successfully") - - def reopen(self): - if self._is_closed: - self.redis_lock.acquire() - self._is_closed = False - self._initialize_connection() - logger.info(f"Kuzu database re-opened successfully") - async def push_to_s3(self) -> None: if os.getenv("STORAGE_BACKEND", "").lower() == "s3" and hasattr(self, "temp_graph_file"): from cognee.infrastructure.files.storage.S3FileStorage import S3FileStorage @@ -175,9 +154,7 @@ class KuzuAdapter(GraphDBInterface): if self.connection: async with self.KUZU_ASYNC_LOCK: - self.redis_lock.acquire() - await self.query("CHECKPOINT;") - self.redis_lock.release() + self.connection.execute("CHECKPOINT;") s3_file_storage.s3.put(self.temp_graph_file, self.db_path, recursive=True) @@ -215,8 +192,9 @@ class KuzuAdapter(GraphDBInterface): def blocking_query(): try: - if self._is_closed: - self.reopen() + if not self.connection: + logger.debug("Reconnecting to Kuzu database...") + self._initialize_connection() result = self.connection.execute(query, params) rows = [] @@ -234,13 +212,7 @@ class KuzuAdapter(GraphDBInterface): logger.error(f"Query execution failed: {str(e)}") raise - if self._is_closed: - self.reopen() - - result = await loop.run_in_executor(self.executor, blocking_query) - self.close() - - return result + return await loop.run_in_executor(self.executor, blocking_query) @asynccontextmanager async def get_session(self): @@ -1591,7 +1563,6 @@ class KuzuAdapter(GraphDBInterface): logger.info(f"Deleted Kuzu database files at {self.db_path}") # Reinitialize the database - self.redis_lock.acquire() self._initialize_connection() # Verify the database is empty result = self.connection.execute("MATCH (n:Node) RETURN COUNT(n)") @@ -1602,7 +1573,6 @@ class KuzuAdapter(GraphDBInterface): ) self.connection.execute("MATCH (n:Node) DETACH DELETE n") logger.info("Database cleared successfully") - self.redis_lock.release() except Exception as e: logger.error(f"Error during database clearing: {e}") raise @@ -1890,4 +1860,4 @@ class KuzuAdapter(GraphDBInterface): time_nodes = await self.query(cypher) time_ids_list = [item[0] for item in time_nodes] - return ", ".join(f"'{uid}'" for uid in time_ids_list) + return ", ".join(f"'{uid}'" for uid in time_ids_list) \ No newline at end of file