Update test_search_db.py

This commit is contained in:
hajdul88 2025-12-12 08:09:30 +01:00
parent 987d4dabd4
commit a06c55f907

View file

@ -25,13 +25,25 @@ logger = get_logger()
@pytest_asyncio.fixture(scope="function", autouse=True) @pytest_asyncio.fixture(scope="function", autouse=True)
async def cleanup_litellm_clients(): async def cleanup_resources():
"""Fixture to properly cleanup LiteLLM async clients after each test. """Fixture to properly cleanup resources after each test.
This prevents RuntimeWarning in Python 3.10 about unawaited coroutine - LiteLLM async clients: prevents RuntimeWarning in Python 3.10
'close_litellm_async_clients' during event loop cleanup. - PGVector SQLAlchemy connections: ensures connection pools are properly closed
""" """
yield yield
# Cleanup PGVector SQLAlchemy engine connections
try:
from cognee.infrastructure.databases.vector import get_vector_engine
vector_engine = get_vector_engine()
# Dispose SQLAlchemy engine connection pool if it exists (for PGVector)
if hasattr(vector_engine, "engine") and hasattr(vector_engine.engine, "dispose"):
await vector_engine.engine.dispose(close=True)
except (RuntimeError, Exception):
# Event loop might already be closing or engine might not exist, ignore
pass
# Cleanup LiteLLM async clients before event loop closes # Cleanup LiteLLM async clients before event loop closes
try: try:
import litellm import litellm