refactor: unify PostgreSQL and Qdrant migration logic for consistency

Why this change is needed:
Previously, PostgreSQL and Qdrant had inconsistent migration behavior:
- PostgreSQL kept legacy tables after migration, requiring manual cleanup
- Qdrant auto-deleted legacy collections after migration
This inconsistency caused confusion for users and required different
documentation for each backend.

How it solves the problem:
Unified both backends to follow the same smart cleanup strategy:
- Case 1 (both exist): Auto-delete the legacy table/collection if it is empty; if it still has data, keep it and log a warning
- Case 4 (migration): Auto-delete legacy after successful verification
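This provides an automated migration experience with no manual cleanup in the normal path; only a non-empty legacy table/collection left alongside the new one still requires manual review.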

Impact:
- Eliminates need for users to manually delete legacy tables/collections
- Reduces storage waste from duplicate data
- Provides consistent behavior across PostgreSQL and Qdrant
- Simplifies documentation and user experience

Testing:
- All 16 unit tests pass (8 PostgreSQL + 8 Qdrant)
- Added 4 new tests for Case 1 scenarios (empty vs non-empty legacy)
- Updated E2E tests to verify auto-deletion behavior
- All lint checks pass (ruff-format, ruff, trailing-whitespace)
This commit is contained in:
BukeLy 2025-11-20 11:37:59 +08:00
parent 31e3ad141f
commit 8386ea061e
5 changed files with 431 additions and 19 deletions

View file

@ -2271,13 +2271,41 @@ class PGVectorStorage(BaseVectorStorage):
db, legacy_table_name
)
# Case 1: Both new and legacy tables exist - Warning only (no migration)
# Case 1: Both new and legacy tables exist
# This can happen if:
# 1. Previous migration failed to delete the legacy table
# 2. User manually created both tables
# Strategy: Only delete legacy if it's empty (safe cleanup)
if new_table_exists and legacy_exists:
logger.warning(
f"PostgreSQL: Legacy table '{legacy_table_name}' still exists. "
f"Remove it if migration is complete."
)
# Ensure vector index exists even if migration was not performed
try:
# Check if legacy table is empty
count_query = f"SELECT COUNT(*) as count FROM {legacy_table_name}"
count_result = await db.query(count_query, [])
legacy_count = count_result.get("count", 0) if count_result else 0
if legacy_count == 0:
# Legacy table is empty, safe to delete without data loss
logger.info(
f"PostgreSQL: Legacy table '{legacy_table_name}' is empty. Deleting..."
)
drop_query = f"DROP TABLE {legacy_table_name}"
await db.execute(drop_query, None)
logger.info(
f"PostgreSQL: Legacy table '{legacy_table_name}' deleted successfully"
)
else:
# Legacy table still has data - don't risk deleting it
logger.warning(
f"PostgreSQL: Legacy table '{legacy_table_name}' still contains {legacy_count} records. "
f"Manual intervention required to verify and delete."
)
except Exception as e:
logger.warning(
f"PostgreSQL: Could not check or cleanup legacy table '{legacy_table_name}': {e}. "
"You may need to delete it manually."
)
# Ensure vector index exists even if cleanup was not performed
await db._create_vector_index(table_name, embedding_dim)
return
@ -2385,6 +2413,25 @@ class PGVectorStorage(BaseVectorStorage):
# Create vector index after successful migration
await db._create_vector_index(table_name, embedding_dim)
# Delete legacy table after successful migration
# Data has been verified to match, so legacy table is no longer needed
# and keeping it would cause Case 1 warnings on next startup
try:
logger.info(
f"PostgreSQL: Deleting legacy table '{legacy_table_name}'..."
)
drop_query = f"DROP TABLE {legacy_table_name}"
await db.execute(drop_query, None)
logger.info(
f"PostgreSQL: Legacy table '{legacy_table_name}' deleted successfully"
)
except Exception as delete_error:
# If deletion fails, user will see Case 1 warning on next startup
logger.warning(
f"PostgreSQL: Failed to delete legacy table '{legacy_table_name}': {delete_error}. "
"You may need to delete it manually."
)
except PostgreSQLMigrationError:
# Re-raise migration errors without wrapping
raise

View file

@ -153,12 +153,38 @@ class QdrantVectorDBStorage(BaseVectorStorage):
)
legacy_exists = legacy_collection is not None
# Case 1: Both new and legacy collections exist - Warning only (no migration)
# Case 1: Both new and legacy collections exist
# This can happen if:
# 1. Previous migration failed to delete the legacy collection
# 2. User manually created both collections
# Strategy: Only delete legacy if it's empty (safe cleanup)
if new_collection_exists and legacy_exists:
logger.warning(
f"Qdrant: Legacy collection '{legacy_collection}' still exists. "
f"Remove it if migration is complete."
)
try:
# Check if legacy collection is empty
legacy_count = client.count(
collection_name=legacy_collection, exact=True
).count
if legacy_count == 0:
# Legacy collection is empty, safe to delete without data loss
logger.info(
f"Qdrant: Legacy collection '{legacy_collection}' is empty. Deleting..."
)
client.delete_collection(collection_name=legacy_collection)
logger.info(
f"Qdrant: Legacy collection '{legacy_collection}' deleted successfully"
)
else:
# Legacy collection still has data - don't risk deleting it
logger.warning(
f"Qdrant: Legacy collection '{legacy_collection}' still contains {legacy_count} records. "
f"Manual intervention required to verify and delete."
)
except Exception as e:
logger.warning(
f"Qdrant: Could not check or cleanup legacy collection '{legacy_collection}': {e}. "
"You may need to delete it manually."
)
return
# Case 2: Only new collection exists - Ensure index exists
@ -313,6 +339,24 @@ class QdrantVectorDBStorage(BaseVectorStorage):
f"Qdrant: Migration from '{legacy_collection}' to '{collection_name}' completed successfully"
)
# Delete legacy collection after successful migration
# Data has been verified to match, so legacy collection is no longer needed
# and keeping it would cause Case 1 warnings on next startup
try:
logger.info(
f"Qdrant: Deleting legacy collection '{legacy_collection}'..."
)
client.delete_collection(collection_name=legacy_collection)
logger.info(
f"Qdrant: Legacy collection '{legacy_collection}' deleted successfully"
)
except Exception as delete_error:
# If deletion fails, user will see Case 1 warning on next startup
logger.warning(
f"Qdrant: Failed to delete legacy collection '{legacy_collection}': {delete_error}. "
"You may need to delete it manually."
)
except QdrantMigrationError:
# Re-raise migration errors without wrapping
raise

View file

@ -298,6 +298,22 @@ async def test_legacy_migration_postgres(
print(f"✅ Migration successful: {new_count}/{legacy_count} records migrated")
print(f"✅ New table: {new_table}")
# Verify legacy table was automatically deleted after migration (Case 4)
check_legacy_query = """
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = $1
)
"""
legacy_result = await pg_cleanup.query(
check_legacy_query, [legacy_table.lower()]
)
legacy_exists = legacy_result.get("exists", True)
assert (
not legacy_exists
), f"Legacy table '{legacy_table}' should be deleted after successful migration"
print(f"✅ Legacy table '{legacy_table}' automatically deleted after migration")
await rag.finalize_storages()
finally:
@ -419,6 +435,13 @@ async def test_legacy_migration_qdrant(
f"✅ Vector dimension verified: {collection_info.config.params.vectors.size}d"
)
# Verify legacy collection was automatically deleted after migration (Case 4)
legacy_exists = qdrant_cleanup.collection_exists(legacy_collection)
assert not legacy_exists, f"Legacy collection '{legacy_collection}' should be deleted after successful migration"
print(
f"✅ Legacy collection '{legacy_collection}' automatically deleted after migration"
)
await rag.finalize_storages()
finally:
@ -681,14 +704,14 @@ async def test_multi_instance_qdrant(
@pytest.mark.asyncio
async def test_case1_both_exist_warning_qdrant(
async def test_case1_both_exist_with_data_qdrant(
qdrant_cleanup, mock_llm_func, mock_tokenizer, qdrant_config
):
"""
E2E Case 1: Both new and legacy collections exist
Expected: Log warning, do not migrate, use new collection
E2E Case 1b: Both new and legacy collections exist, legacy has data
Expected: Log warning, do not delete legacy (preserve data), use new collection
"""
print("\n[E2E Case 1] Both collections exist - warning scenario")
print("\n[E2E Case 1b] Both collections exist with data - preservation scenario")
import tempfile
import shutil
@ -753,11 +776,17 @@ async def test_case1_both_exist_warning_qdrant(
# Step 3: Verify behavior
# Should use new collection (not migrate)
assert rag.chunks_vdb.final_namespace == new_collection
legacy_count = qdrant_cleanup.count(legacy_collection).count
# Legacy should still have its data (not migrated)
# Verify legacy collection still exists (Case 1b: has data, should NOT be deleted)
legacy_exists = qdrant_cleanup.collection_exists(legacy_collection)
assert legacy_exists, "Legacy collection with data should NOT be deleted"
legacy_count = qdrant_cleanup.count(legacy_collection).count
# Legacy should still have its data (not migrated, not deleted)
assert legacy_count == 3
print(f"✅ Legacy collection still has {legacy_count} points (not migrated)")
print(
f"✅ Legacy collection still has {legacy_count} points (preserved, not deleted)"
)
await rag.finalize_storages()

View file

@ -315,6 +315,22 @@ async def test_scenario_2_legacy_upgrade_migration(
assert mock_pg_db.execute.call_count >= 50 # At least one execute per row
mock_create.assert_called_once()
# Verify legacy table was automatically deleted after successful migration
# This prevents Case 1 warnings on next startup
delete_calls = [
call
for call in mock_pg_db.execute.call_args_list
if call[0][0] and "DROP TABLE" in call[0][0]
]
assert (
len(delete_calls) >= 1
), "Legacy table should be deleted after successful migration"
# Check if legacy table was dropped
dropped_table = storage.legacy_table_name
assert any(
dropped_table in str(call) for call in delete_calls
), f"Expected to drop '{dropped_table}'"
@pytest.mark.asyncio
async def test_scenario_3_multi_model_coexistence(
@ -388,3 +404,131 @@ async def test_scenario_3_multi_model_coexistence(
assert len(set(table_names)) == 2 # Two unique table names
assert storage_a.table_name in table_names
assert storage_b.table_name in table_names
@pytest.mark.asyncio
async def test_case1_empty_legacy_auto_cleanup(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """Case 1a: new and legacy tables both exist and the legacy table is EMPTY.

    Expected: initialization issues a ``DROP TABLE`` that names the legacy
    table, because removing an empty table cannot lose data.
    """
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=EmbeddingFunc(
            embedding_dim=1536,
            func=mock_embedding_func.func,
            model_name="test-model",
        ),
        workspace="test_ws",
    )

    # Every existence probe answers "yes", so both the new and the legacy
    # table appear to be present.
    async def mock_table_exists(db, table_name):
        return True

    # COUNT(*) against the legacy table reports 0 rows; any other COUNT(*)
    # reports 100 rows; every other query yields an empty result.
    async def mock_query(sql, params=None, multirows=False, **kwargs):
        if "COUNT(*)" not in sql:
            return {}
        targets_legacy = storage.legacy_table_name in sql
        return {"count": 0 if targets_legacy else 100}

    mock_pg_db.query = AsyncMock(side_effect=mock_query)

    with patch(
        "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
    ):
        await storage.initialize()

        # Collect every execute() call whose first positional argument is a
        # DROP TABLE statement.
        drop_calls = [
            recorded
            for recorded in mock_pg_db.execute.call_args_list
            if recorded.args[0] and "DROP TABLE" in recorded.args[0]
        ]
        assert drop_calls, "Empty legacy table should be auto-deleted"

        # At least one of those statements must name the legacy table.
        dropped_table = storage.legacy_table_name
        legacy_was_dropped = any(
            dropped_table in str(recorded) for recorded in drop_calls
        )
        assert (
            legacy_was_dropped
        ), f"Expected to drop empty legacy table '{dropped_table}'"

        print(
            f"✅ Case 1a: Empty legacy table '{dropped_table}' auto-deleted successfully"
        )
@pytest.mark.asyncio
async def test_case1_nonempty_legacy_warning(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """Case 1b: new and legacy tables both exist and the legacy table HAS DATA.

    Expected: initialization must not issue a ``DROP TABLE`` that names the
    legacy table; the data is preserved and only a warning is logged.
    """
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=EmbeddingFunc(
            embedding_dim=1536,
            func=mock_embedding_func.func,
            model_name="test-model",
        ),
        workspace="test_ws",
    )

    # Every existence probe answers "yes", so both the new and the legacy
    # table appear to be present.
    async def mock_table_exists(db, table_name):
        return True

    # COUNT(*) against the legacy table reports 50 rows; any other COUNT(*)
    # reports 100 rows; every other query yields an empty result.
    async def mock_query(sql, params=None, multirows=False, **kwargs):
        if "COUNT(*)" not in sql:
            return {}
        targets_legacy = storage.legacy_table_name in sql
        return {"count": 50 if targets_legacy else 100}

    mock_pg_db.query = AsyncMock(side_effect=mock_query)

    with patch(
        "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
    ):
        await storage.initialize()

        # Collect every execute() call whose first positional argument is a
        # DROP TABLE statement.
        drop_calls = [
            recorded
            for recorded in mock_pg_db.execute.call_args_list
            if recorded.args[0] and "DROP TABLE" in recorded.args[0]
        ]

        # None of those statements may name the legacy table: tables that
        # still contain data are never auto-deleted.
        dropped_table = storage.legacy_table_name
        legacy_deleted = any(
            dropped_table in str(recorded) for recorded in drop_calls
        )
        assert not legacy_deleted, "Legacy table with data should NOT be auto-deleted"

        print(
            f"✅ Case 1b: Legacy table '{dropped_table}' with data preserved (warning only)"
        )

View file

@ -296,8 +296,26 @@ async def test_scenario_2_legacy_upgrade_migration(
assert len(upsert_calls) >= 1
assert upsert_calls[0].kwargs["collection_name"] == new_collection
# 5. Verify legacy collection was automatically deleted after successful migration
# This prevents Case 1 warnings on next startup
delete_calls = [
call for call in mock_qdrant_client.delete_collection.call_args_list
]
assert (
len(delete_calls) >= 1
), "Legacy collection should be deleted after successful migration"
# Check if legacy_collection was passed to delete_collection
deleted_collection = (
delete_calls[0][0][0]
if delete_calls[0][0]
else delete_calls[0].kwargs.get("collection_name")
)
assert (
deleted_collection == legacy_collection
), f"Expected to delete '{legacy_collection}', but deleted '{deleted_collection}'"
print(
f"✅ Scenario 2: Legacy data migrated from '{legacy_collection}' to '{expected_new_collection}'"
f"✅ Scenario 2: Legacy data migrated from '{legacy_collection}' to '{expected_new_collection}' and legacy collection deleted"
)
@ -364,3 +382,133 @@ async def test_scenario_3_multi_model_coexistence(mock_qdrant_client):
print(f" - Workspace A: {expected_collection_a} (768d)")
print(f" - Workspace B: {expected_collection_b} (1024d)")
print(" - Collections are independent")
@pytest.mark.asyncio
async def test_case1_empty_legacy_auto_cleanup(mock_qdrant_client, mock_embedding_func):
    """Case 1a: new and legacy collections both exist and the legacy one is empty.

    Expected: initialization deletes the legacy collection automatically.
    """
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    legacy_collection = storage.legacy_namespace
    new_collection = storage.final_namespace

    # Only the legacy and the new collection are reported as existing.
    def collection_exists(name):
        return name in (legacy_collection, new_collection)

    # The legacy collection reports 0 points; any other collection reports 100.
    def count_mock(collection_name, exact=True):
        outcome = MagicMock()
        outcome.count = 0 if collection_name == legacy_collection else 100
        return outcome

    mock_qdrant_client.collection_exists.side_effect = collection_exists
    mock_qdrant_client.count.side_effect = count_mock

    # get_collection() returns an object that already carries a payload schema.
    collection_info = MagicMock()
    collection_info.payload_schema = {"workspace_id": True}
    mock_qdrant_client.get_collection.return_value = collection_info

    await storage.initialize()

    # The first recorded delete_collection() call must target the legacy
    # collection, whether the name was passed positionally or by keyword.
    delete_calls = list(mock_qdrant_client.delete_collection.call_args_list)
    assert delete_calls, "Empty legacy collection should be auto-deleted"

    first_delete = delete_calls[0]
    if first_delete.args:
        deleted_collection = first_delete.args[0]
    else:
        deleted_collection = first_delete.kwargs.get("collection_name")
    assert (
        deleted_collection == legacy_collection
    ), f"Expected to delete '{legacy_collection}', but deleted '{deleted_collection}'"

    print(
        f"✅ Case 1a: Empty legacy collection '{legacy_collection}' auto-deleted successfully"
    )
@pytest.mark.asyncio
async def test_case1_nonempty_legacy_warning(mock_qdrant_client, mock_embedding_func):
    """Case 1b: new and legacy collections both exist and the legacy one has data.

    Expected: a warning is logged and the legacy collection is not deleted.
    """
    storage = QdrantVectorDBStorage(
        namespace="chunks",
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    legacy_collection = storage.legacy_namespace
    new_collection = storage.final_namespace

    # Only the legacy and the new collection are reported as existing.
    def collection_exists(name):
        return name in (legacy_collection, new_collection)

    # The legacy collection reports 50 points; any other collection reports 100.
    def count_mock(collection_name, exact=True):
        outcome = MagicMock()
        outcome.count = 50 if collection_name == legacy_collection else 100
        return outcome

    mock_qdrant_client.collection_exists.side_effect = collection_exists
    mock_qdrant_client.count.side_effect = count_mock

    # get_collection() returns an object that already carries a payload schema.
    collection_info = MagicMock()
    collection_info.payload_schema = {"workspace_id": True}
    mock_qdrant_client.get_collection.return_value = collection_info

    await storage.initialize()

    # Resolve the collection name of a recorded delete_collection() call,
    # whether it was passed positionally or by keyword.
    def deleted_name(recorded):
        if recorded.args:
            return recorded.args[0]
        return recorded.kwargs.get("collection_name")

    # No recorded delete may target the legacy collection: collections that
    # still contain data are never auto-deleted.
    delete_calls = list(mock_qdrant_client.delete_collection.call_args_list)
    legacy_deleted = any(
        deleted_name(recorded) == legacy_collection for recorded in delete_calls
    )
    assert not legacy_deleted, "Legacy collection with data should NOT be auto-deleted"

    print(
        f"✅ Case 1b: Legacy collection '{legacy_collection}' with data preserved (warning only)"
    )