fix: ensure connection check before executing checkpoint in KuzuAdapter (#1268)
<!-- .github/pull_request_template.md --> ## Description <!-- Provide a clear description of the changes in this PR --> kuzu + s3 is failing on dev. What's happening is: 1. During cognee.add, run_pipeline uses [`run_tasks`](https://github.com/topoteretes/cognee/blob/dev/cognee/modules/pipelines/operations/pipeline.py#L193-L195) 2. run_tasks calls [`push_to_s3`](https://github.com/topoteretes/cognee/blob/dev/cognee/modules/pipelines/operations/run_tasks.py#L336-L338), which attempts to [checkpoint](https://github.com/topoteretes/cognee/blob/dev/cognee/infrastructure/databases/graph/kuzu/adapter.py#L142) before the push (Kuzu connection is initialized in `cognify`) Trace: ``` AttributeError: 'NoneType' object has no attribute 'execute' Traceback (most recent call last): File "/Users/daulet/PycharmProjects/cognee-source/examples/python/dynamic_steps_example.py", line 219, in <module> loop.run_until_complete(main(steps_to_enable)) File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete return future.result() ^^^^^^^^^^^^^^^ File "/Users/daulet/PycharmProjects/cognee-source/examples/python/dynamic_steps_example.py", line 186, in main await cognee.add(text) File "/Users/daulet/PycharmProjects/cognee-source/cognee/api/v1/add/add.py", line 145, in add async for run_info in cognee_pipeline( File "/Users/daulet/PycharmProjects/cognee-source/cognee/modules/pipelines/operations/pipeline.py", line 106, in cognee_pipeline async for run_info in run_pipeline( File "/Users/daulet/PycharmProjects/cognee-source/cognee/modules/pipelines/operations/pipeline.py", line 197, in run_pipeline async for pipeline_run_info in pipeline_run: File "/Users/daulet/PycharmProjects/cognee-source/cognee/modules/pipelines/operations/run_tasks.py", line 53, in wrapper async for run_info in original_gen(*args, **kwargs): File "/Users/daulet/PycharmProjects/cognee-source/cognee/modules/pipelines/operations/run_tasks.py", line 360, in run_tasks raise error File "/Users/daulet/PycharmProjects/cognee-source/cognee/modules/pipelines/operations/run_tasks.py", line 337, in run_tasks await graph_engine.push_to_s3() File "/Users/daulet/PycharmProjects/cognee-source/cognee/infrastructure/databases/graph/kuzu/adapter.py", line 142, in push_to_s3 self.connection.execute("CHECKPOINT;") ``` # After fix Running `dynamic_steps_example.py`: ### Main branch ``` Data pruned. System pruned. User 26ab9bde-39a1-47b7-93d7-368ccccdc503 has registered. Added text: CV 1: Relevant Name: Dr. Emily Car... Added text: CV 2: Relevant Name: Michael Rodri... Added text: CV 3: Relevant Name: Sarah Nguyen ... Added text: CV 4: Not Relevant Name: David Tho... Added text: CV 5: Not Relevant Name: Jessica M... Knowledge graph created. ['David Thompson has experience in design tools, specifically in graphic design with over 8 years of experience and proficiency in Adobe Creative Suite.'] ``` ### This branch ``` Data pruned. System pruned. User 65cbe3a4-d94a-4043-921b-78bc90c3f0a6 has registered. Added text: CV 1: Relevant Name: Dr. Emily Car... Added text: CV 2: Relevant Name: Michael Rodri... Added text: CV 3: Relevant Name: Sarah Nguyen ... Added text: CV 4: Not Relevant Name: David Tho... Added text: CV 5: Not Relevant Name: Jessica M... Knowledge graph created. ['David Thompson — Creative Graphic Designer; proficient in Adobe Photoshop, Illustrator and InDesign (Adobe Creative Suite) and basic web design (HTML/CSS).'] ``` ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
commit
67c7919be3
1 changed files with 3 additions and 2 deletions
|
|
@ -138,8 +138,9 @@ class KuzuAdapter(GraphDBInterface):
|
|||
|
||||
s3_file_storage = S3FileStorage("")
|
||||
|
||||
async with self.KUZU_ASYNC_LOCK:
|
||||
self.connection.execute("CHECKPOINT;")
|
||||
if self.connection:
|
||||
async with self.KUZU_ASYNC_LOCK:
|
||||
self.connection.execute("CHECKPOINT;")
|
||||
|
||||
s3_file_storage.s3.put(self.temp_graph_file, self.db_path, recursive=True)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue