Merge pull request #2279 from danielaskdd/fix-edge-merge-stage
Fix Entity Source IDs Tracking Problem During Relationship Processing
commit ec797276b2
7 changed files with 189 additions and 53 deletions
@@ -1 +1 @@
-__api_version__ = "0246"
+__api_version__ = "0247"
@@ -3147,6 +3147,9 @@ class LightRAG:
                 ]

                 if not existing_sources:
+                    # No chunk references means this entity should be deleted
                     entities_to_delete.add(node_label)
+                    entity_chunk_updates[node_label] = []
                     continue
+
                 remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
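For context: subtract_source_ids is not shown in this diff. It presumably performs an order-preserving set difference over chunk ids; a minimal sketch under that assumption:

    def subtract_source_ids(existing_sources, chunk_ids):
        # Assumed semantics: drop the chunk ids being deleted while
        # preserving the order of the remaining source ids.
        removal = set(chunk_ids)
        return [sid for sid in existing_sources if sid not in removal]

An entity whose source list empties out here has no supporting chunks left, which is why it is queued in entities_to_delete instead of updated.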
@@ -3168,6 +3171,7 @@ class LightRAG:

             # Process relationships
             for edge_data in affected_edges:
+                # source/target are not stored in normalized order in graph db properties
                 src = edge_data.get("source")
                 tgt = edge_data.get("target")

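Because stored edges may arrive in either endpoint order, any bookkeeping keyed on (source, target) has to canonicalize the pair first. A sketch of the assumed normalization (normalize_edge is a hypothetical helper, not part of this diff):

    def normalize_edge(src, tgt):
        # Order endpoints lexicographically so ("B", "A") and ("A", "B")
        # resolve to the same edge_tuple key.
        return (src, tgt) if src <= tgt else (tgt, src)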
@@ -3204,6 +3208,9 @@ class LightRAG:
                 ]

                 if not existing_sources:
+                    # No chunk references means this relationship should be deleted
                     relationships_to_delete.add(edge_tuple)
+                    relation_chunk_updates[edge_tuple] = []
                     continue
+
                 remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@@ -3287,36 +3294,7 @@ class LightRAG:
                 logger.error(f"Failed to delete chunks: {e}")
                 raise Exception(f"Failed to delete document chunks: {e}") from e

-            # 6. Delete entities that have no remaining sources
-            if entities_to_delete:
-                try:
-                    # Delete from vector database
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
-
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
-
-                    # Delete from entity_chunks storage
-                    if self.entity_chunks:
-                        await self.entity_chunks.delete(list(entities_to_delete))
-
-                    async with pipeline_status_lock:
-                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                except Exception as e:
-                    logger.error(f"Failed to delete entities: {e}")
-                    raise Exception(f"Failed to delete entities: {e}") from e
-
-            # 7. Delete relationships that have no remaining sources
+            # 6. Delete relationships that have no remaining sources
             if relationships_to_delete:
                 try:
                     # Delete from vector database
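The hunk above removes entity deletion from its old position as step 6 and renumbers relationship deletion; entity deletion reappears after it in the next hunk. The point of the move is ordering: relationship cleanup should run while both endpoint nodes still exist, because removing nodes first can cascade-delete their incident edges in the graph backend and leave the relationship pass cleaning up records for edges it can no longer see. A toy illustration of the invariant:

    # Toy in-memory illustration; the real stores are the graph and vector dbs.
    nodes = {"A", "B", "C"}
    edges = {("A", "B"), ("B", "C")}

    relationships_to_delete = {("A", "B")}
    entities_to_delete = {"A"}

    edges -= relationships_to_delete  # edges first, while endpoints exist
    nodes -= entities_to_delete       # then nodes, so nothing dangles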
@@ -3353,6 +3331,66 @@ class LightRAG:
                     logger.error(f"Failed to delete relationships: {e}")
                     raise Exception(f"Failed to delete relationships: {e}") from e

+            # 7. Delete entities that have no remaining sources
+            if entities_to_delete:
+                try:
+                    # Debug: Check and log all edges before deleting nodes
+                    edges_still_exist = 0
+                    for entity in entities_to_delete:
+                        edges = (
+                            await self.chunk_entity_relation_graph.get_node_edges(
+                                entity
+                            )
+                        )
+                        if edges:
+                            for src, tgt in edges:
+                                if (
+                                    src in entities_to_delete
+                                    and tgt in entities_to_delete
+                                ):
+                                    logger.warning(
+                                        f"Edge still exists: {src} <-> {tgt}"
+                                    )
+                                elif src in entities_to_delete:
+                                    logger.warning(
+                                        f"Edge still exists: {src} --> {tgt}"
+                                    )
+                                else:
+                                    logger.warning(
+                                        f"Edge still exists: {tgt} --> {src}"
+                                    )
+                            edges_still_exist += 1
+                    if edges_still_exist:
+                        logger.warning(
+                            f"⚠️ {edges_still_exist} entities still has edges before deletion"
+                        )
+
+                    # Delete from graph
+                    await self.chunk_entity_relation_graph.remove_nodes(
+                        list(entities_to_delete)
+                    )
+
+                    # Delete from vector database
+                    entity_vdb_ids = [
+                        compute_mdhash_id(entity, prefix="ent-")
+                        for entity in entities_to_delete
+                    ]
+                    await self.entities_vdb.delete(entity_vdb_ids)
+
+                    # Delete from entity_chunks storage
+                    if self.entity_chunks:
+                        await self.entity_chunks.delete(list(entities_to_delete))
+
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
+                except Exception as e:
+                    logger.error(f"Failed to delete entities: {e}")
+                    raise Exception(f"Failed to delete entities: {e}") from e
+
             # Persist changes to graph database before releasing graph database lock
             await self._insert_done()
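The ent- ids sent to the vector database are content hashes of the entity name. compute_mdhash_id is defined elsewhere in LightRAG; to the best of my reading it is a prefixed MD5 digest, roughly:

    from hashlib import md5

    def compute_mdhash_id(content: str, prefix: str = "") -> str:
        # Deterministic, so the same entity name always maps to the
        # same "ent-..." vector-db id.
        return prefix + md5(content.encode()).hexdigest()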
@@ -677,14 +677,6 @@ async def rebuild_knowledge_from_chunks(
                     entity_chunks_storage=entity_chunks_storage,
                 )
                 rebuilt_entities_count += 1
-                status_message = (
-                    f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks"
-                )
-                logger.info(status_message)
-                if pipeline_status is not None and pipeline_status_lock is not None:
-                    async with pipeline_status_lock:
-                        pipeline_status["latest_message"] = status_message
-                        pipeline_status["history_messages"].append(status_message)
             except Exception as e:
                 failed_entities_count += 1
                 status_message = f"Failed to rebuild `{entity_name}`: {e}"
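The deleted lines drop the per-entity success message, leaving only failure reporting inside the loop. Each per-item message takes pipeline_status_lock, so chatty logging serializes concurrent workers; the aggregate-only shape looks roughly like this (a sketch, with the iterable's name assumed):

    rebuilt_entities_count = 0
    for entity_name in entities_to_rebuild:  # assumed iterable
        ...                                  # rebuild work; failures still logged
        rebuilt_entities_count += 1          # count locally, no lock taken
    async with pipeline_status_lock:         # lock once for the summary
        pipeline_status["latest_message"] = f"Rebuilt {rebuilt_entities_count} entities"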
@@ -1432,10 +1424,6 @@ async def _rebuild_single_relationship(
     else:
         truncation_info = ""

-    # Sort src and tgt to ensure consistent ordering (smaller string first)
-    if src > tgt:
-        src, tgt = tgt, src
-
     # Update relationship in graph storage
     updated_relationship_data = {
         **current_relationship,
@@ -1510,6 +1498,9 @@ async def _rebuild_single_relationship(
     await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)

     # Update relationship in vector database
+    # Sort src and tgt to ensure consistent ordering (smaller string first)
+    if src > tgt:
+        src, tgt = tgt, src
     try:
         rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
         rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
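The swap now happens only before the vector-db write, so the graph keeps the edge's original direction while the vector-db key stays canonical. The key is order-sensitive: under the MD5 sketch above, compute_mdhash_id(src + tgt) and compute_mdhash_id(tgt + src) hash different strings, so one logical edge could otherwise be stored under two different rel- ids.

    a = compute_mdhash_id("Alpha" + "Beta", prefix="rel-")   # made-up names
    b = compute_mdhash_id("Beta" + "Alpha", prefix="rel-")
    assert a != b  # same edge, two ids; hence the canonical sort first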
@@ -2145,13 +2136,13 @@ async def _merge_edges_then_upsert(
     else:
         logger.debug(status_message)

-    # Sort src_id and tgt_id to ensure consistent ordering (smaller string first)
-    if src_id > tgt_id:
-        src_id, tgt_id = tgt_id, src_id
-
     # 11. Update both graph and vector db
     for need_insert_id in [src_id, tgt_id]:
-        if not (await knowledge_graph_inst.has_node(need_insert_id)):
+        # Optimization: Use get_node instead of has_node + get_node
+        existing_node = await knowledge_graph_inst.get_node(need_insert_id)
+
+        if existing_node is None:
+            # Node doesn't exist - create new node
             node_created_at = int(time.time())
             node_data = {
                 "entity_id": need_insert_id,
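The optimization halves the graph-store round-trips per endpoint: has_node followed by get_node asks the backend twice, while a single get_node whose None result doubles as the existence check asks once.

    # Before: existence check, then (elsewhere) a fetch
    if not (await knowledge_graph_inst.has_node(need_insert_id)):
        ...  # create new node

    # After: one query answers both questions
    existing_node = await knowledge_graph_inst.get_node(need_insert_id)
    if existing_node is None:
        ...  # create new node
    else:
        ...  # merge source_ids into the existing node (next hunk)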
@@ -2208,6 +2199,109 @@ async def _merge_edges_then_upsert(
                 "created_at": node_created_at,
             }
             added_entities.append(entity_data)
+        else:
+            # Node exists - update its source_ids by merging with new source_ids
+            updated = False  # Track if any update occurred
+
+            # 1. Get existing full source_ids from entity_chunks_storage
+            existing_full_source_ids = []
+            if entity_chunks_storage is not None:
+                stored_chunks = await entity_chunks_storage.get_by_id(need_insert_id)
+                if stored_chunks and isinstance(stored_chunks, dict):
+                    existing_full_source_ids = [
+                        chunk_id
+                        for chunk_id in stored_chunks.get("chunk_ids", [])
+                        if chunk_id
+                    ]
+
+            # If not in entity_chunks_storage, get from graph database
+            if not existing_full_source_ids:
+                if existing_node.get("source_id"):
+                    existing_full_source_ids = existing_node["source_id"].split(
+                        GRAPH_FIELD_SEP
+                    )
+
+            # 2. Merge with new source_ids from this relationship
+            new_source_ids_from_relation = [
+                chunk_id for chunk_id in source_ids if chunk_id
+            ]
+            merged_full_source_ids = merge_source_ids(
+                existing_full_source_ids, new_source_ids_from_relation
+            )
+
+            # 3. Save merged full list to entity_chunks_storage (conditional)
+            if (
+                entity_chunks_storage is not None
+                and merged_full_source_ids != existing_full_source_ids
+            ):
+                updated = True
+                await entity_chunks_storage.upsert(
+                    {
+                        need_insert_id: {
+                            "chunk_ids": merged_full_source_ids,
+                            "count": len(merged_full_source_ids),
+                        }
+                    }
+                )
+
+            # 4. Apply source_ids limit for graph and vector db
+            limit_method = global_config.get(
+                "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+            )
+            max_source_limit = global_config.get("max_source_ids_per_entity")
+            limited_source_ids = apply_source_ids_limit(
+                merged_full_source_ids,
+                max_source_limit,
+                limit_method,
+                identifier=f"`{need_insert_id}`",
+            )
+
+            # 5. Update graph database and vector database with limited source_ids (conditional)
+            limited_source_id_str = GRAPH_FIELD_SEP.join(limited_source_ids)
+
+            if limited_source_id_str != existing_node.get("source_id", ""):
+                updated = True
+                updated_node_data = {
+                    **existing_node,
+                    "source_id": limited_source_id_str,
+                }
+                await knowledge_graph_inst.upsert_node(
+                    need_insert_id, node_data=updated_node_data
+                )
+
+                # Update vector database
+                if entity_vdb is not None:
+                    entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
+                    entity_content = (
+                        f"{need_insert_id}\n{existing_node.get('description', '')}"
+                    )
+                    vdb_data = {
+                        entity_vdb_id: {
+                            "content": entity_content,
+                            "entity_name": need_insert_id,
+                            "source_id": limited_source_id_str,
+                            "entity_type": existing_node.get("entity_type", "UNKNOWN"),
+                            "file_path": existing_node.get(
+                                "file_path", "unknown_source"
+                            ),
+                        }
+                    }
+                    await safe_vdb_operation_with_exception(
+                        operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
+                        operation_name="existing_entity_update",
+                        entity_name=need_insert_id,
+                        max_retries=3,
+                        retry_delay=0.1,
+                    )
+
+            # 6. Log once at the end if any update occurred
+            if updated:
+                status_message = f"Chunks appended from relation: `{need_insert_id}`"
+                logger.info(status_message)
+                if pipeline_status is not None and pipeline_status_lock is not None:
+                    async with pipeline_status_lock:
+                        pipeline_status["latest_message"] = status_message
+                        pipeline_status["history_messages"].append(status_message)
+
     edge_created_at = int(time.time())
     await knowledge_graph_inst.upsert_edge(
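This else branch is the substance of the fix: previously, an entity that already existed when a relationship referenced it never had that relationship's chunk ids merged into its tracked sources. merge_source_ids and apply_source_ids_limit are defined elsewhere; sketches of their assumed semantics:

    def merge_source_ids(existing, new):
        # Assumed: order-preserving union; existing ids keep their
        # positions, unseen new ids are appended.
        merged, seen = list(existing), set(existing)
        for sid in new:
            if sid and sid not in seen:
                merged.append(sid)
                seen.add(sid)
        return merged

    def apply_source_ids_limit(source_ids, limit, method, identifier=""):
        # Assumed: cap what is written to the graph and vector dbs;
        # "method" picks which entries survive (e.g. keep oldest).
        if not limit or len(source_ids) <= limit:
            return list(source_ids)
        return list(source_ids)[:limit]

Note the two-tier layout in steps 3 and 5: entity_chunks_storage keeps the complete merged list, while the graph and vector databases only ever see the limited one.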
@@ -2236,6 +2330,10 @@ async def _merge_edges_then_upsert(
         weight=weight,
     )

+    # Sort src_id and tgt_id to ensure consistent ordering (smaller string first)
+    if src_id > tgt_id:
+        src_id, tgt_id = tgt_id, src_id
+
     if relationships_vdb is not None:
         rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-")
         rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-")
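Both orderings of the relationship id are computed, presumably so records written before endpoint sorting was enforced can still be located. A sketch of the assumed cleanup pattern (delete mirrors the entities_vdb.delete call seen earlier):

    rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-")
    rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-")
    # Drop a stale reverse-ordered record if one exists, then upsert
    # under the canonical forward id.
    await relationships_vdb.delete([rel_vdb_id_reverse])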
@@ -1,4 +1,4 @@
 # Development environment configuration
 VITE_BACKEND_URL=http://localhost:9621
 VITE_API_PROXY=true
-VITE_API_ENDPOINTS=/api,/documents,/graphs,/graph,/health,/query,/docs,/redoc,/openapi.json,/login,/auth-status
+VITE_API_ENDPOINTS=/api,/documents,/graphs,/graph,/health,/query,/docs,/redoc,/openapi.json,/login,/auth-status,/static
@@ -1,4 +1,4 @@
 # Development environment configuration
 VITE_BACKEND_URL=http://localhost:9621
 VITE_API_PROXY=true
-VITE_API_ENDPOINTS=/api,/documents,/graphs,/graph,/health,/query,/docs,/redoc,/openapi.json,/login,/auth-status
+VITE_API_ENDPOINTS=/api,/documents,/graphs,/graph,/health,/query,/docs,/redoc,/openapi.json,/login,/auth-status,/static
@@ -226,13 +226,13 @@ const GraphViewer = () => {
       </div>

       {showPropertyPanel && (
-        <div className="absolute top-2 right-2">
+        <div className="absolute top-2 right-2 z-10">
           <PropertiesView />
         </div>
       )}

       {showLegend && (
-        <div className="absolute bottom-10 right-2">
+        <div className="absolute bottom-10 right-2 z-0">
           <Legend className="bg-background/60 backdrop-blur-lg" />
         </div>
       )}
@@ -40,7 +40,7 @@ export default defineConfig({
       changeOrigin: true,
       rewrite: endpoint === '/api' ?
         (path) => path.replace(/^\/api/, '') :
-        endpoint === '/docs' || endpoint === '/redoc' || endpoint === '/openapi.json' ?
+        endpoint === '/docs' || endpoint === '/redoc' || endpoint === '/openapi.json' || endpoint === '/static' ?
           (path) => path : undefined
     }
   ])