back to zero

This commit is contained in:
hajdul88 2025-10-07 14:21:48 +02:00
parent ac4dd8776a
commit 76c4a4bd4c

View file

@ -4,7 +4,7 @@ import os
import json import json
import asyncio import asyncio
import tempfile import tempfile
from uuid import UUID, uuid5, NAMESPACE_OID from uuid import UUID
from kuzu import Connection from kuzu import Connection
from kuzu.database import Database from kuzu.database import Database
from datetime import datetime, timezone from datetime import datetime, timezone
@ -23,7 +23,6 @@ from cognee.infrastructure.engine import DataPoint
from cognee.modules.storage.utils import JSONEncoder from cognee.modules.storage.utils import JSONEncoder
from cognee.modules.engine.utils.generate_timestamp_datapoint import date_to_int from cognee.modules.engine.utils.generate_timestamp_datapoint import date_to_int
from cognee.tasks.temporal_graph.models import Timestamp from cognee.tasks.temporal_graph.models import Timestamp
from cognee.infrastructure.databases.cache.RedisAdapter import RedisAdapter
logger = get_logger() logger = get_logger()
@ -42,11 +41,9 @@ class KuzuAdapter(GraphDBInterface):
"""Initialize Kuzu database connection and schema.""" """Initialize Kuzu database connection and schema."""
self.db_path = db_path # Path for the database directory self.db_path = db_path # Path for the database directory
self.db: Optional[Database] = None self.db: Optional[Database] = None
self._is_closed = True
redis_lock_name = "kuzu-lock" + (str)(uuid5(NAMESPACE_OID, db_path))
self.redis_lock = RedisAdapter(host="localhost", port=6379, lock_name=redis_lock_name)
self.connection: Optional[Connection] = None self.connection: Optional[Connection] = None
self.executor = ThreadPoolExecutor() self.executor = ThreadPoolExecutor()
self._initialize_connection()
self.KUZU_ASYNC_LOCK = asyncio.Lock() self.KUZU_ASYNC_LOCK = asyncio.Lock()
def _initialize_connection(self) -> None: def _initialize_connection(self) -> None:
@ -149,24 +146,6 @@ class KuzuAdapter(GraphDBInterface):
logger.error(f"Failed to initialize Kuzu database: {e}") logger.error(f"Failed to initialize Kuzu database: {e}")
raise e raise e
def close(self):
if self.connection:
del self.connection
self.connection = None
if self.db:
del self.db
self.db = None
self._is_closed = True
self.redis_lock.release()
logger.info(f"Kuzu database closed successfully")
def reopen(self):
if self._is_closed:
self.redis_lock.acquire()
self._is_closed = False
self._initialize_connection()
logger.info(f"Kuzu database re-opened successfully")
async def push_to_s3(self) -> None: async def push_to_s3(self) -> None:
if os.getenv("STORAGE_BACKEND", "").lower() == "s3" and hasattr(self, "temp_graph_file"): if os.getenv("STORAGE_BACKEND", "").lower() == "s3" and hasattr(self, "temp_graph_file"):
from cognee.infrastructure.files.storage.S3FileStorage import S3FileStorage from cognee.infrastructure.files.storage.S3FileStorage import S3FileStorage
@ -175,9 +154,7 @@ class KuzuAdapter(GraphDBInterface):
if self.connection: if self.connection:
async with self.KUZU_ASYNC_LOCK: async with self.KUZU_ASYNC_LOCK:
self.redis_lock.acquire() self.connection.execute("CHECKPOINT;")
await self.query("CHECKPOINT;")
self.redis_lock.release()
s3_file_storage.s3.put(self.temp_graph_file, self.db_path, recursive=True) s3_file_storage.s3.put(self.temp_graph_file, self.db_path, recursive=True)
@ -215,8 +192,9 @@ class KuzuAdapter(GraphDBInterface):
def blocking_query(): def blocking_query():
try: try:
if self._is_closed: if not self.connection:
self.reopen() logger.debug("Reconnecting to Kuzu database...")
self._initialize_connection()
result = self.connection.execute(query, params) result = self.connection.execute(query, params)
rows = [] rows = []
@ -234,13 +212,7 @@ class KuzuAdapter(GraphDBInterface):
logger.error(f"Query execution failed: {str(e)}") logger.error(f"Query execution failed: {str(e)}")
raise raise
if self._is_closed: return await loop.run_in_executor(self.executor, blocking_query)
self.reopen()
result = await loop.run_in_executor(self.executor, blocking_query)
self.close()
return result
@asynccontextmanager @asynccontextmanager
async def get_session(self): async def get_session(self):
@ -1591,7 +1563,6 @@ class KuzuAdapter(GraphDBInterface):
logger.info(f"Deleted Kuzu database files at {self.db_path}") logger.info(f"Deleted Kuzu database files at {self.db_path}")
# Reinitialize the database # Reinitialize the database
self.redis_lock.acquire()
self._initialize_connection() self._initialize_connection()
# Verify the database is empty # Verify the database is empty
result = self.connection.execute("MATCH (n:Node) RETURN COUNT(n)") result = self.connection.execute("MATCH (n:Node) RETURN COUNT(n)")
@ -1602,7 +1573,6 @@ class KuzuAdapter(GraphDBInterface):
) )
self.connection.execute("MATCH (n:Node) DETACH DELETE n") self.connection.execute("MATCH (n:Node) DETACH DELETE n")
logger.info("Database cleared successfully") logger.info("Database cleared successfully")
self.redis_lock.release()
except Exception as e: except Exception as e:
logger.error(f"Error during database clearing: {e}") logger.error(f"Error during database clearing: {e}")
raise raise