diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 0a878ac3..2ca543b1 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1943,14 +1943,10 @@ class LightRAG:
                 text_chunks=self.text_chunks,
                 llm_response_cache=self.llm_response_cache,
                 global_config=asdict(self),
+                pipeline_status=pipeline_status,
+                pipeline_status_lock=pipeline_status_lock,
             )
 
-            async with pipeline_status_lock:
-                log_message = f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relations"
-                logger.info(log_message)
-                pipeline_status["latest_message"] = log_message
-                pipeline_status["history_messages"].append(log_message)
-
         except Exception as e:
             logger.error(f"Failed to rebuild knowledge from chunks: {e}")
             raise Exception(
diff --git a/lightrag/operate.py b/lightrag/operate.py
index d0f5f7bb..b41b3de9 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -250,6 +250,8 @@ async def _rebuild_knowledge_from_chunks(
     text_chunks: BaseKVStorage,
     llm_response_cache: BaseKVStorage,
     global_config: dict[str, str],
+    pipeline_status: dict | None = None,
+    pipeline_status_lock=None,
 ) -> None:
     """Rebuild entity and relationship descriptions from cached extraction results
 
@@ -262,6 +264,8 @@
 
     if not entities_to_rebuild and not relationships_to_rebuild:
         return
+    rebuilt_entities_count = 0
+    rebuilt_relationships_count = 0
 
     # Get all referenced chunk IDs
     all_referenced_chunk_ids = set()
@@ -270,9 +274,12 @@
     for chunk_ids in relationships_to_rebuild.values():
         all_referenced_chunk_ids.update(chunk_ids)
 
-    logger.debug(
-        f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
-    )
+    status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
+    logger.info(status_message)
+    if pipeline_status is not None and pipeline_status_lock is not None:
+        async with pipeline_status_lock:
+            pipeline_status["latest_message"] = status_message
+            pipeline_status["history_messages"].append(status_message)
 
     # Get cached extraction results for these chunks
    cached_results = await _get_cached_extraction_results(
@@ -280,7 +287,12 @@
     )
 
     if not cached_results:
-        logger.warning("No cached extraction results found, cannot rebuild")
+        status_message = "No cached extraction results found, cannot rebuild"
+        logger.warning(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
         return
 
     # Process cached results to get entities and relationships for each chunk
@@ -297,9 +309,14 @@
             chunk_entities[chunk_id] = entities
             chunk_relationships[chunk_id] = relationships
         except Exception as e:
-            logger.error(
+            status_message = (
                 f"Failed to parse cached extraction result for chunk {chunk_id}: {e}"
             )
+            logger.info(status_message)  # Per requirement, change to info
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
             continue
 
     # Rebuild entities
@@ -314,11 +331,22 @@
                 llm_response_cache=llm_response_cache,
                 global_config=global_config,
             )
-            logger.debug(
-                f"Rebuilt entity {entity_name} from {len(chunk_ids)} cached extractions"
+            rebuilt_entities_count += 1
+            status_message = (
+                f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
             )
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
         except Exception as e:
-            logger.error(f"Failed to rebuild entity {entity_name}: {e}")
+            status_message = f"Failed to rebuild entity {entity_name}: {e}"
+            logger.info(status_message)  # Per requirement, change to info
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
 
     # Rebuild relationships
     for (src, tgt), chunk_ids in relationships_to_rebuild.items():
@@ -333,13 +361,29 @@
                 llm_response_cache=llm_response_cache,
                 global_config=global_config,
             )
-            logger.debug(
-                f"Rebuilt relationship {src}-{tgt} from {len(chunk_ids)} cached extractions"
+            rebuilt_relationships_count += 1
+            status_message = (
+                f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
             )
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
         except Exception as e:
-            logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}")
+            status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
 
-    logger.debug("Completed rebuilding knowledge from cached extractions")
+    status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships."
+    logger.info(status_message)
+    if pipeline_status is not None and pipeline_status_lock is not None:
+        async with pipeline_status_lock:
+            pipeline_status["latest_message"] = status_message
+            pipeline_status["history_messages"].append(status_message)
 
 
 async def _get_cached_extraction_results(