fix: replace db.fetch with db.query for PostgreSQL migration

Why this change is needed:
PostgreSQLDB class doesn't have a fetch() method. The migration code
was incorrectly using db.fetch() for batch data retrieval, causing
AttributeError during E2E tests.

How it solves it:
1. Changed db.fetch(sql, params) to db.query(sql, params, multirows=True)
2. Updated all test mocks to support the multirows parameter
3. Consolidated mock_query implementation to handle both single and multi-row queries

Impact:
- PostgreSQL legacy data migration now works correctly in E2E tests
- All unit tests pass (6/6)
- Aligns with PostgreSQLDB's actual API

Testing:
- pytest tests/test_postgres_migration.py -v (6/6 passed)
- Updated test_postgres_migration_trigger mock
- Updated test_scenario_2_legacy_upgrade_migration mock
- Updated base mock_pg_db fixture
This commit is contained in:
BukeLy 2025-11-20 01:12:27 +08:00
parent 5d9547344a
commit e842327486
2 changed files with 33 additions and 32 deletions

View file

@ -2319,7 +2319,7 @@ class PGVectorStorage(BaseVectorStorage):
select_query = (
f"SELECT * FROM {legacy_table_name} OFFSET $1 LIMIT $2"
)
rows = await db.fetch(select_query, [offset, batch_size])
rows = await db.query(select_query, [offset, batch_size], multirows=True)
if not rows:
break

View file

@ -19,10 +19,15 @@ def mock_pg_db():
db = AsyncMock()
db.workspace = "test_workspace"
# Mock query responses
db.query = AsyncMock(return_value={"exists": False, "count": 0})
# Mock query responses with multirows support
async def mock_query(sql, params=None, multirows=False, **kwargs):
# Default return value
if multirows:
return [] # Return empty list for multirows
return {"exists": False, "count": 0}
db.query = AsyncMock(side_effect=mock_query)
db.execute = AsyncMock()
db.fetch = AsyncMock(return_value=[])
return db
@ -108,26 +113,24 @@ async def test_postgres_migration_trigger(mock_client_manager, mock_pg_db, mock_
return table_name == storage.legacy_table_name
# 2. Legacy table has 100 records
async def mock_query(sql, params):
if "COUNT(*)" in sql:
return {"count": 100}
return {}
# 3. Mock fetch for batch migration
mock_rows = [
{"id": f"test_id_{i}", "content": f"content_{i}", "workspace": "test_ws"}
for i in range(100)
]
async def mock_fetch(sql, params):
offset = params[0] if params else 0
limit = params[1] if len(params) > 1 else 500
start = offset
end = min(offset + limit, len(mock_rows))
return mock_rows[start:end]
async def mock_query(sql, params=None, multirows=False, **kwargs):
if "COUNT(*)" in sql:
return {"count": 100}
elif multirows and "SELECT *" in sql:
# Mock batch fetch for migration
offset = params[0] if params else 0
limit = params[1] if len(params) > 1 else 500
start = offset
end = min(offset + limit, len(mock_rows))
return mock_rows[start:end]
return {}
mock_pg_db.query = AsyncMock(side_effect=mock_query)
mock_pg_db.fetch = AsyncMock(side_effect=mock_fetch)
with patch("lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists), \
patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()):
@ -253,30 +256,28 @@ async def test_scenario_2_legacy_upgrade_migration(mock_client_manager, mock_pg_
return table_name == storage.legacy_table_name
# Mock: legacy table has 50 records
async def mock_query(sql, params):
mock_rows = [
{"id": f"legacy_id_{i}", "content": f"legacy_content_{i}", "workspace": "legacy_workspace"}
for i in range(50)
]
async def mock_query(sql, params=None, multirows=False, **kwargs):
if "COUNT(*)" in sql:
# First call for legacy count, then for verification
if storage.legacy_table_name in sql:
return {"count": 50}
else:
return {"count": 50}
elif multirows and "SELECT *" in sql:
# Mock batch fetch for migration
offset = params[0] if params else 0
limit = params[1] if len(params) > 1 else 500
start = offset
end = min(offset + limit, len(mock_rows))
return mock_rows[start:end]
return {}
# Mock fetch for migration
mock_rows = [
{"id": f"legacy_id_{i}", "content": f"legacy_content_{i}", "workspace": "legacy_workspace"}
for i in range(50)
]
async def mock_fetch(sql, params):
offset = params[0] if params else 0
limit = params[1] if len(params) > 1 else 500
start = offset
end = min(offset + limit, len(mock_rows))
return mock_rows[start:end]
mock_pg_db.query = AsyncMock(side_effect=mock_query)
mock_pg_db.fetch = AsyncMock(side_effect=mock_fetch)
with patch("lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists), \
patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create: