From e3ea87da2452ab91ce80b1371320e60a2d0882b7 Mon Sep 17 00:00:00 2001 From: hzywhite <1569582518@qq.com> Date: Fri, 5 Sep 2025 15:01:50 +0800 Subject: [PATCH] merge --- lightrag/lightrag.py | 1 + lightrag/operate.py | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 57c2714e..447b0bcb 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -9,6 +9,7 @@ import warnings from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from functools import partial +from pathlib import Path from typing import ( Any, AsyncIterator, diff --git a/lightrag/operate.py b/lightrag/operate.py index 22ed9117..459a644e 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1760,7 +1760,29 @@ async def merge_nodes_and_edges( if full_entities_storage and full_relations_storage and doc_id: try: # Merge all entities: original entities + entities added during edge processing - final_entity_names = set() + existing_entites_data = None + existing_relations_data = None + + try: + existing_entites_data = await full_entities_storage.get_by_id(doc_id) + existing_relations_data = await full_relations_storage.get_by_id(doc_id) + except Exception as e: + logger.debug( + f"Could not retrieve existing entity/relation data for {doc_id}: {e}" + ) + + existing_entites_names = set() + if existing_entites_data and existing_entites_data.get("entity_names"): + existing_entites_names.update(existing_entites_data["entity_names"]) + + existing_relation_pairs = set() + if existing_relations_data and existing_relations_data.get( + "relation_pairs" + ): + for pair in existing_relations_data["relation_pairs"]: + existing_relation_pairs.add(tuple(sorted(pair))) + + final_entity_names = existing_entites_names.copy() # Add original processed entities for entity_data in processed_entities: @@ -1773,7 +1795,7 @@ async def merge_nodes_and_edges( 
final_entity_names.add(added_entity["entity_name"]) # Collect all relation pairs - final_relation_pairs = set() + final_relation_pairs = existing_relation_pairs.copy() for edge_data in processed_edges: if edge_data: src_id = edge_data.get("src_id") @@ -1783,6 +1805,12 @@ final_relation_pairs.add(relation_pair) log_message = f"Phase 3: Updating final {len(final_entity_names)}({len(processed_entities)}+{len(all_added_entities)}) entities and {len(final_relation_pairs)} relations from {doc_id}" + new_entities_count = len(final_entity_names) - len(existing_entites_names) + new_relation_count = len(final_relation_pairs) - len( + existing_relation_pairs + ) + + log_message = f"Phase 3: Merging storage - existing: {len(existing_entites_names)} entities, {len(existing_relation_pairs)} relations; new: {new_entities_count} entities, {new_relation_count} relations; total: {len(final_entity_names)} entities, {len(final_relation_pairs)} relations" logger.info(log_message) async with pipeline_status_lock: pipeline_status["latest_message"] = log_message