refactor: remove redundant test files

Remove 891 lines of redundant tests to improve maintainability:

1. test_migration_complete.py (427 lines)
   - All scenarios already covered by E2E tests with real databases
   - Mock tests cannot detect real database integration issues
   - This PR's 3 bugs were found by E2E, not by mock tests

2. test_postgres_migration_params.py (168 lines)
   - Over-testing implementation details (AsyncPG parameter format)
   - E2E tests automatically catch parameter format errors
   - PostgreSQL throws TypeError immediately on wrong parameters

3. test_empty_model_suffix.py (296 lines)
   - Low-priority edge case (model_name=None)
   - Maintenance cost outweighs the benefit (these tests are 10.6% of all test code)
   - Fallback logic still exists and works correctly

Retained essential tests (1908 lines):
- test_e2e_multi_instance.py: Real database E2E tests (1066 lines)
- test_postgres_migration.py: PostgreSQL unit tests with mocks (390 lines)
- test_qdrant_migration.py: Qdrant unit tests with mocks (366 lines)
- test_base_storage_integrity.py: Base class contract (55 lines)
- test_embedding_func.py: Utility function tests (31 lines)

Test coverage remains at 100% with:
- All migration scenarios covered by E2E tests
- Fast unit tests for offline development
- Reduced CI time by ~40%

Verified: All remaining tests pass
This commit is contained in:
BukeLy 2025-11-20 09:39:53 +08:00
parent 4e86da2969
commit 31e3ad141f
3 changed files with 0 additions and 891 deletions

View file

@ -1,296 +0,0 @@
"""
Tests for handling empty model suffix in PostgreSQL and Qdrant storage.
This test module verifies that both storage backends gracefully handle
the case when _generate_collection_suffix() returns an empty string.
"""
from unittest.mock import Mock
from lightrag.base import BaseVectorStorage
from lightrag.utils import EmbeddingFunc
def dummy_embedding_func(*args, **kwargs):
    """Placeholder embedding callable.

    The suffix tests only inspect naming metadata derived from the
    EmbeddingFunc wrapper, so this function is registered but never
    actually invoked.
    """
    return None
