From e7e8ec3bfd0b178b86ea34998c041f7cfbba6ea1 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Mon, 6 Oct 2025 18:05:17 +0200 Subject: [PATCH] feat: adds redis locking mechanism to KuzuAdapter --- .../databases/cache/RedisAdapter.py | 46 +++++++++++++++++++ .../databases/cache/__init__.py | 1 + .../databases/graph/kuzu/adapter.py | 10 ++-- 3 files changed, 54 insertions(+), 3 deletions(-) create mode 100644 cognee/infrastructure/databases/cache/RedisAdapter.py create mode 100644 cognee/infrastructure/databases/cache/__init__.py diff --git a/cognee/infrastructure/databases/cache/RedisAdapter.py b/cognee/infrastructure/databases/cache/RedisAdapter.py new file mode 100644 index 000000000..204aa81ff --- /dev/null +++ b/cognee/infrastructure/databases/cache/RedisAdapter.py @@ -0,0 +1,46 @@ +import redis +from contextlib import contextmanager + +class RedisAdapter: + def __init__(self, host, port, lock_name): + self.redis = redis.Redis(host=host, port=port) + self.lock_name = lock_name + self.lock = None + + def acquire(self, timeout=240, blocking_timeout=300): + """ + Acquire the Redis lock manually. Raises if acquisition fails. + """ + self.lock = self.redis.lock( + name=self.lock_name, + timeout=timeout, + blocking_timeout=blocking_timeout, + ) + + acquired = self.lock.acquire() + if not acquired: + raise RuntimeError(f"Could not acquire Redis lock: {self.lock_name}") + + return self.lock + + def release(self): + """ + Release the Redis lock manually, if held. + """ + if self.lock: + try: + self.lock.release() + self.lock = None + except redis.exceptions.LockError: + pass + + @contextmanager + def hold(self, timeout=60, blocking_timeout=300): + """ + Context manager for acquiring and releasing the Redis lock automatically. + """ + self.acquire(timeout=timeout, blocking_timeout=blocking_timeout) + try: + yield + finally: + self.release() diff --git a/cognee/infrastructure/databases/cache/__init__.py b/cognee/infrastructure/databases/cache/__init__.py new file mode 100644 index 000000000..c698f0800 --- /dev/null +++ b/cognee/infrastructure/databases/cache/__init__.py @@ -0,0 +1 @@ +from .RedisAdapter import RedisAdapter diff --git a/cognee/infrastructure/databases/graph/kuzu/adapter.py b/cognee/infrastructure/databases/graph/kuzu/adapter.py index 5baf197b4..3f4e9e7eb 100644 --- a/cognee/infrastructure/databases/graph/kuzu/adapter.py +++ b/cognee/infrastructure/databases/graph/kuzu/adapter.py @@ -4,7 +4,7 @@ import os import json import asyncio import tempfile -from uuid import UUID +from uuid import UUID, uuid5, NAMESPACE_OID from kuzu import Connection from kuzu.database import Database from datetime import datetime, timezone @@ -23,6 +23,7 @@ from cognee.infrastructure.engine import DataPoint from cognee.modules.storage.utils import JSONEncoder from cognee.modules.engine.utils.generate_timestamp_datapoint import date_to_int from cognee.tasks.temporal_graph.models import Timestamp +from cognee.infrastructure.databases.cache.RedisAdapter import RedisAdapter logger = get_logger() @@ -41,10 +42,11 @@ class KuzuAdapter(GraphDBInterface): """Initialize Kuzu database connection and schema.""" self.db_path = db_path # Path for the database directory self.db: Optional[Database] = None - self._is_closed = False + self._is_closed = True + redis_lock_name = "kuzu-lock" + (str)(uuid5(NAMESPACE_OID, db_path)) + self.redis_lock = RedisAdapter(host="localhost", port=6379, lock_name=redis_lock_name) self.connection: Optional[Connection] = None self.executor = ThreadPoolExecutor() - self._initialize_connection() self.KUZU_ASYNC_LOCK = asyncio.Lock() def _initialize_connection(self) -> None: @@ -155,10 +157,12 @@ class KuzuAdapter(GraphDBInterface): del self.db self.db = None self._is_closed = True + self.redis_lock.release() logger.info(f"Kuzu database closed successfully") def reopen(self): if self._is_closed: + self.redis_lock.acquire() self._is_closed = False self._initialize_connection() logger.info(f"Kuzu database re-opened successfully")