This commit is contained in:
Raphaël MANSUY 2025-12-04 19:19:05 +08:00
parent 8357b7795d
commit b064daa2d2
2 changed files with 302 additions and 169 deletions

View file

@ -1619,6 +1619,7 @@ async def background_delete_documents(
pipeline_status.update( pipeline_status.update(
{ {
"busy": True, "busy": True,
# Job name can not be changed, it's verified in adelete_by_doc_id()
"job_name": f"Deleting {total_docs} Documents", "job_name": f"Deleting {total_docs} Documents",
"job_start": datetime.now().isoformat(), "job_start": datetime.now().isoformat(),
"docs": total_docs, "docs": total_docs,

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import traceback import traceback
import asyncio import asyncio
import configparser import configparser
import inspect
import os import os
import time import time
import warnings import warnings
@ -12,6 +13,7 @@ from functools import partial
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
Awaitable,
Callable, Callable,
Iterator, Iterator,
cast, cast,
@ -20,6 +22,7 @@ from typing import (
Optional, Optional,
List, List,
Dict, Dict,
Union,
) )
from lightrag.prompt import PROMPTS from lightrag.prompt import PROMPTS
from lightrag.exceptions import PipelineCancelledException from lightrag.exceptions import PipelineCancelledException
@ -61,9 +64,10 @@ from lightrag.kg import (
from lightrag.kg.shared_storage import ( from lightrag.kg.shared_storage import (
get_namespace_data, get_namespace_data,
get_pipeline_status_lock,
get_graph_db_lock,
get_data_init_lock, get_data_init_lock,
get_default_workspace,
set_default_workspace,
get_namespace_lock,
) )
from lightrag.base import ( from lightrag.base import (
@ -243,11 +247,13 @@ class LightRAG:
int, int,
int, int,
], ],
List[Dict[str, Any]], Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]],
] = field(default_factory=lambda: chunking_by_token_size) ] = field(default_factory=lambda: chunking_by_token_size)
""" """
Custom chunking function for splitting text into chunks before processing. Custom chunking function for splitting text into chunks before processing.
The function can be either synchronous or asynchronous.
The function should take the following parameters: The function should take the following parameters:
- `tokenizer`: A Tokenizer instance to use for tokenization. - `tokenizer`: A Tokenizer instance to use for tokenization.
@ -257,7 +263,8 @@ class LightRAG:
- `chunk_token_size`: The maximum number of tokens per chunk. - `chunk_token_size`: The maximum number of tokens per chunk.
- `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks. - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
The function should return a list of dictionaries, where each dictionary contains the following keys: The function should return a list of dictionaries (or an awaitable that resolves to a list),
where each dictionary contains the following keys:
- `tokens`: The number of tokens in the chunk. - `tokens`: The number of tokens in the chunk.
- `content`: The text content of the chunk. - `content`: The text content of the chunk.
@ -270,6 +277,9 @@ class LightRAG:
embedding_func: EmbeddingFunc | None = field(default=None) embedding_func: EmbeddingFunc | None = field(default=None)
"""Function for computing text embeddings. Must be set before use.""" """Function for computing text embeddings. Must be set before use."""
embedding_token_limit: int | None = field(default=None, init=False)
"""Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__."""
embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10))) embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
"""Batch size for embedding computations.""" """Batch size for embedding computations."""
@ -513,6 +523,16 @@ class LightRAG:
logger.debug(f"LightRAG init with param:\n {_print_config}\n") logger.debug(f"LightRAG init with param:\n {_print_config}\n")
# Init Embedding # Init Embedding
# Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes)
embedding_max_token_size = None
if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
embedding_max_token_size = self.embedding_func.max_token_size
logger.debug(
f"Captured embedding max_token_size: {embedding_max_token_size}"
)
self.embedding_token_limit = embedding_max_token_size
# Step 2: Apply priority wrapper decorator
self.embedding_func = priority_limit_async_func_call( self.embedding_func = priority_limit_async_func_call(
self.embedding_func_max_async, self.embedding_func_max_async,
llm_timeout=self.default_embedding_timeout, llm_timeout=self.default_embedding_timeout,
@ -639,6 +659,22 @@ class LightRAG:
async def initialize_storages(self): async def initialize_storages(self):
"""Storage initialization must be called one by one to prevent deadlock""" """Storage initialization must be called one by one to prevent deadlock"""
if self._storages_status == StoragesStatus.CREATED: if self._storages_status == StoragesStatus.CREATED:
# Set the first initialized workspace will set the default workspace
# Allows namespace operation without specifying workspace for backward compatibility
default_workspace = get_default_workspace()
if default_workspace is None:
set_default_workspace(self.workspace)
elif default_workspace != self.workspace:
logger.info(
f"Creating LightRAG instance with workspace='{self.workspace}' "
f"while default workspace is set to '{default_workspace}'"
)
# Auto-initialize pipeline_status for this workspace
from lightrag.kg.shared_storage import initialize_pipeline_status
await initialize_pipeline_status(workspace=self.workspace)
for storage in ( for storage in (
self.full_docs, self.full_docs,
self.text_chunks, self.text_chunks,
@ -1573,8 +1609,12 @@ class LightRAG:
""" """
# Get pipeline status shared data and lock # Get pipeline status shared data and lock
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status = await get_namespace_data(
pipeline_status_lock = get_pipeline_status_lock() "pipeline_status", workspace=self.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=self.workspace
)
# Check if another process is already processing the queue # Check if another process is already processing the queue
async with pipeline_status_lock: async with pipeline_status_lock:
@ -1756,7 +1796,28 @@ class LightRAG:
) )
content = content_data["content"] content = content_data["content"]
# Generate chunks from document # Call chunking function, supporting both sync and async implementations
chunking_result = self.chunking_func(
self.tokenizer,
content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
)
# If result is awaitable, await to get actual result
if inspect.isawaitable(chunking_result):
chunking_result = await chunking_result
# Validate return type
if not isinstance(chunking_result, (list, tuple)):
raise TypeError(
f"chunking_func must return a list or tuple of dicts, "
f"got {type(chunking_result)}"
)
# Build chunks dictionary
chunks: dict[str, Any] = { chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): { compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp, **dp,
@ -1764,14 +1825,7 @@ class LightRAG:
"file_path": file_path, # Add file path to each chunk "file_path": file_path, # Add file path to each chunk
"llm_cache_list": [], # Initialize empty LLM cache list for each chunk "llm_cache_list": [], # Initialize empty LLM cache list for each chunk
} }
for dp in self.chunking_func( for dp in chunking_result
self.tokenizer,
content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
)
} }
if not chunks: if not chunks:
@ -1835,7 +1889,7 @@ class LightRAG:
chunks, pipeline_status, pipeline_status_lock chunks, pipeline_status, pipeline_status_lock
) )
) )
await entity_relation_task chunk_results = await entity_relation_task
file_extraction_stage_ok = True file_extraction_stage_ok = True
except Exception as e: except Exception as e:
@ -1917,8 +1971,7 @@ class LightRAG:
"User cancelled" "User cancelled"
) )
# Get chunk_results from entity_relation_task # Use chunk_results from entity_relation_task
chunk_results = await entity_relation_task
await merge_nodes_and_edges( await merge_nodes_and_edges(
chunk_results=chunk_results, # result collected from entity_relation_task chunk_results=chunk_results, # result collected from entity_relation_task
knowledge_graph_inst=self.chunk_entity_relation_graph, knowledge_graph_inst=self.chunk_entity_relation_graph,
@ -2895,6 +2948,26 @@ class LightRAG:
data across different storage layers are removed or rebuiled. If entities or relationships data across different storage layers are removed or rebuiled. If entities or relationships
are partially affected, they will be rebuilded using LLM cached from remaining documents. are partially affected, they will be rebuilded using LLM cached from remaining documents.
**Concurrency Control Design:**
This function implements a pipeline-based concurrency control to prevent data corruption:
1. **Single Document Deletion** (when WE acquire pipeline):
- Sets job_name to "Single document deletion" (NOT starting with "deleting")
- Prevents other adelete_by_doc_id calls from running concurrently
- Ensures exclusive access to graph operations for this deletion
2. **Batch Document Deletion** (when background_delete_documents acquires pipeline):
- Sets job_name to "Deleting {N} Documents" (starts with "deleting")
- Allows multiple adelete_by_doc_id calls to join the deletion queue
- Each call validates the job name to ensure it's part of a deletion operation
The validation logic `if not job_name.startswith("deleting") or "document" not in job_name`
ensures that:
- adelete_by_doc_id can only run when pipeline is idle OR during batch deletion
- Prevents concurrent single deletions that could cause race conditions
- Rejects operations when pipeline is busy with non-deletion tasks
Args: Args:
doc_id (str): The unique identifier of the document to be deleted. doc_id (str): The unique identifier of the document to be deleted.
delete_llm_cache (bool): Whether to delete cached LLM extraction results delete_llm_cache (bool): Whether to delete cached LLM extraction results
@ -2902,20 +2975,62 @@ class LightRAG:
Returns: Returns:
DeletionResult: An object containing the outcome of the deletion process. DeletionResult: An object containing the outcome of the deletion process.
- `status` (str): "success", "not_found", or "failure". - `status` (str): "success", "not_found", "not_allowed", or "failure".
- `doc_id` (str): The ID of the document attempted to be deleted. - `doc_id` (str): The ID of the document attempted to be deleted.
- `message` (str): A summary of the operation's result. - `message` (str): A summary of the operation's result.
- `status_code` (int): HTTP status code (e.g., 200, 404, 500). - `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500).
- `file_path` (str | None): The file path of the deleted document, if available. - `file_path` (str | None): The file path of the deleted document, if available.
""" """
# Get pipeline status shared data and lock for validation
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=self.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=self.workspace
)
# Track whether WE acquired the pipeline
we_acquired_pipeline = False
# Check and acquire pipeline if needed
async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
# Pipeline is idle - WE acquire it for this deletion
we_acquired_pipeline = True
pipeline_status.update(
{
"busy": True,
"job_name": "Single document deletion",
"job_start": datetime.now(timezone.utc).isoformat(),
"docs": 1,
"batchs": 1,
"cur_batch": 0,
"request_pending": False,
"cancellation_requested": False,
"latest_message": f"Starting deletion for document: {doc_id}",
}
)
# Initialize history messages
pipeline_status["history_messages"][:] = [
f"Starting deletion for document: {doc_id}"
]
else:
# Pipeline already busy - verify it's a deletion job
job_name = pipeline_status.get("job_name", "").lower()
if not job_name.startswith("deleting") or "document" not in job_name:
return DeletionResult(
status="not_allowed",
doc_id=doc_id,
message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
status_code=403,
file_path=None,
)
# Pipeline is busy with deletion - proceed without acquiring
deletion_operations_started = False deletion_operations_started = False
original_exception = None original_exception = None
doc_llm_cache_ids: list[str] = [] doc_llm_cache_ids: list[str] = []
# Get pipeline status shared data and lock for status updates
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
async with pipeline_status_lock: async with pipeline_status_lock:
log_message = f"Starting deletion process for document {doc_id}" log_message = f"Starting deletion process for document {doc_id}"
logger.info(log_message) logger.info(log_message)
@ -3147,6 +3262,9 @@ class LightRAG:
] ]
if not existing_sources: if not existing_sources:
# No chunk references means this entity should be deleted
entities_to_delete.add(node_label)
entity_chunk_updates[node_label] = []
continue continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids) remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@ -3168,6 +3286,7 @@ class LightRAG:
# Process relationships # Process relationships
for edge_data in affected_edges: for edge_data in affected_edges:
# source target is not in normalize order in graph db property
src = edge_data.get("source") src = edge_data.get("source")
tgt = edge_data.get("target") tgt = edge_data.get("target")
@ -3204,6 +3323,9 @@ class LightRAG:
] ]
if not existing_sources: if not existing_sources:
# No chunk references means this relationship should be deleted
relationships_to_delete.add(edge_tuple)
relation_chunk_updates[edge_tuple] = []
continue continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids) remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@ -3229,38 +3351,31 @@ class LightRAG:
if entity_chunk_updates and self.entity_chunks: if entity_chunk_updates and self.entity_chunks:
entity_upsert_payload = {} entity_upsert_payload = {}
entity_delete_ids: set[str] = set()
for entity_name, remaining in entity_chunk_updates.items(): for entity_name, remaining in entity_chunk_updates.items():
if not remaining: if not remaining:
entity_delete_ids.add(entity_name) # Empty entities are deleted alongside graph nodes later
else: continue
entity_upsert_payload[entity_name] = { entity_upsert_payload[entity_name] = {
"chunk_ids": remaining, "chunk_ids": remaining,
"count": len(remaining), "count": len(remaining),
"updated_at": current_time, "updated_at": current_time,
} }
if entity_delete_ids:
await self.entity_chunks.delete(list(entity_delete_ids))
if entity_upsert_payload: if entity_upsert_payload:
await self.entity_chunks.upsert(entity_upsert_payload) await self.entity_chunks.upsert(entity_upsert_payload)
if relation_chunk_updates and self.relation_chunks: if relation_chunk_updates and self.relation_chunks:
relation_upsert_payload = {} relation_upsert_payload = {}
relation_delete_ids: set[str] = set()
for edge_tuple, remaining in relation_chunk_updates.items(): for edge_tuple, remaining in relation_chunk_updates.items():
storage_key = make_relation_chunk_key(*edge_tuple)
if not remaining: if not remaining:
relation_delete_ids.add(storage_key) # Empty relations are deleted alongside graph edges later
else: continue
relation_upsert_payload[storage_key] = { storage_key = make_relation_chunk_key(*edge_tuple)
"chunk_ids": remaining, relation_upsert_payload[storage_key] = {
"count": len(remaining), "chunk_ids": remaining,
"updated_at": current_time, "count": len(remaining),
} "updated_at": current_time,
}
if relation_delete_ids:
await self.relation_chunks.delete(list(relation_delete_ids))
if relation_upsert_payload: if relation_upsert_payload:
await self.relation_chunks.upsert(relation_upsert_payload) await self.relation_chunks.upsert(relation_upsert_payload)
@ -3268,31 +3383,111 @@ class LightRAG:
logger.error(f"Failed to process graph analysis results: {e}") logger.error(f"Failed to process graph analysis results: {e}")
raise Exception(f"Failed to process graph dependencies: {e}") from e raise Exception(f"Failed to process graph dependencies: {e}") from e
# Use graph database lock to prevent dirty read # Data integrity is ensured by allowing only one process to hold pipeline at a timeno graph db lock is needed anymore)
graph_db_lock = get_graph_db_lock(enable_logging=False)
async with graph_db_lock:
# 5. Delete chunks from storage
if chunk_ids:
try:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
async with pipeline_status_lock: # 5. Delete chunks from storage
log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage" if chunk_ids:
logger.info(log_message) try:
pipeline_status["latest_message"] = log_message await self.chunks_vdb.delete(chunk_ids)
pipeline_status["history_messages"].append(log_message) await self.text_chunks.delete(chunk_ids)
except Exception as e: async with pipeline_status_lock:
logger.error(f"Failed to delete chunks: {e}") log_message = (
raise Exception(f"Failed to delete document chunks: {e}") from e f"Successfully deleted {len(chunk_ids)} chunks from storage"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# 6. Delete relationships that have no remaining sources except Exception as e:
if relationships_to_delete: logger.error(f"Failed to delete chunks: {e}")
try: raise Exception(f"Failed to delete document chunks: {e}") from e
# Delete from vector database
# 6. Delete relationships that have no remaining sources
if relationships_to_delete:
try:
# Delete from relation vdb
rel_ids_to_delete = []
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in relationships_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# 7. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Batch get all edges for entities to avoid N+1 query problem
nodes_edges_dict = (
await self.chunk_entity_relation_graph.get_nodes_edges_batch(
list(entities_to_delete)
)
)
# Debug: Check and log all edges before deleting nodes
edges_to_delete = set()
edges_still_exist = 0
for entity, edges in nodes_edges_dict.items():
if edges:
for src, tgt in edges:
# Normalize edge representation (sorted for consistency)
edge_tuple = tuple(sorted((src, tgt)))
edges_to_delete.add(edge_tuple)
if (
src in entities_to_delete
and tgt in entities_to_delete
):
logger.warning(
f"Edge still exists: {src} <-> {tgt}"
)
elif src in entities_to_delete:
logger.warning(
f"Edge still exists: {src} --> {tgt}"
)
else:
logger.warning(
f"Edge still exists: {src} <-- {tgt}"
)
edges_still_exist += 1
if edges_still_exist:
logger.warning(
f"⚠️ {edges_still_exist} entities still has edges before deletion"
)
# Clean residual edges from VDB and storage before deleting nodes
if edges_to_delete:
# Delete from relationships_vdb
rel_ids_to_delete = [] rel_ids_to_delete = []
for src, tgt in relationships_to_delete: for src, tgt in edges_to_delete:
rel_ids_to_delete.extend( rel_ids_to_delete.extend(
[ [
compute_mdhash_id(src + tgt, prefix="rel-"), compute_mdhash_id(src + tgt, prefix="rel-"),
@ -3301,123 +3496,48 @@ class LightRAG:
) )
await self.relationships_vdb.delete(rel_ids_to_delete) await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
# Delete from relation_chunks storage # Delete from relation_chunks storage
if self.relation_chunks: if self.relation_chunks:
relation_storage_keys = [ relation_storage_keys = [
make_relation_chunk_key(src, tgt) make_relation_chunk_key(src, tgt)
for src, tgt in relationships_to_delete for src, tgt in edges_to_delete
] ]
await self.relation_chunks.delete(relation_storage_keys) await self.relation_chunks.delete(relation_storage_keys)
async with pipeline_status_lock: logger.info(
log_message = f"Successfully deleted {len(relationships_to_delete)} relations" f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# 7. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Batch get all edges for entities to avoid N+1 query problem
nodes_edges_dict = await self.chunk_entity_relation_graph.get_nodes_edges_batch(
list(entities_to_delete)
) )
# Debug: Check and log all edges before deleting nodes # Delete from graph (edges will be auto-deleted with nodes)
edges_to_delete = set() await self.chunk_entity_relation_graph.remove_nodes(
edges_still_exist = 0 list(entities_to_delete)
)
for entity, edges in nodes_edges_dict.items(): # Delete from vector vdb
if edges: entity_vdb_ids = [
for src, tgt in edges: compute_mdhash_id(entity, prefix="ent-")
# Normalize edge representation (sorted for consistency) for entity in entities_to_delete
edge_tuple = tuple(sorted((src, tgt))) ]
edges_to_delete.add(edge_tuple) await self.entities_vdb.delete(entity_vdb_ids)
if ( # Delete from entity_chunks storage
src in entities_to_delete if self.entity_chunks:
and tgt in entities_to_delete await self.entity_chunks.delete(list(entities_to_delete))
):
logger.warning(
f"Edge still exists: {src} <-> {tgt}"
)
elif src in entities_to_delete:
logger.warning(
f"Edge still exists: {src} --> {tgt}"
)
else:
logger.warning(
f"Edge still exists: {src} <-- {tgt}"
)
edges_still_exist += 1
if edges_still_exist: async with pipeline_status_lock:
logger.warning( log_message = (
f"⚠️ {edges_still_exist} entities still has edges before deletion" f"Successfully deleted {len(entities_to_delete)} entities"
)
# Clean residual edges from VDB and storage before deleting nodes
if edges_to_delete:
# Delete from relationships_vdb
rel_ids_to_delete = []
for src, tgt in edges_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in edges_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
logger.info(
f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
)
# Delete from graph (edges will be auto-deleted with nodes)
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
) )
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Delete from vector database except Exception as e:
entity_vdb_ids = [ logger.error(f"Failed to delete entities: {e}")
compute_mdhash_id(entity, prefix="ent-") raise Exception(f"Failed to delete entities: {e}") from e
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from entity_chunks storage # Persist changes to graph database before entity and relationship rebuild
if self.entity_chunks: await self._insert_done()
await self.entity_chunks.delete(list(entities_to_delete))
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(entities_to_delete)} entities"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete entities: {e}")
raise Exception(f"Failed to delete entities: {e}") from e
# Persist changes to graph database before releasing graph database lock
await self._insert_done()
# 8. Rebuild entities and relationships from remaining chunks # 8. Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild: if entities_to_rebuild or relationships_to_rebuild:
@ -3523,6 +3643,18 @@ class LightRAG:
f"No deletion operations were started for document {doc_id}, skipping persistence" f"No deletion operations were started for document {doc_id}, skipping persistence"
) )
# Release pipeline only if WE acquired it
if we_acquired_pipeline:
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["cancellation_requested"] = False
completion_msg = (
f"Deletion process completed for document: {doc_id}"
)
pipeline_status["latest_message"] = completion_msg
pipeline_status["history_messages"].append(completion_msg)
logger.info(completion_msg)
async def adelete_by_entity(self, entity_name: str) -> DeletionResult: async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
"""Asynchronously delete an entity and all its relationships. """Asynchronously delete an entity and all its relationships.