Merge pull request #2262 from danielaskdd/sort-edge

Fix: Ensure Storage Consistency When Creating Implicit Nodes from Relationships
This commit is contained in:
Daniel.y 2025-10-25 23:56:20 +08:00 committed by GitHub
commit 11f1f3664b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 96 additions and 0 deletions

View file

@@ -3302,6 +3302,10 @@ class LightRAG:
list(entities_to_delete)
)
# Delete from entity_chunks storage
if self.entity_chunks:
await self.entity_chunks.delete(list(entities_to_delete))
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(entities_to_delete)} entities"
logger.info(log_message)
@@ -3331,6 +3335,14 @@ class LightRAG:
list(relationships_to_delete)
)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in relationships_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
logger.info(log_message)

View file

@@ -710,6 +710,7 @@ async def rebuild_knowledge_from_chunks(
await _rebuild_single_relationship(
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb,
entities_vdb=entities_vdb,
src=src,
tgt=tgt,
chunk_ids=chunk_ids,
@@ -717,6 +718,7 @@ async def rebuild_knowledge_from_chunks(
llm_response_cache=llm_response_cache,
global_config=global_config,
relation_chunks_storage=relation_chunks_storage,
entity_chunks_storage=entity_chunks_storage,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
)
@@ -1292,6 +1294,7 @@ async def _rebuild_single_entity(
async def _rebuild_single_relationship(
knowledge_graph_inst: BaseGraphStorage,
relationships_vdb: BaseVectorStorage,
entities_vdb: BaseVectorStorage,
src: str,
tgt: str,
chunk_ids: list[str],
@@ -1299,6 +1302,7 @@ async def _rebuild_single_relationship(
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
relation_chunks_storage: BaseKVStorage | None = None,
entity_chunks_storage: BaseKVStorage | None = None,
pipeline_status: dict | None = None,
pipeline_status_lock=None,
) -> None:
@@ -1428,6 +1432,10 @@ async def _rebuild_single_relationship(
else:
truncation_info = ""
# Sort src and tgt to ensure consistent ordering (smaller string first)
if src > tgt:
src, tgt = tgt, src
# Update relationship in graph storage
updated_relationship_data = {
**current_relationship,
@@ -1442,6 +1450,63 @@ async def _rebuild_single_relationship(
else current_relationship.get("file_path", "unknown_source"),
"truncate": truncation_info,
}
# Ensure both endpoint nodes exist before writing the edge back
# (certain storage backends require pre-existing nodes).
node_description = (
updated_relationship_data["description"]
if updated_relationship_data.get("description")
else current_relationship.get("description", "")
)
node_source_id = updated_relationship_data.get("source_id", "")
node_file_path = updated_relationship_data.get("file_path", "unknown_source")
for node_id in {src, tgt}:
if not (await knowledge_graph_inst.has_node(node_id)):
node_created_at = int(time.time())
node_data = {
"entity_id": node_id,
"source_id": node_source_id,
"description": node_description,
"entity_type": "UNKNOWN",
"file_path": node_file_path,
"created_at": node_created_at,
"truncate": "",
}
await knowledge_graph_inst.upsert_node(node_id, node_data=node_data)
# Update entity_chunks_storage for the newly created entity
if entity_chunks_storage is not None and limited_chunk_ids:
await entity_chunks_storage.upsert(
{
node_id: {
"chunk_ids": limited_chunk_ids,
"count": len(limited_chunk_ids),
}
}
)
# Update entity_vdb for the newly created entity
if entities_vdb is not None:
entity_vdb_id = compute_mdhash_id(node_id, prefix="ent-")
entity_content = f"{node_id}\n{node_description}"
vdb_data = {
entity_vdb_id: {
"content": entity_content,
"entity_name": node_id,
"source_id": node_source_id,
"entity_type": "UNKNOWN",
"file_path": node_file_path,
}
}
await safe_vdb_operation_with_exception(
operation=lambda payload=vdb_data: entities_vdb.upsert(payload),
operation_name="rebuild_added_entity_upsert",
entity_name=node_id,
max_retries=3,
retry_delay=0.1,
)
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
# Update relationship in vector database
@@ -1797,6 +1862,7 @@ async def _merge_edges_then_upsert(
llm_response_cache: BaseKVStorage | None = None,
added_entities: list = None, # New parameter to track entities added during edge processing
relation_chunks_storage: BaseKVStorage | None = None,
entity_chunks_storage: BaseKVStorage | None = None,
):
if src_id == tgt_id:
return None
@@ -2079,6 +2145,10 @@ async def _merge_edges_then_upsert(
else:
logger.debug(status_message)
# Sort src_id and tgt_id to ensure consistent ordering (smaller string first)
if src_id > tgt_id:
src_id, tgt_id = tgt_id, src_id
# 11. Update both graph and vector db
for need_insert_id in [src_id, tgt_id]:
if not (await knowledge_graph_inst.has_node(need_insert_id)):
@@ -2094,6 +2164,19 @@ async def _merge_edges_then_upsert(
}
await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
# Update entity_chunks_storage for the newly created entity
if entity_chunks_storage is not None:
chunk_ids = [chunk_id for chunk_id in full_source_ids if chunk_id]
if chunk_ids:
await entity_chunks_storage.upsert(
{
need_insert_id: {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
}
)
if entity_vdb is not None:
entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
entity_content = f"{need_insert_id}\n{description}"
@@ -2407,6 +2490,7 @@ async def merge_nodes_and_edges(
llm_response_cache,
added_entities, # Pass list to collect added entities
relation_chunks_storage,
entity_chunks_storage, # Add entity_chunks_storage parameter
)
if edge_data is None: