feat: adds opening and closing kuzuconnection to avoid full lock due to lru_cache
This commit is contained in:
parent
2184ae866b
commit
2848b03bcc
1 changed files with 24 additions and 1 deletions
|
|
@ -41,6 +41,7 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
"""Initialize Kuzu database connection and schema."""
|
"""Initialize Kuzu database connection and schema."""
|
||||||
self.db_path = db_path # Path for the database directory
|
self.db_path = db_path # Path for the database directory
|
||||||
self.db: Optional[Database] = None
|
self.db: Optional[Database] = None
|
||||||
|
self._is_closed = False
|
||||||
self.connection: Optional[Connection] = None
|
self.connection: Optional[Connection] = None
|
||||||
self.executor = ThreadPoolExecutor()
|
self.executor = ThreadPoolExecutor()
|
||||||
self._initialize_connection()
|
self._initialize_connection()
|
||||||
|
|
@ -146,6 +147,22 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
logger.error(f"Failed to initialize Kuzu database: {e}")
|
logger.error(f"Failed to initialize Kuzu database: {e}")
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if self.connection:
|
||||||
|
del self.connection
|
||||||
|
self.connection = None
|
||||||
|
if self.db:
|
||||||
|
del self.db
|
||||||
|
self.db = None
|
||||||
|
self._is_closed = True
|
||||||
|
logger.info(f"Kuzu database closed successfully")
|
||||||
|
|
||||||
|
def reopen(self):
|
||||||
|
if self._is_closed:
|
||||||
|
self._is_closed = False
|
||||||
|
self._initialize_connection()
|
||||||
|
logger.info(f"Kuzu database re-opened successfully")
|
||||||
|
|
||||||
async def push_to_s3(self) -> None:
|
async def push_to_s3(self) -> None:
|
||||||
if os.getenv("STORAGE_BACKEND", "").lower() == "s3" and hasattr(self, "temp_graph_file"):
|
if os.getenv("STORAGE_BACKEND", "").lower() == "s3" and hasattr(self, "temp_graph_file"):
|
||||||
from cognee.infrastructure.files.storage.S3FileStorage import S3FileStorage
|
from cognee.infrastructure.files.storage.S3FileStorage import S3FileStorage
|
||||||
|
|
@ -212,7 +229,13 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
logger.error(f"Query execution failed: {str(e)}")
|
logger.error(f"Query execution failed: {str(e)}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
return await loop.run_in_executor(self.executor, blocking_query)
|
if self._is_closed:
|
||||||
|
self.reopen()
|
||||||
|
|
||||||
|
result = await loop.run_in_executor(self.executor, blocking_query)
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def get_session(self):
|
async def get_session(self):
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue