Refactor of the tasks
This commit is contained in:
parent
1c9bbd7a43
commit
4675a8f323
8 changed files with 14 additions and 14 deletions
|
|
@ -92,25 +92,25 @@ async def cognify(datasets: Union[str, list[str]] = None, user: User = None):
|
||||||
tasks = [
|
tasks = [
|
||||||
Task(document_to_ontology, root_node_id = root_node_id),
|
Task(document_to_ontology, root_node_id = root_node_id),
|
||||||
Task(source_documents_to_chunks, parent_node_id = root_node_id), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type
|
Task(source_documents_to_chunks, parent_node_id = root_node_id), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type
|
||||||
Task(chunk_to_graph_decomposition_task, topology_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Set the graph topology for the document chunk data
|
Task(chunk_to_graph_decomposition, topology_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Set the graph topology for the document chunk data
|
||||||
Task(chunks_into_graph_task, graph_model = KnowledgeGraph, collection_name = "entities"), # Generate knowledge graphs from the document chunks and attach it to chunk nodes
|
Task(chunks_into_graph, graph_model = KnowledgeGraph, collection_name = "entities"), # Generate knowledge graphs from the document chunks and attach it to chunk nodes
|
||||||
Task(chunk_update_check_task, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks
|
Task(chunk_update_check, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks
|
||||||
Task(
|
Task(
|
||||||
save_chunks_to_store_task,
|
save_chunks_to_store,
|
||||||
collection_name = "chunks",
|
collection_name = "chunks",
|
||||||
), # Save the document chunks in vector db and as nodes in graph db (connected to the document node and between each other)
|
), # Save the document chunks in vector db and as nodes in graph db (connected to the document node and between each other)
|
||||||
run_tasks_parallel([
|
run_tasks_parallel([
|
||||||
Task(
|
Task(
|
||||||
chunk_extract_summary_task,
|
chunk_extract_summary,
|
||||||
summarization_model = cognee_config.summarization_model,
|
summarization_model = cognee_config.summarization_model,
|
||||||
collection_name = "chunk_summaries",
|
collection_name = "chunk_summaries",
|
||||||
), # Summarize the document chunks
|
), # Summarize the document chunks
|
||||||
Task(
|
Task(
|
||||||
chunk_naive_llm_classifier_task,
|
chunk_naive_llm_classifier,
|
||||||
classification_model = cognee_config.classification_model,
|
classification_model = cognee_config.classification_model,
|
||||||
),
|
),
|
||||||
]),
|
]),
|
||||||
Task(chunk_remove_disconnected_task), # Remove the obsolete document chunks.
|
Task(chunk_remove_disconnected), # Remove the obsolete document chunks.
|
||||||
]
|
]
|
||||||
|
|
||||||
pipeline = run_tasks(tasks, documents)
|
pipeline = run_tasks(tasks, documents)
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ from cognee.modules.data.extraction.extract_summary import extract_summary
|
||||||
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
|
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
|
||||||
|
|
||||||
|
|
||||||
async def chunk_extract_summary_task(data_chunks: list[DocumentChunk], summarization_model: Type[BaseModel], collection_name: str = "summaries"):
|
async def chunk_extract_summary(data_chunks: list[DocumentChunk], summarization_model: Type[BaseModel], collection_name: str = "summaries"):
|
||||||
if len(data_chunks) == 0:
|
if len(data_chunks) == 0:
|
||||||
return data_chunks
|
return data_chunks
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ from cognee.modules.data.extraction.extract_categories import extract_categories
|
||||||
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
|
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
|
||||||
|
|
||||||
|
|
||||||
async def chunk_naive_llm_classifier_task(data_chunks: list[DocumentChunk], classification_model: Type[BaseModel]):
|
async def chunk_naive_llm_classifier(data_chunks: list[DocumentChunk], classification_model: Type[BaseModel]):
|
||||||
if len(data_chunks) == 0:
|
if len(data_chunks) == 0:
|
||||||
return data_chunks
|
return data_chunks
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChu
|
||||||
# from cognee.infrastructure.databases.vector import get_vector_engine
|
# from cognee.infrastructure.databases.vector import get_vector_engine
|
||||||
|
|
||||||
|
|
||||||
async def chunk_remove_disconnected_task(data_chunks: list[DocumentChunk]) -> list[DocumentChunk]:
|
async def chunk_remove_disconnected(data_chunks: list[DocumentChunk]) -> list[DocumentChunk]:
|
||||||
graph_engine = await get_graph_engine()
|
graph_engine = await get_graph_engine()
|
||||||
|
|
||||||
document_ids = set((data_chunk.document_id for data_chunk in data_chunks))
|
document_ids = set((data_chunk.document_id for data_chunk in data_chunks))
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ from cognee.infrastructure.databases.graph import get_graph_engine
|
||||||
from cognee.modules.data.extraction.knowledge_graph.add_model_class_to_graph import add_model_class_to_graph
|
from cognee.modules.data.extraction.knowledge_graph.add_model_class_to_graph import add_model_class_to_graph
|
||||||
|
|
||||||
|
|
||||||
async def chunk_to_graph_decomposition_task(data_chunks: list[DocumentChunk], topology_model: Type[BaseModel]):
|
async def chunk_to_graph_decomposition(data_chunks: list[DocumentChunk], topology_model: Type[BaseModel]):
|
||||||
if topology_model == KnowledgeGraph:
|
if topology_model == KnowledgeGraph:
|
||||||
return data_chunks
|
return data_chunks
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@ from cognee.infrastructure.databases.vector import get_vector_engine
|
||||||
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
|
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
|
||||||
|
|
||||||
|
|
||||||
async def chunk_update_check_task(data_chunks: list[DocumentChunk], collection_name: str) -> list[DocumentChunk]:
|
async def chunk_update_check(data_chunks: list[DocumentChunk], collection_name: str) -> list[DocumentChunk]:
|
||||||
vector_engine = get_vector_engine()
|
vector_engine = get_vector_engine()
|
||||||
|
|
||||||
if not await vector_engine.has_collection(collection_name):
|
if not await vector_engine.has_collection(collection_name):
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ class EntityNode(BaseModel):
|
||||||
created_at: datetime
|
created_at: datetime
|
||||||
updated_at: datetime
|
updated_at: datetime
|
||||||
|
|
||||||
async def chunks_into_graph_task(data_chunks: list[DocumentChunk], graph_model: Type[BaseModel], collection_name: str):
|
async def chunks_into_graph(data_chunks: list[DocumentChunk], graph_model: Type[BaseModel], collection_name: str):
|
||||||
chunk_graphs = await asyncio.gather(
|
chunk_graphs = await asyncio.gather(
|
||||||
*[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks]
|
*[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks]
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@ from cognee.infrastructure.databases.vector import DataPoint, get_vector_engine
|
||||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||||
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
|
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
|
||||||
|
|
||||||
async def save_chunks_to_store_task(data_chunks: list[DocumentChunk], collection_name: str):
|
async def save_chunks_to_store(data_chunks: list[DocumentChunk], collection_name: str):
|
||||||
if len(data_chunks) == 0:
|
if len(data_chunks) == 0:
|
||||||
return data_chunks
|
return data_chunks
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue