Improve the pipeline status message for document deletion

yangdx 2025-06-25 15:46:58 +08:00
parent 2aaa6d5f7d
commit 495d6c8cce
3 changed files with 87 additions and 69 deletions


@@ -803,9 +803,7 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
             }
         )
         # Use slice assignment to clear the list in place
-        pipeline_status["history_messages"][:] = [
-            f"Starting deletion for doc_id: {doc_id}"
-        ]
+        pipeline_status["history_messages"][:] = ["Starting document deletion process"]

     try:
         result = await rag.adelete_by_doc_id(doc_id)
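Note on the retained comment: slice assignment clears and refills the existing list object, so any other holder of that list (for example, a status endpoint that captured a reference to history_messages) sees the reset immediately, whereas rebinding the name would leave such holders pointing at the stale list. A minimal standalone illustration (names are hypothetical):

    history = ["old message"]
    view = history                  # a second reference, e.g. held by a status reader

    history[:] = ["Starting document deletion process"]  # in-place: `view` sees the reset
    assert view == ["Starting document deletion process"]

    history = ["rebound list"]      # rebinding instead would leave `view` stale
    assert view == ["Starting document deletion process"]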
@@ -823,7 +821,7 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
     finally:
         async with pipeline_status_lock:
             pipeline_status["busy"] = False
-            completion_msg = f"Document deletion process for {doc_id} completed."
+            completion_msg = "Document deletion process completed."
             pipeline_status["latest_message"] = completion_msg
             if "history_messages" in pipeline_status:
                 pipeline_status["history_messages"].append(completion_msg)
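The update pattern this commit applies at every step — take the shared lock, log, set latest_message, append to history_messages — in a self-contained sketch. The dict layout and lock below are assumptions modeled on the hunks, not the project's actual get_namespace_data / get_pipeline_status_lock helpers:

    import asyncio
    import logging

    logger = logging.getLogger(__name__)

    # Assumed shape of the shared state, modeled on the diff above.
    pipeline_status = {"busy": False, "latest_message": "", "history_messages": []}
    pipeline_status_lock = asyncio.Lock()

    async def report(message: str) -> None:
        # Log and publish one status message atomically.
        async with pipeline_status_lock:
            logger.info(message)
            pipeline_status["latest_message"] = message
            pipeline_status["history_messages"].append(message)

    async def main() -> None:
        await report("Starting document deletion process")
        await report("Document deletion process completed.")

    asyncio.run(main())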


@@ -1683,15 +1683,7 @@ class LightRAG:
         This method orchestrates a comprehensive deletion process for a given document ID.
         It ensures that not only the document itself but also all its derived and associated
-        data across different storage layers are removed. This includes:
-
-        1. **Document and Status**: Deletes the document from `full_docs` and its status from `doc_status`.
-        2. **Chunks**: Removes all associated text chunks from `chunks_vdb`.
-        3. **Graph Data**:
-           - Deletes related entities from `entities_vdb`.
-           - Deletes related relationships from `relationships_vdb`.
-           - Removes corresponding nodes and edges from the `chunk_entity_relation_graph`.
-        4. **Graph Reconstruction**: If entities or relationships are partially affected, it triggers
-           a reconstruction of their data from the remaining chunks to ensure consistency.
+        data across different storage layers are removed. If entities or relationships are partially
+        affected, it triggers a reconstruction of their data from the remaining chunks to ensure consistency.

         Args:
             doc_id (str): The unique identifier of the document to be deleted.
@@ -1706,9 +1698,17 @@ class LightRAG:
         deletion_operations_started = False
         original_exception = None

-        try:
-            logger.info(f"Starting deletion process for document {doc_id}")
+        # Get pipeline status shared data and lock for status updates
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_pipeline_status_lock()
+
+        async with pipeline_status_lock:
+            log_message = f"Starting deletion process for document {doc_id}"
+            logger.info(log_message)
+            pipeline_status["latest_message"] = log_message
+            pipeline_status["history_messages"].append(log_message)
+
+        try:
             # 1. Get the document status and related data
             doc_status_data = await self.doc_status.get_by_id(doc_id)
             if not doc_status_data:
@@ -1720,8 +1720,6 @@ class LightRAG:
                     status_code=404,
                 )

-            logger.info(f"Starting optimized deletion for document {doc_id}")
-
             # 2. Get all chunks related to this document
             try:
                 all_chunks = await self.text_chunks.get_all()
@@ -1731,9 +1729,14 @@ class LightRAG:
                     if isinstance(chunk_data, dict)
                     and chunk_data.get("full_doc_id") == doc_id
                 }
-                logger.info(
-                    f"Retrieved {len(all_chunks)} total chunks, {len(related_chunks)} related to document {doc_id}"
-                )
+
+                # Update pipeline status after getting chunks count
+                async with pipeline_status_lock:
+                    log_message = f"Retrieved {len(related_chunks)} of {len(all_chunks)} related chunks"
+                    logger.info(log_message)
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)
             except Exception as e:
                 logger.error(f"Failed to retrieve chunks for document {doc_id}: {e}")
                 raise Exception(f"Failed to retrieve document chunks: {e}") from e
@@ -1753,16 +1756,22 @@ class LightRAG:
                     )
                     raise Exception(f"Failed to delete document entry: {e}") from e

+                async with pipeline_status_lock:
+                    log_message = (
+                        f"Document {doc_id} is deleted without associated chunks."
+                    )
+                    logger.info(log_message)
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)
+
                 return DeletionResult(
                     status="success",
                     doc_id=doc_id,
-                    message=f"Document {doc_id} found but had no associated chunks. Document entry deleted.",
+                    message=log_message,
                     status_code=200,
                 )

             chunk_ids = set(related_chunks.keys())
-            logger.info(f"Found {len(chunk_ids)} chunks to delete")

             # Mark that deletion operations have started
             deletion_operations_started = True
@@ -1777,22 +1786,35 @@ class LightRAG:
             async with graph_db_lock:
                 try:
                     # Get all affected nodes and edges in batch
-                    logger.info(
-                        f"Analyzing affected entities and relationships for {len(chunk_ids)} chunks"
-                    )
+                    # logger.info(
+                    #     f"Analyzing affected entities and relationships for {len(chunk_ids)} chunks"
+                    # )
                     affected_nodes = (
                         await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
                             list(chunk_ids)
                         )
                     )
+
+                    # Update pipeline status after getting affected_nodes
+                    async with pipeline_status_lock:
+                        log_message = f"Found {len(affected_nodes)} affected entities"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
                     affected_edges = (
                         await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
                             list(chunk_ids)
                         )
                     )
-                    logger.info(
-                        f"Found {len(affected_nodes)} affected nodes and {len(affected_edges)} affected edges"
-                    )
+
+                    # Update pipeline status after getting affected_edges
+                    async with pipeline_status_lock:
+                        log_message = f"Found {len(affected_edges)} affected relations"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
                 except Exception as e:
                     logger.error(f"Failed to analyze affected graph elements: {e}")
                     raise Exception(f"Failed to analyze graph dependencies: {e}") from e
@@ -1831,12 +1853,6 @@ class LightRAG:
                         elif remaining_sources != sources:
                             relationships_to_rebuild[edge_tuple] = remaining_sources

-                    logger.info(
-                        f"Analysis complete: {len(entities_to_delete)} entities to delete, "
-                        f"{len(entities_to_rebuild)} entities to rebuild, "
-                        f"{len(relationships_to_delete)} relationships to delete, "
-                        f"{len(relationships_to_rebuild)} relationships to rebuild"
-                    )
                 except Exception as e:
                     logger.error(f"Failed to process graph analysis results: {e}")
                     raise Exception(f"Failed to process graph dependencies: {e}") from e
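The analysis logic kept above the deleted summary log decides delete-versus-rebuild per item: an empty remaining source set means outright deletion, a shrunken one means a rebuild from what is left. A toy sketch of that decision with hypothetical names and data:

    # Partition affected graph items by remaining chunk sources, mirroring the
    # analysis in this hunk; inputs are illustrative only.
    def partition(affected: dict[str, set[str]], deleted_chunks: set[str]):
        to_delete, to_rebuild = set(), {}
        for name, sources in affected.items():
            remaining = sources - deleted_chunks
            if not remaining:
                to_delete.add(name)           # nothing left: delete outright
            elif remaining != sources:
                to_rebuild[name] = remaining  # partially affected: rebuild later
        return to_delete, to_rebuild

    print(partition({"A": {"c1"}, "B": {"c1", "c2"}}, {"c1"}))
    # -> ({'A'}, {'B': {'c2'}})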
@@ -1844,12 +1860,15 @@ class LightRAG:
             # 5. Delete chunks from storage
             if chunk_ids:
                 try:
-                    logger.info(f"Deleting {len(chunk_ids)} chunks from storage")
                     await self.chunks_vdb.delete(chunk_ids)
                     await self.text_chunks.delete(chunk_ids)
-                    logger.info(
-                        f"Successfully deleted {len(chunk_ids)} chunks from storage"
-                    )
+
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
                 except Exception as e:
                     logger.error(f"Failed to delete chunks: {e}")
                     raise Exception(f"Failed to delete document chunks: {e}") from e
@@ -1857,7 +1876,6 @@ class LightRAG:
             # 6. Delete entities that have no remaining sources
             if entities_to_delete:
                 try:
-                    logger.info(f"Deleting {len(entities_to_delete)} entities")
                     # Delete from vector database
                     entity_vdb_ids = [
                         compute_mdhash_id(entity, prefix="ent-")
@@ -1869,9 +1887,13 @@ class LightRAG:
                     await self.chunk_entity_relation_graph.remove_nodes(
                         list(entities_to_delete)
                     )
-                    logger.info(
-                        f"Successfully deleted {len(entities_to_delete)} entities"
-                    )
+
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
                 except Exception as e:
                     logger.error(f"Failed to delete entities: {e}")
                     raise Exception(f"Failed to delete entities: {e}") from e
@@ -1879,9 +1901,6 @@ class LightRAG:
             # 7. Delete relationships that have no remaining sources
             if relationships_to_delete:
                 try:
-                    logger.info(
-                        f"Deleting {len(relationships_to_delete)} relationships"
-                    )
                     # Delete from vector database
                     rel_ids_to_delete = []
                     for src, tgt in relationships_to_delete:
@@ -1897,9 +1916,13 @@ class LightRAG:
                     await self.chunk_entity_relation_graph.remove_edges(
                         list(relationships_to_delete)
                     )
-                    logger.info(
-                        f"Successfully deleted {len(relationships_to_delete)} relationships"
-                    )
+
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
                 except Exception as e:
                     logger.error(f"Failed to delete relationships: {e}")
                     raise Exception(f"Failed to delete relationships: {e}") from e
@@ -1907,9 +1930,6 @@ class LightRAG:
             # 8. Rebuild entities and relationships from remaining chunks
             if entities_to_rebuild or relationships_to_rebuild:
                 try:
-                    logger.info(
-                        f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships"
-                    )
                     await _rebuild_knowledge_from_chunks(
                         entities_to_rebuild=entities_to_rebuild,
                         relationships_to_rebuild=relationships_to_rebuild,
@@ -1920,9 +1940,13 @@ class LightRAG:
                         llm_response_cache=self.llm_response_cache,
                         global_config=asdict(self),
                     )
-                    logger.info(
-                        f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships"
-                    )
+
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully rebuilt {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relations"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
                 except Exception as e:
                     logger.error(f"Failed to rebuild knowledge from chunks: {e}")
                     raise Exception(
@@ -1931,20 +1955,22 @@ class LightRAG:
             # 9. Delete original document and status
             try:
-                logger.info(f"Deleting original document {doc_id} and its status")
                 await self.full_docs.delete([doc_id])
                 await self.doc_status.delete([doc_id])
-                logger.info(f"Successfully deleted document {doc_id} and its status")
             except Exception as e:
                 logger.error(f"Failed to delete document and status: {e}")
                 raise Exception(f"Failed to delete document and status: {e}") from e

-            success_message = f"Successfully deleted document {doc_id}"
-            logger.info(success_message)
+            async with pipeline_status_lock:
+                log_message = f"Successfully deleted document {doc_id}"
+                logger.info(log_message)
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)

             return DeletionResult(
                 status="success",
                 doc_id=doc_id,
-                message=success_message,
+                message=log_message,
                 status_code=200,
             )
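The DeletionResult fields exercised in this diff (status, doc_id, message, status_code) suggest a plain result container; a hypothetical sketch for orientation only, not the project's actual definition:

    from dataclasses import dataclass

    @dataclass
    class DeletionResult:
        # Field names inferred from the call sites above; the real class may differ.
        status: str       # e.g. "success"
        doc_id: str       # the document the operation targeted
        message: str      # human-readable summary, now reused as the pipeline message
        status_code: int  # HTTP-style code, e.g. 200 or 404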
@@ -1964,13 +1990,7 @@ class LightRAG:
             # ALWAYS ensure persistence if any deletion operations were started
             if deletion_operations_started:
                 try:
-                    logger.info(
-                        f"Ensuring data persistence for document {doc_id} deletion"
-                    )
                     await self._insert_done()
-                    logger.info(
-                        f"Data persistence completed successfully for document {doc_id} deletion"
-                    )
                 except Exception as persistence_error:
                     persistence_error_msg = f"Failed to persist data after deletion attempt for {doc_id}: {persistence_error}"
                     logger.error(persistence_error_msg)
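The finally block preserved here guarantees persistence whenever any destructive step ran, success or not. The same guarantee in miniature, with hypothetical stand-ins for the storages and for _insert_done():

    import asyncio

    async def persist() -> None:
        print("flushing storages")    # stand-in for LightRAG's _insert_done()

    async def delete_doc(fail: bool) -> None:
        started = False
        try:
            started = True            # set before the first destructive operation
            if fail:
                raise RuntimeError("mid-deletion failure")
        finally:
            if started:
                await persist()       # runs on success *and* on failure

    asyncio.run(delete_doc(fail=False))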


@@ -270,7 +270,7 @@ async def _rebuild_knowledge_from_chunks(
     for chunk_ids in relationships_to_rebuild.values():
         all_referenced_chunk_ids.update(chunk_ids)

-    logger.info(
+    logger.debug(
         f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
     )
@@ -339,7 +339,7 @@ async def _rebuild_knowledge_from_chunks(
         except Exception as e:
             logger.error(f"Failed to rebuild relationship {src}-{tgt}: {e}")

-    logger.info("Completed rebuilding knowledge from cached extractions")
+    logger.debug("Completed rebuilding knowledge from cached extractions")

 async def _get_cached_extraction_results(
@@ -368,7 +368,7 @@ async def _get_cached_extraction_results(
             extraction_result = cache_entry["return"]
             cached_results[chunk_id] = extraction_result

-    logger.info(
+    logger.debug(
         f"Found {len(cached_results)} cached extraction results for {len(chunk_ids)} chunk IDs"
     )
     return cached_results
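With these rebuild messages demoted from info to debug, they disappear at the default log level; raising verbosity restores them. A minimal snippet using the standard logging module (the "lightrag" logger name is an assumption):

    import logging

    # Demoted messages are still emitted, just at DEBUG; raise verbosity to see them.
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("lightrag").setLevel(logging.DEBUG)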