chores: centralizes fixture logic into see_default_dataset

This commit is contained in:
hajdul88 2025-12-16 10:10:15 +01:00
parent de525a6324
commit 03d59acce8

View file

@ -30,6 +30,62 @@ from cognee.modules.users.methods import get_default_user
logger = get_logger() logger = get_logger()
async def _reset_engines_and_prune() -> None:
"""Reset db engine caches and prune data/system.
Kept intentionally identical to the inlined setup logic to avoid event loop issues when
using deployed databases (Neo4j, PostgreSQL) and to ensure fresh instances per run.
"""
# Dispose of existing engines and clear caches to ensure fresh instances for each test
try:
from cognee.infrastructure.databases.vector import get_vector_engine
vector_engine = get_vector_engine()
# Dispose SQLAlchemy engine connection pool if it exists
if hasattr(vector_engine, "engine") and hasattr(vector_engine.engine, "dispose"):
await vector_engine.engine.dispose(close=True)
except Exception:
# Engine might not exist yet
pass
from cognee.infrastructure.databases.graph.get_graph_engine import create_graph_engine
from cognee.infrastructure.databases.vector.create_vector_engine import create_vector_engine
from cognee.infrastructure.databases.relational.create_relational_engine import (
create_relational_engine,
)
create_graph_engine.cache_clear()
create_vector_engine.cache_clear()
create_relational_engine.cache_clear()
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
async def _seed_default_dataset(dataset_name: str) -> dict:
"""Add the shared test dataset contents and run cognify (same steps/order as before)."""
text_1 = """Germany is located in europe right next to the Netherlands"""
logger.info(f"Adding text data to dataset: {dataset_name}")
await cognee.add(text_1, dataset_name)
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
logger.info(f"Adding file data to dataset: {dataset_name}")
await cognee.add([explanation_file_path_quantum], dataset_name)
logger.info(f"Running cognify on dataset: {dataset_name}")
await cognee.cognify([dataset_name])
return {
"dataset_name": dataset_name,
"text_1": text_1,
"explanation_file_path_quantum": explanation_file_path_quantum,
}
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def event_loop(): def event_loop():
"""Use a single asyncio event loop for this test module. """Use a single asyncio event loop for this test module.
@ -48,46 +104,10 @@ async def setup_test_environment():
"""Helper function to set up test environment with data, cognify, and triplet embeddings.""" """Helper function to set up test environment with data, cognify, and triplet embeddings."""
# This test runs for multiple db settings, to run this locally set the corresponding db envs # This test runs for multiple db settings, to run this locally set the corresponding db envs
# Dispose of existing engines and clear caches to ensure fresh instances for each test
try:
from cognee.infrastructure.databases.vector import get_vector_engine
vector_engine = get_vector_engine()
# Dispose SQLAlchemy engine connection pool if it exists
if hasattr(vector_engine, "engine") and hasattr(vector_engine.engine, "dispose"):
await vector_engine.engine.dispose(close=True)
except Exception:
pass
from cognee.infrastructure.databases.graph.get_graph_engine import create_graph_engine
from cognee.infrastructure.databases.vector.create_vector_engine import create_vector_engine
from cognee.infrastructure.databases.relational.create_relational_engine import (
create_relational_engine,
)
create_graph_engine.cache_clear()
create_vector_engine.cache_clear()
create_relational_engine.cache_clear()
logger.info("Starting test setup: pruning data and system")
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
dataset_name = "test_dataset" dataset_name = "test_dataset"
logger.info("Starting test setup: pruning data and system")
text_1 = """Germany is located in europe right next to the Netherlands""" await _reset_engines_and_prune()
logger.info(f"Adding text data to dataset: {dataset_name}") state = await _seed_default_dataset(dataset_name=dataset_name)
await cognee.add(text_1, dataset_name)
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
logger.info(f"Adding file data to dataset: {dataset_name}")
await cognee.add([explanation_file_path_quantum], dataset_name)
logger.info(f"Running cognify on dataset: {dataset_name}")
await cognee.cognify([dataset_name])
user = await get_default_user() user = await get_default_user()
from cognee.memify_pipelines.create_triplet_embeddings import create_triplet_embeddings from cognee.memify_pipelines.create_triplet_embeddings import create_triplet_embeddings
@ -105,58 +125,14 @@ async def setup_test_environment():
count = await collection.count_rows() if hasattr(collection, "count_rows") else "unknown" count = await collection.count_rows() if hasattr(collection, "count_rows") else "unknown"
logger.info(f"Triplet_text collection row count: {count}") logger.info(f"Triplet_text collection row count: {count}")
return { return state
"dataset_name": dataset_name,
"text_1": text_1,
"explanation_file_path_quantum": explanation_file_path_quantum,
}
async def setup_test_environment_for_feedback(): async def setup_test_environment_for_feedback():
"""Helper function to set up test environment for feedback weight calculation test.""" """Helper function to set up test environment for feedback weight calculation test."""
# Dispose of existing engines and clear caches to ensure fresh instances for each test
# This prevents event loop issues when using deployed databases (Neo4j, PostgreSQL)
try:
from cognee.infrastructure.databases.vector import get_vector_engine
vector_engine = get_vector_engine()
# Dispose SQLAlchemy engine connection pool if it exists
if hasattr(vector_engine, "engine") and hasattr(vector_engine.engine, "dispose"):
await vector_engine.engine.dispose(close=True)
except Exception:
pass # Engine might not exist yet
from cognee.infrastructure.databases.graph.get_graph_engine import create_graph_engine
from cognee.infrastructure.databases.vector.create_vector_engine import create_vector_engine
from cognee.infrastructure.databases.relational.create_relational_engine import (
create_relational_engine,
)
create_graph_engine.cache_clear()
create_vector_engine.cache_clear()
create_relational_engine.cache_clear()
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
dataset_name = "test_dataset" dataset_name = "test_dataset"
await _reset_engines_and_prune()
text_1 = """Germany is located in europe right next to the Netherlands""" return await _seed_default_dataset(dataset_name=dataset_name)
await cognee.add(text_1, dataset_name)
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
await cognee.add([explanation_file_path_quantum], dataset_name)
await cognee.cognify([dataset_name])
return {
"dataset_name": dataset_name,
"text_1": text_1,
"explanation_file_path_quantum": explanation_file_path_quantum,
}
@pytest_asyncio.fixture(scope="session") @pytest_asyncio.fixture(scope="session")