From 63496698a1cfede2b41eda276ba157565c419ceb Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 4 Aug 2025 01:47:20 +0800 Subject: [PATCH] Fix: ensure data migration is handled by single-process - Wrap migration logic with get_data_init_lock() to ensure single-process execution - Prevent race conditions when multiple processes start simultaneously --- lightrag/api/lightrag_server.py | 4 +- lightrag/lightrag.py | 98 +++++++++++++++++---------------- 2 files changed, 54 insertions(+), 48 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 55fd5645..a4d0c345 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -151,9 +151,11 @@ def create_app(args): try: # Initialize database connections await rag.initialize_storages() + await initialize_pipeline_status() + + # Data migration regardless of storage implementation await rag.check_and_migrate_data() - await initialize_pipeline_status() pipeline_status = await get_namespace_data("pipeline_status") should_start_autoscan = False diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 2bd710df..f5ed8a7a 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -49,6 +49,7 @@ from lightrag.kg.shared_storage import ( get_namespace_data, get_pipeline_status_lock, get_graph_db_lock, + get_data_init_lock, ) from .base import ( @@ -602,63 +603,66 @@ class LightRAG: async def check_and_migrate_data(self): """Check if data migration is needed and perform migration if necessary""" - try: - # Check if migration is needed: - # 1. chunk_entity_relation_graph has entities and relations (count > 0) - # 2. full_entities and full_relations are empty - - # Get all entity labels from graph - all_entity_labels = await self.chunk_entity_relation_graph.get_all_labels() - - if not all_entity_labels: - logger.debug("No entities found in graph, skipping migration check") - return - - # Check if full_entities and full_relations are empty - # Get all processed documents to check their entity/relation data + async with get_data_init_lock(enable_logging=True): try: - processed_docs = await self.doc_status.get_docs_by_status( - DocStatus.PROCESSED + # Check if migration is needed: + # 1. chunk_entity_relation_graph has entities and relations (count > 0) + # 2. full_entities and full_relations are empty + + # Get all entity labels from graph + all_entity_labels = ( + await self.chunk_entity_relation_graph.get_all_labels() ) - if not processed_docs: - logger.debug("No processed documents found, skipping migration") + if not all_entity_labels: + logger.debug("No entities found in graph, skipping migration check") return - # Check first few documents to see if they have full_entities/full_relations data - migration_needed = True - checked_count = 0 - max_check = min(5, len(processed_docs)) # Check up to 5 documents - - for doc_id in list(processed_docs.keys())[:max_check]: - checked_count += 1 - entity_data = await self.full_entities.get_by_id(doc_id) - relation_data = await self.full_relations.get_by_id(doc_id) - - if entity_data or relation_data: - migration_needed = False - break - - if not migration_needed: - logger.debug( - "Full entities/relations data already exists, no migration needed" + # Check if full_entities and full_relations are empty + # Get all processed documents to check their entity/relation data + try: + processed_docs = await self.doc_status.get_docs_by_status( + DocStatus.PROCESSED ) - return - logger.info( - f"Data migration needed: found {len(all_entity_labels)} entities in graph but no full_entities/full_relations data" - ) + if not processed_docs: + logger.debug("No processed documents found, skipping migration") + return - # Perform migration - await self._migrate_entity_relation_data(processed_docs) + # Check first few documents to see if they have full_entities/full_relations data + migration_needed = True + checked_count = 0 + max_check = min(5, len(processed_docs)) # Check up to 5 documents + + for doc_id in list(processed_docs.keys())[:max_check]: + checked_count += 1 + entity_data = await self.full_entities.get_by_id(doc_id) + relation_data = await self.full_relations.get_by_id(doc_id) + + if entity_data or relation_data: + migration_needed = False + break + + if not migration_needed: + logger.debug( + "Full entities/relations data already exists, no migration needed" + ) + return + + logger.info( + f"Data migration needed: found {len(all_entity_labels)} entities in graph but no full_entities/full_relations data" + ) + + # Perform migration + await self._migrate_entity_relation_data(processed_docs) + + except Exception as e: + logger.error(f"Error during migration check: {e}") + # Don't raise the error, just log it to avoid breaking initialization except Exception as e: - logger.error(f"Error during migration check: {e}") - # Don't raise the error, just log it to avoid breaking initialization - - except Exception as e: - logger.error(f"Error in data migration check: {e}") - # Don't raise the error to avoid breaking initialization + logger.error(f"Error in data migration check: {e}") + # Don't raise the error to avoid breaking initialization async def _migrate_entity_relation_data(self, processed_docs: dict): """Migrate existing entity and relation data to full_entities and full_relations storage"""