Update adapter.py

This commit is contained in:
hajdul88 2025-10-07 20:43:50 +02:00
parent 466707b003
commit b468a13005

View file

@ -49,12 +49,13 @@ class KuzuAdapter(GraphDBInterface):
self.db_path = db_path # Path for the database directory
self.db: Optional[Database] = None
self.connection: Optional[Connection] = None
self.executor = ThreadPoolExecutor()
if cache_config.caching:
self.executor = ThreadPoolExecutor(max_workers=1)
self.redis_lock = get_cache_engine(
lock_key="kuzu-lock-" + str(uuid5(NAMESPACE_OID, db_path))
)
else:
self.executor = ThreadPoolExecutor()
self._initialize_connection()
self.KUZU_ASYNC_LOCK = asyncio.Lock()
self._connection_change_lock = asyncio.Lock()
@ -229,20 +230,15 @@ class KuzuAdapter(GraphDBInterface):
async with self._connection_change_lock:
self.open_connections += 1
if self._is_closed:
self.redis_lock.acquire()
self.reopen()
logger.info(f"Open connections after open: {self.open_connections}")
logger.info(f"Open connections after open: {self.open_connections}")
result = await loop.run_in_executor(self.executor, blocking_query)
result = await loop.run_in_executor(self.executor, blocking_query)
if cache_config.caching:
async with self._connection_change_lock:
self.open_connections -= 1
logger.info(f"Opened connections after closing {self.open_connections}")
if self.open_connections == 0:
self.connection.execute("CHECKPOINT;")
self.close()
self.redis_lock.release()
return result