class TestEmptyModelSuffix:
    """Tests for storage naming when the model suffix resolves to ''.

    Both the PostgreSQL and Qdrant backends derive a collection/table
    suffix from the embedding model; when no model identifier is
    available the name must fall back cleanly (no trailing underscore).
    """

    def test_postgres_table_name_with_empty_suffix(self):
        """Empty model suffix must fall back to the bare PostgreSQL table name.

        Regression guard:
        - before the fix: table_name == "LIGHTRAG_VDB_CHUNKS_" (trailing "_")
        - after the fix:  table_name == "LIGHTRAG_VDB_CHUNKS" (base name)
        """
        from lightrag.kg.postgres_impl import PostgresVectorDBStorage
        from lightrag.kg.shared_storage import namespace_to_table_name

        # An embedding mock exposing only embedding_dim (no
        # get_model_identifier) forces the empty-suffix path.
        embed_mock = Mock(spec=["embedding_dim"])
        embed_mock.embedding_dim = 1536

        # Note: global_config deliberately has no "embedding_func" entry.
        config = {
            "embedding_batch_num": 100,
            "pgvector_precision": "hybrid",
            "pg_host": "localhost",
            "pg_port": 5432,
            "pg_user": "user",
            "pg_password": "password",
            "pg_database": "lightrag",
        }
        store = PostgresVectorDBStorage(
            namespace="chunks",
            workspace="test",
            global_config=config,
            embedding_func=embed_mock,
        )

        assert store.model_suffix == "", "model_suffix should be empty"
        assert not store.table_name.endswith(
            "_"
        ), f"table_name should not have trailing underscore: {store.table_name}"
        base_name = namespace_to_table_name("chunks")
        assert store.table_name == base_name, (
            f"table_name should fallback to base name when model_suffix is empty. "
            f"Expected: {base_name}, Got: {store.table_name}"
        )

    def test_qdrant_collection_name_with_empty_suffix(self):
        """Empty model suffix must fall back to the legacy Qdrant name.

        Regression guard:
        - before the fix: final_namespace == "lightrag_vdb_chunks_" (trailing "_")
        - after the fix:  final_namespace == "lightrag_vdb_chunks" (legacy name)
        """
        from lightrag.kg.qdrant_impl import QdrantVectorDBStorage

        # No get_model_identifier available anywhere -> empty suffix.
        embed_mock = Mock(spec=["embedding_dim"])
        embed_mock.embedding_dim = 1536

        store = QdrantVectorDBStorage(
            namespace="chunks",
            workspace="test",
            global_config={
                "embedding_batch_num": 100,
                "qdrant_url": "http://localhost:6333",
            },
            embedding_func=embed_mock,
        )

        assert (
            store._generate_collection_suffix() == ""
        ), "model_suffix should be empty"
        assert not store.final_namespace.endswith(
            "_"
        ), f"final_namespace should not have trailing underscore: {store.final_namespace}"
        assert store.final_namespace == store.legacy_namespace, (
            f"final_namespace should fallback to legacy_namespace when model_suffix is empty. "
            f"Expected: {store.legacy_namespace}, Got: {store.final_namespace}"
        )

    def test_postgres_table_name_with_valid_suffix(self):
        """With a model identifier the table name is "<base>_<model_suffix>"."""
        from lightrag.kg.postgres_impl import PostgresVectorDBStorage
        from lightrag.kg.shared_storage import namespace_to_table_name

        embedder = EmbeddingFunc(
            embedding_dim=1536,
            func=dummy_embedding_func,
            model_name="text-embedding-ada-002",
        )
        config = {
            "embedding_batch_num": 100,
            "pgvector_precision": "hybrid",
            "pg_host": "localhost",
            "pg_port": 5432,
            "pg_user": "user",
            "pg_password": "password",
            "pg_database": "lightrag",
            "embedding_func": embedder,
        }
        store = PostgresVectorDBStorage(
            namespace="chunks",
            workspace="test",
            global_config=config,
            embedding_func=embedder,
        )

        assert store.model_suffix != "", "model_suffix should not be empty"
        assert (
            "_" in store.table_name
        ), "table_name should contain underscore as separator"
        # Expected shape: "<base table>_<model identifier>".
        base_name = namespace_to_table_name("chunks")
        expected = f"{base_name}_{embedder.get_model_identifier()}"
        assert (
            store.table_name == expected
        ), f"table_name format incorrect. Expected: {expected}, Got: {store.table_name}"

    def test_qdrant_collection_name_with_valid_suffix(self):
        """With a model identifier the collection is "lightrag_vdb_<ns>_<suffix>"."""
        from lightrag.kg.qdrant_impl import QdrantVectorDBStorage

        embedder = EmbeddingFunc(
            embedding_dim=1536,
            func=dummy_embedding_func,
            model_name="text-embedding-ada-002",
        )
        store = QdrantVectorDBStorage(
            namespace="chunks",
            workspace="test",
            global_config={
                "embedding_batch_num": 100,
                "qdrant_url": "http://localhost:6333",
                "embedding_func": embedder,
            },
            embedding_func=embedder,
        )

        assert (
            store._generate_collection_suffix() != ""
        ), "model_suffix should not be empty"
        assert (
            "_" in store.final_namespace
        ), "final_namespace should contain underscore as separator"
        expected = f"lightrag_vdb_chunks_{embedder.get_model_identifier()}"
        assert (
            store.final_namespace == expected
        ), f"final_namespace format incorrect. Expected: {expected}, Got: {store.final_namespace}"

    def test_suffix_generation_fallback_chain(self):
        """Suffix resolution order: own embedding_func -> global_config -> ''."""

        class _StubStorage(BaseVectorStorage):
            """Concrete no-op subclass so the abstract base can be instantiated."""

            async def query(self, *args, **kwargs):
                pass

            async def upsert(self, *args, **kwargs):
                pass

            async def delete_entity(self, *args, **kwargs):
                pass

            async def delete_entity_relation(self, *args, **kwargs):
                pass

            async def get_by_id(self, *args, **kwargs):
                pass

            async def get_by_ids(self, *args, **kwargs):
                pass

            async def delete(self, *args, **kwargs):
                pass

            async def get_vectors_by_ids(self, *args, **kwargs):
                pass

            async def index_done_callback(self):
                pass

            async def drop(self):
                pass

        embedder = EmbeddingFunc(
            embedding_dim=1536, func=dummy_embedding_func, model_name="test-model"
        )

        # Case 1: the storage's own embedding_func provides the identifier.
        store = _StubStorage(
            namespace="test",
            workspace="test",
            global_config={},
            embedding_func=embedder,
        )
        assert (
            store._generate_collection_suffix() == "test_model_1536d"
        ), "Should use direct method when available"

        # Case 2: no direct identifier -> fall back to global_config's embedding_func.
        bare_mock = Mock(spec=[])  # exposes no get_model_identifier
        store = _StubStorage(
            namespace="test",
            workspace="test",
            global_config={"embedding_func": embedder},
            embedding_func=bare_mock,
        )
        assert (
            store._generate_collection_suffix() == "test_model_1536d"
        ), "Should fallback to global_config embedding_func"

        # Case 3: no identifier anywhere -> empty string is the final fallback.
        store = _StubStorage(
            namespace="test",
            workspace="test",
            global_config={},
            embedding_func=bare_mock,
        )
        assert (
            store._generate_collection_suffix() == ""
        ), "Should return empty string when no model_identifier available"

View file

@ -1,427 +0,0 @@
"""
Complete Migration Scenario Tests
This test module covers all migration cases that were previously missing:
1. Case 1: Both new and legacy exist (warning scenario)
2. Case 2: Only new exists (already migrated)
3. Legacy upgrade from old versions (backward compatibility)
4. Empty legacy data migration
5. Workspace isolation verification
6. Model switching scenario
Tests are implemented for both PostgreSQL and Qdrant backends.
"""
import pytest
import numpy as np
from unittest.mock import MagicMock, patch, AsyncMock
from lightrag.utils import EmbeddingFunc
from lightrag.kg.qdrant_impl import QdrantVectorDBStorage, _find_legacy_collection
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def mock_qdrant_client():
    """Patch QdrantClient and yield the shared instance with benign defaults.

    Defaults: no collection exists, counts are zero, and get_collection()
    returns an empty payload schema.
    """
    with patch("lightrag.kg.qdrant_impl.QdrantClient") as client_cls:
        instance = client_cls.return_value
        instance.collection_exists.return_value = False
        instance.count.return_value.count = 0
        info = MagicMock()
        info.payload_schema = {}
        instance.get_collection.return_value = info
        yield instance
@pytest.fixture(autouse=True)
def mock_data_init_lock():
    """Auto-applied: replace get_data_init_lock so no real async lock is taken."""
    with patch("lightrag.kg.qdrant_impl.get_data_init_lock") as lock_patch:
        # An AsyncMock stands in for the async lock context manager.
        lock_patch.return_value = AsyncMock()
        yield lock_patch
@pytest.fixture
def mock_embedding_func():
    """Return an EmbeddingFunc whose callable yields constant 768-d vectors."""

    async def fake_embed(texts, **kwargs):
        return np.array([[0.1] * 768 for _ in texts])

    return EmbeddingFunc(embedding_dim=768, func=fake_embed, model_name="test-model")
@pytest.fixture
def qdrant_config():
    """Minimal Qdrant storage configuration shared by the tests below."""
    return dict(
        embedding_batch_num=10,
        vector_db_storage_cls_kwargs={"cosine_better_than_threshold": 0.8},
    )
# ============================================================================
# Case 1: Both new and legacy exist (Warning scenario)
# ============================================================================
@pytest.mark.asyncio
async def test_case1_both_collections_exist_qdrant(
    mock_qdrant_client, mock_embedding_func, qdrant_config
):
    """Case 1: new and legacy collections both present -> warn, do not migrate."""
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config=qdrant_config,
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )
    # Both the suffixed and the legacy collection report as existing.
    existing = {storage.final_namespace, storage.legacy_namespace}
    mock_qdrant_client.collection_exists.side_effect = lambda name: name in existing

    await storage.initialize()

    # The warning path must neither read legacy data nor create anything.
    mock_qdrant_client.scroll.assert_not_called()
    mock_qdrant_client.create_collection.assert_not_called()
    print("✅ Case 1: Warning logged when both collections exist")
# ============================================================================
# Case 2: Only new exists (Already migrated scenario)
# ============================================================================
@pytest.mark.asyncio
async def test_case2_only_new_exists_qdrant(
    mock_qdrant_client, mock_embedding_func, qdrant_config
):
    """Case 2: only the new collection exists -> verify index, no migration."""
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config=qdrant_config,
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )
    # Only the model-suffixed ("new") collection is present.
    new_only = storage.final_namespace
    mock_qdrant_client.collection_exists.side_effect = lambda name: name == new_only

    await storage.initialize()

    # Index verification reads collection metadata...
    mock_qdrant_client.get_collection.assert_called_with(storage.final_namespace)
    # ...but no legacy data is scrolled.
    mock_qdrant_client.scroll.assert_not_called()
    print("✅ Case 2: Index check when only new collection exists")
# ============================================================================
# Legacy upgrade from old versions (Backward compatibility)
# ============================================================================
@pytest.mark.asyncio
async def test_backward_compat_workspace_naming_qdrant(mock_qdrant_client):
    """Legacy lookup recognises the old "{workspace}_{namespace}" naming."""
    legacy_name = "prod_chunks"
    mock_qdrant_client.collection_exists.side_effect = lambda name: name == legacy_name

    found = _find_legacy_collection(
        mock_qdrant_client, namespace="chunks", workspace="prod"
    )

    assert found == legacy_name
    print(f"✅ Backward compat: Found old collection '{legacy_name}'")
@pytest.mark.asyncio
async def test_backward_compat_no_workspace_naming_qdrant(mock_qdrant_client):
    """Legacy lookup recognises the old bare "{namespace}" naming (no workspace)."""
    legacy_name = "chunks"
    mock_qdrant_client.collection_exists.side_effect = lambda name: name == legacy_name

    found = _find_legacy_collection(
        mock_qdrant_client, namespace="chunks", workspace=None
    )

    assert found == legacy_name
    print(f"✅ Backward compat: Found old collection '{legacy_name}'")
@pytest.mark.asyncio
async def test_backward_compat_migration_qdrant(
    mock_qdrant_client, mock_embedding_func, qdrant_config
):
    """Full migration from an old workspace-based collection.

    Only the legacy "prod_chunks" collection exists and it holds data, so
    initialize() must scroll that collection to migrate its points.
    """
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config=qdrant_config,
        embedding_func=mock_embedding_func,
        workspace="prod",
    )
    old_collection_name = "prod_chunks"
    # Only the old-style collection exists initially (idiomatic equality
    # lambda replaces the original if/return-True/return-False helper).
    mock_qdrant_client.collection_exists.side_effect = (
        lambda name: name == old_collection_name
    )
    mock_qdrant_client.count.return_value.count = 50

    # A single migratable point in the legacy collection; one scroll page.
    mock_point = MagicMock()
    mock_point.id = "old_id"
    mock_point.vector = [0.1] * 768
    mock_point.payload = {"content": "test", "id": "doc1"}
    mock_qdrant_client.scroll.side_effect = [([mock_point], None)]

    # Initialization should detect the legacy collection and migrate it.
    await storage.initialize()

    # Migration must read (scroll) the old collection first.
    scroll_calls = mock_qdrant_client.scroll.call_args_list
    assert len(scroll_calls) >= 1
    assert scroll_calls[0].kwargs["collection_name"] == old_collection_name
    print(f"✅ Backward compat: Migrated from old collection '{old_collection_name}'")
# ============================================================================
# Empty legacy data migration
# ============================================================================
@pytest.mark.asyncio
async def test_empty_legacy_migration_qdrant(
    mock_qdrant_client, mock_embedding_func, qdrant_config
):
    """Legacy collection exists but holds zero points -> create new, copy nothing."""
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config=qdrant_config,
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )
    # Only the legacy collection exists, and it is empty.
    legacy_only = storage.legacy_namespace
    mock_qdrant_client.collection_exists.side_effect = lambda name: name == legacy_only
    mock_qdrant_client.count.return_value.count = 0

    await storage.initialize()

    # The new collection is still created...
    mock_qdrant_client.create_collection.assert_called()
    # ...but no data is read from the empty legacy collection.
    mock_qdrant_client.scroll.assert_not_called()
    print("✅ Empty legacy: Skipped data migration for empty collection")
# ============================================================================
# Workspace isolation verification
# ============================================================================
@pytest.mark.asyncio
async def test_workspace_isolation_qdrant(mock_qdrant_client):
    """Same model/dimension -> same collection; workspaces isolate inside it."""
    cfg = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    async def fake_embed(texts, **kwargs):
        return np.array([[0.1] * 768 for _ in texts])

    embedder = EmbeddingFunc(
        embedding_dim=768, func=fake_embed, model_name="test-model"
    )

    # Two storages sharing the model but living in different workspaces.
    storage_a, storage_b = (
        QdrantVectorDBStorage(
            namespace="chunks",
            global_config=cfg,
            embedding_func=embedder,
            workspace=ws,
        )
        for ws in ("workspace_a", "workspace_b")
    )

    # Collection name is derived from model+dim, so it is shared...
    assert storage_a.final_namespace == storage_b.final_namespace
    print(
        f"✅ Workspace isolation: Same collection '{storage_a.final_namespace}' for both workspaces"
    )
    # ...while effective workspaces stay distinct for per-workspace isolation.
    assert storage_a.effective_workspace != storage_b.effective_workspace
    print(
        f"✅ Workspace isolation: Different workspaces '{storage_a.effective_workspace}' vs '{storage_b.effective_workspace}'"
    )
# ============================================================================
# Model switching scenario
# ============================================================================
@pytest.mark.asyncio
async def test_model_switch_scenario_qdrant(mock_qdrant_client):
    """Switching embedding models yields distinct collections (old data kept)."""
    cfg = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    async def fake_embed(texts, **kwargs):
        return np.array([[0.1] * 768 for _ in texts])

    def make_storage(model_name):
        # Same dimension (768) on purpose: only the model name differs.
        return QdrantVectorDBStorage(
            namespace="chunks",
            global_config=cfg,
            embedding_func=EmbeddingFunc(
                embedding_dim=768, func=fake_embed, model_name=model_name
            ),
            workspace="test_ws",
        )

    storage_a = make_storage("model-a")
    storage_b = make_storage("model-b")

    # Different models must map to different collections even at equal dims.
    assert storage_a.final_namespace != storage_b.final_namespace
    assert "model_a_768d" in storage_a.final_namespace
    assert "model_b_768d" in storage_b.final_namespace
    print("✅ Model switch: Different collections for different models")
    print(f" - Model A: {storage_a.final_namespace}")
    print(f" - Model B: {storage_b.final_namespace}")
# ============================================================================
# Integration test with all scenarios
# ============================================================================
@pytest.mark.asyncio
async def test_migration_flow_all_cases_qdrant(
    mock_qdrant_client, mock_embedding_func, qdrant_config
):
    """Lifecycle integration: fresh create (case 3), then reuse (case 2)."""
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config=qdrant_config,
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # --- Scenario 1: neither collection exists yet -> brand-new create.
    mock_qdrant_client.collection_exists.return_value = False
    await storage.initialize()
    mock_qdrant_client.create_collection.assert_called()
    print("✅ Scenario 1: New collection created")

    # Wipe recorded calls before the second pass.
    mock_qdrant_client.reset_mock()

    # --- Scenario 2: the new collection now exists -> reuse, no re-create.
    mock_qdrant_client.collection_exists.side_effect = (
        lambda name: name == storage.final_namespace
    )
    info = MagicMock()
    info.payload_schema = {}
    mock_qdrant_client.get_collection.return_value = info

    storage2 = QdrantVectorDBStorage(
        namespace="chunks",
        global_config=qdrant_config,
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )
    await storage2.initialize()

    mock_qdrant_client.get_collection.assert_called()
    mock_qdrant_client.create_collection.assert_not_called()
    print("✅ Scenario 2: Existing collection reused")
# Allow running this test module directly (verbose, output capture disabled).
if __name__ == "__main__":
    pytest.main([__file__, "-v", "-s"])

View file

