From 4675a8f3231a797182c74272e149c7a2ab5e0265 Mon Sep 17 00:00:00 2001 From: Vasilije <8619304+Vasilije1990@users.noreply.github.com> Date: Thu, 8 Aug 2024 17:10:43 +0200 Subject: [PATCH] Refactor of the tasks --- cognee/api/v1/cognify/cognify_v2.py | 14 +++++++------- .../chunk_extract_summary/chunk_extract_summary.py | 2 +- .../chunk_naive_llm_classifier.py | 2 +- .../chunk_remove_disconnected.py | 2 +- .../chunk_to_graph_decomposition.py | 2 +- .../tasks/chunk_update_check/chunk_update_check.py | 2 +- .../tasks/chunks_into_graph/chunks_into_graph.py | 2 +- .../save_chunks_to_store/save_chunks_to_store.py | 2 +- 8 files changed, 14 insertions(+), 14 deletions(-) diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py index 616c710cb..7a187600e 100644 --- a/cognee/api/v1/cognify/cognify_v2.py +++ b/cognee/api/v1/cognify/cognify_v2.py @@ -92,25 +92,25 @@ async def cognify(datasets: Union[str, list[str]] = None, user: User = None): tasks = [ Task(document_to_ontology, root_node_id = root_node_id), Task(source_documents_to_chunks, parent_node_id = root_node_id), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type - Task(chunk_to_graph_decomposition_task, topology_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Set the graph topology for the document chunk data - Task(chunks_into_graph_task, graph_model = KnowledgeGraph, collection_name = "entities"), # Generate knowledge graphs from the document chunks and attach it to chunk nodes - Task(chunk_update_check_task, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks + Task(chunk_to_graph_decomposition, topology_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Set the graph topology for the document chunk data + Task(chunks_into_graph, graph_model = KnowledgeGraph, collection_name = "entities"), # Generate knowledge graphs from the document chunks and attach it to chunk nodes + Task(chunk_update_check, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks Task( - save_chunks_to_store_task, + save_chunks_to_store, collection_name = "chunks", ), # Save the document chunks in vector db and as nodes in graph db (connected to the document node and between each other) run_tasks_parallel([ Task( - chunk_extract_summary_task, + chunk_extract_summary, summarization_model = cognee_config.summarization_model, collection_name = "chunk_summaries", ), # Summarize the document chunks Task( - chunk_naive_llm_classifier_task, + chunk_naive_llm_classifier, classification_model = cognee_config.classification_model, ), ]), - Task(chunk_remove_disconnected_task), # Remove the obsolete document chunks. + Task(chunk_remove_disconnected), # Remove the obsolete document chunks. ] pipeline = run_tasks(tasks, documents) diff --git a/cognee/tasks/chunk_extract_summary/chunk_extract_summary.py b/cognee/tasks/chunk_extract_summary/chunk_extract_summary.py index 8ebd5d5c3..01387b2c9 100644 --- a/cognee/tasks/chunk_extract_summary/chunk_extract_summary.py +++ b/cognee/tasks/chunk_extract_summary/chunk_extract_summary.py @@ -8,7 +8,7 @@ from cognee.modules.data.extraction.extract_summary import extract_summary from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk -async def chunk_extract_summary_task(data_chunks: list[DocumentChunk], summarization_model: Type[BaseModel], collection_name: str = "summaries"): +async def chunk_extract_summary(data_chunks: list[DocumentChunk], summarization_model: Type[BaseModel], collection_name: str = "summaries"): if len(data_chunks) == 0: return data_chunks diff --git a/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py b/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py index 4e75f87e5..6db89c108 100644 --- a/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py +++ b/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py @@ -8,7 +8,7 @@ from cognee.modules.data.extraction.extract_categories import extract_categories from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk -async def chunk_naive_llm_classifier_task(data_chunks: list[DocumentChunk], classification_model: Type[BaseModel]): +async def chunk_naive_llm_classifier(data_chunks: list[DocumentChunk], classification_model: Type[BaseModel]): if len(data_chunks) == 0: return data_chunks diff --git a/cognee/tasks/chunk_remove_disconnected/chunk_remove_disconnected.py b/cognee/tasks/chunk_remove_disconnected/chunk_remove_disconnected.py index 0f0469873..3ab59a655 100644 --- a/cognee/tasks/chunk_remove_disconnected/chunk_remove_disconnected.py +++ b/cognee/tasks/chunk_remove_disconnected/chunk_remove_disconnected.py @@ -6,7 +6,7 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu # from cognee.infrastructure.databases.vector import get_vector_engine -async def chunk_remove_disconnected_task(data_chunks: list[DocumentChunk]) -> list[DocumentChunk]: +async def chunk_remove_disconnected(data_chunks: list[DocumentChunk]) -> list[DocumentChunk]: graph_engine = await get_graph_engine() document_ids = set((data_chunk.document_id for data_chunk in data_chunks)) diff --git a/cognee/tasks/chunk_to_graph_decomposition/chunk_to_graph_decomposition.py b/cognee/tasks/chunk_to_graph_decomposition/chunk_to_graph_decomposition.py index 6f9c936db..2e2c3b17f 100644 --- a/cognee/tasks/chunk_to_graph_decomposition/chunk_to_graph_decomposition.py +++ b/cognee/tasks/chunk_to_graph_decomposition/chunk_to_graph_decomposition.py @@ -7,7 +7,7 @@ from cognee.infrastructure.databases.graph import get_graph_engine from cognee.modules.data.extraction.knowledge_graph.add_model_class_to_graph import add_model_class_to_graph -async def chunk_to_graph_decomposition_task(data_chunks: list[DocumentChunk], topology_model: Type[BaseModel]): +async def chunk_to_graph_decomposition(data_chunks: list[DocumentChunk], topology_model: Type[BaseModel]): if topology_model == KnowledgeGraph: return data_chunks diff --git a/cognee/tasks/chunk_update_check/chunk_update_check.py b/cognee/tasks/chunk_update_check/chunk_update_check.py index cd532b687..2bd05241c 100644 --- a/cognee/tasks/chunk_update_check/chunk_update_check.py +++ b/cognee/tasks/chunk_update_check/chunk_update_check.py @@ -2,7 +2,7 @@ from cognee.infrastructure.databases.vector import get_vector_engine from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk -async def chunk_update_check_task(data_chunks: list[DocumentChunk], collection_name: str) -> list[DocumentChunk]: +async def chunk_update_check(data_chunks: list[DocumentChunk], collection_name: str) -> list[DocumentChunk]: vector_engine = get_vector_engine() if not await vector_engine.has_collection(collection_name): diff --git a/cognee/tasks/chunks_into_graph/chunks_into_graph.py b/cognee/tasks/chunks_into_graph/chunks_into_graph.py index 40747ffbe..c4cbfcdcc 100644 --- a/cognee/tasks/chunks_into_graph/chunks_into_graph.py +++ b/cognee/tasks/chunks_into_graph/chunks_into_graph.py @@ -18,7 +18,7 @@ class EntityNode(BaseModel): created_at: datetime updated_at: datetime -async def chunks_into_graph_task(data_chunks: list[DocumentChunk], graph_model: Type[BaseModel], collection_name: str): +async def chunks_into_graph(data_chunks: list[DocumentChunk], graph_model: Type[BaseModel], collection_name: str): chunk_graphs = await asyncio.gather( *[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks] ) diff --git a/cognee/tasks/save_chunks_to_store/save_chunks_to_store.py b/cognee/tasks/save_chunks_to_store/save_chunks_to_store.py index 6ae4fb890..710dce557 100644 --- a/cognee/tasks/save_chunks_to_store/save_chunks_to_store.py +++ b/cognee/tasks/save_chunks_to_store/save_chunks_to_store.py @@ -2,7 +2,7 @@ from cognee.infrastructure.databases.vector import DataPoint, get_vector_engine from cognee.infrastructure.databases.graph import get_graph_engine from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk -async def save_chunks_to_store_task(data_chunks: list[DocumentChunk], collection_name: str): +async def save_chunks_to_store(data_chunks: list[DocumentChunk], collection_name: str): if len(data_chunks) == 0: return data_chunks