Fix Entity Source IDs Tracking Problem
- Handle existing node updates properly in edge merging stage
- Fix source_ids merging logic
- Reorder entity deletion and optimize node operations
- Delete relationships before entities
- Add edge existence debugging logs
parent 29c4a91dc3
commit 3fa79026e0

2 changed files with 169 additions and 31 deletions
@@ -3294,36 +3294,7 @@ class LightRAG:
                     logger.error(f"Failed to delete chunks: {e}")
                     raise Exception(f"Failed to delete document chunks: {e}") from e

-            # 6. Delete entities that have no remaining sources
-            if entities_to_delete:
-                try:
-                    # Delete from vector database
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
-
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
-
-                    # Delete from entity_chunks storage
-                    if self.entity_chunks:
-                        await self.entity_chunks.delete(list(entities_to_delete))
-
-                    async with pipeline_status_lock:
-                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                except Exception as e:
-                    logger.error(f"Failed to delete entities: {e}")
-                    raise Exception(f"Failed to delete entities: {e}") from e
-
-            # 7. Delete relationships that have no remaining sources
+            # 6. Delete relationships that have no remaining sources
             if relationships_to_delete:
                 try:
                     # Delete from vector database
@@ -3360,6 +3331,66 @@ class LightRAG:
                     logger.error(f"Failed to delete relationships: {e}")
                     raise Exception(f"Failed to delete relationships: {e}") from e

+            # 7. Delete entities that have no remaining sources
+            if entities_to_delete:
+                try:
+                    # Debug: Check and log all edges before deleting nodes
+                    edges_still_exist = 0
+                    for entity in entities_to_delete:
+                        edges = (
+                            await self.chunk_entity_relation_graph.get_node_edges(
+                                entity
+                            )
+                        )
+                        if edges:
+                            for src, tgt in edges:
+                                if (
+                                    src in entities_to_delete
+                                    and tgt in entities_to_delete
+                                ):
+                                    logger.warning(
+                                        f"Edge still exists: {src} <-> {tgt}"
+                                    )
+                                elif src in entities_to_delete:
+                                    logger.warning(
+                                        f"Edge still exists: {src} --> {tgt}"
+                                    )
+                                else:
+                                    logger.warning(
+                                        f"Edge still exists: {tgt} --> {src}"
+                                    )
+                            edges_still_exist += 1
+                    if edges_still_exist:
+                        logger.warning(
+                            f"⚠️ {edges_still_exist} entities still have edges before deletion"
+                        )
+
+                    # Delete from graph
+                    await self.chunk_entity_relation_graph.remove_nodes(
+                        list(entities_to_delete)
+                    )
+
+                    # Delete from vector database
+                    entity_vdb_ids = [
+                        compute_mdhash_id(entity, prefix="ent-")
+                        for entity in entities_to_delete
+                    ]
+                    await self.entities_vdb.delete(entity_vdb_ids)
+
+                    # Delete from entity_chunks storage
+                    if self.entity_chunks:
+                        await self.entity_chunks.delete(list(entities_to_delete))
+
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
+                except Exception as e:
+                    logger.error(f"Failed to delete entities: {e}")
+                    raise Exception(f"Failed to delete entities: {e}") from e
+
             # Persist changes to graph database before releasing graph database lock
             await self._insert_done()

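Note: the hunks above reorder the deletion pipeline so relationships are removed before the entities they connect. A plausible reason is that most graph backends drop a node's incident edges as a side effect of node removal, so deleting entities first can erase edges a later relationship pass still expects to find. A toy illustration of that failure mode, independent of LightRAG's actual storage classes (all names here are hypothetical):

# Toy in-memory graph: remove_nodes() cascades and deletes incident edges,
# mimicking how typical graph backends behave. Names are hypothetical.
class ToyGraph:
    def __init__(self):
        self.nodes = {"A", "B"}
        self.edges = {("A", "B")}

    def remove_nodes(self, nodes):
        self.nodes -= set(nodes)
        # Cascade: edges touching a removed node disappear implicitly.
        self.edges = {(s, t) for s, t in self.edges
                      if s in self.nodes and t in self.nodes}

    def remove_edges(self, edges):
        self.edges -= set(edges)

g = ToyGraph()
# Wrong order: deleting the entity first silently drops the A-B edge, so a
# later relationship-deletion pass finds nothing to clean up and any
# per-edge bookkeeping (e.g. vector-DB rows keyed by edge) can leak.
g.remove_nodes(["A"])
assert ("A", "B") not in g.edges  # edge vanished as a side effect

g2 = ToyGraph()
# Order the commit switches to: edges first, then nodes.
g2.remove_edges([("A", "B")])
g2.remove_nodes(["A"])
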
@@ -2138,7 +2138,11 @@ async def _merge_edges_then_upsert(

     # 11. Update both graph and vector db
     for need_insert_id in [src_id, tgt_id]:
-        if not (await knowledge_graph_inst.has_node(need_insert_id)):
+        # Optimization: Use get_node instead of has_node + get_node
+        existing_node = await knowledge_graph_inst.get_node(need_insert_id)
+
+        if existing_node is None:
+            # Node doesn't exist - create new node
             node_created_at = int(time.time())
             node_data = {
                 "entity_id": need_insert_id,
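Note: the hunk above collapses a has_node() check plus a later get_node() fetch into a single get_node() call, saving one round-trip to the graph store per node and letting None double as the existence check. A minimal sketch of the pattern with a stand-in async interface (hypothetical names, not LightRAG's storage API):

import asyncio

# Stand-in for the graph storage; only the two calls of interest are sketched.
class FakeGraph:
    def __init__(self, nodes):
        self._nodes = nodes

    async def has_node(self, node_id):
        return node_id in self._nodes      # round-trip 1 in the old pattern

    async def get_node(self, node_id):
        return self._nodes.get(node_id)    # returns None when absent

async def main():
    graph = FakeGraph({"Alice": {"entity_id": "Alice"}})

    # Old pattern: two awaits when the node exists.
    if await graph.has_node("Alice"):
        node = await graph.get_node("Alice")

    # New pattern: one await; None doubles as the existence check.
    existing = await graph.get_node("Alice")
    if existing is None:
        ...  # create the node
    else:
        ...  # update it in place

asyncio.run(main())
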
@@ -2195,6 +2199,109 @@ async def _merge_edges_then_upsert(
                 "created_at": node_created_at,
             }
             added_entities.append(entity_data)
+        else:
+            # Node exists - update its source_ids by merging with new source_ids
+            updated = False  # Track if any update occurred
+
+            # 1. Get existing full source_ids from entity_chunks_storage
+            existing_full_source_ids = []
+            if entity_chunks_storage is not None:
+                stored_chunks = await entity_chunks_storage.get_by_id(need_insert_id)
+                if stored_chunks and isinstance(stored_chunks, dict):
+                    existing_full_source_ids = [
+                        chunk_id
+                        for chunk_id in stored_chunks.get("chunk_ids", [])
+                        if chunk_id
+                    ]
+
+            # If not in entity_chunks_storage, get from graph database
+            if not existing_full_source_ids:
+                if existing_node.get("source_id"):
+                    existing_full_source_ids = existing_node["source_id"].split(
+                        GRAPH_FIELD_SEP
+                    )
+
+            # 2. Merge with new source_ids from this relationship
+            new_source_ids_from_relation = [
+                chunk_id for chunk_id in source_ids if chunk_id
+            ]
+            merged_full_source_ids = merge_source_ids(
+                existing_full_source_ids, new_source_ids_from_relation
+            )
+
+            # 3. Save merged full list to entity_chunks_storage (conditional)
+            if (
+                entity_chunks_storage is not None
+                and merged_full_source_ids != existing_full_source_ids
+            ):
+                updated = True
+                await entity_chunks_storage.upsert(
+                    {
+                        need_insert_id: {
+                            "chunk_ids": merged_full_source_ids,
+                            "count": len(merged_full_source_ids),
+                        }
+                    }
+                )
+
+            # 4. Apply source_ids limit for graph and vector db
+            limit_method = global_config.get(
+                "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+            )
+            max_source_limit = global_config.get("max_source_ids_per_entity")
+            limited_source_ids = apply_source_ids_limit(
+                merged_full_source_ids,
+                max_source_limit,
+                limit_method,
+                identifier=f"`{need_insert_id}`",
+            )
+
+            # 5. Update graph database and vector database with limited source_ids (conditional)
+            limited_source_id_str = GRAPH_FIELD_SEP.join(limited_source_ids)

+            if limited_source_id_str != existing_node.get("source_id", ""):
+                updated = True
+                updated_node_data = {
+                    **existing_node,
+                    "source_id": limited_source_id_str,
+                }
+                await knowledge_graph_inst.upsert_node(
+                    need_insert_id, node_data=updated_node_data
+                )
+
+                # Update vector database
+                if entity_vdb is not None:
+                    entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
+                    entity_content = (
+                        f"{need_insert_id}\n{existing_node.get('description', '')}"
+                    )
+                    vdb_data = {
+                        entity_vdb_id: {
+                            "content": entity_content,
+                            "entity_name": need_insert_id,
+                            "source_id": limited_source_id_str,
+                            "entity_type": existing_node.get("entity_type", "UNKNOWN"),
+                            "file_path": existing_node.get(
+                                "file_path", "unknown_source"
+                            ),
+                        }
+                    }
+                    await safe_vdb_operation_with_exception(
+                        operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
+                        operation_name="existing_entity_update",
+                        entity_name=need_insert_id,
+                        max_retries=3,
+                        retry_delay=0.1,
+                    )
+
+            # 6. Log once at the end if any update occurred
+            if updated:
+                status_message = f"Chunks appended from relation: `{need_insert_id}`"
+                logger.info(status_message)
+                if pipeline_status is not None and pipeline_status_lock is not None:
+                    async with pipeline_status_lock:
+                        pipeline_status["latest_message"] = status_message
+                        pipeline_status["history_messages"].append(status_message)

         edge_created_at = int(time.time())
         await knowledge_graph_inst.upsert_edge(
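Note: the hunk above leans on two helpers the diff only calls, merge_source_ids and apply_source_ids_limit; their implementations are not shown in this commit. A hedged sketch of the semantics the calling code appears to assume (an order-preserving de-duplicated union, then a cap on how many chunk IDs reach the graph and vector DBs, with the full list kept in entity_chunks storage) might look like this:

# Sketch only: the real helpers live elsewhere in the codebase and may
# differ. SOURCE_IDS_LIMIT_METHOD_KEEP is assumed, mirroring the diff.
import logging

logger = logging.getLogger(__name__)

SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"  # assumed default value


def merge_source_ids(existing: list[str], incoming: list[str]) -> list[str]:
    """Union of chunk ids, preserving first-seen order and dropping dups."""
    seen: set[str] = set()
    merged: list[str] = []
    for chunk_id in [*existing, *incoming]:
        if chunk_id and chunk_id not in seen:
            seen.add(chunk_id)
            merged.append(chunk_id)
    return merged


def apply_source_ids_limit(
    source_ids: list[str],
    max_count: int | None,
    method: str = SOURCE_IDS_LIMIT_METHOD_KEEP,
    identifier: str = "",
) -> list[str]:
    """Cap the ids written to graph/vector storage; the full merged list
    stays in entity_chunks storage, which the diff persists separately."""
    if max_count is None or len(source_ids) <= max_count:
        return source_ids
    logger.info("Truncating source_ids for %s: %d -> %d",
                identifier, len(source_ids), max_count)
    # "Keep" method: retain the first max_count ids; other methods are
    # not sketched here.
    return source_ids[:max_count]


# Example: the merged list is persisted in full, but only the capped
# prefix lands in the node's "source_id" field.
merged = merge_source_ids(["c1", "c2"], ["c2", "c3", "c4"])
assert merged == ["c1", "c2", "c3", "c4"]
assert apply_source_ids_limit(merged, 3, identifier="`Alice`") == ["c1", "c2", "c3"]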