Fix entity consistency in knowledge graph rebuilding and merging
• Sort src/tgt for consistent ordering • Create missing nodes before edges • Update entity chunks storage • Pass entity_vdb to rebuild function • Ensure entities exist in all storages
This commit is contained in:
parent
a97e5dad4c
commit
5ee9a2f8c6
1 changed files with 84 additions and 0 deletions
|
|
@ -710,6 +710,7 @@ async def rebuild_knowledge_from_chunks(
|
|||
await _rebuild_single_relationship(
|
||||
knowledge_graph_inst=knowledge_graph_inst,
|
||||
relationships_vdb=relationships_vdb,
|
||||
entities_vdb=entities_vdb,
|
||||
src=src,
|
||||
tgt=tgt,
|
||||
chunk_ids=chunk_ids,
|
||||
|
|
@ -717,6 +718,7 @@ async def rebuild_knowledge_from_chunks(
|
|||
llm_response_cache=llm_response_cache,
|
||||
global_config=global_config,
|
||||
relation_chunks_storage=relation_chunks_storage,
|
||||
entity_chunks_storage=entity_chunks_storage,
|
||||
pipeline_status=pipeline_status,
|
||||
pipeline_status_lock=pipeline_status_lock,
|
||||
)
|
||||
|
|
@ -1292,6 +1294,7 @@ async def _rebuild_single_entity(
|
|||
async def _rebuild_single_relationship(
|
||||
knowledge_graph_inst: BaseGraphStorage,
|
||||
relationships_vdb: BaseVectorStorage,
|
||||
entities_vdb: BaseVectorStorage,
|
||||
src: str,
|
||||
tgt: str,
|
||||
chunk_ids: list[str],
|
||||
|
|
@ -1299,6 +1302,7 @@ async def _rebuild_single_relationship(
|
|||
llm_response_cache: BaseKVStorage,
|
||||
global_config: dict[str, str],
|
||||
relation_chunks_storage: BaseKVStorage | None = None,
|
||||
entity_chunks_storage: BaseKVStorage | None = None,
|
||||
pipeline_status: dict | None = None,
|
||||
pipeline_status_lock=None,
|
||||
) -> None:
|
||||
|
|
@ -1428,6 +1432,10 @@ async def _rebuild_single_relationship(
|
|||
else:
|
||||
truncation_info = ""
|
||||
|
||||
# Sort src and tgt to ensure consistent ordering (smaller string first)
|
||||
if src > tgt:
|
||||
src, tgt = tgt, src
|
||||
|
||||
# Update relationship in graph storage
|
||||
updated_relationship_data = {
|
||||
**current_relationship,
|
||||
|
|
@ -1442,6 +1450,63 @@ async def _rebuild_single_relationship(
|
|||
else current_relationship.get("file_path", "unknown_source"),
|
||||
"truncate": truncation_info,
|
||||
}
|
||||
|
||||
# Ensure both endpoint nodes exist before writing the edge back
|
||||
# (certain storage backends require pre-existing nodes).
|
||||
node_description = (
|
||||
updated_relationship_data["description"]
|
||||
if updated_relationship_data.get("description")
|
||||
else current_relationship.get("description", "")
|
||||
)
|
||||
node_source_id = updated_relationship_data.get("source_id", "")
|
||||
node_file_path = updated_relationship_data.get("file_path", "unknown_source")
|
||||
|
||||
for node_id in {src, tgt}:
|
||||
if not (await knowledge_graph_inst.has_node(node_id)):
|
||||
node_created_at = int(time.time())
|
||||
node_data = {
|
||||
"entity_id": node_id,
|
||||
"source_id": node_source_id,
|
||||
"description": node_description,
|
||||
"entity_type": "UNKNOWN",
|
||||
"file_path": node_file_path,
|
||||
"created_at": node_created_at,
|
||||
"truncate": "",
|
||||
}
|
||||
await knowledge_graph_inst.upsert_node(node_id, node_data=node_data)
|
||||
|
||||
# Update entity_chunks_storage for the newly created entity
|
||||
if entity_chunks_storage is not None and limited_chunk_ids:
|
||||
await entity_chunks_storage.upsert(
|
||||
{
|
||||
node_id: {
|
||||
"chunk_ids": limited_chunk_ids,
|
||||
"count": len(limited_chunk_ids),
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
# Update entity_vdb for the newly created entity
|
||||
if entities_vdb is not None:
|
||||
entity_vdb_id = compute_mdhash_id(node_id, prefix="ent-")
|
||||
entity_content = f"{node_id}\n{node_description}"
|
||||
vdb_data = {
|
||||
entity_vdb_id: {
|
||||
"content": entity_content,
|
||||
"entity_name": node_id,
|
||||
"source_id": node_source_id,
|
||||
"entity_type": "UNKNOWN",
|
||||
"file_path": node_file_path,
|
||||
}
|
||||
}
|
||||
await safe_vdb_operation_with_exception(
|
||||
operation=lambda payload=vdb_data: entities_vdb.upsert(payload),
|
||||
operation_name="rebuild_added_entity_upsert",
|
||||
entity_name=node_id,
|
||||
max_retries=3,
|
||||
retry_delay=0.1,
|
||||
)
|
||||
|
||||
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
|
||||
|
||||
# Update relationship in vector database
|
||||
|
|
@ -1797,6 +1862,7 @@ async def _merge_edges_then_upsert(
|
|||
llm_response_cache: BaseKVStorage | None = None,
|
||||
added_entities: list = None, # New parameter to track entities added during edge processing
|
||||
relation_chunks_storage: BaseKVStorage | None = None,
|
||||
entity_chunks_storage: BaseKVStorage | None = None,
|
||||
):
|
||||
if src_id == tgt_id:
|
||||
return None
|
||||
|
|
@ -2079,6 +2145,10 @@ async def _merge_edges_then_upsert(
|
|||
else:
|
||||
logger.debug(status_message)
|
||||
|
||||
# Sort src_id and tgt_id to ensure consistent ordering (smaller string first)
|
||||
if src_id > tgt_id:
|
||||
src_id, tgt_id = tgt_id, src_id
|
||||
|
||||
# 11. Update both graph and vector db
|
||||
for need_insert_id in [src_id, tgt_id]:
|
||||
if not (await knowledge_graph_inst.has_node(need_insert_id)):
|
||||
|
|
@ -2094,6 +2164,19 @@ async def _merge_edges_then_upsert(
|
|||
}
|
||||
await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
|
||||
|
||||
# Update entity_chunks_storage for the newly created entity
|
||||
if entity_chunks_storage is not None:
|
||||
chunk_ids = [chunk_id for chunk_id in full_source_ids if chunk_id]
|
||||
if chunk_ids:
|
||||
await entity_chunks_storage.upsert(
|
||||
{
|
||||
need_insert_id: {
|
||||
"chunk_ids": chunk_ids,
|
||||
"count": len(chunk_ids),
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
if entity_vdb is not None:
|
||||
entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
|
||||
entity_content = f"{need_insert_id}\n{description}"
|
||||
|
|
@ -2407,6 +2490,7 @@ async def merge_nodes_and_edges(
|
|||
llm_response_cache,
|
||||
added_entities, # Pass list to collect added entities
|
||||
relation_chunks_storage,
|
||||
entity_chunks_storage, # Add entity_chunks_storage parameter
|
||||
)
|
||||
|
||||
if edge_data is None:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue