diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 9b892898..dd7afcc7 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -2319,7 +2319,7 @@ class PGVectorStorage(BaseVectorStorage): select_query = ( f"SELECT * FROM {legacy_table_name} OFFSET $1 LIMIT $2" ) - rows = await db.fetch(select_query, [offset, batch_size]) + rows = await db.query(select_query, [offset, batch_size], multirows=True) if not rows: break diff --git a/tests/test_postgres_migration.py b/tests/test_postgres_migration.py index 2ca6c770..8569335d 100644 --- a/tests/test_postgres_migration.py +++ b/tests/test_postgres_migration.py @@ -19,10 +19,15 @@ def mock_pg_db(): db = AsyncMock() db.workspace = "test_workspace" - # Mock query responses - db.query = AsyncMock(return_value={"exists": False, "count": 0}) + # Mock query responses with multirows support + async def mock_query(sql, params=None, multirows=False, **kwargs): + # Default return value + if multirows: + return [] # Return empty list for multirows + return {"exists": False, "count": 0} + + db.query = AsyncMock(side_effect=mock_query) db.execute = AsyncMock() - db.fetch = AsyncMock(return_value=[]) return db @@ -108,26 +113,24 @@ async def test_postgres_migration_trigger(mock_client_manager, mock_pg_db, mock_ return table_name == storage.legacy_table_name # 2. Legacy table has 100 records - async def mock_query(sql, params): - if "COUNT(*)" in sql: - return {"count": 100} - return {} - - # 3. Mock fetch for batch migration mock_rows = [ {"id": f"test_id_{i}", "content": f"content_{i}", "workspace": "test_ws"} for i in range(100) ] - async def mock_fetch(sql, params): - offset = params[0] if params else 0 - limit = params[1] if len(params) > 1 else 500 - start = offset - end = min(offset + limit, len(mock_rows)) - return mock_rows[start:end] + async def mock_query(sql, params=None, multirows=False, **kwargs): + if "COUNT(*)" in sql: + return {"count": 100} + elif multirows and "SELECT *" in sql: + # Mock batch fetch for migration + offset = params[0] if params else 0 + limit = params[1] if len(params) > 1 else 500 + start = offset + end = min(offset + limit, len(mock_rows)) + return mock_rows[start:end] + return {} mock_pg_db.query = AsyncMock(side_effect=mock_query) - mock_pg_db.fetch = AsyncMock(side_effect=mock_fetch) with patch("lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists), \ patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()): @@ -253,30 +256,28 @@ async def test_scenario_2_legacy_upgrade_migration(mock_client_manager, mock_pg_ return table_name == storage.legacy_table_name # Mock: legacy table has 50 records - async def mock_query(sql, params): + mock_rows = [ + {"id": f"legacy_id_{i}", "content": f"legacy_content_{i}", "workspace": "legacy_workspace"} + for i in range(50) + ] + + async def mock_query(sql, params=None, multirows=False, **kwargs): if "COUNT(*)" in sql: # First call for legacy count, then for verification if storage.legacy_table_name in sql: return {"count": 50} else: return {"count": 50} + elif multirows and "SELECT *" in sql: + # Mock batch fetch for migration + offset = params[0] if params else 0 + limit = params[1] if len(params) > 1 else 500 + start = offset + end = min(offset + limit, len(mock_rows)) + return mock_rows[start:end] return {} - # Mock fetch for migration - mock_rows = [ - {"id": f"legacy_id_{i}", "content": f"legacy_content_{i}", "workspace": "legacy_workspace"} - for i in range(50) - ] - - async def mock_fetch(sql, params): - offset = params[0] if params else 0 - limit = params[1] if len(params) > 1 else 500 - start = offset - end = min(offset + limit, len(mock_rows)) - return mock_rows[start:end] - mock_pg_db.query = AsyncMock(side_effect=mock_query) - mock_pg_db.fetch = AsyncMock(side_effect=mock_fetch) with patch("lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists), \ patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create: