cognee/cognee/modules/pipelines/layers/setup_and_check_environment.py
Boris 6e5acec292
refactor: make run_pipeline a high-level api for running pipelines (#1294)
<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-08-27 09:49:20 +02:00

41 lines
1.2 KiB
Python

import asyncio
from cognee.context_global_variables import (
graph_db_config as context_graph_db_config,
vector_db_config as context_vector_db_config,
)
from cognee.infrastructure.databases.relational import (
create_db_and_tables as create_relational_db_and_tables,
)
from cognee.infrastructure.databases.vector.pgvector import (
create_db_and_tables as create_pgvector_db_and_tables,
)
# Module-level first-run state used by setup_and_check_environment().
# The flag flips to True once the LLM and embedding connection checks have
# both completed without raising.
_first_run_done = False
# Held while the flag is inspected and updated, so concurrent callers take
# turns going through the first-run check.
_first_run_lock = asyncio.Lock()
async def setup_and_check_environment(
    vector_db_config: dict = None,
    graph_db_config: dict = None,
):
    """Apply database config overrides, create tables, and run first-run checks.

    Args:
        vector_db_config: Optional vector database configuration. When truthy,
            it is stored in the vector DB context variable.
        graph_db_config: Optional graph database configuration. When truthy,
            it is stored in the graph DB context variable.

    On every call the relational and pgvector table-creation routines are
    awaited. The LLM and embedding connection tests are awaited only until
    they have succeeded once; if either raises, the first-run flag stays
    unset and the tests are attempted again on the next call.
    """
    global _first_run_done

    # Push each supplied (truthy) override into its matching context variable.
    overrides = (
        (context_vector_db_config, vector_db_config),
        (context_graph_db_config, graph_db_config),
    )
    for context_variable, override in overrides:
        if override:
            context_variable.set(override)

    # Create tables for databases
    await create_relational_db_and_tables()
    await create_pgvector_db_and_tables()

    async with _first_run_lock:
        # Connection checks already passed earlier: nothing left to do.
        if _first_run_done:
            return

        # Deferred import: only loaded when the first-run checks actually run.
        from cognee.infrastructure.llm.utils import (
            test_embedding_connection,
            test_llm_connection,
        )

        await test_llm_connection()
        await test_embedding_connection()

        # Reached only when both checks completed without raising.
        _first_run_done = True