diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py
index 21d750875..aaf2939ba 100644
--- a/cognee/api/v1/cognify/cognify.py
+++ b/cognee/api/v1/cognify/cognify.py
@@ -22,6 +22,7 @@ from cognee.tasks.graph import extract_graph_from_data
 from cognee.tasks.graph import extract_graph_from_data
 from cognee.tasks.storage import add_data_points
 from cognee.tasks.summarization import summarize_text
 from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
+from cognee.tasks.temporal_graph import extract_events_and_entities

 logger = get_logger("cognify")
@@ -39,6 +40,7 @@ async def cognify(
     graph_db_config: dict = None,
     run_in_background: bool = False,
     incremental_loading: bool = True,
+    temporal_cognify: bool = False,
 ):
     """
     Transform ingested data into a structured knowledge graph.
@@ -177,7 +179,10 @@ async def cognify(
         - LLM_RATE_LIMIT_ENABLED: Enable rate limiting (default: False)
         - LLM_RATE_LIMIT_REQUESTS: Max requests per interval (default: 60)
     """
-    tasks = await get_default_tasks(user, graph_model, chunker, chunk_size, ontology_file_path)
+    if temporal_cognify:
+        tasks = await get_temporal_tasks(user=user, chunker=chunker, chunk_size=chunk_size)
+    else:
+        tasks = await get_default_tasks(user, graph_model, chunker, chunk_size, ontology_file_path)

     # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for
     pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background)
@@ -224,3 +229,37 @@ async def get_default_tasks(  # TODO: Find out a better way to do this (Boris's
     ]

     return default_tasks
+
+
+async def get_temporal_tasks(
+    user: User = None, chunker=TextChunker, chunk_size: int = None
+) -> list[Task]:
+    """
+    Build the task list used by `cognify` when `temporal_cognify` is enabled.
+
+    The returned pipeline runs, in order: classify_documents,
+    check_permissions_on_dataset (write), extract_chunks_from_documents,
+    extract_events_and_entities and add_data_points.
+
+    Args:
+        user: User forwarded to the dataset permission check.
+        chunker: Chunker handed to `extract_chunks_from_documents`.
+        chunk_size: Maximum chunk size; `get_max_chunk_tokens()` is used when it is falsy.
+
+    Returns:
+        list[Task]: The temporal pipeline tasks in execution order.
+    """
+    temporal_tasks = [
+        Task(classify_documents),
+        Task(check_permissions_on_dataset, user=user, permissions=["write"]),
+        Task(
+            extract_chunks_from_documents,
+            max_chunk_size=chunk_size or get_max_chunk_tokens(),
+            chunker=chunker,
+        ),
+        # NOTE(review): sibling task uses "batch_size"; confirm "chunk_size" is honored here.
+        Task(extract_events_and_entities, task_config={"chunk_size": 10}),
+        Task(add_data_points, task_config={"batch_size": 10}),
+    ]
+
+    return temporal_tasks