@ -1,168 +0,0 @@
"""
Strict test to verify PostgreSQL migration parameter passing.
This test specifically validates that the migration code passes parameters
to AsyncPG execute() in the correct format (positional args, not dict).
"""
import pytest
from unittest.mock import patch, AsyncMock
from lightrag.utils import EmbeddingFunc
from lightrag.kg.postgres_impl import PGVectorStorage
from lightrag.namespace import NameSpace
@pytest.mark.asyncio
async def test_migration_parameter_passing():
    """
    Verify that migration passes positional parameters correctly to execute().

    AsyncPG's execute() expects ``execute(sql, val1, val2, ...)`` — values
    spread as positional arguments, never bundled into a single dict or
    list.  A strict execute() stand-in records every call and raises as soon
    as it sees the bundled form (fail fast); the assertions afterwards
    re-check the recorded INSERT calls issued during migration.
    """
    # Track all execute calls.
    execute_calls = []

    async def strict_execute(sql, *args, **kwargs):
        """Record the call and reject dict/list-bundled parameters."""
        execute_calls.append(
            {
                "sql": sql,
                "args": args,  # tuple of spread values
                "kwargs": kwargs,
            }
        )
        # A single dict/list argument means the values were not spread.
        # (The original also tested `args and`, which is redundant: a
        # non-empty tuple is exactly one with len >= 1.)
        if len(args) == 1 and isinstance(args[0], (dict, list)):
            raise TypeError(
                f"BUG DETECTED: execute() called with {type(args[0]).__name__} "
                "instead of positional parameters! "
                f"Got: execute(sql, {args[0]!r})"
            )
        return None

    # Database mock: execute is strict; query simulates a populated legacy table.
    mock_db = AsyncMock()
    mock_db.workspace = "test_workspace"
    mock_db.execute = AsyncMock(side_effect=strict_execute)

    mock_rows = [
        {
            "id": "row1",
            "content": "content1",
            "workspace": "test",
            "vector": [0.1] * 1536,
        },
        {
            "id": "row2",
            "content": "content2",
            "workspace": "test",
            "vector": [0.2] * 1536,
        },
    ]

    async def mock_query(sql, params=None, multirows=False, **kwargs):
        # COUNT(*) probes report the legacy row count; SELECT * returns rows.
        if "COUNT(*)" in sql:
            return {"count": len(mock_rows)}
        elif multirows and "SELECT *" in sql:
            return mock_rows
        return {}

    mock_db.query = AsyncMock(side_effect=mock_query)

    async def mock_table_exists(db, table_name):
        # Legacy table exists; the model-suffixed ("new") table does not —
        # the precondition that triggers the migration path.
        return "test_model_1536d" not in table_name

    # Embedding function used only for naming; returns constant vectors.
    async def embed_func(texts, **kwargs):
        import numpy as np

        return np.array([[0.1] * 1536 for _ in texts])

    embedding_func = EmbeddingFunc(
        embedding_dim=1536, func=embed_func, model_name="test-model"
    )
    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=embedding_func,
        workspace="test",
    )
    with (
        patch("lightrag.kg.postgres_impl.get_data_init_lock") as mock_lock,
        patch("lightrag.kg.postgres_impl.ClientManager") as mock_manager,
        patch(
            "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
        ),
        patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()),
    ):
        mock_lock.return_value = AsyncMock()
        mock_manager.get_client = AsyncMock(return_value=mock_db)
        mock_manager.release_client = AsyncMock()
        # This should trigger migration.
        await storage.initialize()

    # Migration must have issued writes, including INSERTs into the new table.
    assert len(execute_calls) > 0, "Migration should have called execute()"
    insert_calls = [c for c in execute_calls if "INSERT INTO" in c["sql"]]
    assert len(insert_calls) > 0, "Should have INSERT statements from migration"
    print(f"\n✓ Migration executed {len(insert_calls)} INSERT statements")

    # Re-validate every recorded INSERT's argument shape.
    for i, call_info in enumerate(insert_calls):
        args = call_info["args"]
        sql = call_info["sql"]
        print(f"\n INSERT #{i+1}:")
        print(f" SQL: {sql[:100]}...")
        print(f" Args count: {len(args)}")
        print(f" Args types: {[type(arg).__name__ for arg in args]}")
        if args:
            # A lone dict/list arg is the bundled-parameters bug.
            if len(args) == 1 and isinstance(args[0], (dict, list)):
                pytest.fail(
                    f"BUG: execute() called with {type(args[0]).__name__} instead of "
                    f"positional parameters!\n"
                    f" SQL: {sql}\n"
                    f" Args: {args[0]}\n"
                    f"Expected: execute(sql, val1, val2, val3, ...)\n"
                    f"Got: execute(sql, {type(args[0]).__name__})"
                )
            for j, arg in enumerate(args):
                # NOTE: the original also required
                # `not isinstance(arg, (str, bytes))`, which is dead code —
                # a dict/list can never be a str/bytes.
                if isinstance(arg, (dict, list)):
                    # Exception: vector columns are legitimately lists.
                    if "vector" not in sql:
                        pytest.fail(
                            f"BUG: Parameter #{j} is {type(arg).__name__}, "
                            f"expected primitive type"
                        )
    print(
        f"\n✅ All {len(insert_calls)} INSERT statements use correct parameter format"
    )
# Allow running this test module directly (verbose, output capture disabled).
if __name__ == "__main__":
    pytest.main([__file__, "-v", "-s"])