feat: adds redis double locking mechanism (Thread + Process)
This commit is contained in:
parent
57439ae8df
commit
fc2f27f21e
1 changed files with 20 additions and 15 deletions
|
|
@ -28,7 +28,7 @@ from cognee.infrastructure.databases.cache.config import get_cache_config
|
||||||
logger = get_logger()
|
logger = get_logger()
|
||||||
|
|
||||||
cache_config = get_cache_config()
|
cache_config = get_cache_config()
|
||||||
if not cache_config.caching:
|
if cache_config.caching:
|
||||||
from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine
|
from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -49,8 +49,7 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
self.db_path = db_path # Path for the database directory
|
self.db_path = db_path # Path for the database directory
|
||||||
self.db: Optional[Database] = None
|
self.db: Optional[Database] = None
|
||||||
self.connection: Optional[Connection] = None
|
self.connection: Optional[Connection] = None
|
||||||
if not cache_config.caching:
|
if cache_config.caching:
|
||||||
self.executor = ThreadPoolExecutor(max_workers=1)
|
|
||||||
self.redis_lock = get_cache_engine(
|
self.redis_lock = get_cache_engine(
|
||||||
lock_key="kuzu-lock-" + str(uuid5(NAMESPACE_OID, db_path))
|
lock_key="kuzu-lock-" + str(uuid5(NAMESPACE_OID, db_path))
|
||||||
)
|
)
|
||||||
|
|
@ -223,9 +222,13 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
params = params or {}
|
params = params or {}
|
||||||
|
|
||||||
def blocking_query():
|
def blocking_query():
|
||||||
|
lock_acquired = False
|
||||||
try:
|
try:
|
||||||
|
if cache_config.caching:
|
||||||
|
self.redis_lock.acquire()
|
||||||
|
lock_acquired = True
|
||||||
if not self.connection:
|
if not self.connection:
|
||||||
logger.debug("Reconnecting to Kuzu database...")
|
logger.info("Reconnecting to Kuzu database...")
|
||||||
self._initialize_connection()
|
self._initialize_connection()
|
||||||
|
|
||||||
result = self.connection.execute(query, params)
|
result = self.connection.execute(query, params)
|
||||||
|
|
@ -239,25 +242,27 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
val = val.as_py()
|
val = val.as_py()
|
||||||
processed_rows.append(val)
|
processed_rows.append(val)
|
||||||
rows.append(tuple(processed_rows))
|
rows.append(tuple(processed_rows))
|
||||||
|
|
||||||
return rows
|
return rows
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Query execution failed: {str(e)}")
|
logger.error(f"Query execution failed: {str(e)}")
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
if cache_config.caching and lock_acquired:
|
||||||
|
try:
|
||||||
|
self.close()
|
||||||
|
finally:
|
||||||
|
self.redis_lock.release()
|
||||||
|
|
||||||
if not cache_config.caching:
|
if cache_config.caching:
|
||||||
async with self._connection_change_lock:
|
async with self._connection_change_lock:
|
||||||
self.open_connections += 1
|
self.open_connections += 1
|
||||||
if self._is_closed:
|
|
||||||
self.reopen()
|
|
||||||
logger.info(f"Open connections after open: {self.open_connections}")
|
logger.info(f"Open connections after open: {self.open_connections}")
|
||||||
|
try:
|
||||||
result = await loop.run_in_executor(self.executor, blocking_query)
|
result = blocking_query()
|
||||||
|
finally:
|
||||||
self.open_connections -= 1
|
self.open_connections -= 1
|
||||||
logger.info(f"Opened connections after closing {self.open_connections}")
|
logger.info(f"Open connections after close: {self.open_connections}")
|
||||||
if self.open_connections == 0:
|
|
||||||
self.close()
|
|
||||||
|
|
||||||
return result
|
return result
|
||||||
else:
|
else:
|
||||||
result = await loop.run_in_executor(self.executor, blocking_query)
|
result = await loop.run_in_executor(self.executor, blocking_query)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue