Why this change is needed: Offline tests were failing with "ModuleNotFoundError: No module named 'qdrant_client'" because test_e2e_multi_instance.py was being imported during test collection, even though it's an E2E test that shouldn't run in offline mode. Pytest imports all test files during collection phase regardless of marks, causing import errors for missing E2E dependencies (qdrant_client, asyncpg, etc.). Additionally, the test mocks for PostgreSQL migration were too permissive - they accepted any parameter format without validation, which allowed bugs (like passing dict instead of positional args to AsyncPG execute()) to slip through undetected. How it solves it: 1. E2E Import Fix: - Use pytest.importorskip() to conditionally import qdrant_client - E2E tests are now skipped cleanly when dependencies are missing - Offline tests can collect and run without E2E dependencies 2. Stricter Test Mocks: - Enhanced mock_pg_db fixture to validate AsyncPG parameter format - Mock execute() now raises TypeError if dict/list passed as single argument - Ensures tests catch parameter passing bugs that would fail in production 3. Parameter Validation Test: - Added test_postgres_migration_params.py for explicit parameter validation - Verifies migration passes positional args correctly to AsyncPG - Provides detailed output for debugging parameter issues Impact: - Offline tests no longer fail due to missing E2E dependencies - Future bugs in AsyncPG parameter passing will be caught by tests - Better test isolation between offline and E2E test suites - Improved test coverage for migration parameter handling Testing: - Verified with `pytest tests/ -m offline -v` - no import errors - All PostgreSQL migration tests pass (6/6 unit + 1 strict validation) - Pre-commit hooks pass (ruff-format, ruff)
393 lines
12 KiB
Python
import pytest
|
|
from unittest.mock import patch, AsyncMock
|
|
import numpy as np
|
|
from lightrag.utils import EmbeddingFunc
|
|
from lightrag.kg.postgres_impl import (
|
|
PGVectorStorage,
|
|
)
|
|
from lightrag.namespace import NameSpace
|
|
|
|
|
|
# Mock PostgreSQLDB
@pytest.fixture
def mock_pg_db():
    """Mock PostgreSQL database connection with strict AsyncPG-style execute()."""
    fake_db = AsyncMock()
    fake_db.workspace = "test_workspace"

    # Query stub: empty result set for multirow queries, a neutral row otherwise.
    async def fake_query(sql, params=None, multirows=False, **kwargs):
        return [] if multirows else {"exists": False, "count": 0}

    async def fake_execute(sql, *args, **kwargs):
        """
        Strict mock that mimics AsyncPG behavior:
        - Positional parameters must be passed as separate arguments (*args)
        - Raises TypeError if a dict/list is passed as a single argument
        """
        if len(args) == 1 and isinstance(args[0], (dict, list)):
            raise TypeError(
                "AsyncPG execute() expects positional parameters as separate arguments, "
                f"not as {type(args[0]).__name__}. Use: execute(query, val1, val2, ...) "
                "or execute(query, *values)"
            )
        return None

    # Wrap in AsyncMock so call_count / call_args tracking still works.
    fake_db.query = AsyncMock(side_effect=fake_query)
    fake_db.execute = AsyncMock(side_effect=fake_execute)

    return fake_db
|
|
|
|
|
|
# Mock get_data_init_lock to avoid async lock issues in tests
@pytest.fixture(autouse=True)
def mock_data_init_lock():
    """Replace get_data_init_lock with an AsyncMock so tests never block on it."""
    with patch("lightrag.kg.postgres_impl.get_data_init_lock") as patched_lock:
        # AsyncMock natively supports the async context-manager protocol.
        patched_lock.return_value = AsyncMock()
        yield patched_lock
|
|
|
|
|
|
# Mock ClientManager
@pytest.fixture
def mock_client_manager(mock_pg_db):
    """Patch ClientManager so storage instances receive the mocked database."""
    with patch("lightrag.kg.postgres_impl.ClientManager") as patched_manager:
        patched_manager.get_client = AsyncMock(return_value=mock_pg_db)
        patched_manager.release_client = AsyncMock()
        yield patched_manager
|
|
|
|
|
|
# Mock Embedding function
@pytest.fixture
def mock_embedding_func():
    """Provide a 768-dim EmbeddingFunc backed by a constant-vector embedder."""

    async def embed_func(texts, **kwargs):
        # One identical 768-dim vector per input text.
        return np.array([[0.1] * 768 for _ in texts])

    return EmbeddingFunc(embedding_dim=768, func=embed_func, model_name="test_model")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_postgres_table_naming(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """Test if table name is correctly generated with model suffix"""
    global_config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=global_config,
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # The new-style table name must embed the model name and dimension.
    suffix = "test_model_768d"
    assert suffix in storage.table_name
    assert storage.table_name == f"LIGHTRAG_VDB_CHUNKS_{suffix}"

    # The legacy (pre-suffix) table name is kept for migration lookups.
    assert storage.legacy_table_name == "LIGHTRAG_VDB_CHUNKS"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_postgres_migration_trigger(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """Test if migration logic is triggered correctly.

    Setup: the legacy (un-suffixed) table exists with 100 rows, the new
    suffixed table does not. Initializing the storage should create the
    new table and copy every legacy row via db.execute().
    """
    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # Setup mocks for migration scenario
    # 1. New table does not exist, legacy table exists
    async def mock_table_exists(db, table_name):
        return table_name == storage.legacy_table_name

    # 2. Legacy table has 100 records
    mock_rows = [
        {"id": f"test_id_{i}", "content": f"content_{i}", "workspace": "test_ws"}
        for i in range(100)
    ]

    async def mock_query(sql, params=None, multirows=False, **kwargs):
        if "COUNT(*)" in sql:
            return {"count": 100}
        elif multirows and "SELECT *" in sql:
            # Mock batch fetch for migration.
            # BUGFIX: guard len(params) against params=None — the original
            # `params[1] if len(params) > 1 else 500` raised TypeError when
            # params was None (the offset line handled None, limit did not).
            offset = params[0] if params else 0
            limit = params[1] if params and len(params) > 1 else 500
            end = min(offset + limit, len(mock_rows))
            return mock_rows[offset:end]
        return {}

    mock_pg_db.query = AsyncMock(side_effect=mock_query)

    with (
        patch(
            "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
        ),
        patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()),
    ):
        # Initialize storage (should trigger migration)
        await storage.initialize()

        # Verify migration was executed
        # Check that execute was called for inserting rows
        assert mock_pg_db.execute.call_count > 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_postgres_no_migration_needed(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """Test scenario where new table already exists (no migration needed)"""
    global_config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=global_config,
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # Simulate an environment where the suffixed table is already present.
    async def fake_table_exists(db, table_name):
        return table_name == storage.table_name

    with (
        patch(
            "lightrag.kg.postgres_impl._pg_table_exists", side_effect=fake_table_exists
        ),
        patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create,
    ):
        await storage.initialize()

        # Since the table exists, no CREATE TABLE should have been issued.
        mock_create.assert_not_called()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scenario_1_new_workspace_creation(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Scenario 1: New workspace creation

    Expected behavior:
    - No legacy table exists
    - Directly create new table with model suffix
    - No migration needed
    """
    global_config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    # Fresh workspace using a large OpenAI-style model (3072 dims).
    large_model_func = EmbeddingFunc(
        embedding_dim=3072,
        func=mock_embedding_func.func,
        model_name="text-embedding-3-large",
    )

    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=global_config,
        embedding_func=large_model_func,
        workspace="new_workspace",
    )

    # Neither the suffixed table nor the legacy table exists yet.
    async def fake_table_exists(db, table_name):
        return False

    with (
        patch(
            "lightrag.kg.postgres_impl._pg_table_exists", side_effect=fake_table_exists
        ),
        patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create,
    ):
        await storage.initialize()

        # Table name embeds the sanitized model name and dimension.
        assert "text_embedding_3_large_3072d" in storage.table_name

        # Exactly one CREATE for the new suffixed table, no migration.
        mock_create.assert_called_once()
        created_table = mock_create.call_args[0][1]  # table_name is second positional arg
        assert created_table == storage.table_name
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scenario_2_legacy_upgrade_migration(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Scenario 2: Upgrade from legacy version

    Expected behavior:
    - Legacy table exists (without model suffix)
    - New table doesn't exist
    - Automatically migrate data to new table with suffix
    """
    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    embedding_func = EmbeddingFunc(
        embedding_dim=1536,
        func=mock_embedding_func.func,
        model_name="text-embedding-ada-002",
    )

    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=embedding_func,
        workspace="legacy_workspace",
    )

    # Mock: only legacy table exists
    async def mock_table_exists(db, table_name):
        return table_name == storage.legacy_table_name

    # Mock: legacy table has 50 records
    mock_rows = [
        {
            "id": f"legacy_id_{i}",
            "content": f"legacy_content_{i}",
            "workspace": "legacy_workspace",
        }
        for i in range(50)
    ]

    async def mock_query(sql, params=None, multirows=False, **kwargs):
        if "COUNT(*)" in sql:
            # Both the legacy-count and the post-migration verification
            # queries report the same count (the original if/else had two
            # identical branches, collapsed here).
            return {"count": 50}
        elif multirows and "SELECT *" in sql:
            # Mock batch fetch for migration.
            # BUGFIX: guard len(params) against params=None — the original
            # `params[1] if len(params) > 1 else 500` raised TypeError when
            # params was None (the offset line handled None, limit did not).
            offset = params[0] if params else 0
            limit = params[1] if params and len(params) > 1 else 500
            end = min(offset + limit, len(mock_rows))
            return mock_rows[offset:end]
        return {}

    mock_pg_db.query = AsyncMock(side_effect=mock_query)

    with (
        patch(
            "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
        ),
        patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create,
    ):
        await storage.initialize()

        # Verify table name contains ada-002
        assert "text_embedding_ada_002_1536d" in storage.table_name

        # Verify migration was executed
        assert mock_pg_db.execute.call_count >= 50  # At least one execute per row
        mock_create.assert_called_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scenario_3_multi_model_coexistence(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Scenario 3: Multiple embedding models coexist

    Expected behavior:
    - Different embedding models create separate tables
    - Tables are isolated by model suffix
    - No interference between different models
    """
    global_config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    # Workspace A: uses bge-small (768d)
    small_func = EmbeddingFunc(
        embedding_dim=768, func=mock_embedding_func.func, model_name="bge-small"
    )
    storage_a = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=global_config,
        embedding_func=small_func,
        workspace="workspace_a",
    )

    # Workspace B: uses bge-large (1024d)
    async def embed_1024(texts, **kwargs):
        return np.array([[0.1] * 1024 for _ in texts])

    large_func = EmbeddingFunc(
        embedding_dim=1024, func=embed_1024, model_name="bge-large"
    )
    storage_b = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=global_config,
        embedding_func=large_func,
        workspace="workspace_b",
    )

    # Each model/dimension pair maps to its own suffixed table.
    assert storage_a.table_name != storage_b.table_name
    assert "bge_small_768d" in storage_a.table_name
    assert "bge_large_1024d" in storage_b.table_name

    # Neither table exists yet, so both initializations must create one.
    async def fake_table_exists(db, table_name):
        return False

    with (
        patch(
            "lightrag.kg.postgres_impl._pg_table_exists", side_effect=fake_table_exists
        ),
        patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create,
    ):
        await storage_a.initialize()
        await storage_b.initialize()

        # Two distinct CREATE TABLE calls, one per storage instance.
        assert mock_create.call_count == 2

        # table_name is the second positional argument of _pg_create_table.
        created_tables = [call[0][1] for call in mock_create.call_args_list]
        assert len(set(created_tables)) == 2
        assert storage_a.table_name in created_tables
        assert storage_b.table_name in created_tables
|