import pytest from unittest.mock import patch, AsyncMock import numpy as np from lightrag.utils import EmbeddingFunc from lightrag.kg.postgres_impl import ( PGVectorStorage, ) from lightrag.namespace import NameSpace # Mock PostgreSQLDB @pytest.fixture def mock_pg_db(): """Mock PostgreSQL database connection""" db = AsyncMock() db.workspace = "test_workspace" # Mock query responses with multirows support async def mock_query(sql, params=None, multirows=False, **kwargs): # Default return value if multirows: return [] # Return empty list for multirows return {"exists": False, "count": 0} db.query = AsyncMock(side_effect=mock_query) db.execute = AsyncMock() return db # Mock get_data_init_lock to avoid async lock issues in tests @pytest.fixture(autouse=True) def mock_data_init_lock(): with patch("lightrag.kg.postgres_impl.get_data_init_lock") as mock_lock: mock_lock_ctx = AsyncMock() mock_lock.return_value = mock_lock_ctx yield mock_lock # Mock ClientManager @pytest.fixture def mock_client_manager(mock_pg_db): with patch("lightrag.kg.postgres_impl.ClientManager") as mock_manager: mock_manager.get_client = AsyncMock(return_value=mock_pg_db) mock_manager.release_client = AsyncMock() yield mock_manager # Mock Embedding function @pytest.fixture def mock_embedding_func(): async def embed_func(texts, **kwargs): return np.array([[0.1] * 768 for _ in texts]) func = EmbeddingFunc(embedding_dim=768, func=embed_func, model_name="test_model") return func @pytest.mark.asyncio async def test_postgres_table_naming( mock_client_manager, mock_pg_db, mock_embedding_func ): """Test if table name is correctly generated with model suffix""" config = { "embedding_batch_num": 10, "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8}, } storage = PGVectorStorage( namespace=NameSpace.VECTOR_STORE_CHUNKS, global_config=config, embedding_func=mock_embedding_func, workspace="test_ws", ) # Verify table name contains model suffix expected_suffix = "test_model_768d" assert expected_suffix in storage.table_name assert storage.table_name == f"LIGHTRAG_VDB_CHUNKS_{expected_suffix}" # Verify legacy table name assert storage.legacy_table_name == "LIGHTRAG_VDB_CHUNKS" @pytest.mark.asyncio async def test_postgres_migration_trigger( mock_client_manager, mock_pg_db, mock_embedding_func ): """Test if migration logic is triggered correctly""" config = { "embedding_batch_num": 10, "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8}, } storage = PGVectorStorage( namespace=NameSpace.VECTOR_STORE_CHUNKS, global_config=config, embedding_func=mock_embedding_func, workspace="test_ws", ) # Setup mocks for migration scenario # 1. New table does not exist, legacy table exists async def mock_table_exists(db, table_name): return table_name == storage.legacy_table_name # 2. Legacy table has 100 records mock_rows = [ {"id": f"test_id_{i}", "content": f"content_{i}", "workspace": "test_ws"} for i in range(100) ] async def mock_query(sql, params=None, multirows=False, **kwargs): if "COUNT(*)" in sql: return {"count": 100} elif multirows and "SELECT *" in sql: # Mock batch fetch for migration offset = params[0] if params else 0 limit = params[1] if len(params) > 1 else 500 start = offset end = min(offset + limit, len(mock_rows)) return mock_rows[start:end] return {} mock_pg_db.query = AsyncMock(side_effect=mock_query) with ( patch( "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists ), patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()), ): # Initialize storage (should trigger migration) await storage.initialize() # Verify migration was executed # Check that execute was called for inserting rows assert mock_pg_db.execute.call_count > 0 @pytest.mark.asyncio async def test_postgres_no_migration_needed( mock_client_manager, mock_pg_db, mock_embedding_func ): """Test scenario where new table already exists (no migration needed)""" config = { "embedding_batch_num": 10, "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8}, } storage = PGVectorStorage( namespace=NameSpace.VECTOR_STORE_CHUNKS, global_config=config, embedding_func=mock_embedding_func, workspace="test_ws", ) # Mock: new table already exists async def mock_table_exists(db, table_name): return table_name == storage.table_name with ( patch( "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists ), patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create, ): await storage.initialize() # Verify no table creation was attempted mock_create.assert_not_called() @pytest.mark.asyncio async def test_scenario_1_new_workspace_creation( mock_client_manager, mock_pg_db, mock_embedding_func ): """ Scenario 1: New workspace creation Expected behavior: - No legacy table exists - Directly create new table with model suffix - No migration needed """ config = { "embedding_batch_num": 10, "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8}, } embedding_func = EmbeddingFunc( embedding_dim=3072, func=mock_embedding_func.func, model_name="text-embedding-3-large", ) storage = PGVectorStorage( namespace=NameSpace.VECTOR_STORE_CHUNKS, global_config=config, embedding_func=embedding_func, workspace="new_workspace", ) # Mock: neither table exists async def mock_table_exists(db, table_name): return False with ( patch( "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists ), patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create, ): await storage.initialize() # Verify table name format assert "text_embedding_3_large_3072d" in storage.table_name # Verify new table creation was called mock_create.assert_called_once() call_args = mock_create.call_args assert ( call_args[0][1] == storage.table_name ) # table_name is second positional arg @pytest.mark.asyncio async def test_scenario_2_legacy_upgrade_migration( mock_client_manager, mock_pg_db, mock_embedding_func ): """ Scenario 2: Upgrade from legacy version Expected behavior: - Legacy table exists (without model suffix) - New table doesn't exist - Automatically migrate data to new table with suffix """ config = { "embedding_batch_num": 10, "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8}, } embedding_func = EmbeddingFunc( embedding_dim=1536, func=mock_embedding_func.func, model_name="text-embedding-ada-002", ) storage = PGVectorStorage( namespace=NameSpace.VECTOR_STORE_CHUNKS, global_config=config, embedding_func=embedding_func, workspace="legacy_workspace", ) # Mock: only legacy table exists async def mock_table_exists(db, table_name): return table_name == storage.legacy_table_name # Mock: legacy table has 50 records mock_rows = [ { "id": f"legacy_id_{i}", "content": f"legacy_content_{i}", "workspace": "legacy_workspace", } for i in range(50) ] async def mock_query(sql, params=None, multirows=False, **kwargs): if "COUNT(*)" in sql: # First call for legacy count, then for verification if storage.legacy_table_name in sql: return {"count": 50} else: return {"count": 50} elif multirows and "SELECT *" in sql: # Mock batch fetch for migration offset = params[0] if params else 0 limit = params[1] if len(params) > 1 else 500 start = offset end = min(offset + limit, len(mock_rows)) return mock_rows[start:end] return {} mock_pg_db.query = AsyncMock(side_effect=mock_query) with ( patch( "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists ), patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create, ): await storage.initialize() # Verify table name contains ada-002 assert "text_embedding_ada_002_1536d" in storage.table_name # Verify migration was executed assert mock_pg_db.execute.call_count >= 50 # At least one execute per row mock_create.assert_called_once() @pytest.mark.asyncio async def test_scenario_3_multi_model_coexistence( mock_client_manager, mock_pg_db, mock_embedding_func ): """ Scenario 3: Multiple embedding models coexist Expected behavior: - Different embedding models create separate tables - Tables are isolated by model suffix - No interference between different models """ config = { "embedding_batch_num": 10, "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8}, } # Workspace A: uses bge-small (768d) embedding_func_a = EmbeddingFunc( embedding_dim=768, func=mock_embedding_func.func, model_name="bge-small" ) storage_a = PGVectorStorage( namespace=NameSpace.VECTOR_STORE_CHUNKS, global_config=config, embedding_func=embedding_func_a, workspace="workspace_a", ) # Workspace B: uses bge-large (1024d) async def embed_func_b(texts, **kwargs): return np.array([[0.1] * 1024 for _ in texts]) embedding_func_b = EmbeddingFunc( embedding_dim=1024, func=embed_func_b, model_name="bge-large" ) storage_b = PGVectorStorage( namespace=NameSpace.VECTOR_STORE_CHUNKS, global_config=config, embedding_func=embedding_func_b, workspace="workspace_b", ) # Verify different table names assert storage_a.table_name != storage_b.table_name assert "bge_small_768d" in storage_a.table_name assert "bge_large_1024d" in storage_b.table_name # Mock: both tables don't exist yet async def mock_table_exists(db, table_name): return False with ( patch( "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists ), patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create, ): # Initialize both storages await storage_a.initialize() await storage_b.initialize() # Verify two separate tables were created assert mock_create.call_count == 2 # Verify table names are different call_args_list = mock_create.call_args_list table_names = [call[0][1] for call in call_args_list] # Second positional arg assert len(set(table_names)) == 2 # Two unique table names assert storage_a.table_name in table_names assert storage_b.table_name in table_names