cognee/cognee/api/v1/cognify/cognify_v2.py
hajdul88 6fcfb3c398
feat: productionizing ontology solution [COG-1401] (#623)
<!-- .github/pull_request_template.md -->

## Description
This PR contains the ontology feature integrated into cognify

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced ontology management with the introduction of the
`OntologyResolver` class for improved data handling and querying.
- Expanded ontology framework now provides enriched coverage of
technology and automotive domains, including new entities and
relationships.
- Updated entity models now include a validation flag to support
improved data integrity.
- Added support for specifying an ontology file path in relevant
functions to enhance flexibility.

- **Refactor**
- Streamlined integration of ontology processing across data extraction
and workflow routines.

- **Chores**
- Updated project dependencies to include `owlready2` for advanced
ontology functionality.
  
- **Tests**
- Introduced a new test suite for the `OntologyResolver` class to
validate its functionality under various conditions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-03-12 14:31:19 +01:00

153 lines
5.4 KiB
Python

import asyncio
import logging
from typing import Union, Optional
from pydantic import BaseModel
from cognee.infrastructure.llm import get_max_chunk_tokens
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
from cognee.modules.cognify.config import get_cognify_config
from cognee.modules.data.methods import get_datasets, get_datasets_by_name
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.models import Data, Dataset
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.models import PipelineRunStatus
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.shared.data_models import KnowledgeGraph
from cognee.shared.utils import send_telemetry
from cognee.tasks.documents import (
check_permissions_on_documents,
classify_documents,
extract_chunks_from_documents,
)
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text
from cognee.modules.chunking.TextChunker import TextChunker
logger = logging.getLogger("cognify.v2")
update_status_lock = asyncio.Lock()
async def cognify(
datasets: Union[str, list[str]] = None,
user: User = None,
graph_model: BaseModel = KnowledgeGraph,
tasks: list[Task] = None,
ontology_file_path: Optional[str] = None,
):
if user is None:
user = await get_default_user()
existing_datasets = await get_datasets(user.id)
if datasets is None or len(datasets) == 0:
# If no datasets are provided, cognify all existing datasets.
datasets = existing_datasets
if isinstance(datasets[0], str):
datasets = await get_datasets_by_name(datasets, user.id)
existing_datasets_map = {
generate_dataset_name(dataset.name): True for dataset in existing_datasets
}
awaitables = []
if tasks is None:
tasks = await get_default_tasks(user, graph_model, ontology_file_path=ontology_file_path)
for dataset in datasets:
dataset_name = generate_dataset_name(dataset.name)
if dataset_name in existing_datasets_map:
awaitables.append(run_cognify_pipeline(dataset, user, tasks))
return await asyncio.gather(*awaitables)
async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id)
dataset_id = dataset.id
dataset_name = generate_dataset_name(dataset.name)
send_telemetry("cognee.cognify EXECUTION STARTED", user.id)
# async with update_status_lock: TODO: Add UI lock to prevent multiple backend requests
task_status = await get_pipeline_status([dataset_id])
if (
str(dataset_id) in task_status
and task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED
):
logger.info("Dataset %s is already being processed.", dataset_name)
return
try:
if not isinstance(tasks, list):
raise ValueError("Tasks must be a list")
for task in tasks:
if not isinstance(task, Task):
raise ValueError(f"Task {task} is not an instance of Task")
pipeline_run = run_tasks(tasks, dataset.id, data_documents, "cognify_pipeline")
pipeline_run_status = None
async for run_status in pipeline_run:
pipeline_run_status = run_status
send_telemetry("cognee.cognify EXECUTION COMPLETED", user.id)
return pipeline_run_status
except Exception as error:
send_telemetry("cognee.cognify EXECUTION ERRORED", user.id)
raise error
def generate_dataset_name(dataset_name: str) -> str:
return dataset_name.replace(".", "_").replace(" ", "_")
async def get_default_tasks( # TODO: Find out a better way to do this (Boris's comment)
user: User = None,
graph_model: BaseModel = KnowledgeGraph,
chunker=TextChunker,
ontology_file_path: Optional[str] = None,
) -> list[Task]:
if user is None:
user = await get_default_user()
try:
cognee_config = get_cognify_config()
ontology_adapter = OntologyResolver(ontology_file=ontology_file_path)
default_tasks = [
Task(classify_documents),
Task(check_permissions_on_documents, user=user, permissions=["write"]),
Task(
extract_chunks_from_documents,
max_chunk_size=get_max_chunk_tokens(),
chunker=chunker,
), # Extract text chunks based on the document type.
Task(
extract_graph_from_data,
graph_model=graph_model,
ontology_adapter=ontology_adapter,
task_config={"batch_size": 10},
), # Generate knowledge graphs from the document chunks.
Task(
summarize_text,
summarization_model=cognee_config.summarization_model,
task_config={"batch_size": 10},
),
Task(add_data_points, task_config={"batch_size": 10}),
]
except Exception as error:
send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id)
raise error
return default_tasks