diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py
index 77245dea8..2c45774ee 100644
--- a/cognee/api/v1/cognify/cognify_v2.py
+++ b/cognee/api/v1/cognify/cognify_v2.py
@@ -2,20 +2,26 @@
 import asyncio
 import logging
 from typing import Union
-from cognee.shared.utils import send_telemetry
+from pydantic import BaseModel
+
 from cognee.modules.cognify.config import get_cognify_config
-from cognee.shared.data_models import KnowledgeGraph
-from cognee.modules.data.models import Dataset, Data
-from cognee.modules.data.methods.get_dataset_data import get_dataset_data
 from cognee.modules.data.methods import get_datasets, get_datasets_by_name
-from cognee.modules.pipelines.tasks.Task import Task
+from cognee.modules.data.methods.get_dataset_data import get_dataset_data
+from cognee.modules.data.models import Data, Dataset
 from cognee.modules.pipelines import run_tasks
-from cognee.modules.users.models import User
-from cognee.modules.users.methods import get_default_user
 from cognee.modules.pipelines.models import PipelineRunStatus
-from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
-from cognee.modules.pipelines.operations.log_pipeline_status import log_pipeline_status
-from cognee.tasks.documents import classify_documents, check_permissions_on_documents, extract_chunks_from_documents
+from cognee.modules.pipelines.operations.get_pipeline_status import \
+    get_pipeline_status
+from cognee.modules.pipelines.operations.log_pipeline_status import \
+    log_pipeline_status
+from cognee.modules.pipelines.tasks.Task import Task
+from cognee.modules.users.methods import get_default_user
+from cognee.modules.users.models import User
+from cognee.shared.data_models import KnowledgeGraph
+from cognee.shared.utils import send_telemetry
+from cognee.tasks.documents import (check_permissions_on_documents,
+                                    classify_documents,
+                                    extract_chunks_from_documents)
 from cognee.tasks.graph import extract_graph_from_data
 from cognee.tasks.storage import add_data_points
 from cognee.tasks.storage.index_graph_edges import index_graph_edges
@@ -25,7 +31,7 @@ logger = logging.getLogger("cognify.v2")
 
 update_status_lock = asyncio.Lock()
 
-async def cognify(datasets: Union[str, list[str]] = None, user: User = None):
+async def cognify(datasets: Union[str, list[str]] = None, user: User = None, graph_model: BaseModel = KnowledgeGraph):
     if user is None:
         user = await get_default_user()
 
@@ -48,12 +54,12 @@ async def cognify(datasets: Union[str, list[str]] = None, user: User = None):
         dataset_name = generate_dataset_name(dataset.name)
 
         if dataset_name in existing_datasets_map:
-            awaitables.append(run_cognify_pipeline(dataset, user))
+            awaitables.append(run_cognify_pipeline(dataset, user, graph_model))
 
     return await asyncio.gather(*awaitables)
 
 
-async def run_cognify_pipeline(dataset: Dataset, user: User):
+async def run_cognify_pipeline(dataset: Dataset, user: User, graph_model: BaseModel = KnowledgeGraph):
     data_documents: list[Data] = await get_dataset_data(dataset_id = dataset.id)
 
     document_ids_str = [str(document.id) for document in data_documents]
@@ -81,7 +87,7 @@ async def run_cognify_pipeline(dataset: Dataset, user: User):
             Task(classify_documents),
             Task(check_permissions_on_documents, user = user, permissions = ["write"]),
             Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
-            Task(extract_graph_from_data, graph_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks.
+            Task(extract_graph_from_data, graph_model = graph_model, task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks.
             Task(
                 summarize_text,
                 summarization_model = cognee_config.summarization_model,
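A minimal usage sketch of the new `graph_model` parameter. Only `cognify`, `graph_model`, and `KnowledgeGraph` come from this diff; the dataset name and the `MedicalKnowledgeGraph` subclass are hypothetical, and the snippet assumes the named dataset already contains ingested data.

```python
import asyncio

from cognee.api.v1.cognify.cognify_v2 import cognify
from cognee.shared.data_models import KnowledgeGraph


# Hypothetical domain-specific schema. Any pydantic BaseModel is accepted;
# subclassing KnowledgeGraph keeps the node/edge shape the extraction
# task expects.
class MedicalKnowledgeGraph(KnowledgeGraph):
    pass


async def main():
    # Default behaviour is unchanged: graph_model falls back to KnowledgeGraph.
    await cognify(datasets="my_dataset")

    # New behaviour: the custom model is forwarded through
    # run_cognify_pipeline into Task(extract_graph_from_data, ...).
    await cognify(datasets="my_dataset", graph_model=MedicalKnowledgeGraph)


asyncio.run(main())
```

Note that callers pass the model class itself, not an instance: the parameter defaults to the `KnowledgeGraph` class, mirroring how `Task(extract_graph_from_data, graph_model = KnowledgeGraph, ...)` consumed it before this change.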