Fix entity consistency in knowledge graph rebuilding and merging
• Sort src/tgt for consistent ordering
• Create missing nodes before edges
• Update entity chunks storage
• Pass entities_vdb to the rebuild function
• Ensure entities exist in all storages
This commit is contained in:
parent
a97e5dad4c
commit
5ee9a2f8c6
1 changed file with 84 additions and 0 deletions
|
|
@ -710,6 +710,7 @@ async def rebuild_knowledge_from_chunks(
|
||||||
await _rebuild_single_relationship(
|
await _rebuild_single_relationship(
|
||||||
knowledge_graph_inst=knowledge_graph_inst,
|
knowledge_graph_inst=knowledge_graph_inst,
|
||||||
relationships_vdb=relationships_vdb,
|
relationships_vdb=relationships_vdb,
|
||||||
|
entities_vdb=entities_vdb,
|
||||||
src=src,
|
src=src,
|
||||||
tgt=tgt,
|
tgt=tgt,
|
||||||
chunk_ids=chunk_ids,
|
chunk_ids=chunk_ids,
|
||||||
|
|
@ -717,6 +718,7 @@ async def rebuild_knowledge_from_chunks(
|
||||||
llm_response_cache=llm_response_cache,
|
llm_response_cache=llm_response_cache,
|
||||||
global_config=global_config,
|
global_config=global_config,
|
||||||
relation_chunks_storage=relation_chunks_storage,
|
relation_chunks_storage=relation_chunks_storage,
|
||||||
|
entity_chunks_storage=entity_chunks_storage,
|
||||||
pipeline_status=pipeline_status,
|
pipeline_status=pipeline_status,
|
||||||
pipeline_status_lock=pipeline_status_lock,
|
pipeline_status_lock=pipeline_status_lock,
|
||||||
)
|
)
|
||||||
|
|
@ -1292,6 +1294,7 @@ async def _rebuild_single_entity(
|
||||||
async def _rebuild_single_relationship(
|
async def _rebuild_single_relationship(
|
||||||
knowledge_graph_inst: BaseGraphStorage,
|
knowledge_graph_inst: BaseGraphStorage,
|
||||||
relationships_vdb: BaseVectorStorage,
|
relationships_vdb: BaseVectorStorage,
|
||||||
|
entities_vdb: BaseVectorStorage,
|
||||||
src: str,
|
src: str,
|
||||||
tgt: str,
|
tgt: str,
|
||||||
chunk_ids: list[str],
|
chunk_ids: list[str],
|
||||||
|
|
@ -1299,6 +1302,7 @@ async def _rebuild_single_relationship(
|
||||||
llm_response_cache: BaseKVStorage,
|
llm_response_cache: BaseKVStorage,
|
||||||
global_config: dict[str, str],
|
global_config: dict[str, str],
|
||||||
relation_chunks_storage: BaseKVStorage | None = None,
|
relation_chunks_storage: BaseKVStorage | None = None,
|
||||||
|
entity_chunks_storage: BaseKVStorage | None = None,
|
||||||
pipeline_status: dict | None = None,
|
pipeline_status: dict | None = None,
|
||||||
pipeline_status_lock=None,
|
pipeline_status_lock=None,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
@ -1428,6 +1432,10 @@ async def _rebuild_single_relationship(
|
||||||
else:
|
else:
|
||||||
truncation_info = ""
|
truncation_info = ""
|
||||||
|
|
||||||
|
# Sort src and tgt to ensure consistent ordering (smaller string first)
|
||||||
|
if src > tgt:
|
||||||
|
src, tgt = tgt, src
|
||||||
|
|
||||||
# Update relationship in graph storage
|
# Update relationship in graph storage
|
||||||
updated_relationship_data = {
|
updated_relationship_data = {
|
||||||
**current_relationship,
|
**current_relationship,
|
||||||
|
|
@ -1442,6 +1450,63 @@ async def _rebuild_single_relationship(
|
||||||
else current_relationship.get("file_path", "unknown_source"),
|
else current_relationship.get("file_path", "unknown_source"),
|
||||||
"truncate": truncation_info,
|
"truncate": truncation_info,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Ensure both endpoint nodes exist before writing the edge back
|
||||||
|
# (certain storage backends require pre-existing nodes).
|
||||||
|
node_description = (
|
||||||
|
updated_relationship_data["description"]
|
||||||
|
if updated_relationship_data.get("description")
|
||||||
|
else current_relationship.get("description", "")
|
||||||
|
)
|
||||||
|
node_source_id = updated_relationship_data.get("source_id", "")
|
||||||
|
node_file_path = updated_relationship_data.get("file_path", "unknown_source")
|
||||||
|
|
||||||
|
for node_id in {src, tgt}:
|
||||||
|
if not (await knowledge_graph_inst.has_node(node_id)):
|
||||||
|
node_created_at = int(time.time())
|
||||||
|
node_data = {
|
||||||
|
"entity_id": node_id,
|
||||||
|
"source_id": node_source_id,
|
||||||
|
"description": node_description,
|
||||||
|
"entity_type": "UNKNOWN",
|
||||||
|
"file_path": node_file_path,
|
||||||
|
"created_at": node_created_at,
|
||||||
|
"truncate": "",
|
||||||
|
}
|
||||||
|
await knowledge_graph_inst.upsert_node(node_id, node_data=node_data)
|
||||||
|
|
||||||
|
# Update entity_chunks_storage for the newly created entity
|
||||||
|
if entity_chunks_storage is not None and limited_chunk_ids:
|
||||||
|
await entity_chunks_storage.upsert(
|
||||||
|
{
|
||||||
|
node_id: {
|
||||||
|
"chunk_ids": limited_chunk_ids,
|
||||||
|
"count": len(limited_chunk_ids),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update entity_vdb for the newly created entity
|
||||||
|
if entities_vdb is not None:
|
||||||
|
entity_vdb_id = compute_mdhash_id(node_id, prefix="ent-")
|
||||||
|
entity_content = f"{node_id}\n{node_description}"
|
||||||
|
vdb_data = {
|
||||||
|
entity_vdb_id: {
|
||||||
|
"content": entity_content,
|
||||||
|
"entity_name": node_id,
|
||||||
|
"source_id": node_source_id,
|
||||||
|
"entity_type": "UNKNOWN",
|
||||||
|
"file_path": node_file_path,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await safe_vdb_operation_with_exception(
|
||||||
|
operation=lambda payload=vdb_data: entities_vdb.upsert(payload),
|
||||||
|
operation_name="rebuild_added_entity_upsert",
|
||||||
|
entity_name=node_id,
|
||||||
|
max_retries=3,
|
||||||
|
retry_delay=0.1,
|
||||||
|
)
|
||||||
|
|
||||||
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
|
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
|
||||||
|
|
||||||
# Update relationship in vector database
|
# Update relationship in vector database
|
||||||
|
|
@ -1797,6 +1862,7 @@ async def _merge_edges_then_upsert(
|
||||||
llm_response_cache: BaseKVStorage | None = None,
|
llm_response_cache: BaseKVStorage | None = None,
|
||||||
added_entities: list = None, # New parameter to track entities added during edge processing
|
added_entities: list = None, # New parameter to track entities added during edge processing
|
||||||
relation_chunks_storage: BaseKVStorage | None = None,
|
relation_chunks_storage: BaseKVStorage | None = None,
|
||||||
|
entity_chunks_storage: BaseKVStorage | None = None,
|
||||||
):
|
):
|
||||||
if src_id == tgt_id:
|
if src_id == tgt_id:
|
||||||
return None
|
return None
|
||||||
|
|
@ -2079,6 +2145,10 @@ async def _merge_edges_then_upsert(
|
||||||
else:
|
else:
|
||||||
logger.debug(status_message)
|
logger.debug(status_message)
|
||||||
|
|
||||||
|
# Sort src_id and tgt_id to ensure consistent ordering (smaller string first)
|
||||||
|
if src_id > tgt_id:
|
||||||
|
src_id, tgt_id = tgt_id, src_id
|
||||||
|
|
||||||
# 11. Update both graph and vector db
|
# 11. Update both graph and vector db
|
||||||
for need_insert_id in [src_id, tgt_id]:
|
for need_insert_id in [src_id, tgt_id]:
|
||||||
if not (await knowledge_graph_inst.has_node(need_insert_id)):
|
if not (await knowledge_graph_inst.has_node(need_insert_id)):
|
||||||
|
|
@ -2094,6 +2164,19 @@ async def _merge_edges_then_upsert(
|
||||||
}
|
}
|
||||||
await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
|
await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
|
||||||
|
|
||||||
|
# Update entity_chunks_storage for the newly created entity
|
||||||
|
if entity_chunks_storage is not None:
|
||||||
|
chunk_ids = [chunk_id for chunk_id in full_source_ids if chunk_id]
|
||||||
|
if chunk_ids:
|
||||||
|
await entity_chunks_storage.upsert(
|
||||||
|
{
|
||||||
|
need_insert_id: {
|
||||||
|
"chunk_ids": chunk_ids,
|
||||||
|
"count": len(chunk_ids),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
if entity_vdb is not None:
|
if entity_vdb is not None:
|
||||||
entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
|
entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
|
||||||
entity_content = f"{need_insert_id}\n{description}"
|
entity_content = f"{need_insert_id}\n{description}"
|
||||||
|
|
@ -2407,6 +2490,7 @@ async def merge_nodes_and_edges(
|
||||||
llm_response_cache,
|
llm_response_cache,
|
||||||
added_entities, # Pass list to collect added entities
|
added_entities, # Pass list to collect added entities
|
||||||
relation_chunks_storage,
|
relation_chunks_storage,
|
||||||
|
entity_chunks_storage, # Add entity_chunks_storage parameter
|
||||||
)
|
)
|
||||||
|
|
||||||
if edge_data is None:
|
if edge_data is None:
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue