fix: Resolve Kuzu S3 issue (#1235)
<!-- .github/pull_request_template.md --> ## Description Resolve issue with pushing Kuzu DBs to S3 ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
parent
6e7e4506cc
commit
c33536685d
1 changed files with 6 additions and 1 deletions
|
|
@ -42,6 +42,7 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
self.connection: Optional[Connection] = None
|
self.connection: Optional[Connection] = None
|
||||||
self.executor = ThreadPoolExecutor()
|
self.executor = ThreadPoolExecutor()
|
||||||
self._initialize_connection()
|
self._initialize_connection()
|
||||||
|
self.KUZU_ASYNC_LOCK = asyncio.Lock()
|
||||||
|
|
||||||
def _initialize_connection(self) -> None:
|
def _initialize_connection(self) -> None:
|
||||||
"""Initialize the Kuzu database connection and schema."""
|
"""Initialize the Kuzu database connection and schema."""
|
||||||
|
|
@ -136,6 +137,10 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
from cognee.infrastructure.files.storage.S3FileStorage import S3FileStorage
|
from cognee.infrastructure.files.storage.S3FileStorage import S3FileStorage
|
||||||
|
|
||||||
s3_file_storage = S3FileStorage("")
|
s3_file_storage = S3FileStorage("")
|
||||||
|
|
||||||
|
async with self.KUZU_ASYNC_LOCK:
|
||||||
|
self.connection.execute("CHECKPOINT;")
|
||||||
|
|
||||||
s3_file_storage.s3.put(self.temp_graph_file, self.db_path, recursive=True)
|
s3_file_storage.s3.put(self.temp_graph_file, self.db_path, recursive=True)
|
||||||
|
|
||||||
async def pull_from_s3(self) -> None:
|
async def pull_from_s3(self) -> None:
|
||||||
|
|
@ -145,7 +150,7 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
try:
|
try:
|
||||||
s3_file_storage.s3.get(self.db_path, self.temp_graph_file, recursive=True)
|
s3_file_storage.s3.get(self.db_path, self.temp_graph_file, recursive=True)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
pass
|
logger.warning(f"Kuzu S3 storage file not found: {self.db_path}")
|
||||||
|
|
||||||
async def query(self, query: str, params: Optional[dict] = None) -> List[Tuple]:
|
async def query(self, query: str, params: Optional[dict] = None) -> List[Tuple]:
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue