From bab28039531db17287732f34ebc95c93d7cd8ba8 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 16 Jul 2025 17:32:53 +0800
Subject: [PATCH] Optimize PostgreSQL database migrations for LLM cache

- Combine the chunk_id and cache_type column migrations into a single operation
- Replace the per-record LLM cache key migration loop with one set-based UPDATE
- Check for old-format cache records without sorting the table
- Detect primary key conflicts before migrating cache keys
- Raise on cache key migration failure instead of continuing silently
---
 lightrag/kg/postgres_impl.py | 267 ++++++++++++++--------------------
 1 file changed, 108 insertions(+), 159 deletions(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 98a09f9d..38e347d7 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -105,80 +105,59 @@ class PostgreSQLDB:
         ):
             pass
 
-    async def _migrate_llm_cache_add_chunk_id(self):
-        """Add chunk_id column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
+    async def _migrate_llm_cache_add_columns(self):
+        """Add chunk_id and cache_type columns to LIGHTRAG_LLM_CACHE table if they don't exist"""
         try:
-            # Check if chunk_id column exists
-            check_column_sql = """
+            # Check which of the two columns already exist
+            check_columns_sql = """
                 SELECT column_name
                 FROM information_schema.columns
                 WHERE table_name = 'lightrag_llm_cache'
-                AND column_name = 'chunk_id'
+                AND column_name IN ('chunk_id', 'cache_type')
             """
 
-            column_info = await self.query(check_column_sql)
-            if not column_info:
+            existing_columns = await self.query(check_columns_sql, multirows=True)
+            existing_column_names = (
+                {col["column_name"] for col in existing_columns}
+                if existing_columns
+                else set()
+            )
+
+            # Add missing chunk_id column
+            if "chunk_id" not in existing_column_names:
                 logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table")
-                add_column_sql = """
+                add_chunk_id_sql = """
                     ALTER TABLE LIGHTRAG_LLM_CACHE
                     ADD COLUMN chunk_id VARCHAR(255) NULL
                 """
-                await self.execute(add_column_sql)
-                logger.info(
-                    "Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table"
-                )
+                await self.execute(add_chunk_id_sql)
+                logger.info("Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table")
             else:
-                logger.info(
-                    "chunk_id column already exists in LIGHTRAG_LLM_CACHE table"
-                )
-        except Exception as e:
-            logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}")
-
-    async def _migrate_llm_cache_add_cache_type(self):
-        """Add cache_type column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
-        try:
-            # Check if cache_type column exists
-            check_column_sql = """
-                SELECT column_name
-                FROM information_schema.columns
-                WHERE table_name = 'lightrag_llm_cache'
-                AND column_name = 'cache_type'
-            """
-
-            column_info = await self.query(check_column_sql)
-            if not column_info:
+                logger.info("chunk_id column already exists in LIGHTRAG_LLM_CACHE table")
+
+            # Add missing cache_type column
+            if "cache_type" not in existing_column_names:
                 logger.info("Adding cache_type column to LIGHTRAG_LLM_CACHE table")
-                add_column_sql = """
+                add_cache_type_sql = """
                     ALTER TABLE LIGHTRAG_LLM_CACHE
                     ADD COLUMN cache_type VARCHAR(32) NULL
                 """
-                await self.execute(add_column_sql)
-                logger.info(
-                    "Successfully added cache_type column to LIGHTRAG_LLM_CACHE table"
-                )
-
-                # Migrate existing data: extract cache_type from flattened keys
-                logger.info(
-                    "Migrating existing LLM cache data to populate cache_type field"
-                )
-                update_sql = """
+                await self.execute(add_cache_type_sql)
+                logger.info("Successfully added cache_type column to LIGHTRAG_LLM_CACHE table")
+
+                # Backfill cache_type for existing rows with an anchored regex
+                logger.info("Migrating existing LLM cache data to populate cache_type field")
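+                # Flattened keys have the form "{mode}:{cache_type}:{hash}"; the
+                # anchored regex below matches only ids already in that form, so
+                # their middle segment is reused as cache_type, while bare hash
+                # ids fall back to 'extract'.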
+                optimized_update_sql = """
                     UPDATE LIGHTRAG_LLM_CACHE
                     SET cache_type = CASE
-                        WHEN id LIKE '%:%:%' THEN split_part(id, ':', 2)
+                        WHEN id ~ '^[^:]+:[^:]+:' THEN split_part(id, ':', 2)
                         ELSE 'extract'
                     END
                     WHERE cache_type IS NULL
                 """
-                await self.execute(update_sql)
+                await self.execute(optimized_update_sql)
                 logger.info("Successfully migrated existing LLM cache data")
             else:
-                logger.info(
-                    "cache_type column already exists in LIGHTRAG_LLM_CACHE table"
-                )
+                logger.info("cache_type column already exists in LIGHTRAG_LLM_CACHE table")
         except Exception as e:
-            logger.warning(
-                f"Failed to add cache_type column to LIGHTRAG_LLM_CACHE: {e}"
-            )
+            logger.warning(f"Failed to add columns to LIGHTRAG_LLM_CACHE: {e}")
 
     async def _migrate_timestamp_columns(self):
         """Migrate timestamp columns in tables to witimezone-free types, assuming original data is in UTC time"""
@@ -292,121 +271,100 @@ class PostgreSQLDB:
         # Do not re-raise, to allow the application to start
 
     async def _check_llm_cache_needs_migration(self):
-        """Check if LLM cache data needs migration by examining the first record"""
+        """Check if LLM cache data needs migration by looking for any old format record"""
         try:
-            # Only query the first record to determine format
+            # Look directly for an old format record; no sorting required
            check_sql = """
-                SELECT id FROM LIGHTRAG_LLM_CACHE
-                ORDER BY create_time ASC
+                SELECT 1 FROM LIGHTRAG_LLM_CACHE
+                WHERE id NOT LIKE '%:%'
                 LIMIT 1
             """
             result = await self.query(check_sql)
 
-            if result and result.get("id"):
-                # If id doesn't contain colon, it's old format
-                return ":" not in result["id"]
+            # If any old format record exists, migration is needed
+            return result is not None
 
-            return False  # No data or already new format
         except Exception as e:
             logger.warning(f"Failed to check LLM cache migration status: {e}")
             return False
 
     async def _migrate_llm_cache_to_flattened_keys(self):
-        """Migrate LLM cache to flattened key format, recalculating hash values"""
+        """Migrate old format LLM cache keys to the flattened format with a single UPDATE"""
         try:
-            # Get all old format data
-            old_data_sql = """
-                SELECT id, mode, original_prompt, return_value, chunk_id,
-                       workspace, create_time, update_time
-                FROM LIGHTRAG_LLM_CACHE
+            # Check if migration is needed
+            check_sql = """
+                SELECT COUNT(*) as count FROM LIGHTRAG_LLM_CACHE
                 WHERE id NOT LIKE '%:%'
             """
-
-            old_records = await self.query(old_data_sql, multirows=True)
-
-            if not old_records:
+            result = await self.query(check_sql)
+
+            if not result or result["count"] == 0:
                 logger.info("No old format LLM cache data found, skipping migration")
                 return
-
-            logger.info(
-                f"Found {len(old_records)} old format cache records, starting migration..."
-            )
-
-            # Import hash calculation function
-            from ..utils import compute_args_hash
-
-            migrated_count = 0
-
-            # Migrate data in batches
-            for record in old_records:
-                try:
-                    # Recalculate hash using correct method
-                    new_hash = compute_args_hash(
-                        record["mode"], record["original_prompt"]
-                    )
-
-                    # Determine cache_type based on mode
-                    cache_type = "extract" if record["mode"] == "default" else "unknown"
-
-                    # Generate new flattened key
-                    new_key = f"{record['mode']}:{cache_type}:{new_hash}"
-
-                    # Insert new format data with cache_type field
-                    insert_sql = """
-                        INSERT INTO LIGHTRAG_LLM_CACHE
-                        (workspace, id, mode, original_prompt, return_value, chunk_id, cache_type, create_time, update_time)
-                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
-                        ON CONFLICT (workspace, mode, id) DO NOTHING
-                    """
-
-                    await self.execute(
-                        insert_sql,
-                        {
-                            "workspace": record[
-                                "workspace"
-                            ],  # Use original record's workspace
-                            "id": new_key,
-                            "mode": record["mode"],
-                            "original_prompt": record["original_prompt"],
-                            "return_value": record["return_value"],
-                            "chunk_id": record["chunk_id"],
-                            "cache_type": cache_type,  # Add cache_type field
-                            "create_time": record["create_time"],
-                            "update_time": record["update_time"],
-                        },
-                    )
-
-                    # Delete old data
-                    delete_sql = """
-                        DELETE FROM LIGHTRAG_LLM_CACHE
-                        WHERE workspace=$1 AND mode=$2 AND id=$3
-                    """
-                    await self.execute(
-                        delete_sql,
-                        {
-                            "workspace": record[
-                                "workspace"
-                            ],  # Use original record's workspace
-                            "mode": record["mode"],
-                            "id": record["id"],  # Old id
-                        },
-                    )
-
-                    migrated_count += 1
-
-                except Exception as e:
-                    logger.warning(
-                        f"Failed to migrate cache record {record['id']}: {e}"
-                    )
-                    continue
-
-            logger.info(
-                f"Successfully migrated {migrated_count} cache records to flattened format"
-            )
-
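+
+            # A single set-based UPDATE replaces the old per-record
+            # select/insert/delete loop, so the whole migration runs inside
+            # PostgreSQL without a round-trip per row.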
+            old_count = result["count"]
+            logger.info(f"Found {old_count} old format cache records")
+
+            # Check for potential primary key conflicts before migrating
+            conflict_check_sql = """
+                WITH new_ids AS (
+                    SELECT
+                        workspace,
+                        mode,
+                        id as old_id,
+                        mode || ':' ||
+                        CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' ||
+                        md5(mode || original_prompt) as new_id
+                    FROM LIGHTRAG_LLM_CACHE
+                    WHERE id NOT LIKE '%:%'
+                )
+                SELECT COUNT(*) as conflicts
+                FROM new_ids n1
+                JOIN LIGHTRAG_LLM_CACHE existing
+                    ON existing.workspace = n1.workspace
+                    AND existing.mode = n1.mode
+                    AND existing.id = n1.new_id
+                WHERE existing.id LIKE '%:%'  -- Only check conflicts with existing new format records
+            """
+
+            conflict_result = await self.query(conflict_check_sql)
+            if conflict_result and conflict_result["conflicts"] > 0:
+                logger.warning(f"Found {conflict_result['conflicts']} potential ID conflicts with existing records")
+                # We could abort here; instead we continue and rely on the warning above
+
+            # Execute the migration as a single UPDATE
+            logger.info("Starting optimized LLM cache migration...")
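+            # New ids are built exactly as in the conflict check above:
+            # "{mode}:{cache_type}:{md5(mode || original_prompt)}". Migrated
+            # ids contain ':', so re-running this UPDATE is a no-op.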
logger.info(f"✅ Successfully migrated {old_count} LLM cache records to flattened format") + else: + logger.warning(f"⚠️ Migration completed but {remaining} old format records remain") + except Exception as e: - logger.error(f"LLM cache migration failed: {e}") - # Don't raise exception, allow system to continue startup + logger.error(f"Optimized LLM cache migration failed: {e}") + raise async def _migrate_doc_status_add_chunks_list(self): """Add chunks_list column to LIGHTRAG_DOC_STATUS table if it doesn't exist""" @@ -646,20 +604,11 @@ class PostgreSQLDB: logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}") # Don't throw an exception, allow the initialization process to continue - # Migrate LLM cache table to add chunk_id field if needed + # Migrate LLM cache table to add chunk_id and cache_type columns if needed try: - await self._migrate_llm_cache_add_chunk_id() + await self._migrate_llm_cache_add_columns() except Exception as e: - logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}") - # Don't throw an exception, allow the initialization process to continue - - # Migrate LLM cache table to add cache_type field if needed - try: - await self._migrate_llm_cache_add_cache_type() - except Exception as e: - logger.error( - f"PostgreSQL, Failed to migrate LLM cache cache_type field: {e}" - ) + logger.error(f"PostgreSQL, Failed to migrate LLM cache columns: {e}") # Don't throw an exception, allow the initialization process to continue # Finally, attempt to migrate old doc chunks data if needed