feat: adds redis locking mechanism to KuzuAdapter

This commit is contained in:
hajdul88 2025-10-06 18:05:17 +02:00
parent ce4a55a474
commit e7e8ec3bfd
3 changed files with 54 additions and 3 deletions

View file

@ -0,0 +1,46 @@
import redis
from contextlib import contextmanager
class RedisAdapter:
def __init__(self, host, port, lock_name):
self.redis = redis.Redis(host=host, port=port)
self.lock_name = lock_name
self.lock = None
def acquire(self, timeout=240, blocking_timeout=300):
"""
Acquire the Redis lock manually. Raises if acquisition fails.
"""
self.lock = self.redis.lock(
name=self.lock_name,
timeout=timeout,
blocking_timeout=blocking_timeout,
)
acquired = self.lock.acquire()
if not acquired:
raise RuntimeError(f"Could not acquire Redis lock: {self.lock_name}")
return self.lock
def release(self):
"""
Release the Redis lock manually, if held.
"""
if self.lock:
try:
self.lock.release()
self.lock = None
except redis.exceptions.LockError:
pass
@contextmanager
def hold(self, timeout=60, blocking_timeout=300):
"""
Context manager for acquiring and releasing the Redis lock automatically.
"""
self.acquire(timeout=timeout, blocking_timeout=blocking_timeout)
try:
yield
finally:
self.release()

View file

@ -0,0 +1 @@
from .RedisAdapter import RedisAdapter

View file

@ -4,7 +4,7 @@ import os
import json
import asyncio
import tempfile
from uuid import UUID
from uuid import UUID, uuid5, NAMESPACE_OID
from kuzu import Connection
from kuzu.database import Database
from datetime import datetime, timezone
@ -23,6 +23,7 @@ from cognee.infrastructure.engine import DataPoint
from cognee.modules.storage.utils import JSONEncoder
from cognee.modules.engine.utils.generate_timestamp_datapoint import date_to_int
from cognee.tasks.temporal_graph.models import Timestamp
from cognee.infrastructure.databases.cache.RedisAdapter import RedisAdapter
logger = get_logger()
@ -41,10 +42,11 @@ class KuzuAdapter(GraphDBInterface):
"""Initialize Kuzu database connection and schema."""
self.db_path = db_path # Path for the database directory
self.db: Optional[Database] = None
self._is_closed = False
self._is_closed = True
redis_lock_name = "kuzu-lock" + (str)(uuid5(NAMESPACE_OID, db_path))
self.redis_lock = RedisAdapter(host="localhost", port=6379, lock_name=redis_lock_name)
self.connection: Optional[Connection] = None
self.executor = ThreadPoolExecutor()
self._initialize_connection()
self.KUZU_ASYNC_LOCK = asyncio.Lock()
def _initialize_connection(self) -> None:
@ -155,10 +157,12 @@ class KuzuAdapter(GraphDBInterface):
del self.db
self.db = None
self._is_closed = True
self.redis_lock.release()
logger.info(f"Kuzu database closed successfully")
def reopen(self):
if self._is_closed:
self.redis_lock.acquire()
self._is_closed = False
self._initialize_connection()
logger.info(f"Kuzu database re-opened successfully")