feat: implement dimension compatibility checks for PostgreSQL and Qdrant migrations

This update introduces checks for vector dimension compatibility before migrating legacy data in both PostgreSQL and Qdrant storage implementations. If a dimension mismatch is detected, the migration is skipped to prevent data loss, and a new empty table or collection is created for the new embedding model.

Key changes include:
- Added dimension checks in `PGVectorStorage` and `QdrantVectorDBStorage` classes.
- Enhanced logging to inform users about dimension mismatches and the creation of new storage.
- Updated E2E tests to validate the new behavior, ensuring legacy data is preserved and new structures are created correctly.

Impact:
- Prevents potential data corruption during migrations with mismatched dimensions.
- Improves user experience by providing clear logging and maintaining legacy data integrity.

Testing:
- New tests confirm that the system behaves as expected when encountering dimension mismatches.
This commit is contained in:
BukeLy 2025-11-20 12:22:13 +08:00
parent e0767b1a47
commit 5180c1e395
5 changed files with 527 additions and 80 deletions

View file

@@ -2344,6 +2344,71 @@ class PGVectorStorage(BaseVectorStorage):
await db._create_vector_index(table_name, embedding_dim)
return
# Check vector dimension compatibility before migration
legacy_dim = None
try:
# Try to get vector dimension from pg_attribute metadata
dim_query = """
SELECT
CASE
WHEN typname = 'vector' THEN
COALESCE(atttypmod, -1)
ELSE -1
END as vector_dim
FROM pg_attribute a
JOIN pg_type t ON a.atttypid = t.oid
WHERE a.attrelid = $1::regclass
AND a.attname = 'content_vector'
"""
dim_result = await db.query(dim_query, [legacy_table_name])
legacy_dim = dim_result.get("vector_dim", -1) if dim_result else -1
if legacy_dim <= 0:
# Alternative: Try to detect by sampling a vector
logger.info(
"PostgreSQL: Metadata dimension check failed, trying vector sampling..."
)
sample_query = (
f"SELECT content_vector FROM {legacy_table_name} LIMIT 1"
)
sample_result = await db.query(sample_query, [])
if sample_result and sample_result.get("content_vector"):
vector_data = sample_result["content_vector"]
# pgvector returns list directly
if isinstance(vector_data, (list, tuple)):
legacy_dim = len(vector_data)
elif isinstance(vector_data, str):
import json
vector_list = json.loads(vector_data)
legacy_dim = len(vector_list)
if legacy_dim > 0 and embedding_dim and legacy_dim != embedding_dim:
logger.warning(
f"PostgreSQL: Dimension mismatch detected! "
f"Legacy table '{legacy_table_name}' has {legacy_dim}d vectors, "
f"but new embedding model expects {embedding_dim}d. "
f"Migration skipped to prevent data loss. "
f"Legacy table preserved as '{legacy_table_name}'. "
f"Creating new empty table '{table_name}' for new data."
)
# Create new table but skip migration
await _pg_create_table(db, table_name, base_table, embedding_dim)
await db._create_vector_index(table_name, embedding_dim)
logger.info(
f"PostgreSQL: New table '{table_name}' created. "
f"To query legacy data, please use a {legacy_dim}d embedding model."
)
return
except Exception as e:
logger.warning(
f"PostgreSQL: Could not verify legacy table vector dimension: {e}. "
f"Proceeding with caution..."
)
# Create new table first
logger.info(f"PostgreSQL: Creating new table '{table_name}'")
await _pg_create_table(db, table_name, base_table, embedding_dim)

View file

@@ -251,6 +251,51 @@ class QdrantVectorDBStorage(BaseVectorStorage):
)
return
# Check vector dimension compatibility before migration
try:
legacy_info = client.get_collection(legacy_collection)
legacy_dim = legacy_info.config.params.vectors.size
# Get expected dimension from kwargs
new_dim = (
kwargs.get("vectors_config").size
if "vectors_config" in kwargs
else None
)
if new_dim and legacy_dim != new_dim:
logger.warning(
f"Qdrant: Dimension mismatch detected! "
f"Legacy collection '{legacy_collection}' has {legacy_dim}d vectors, "
f"but new embedding model expects {new_dim}d. "
f"Migration skipped to prevent data loss. "
f"Legacy collection preserved as '{legacy_collection}'. "
f"Creating new empty collection '{collection_name}' for new data."
)
# Create new collection but skip migration
client.create_collection(collection_name, **kwargs)
client.create_payload_index(
collection_name=collection_name,
field_name=WORKSPACE_ID_FIELD,
field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD,
is_tenant=True,
),
)
logger.info(
f"Qdrant: New collection '{collection_name}' created. "
f"To query legacy data, please use a {legacy_dim}d embedding model."
)
return
except Exception as e:
logger.warning(
f"Qdrant: Could not verify legacy collection dimension: {e}. "
f"Proceeding with caution..."
)
# Create new collection first
logger.info(f"Qdrant: Creating new collection '{collection_name}'")
client.create_collection(collection_name, **kwargs)

View file

@@ -0,0 +1,290 @@
"""
Tests for dimension mismatch handling during migration.
This test module verifies that both PostgreSQL and Qdrant storage backends
properly detect and handle vector dimension mismatches when migrating from
legacy collections/tables to new ones with different embedding models.
"""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
from lightrag.kg.qdrant_impl import QdrantVectorDBStorage
from lightrag.kg.postgres_impl import PGVectorStorage
class TestQdrantDimensionMismatch:
    """Test suite for Qdrant dimension mismatch handling."""

    def test_qdrant_dimension_mismatch_skip_migration(self):
        """
        Test that Qdrant skips migration when dimensions don't match.

        Scenario: Legacy collection has 1536d vectors, new model expects 3072d.
        Expected: Migration skipped, new empty collection created, legacy preserved.
        """
        from qdrant_client import models

        # Fake client whose legacy collection reports 1536-dimensional vectors.
        client = MagicMock()
        legacy_info = MagicMock()
        legacy_info.config.params.vectors.size = 1536
        client.get_collection.return_value = legacy_info

        # Only the legacy collection pre-exists; the suffixed one does not.
        client.collection_exists.side_effect = lambda name: name == "lightrag_chunks"

        # Legacy collection is non-empty, so migration would normally trigger.
        client.count.return_value.count = 100

        # Request a 3072d collection — mismatched with the 1536d legacy data.
        QdrantVectorDBStorage.setup_collection(
            client,
            "lightrag_chunks_model_3072d",
            namespace="chunks",
            workspace="test",
            vectors_config=models.VectorParams(
                size=3072, distance=models.Distance.COSINE
            ),
        )

        # The new (empty) collection must still be created...
        client.create_collection.assert_called_once()
        # ...but no data may be copied over (no scroll/upsert activity).
        client.scroll.assert_not_called()
        client.upsert.assert_not_called()

    def test_qdrant_dimension_match_proceed_migration(self):
        """
        Test that Qdrant proceeds with migration when dimensions match.

        Scenario: Legacy collection has 1536d vectors, new model also expects 1536d.
        Expected: Migration proceeds normally.
        """
        from qdrant_client import models

        # Fake client: legacy collection exists and matches the new dimension.
        client = MagicMock()
        legacy_info = MagicMock()
        legacy_info.config.params.vectors.size = 1536
        client.get_collection.return_value = legacy_info
        client.collection_exists.side_effect = lambda name: name == "lightrag_chunks"
        client.count.return_value.count = 100

        # scroll yields one point and a terminal (None) cursor.
        migrated_point = MagicMock()
        migrated_point.id = "test_id"
        migrated_point.vector = [0.1] * 1536
        migrated_point.payload = {"id": "test"}
        client.scroll.return_value = ([migrated_point], None)

        # Request a 1536d collection — same dimension as the legacy data.
        QdrantVectorDBStorage.setup_collection(
            client,
            "lightrag_chunks_model_1536d",
            namespace="chunks",
            workspace="test",
            vectors_config=models.VectorParams(
                size=1536, distance=models.Distance.COSINE
            ),
        )

        # Collection created AND data migrated (scroll + upsert both used).
        client.create_collection.assert_called_once()
        client.scroll.assert_called()
        client.upsert.assert_called()
class TestPostgresDimensionMismatch:
    """Test suite for PostgreSQL dimension mismatch handling."""

    @pytest.mark.asyncio
    async def test_postgres_dimension_mismatch_skip_migration_metadata(self):
        """
        Test that PostgreSQL skips migration when dimensions don't match (via metadata).

        Scenario: Legacy table has 1536d vectors (detected via pg_attribute),
        new model expects 3072d.
        Expected: Migration skipped, new empty table created, legacy preserved.
        """
        # Setup mock database
        db = AsyncMock()

        # Mock table existence and dimension checks.
        # Dispatches on query text: existence probes, row count, and the
        # pg_attribute metadata lookup used for dimension detection.
        async def query_side_effect(query, params, **kwargs):
            if "information_schema.tables" in query:
                if params[0] == "lightrag_doc_chunks":  # legacy
                    return {"exists": True}
                elif params[0] == "lightrag_doc_chunks_model_3072d":  # new
                    return {"exists": False}
            elif "COUNT(*)" in query:
                return {"count": 100}  # Legacy has data
            elif "pg_attribute" in query:
                return {"vector_dim": 1536}  # Legacy has 1536d vectors
            return {}

        db.query.side_effect = query_side_effect
        db.execute = AsyncMock()
        db._create_vector_index = AsyncMock()

        # Call setup_table with 3072d (different from legacy 1536d)
        await PGVectorStorage.setup_table(
            db,
            "lightrag_doc_chunks_model_3072d",
            legacy_table_name="lightrag_doc_chunks",
            base_table="lightrag_doc_chunks",
            embedding_dim=3072,
        )

        # Verify new table was created (DDL executed)
        create_table_calls = [
            call
            for call in db.execute.call_args_list
            if call[0][0] and "CREATE TABLE" in call[0][0]
        ]
        assert len(create_table_calls) > 0, "New table should be created"

        # Verify migration was NOT attempted (no INSERT calls)
        insert_calls = [
            call
            for call in db.execute.call_args_list
            if call[0][0] and "INSERT INTO" in call[0][0]
        ]
        assert len(insert_calls) == 0, "Migration should be skipped"

    @pytest.mark.asyncio
    async def test_postgres_dimension_mismatch_skip_migration_sampling(self):
        """
        Test that PostgreSQL skips migration when dimensions don't match (via sampling).

        Scenario: Legacy table dimension detection fails via metadata,
        falls back to vector sampling, detects 1536d vs expected 3072d.
        Expected: Migration skipped, new empty table created, legacy preserved.
        """
        db = AsyncMock()

        # Mock table existence and dimension checks.
        # pg_attribute deliberately returns -1 so the implementation must
        # fall back to sampling a content_vector row to learn the dimension.
        async def query_side_effect(query, params, **kwargs):
            if "information_schema.tables" in query:
                if params[0] == "lightrag_doc_chunks":  # legacy
                    return {"exists": True}
                elif params[0] == "lightrag_doc_chunks_model_3072d":  # new
                    return {"exists": False}
            elif "COUNT(*)" in query:
                return {"count": 100}  # Legacy has data
            elif "pg_attribute" in query:
                return {"vector_dim": -1}  # Metadata check fails
            elif "SELECT content_vector FROM" in query:
                # Return sample vector with 1536 dimensions
                return {"content_vector": [0.1] * 1536}
            return {}

        db.query.side_effect = query_side_effect
        db.execute = AsyncMock()
        db._create_vector_index = AsyncMock()

        # Call setup_table with 3072d (different from legacy 1536d)
        await PGVectorStorage.setup_table(
            db,
            "lightrag_doc_chunks_model_3072d",
            legacy_table_name="lightrag_doc_chunks",
            base_table="lightrag_doc_chunks",
            embedding_dim=3072,
        )

        # Verify new table was created
        create_table_calls = [
            call
            for call in db.execute.call_args_list
            if call[0][0] and "CREATE TABLE" in call[0][0]
        ]
        assert len(create_table_calls) > 0, "New table should be created"

        # Verify migration was NOT attempted
        insert_calls = [
            call
            for call in db.execute.call_args_list
            if call[0][0] and "INSERT INTO" in call[0][0]
        ]
        assert len(insert_calls) == 0, "Migration should be skipped"

    @pytest.mark.asyncio
    async def test_postgres_dimension_match_proceed_migration(self):
        """
        Test that PostgreSQL proceeds with migration when dimensions match.

        Scenario: Legacy table has 1536d vectors, new model also expects 1536d.
        Expected: Migration proceeds normally.
        """
        db = AsyncMock()

        # Dispatch mock: existence/count/dimension probes as above, plus the
        # batched "SELECT * FROM" migration reads (multirows=True) where the
        # first offset yields two rows and later offsets signal exhaustion.
        async def query_side_effect(query, params, **kwargs):
            multirows = kwargs.get("multirows", False)
            if "information_schema.tables" in query:
                if params[0] == "lightrag_doc_chunks":  # legacy
                    return {"exists": True}
                elif params[0] == "lightrag_doc_chunks_model_1536d":  # new
                    return {"exists": False}
            elif "COUNT(*)" in query:
                return {"count": 100}  # Legacy has data
            elif "pg_attribute" in query:
                return {"vector_dim": 1536}  # Legacy has matching 1536d
            elif "SELECT * FROM" in query and multirows:
                # Return sample data for migration (first batch)
                if params[0] == 0:  # offset = 0
                    return [
                        {
                            "id": "test1",
                            "content_vector": [0.1] * 1536,
                            "workspace": "test",
                        },
                        {
                            "id": "test2",
                            "content_vector": [0.2] * 1536,
                            "workspace": "test",
                        },
                    ]
                else:  # offset > 0
                    return []  # No more data
            return {}

        db.query.side_effect = query_side_effect
        db.execute = AsyncMock()
        db._create_vector_index = AsyncMock()

        # Call setup_table with matching 1536d
        await PGVectorStorage.setup_table(
            db,
            "lightrag_doc_chunks_model_1536d",
            legacy_table_name="lightrag_doc_chunks",
            base_table="lightrag_doc_chunks",
            embedding_dim=1536,
        )

        # Verify migration WAS attempted (INSERT calls made)
        insert_calls = [
            call
            for call in db.execute.call_args_list
            if call[0][0] and "INSERT INTO" in call[0][0]
        ]
        assert (
            len(insert_calls) > 0
        ), "Migration should proceed with matching dimensions"

View file

@@ -1169,56 +1169,73 @@ async def test_dimension_mismatch_postgres(
print("📦 Initializing LightRAG with new model (3072d)...")
# This should handle dimension mismatch gracefully
# Either: 1) Create new table for new model, or 2) Raise clear error
try:
rag = LightRAG(
working_dir=temp_dir,
llm_model_func=mock_llm_func,
embedding_func=embedding_func_new,
tokenizer=mock_tokenizer,
kv_storage="PGKVStorage",
vector_storage="PGVectorStorage",
doc_status_storage="PGDocStatusStorage",
vector_db_storage_cls_kwargs={
**pg_config,
"cosine_better_than_threshold": 0.8,
},
)
# With our fix, this should handle dimension mismatch gracefully:
# Expected behavior:
# 1. Detect dimension mismatch (1536d legacy vs 3072d new)
# 2. Skip migration to prevent data corruption
# 3. Preserve legacy table with original data
# 4. Create new empty table for 3072d model
# 5. System initializes successfully
await rag.initialize_storages()
rag = LightRAG(
working_dir=temp_dir,
llm_model_func=mock_llm_func,
embedding_func=embedding_func_new,
tokenizer=mock_tokenizer,
kv_storage="PGKVStorage",
vector_storage="PGVectorStorage",
doc_status_storage="PGDocStatusStorage",
vector_db_storage_cls_kwargs={
**pg_config,
"cosine_better_than_threshold": 0.8,
},
)
# Check what happened
new_table = rag.chunks_vdb.table_name
print(f"✅ Initialization succeeded, new table: {new_table}")
await rag.initialize_storages()
# Verify new table has correct dimension (3072d)
# Check if both tables exist
check_legacy = f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{legacy_table}')"
check_new = f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{new_table.lower()}')"
# Verify expected behavior
new_table = rag.chunks_vdb.table_name
print(f"✅ Initialization succeeded, new table: {new_table}")
legacy_exists = await pg_cleanup.query(check_legacy, [])
new_exists = await pg_cleanup.query(check_new, [])
# 1. New table should exist and be created with model suffix
assert "text_embedding_3_large_3072d" in new_table.lower()
check_new = f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{new_table.lower()}')"
new_exists = await pg_cleanup.query(check_new, [])
assert new_exists.get("exists") is True, "New table should exist"
print(f"✅ New table created: {new_table}")
print(f"✅ Legacy table exists: {legacy_exists.get('exists')}")
print(f"✅ New table exists: {new_exists.get('exists')}")
# 2. Legacy table should be preserved (not deleted)
check_legacy = f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{legacy_table}')"
legacy_exists = await pg_cleanup.query(check_legacy, [])
assert (
legacy_exists.get("exists") is True
), "Legacy table should be preserved when dimensions don't match"
print(f"✅ Legacy table preserved: {legacy_table}")
# Test should verify proper handling:
# - New table created with 3072d
# - Legacy table preserved (or migrated to dimension-matched table)
# - System is operational
# 3. Legacy table should still have original data (not migrated)
legacy_count_result = await pg_cleanup.query(
f"SELECT COUNT(*) as count FROM {legacy_table}", []
)
legacy_count = legacy_count_result.get("count", 0)
assert (
legacy_count == 3
), f"Legacy table should still have 3 records, got {legacy_count}"
print(f"✅ Legacy data preserved: {legacy_count} records")
await rag.finalize_storages()
# 4. New table should be empty (migration skipped)
new_count_result = await pg_cleanup.query(
f"SELECT COUNT(*) as count FROM {new_table}", []
)
new_count = new_count_result.get("count", 0)
assert (
new_count == 0
), f"New table should be empty (migration skipped), got {new_count}"
print(f"✅ New table is empty (migration correctly skipped): {new_count} records")
except Exception as e:
# If it raises an error, it should be a clear, actionable error
print(f"⚠️ Initialization raised exception: {e}")
# Verify error message is clear and actionable
assert any(
keyword in str(e).lower()
for keyword in ["dimension", "mismatch", "1536", "3072"]
), f"Error message should mention dimension mismatch: {e}"
print("✅ Clear error message provided to user")
# 5. System should be operational
print("✅ System initialized successfully despite dimension mismatch")
await rag.finalize_storages()
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
@@ -1293,50 +1310,72 @@ async def test_dimension_mismatch_qdrant(
print("📦 Initializing LightRAG with new model (1024d)...")
# This should handle dimension mismatch gracefully
try:
rag = LightRAG(
working_dir=temp_dir,
llm_model_func=mock_llm_func,
embedding_func=embedding_func_new,
tokenizer=mock_tokenizer,
vector_storage="QdrantVectorDBStorage",
vector_db_storage_cls_kwargs={
**qdrant_config,
"cosine_better_than_threshold": 0.8,
},
)
# With our fix, this should handle dimension mismatch gracefully:
# Expected behavior:
# 1. Detect dimension mismatch (768d legacy vs 1024d new)
# 2. Skip migration to prevent data corruption
# 3. Preserve legacy collection with original data
# 4. Create new empty collection for 1024d model
# 5. System initializes successfully
await rag.initialize_storages()
rag = LightRAG(
working_dir=temp_dir,
llm_model_func=mock_llm_func,
embedding_func=embedding_func_new,
tokenizer=mock_tokenizer,
vector_storage="QdrantVectorDBStorage",
vector_db_storage_cls_kwargs={
**qdrant_config,
"cosine_better_than_threshold": 0.8,
},
)
# Check what happened
new_collection = rag.chunks_vdb.final_namespace
print(f"✅ Initialization succeeded, new collection: {new_collection}")
await rag.initialize_storages()
# Verify collections
legacy_exists = client.collection_exists(legacy_collection)
new_exists = client.collection_exists(new_collection)
# Verify expected behavior
new_collection = rag.chunks_vdb.final_namespace
print(f"✅ Initialization succeeded, new collection: {new_collection}")
print(f"✅ Legacy collection exists: {legacy_exists}")
print(f"✅ New collection exists: {new_exists}")
# 1. New collection should exist with model suffix
assert "bge_large_1024d" in new_collection
assert client.collection_exists(
new_collection
), f"New collection {new_collection} should exist"
print(f"✅ New collection created: {new_collection}")
# Verify new collection has correct dimension
collection_info = client.get_collection(new_collection)
new_dim = collection_info.config.params.vectors.size
print(f"✅ New collection dimension: {new_dim}d")
assert new_dim == 1024, f"New collection should have 1024d, got {new_dim}d"
# 2. Legacy collection should be preserved (not deleted)
legacy_exists = client.collection_exists(legacy_collection)
assert (
legacy_exists
), "Legacy collection should be preserved when dimensions don't match"
print(f"✅ Legacy collection preserved: {legacy_collection}")
await rag.finalize_storages()
# 3. Legacy collection should still have original data (not migrated)
legacy_count = client.count(legacy_collection).count
assert (
legacy_count == 3
), f"Legacy collection should still have 3 vectors, got {legacy_count}"
print(f"✅ Legacy data preserved: {legacy_count} vectors")
except Exception as e:
# If it raises an error, it should be a clear, actionable error
print(f"⚠️ Initialization raised exception: {e}")
# Verify error message is clear and actionable
assert any(
keyword in str(e).lower()
for keyword in ["dimension", "mismatch", "768", "1024"]
), f"Error message should mention dimension mismatch: {e}"
print("✅ Clear error message provided to user")
# 4. New collection should be empty (migration skipped)
new_count = client.count(new_collection).count
assert (
new_count == 0
), f"New collection should be empty (migration skipped), got {new_count}"
print(
f"✅ New collection is empty (migration correctly skipped): {new_count} vectors"
)
# 5. Verify new collection has correct dimension
collection_info = client.get_collection(new_collection)
new_dim = collection_info.config.params.vectors.size
assert new_dim == 1024, f"New collection should have 1024d, got {new_dim}d"
print(f"✅ New collection dimension verified: {new_dim}d")
# 6. System should be operational
print("✅ System initialized successfully despite dimension mismatch")
await rag.finalize_storages()
finally:
shutil.rmtree(temp_dir, ignore_errors=True)

View file

@@ -12,9 +12,11 @@ def mock_qdrant_client():
client = mock_client_cls.return_value
client.collection_exists.return_value = False
client.count.return_value.count = 0
# Mock payload schema for get_collection
# Mock payload schema and vector config for get_collection
collection_info = MagicMock()
collection_info.payload_schema = {}
# Mock vector dimension to match mock_embedding_func (768d)
collection_info.config.params.vectors.size = 768
client.get_collection.return_value = collection_info
yield client
@@ -254,6 +256,12 @@ async def test_scenario_2_legacy_upgrade_migration(
lambda name: name == legacy_collection
)
# Mock legacy collection info with 1536d vectors
legacy_collection_info = MagicMock()
legacy_collection_info.payload_schema = {}
legacy_collection_info.config.params.vectors.size = 1536
mock_qdrant_client.get_collection.return_value = legacy_collection_info
# Mock legacy data
mock_qdrant_client.count.return_value.count = 150