LightRAG/tests/test_postgres_migration.py
BukeLy ad68624d02 feat: PostgreSQL model isolation and auto-migration
Why this change is needed:
PostgreSQL vector storage needs model isolation to prevent dimension
conflicts when different workspaces use different embedding models.
Without this, the first workspace locks the vector dimension for all
subsequent workspaces, causing failures.

How it solves it:
- Implements dynamic table naming with model suffix: {table}_{model}_{dim}d
- Adds setup_table() method mirroring Qdrant's approach for consistency
- Implements 4-branch migration logic: both exist -> warn, only new -> use,
  neither -> create, only legacy -> migrate
- Batch migration: 500 records/batch (same as Qdrant)
- No automatic rollback to support idempotent re-runs

Impact:
- PostgreSQL tables now isolated by embedding model and dimension
- Automatic data migration from legacy tables on startup
- Backward compatible: model_name=None defaults to "unknown"
- All SQL operations use dynamic table names

Testing:
- 6 new tests for PostgreSQL migration (100% pass)
- Tests cover: naming, migration trigger, scenarios 1-3
- 3 additional scenario tests added for Qdrant completeness

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-19 22:54:37 +08:00

366 lines
11 KiB
Python

import os
import pytest
from unittest.mock import MagicMock, patch, AsyncMock, call
import numpy as np
from lightrag.utils import EmbeddingFunc
from lightrag.kg.postgres_impl import (
PGVectorStorage,
_pg_table_exists,
_pg_create_table,
PostgreSQLMigrationError,
)
from lightrag.namespace import NameSpace
# Mock PostgreSQLDB
@pytest.fixture
def mock_pg_db():
    """Provide an ``AsyncMock`` standing in for the PostgreSQL client.

    The mock reports the workspace ``"test_workspace"``. By default ``query``
    resolves to ``{"exists": False, "count": 0}``, ``fetch`` resolves to an
    empty list and ``execute`` is a bare awaitable mock whose calls can be
    inspected. Individual tests override these attributes as needed.
    """
    fake_db = AsyncMock()
    fake_db.workspace = "test_workspace"

    # Default canned responses for the three database entry points.
    fake_db.fetch = AsyncMock(return_value=[])
    fake_db.execute = AsyncMock()
    fake_db.query = AsyncMock(return_value={"exists": False, "count": 0})

    return fake_db
# Mock get_data_init_lock to avoid async lock issues in tests
@pytest.fixture(autouse=True)
def mock_data_init_lock():
    """Replace ``get_data_init_lock`` in ``postgres_impl`` for every test.

    The patched function returns an ``AsyncMock`` instead of a real lock, which
    avoids async lock issues while the tests run. Yields the patched function
    mock so a test can inspect how it was called.
    """
    with patch("lightrag.kg.postgres_impl.get_data_init_lock") as patched_lock:
        patched_lock.return_value = AsyncMock()
        yield patched_lock
# Mock ClientManager
@pytest.fixture
def mock_client_manager(mock_pg_db):
    """Patch ``ClientManager`` so ``get_client`` resolves to ``mock_pg_db``.

    ``release_client`` is replaced with an awaitable no-op mock. Yields the
    patched ``ClientManager`` mock.
    """
    target = "lightrag.kg.postgres_impl.ClientManager"
    with patch(target) as manager:
        manager.release_client = AsyncMock()
        manager.get_client = AsyncMock(return_value=mock_pg_db)
        yield manager
# Mock Embedding function
@pytest.fixture
def mock_embedding_func():
    """Build an ``EmbeddingFunc`` for a 768-dimensional model named ``"test_model"``."""

    async def embed_func(texts, **kwargs):
        # One constant 768-wide vector per input text.
        vectors = [[0.1] * 768 for _ in texts]
        return np.array(vectors)

    return EmbeddingFunc(
        embedding_dim=768,
        func=embed_func,
        model_name="test_model",
    )
@pytest.mark.asyncio
async def test_postgres_table_naming(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """The storage table name carries the ``{model}_{dim}d`` suffix.

    Also checks that the legacy table name stays un-suffixed.
    """
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # Suffix derived from the fixture's model name and embedding dimension.
    expected_suffix = "test_model_768d"
    assert expected_suffix in storage.table_name
    assert storage.table_name == f"LIGHTRAG_VDB_CHUNKS_{expected_suffix}"

    # The legacy name is the bare base table without any model suffix.
    assert storage.legacy_table_name == "LIGHTRAG_VDB_CHUNKS"
@pytest.mark.asyncio
async def test_postgres_migration_trigger(mock_client_manager, mock_pg_db, mock_embedding_func):
    """Test if migration logic is triggered correctly.

    Simulates the "only legacy table exists" branch: the legacy table reports
    100 rows, ``fetch`` serves them in offset/limit pages, and after
    ``initialize()`` the test expects at least one ``execute`` call on the
    database mock.
    """
    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {
            "cosine_better_than_threshold": 0.8
        }
    }

    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=mock_embedding_func,
        workspace="test_ws"
    )

    # Setup mocks for migration scenario
    # 1. New table does not exist, legacy table exists
    async def mock_table_exists(db, table_name):
        return table_name == storage.legacy_table_name

    # 2. Legacy table has 100 records
    async def mock_query(sql, params=None):
        if "COUNT(*)" in sql:
            return {"count": 100}
        return {}

    # 3. Mock fetch for batch migration
    mock_rows = [
        {"id": f"test_id_{i}", "content": f"content_{i}", "workspace": "test_ws"}
        for i in range(100)
    ]

    async def mock_fetch(sql, params=None):
        # Normalise first: the original guarded params[0] against a falsy
        # value but then called len(params), which raises on None.
        params = params or []
        offset = params[0] if params else 0
        limit = params[1] if len(params) > 1 else 500
        # Slicing already clamps at the end of the list.
        return mock_rows[offset:offset + limit]

    mock_pg_db.query = AsyncMock(side_effect=mock_query)
    mock_pg_db.fetch = AsyncMock(side_effect=mock_fetch)

    with patch("lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists), \
         patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()):

        # Initialize storage (should trigger migration)
        await storage.initialize()

        # Verify migration was executed
        # Check that execute was called for inserting rows
        assert mock_pg_db.execute.call_count > 0
@pytest.mark.asyncio
async def test_postgres_no_migration_needed(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """When only the model-suffixed table exists, nothing is created."""
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    async def table_exists(db, table_name):
        # Only the new, suffixed table is reported as present.
        return table_name == storage.table_name

    exists_patch = patch(
        "lightrag.kg.postgres_impl._pg_table_exists", side_effect=table_exists
    )
    create_patch = patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock())

    with exists_patch, create_patch as mock_create:
        await storage.initialize()

        # The existing table must be reused, so no creation is attempted.
        mock_create.assert_not_called()
@pytest.mark.asyncio
async def test_scenario_1_new_workspace_creation(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Scenario 1: a brand-new workspace.

    Neither the legacy table nor the suffixed table exists, so initialization
    is expected to create the suffixed table exactly once, with no migration.
    """
    large_model = EmbeddingFunc(
        embedding_dim=3072,
        func=mock_embedding_func.func,
        model_name="text-embedding-3-large",
    )
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=large_model,
        workspace="new_workspace",
    )

    async def nothing_exists(db, table_name):
        # Neither the legacy nor the suffixed table is present.
        return False

    exists_patch = patch(
        "lightrag.kg.postgres_impl._pg_table_exists", side_effect=nothing_exists
    )
    create_patch = patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock())

    with exists_patch, create_patch as mock_create:
        await storage.initialize()

        # The model name and dimension appear in the table name.
        assert "text_embedding_3_large_3072d" in storage.table_name

        # Exactly one creation, targeting the suffixed table. The table name
        # is the second positional argument of the create helper.
        mock_create.assert_called_once()
        assert mock_create.call_args.args[1] == storage.table_name
@pytest.mark.asyncio
async def test_scenario_2_legacy_upgrade_migration(mock_client_manager, mock_pg_db, mock_embedding_func):
    """
    Scenario 2: Upgrade from legacy version

    Expected behavior:
    - Legacy table exists (without model suffix)
    - New table doesn't exist
    - Automatically migrate data to new table with suffix

    The legacy table is mocked with 50 rows; the test expects the suffixed
    table to be created once and at least 50 ``execute`` calls.
    """
    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {
            "cosine_better_than_threshold": 0.8
        }
    }

    embedding_func = EmbeddingFunc(
        embedding_dim=1536,
        func=mock_embedding_func.func,
        model_name="text-embedding-ada-002"
    )

    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=embedding_func,
        workspace="legacy_workspace"
    )

    # Mock: only legacy table exists
    async def mock_table_exists(db, table_name):
        return table_name == storage.legacy_table_name

    # Mock: every COUNT(*) query reports 50 rows. The original branched on the
    # table name but returned the same value from both branches, so the branch
    # is collapsed.
    async def mock_query(sql, params=None):
        if "COUNT(*)" in sql:
            return {"count": 50}
        return {}

    # Mock fetch for migration
    mock_rows = [
        {"id": f"legacy_id_{i}", "content": f"legacy_content_{i}", "workspace": "legacy_workspace"}
        for i in range(50)
    ]

    async def mock_fetch(sql, params=None):
        # Normalise first: the original guarded params[0] against a falsy
        # value but then called len(params), which raises on None.
        params = params or []
        offset = params[0] if params else 0
        limit = params[1] if len(params) > 1 else 500
        # Slicing already clamps at the end of the list.
        return mock_rows[offset:offset + limit]

    mock_pg_db.query = AsyncMock(side_effect=mock_query)
    mock_pg_db.fetch = AsyncMock(side_effect=mock_fetch)

    with patch("lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists), \
         patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create:

        await storage.initialize()

        # Verify table name contains ada-002
        assert "text_embedding_ada_002_1536d" in storage.table_name

        # Verify migration was executed
        assert mock_pg_db.execute.call_count >= 50  # At least one execute per row
        mock_create.assert_called_once()
@pytest.mark.asyncio
async def test_scenario_3_multi_model_coexistence(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Scenario 3: two embedding models side by side.

    Two storages built with different models and dimensions must resolve to
    different suffixed table names, and initializing both must create two
    distinct tables.
    """
    shared_config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    # Workspace A: bge-small, 768 dimensions (reuses the fixture's embedder).
    storage_a = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=shared_config,
        embedding_func=EmbeddingFunc(
            embedding_dim=768,
            func=mock_embedding_func.func,
            model_name="bge-small",
        ),
        workspace="workspace_a",
    )

    # Workspace B: bge-large, 1024 dimensions, with its own embedder.
    async def embed_func_b(texts, **kwargs):
        vectors = [[0.1] * 1024 for _ in texts]
        return np.array(vectors)

    storage_b = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=shared_config,
        embedding_func=EmbeddingFunc(
            embedding_dim=1024,
            func=embed_func_b,
            model_name="bge-large",
        ),
        workspace="workspace_b",
    )

    # Each storage gets its own model-specific table name.
    assert storage_a.table_name != storage_b.table_name
    assert "bge_small_768d" in storage_a.table_name
    assert "bge_large_1024d" in storage_b.table_name

    async def nothing_exists(db, table_name):
        # Neither table has been created yet.
        return False

    exists_patch = patch(
        "lightrag.kg.postgres_impl._pg_table_exists", side_effect=nothing_exists
    )
    create_patch = patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock())

    with exists_patch, create_patch as mock_create:
        await storage_a.initialize()
        await storage_b.initialize()

        # One creation per storage.
        assert mock_create.call_count == 2

        # The table name is the second positional argument of each call.
        table_names = [
            invocation.args[1] for invocation in mock_create.call_args_list
        ]
        assert len(set(table_names)) == 2  # Two unique table names
        assert storage_a.table_name in table_names
        assert storage_b.table_name in table_names