From 5ee9a2f8c63c7b4a535ff79cbd6e251f6914c552 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 25 Oct 2025 21:37:03 +0800
Subject: [PATCH] Fix entity consistency in knowledge graph rebuilding and merging
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

• Sort src/tgt for consistent ordering
• Create missing nodes before edges
• Update entity chunks storage
• Pass entity_vdb to rebuild function
• Ensure entities exist in all storages
---
 lightrag/operate.py | 84 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 84 insertions(+)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index cca9db15..7363ab7a 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -710,6 +710,7 @@ async def rebuild_knowledge_from_chunks(
         await _rebuild_single_relationship(
             knowledge_graph_inst=knowledge_graph_inst,
             relationships_vdb=relationships_vdb,
+            entities_vdb=entities_vdb,
             src=src,
             tgt=tgt,
             chunk_ids=chunk_ids,
@@ -717,6 +718,7 @@
             llm_response_cache=llm_response_cache,
             global_config=global_config,
             relation_chunks_storage=relation_chunks_storage,
+            entity_chunks_storage=entity_chunks_storage,
             pipeline_status=pipeline_status,
             pipeline_status_lock=pipeline_status_lock,
         )
@@ -1292,6 +1294,7 @@ async def _rebuild_single_entity(
 async def _rebuild_single_relationship(
     knowledge_graph_inst: BaseGraphStorage,
     relationships_vdb: BaseVectorStorage,
+    entities_vdb: BaseVectorStorage,
     src: str,
     tgt: str,
     chunk_ids: list[str],
@@ -1299,6 +1302,7 @@ async def _rebuild_single_relationship(
     llm_response_cache: BaseKVStorage,
     global_config: dict[str, str],
     relation_chunks_storage: BaseKVStorage | None = None,
+    entity_chunks_storage: BaseKVStorage | None = None,
     pipeline_status: dict | None = None,
     pipeline_status_lock=None,
 ) -> None:
@@ -1428,6 +1432,10 @@ async def _rebuild_single_relationship(
     else:
         truncation_info = ""
 
+    # Sort src and tgt to ensure consistent ordering (smaller string first)
+    if src > tgt:
+        src, tgt = tgt, src
+
     # Update relationship in graph storage
     updated_relationship_data = {
         **current_relationship,
@@ -1442,6 +1450,63 @@ async def _rebuild_single_relationship(
         else current_relationship.get("file_path", "unknown_source"),
         "truncate": truncation_info,
     }
+
+    # Ensure both endpoint nodes exist before writing the edge back
+    # (certain storage backends require pre-existing nodes).
+    node_description = (
+        updated_relationship_data["description"]
+        if updated_relationship_data.get("description")
+        else current_relationship.get("description", "")
+    )
+    node_source_id = updated_relationship_data.get("source_id", "")
+    node_file_path = updated_relationship_data.get("file_path", "unknown_source")
+
+    for node_id in {src, tgt}:
+        if not (await knowledge_graph_inst.has_node(node_id)):
+            node_created_at = int(time.time())
+            node_data = {
+                "entity_id": node_id,
+                "source_id": node_source_id,
+                "description": node_description,
+                "entity_type": "UNKNOWN",
+                "file_path": node_file_path,
+                "created_at": node_created_at,
+                "truncate": "",
+            }
+            await knowledge_graph_inst.upsert_node(node_id, node_data=node_data)
+
+            # Update entity_chunks_storage for the newly created entity
+            if entity_chunks_storage is not None and limited_chunk_ids:
+                await entity_chunks_storage.upsert(
+                    {
+                        node_id: {
+                            "chunk_ids": limited_chunk_ids,
+                            "count": len(limited_chunk_ids),
+                        }
+                    }
+                )
+
+            # Update entity_vdb for the newly created entity
+            if entities_vdb is not None:
+                entity_vdb_id = compute_mdhash_id(node_id, prefix="ent-")
+                entity_content = f"{node_id}\n{node_description}"
+                vdb_data = {
+                    entity_vdb_id: {
+                        "content": entity_content,
+                        "entity_name": node_id,
+                        "source_id": node_source_id,
+                        "entity_type": "UNKNOWN",
+                        "file_path": node_file_path,
+                    }
+                }
+                await safe_vdb_operation_with_exception(
+                    operation=lambda payload=vdb_data: entities_vdb.upsert(payload),
+                    operation_name="rebuild_added_entity_upsert",
+                    entity_name=node_id,
+                    max_retries=3,
+                    retry_delay=0.1,
+                )
+
     await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
 
     # Update relationship in vector database
@@ -1797,6 +1862,7 @@ async def _merge_edges_then_upsert(
     llm_response_cache: BaseKVStorage | None = None,
     added_entities: list = None,  # New parameter to track entities added during edge processing
     relation_chunks_storage: BaseKVStorage | None = None,
+    entity_chunks_storage: BaseKVStorage | None = None,
 ):
     if src_id == tgt_id:
         return None
@@ -2079,6 +2145,10 @@ async def _merge_edges_then_upsert(
     else:
         logger.debug(status_message)
 
+    # Sort src_id and tgt_id to ensure consistent ordering (smaller string first)
+    if src_id > tgt_id:
+        src_id, tgt_id = tgt_id, src_id
+
     # 11. Update both graph and vector db
     for need_insert_id in [src_id, tgt_id]:
         if not (await knowledge_graph_inst.has_node(need_insert_id)):
@@ -2094,6 +2164,19 @@ async def _merge_edges_then_upsert(
             }
             await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
 
+            # Update entity_chunks_storage for the newly created entity
+            if entity_chunks_storage is not None:
+                chunk_ids = [chunk_id for chunk_id in full_source_ids if chunk_id]
+                if chunk_ids:
+                    await entity_chunks_storage.upsert(
+                        {
+                            need_insert_id: {
+                                "chunk_ids": chunk_ids,
+                                "count": len(chunk_ids),
+                            }
+                        }
+                    )
+
             if entity_vdb is not None:
                 entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
                 entity_content = f"{need_insert_id}\n{description}"
@@ -2407,6 +2490,7 @@ async def merge_nodes_and_edges(
                     llm_response_cache,
                     added_entities,  # Pass list to collect added entities
                     relation_chunks_storage,
+                    entity_chunks_storage,  # Add entity_chunks_storage parameter
                 )
 
                 if edge_data is None:
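
Illustrative sketch (not part of the patch): the reasoning behind the src/tgt sort and the node pre-creation is that an undirected relationship may arrive as either (A, B) or (B, A), so normalizing the pair to a canonical order keeps the graph edge, the vector-db record, and the chunk-tracking key pointing at one logical relationship, and creating missing endpoint nodes first satisfies backends that reject edges whose endpoints do not exist yet. The MiniGraph class and its method names below are hypothetical stand-ins for demonstration only, not LightRAG APIs:

# Hypothetical in-memory graph used only to demonstrate the ordering rule.
class MiniGraph:
    def __init__(self) -> None:
        self.nodes: dict[str, dict] = {}
        self.edges: dict[tuple[str, str], dict] = {}

    def upsert_node(self, node_id: str, data: dict) -> None:
        self.nodes[node_id] = data

    def upsert_edge(self, src: str, tgt: str, data: dict) -> None:
        # Sort endpoints so (A, B) and (B, A) share one canonical key,
        # mirroring the patch's `if src > tgt: src, tgt = tgt, src`.
        if src > tgt:
            src, tgt = tgt, src
        # Create missing endpoint nodes first; some backends reject edges
        # whose endpoints do not exist yet.
        for node_id in (src, tgt):
            self.nodes.setdefault(node_id, {"entity_id": node_id, "entity_type": "UNKNOWN"})
        self.edges[(src, tgt)] = data


g = MiniGraph()
g.upsert_edge("Beta", "Alpha", {"description": "first mention"})
g.upsert_edge("Alpha", "Beta", {"description": "same edge, reversed order"})
assert list(g.edges) == [("Alpha", "Beta")]  # one canonical edge, not two
assert set(g.nodes) == {"Alpha", "Beta"}     # endpoints were auto-created

Applying the same sort before every upsert, as the patch does in both _rebuild_single_relationship and _merge_edges_then_upsert, keeps the two code paths writing to the same edge key.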