refactor: optimize graph lock scope in document deletion
- Move dependency analysis outside graph database lock
- Add persistence call before lock release to prevent dirty reads
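Both bullets amount to one locking pattern: run the read-only dependency analysis before taking the exclusive graph lock, then apply mutations and persist while still holding it. A minimal asyncio sketch of that pattern (the `delete_document` and `persist_graph` names and the dict-based stores are illustrative stand-ins, not LightRAG's actual APIs):

```python
import asyncio

graph_db_lock = asyncio.Lock()  # stand-in for get_graph_db_lock()


async def persist_graph(graph: dict[str, set[str]], store: dict) -> None:
    # Stand-in for _insert_done(): flush in-memory state to durable storage.
    store["snapshot"] = {k: set(v) for k, v in graph.items()}


async def delete_document(
    doc_chunks: set[str], graph: dict[str, set[str]], store: dict
) -> None:
    # Phase 1 (outside the lock): read-only analysis of which graph
    # elements lose some or all of their supporting chunks.
    to_delete = {k for k, srcs in graph.items() if srcs <= doc_chunks}
    to_rebuild = {
        k: srcs - doc_chunks
        for k, srcs in graph.items()
        if srcs - doc_chunks and srcs & doc_chunks
    }

    # Phase 2 (inside the lock): apply mutations, then persist *before*
    # releasing the lock, so the next lock holder never reads state that
    # is newer in memory than on disk.
    async with graph_db_lock:
        for k in to_delete:
            del graph[k]
        for k, remaining in to_rebuild.items():
            graph[k] = remaining
        await persist_graph(graph, store)


store: dict = {}
graph = {"e1": {"c1"}, "e2": {"c1", "c2"}}
asyncio.run(delete_document({"c1"}, graph, store))
print(graph, store)  # {'e2': {'c2'}} plus the persisted snapshot
```

Keeping `persist_graph` inside the `async with` block is the dirty-read fix the second bullet describes: the lock is released only once durable state matches in-memory state.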
parent cac8e189e7
commit 0b1b264a5d
1 changed file with 124 additions and 129 deletions
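The heart of the analysis that the diff below moves around is a per-element classification by remaining source chunks: an entity or relation whose supporting chunks all belong to the deleted document is deleted; one with partial support is rebuilt from what remains. A distilled, self-contained version of that rule (`classify` is an illustrative helper, not a function in the codebase, and the separator value is assumed):

```python
GRAPH_FIELD_SEP = "<SEP>"  # assumed to match LightRAG's field separator


def classify(source_id: str, deleted_chunk_ids: set[str]) -> tuple[str, set[str]]:
    """Decide an element's fate once the given chunks are deleted."""
    sources = set(source_id.split(GRAPH_FIELD_SEP))
    remaining = sources - deleted_chunk_ids
    if not remaining:
        return "delete", remaining   # no supporting chunks left
    if remaining != sources:
        return "rebuild", remaining  # partial support: re-extract from the rest
    return "keep", sources           # untouched by this deletion


# A node supported by chunks c1 and c2, where c2 is being deleted:
assert classify(f"c1{GRAPH_FIELD_SEP}c2", {"c2"}) == ("rebuild", {"c1"})
assert classify("c2", {"c2"}) == ("delete", set())
```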
@@ -2272,117 +2272,111 @@ class LightRAG:
 relationships_to_delete = set()
 relationships_to_rebuild = {}  # (src, tgt) -> remaining_chunk_ids
 
+# Use graph database lock to ensure atomic merges and updates
+try:
+    # Get affected entities and relations from full_entities and full_relations storage
+    doc_entities_data = await self.full_entities.get_by_id(doc_id)
+    doc_relations_data = await self.full_relations.get_by_id(doc_id)
+
+    affected_nodes = []
+    affected_edges = []
+
+    # Get entity data from graph storage using entity names from full_entities
+    if doc_entities_data and "entity_names" in doc_entities_data:
+        entity_names = doc_entities_data["entity_names"]
+        # get_nodes_batch returns dict[str, dict], need to convert to list[dict]
+        nodes_dict = await self.chunk_entity_relation_graph.get_nodes_batch(
+            entity_names
+        )
+        for entity_name in entity_names:
+            node_data = nodes_dict.get(entity_name)
+            if node_data:
+                # Ensure compatibility with existing logic that expects "id" field
+                if "id" not in node_data:
+                    node_data["id"] = entity_name
+                affected_nodes.append(node_data)
+
+    # Get relation data from graph storage using relation pairs from full_relations
+    if doc_relations_data and "relation_pairs" in doc_relations_data:
+        relation_pairs = doc_relations_data["relation_pairs"]
+        edge_pairs_dicts = [
+            {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs
+        ]
+        # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict]
+        edges_dict = await self.chunk_entity_relation_graph.get_edges_batch(
+            edge_pairs_dicts
+        )
+
+        for pair in relation_pairs:
+            src, tgt = pair[0], pair[1]
+            edge_key = (src, tgt)
+            edge_data = edges_dict.get(edge_key)
+            if edge_data:
+                # Ensure compatibility with existing logic that expects "source" and "target" fields
+                if "source" not in edge_data:
+                    edge_data["source"] = src
+                if "target" not in edge_data:
+                    edge_data["target"] = tgt
+                affected_edges.append(edge_data)
+
+except Exception as e:
+    logger.error(f"Failed to analyze affected graph elements: {e}")
+    raise Exception(f"Failed to analyze graph dependencies: {e}") from e
+
+try:
+    # Process entities
+    for node_data in affected_nodes:
+        node_label = node_data.get("entity_id")
+        if node_label and "source_id" in node_data:
+            sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
+            remaining_sources = sources - chunk_ids
+
+            if not remaining_sources:
+                entities_to_delete.add(node_label)
+            elif remaining_sources != sources:
+                entities_to_rebuild[node_label] = remaining_sources
+
+    async with pipeline_status_lock:
+        log_message = f"Found {len(entities_to_rebuild)} affected entities"
+        logger.info(log_message)
+        pipeline_status["latest_message"] = log_message
+        pipeline_status["history_messages"].append(log_message)
+
+    # Process relationships
+    for edge_data in affected_edges:
+        src = edge_data.get("source")
+        tgt = edge_data.get("target")
+
+        if src and tgt and "source_id" in edge_data:
+            edge_tuple = tuple(sorted((src, tgt)))
+            if (
+                edge_tuple in relationships_to_delete
+                or edge_tuple in relationships_to_rebuild
+            ):
+                continue
+
+            sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
+            remaining_sources = sources - chunk_ids
+
+            if not remaining_sources:
+                relationships_to_delete.add(edge_tuple)
+            elif remaining_sources != sources:
+                relationships_to_rebuild[edge_tuple] = remaining_sources
+
+    async with pipeline_status_lock:
+        log_message = (
+            f"Found {len(relationships_to_rebuild)} affected relations"
+        )
+        logger.info(log_message)
+        pipeline_status["latest_message"] = log_message
+        pipeline_status["history_messages"].append(log_message)
+
+except Exception as e:
+    logger.error(f"Failed to process graph analysis results: {e}")
+    raise Exception(f"Failed to process graph dependencies: {e}") from e
+
 # Use graph database lock to prevent dirty read
 graph_db_lock = get_graph_db_lock(enable_logging=False)
 async with graph_db_lock:
-    try:
-        # Get affected entities and relations from full_entities and full_relations storage
-        doc_entities_data = await self.full_entities.get_by_id(doc_id)
-        doc_relations_data = await self.full_relations.get_by_id(doc_id)
-
-        affected_nodes = []
-        affected_edges = []
-
-        # Get entity data from graph storage using entity names from full_entities
-        if doc_entities_data and "entity_names" in doc_entities_data:
-            entity_names = doc_entities_data["entity_names"]
-            # get_nodes_batch returns dict[str, dict], need to convert to list[dict]
-            nodes_dict = (
-                await self.chunk_entity_relation_graph.get_nodes_batch(
-                    entity_names
-                )
-            )
-            for entity_name in entity_names:
-                node_data = nodes_dict.get(entity_name)
-                if node_data:
-                    # Ensure compatibility with existing logic that expects "id" field
-                    if "id" not in node_data:
-                        node_data["id"] = entity_name
-                    affected_nodes.append(node_data)
-
-        # Get relation data from graph storage using relation pairs from full_relations
-        if doc_relations_data and "relation_pairs" in doc_relations_data:
-            relation_pairs = doc_relations_data["relation_pairs"]
-            edge_pairs_dicts = [
-                {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs
-            ]
-            # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict]
-            edges_dict = (
-                await self.chunk_entity_relation_graph.get_edges_batch(
-                    edge_pairs_dicts
-                )
-            )
-
-            for pair in relation_pairs:
-                src, tgt = pair[0], pair[1]
-                edge_key = (src, tgt)
-                edge_data = edges_dict.get(edge_key)
-                if edge_data:
-                    # Ensure compatibility with existing logic that expects "source" and "target" fields
-                    if "source" not in edge_data:
-                        edge_data["source"] = src
-                    if "target" not in edge_data:
-                        edge_data["target"] = tgt
-                    affected_edges.append(edge_data)
-
-    except Exception as e:
-        logger.error(f"Failed to analyze affected graph elements: {e}")
-        raise Exception(f"Failed to analyze graph dependencies: {e}") from e
-
-    try:
-        # Process entities
-        for node_data in affected_nodes:
-            node_label = node_data.get("entity_id")
-            if node_label and "source_id" in node_data:
-                sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
-                remaining_sources = sources - chunk_ids
-
-                if not remaining_sources:
-                    entities_to_delete.add(node_label)
-                elif remaining_sources != sources:
-                    entities_to_rebuild[node_label] = remaining_sources
-
-        async with pipeline_status_lock:
-            log_message = (
-                f"Found {len(entities_to_rebuild)} affected entities"
-            )
-            logger.info(log_message)
-            pipeline_status["latest_message"] = log_message
-            pipeline_status["history_messages"].append(log_message)
-
-        # Process relationships
-        for edge_data in affected_edges:
-            src = edge_data.get("source")
-            tgt = edge_data.get("target")
-
-            if src and tgt and "source_id" in edge_data:
-                edge_tuple = tuple(sorted((src, tgt)))
-                if (
-                    edge_tuple in relationships_to_delete
-                    or edge_tuple in relationships_to_rebuild
-                ):
-                    continue
-
-                sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
-                remaining_sources = sources - chunk_ids
-
-                if not remaining_sources:
-                    relationships_to_delete.add(edge_tuple)
-                elif remaining_sources != sources:
-                    relationships_to_rebuild[edge_tuple] = remaining_sources
-
-        async with pipeline_status_lock:
-            log_message = (
-                f"Found {len(relationships_to_rebuild)} affected relations"
-            )
-            logger.info(log_message)
-            pipeline_status["latest_message"] = log_message
-            pipeline_status["history_messages"].append(log_message)
-
-    except Exception as e:
-        logger.error(f"Failed to process graph analysis results: {e}")
-        raise Exception(f"Failed to process graph dependencies: {e}") from e
-
     # 5. Delete chunks from storage
     if chunk_ids:
         try:
@@ -2453,27 +2447,28 @@ class LightRAG:
     logger.error(f"Failed to delete relationships: {e}")
     raise Exception(f"Failed to delete relationships: {e}") from e
 
+# 8. Rebuild entities and relationships from remaining chunks
+if entities_to_rebuild or relationships_to_rebuild:
+    try:
+        await _rebuild_knowledge_from_chunks(
+            entities_to_rebuild=entities_to_rebuild,
+            relationships_to_rebuild=relationships_to_rebuild,
+            knowledge_graph_inst=self.chunk_entity_relation_graph,
+            entities_vdb=self.entities_vdb,
+            relationships_vdb=self.relationships_vdb,
+            text_chunks_storage=self.text_chunks,
+            llm_response_cache=self.llm_response_cache,
+            global_config=asdict(self),
+            pipeline_status=pipeline_status,
+            pipeline_status_lock=pipeline_status_lock,
+        )
+        # Persist changes to graph database before releasing graph database lock
+        await self._insert_done()
+
+    except Exception as e:
+        logger.error(f"Failed to rebuild knowledge from chunks: {e}")
+        raise Exception(
+            f"Failed to rebuild knowledge graph: {e}"
+        ) from e
-# 8. Rebuild entities and relationships from remaining chunks
-if entities_to_rebuild or relationships_to_rebuild:
-    try:
-        await _rebuild_knowledge_from_chunks(
-            entities_to_rebuild=entities_to_rebuild,
-            relationships_to_rebuild=relationships_to_rebuild,
-            knowledge_graph_inst=self.chunk_entity_relation_graph,
-            entities_vdb=self.entities_vdb,
-            relationships_vdb=self.relationships_vdb,
-            text_chunks_storage=self.text_chunks,
-            llm_response_cache=self.llm_response_cache,
-            global_config=asdict(self),
-            pipeline_status=pipeline_status,
-            pipeline_status_lock=pipeline_status_lock,
-        )
-
-    except Exception as e:
-        logger.error(f"Failed to rebuild knowledge from chunks: {e}")
-        raise Exception(f"Failed to rebuild knowledge graph: {e}") from e
 
 # 9. Delete from full_entities and full_relations storage
 try: