Optimize PostgreSQL database migrations for LLM cache

- Combine column migration into single operation
- Optimize LLM cache key migration query
- Improve migration error handling
- Add conflict detection for cache migration
This commit is contained in:
yangdx 2025-07-16 17:32:53 +08:00
parent bd340fece6
commit bab2803953

View file

@ -105,80 +105,59 @@ class PostgreSQLDB:
): ):
pass pass
async def _migrate_llm_cache_add_chunk_id(self): async def _migrate_llm_cache_add_columns(self):
"""Add chunk_id column to LIGHTRAG_LLM_CACHE table if it doesn't exist""" """Add chunk_id and cache_type columns to LIGHTRAG_LLM_CACHE table if they don't exist"""
try: try:
# Check if chunk_id column exists # Check if both columns exist
check_column_sql = """ check_columns_sql = """
SELECT column_name SELECT column_name
FROM information_schema.columns FROM information_schema.columns
WHERE table_name = 'lightrag_llm_cache' WHERE table_name = 'lightrag_llm_cache'
AND column_name = 'chunk_id' AND column_name IN ('chunk_id', 'cache_type')
""" """
column_info = await self.query(check_column_sql) existing_columns = await self.query(check_columns_sql, multirows=True)
if not column_info: existing_column_names = {col['column_name'] for col in existing_columns} if existing_columns else set()
# Add missing chunk_id column
if 'chunk_id' not in existing_column_names:
logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table") logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table")
add_column_sql = """ add_chunk_id_sql = """
ALTER TABLE LIGHTRAG_LLM_CACHE ALTER TABLE LIGHTRAG_LLM_CACHE
ADD COLUMN chunk_id VARCHAR(255) NULL ADD COLUMN chunk_id VARCHAR(255) NULL
""" """
await self.execute(add_column_sql) await self.execute(add_chunk_id_sql)
logger.info( logger.info("Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table")
"Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table"
)
else: else:
logger.info( logger.info("chunk_id column already exists in LIGHTRAG_LLM_CACHE table")
"chunk_id column already exists in LIGHTRAG_LLM_CACHE table"
) # Add missing cache_type column
except Exception as e: if 'cache_type' not in existing_column_names:
logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}")
async def _migrate_llm_cache_add_cache_type(self):
"""Add cache_type column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
try:
# Check if cache_type column exists
check_column_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_llm_cache'
AND column_name = 'cache_type'
"""
column_info = await self.query(check_column_sql)
if not column_info:
logger.info("Adding cache_type column to LIGHTRAG_LLM_CACHE table") logger.info("Adding cache_type column to LIGHTRAG_LLM_CACHE table")
add_column_sql = """ add_cache_type_sql = """
ALTER TABLE LIGHTRAG_LLM_CACHE ALTER TABLE LIGHTRAG_LLM_CACHE
ADD COLUMN cache_type VARCHAR(32) NULL ADD COLUMN cache_type VARCHAR(32) NULL
""" """
await self.execute(add_column_sql) await self.execute(add_cache_type_sql)
logger.info( logger.info("Successfully added cache_type column to LIGHTRAG_LLM_CACHE table")
"Successfully added cache_type column to LIGHTRAG_LLM_CACHE table"
) # Migrate existing data using optimized regex pattern
logger.info("Migrating existing LLM cache data to populate cache_type field (optimized)")
# Migrate existing data: extract cache_type from flattened keys optimized_update_sql = """
logger.info(
"Migrating existing LLM cache data to populate cache_type field"
)
update_sql = """
UPDATE LIGHTRAG_LLM_CACHE UPDATE LIGHTRAG_LLM_CACHE
SET cache_type = CASE SET cache_type = CASE
WHEN id LIKE '%:%:%' THEN split_part(id, ':', 2) WHEN id ~ '^[^:]+:[^:]+:' THEN split_part(id, ':', 2)
ELSE 'extract' ELSE 'extract'
END END
WHERE cache_type IS NULL WHERE cache_type IS NULL
""" """
await self.execute(update_sql) await self.execute(optimized_update_sql)
logger.info("Successfully migrated existing LLM cache data") logger.info("Successfully migrated existing LLM cache data")
else: else:
logger.info( logger.info("cache_type column already exists in LIGHTRAG_LLM_CACHE table")
"cache_type column already exists in LIGHTRAG_LLM_CACHE table"
)
except Exception as e: except Exception as e:
logger.warning( logger.warning(f"Failed to add columns to LIGHTRAG_LLM_CACHE: {e}")
f"Failed to add cache_type column to LIGHTRAG_LLM_CACHE: {e}"
)
async def _migrate_timestamp_columns(self): async def _migrate_timestamp_columns(self):
"""Migrate timestamp columns in tables to witimezone-free types, assuming original data is in UTC time""" """Migrate timestamp columns in tables to witimezone-free types, assuming original data is in UTC time"""
@ -292,121 +271,100 @@ class PostgreSQLDB:
# Do not re-raise, to allow the application to start # Do not re-raise, to allow the application to start
async def _check_llm_cache_needs_migration(self): async def _check_llm_cache_needs_migration(self):
"""Check if LLM cache data needs migration by examining the first record""" """Check if LLM cache data needs migration by examining any record with old format"""
try: try:
# Only query the first record to determine format # Optimized query: directly check for old format records without sorting
check_sql = """ check_sql = """
SELECT id FROM LIGHTRAG_LLM_CACHE SELECT 1 FROM LIGHTRAG_LLM_CACHE
ORDER BY create_time ASC WHERE id NOT LIKE '%:%'
LIMIT 1 LIMIT 1
""" """
result = await self.query(check_sql) result = await self.query(check_sql)
if result and result.get("id"): # If any old format record exists, migration is needed
# If id doesn't contain colon, it's old format return result is not None
return ":" not in result["id"]
return False # No data or already new format
except Exception as e: except Exception as e:
logger.warning(f"Failed to check LLM cache migration status: {e}") logger.warning(f"Failed to check LLM cache migration status: {e}")
return False return False
async def _migrate_llm_cache_to_flattened_keys(self): async def _migrate_llm_cache_to_flattened_keys(self):
"""Migrate LLM cache to flattened key format, recalculating hash values""" """Optimized version: directly execute single UPDATE migration to migrate old format cache keys to flattened format"""
try: try:
# Get all old format data # Check if migration is needed
old_data_sql = """ check_sql = """
SELECT id, mode, original_prompt, return_value, chunk_id, SELECT COUNT(*) as count FROM LIGHTRAG_LLM_CACHE
workspace, create_time, update_time
FROM LIGHTRAG_LLM_CACHE
WHERE id NOT LIKE '%:%' WHERE id NOT LIKE '%:%'
""" """
result = await self.query(check_sql)
old_records = await self.query(old_data_sql, multirows=True)
if not result or result["count"] == 0:
if not old_records:
logger.info("No old format LLM cache data found, skipping migration") logger.info("No old format LLM cache data found, skipping migration")
return return
logger.info( old_count = result["count"]
f"Found {len(old_records)} old format cache records, starting migration..." logger.info(f"Found {old_count} old format cache records")
# Check potential primary key conflicts (optional but recommended)
conflict_check_sql = """
WITH new_ids AS (
SELECT
workspace,
mode,
id as old_id,
mode || ':' ||
CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' ||
md5(mode || original_prompt) as new_id
FROM LIGHTRAG_LLM_CACHE
WHERE id NOT LIKE '%:%'
) )
SELECT COUNT(*) as conflicts
# Import hash calculation function FROM new_ids n1
from ..utils import compute_args_hash JOIN LIGHTRAG_LLM_CACHE existing
ON existing.workspace = n1.workspace
migrated_count = 0 AND existing.mode = n1.mode
AND existing.id = n1.new_id
# Migrate data in batches WHERE existing.id LIKE '%:%' -- Only check conflicts with existing new format records
for record in old_records: """
try:
# Recalculate hash using correct method conflict_result = await self.query(conflict_check_sql)
new_hash = compute_args_hash( if conflict_result and conflict_result["conflicts"] > 0:
record["mode"], record["original_prompt"] logger.warning(f"Found {conflict_result['conflicts']} potential ID conflicts with existing records")
) # Can choose to continue or abort, here we choose to continue and log warning
# Determine cache_type based on mode # Execute single UPDATE migration
cache_type = "extract" if record["mode"] == "default" else "unknown" logger.info("Starting optimized LLM cache migration...")
migration_sql = """
# Generate new flattened key UPDATE LIGHTRAG_LLM_CACHE
new_key = f"{record['mode']}:{cache_type}:{new_hash}" SET
id = mode || ':' ||
# Insert new format data with cache_type field CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' ||
insert_sql = """ md5(mode || original_prompt),
INSERT INTO LIGHTRAG_LLM_CACHE cache_type = CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END,
(workspace, id, mode, original_prompt, return_value, chunk_id, cache_type, create_time, update_time) update_time = CURRENT_TIMESTAMP
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) WHERE id NOT LIKE '%:%'
ON CONFLICT (workspace, mode, id) DO NOTHING """
"""
# Execute migration
await self.execute( await self.execute(migration_sql)
insert_sql,
{ # Verify migration results
"workspace": record[ verify_sql = """
"workspace" SELECT COUNT(*) as remaining_old FROM LIGHTRAG_LLM_CACHE
], # Use original record's workspace WHERE id NOT LIKE '%:%'
"id": new_key, """
"mode": record["mode"], verify_result = await self.query(verify_sql)
"original_prompt": record["original_prompt"], remaining = verify_result["remaining_old"] if verify_result else -1
"return_value": record["return_value"],
"chunk_id": record["chunk_id"], if remaining == 0:
"cache_type": cache_type, # Add cache_type field logger.info(f"✅ Successfully migrated {old_count} LLM cache records to flattened format")
"create_time": record["create_time"], else:
"update_time": record["update_time"], logger.warning(f"⚠️ Migration completed but {remaining} old format records remain")
},
)
# Delete old data
delete_sql = """
DELETE FROM LIGHTRAG_LLM_CACHE
WHERE workspace=$1 AND mode=$2 AND id=$3
"""
await self.execute(
delete_sql,
{
"workspace": record[
"workspace"
], # Use original record's workspace
"mode": record["mode"],
"id": record["id"], # Old id
},
)
migrated_count += 1
except Exception as e:
logger.warning(
f"Failed to migrate cache record {record['id']}: {e}"
)
continue
logger.info(
f"Successfully migrated {migrated_count} cache records to flattened format"
)
except Exception as e: except Exception as e:
logger.error(f"LLM cache migration failed: {e}") logger.error(f"Optimized LLM cache migration failed: {e}")
# Don't raise exception, allow system to continue startup raise
async def _migrate_doc_status_add_chunks_list(self): async def _migrate_doc_status_add_chunks_list(self):
"""Add chunks_list column to LIGHTRAG_DOC_STATUS table if it doesn't exist""" """Add chunks_list column to LIGHTRAG_DOC_STATUS table if it doesn't exist"""
@ -646,20 +604,11 @@ class PostgreSQLDB:
logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}") logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}")
# Don't throw an exception, allow the initialization process to continue # Don't throw an exception, allow the initialization process to continue
# Migrate LLM cache table to add chunk_id field if needed # Migrate LLM cache table to add chunk_id and cache_type columns if needed
try: try:
await self._migrate_llm_cache_add_chunk_id() await self._migrate_llm_cache_add_columns()
except Exception as e: except Exception as e:
logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}") logger.error(f"PostgreSQL, Failed to migrate LLM cache columns: {e}")
# Don't throw an exception, allow the initialization process to continue
# Migrate LLM cache table to add cache_type field if needed
try:
await self._migrate_llm_cache_add_cache_type()
except Exception as e:
logger.error(
f"PostgreSQL, Failed to migrate LLM cache cache_type field: {e}"
)
# Don't throw an exception, allow the initialization process to continue # Don't throw an exception, allow the initialization process to continue
# Finally, attempt to migrate old doc chunks data if needed # Finally, attempt to migrate old doc chunks data if needed