From dda34607151e7d61fa1119e2773a529649880752 Mon Sep 17 00:00:00 2001
From: Boris Arzentar
Date: Mon, 19 May 2025 10:48:38 +0200
Subject: [PATCH] fix: decrease batch size for chunks

---
 distributed/Dockerfile                         | 2 +-
 distributed/entrypoint.py                      | 3 ++-
 distributed/tasks/save_data_points.py          | 4 +++-
 distributed/workers/data_point_saver_worker.py | 2 +-
 distributed/workers/graph_extraction_worker.py | 2 +-
 5 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/distributed/Dockerfile b/distributed/Dockerfile
index 824820a02..af85f583d 100644
--- a/distributed/Dockerfile
+++ b/distributed/Dockerfile
@@ -24,7 +24,7 @@ RUN pip install poetry
 
 RUN poetry config virtualenvs.create false
 
-RUN poetry install --extras neo4j --extras qdrant --no-root
+RUN poetry install --extras neo4j --extras postgres --no-root
 
 COPY cognee/ /app/cognee
 COPY distributed/ /app/distributed
diff --git a/distributed/entrypoint.py b/distributed/entrypoint.py
index 72b98288f..7124923f6 100644
--- a/distributed/entrypoint.py
+++ b/distributed/entrypoint.py
@@ -133,7 +133,7 @@ async def main():
                 Task(
                     process_chunks_remotely,
                     document=item,
-                    task_config={"batch_size": 50},
+                    task_config={"batch_size": 10},
                 ),
             ],
             data=[item],
@@ -155,6 +155,7 @@ async def main():
         print(f"Number of documents processed: {len(results)}")
         results.extend(batch_results)
 
+    # Push empty tuple into the queue to signal the end of data.
     save_data_points_queue.put(())
 
     for consumer_future in consumer_futures:
diff --git a/distributed/tasks/save_data_points.py b/distributed/tasks/save_data_points.py
index 82d3a5b8a..51b2ee12d 100644
--- a/distributed/tasks/save_data_points.py
+++ b/distributed/tasks/save_data_points.py
@@ -35,7 +35,9 @@ async def save_data_points(data_points_and_relationships: tuple[list, list]):
     for nodes, edges in data_points_and_relationships:
         for node in nodes:
             if asizeof.asizeof(node) >= 500000:
-                print(f"Node too large:\n{node.id}\n")
+                try_pushing_nodes_to_queue([node])
+                continue
+                # print(f"Node too large:\n{node.id}\n")
 
             node_batch.append(node)
 
diff --git a/distributed/workers/data_point_saver_worker.py b/distributed/workers/data_point_saver_worker.py
index ab50941bd..85b06d4b0 100644
--- a/distributed/workers/data_point_saver_worker.py
+++ b/distributed/workers/data_point_saver_worker.py
@@ -7,7 +7,7 @@ from distributed.queues import save_data_points_queue
 from cognee.infrastructure.databases.graph import get_graph_engine
 
 
-@app.function(image=image, timeout=7200, max_containers=100)
+@app.function(image=image, timeout=86400, max_containers=100)
 async def data_point_saver_worker():
     print("Started processing of nodes and edges; starting graph engine queue.")
     graph_engine = await get_graph_engine()
diff --git a/distributed/workers/graph_extraction_worker.py b/distributed/workers/graph_extraction_worker.py
index 28dade5ff..698521405 100644
--- a/distributed/workers/graph_extraction_worker.py
+++ b/distributed/workers/graph_extraction_worker.py
@@ -13,7 +13,7 @@ from distributed.tasks.extract_graph_from_data import extract_graph_from_data
 from distributed.tasks.save_data_points import save_data_points
 
 
-@app.function(image=image, timeout=7200, max_containers=100)
+@app.function(image=image, timeout=86400, max_containers=100)
 async def graph_extraction_worker(user, document_name: str, document_chunks: list):
     cognee_config = get_cognify_config()
 
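
Notes on the changes (not part of the diff):

On the entrypoint.py sentinel: the empty tuple pushed into save_data_points_queue is an end-of-stream marker that lets a consumer exit its receive loop cleanly. A minimal sketch of the pattern, using the standard library (queue.Queue and a thread stand in for the distributed queue and consumer futures; the print is a placeholder for real work):

    import queue
    import threading

    SENTINEL = ()  # matches the empty tuple pushed in entrypoint.py

    def consumer(q: queue.Queue) -> None:
        while True:
            item = q.get()
            if item == SENTINEL:
                break  # end-of-data marker: stop consuming
            print("processing", item)  # placeholder for real work

    q: queue.Queue = queue.Queue()
    worker = threading.Thread(target=consumer, args=(q,))
    worker.start()
    q.put(("nodes", "edges"))
    q.put(SENTINEL)  # signal the end of data, as entrypoint.py now does
    worker.join()

One caveat with this pattern: when several consumers drain the same queue, each consumer needs its own sentinel (or must re-enqueue the sentinel before exiting), since one marker only stops one reader.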
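
On save_data_points.py: instead of only logging oversized nodes, the patch re-enqueues them individually via try_pushing_nodes_to_queue and skips the shared batch. A sketch of that routing logic, with push_to_queue standing in for the repo's helper (pympler's asizeof does the deep size measurement, as in the hunk above):

    from pympler import asizeof  # third-party deep object sizing

    MAX_NODE_SIZE_BYTES = 500_000  # threshold used in the patch

    def route_nodes(nodes, push_to_queue):
        """Return nodes small enough for batch insertion;
        hand oversized ones to the queue individually."""
        batch = []
        for node in nodes:
            if asizeof.asizeof(node) >= MAX_NODE_SIZE_BYTES:
                push_to_queue([node])  # oversized: send on its own
                continue
            batch.append(node)
        return batch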
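
On the tuning values: batch_size drops from 50 to 10, so each process_chunks_remotely task carries a fifth of the previous chunk payload, and the worker timeout rises from 7200 s (2 h) to 86400 s (24 h), giving long-running containers a full day before the platform stops them. I have not verified how cognee's Task applies batch_size internally; as a generic sketch of what a smaller batch size means for per-call payloads:

    from itertools import islice

    def batched(items, batch_size=10):
        """Yield successive batches of at most batch_size items;
        smaller batches mean smaller per-call payloads and memory."""
        it = iter(items)
        while chunk := list(islice(it, batch_size)):
            yield chunk

    # e.g. 95 chunks -> ten calls of at most 10 chunks each,
    # instead of two calls of up to 50
    assert [len(b) for b in batched(range(95))] == [10] * 9 + [5]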