feat: adds new separate parameter for shared kuzu lock

This commit is contained in:
hajdul88 2025-10-08 16:27:17 +02:00
parent fc2f27f21e
commit d657e1b311
2 changed files with 8 additions and 7 deletions

View file

@ -7,14 +7,14 @@ class CacheConfig(BaseSettings):
Configuration for distributed cache systems (e.g., Redis), used for locking or coordination.
Attributes:
- caching: Caching logic on/off.
- shared_kuzu_lock: Shared kuzu lock logic on/off.
- cache_host: Hostname of the cache service.
- cache_port: Port number for the cache service.
- agentic_lock_expire: Automatic lock expiration time (in seconds).
- agentic_lock_timeout: Maximum time (in seconds) to wait for the lock release.
"""
caching: bool = False
shared_kuzu_lock: bool = False
cache_host: str = "localhost"
cache_port: int = 6379
agentic_lock_expire: int = 240
@ -25,6 +25,7 @@ class CacheConfig(BaseSettings):
def to_dict(self) -> dict:
return {
"caching": self.caching,
"shared_kuzu_lock": self.shared_kuzu_lock,
"cache_host": self.cache_host,
"cache_port": self.cache_port,
"agentic_lock_expire": self.agentic_lock_expire,

View file

@ -28,7 +28,7 @@ from cognee.infrastructure.databases.cache.config import get_cache_config
logger = get_logger()
cache_config = get_cache_config()
if cache_config.caching:
if cache_config.shared_kuzu_lock:
from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine
@ -49,7 +49,7 @@ class KuzuAdapter(GraphDBInterface):
self.db_path = db_path # Path for the database directory
self.db: Optional[Database] = None
self.connection: Optional[Connection] = None
if cache_config.caching:
if cache_config.shared_kuzu_lock:
self.redis_lock = get_cache_engine(
lock_key="kuzu-lock-" + str(uuid5(NAMESPACE_OID, db_path))
)
@ -224,7 +224,7 @@ class KuzuAdapter(GraphDBInterface):
def blocking_query():
lock_acquired = False
try:
if cache_config.caching:
if cache_config.shared_kuzu_lock:
self.redis_lock.acquire()
lock_acquired = True
if not self.connection:
@ -248,13 +248,13 @@ class KuzuAdapter(GraphDBInterface):
logger.error(f"Query execution failed: {str(e)}")
raise
finally:
if cache_config.caching and lock_acquired:
if cache_config.shared_kuzu_lock and lock_acquired:
try:
self.close()
finally:
self.redis_lock.release()
if cache_config.caching:
if cache_config.shared_kuzu_lock:
async with self._connection_change_lock:
self.open_connections += 1
logger.info(f"Open connections after open: {self.open_connections}")