diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 606becb6..55fd5645 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -151,6 +151,7 @@ def create_app(args): try: # Initialize database connections await rag.initialize_storages() + await rag.check_and_migrate_data() await initialize_pipeline_status() pipeline_status = await get_namespace_data("pipeline_status") @@ -401,7 +402,6 @@ def create_app(args): enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract, enable_llm_cache=args.enable_llm_cache, rerank_model_func=rerank_model_func, - auto_manage_storages_states=False, max_parallel_insert=args.max_parallel_insert, max_graph_nodes=args.max_graph_nodes, addon_params={"language": args.summary_language}, @@ -431,7 +431,6 @@ def create_app(args): enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract, enable_llm_cache=args.enable_llm_cache, rerank_model_func=rerank_model_func, - auto_manage_storages_states=False, max_parallel_insert=args.max_parallel_insert, max_graph_nodes=args.max_graph_nodes, addon_params={"language": args.summary_language}, diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 73386b0b..288655f7 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -334,12 +334,10 @@ class LightRAG: # Storages Management # --- - auto_manage_storages_states: bool = field(default=True) + # TODO: Deprecated (LightRAG will never initialize storage automatically on creation,and finalize should be call before destroying) + auto_manage_storages_states: bool = field(default=False) """If True, lightrag will automatically calls initialize_storages and finalize_storages at the appropriate times.""" - # Storages Management - # --- - cosine_better_than_threshold: float = field( default=float(os.getenv("COSINE_THRESHOLD", 0.2)) ) @@ -531,32 +529,6 @@ class LightRAG: self._storages_status = StoragesStatus.CREATED - if self.auto_manage_storages_states: - self._run_async_safely(self.initialize_storages, "Storage Initialization") - - def __del__(self): - if self.auto_manage_storages_states: - self._run_async_safely(self.finalize_storages, "Storage Finalization") - - def _run_async_safely(self, async_func, action_name=""): - """Safely execute an async function, avoiding event loop conflicts.""" - try: - loop = always_get_an_event_loop() - if loop.is_running(): - task = loop.create_task(async_func()) - task.add_done_callback( - lambda t: logger.info(f"{action_name} completed!") - ) - else: - loop.run_until_complete(async_func()) - except RuntimeError: - logger.warning( - f"No running event loop, creating a new loop for {action_name}." - ) - loop = asyncio.new_event_loop() - loop.run_until_complete(async_func()) - loop.close() - async def initialize_storages(self): """Asynchronously initialize the storages""" if self._storages_status == StoragesStatus.CREATED: @@ -583,31 +555,52 @@ class LightRAG: logger.debug("All storage types initialized") async def finalize_storages(self): - """Asynchronously finalize the storages""" + """Asynchronously finalize the storages with improved error handling""" if self._storages_status == StoragesStatus.INITIALIZED: - tasks = [] + storages = [ + ("full_docs", self.full_docs), + ("text_chunks", self.text_chunks), + ("full_entities", self.full_entities), + ("full_relations", self.full_relations), + ("entities_vdb", self.entities_vdb), + ("relationships_vdb", self.relationships_vdb), + ("chunks_vdb", self.chunks_vdb), + ("chunk_entity_relation_graph", self.chunk_entity_relation_graph), + ("llm_response_cache", self.llm_response_cache), + ("doc_status", self.doc_status), + ] - for storage in ( - self.full_docs, - self.text_chunks, - self.full_entities, - self.full_relations, - self.entities_vdb, - self.relationships_vdb, - self.chunks_vdb, - self.chunk_entity_relation_graph, - self.llm_response_cache, - self.doc_status, - ): + # Finalize each storage individually to ensure one failure doesn't prevent others from closing + successful_finalizations = [] + failed_finalizations = [] + + for storage_name, storage in storages: if storage: - tasks.append(storage.finalize()) + try: + await storage.finalize() + successful_finalizations.append(storage_name) + logger.debug(f"Successfully finalized {storage_name}") + except Exception as e: + error_msg = f"Failed to finalize {storage_name}: {e}" + logger.error(error_msg) + failed_finalizations.append(storage_name) - await asyncio.gather(*tasks) + # Log summary of finalization results + if successful_finalizations: + logger.info( + f"Successfully finalized {len(successful_finalizations)} storages" + ) + + if failed_finalizations: + logger.error( + f"Failed to finalize {len(failed_finalizations)} storages: {', '.join(failed_finalizations)}" + ) + else: + logger.debug("All storages finalized successfully") self._storages_status = StoragesStatus.FINALIZED - logger.debug("Finalized Storages") - async def _check_and_migrate_data(self): + async def check_and_migrate_data(self): """Check if data migration is needed and perform migration if necessary""" try: # Check if migration is needed: