feat: Add initial cognee pipeline simplification [COG-1705] (#670)


## Description
Simplify Cognee pipeline usage for users
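The intent is that users only touch the public entry points, while table creation, LLM/embedding connection checks, and pipeline bookkeeping move into a single shared `cognee_pipeline` helper. A minimal sketch of the intended call pattern, assuming a configured cognee environment (this snippet is illustrative and not part of the diff):

```python
import asyncio
import cognee

async def main():
    # Both entry points now delegate to the shared cognee_pipeline helper,
    # so database setup and connection checks happen in one place.
    await cognee.add("Example text to ingest", dataset_name="main_dataset")
    await cognee.cognify(datasets=["main_dataset"])

asyncio.run(main())
```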

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
Igor Ilic 2025-04-17 14:02:12 +02:00 committed by GitHub
parent da332e85fe
commit af276b8999
11 changed files with 179 additions and 127 deletions


@@ -1,15 +1,8 @@
from typing import Union, BinaryIO
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines import run_tasks, Task
from cognee.modules.pipelines import Task
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
from cognee.infrastructure.databases.relational import (
create_db_and_tables as create_relational_db_and_tables,
)
from cognee.infrastructure.databases.vector.pgvector import (
create_db_and_tables as create_pgvector_db_and_tables,
)
from uuid import uuid5, NAMESPACE_OID
from cognee.modules.pipelines import cognee_pipeline
async def add(
@@ -17,31 +10,8 @@ async def add(
dataset_name: str = "main_dataset",
user: User = None,
):
# Create tables for databases
await create_relational_db_and_tables()
await create_pgvector_db_and_tables()
# Initialize first_run attribute if it doesn't exist
if not hasattr(add, "first_run"):
add.first_run = True
if add.first_run:
from cognee.infrastructure.llm.utils import test_llm_connection, test_embedding_connection
# Test LLM and Embedding configuration once before running Cognee
await test_llm_connection()
await test_embedding_connection()
add.first_run = False # Update flag after first run
if user is None:
user = await get_default_user()
tasks = [Task(resolve_data_directories), Task(ingest_data, dataset_name, user)]
dataset_id = uuid5(NAMESPACE_OID, dataset_name)
pipeline = run_tasks(
tasks=tasks, dataset_id=dataset_id, data=data, pipeline_name="add_pipeline"
await cognee_pipeline(
tasks=tasks, datasets=dataset_name, data=data, user=user, pipeline_name="add_pipeline"
)
async for pipeline_status in pipeline:
print(f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}")


@@ -70,11 +70,13 @@ async def run_code_graph_pipeline(repo_path, include_docs=False):
dataset_id = uuid5(NAMESPACE_OID, "codebase")
if include_docs:
non_code_pipeline_run = run_tasks(non_code_tasks, dataset_id, repo_path, "cognify_pipeline")
non_code_pipeline_run = run_tasks(
non_code_tasks, dataset_id, repo_path, user, "cognify_pipeline"
)
async for run_status in non_code_pipeline_run:
yield run_status
async for run_status in run_tasks(tasks, dataset_id, repo_path, "cognify_code_pipeline"):
async for run_status in run_tasks(tasks, dataset_id, repo_path, user, "cognify_code_pipeline"):
yield run_status


@@ -1,20 +1,11 @@
import asyncio
from cognee.shared.logging_utils import get_logger
from typing import Union, Optional
from pydantic import BaseModel
from cognee.infrastructure.llm import get_max_chunk_tokens
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
from cognee.modules.cognify.config import get_cognify_config
from cognee.modules.data.methods import get_datasets, get_datasets_by_name
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.models import Data, Dataset
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.models import PipelineRunStatus
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.shared.data_models import KnowledgeGraph
from cognee.tasks.documents import (
@@ -26,6 +17,7 @@ from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text
from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.pipelines import cognee_pipeline
logger = get_logger("cognify")
@@ -36,73 +28,13 @@ async def cognify(
datasets: Union[str, list[str]] = None,
user: User = None,
graph_model: BaseModel = KnowledgeGraph,
tasks: list[Task] = None,
chunker=TextChunker,
chunk_size: int = None,
ontology_file_path: Optional[str] = None,
):
if user is None:
user = await get_default_user()
tasks = await get_default_tasks(user, graph_model, chunker, chunk_size, ontology_file_path)
existing_datasets = await get_datasets(user.id)
if datasets is None or len(datasets) == 0:
# If no datasets are provided, cognify all existing datasets.
datasets = existing_datasets
if isinstance(datasets[0], str):
datasets = await get_datasets_by_name(datasets, user.id)
existing_datasets_map = {
generate_dataset_name(dataset.name): True for dataset in existing_datasets
}
awaitables = []
if tasks is None:
tasks = await get_default_tasks(user, graph_model, ontology_file_path=ontology_file_path)
for dataset in datasets:
dataset_name = generate_dataset_name(dataset.name)
if dataset_name in existing_datasets_map:
awaitables.append(run_cognify_pipeline(dataset, user, tasks))
return await asyncio.gather(*awaitables)
async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id)
dataset_id = dataset.id
dataset_name = generate_dataset_name(dataset.name)
# async with update_status_lock: TODO: Add UI lock to prevent multiple backend requests
task_status = await get_pipeline_status([dataset_id])
if (
str(dataset_id) in task_status
and task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED
):
logger.info("Dataset %s is already being processed.", dataset_name)
return
if not isinstance(tasks, list):
raise ValueError("Tasks must be a list")
for task in tasks:
if not isinstance(task, Task):
raise ValueError(f"Task {task} is not an instance of Task")
pipeline_run = run_tasks(tasks, dataset.id, data_documents, "cognify_pipeline")
pipeline_run_status = None
async for run_status in pipeline_run:
pipeline_run_status = run_status
return pipeline_run_status
def generate_dataset_name(dataset_name: str) -> str:
return dataset_name.replace(".", "_").replace(" ", "_")
return await cognee_pipeline(tasks=tasks, datasets=datasets, user=user)
async def get_default_tasks( # TODO: Find out a better way to do this (Boris's comment)
@@ -112,13 +44,6 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
chunk_size: int = None,
ontology_file_path: Optional[str] = None,
) -> list[Task]:
if user is None:
user = await get_default_user()
cognee_config = get_cognify_config()
ontology_adapter = OntologyResolver(ontology_file=ontology_file_path)
default_tasks = [
Task(classify_documents),
Task(check_permissions_on_documents, user=user, permissions=["write"]),
@@ -130,12 +55,11 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
Task(
extract_graph_from_data,
graph_model=graph_model,
ontology_adapter=ontology_adapter,
ontology_adapter=OntologyResolver(ontology_file=ontology_file_path),
task_config={"batch_size": 10},
), # Generate knowledge graphs from the document chunks.
Task(
summarize_text,
summarization_model=cognee_config.summarization_model,
task_config={"batch_size": 10},
),
Task(add_data_points, task_config={"batch_size": 10}),
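With the per-dataset iteration, status checks, and run loop moved into `cognee_pipeline`, `cognify` reduces to building the default task list and delegating. A hedged usage sketch, assuming the slimmed-down signature keeps `datasets`, `graph_model`, `chunker`, `chunk_size`, and `ontology_file_path`; custom task lists now go through `cognee_pipeline` directly, as the `CorpusBuilderExecutor` hunk below shows:

```python
import asyncio
import cognee

async def main():
    # Illustrative only; assumes "main_dataset" was already ingested via cognee.add().
    await cognee.cognify(datasets=["main_dataset"], chunk_size=1024)

asyncio.run(main())
```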


@@ -5,6 +5,7 @@ from typing import Optional, Tuple, List, Dict, Union, Any, Callable, Awaitable
from cognee.eval_framework.benchmark_adapters.benchmark_adapters import BenchmarkAdapter
from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.pipelines import cognee_pipeline
logger = get_logger(level=ERROR)
@@ -60,4 +61,4 @@ class CorpusBuilderExecutor:
await cognee.add(self.raw_corpus)
tasks = await self.task_getter(chunk_size=chunk_size, chunker=chunker)
await cognee.cognify(tasks=tasks)
await cognee_pipeline(tasks=tasks)


@@ -1,3 +1,4 @@
from .tasks.task import Task
from .operations.run_tasks import run_tasks
from .operations.run_parallel import run_tasks_parallel
from .operations.pipeline import cognee_pipeline


@@ -1,3 +1,4 @@
from .log_pipeline_run_start import log_pipeline_run_start
from .log_pipeline_run_complete import log_pipeline_run_complete
from .log_pipeline_run_error import log_pipeline_run_error
from .pipeline import cognee_pipeline


@@ -0,0 +1,134 @@
import asyncio
from cognee.shared.logging_utils import get_logger
from typing import Union
from uuid import uuid5, NAMESPACE_OID
from cognee.modules.data.methods import get_datasets, get_datasets_by_name
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.models import Data, Dataset
from cognee.modules.pipelines.operations.run_tasks import run_tasks
from cognee.modules.pipelines.models import PipelineRunStatus
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.infrastructure.databases.relational import (
create_db_and_tables as create_relational_db_and_tables,
)
from cognee.infrastructure.databases.vector.pgvector import (
create_db_and_tables as create_pgvector_db_and_tables,
)
logger = get_logger("cognee.pipeline")
update_status_lock = asyncio.Lock()
async def cognee_pipeline(
tasks: list[Task],
data=None,
datasets: Union[str, list[str]] = None,
user: User = None,
pipeline_name: str = "custom_pipeline",
):
# Create tables for databases
await create_relational_db_and_tables()
await create_pgvector_db_and_tables()
# Initialize first_run attribute if it doesn't exist
if not hasattr(cognee_pipeline, "first_run"):
cognee_pipeline.first_run = True
if cognee_pipeline.first_run:
from cognee.infrastructure.llm.utils import test_llm_connection, test_embedding_connection
# Test LLM and Embedding configuration once before running Cognee
await test_llm_connection()
await test_embedding_connection()
cognee_pipeline.first_run = False # Update flag after first run
# If no user is provided use default user
if user is None:
user = await get_default_user()
# Convert datasets to list in case it's a string
if isinstance(datasets, str):
datasets = [datasets]
# If no datasets are provided, work with all existing datasets.
existing_datasets = await get_datasets(user.id)
if datasets is None or len(datasets) == 0:
datasets = existing_datasets
if isinstance(datasets[0], str):
datasets = await get_datasets_by_name(datasets, user.id)
else:
# Try to get dataset objects from the database; if they don't exist, keep using the dataset names
datasets_names = await get_datasets_by_name(datasets, user.id)
if datasets_names:
datasets = datasets_names
awaitables = []
for dataset in datasets:
awaitables.append(
run_pipeline(
dataset=dataset, user=user, tasks=tasks, data=data, pipeline_name=pipeline_name
)
)
return await asyncio.gather(*awaitables)
async def run_pipeline(
dataset: Dataset,
user: User,
tasks: list[Task],
data=None,
pipeline_name: str = "custom_pipeline",
):
if isinstance(dataset, Dataset):
check_dataset_name(dataset.name)
dataset_id = dataset.id
elif isinstance(dataset, str):
check_dataset_name(dataset)
# Derive a deterministic dataset_id from the dataset name and user id
dataset_id = uuid5(NAMESPACE_OID, f"{dataset}{str(user.id)}")
if not data:
data: list[Data] = await get_dataset_data(dataset_id=dataset_id)
# async with update_status_lock: TODO: Add UI lock to prevent multiple backend requests
if isinstance(dataset, Dataset):
task_status = await get_pipeline_status([dataset_id])
else:
task_status = [
PipelineRunStatus.DATASET_PROCESSING_COMPLETED
] # TODO: this is a random assignment, find permanent solution
if (
str(dataset_id) in task_status
and task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED
):
logger.info("Dataset %s is already being processed.", dataset_id)
return
if not isinstance(tasks, list):
raise ValueError("Tasks must be a list")
for task in tasks:
if not isinstance(task, Task):
raise ValueError(f"Task {task} is not an instance of Task")
pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name)
pipeline_run_status = None
async for run_status in pipeline_run:
pipeline_run_status = run_status
return pipeline_run_status
def check_dataset_name(dataset_name: str) -> None:
if "." in dataset_name or " " in dataset_name:
raise ValueError("Dataset name cannot contain spaces or dots")


@@ -20,13 +20,11 @@ from ..tasks.task import Task
logger = get_logger("run_tasks(tasks: [Task], data)")
async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str):
async def run_tasks_with_telemetry(tasks: list[Task], data, user: User, pipeline_name: str):
config = get_current_settings()
logger.debug("\nRunning pipeline with configuration:\n%s\n", json.dumps(config, indent=1))
user = await get_default_user()
try:
logger.info("Pipeline run started: `%s`", pipeline_name)
send_telemetry(
@@ -72,6 +70,7 @@ async def run_tasks(
tasks: list[Task],
dataset_id: UUID = uuid4(),
data: Any = None,
user: User = None,
pipeline_name: str = "unknown_pipeline",
):
pipeline_id = uuid5(NAMESPACE_OID, pipeline_name)
@@ -82,7 +81,9 @@ async def run_tasks(
pipeline_run_id = pipeline_run.pipeline_run_id
try:
async for _ in run_tasks_with_telemetry(tasks, data, pipeline_id):
async for _ in run_tasks_with_telemetry(
tasks=tasks, data=data, user=user, pipeline_name=pipeline_id
):
pass
yield await log_pipeline_run_complete(


@@ -1,4 +1,5 @@
from cognee.shared.logging_utils import get_logger
from cognee.modules.users.methods import get_default_user
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.orm import joinedload
@@ -13,6 +14,9 @@ logger = get_logger()
async def check_permission_on_documents(user: User, permission_type: str, document_ids: list[UUID]):
if user is None:
user = await get_default_user()
# TODO: Enable user role permissions again. Temporarily disabled during rework.
# user_roles_ids = [role.id for role in user.roles]
user_roles_ids = []


@@ -5,6 +5,7 @@ import s3fs
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.methods import create_dataset, get_dataset_data, get_datasets_by_name
from cognee.modules.users.methods import get_default_user
from cognee.modules.data.models.DatasetData import DatasetData
from cognee.modules.users.models import User
from cognee.modules.users.permissions.methods import give_permission_on_document
@@ -20,6 +21,9 @@ from cognee.api.v1.add.config import get_s3_config
async def ingest_data(data: Any, dataset_name: str, user: User):
destination = get_dlt_destination()
if not user:
user = await get_default_user()
pipeline = dlt.pipeline(
pipeline_name="metadata_extraction_pipeline",
destination=destination,
@@ -169,7 +173,10 @@ async def ingest_data(data: Any, dataset_name: str, user: User):
)
datasets = await get_datasets_by_name(dataset_name, user.id)
dataset = datasets[0]
data_documents = await get_dataset_data(dataset_id=dataset.id)
return data_documents
# If no files were processed, no dataset was created
if datasets:
dataset = datasets[0]
data_documents = await get_dataset_data(dataset_id=dataset.id)
return data_documents
return []


@@ -4,13 +4,20 @@ from uuid import uuid5
from pydantic import BaseModel
from cognee.modules.data.extraction.extract_summary import extract_summary
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.modules.cognify.config import get_cognify_config
from .models import TextSummary
async def summarize_text(data_chunks: list[DocumentChunk], summarization_model: Type[BaseModel]):
async def summarize_text(
data_chunks: list[DocumentChunk], summarization_model: Type[BaseModel] = None
):
if len(data_chunks) == 0:
return data_chunks
if summarization_model is None:
cognee_config = get_cognify_config()
summarization_model = cognee_config.summarization_model
chunk_summaries = await asyncio.gather(
*[extract_summary(chunk.text, summarization_model) for chunk in data_chunks]
)
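Since `summarization_model` now falls back to the cognify config when omitted, call sites can register the task without threading the config through themselves. A small sketch (the `batch_size` value mirrors the default task list above):

```python
from cognee.modules.pipelines import Task
from cognee.tasks.summarization import summarize_text

# The model argument can be omitted; summarize_text resolves it from
# get_cognify_config() when it is None.
summarize_task = Task(summarize_text, task_config={"batch_size": 10})
```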