Fix: ensure data migration is handled by single-process
- Wrap migration logic with get_data_init_lock() to ensure single-process execution - Prevent race conditions when multiple processes start simultaneously
This commit is contained in:
parent
e04d8ed8a7
commit
63496698a1
2 changed files with 54 additions and 48 deletions
|
|
@ -151,9 +151,11 @@ def create_app(args):
|
||||||
try:
|
try:
|
||||||
# Initialize database connections
|
# Initialize database connections
|
||||||
await rag.initialize_storages()
|
await rag.initialize_storages()
|
||||||
|
await initialize_pipeline_status()
|
||||||
|
|
||||||
|
# Data migration regardless of storage implementation
|
||||||
await rag.check_and_migrate_data()
|
await rag.check_and_migrate_data()
|
||||||
|
|
||||||
await initialize_pipeline_status()
|
|
||||||
pipeline_status = await get_namespace_data("pipeline_status")
|
pipeline_status = await get_namespace_data("pipeline_status")
|
||||||
|
|
||||||
should_start_autoscan = False
|
should_start_autoscan = False
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,7 @@ from lightrag.kg.shared_storage import (
|
||||||
get_namespace_data,
|
get_namespace_data,
|
||||||
get_pipeline_status_lock,
|
get_pipeline_status_lock,
|
||||||
get_graph_db_lock,
|
get_graph_db_lock,
|
||||||
|
get_data_init_lock,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .base import (
|
from .base import (
|
||||||
|
|
@ -602,63 +603,66 @@ class LightRAG:
|
||||||
|
|
||||||
async def check_and_migrate_data(self):
|
async def check_and_migrate_data(self):
|
||||||
"""Check if data migration is needed and perform migration if necessary"""
|
"""Check if data migration is needed and perform migration if necessary"""
|
||||||
try:
|
async with get_data_init_lock(enable_logging=True):
|
||||||
# Check if migration is needed:
|
|
||||||
# 1. chunk_entity_relation_graph has entities and relations (count > 0)
|
|
||||||
# 2. full_entities and full_relations are empty
|
|
||||||
|
|
||||||
# Get all entity labels from graph
|
|
||||||
all_entity_labels = await self.chunk_entity_relation_graph.get_all_labels()
|
|
||||||
|
|
||||||
if not all_entity_labels:
|
|
||||||
logger.debug("No entities found in graph, skipping migration check")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Check if full_entities and full_relations are empty
|
|
||||||
# Get all processed documents to check their entity/relation data
|
|
||||||
try:
|
try:
|
||||||
processed_docs = await self.doc_status.get_docs_by_status(
|
# Check if migration is needed:
|
||||||
DocStatus.PROCESSED
|
# 1. chunk_entity_relation_graph has entities and relations (count > 0)
|
||||||
|
# 2. full_entities and full_relations are empty
|
||||||
|
|
||||||
|
# Get all entity labels from graph
|
||||||
|
all_entity_labels = (
|
||||||
|
await self.chunk_entity_relation_graph.get_all_labels()
|
||||||
)
|
)
|
||||||
|
|
||||||
if not processed_docs:
|
if not all_entity_labels:
|
||||||
logger.debug("No processed documents found, skipping migration")
|
logger.debug("No entities found in graph, skipping migration check")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Check first few documents to see if they have full_entities/full_relations data
|
# Check if full_entities and full_relations are empty
|
||||||
migration_needed = True
|
# Get all processed documents to check their entity/relation data
|
||||||
checked_count = 0
|
try:
|
||||||
max_check = min(5, len(processed_docs)) # Check up to 5 documents
|
processed_docs = await self.doc_status.get_docs_by_status(
|
||||||
|
DocStatus.PROCESSED
|
||||||
for doc_id in list(processed_docs.keys())[:max_check]:
|
|
||||||
checked_count += 1
|
|
||||||
entity_data = await self.full_entities.get_by_id(doc_id)
|
|
||||||
relation_data = await self.full_relations.get_by_id(doc_id)
|
|
||||||
|
|
||||||
if entity_data or relation_data:
|
|
||||||
migration_needed = False
|
|
||||||
break
|
|
||||||
|
|
||||||
if not migration_needed:
|
|
||||||
logger.debug(
|
|
||||||
"Full entities/relations data already exists, no migration needed"
|
|
||||||
)
|
)
|
||||||
return
|
|
||||||
|
|
||||||
logger.info(
|
if not processed_docs:
|
||||||
f"Data migration needed: found {len(all_entity_labels)} entities in graph but no full_entities/full_relations data"
|
logger.debug("No processed documents found, skipping migration")
|
||||||
)
|
return
|
||||||
|
|
||||||
# Perform migration
|
# Check first few documents to see if they have full_entities/full_relations data
|
||||||
await self._migrate_entity_relation_data(processed_docs)
|
migration_needed = True
|
||||||
|
checked_count = 0
|
||||||
|
max_check = min(5, len(processed_docs)) # Check up to 5 documents
|
||||||
|
|
||||||
|
for doc_id in list(processed_docs.keys())[:max_check]:
|
||||||
|
checked_count += 1
|
||||||
|
entity_data = await self.full_entities.get_by_id(doc_id)
|
||||||
|
relation_data = await self.full_relations.get_by_id(doc_id)
|
||||||
|
|
||||||
|
if entity_data or relation_data:
|
||||||
|
migration_needed = False
|
||||||
|
break
|
||||||
|
|
||||||
|
if not migration_needed:
|
||||||
|
logger.debug(
|
||||||
|
"Full entities/relations data already exists, no migration needed"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"Data migration needed: found {len(all_entity_labels)} entities in graph but no full_entities/full_relations data"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Perform migration
|
||||||
|
await self._migrate_entity_relation_data(processed_docs)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error during migration check: {e}")
|
||||||
|
# Don't raise the error, just log it to avoid breaking initialization
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error during migration check: {e}")
|
logger.error(f"Error in data migration check: {e}")
|
||||||
# Don't raise the error, just log it to avoid breaking initialization
|
# Don't raise the error to avoid breaking initialization
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error in data migration check: {e}")
|
|
||||||
# Don't raise the error to avoid breaking initialization
|
|
||||||
|
|
||||||
async def _migrate_entity_relation_data(self, processed_docs: dict):
|
async def _migrate_entity_relation_data(self, processed_docs: dict):
|
||||||
"""Migrate existing entity and relation data to full_entities and full_relations storage"""
|
"""Migrate existing entity and relation data to full_entities and full_relations storage"""
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue