Removed auto storage management from LightRAG instance creation

- The `initialize_storages` method must be explicitly called after LightRAG creation.
The `finalize_storages` method should be called before LightRAG destyoyed.
- Added explicit data migration check
This commit is contained in:
yangdx 2025-08-03 12:42:57 +08:00
parent 06efab4af2
commit e8d8afa846
2 changed files with 42 additions and 50 deletions

View file

@ -151,6 +151,7 @@ def create_app(args):
try:
# Initialize database connections
await rag.initialize_storages()
await rag.check_and_migrate_data()
await initialize_pipeline_status()
pipeline_status = await get_namespace_data("pipeline_status")
@ -401,7 +402,6 @@ def create_app(args):
enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract,
enable_llm_cache=args.enable_llm_cache,
rerank_model_func=rerank_model_func,
auto_manage_storages_states=False,
max_parallel_insert=args.max_parallel_insert,
max_graph_nodes=args.max_graph_nodes,
addon_params={"language": args.summary_language},
@ -431,7 +431,6 @@ def create_app(args):
enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract,
enable_llm_cache=args.enable_llm_cache,
rerank_model_func=rerank_model_func,
auto_manage_storages_states=False,
max_parallel_insert=args.max_parallel_insert,
max_graph_nodes=args.max_graph_nodes,
addon_params={"language": args.summary_language},

View file

@ -334,12 +334,10 @@ class LightRAG:
# Storages Management
# ---
auto_manage_storages_states: bool = field(default=True)
# TODO: Deprecated (LightRAG will never initialize storage automatically on creationand finalize should be call before destroying)
auto_manage_storages_states: bool = field(default=False)
"""If True, lightrag will automatically calls initialize_storages and finalize_storages at the appropriate times."""
# Storages Management
# ---
cosine_better_than_threshold: float = field(
default=float(os.getenv("COSINE_THRESHOLD", 0.2))
)
@ -531,32 +529,6 @@ class LightRAG:
self._storages_status = StoragesStatus.CREATED
if self.auto_manage_storages_states:
self._run_async_safely(self.initialize_storages, "Storage Initialization")
def __del__(self):
if self.auto_manage_storages_states:
self._run_async_safely(self.finalize_storages, "Storage Finalization")
def _run_async_safely(self, async_func, action_name=""):
"""Safely execute an async function, avoiding event loop conflicts."""
try:
loop = always_get_an_event_loop()
if loop.is_running():
task = loop.create_task(async_func())
task.add_done_callback(
lambda t: logger.info(f"{action_name} completed!")
)
else:
loop.run_until_complete(async_func())
except RuntimeError:
logger.warning(
f"No running event loop, creating a new loop for {action_name}."
)
loop = asyncio.new_event_loop()
loop.run_until_complete(async_func())
loop.close()
async def initialize_storages(self):
"""Asynchronously initialize the storages"""
if self._storages_status == StoragesStatus.CREATED:
@ -583,31 +555,52 @@ class LightRAG:
logger.debug("All storage types initialized")
async def finalize_storages(self):
"""Asynchronously finalize the storages"""
"""Asynchronously finalize the storages with improved error handling"""
if self._storages_status == StoragesStatus.INITIALIZED:
tasks = []
storages = [
("full_docs", self.full_docs),
("text_chunks", self.text_chunks),
("full_entities", self.full_entities),
("full_relations", self.full_relations),
("entities_vdb", self.entities_vdb),
("relationships_vdb", self.relationships_vdb),
("chunks_vdb", self.chunks_vdb),
("chunk_entity_relation_graph", self.chunk_entity_relation_graph),
("llm_response_cache", self.llm_response_cache),
("doc_status", self.doc_status),
]
for storage in (
self.full_docs,
self.text_chunks,
self.full_entities,
self.full_relations,
self.entities_vdb,
self.relationships_vdb,
self.chunks_vdb,
self.chunk_entity_relation_graph,
self.llm_response_cache,
self.doc_status,
):
# Finalize each storage individually to ensure one failure doesn't prevent others from closing
successful_finalizations = []
failed_finalizations = []
for storage_name, storage in storages:
if storage:
tasks.append(storage.finalize())
try:
await storage.finalize()
successful_finalizations.append(storage_name)
logger.debug(f"Successfully finalized {storage_name}")
except Exception as e:
error_msg = f"Failed to finalize {storage_name}: {e}"
logger.error(error_msg)
failed_finalizations.append(storage_name)
await asyncio.gather(*tasks)
# Log summary of finalization results
if successful_finalizations:
logger.info(
f"Successfully finalized {len(successful_finalizations)} storages"
)
if failed_finalizations:
logger.error(
f"Failed to finalize {len(failed_finalizations)} storages: {', '.join(failed_finalizations)}"
)
else:
logger.debug("All storages finalized successfully")
self._storages_status = StoragesStatus.FINALIZED
logger.debug("Finalized Storages")
async def _check_and_migrate_data(self):
async def check_and_migrate_data(self):
"""Check if data migration is needed and perform migration if necessary"""
try:
# Check if migration is needed: