Merge branch 'fix-windows-path' of github.com:topoteretes/cognee into fix-windows-path

Igor Ilic 2025-08-27 12:06:19 +02:00
commit 644116a2ce
8 changed files with 22 additions and 22 deletions


@@ -2,7 +2,7 @@ from uuid import UUID
 from typing import Union, BinaryIO, List, Optional
 from cognee.modules.users.models import User
-from cognee.modules.pipelines import Task, cognee_pipeline
+from cognee.modules.pipelines import Task, run_pipeline
 from cognee.modules.pipelines.layers.resolve_authorized_user_dataset import (
     resolve_authorized_user_dataset,
 )
@@ -154,7 +154,7 @@ async def add(
     pipeline_run_info = None
-    async for run_info in cognee_pipeline(
+    async for run_info in run_pipeline(
         tasks=tasks,
         datasets=[authorized_dataset.id],
         data=data,
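
For orientation, the call site above consumes the renamed entry point as an async generator. A minimal sketch of that consuming pattern, assuming a configured cognee environment; the wrapper name run_add_like_flow and its arguments are illustrative, not part of this commit:

from cognee.modules.pipelines import Task, run_pipeline

async def run_add_like_flow(tasks: list[Task], dataset_id, data):
    pipeline_run_info = None
    # run_pipeline is an async generator: iterating it drives the pipeline
    # and yields run-info objects; keep the last one, as add() does above.
    async for run_info in run_pipeline(
        tasks=tasks,
        datasets=[dataset_id],
        data=data,
    ):
        pipeline_run_info = run_info
    return pipeline_run_info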


@@ -7,7 +7,7 @@ from cognee.shared.logging_utils import get_logger
 from cognee.shared.data_models import KnowledgeGraph
 from cognee.infrastructure.llm import get_max_chunk_tokens
-from cognee.modules.pipelines import cognee_pipeline
+from cognee.modules.pipelines import run_pipeline
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.chunking.TextChunker import TextChunker
 from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
@@ -179,12 +179,12 @@ async def cognify(
     """
     tasks = await get_default_tasks(user, graph_model, chunker, chunk_size, ontology_file_path)
-    # By calling get pipeline executor we get a function that will have the cognee_pipeline run in the background or a function that we will need to wait for
+    # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for
     pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background)
-    # Run the cognee_pipeline in the background or blocking based on executor
+    # Run the run_pipeline in the background or blocking based on executor
     return await pipeline_executor_func(
-        pipeline=cognee_pipeline,
+        pipeline=run_pipeline,
         tasks=tasks,
         user=user,
         datasets=datasets,
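
The comments in this hunk describe the executor pattern: get_pipeline_executor hands back either a function that schedules the pipeline in the background or one that blocks until it finishes. The factory itself is not shown in this diff; the following is only a rough sketch of that idea, under a hypothetical name, not cognee's actual implementation:

import asyncio

def get_executor_sketch(run_in_background: bool):
    # Hypothetical stand-in for the factory described in the comments above.
    async def run_blocking(pipeline, **kwargs):
        # Drain the pipeline's async generator and return the last run info.
        last_info = None
        async for run_info in pipeline(**kwargs):
            last_info = run_info
        return last_info

    async def run_detached(pipeline, **kwargs):
        # Schedule the same draining coroutine without waiting for it to finish.
        return asyncio.create_task(run_blocking(pipeline, **kwargs))

    return run_detached if run_in_background else run_blocking

Under that reading, cognify simply forwards pipeline=run_pipeline plus its task list to whichever callable the factory returned.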


@@ -5,7 +5,7 @@ from typing import Optional, Tuple, List, Dict, Union, Any, Callable, Awaitable
 from cognee.eval_framework.benchmark_adapters.benchmark_adapters import BenchmarkAdapter
 from cognee.modules.chunking.TextChunker import TextChunker
 from cognee.modules.pipelines.tasks.task import Task
-from cognee.modules.pipelines import cognee_pipeline
+from cognee.modules.pipelines import run_pipeline
 logger = get_logger(level=ERROR)
@@ -61,7 +61,7 @@ class CorpusBuilderExecutor:
         await cognee.add(self.raw_corpus)
         tasks = await self.task_getter(chunk_size=chunk_size, chunker=chunker)
-        pipeline_run = cognee_pipeline(tasks=tasks)
+        pipeline_run = run_pipeline(tasks=tasks)
         async for run_info in pipeline_run:
             print(run_info)


@@ -1,4 +1,4 @@
 from .tasks.task import Task
 from .operations.run_tasks import run_tasks
 from .operations.run_parallel import run_tasks_parallel
-from .operations.pipeline import cognee_pipeline
+from .operations.pipeline import run_pipeline
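
After this __init__ change, downstream modules import the entry point under its new name; assuming the package path stays cognee.modules.pipelines, the updated import surface looks like:

# cognee_pipeline is no longer re-exported; run_pipeline replaces it.
from cognee.modules.pipelines import Task, run_tasks, run_tasks_parallel, run_pipeline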


@@ -14,7 +14,7 @@ from cognee.modules.pipelines.models.PipelineRunInfo import (
 logger = get_logger(__name__)
-async def process_pipeline_check(
+async def check_pipeline_run_qualification(
     dataset: Dataset, data: list[Data], pipeline_name: str
 ) -> Optional[Union[PipelineRunStarted, PipelineRunCompleted]]:
     """


@@ -15,7 +15,7 @@ _first_run_done = False
 _first_run_lock = asyncio.Lock()
-async def environment_setup_and_checks(
+async def setup_and_check_environment(
     vector_db_config: dict = None,
     graph_db_config: dict = None,
 ):


@@ -2,4 +2,3 @@ from .log_pipeline_run_initiated import log_pipeline_run_initiated
 from .log_pipeline_run_start import log_pipeline_run_start
 from .log_pipeline_run_complete import log_pipeline_run_complete
 from .log_pipeline_run_error import log_pipeline_run_error
-from .pipeline import cognee_pipeline


@@ -2,8 +2,8 @@ import asyncio
 from uuid import UUID
 from typing import Union
-from cognee.modules.pipelines.layers.environment_setup_and_checks import (
-    environment_setup_and_checks,
+from cognee.modules.pipelines.layers.setup_and_check_environment import (
+    setup_and_check_environment,
 )
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.data.methods.get_dataset_data import get_dataset_data
@@ -16,14 +16,16 @@ from cognee.context_global_variables import set_database_global_context_variable
 from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
     resolve_authorized_user_datasets,
 )
-from cognee.modules.pipelines.layers.process_pipeline_check import process_pipeline_check
+from cognee.modules.pipelines.layers.check_pipeline_run_qualification import (
+    check_pipeline_run_qualification,
+)
 logger = get_logger("cognee.pipeline")
 update_status_lock = asyncio.Lock()
-async def cognee_pipeline(
+async def run_pipeline(
     tasks: list[Task],
     data=None,
     datasets: Union[str, list[str], list[UUID]] = None,
@@ -33,12 +35,13 @@ async def cognee_pipeline(
     graph_db_config: dict = None,
     incremental_loading: bool = False,
 ):
-    await environment_setup_and_checks(vector_db_config, graph_db_config)
+    validate_pipeline_tasks(tasks)
+    await setup_and_check_environment(vector_db_config, graph_db_config)
     user, authorized_datasets = await resolve_authorized_user_datasets(datasets, user)
     for dataset in authorized_datasets:
-        async for run_info in run_pipeline(
+        async for run_info in run_pipeline_per_dataset(
             dataset=dataset,
             user=user,
             tasks=tasks,
@@ -50,7 +53,7 @@
             yield run_info
-async def run_pipeline(
+async def run_pipeline_per_dataset(
     dataset: Dataset,
     user: User,
     tasks: list[Task],
@@ -59,15 +62,13 @@ async def run_pipeline(
     context: dict = None,
     incremental_loading=False,
 ):
-    validate_pipeline_tasks(tasks)
     # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
     await set_database_global_context_variables(dataset.id, dataset.owner_id)
     if not data:
         data: list[Data] = await get_dataset_data(dataset_id=dataset.id)
-    process_pipeline_status = await process_pipeline_check(dataset, data, pipeline_name)
+    process_pipeline_status = await check_pipeline_run_qualification(dataset, data, pipeline_name)
     if process_pipeline_status:
         # If pipeline was already processed or is currently being processed
         # return status information to async generator and finish execution
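
Taken together, the last hunks split the entry point in two: run_pipeline validates tasks, prepares the environment, resolves authorized datasets, and delegates each dataset to run_pipeline_per_dataset, re-yielding its run info. A self-contained sketch of that control flow, with stub helpers standing in for the real layers renamed or moved in this commit:

import asyncio

# Stubs so the sketch runs on its own; in cognee these are the real layer functions.
def validate_pipeline_tasks(tasks):
    pass

async def setup_and_check_environment(vector_db_config=None, graph_db_config=None):
    pass

async def resolve_authorized_user_datasets(datasets, user):
    return user, datasets or []

async def run_pipeline_per_dataset(dataset, user, tasks):
    # Per-dataset worker: the qualification check and task execution are elided here.
    yield {"dataset": dataset, "status": "completed"}

async def run_pipeline(tasks, data=None, datasets=None, user=None):
    validate_pipeline_tasks(tasks)
    await setup_and_check_environment()
    user, authorized_datasets = await resolve_authorized_user_datasets(datasets, user)
    for dataset in authorized_datasets:
        # Each dataset gets its own sub-run; its run info is re-yielded to the caller.
        async for run_info in run_pipeline_per_dataset(dataset=dataset, user=user, tasks=tasks):
            yield run_info

async def main():
    async for run_info in run_pipeline(tasks=[], datasets=["docs"]):
        print(run_info)

asyncio.run(main())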