diff --git a/cognee/modules/pipelines/layers/environment_setup_and_checks.py b/cognee/modules/pipelines/layers/environment_setup_and_checks.py
new file mode 100644
index 000000000..bca8bc4c9
--- /dev/null
+++ b/cognee/modules/pipelines/layers/environment_setup_and_checks.py
@@ -0,0 +1,66 @@
+import asyncio
+from typing import Optional
+
+from cognee.context_global_variables import (
+    graph_db_config as context_graph_db_config,
+    vector_db_config as context_vector_db_config,
+)
+
+from cognee.infrastructure.databases.relational import (
+    create_db_and_tables as create_relational_db_and_tables,
+)
+from cognee.infrastructure.databases.vector.pgvector import (
+    create_db_and_tables as create_pgvector_db_and_tables,
+)
+
+# Module-level flag: set to True once the LLM/embedding connection tests have passed.
+_first_run_done = False
+# Serializes concurrent first callers so the connection tests are not run in parallel.
+_first_run_lock = asyncio.Lock()
+
+
+async def environment_setup_and_checks(
+    vector_db_config: Optional[dict] = None,
+    graph_db_config: Optional[dict] = None,
+):
+    """Set database context, create tables, and test LLM/embedding connections once.
+
+    Args:
+        vector_db_config: Vector database configuration. When truthy, it is set
+            on the ``vector_db_config`` context variable.
+        graph_db_config: Graph database configuration. When truthy, it is set
+            on the ``graph_db_config`` context variable.
+    """
+    # Note: These context variables allow different value assignment for databases in Cognee
+    # per async task, thread, process and etc.
+    if vector_db_config:
+        context_vector_db_config.set(vector_db_config)
+    if graph_db_config:
+        context_graph_db_config.set(graph_db_config)
+
+    # Create tables for databases
+    await create_relational_db_and_tables()
+    await create_pgvector_db_and_tables()
+
+    global _first_run_done
+    # Fast path: once the checks have passed there is no need to take the lock.
+    if _first_run_done:
+        return
+
+    async with _first_run_lock:
+        # Re-check under the lock: another task may have completed the checks
+        # while this one was waiting to acquire it.
+        if _first_run_done:
+            return
+
+        from cognee.infrastructure.llm.utils import (
+            test_llm_connection,
+            test_embedding_connection,
+        )
+
+        # Test LLM and Embedding configuration once before running Cognee
+        await test_llm_connection()
+        await test_embedding_connection()
+        # The flag is only set after both tests return; if either raises, it
+        # stays False and the checks run again on the next call.
+        _first_run_done = True
diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py
index 7a520f4a4..dcf1fc531 100644
--- a/cognee/modules/pipelines/operations/pipeline.py
+++ b/cognee/modules/pipelines/operations/pipeline.py
@@ -2,6 +2,9 @@ import asyncio
 from uuid import UUID
 from typing import Union
 
+from cognee.modules.pipelines.layers.environment_setup_and_checks import (
+    environment_setup_and_checks,
+)
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.data.methods.get_dataset_data import get_dataset_data
 from cognee.modules.data.models import Data, Dataset
@@ -15,17 +18,6 @@ from cognee.context_global_variables import set_database_global_context_variable
 from cognee.modules.pipelines.layers.authorized_user_datasets import authorized_user_datasets
 from cognee.modules.pipelines.layers.process_pipeline_check import process_pipeline_check
 
-from cognee.infrastructure.databases.relational import (
-    create_db_and_tables as create_relational_db_and_tables,
-)
-from cognee.infrastructure.databases.vector.pgvector import (
-    create_db_and_tables as create_pgvector_db_and_tables,
-)
-from cognee.context_global_variables import (
-    graph_db_config as context_graph_db_config,
-    vector_db_config as context_vector_db_config,
-)
-
 logger = get_logger("cognee.pipeline")
 
 update_status_lock = asyncio.Lock()
@@ -41,31 +33,7 @@ async def cognee_pipeline(
     graph_db_config: dict = None,
     incremental_loading: bool = False,
 ):
-    # Note: These context variables allow different value assignment for databases in Cognee
-    # per async task, thread, process and etc.
-    if vector_db_config:
-        context_vector_db_config.set(vector_db_config)
-    if graph_db_config:
-        context_graph_db_config.set(graph_db_config)
-
-    # Create tables for databases
-    await create_relational_db_and_tables()
-    await create_pgvector_db_and_tables()
-
-    # Initialize first_run attribute if it doesn't exist
-    if not hasattr(cognee_pipeline, "first_run"):
-        cognee_pipeline.first_run = True
-
-    if cognee_pipeline.first_run:
-        from cognee.infrastructure.llm.utils import (
-            test_llm_connection,
-            test_embedding_connection,
-        )
-
-        # Test LLM and Embedding configuration once before running Cognee
-        await test_llm_connection()
-        await test_embedding_connection()
-        cognee_pipeline.first_run = False  # Update flag after first run
+    await environment_setup_and_checks(vector_db_config, graph_db_config)
 
     user, authorized_datasets = await authorized_user_datasets(datasets, user)
 