From 9eb2be79b8788ad9f3706e5c34bb8ebd9a3636cb Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 26 Aug 2025 03:56:18 +0800
Subject: [PATCH] feat: track actual LLM usage in entity/relation merging

- Modified _handle_entity_relation_summary to return tuple[str, bool]
- Updated merge functions to log "LLMmrg" vs "Merged" based on actual
  LLM usage
- Replaced hardcoded fragment count prediction with real-time LLM usage
  tracking
---
 lightrag/operate.py | 171 +++++++++++++++++++++++---------------------
 1 file changed, 91 insertions(+), 80 deletions(-)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index e59e944d..e84abd48 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -120,7 +120,7 @@ async def _handle_entity_relation_summary(
     seperator: str,
     global_config: dict,
     llm_response_cache: BaseKVStorage | None = None,
-) -> str:
+) -> tuple[str, bool]:
     """Handle entity relation description summary using map-reduce approach.
 
     This function summarizes a list of descriptions using a map-reduce strategy:
@@ -137,15 +137,15 @@ async def _handle_entity_relation_summary(
         llm_response_cache: Optional cache for LLM responses
 
     Returns:
-        Final summarized description string
+        Tuple of (final_summarized_description_string, llm_was_used_boolean)
     """
     # Handle empty input
     if not description_list:
-        return ""
+        return "", False
 
     # If only one description, return it directly (no need for LLM call)
     if len(description_list) == 1:
-        return description_list[0]
+        return description_list[0], False
 
     # Get configuration
     tokenizer: Tokenizer = global_config["tokenizer"]
@@ -153,6 +153,7 @@ async def _handle_entity_relation_summary(
     summary_max_tokens = global_config["summary_max_tokens"]
 
     current_list = description_list[:]  # Copy the list to avoid modifying original
+    llm_was_used = False  # Track whether LLM was used during the entire process
 
     # Iterative map-reduce process
     while True:
@@ -165,17 +166,18 @@ async def _handle_entity_relation_summary(
             len(current_list) < force_llm_summary_on_merge
             and total_tokens < summary_max_tokens
         ):
-            # Already the final result
+            # Already the final result - no LLM needed
             final_description = seperator.join(current_list)
-            return final_description if final_description else ""
+            return final_description if final_description else "", llm_was_used
         else:
-            # Final summarization of remaining descriptions
-            return await _summarize_descriptions(
+            # Final summarization of remaining descriptions - LLM will be used
+            final_summary = await _summarize_descriptions(
                 entity_or_relation_name,
                 current_list,
                 global_config,
                 llm_response_cache,
             )
+            return final_summary, True  # LLM was used for final summarization
 
     # Need to split into chunks - Map phase
     chunks = []
@@ -188,7 +190,7 @@
         # If adding current description would exceed limit, finalize current chunk
         if current_tokens + desc_tokens > summary_context_size and current_chunk:
             chunks.append(current_chunk)
-            current_chunk = [desc]  # Intial chunk for next group
+            current_chunk = [desc]  # Initial chunk for next group
             current_tokens = desc_tokens
         else:
             current_chunk.append(desc)
@@ -214,6 +216,7 @@
                 entity_or_relation_name, chunk, global_config, llm_response_cache
             )
             new_summaries.append(summary)
+            llm_was_used = True  # Mark that LLM was used in reduce phase
 
         # Update current list with new summaries for next iteration
         current_list = new_summaries
@@ -529,7 +532,7 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status["history_messages"].append(status_message) except Exception as e: failed_entities_count += 1 - status_message = f"Failed to rebuild entity {entity_name}: {e}" + status_message = f"Failed to rebuild entity `{entity_name}`: {e}" logger.info(status_message) # Per requirement, change to info if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -560,7 +563,7 @@ async def _rebuild_knowledge_from_chunks( global_config=global_config, ) rebuilt_relationships_count += 1 - status_message = f"Relationship `{src}->{tgt}` rebuilt from {len(chunk_ids)} chunks" + status_message = f"Relationship `{src} - {tgt}` rebuilt from {len(chunk_ids)} chunks" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -568,7 +571,9 @@ async def _rebuild_knowledge_from_chunks( pipeline_status["history_messages"].append(status_message) except Exception as e: failed_relationships_count += 1 - status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}" + status_message = ( + f"Failed to rebuild relationship `{src} - {tgt}`: {e}" + ) logger.info(status_message) # Per requirement, change to info if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -867,7 +872,7 @@ async def _rebuild_single_entity( # Generate description from relationships or fallback to current if description_list: force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - final_description = await _handle_entity_relation_summary( + final_description, _ = await _handle_entity_relation_summary( entity_name, description_list, force_llm_summary_on_merge, @@ -909,7 +914,7 @@ async def _rebuild_single_entity( # Generate final description and update storage force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] if description_list: - final_description = await _handle_entity_relation_summary( + final_description, _ = await _handle_entity_relation_summary( entity_name, description_list, force_llm_summary_on_merge, @@ -990,7 +995,7 @@ async def _rebuild_single_relationship( # Use summary if description has too many fragments force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] if description_list: - final_description = await _handle_entity_relation_summary( + final_description, _ = await _handle_entity_relation_summary( f"{src}-{tgt}", description_list, force_llm_summary_on_merge, @@ -1065,13 +1070,9 @@ async def _merge_nodes_then_upsert( already_node = await knowledge_graph_inst.get_node(entity_name) if already_node: already_entity_types.append(already_node["entity_type"]) - already_source_ids.extend( - split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP]) - ) - already_file_paths.extend( - split_string_by_multi_markers(already_node["file_path"], [GRAPH_FIELD_SEP]) - ) - already_description.append(already_node["description"]) + already_source_ids.extend(already_node["source_id"].split(GRAPH_FIELD_SEP)) + already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP)) + already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP)) entity_type = sorted( Counter( @@ -1079,39 +1080,45 @@ async def _merge_nodes_then_upsert( ).items(), key=lambda x: x[1], reverse=True, - )[0][0] + )[0][0] # Get the entity type with the highest count - description_list = already_description + list( - dict.fromkeys([dp["description"] for dp in nodes_data if dp.get("description")]) + 
+    description_list = list(
+        dict.fromkeys(
+            already_description
+            + [dp["description"] for dp in nodes_data if dp.get("description")]
+        )
     )
     force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
     num_fragment = len(description_list)
-    already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1
+    already_fragment = len(already_description)
+    deduplicated_num = already_fragment + len(nodes_data) - num_fragment
+    if deduplicated_num > 0:
+        dd_message = f"(dd:{deduplicated_num})"
+    else:
+        dd_message = ""
 
     if num_fragment > 0:
-        if num_fragment >= force_llm_summary_on_merge:
-            status_message = f"LLM merging `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}"
-            logger.info(status_message)
-            if pipeline_status is not None and pipeline_status_lock is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = status_message
-                    pipeline_status["history_messages"].append(status_message)
-            description = await _handle_entity_relation_summary(
-                entity_name,
-                description_list,
-                force_llm_summary_on_merge,
-                GRAPH_FIELD_SEP,
-                global_config,
-                llm_response_cache,
-            )
+        # Get summary and LLM usage status
+        description, llm_was_used = await _handle_entity_relation_summary(
+            entity_name,
+            description_list,
+            force_llm_summary_on_merge,
+            GRAPH_FIELD_SEP,
+            global_config,
+            llm_response_cache,
+        )
+
+        # Log based on actual LLM usage
+        if llm_was_used:
+            status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
         else:
-            status_message = f"Merging `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}"
-            logger.info(status_message)
-            if pipeline_status is not None and pipeline_status_lock is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = status_message
-                    pipeline_status["history_messages"].append(status_message)
-            description = GRAPH_FIELD_SEP.join(description_list)
+            status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
+
+        logger.info(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
     else:
         logger.error(f"Entity {entity_name} has no description")
         description = "(no description)"
@@ -1167,22 +1174,20 @@
             # Get source_id with empty string default if missing or None
             if already_edge.get("source_id") is not None:
                 already_source_ids.extend(
-                    split_string_by_multi_markers(
-                        already_edge["source_id"], [GRAPH_FIELD_SEP]
-                    )
+                    already_edge["source_id"].split(GRAPH_FIELD_SEP)
                 )
 
             # Get file_path with empty string default if missing or None
             if already_edge.get("file_path") is not None:
                 already_file_paths.extend(
-                    split_string_by_multi_markers(
-                        already_edge["file_path"], [GRAPH_FIELD_SEP]
-                    )
+                    already_edge["file_path"].split(GRAPH_FIELD_SEP)
                 )
 
             # Get description with empty string default if missing or None
             if already_edge.get("description") is not None:
-                already_description.append(already_edge["description"])
+                already_description.extend(
+                    already_edge["description"].split(GRAPH_FIELD_SEP)
+                )
 
             # Get keywords with empty string default if missing or None
             if already_edge.get("keywords") is not None:
@@ -1195,37 +1200,43 @@
 
     # Process edges_data with None checks
     weight = sum([dp["weight"] for dp in edges_data] + already_weights)
-    description_list = already_description + list(
-        dict.fromkeys([dp["description"] for dp in edges_data if dp.get("description")])
+    description_list = list(
+        dict.fromkeys(
+            already_description
+            + [dp["description"] for dp in edges_data if dp.get("description")]
+        )
     )
     force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
     num_fragment = len(description_list)
-    already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1
+    already_fragment = len(already_description)
+    deduplicated_num = already_fragment + len(edges_data) - num_fragment
+    if deduplicated_num > 0:
+        dd_message = f"(dd:{deduplicated_num})"
+    else:
+        dd_message = ""
 
     if num_fragment > 0:
-        if num_fragment >= force_llm_summary_on_merge:
-            status_message = f"LLM merging `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}"
-            logger.info(status_message)
-            if pipeline_status is not None and pipeline_status_lock is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = status_message
-                    pipeline_status["history_messages"].append(status_message)
-            description = await _handle_entity_relation_summary(
-                f"({src_id}, {tgt_id})",
-                description_list,
-                force_llm_summary_on_merge,
-                GRAPH_FIELD_SEP,
-                global_config,
-                llm_response_cache,
-            )
+        # Get summary and LLM usage status
+        description, llm_was_used = await _handle_entity_relation_summary(
+            f"({src_id}, {tgt_id})",
+            description_list,
+            force_llm_summary_on_merge,
+            GRAPH_FIELD_SEP,
+            global_config,
+            llm_response_cache,
+        )
+
+        # Log based on actual LLM usage
+        if llm_was_used:
+            status_message = f"LLMmrg: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
         else:
-            status_message = f"Merging `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}"
-            logger.info(status_message)
-            if pipeline_status is not None and pipeline_status_lock is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = status_message
-                    pipeline_status["history_messages"].append(status_message)
-            description = GRAPH_FIELD_SEP.join(description_list)
+            status_message = f"Merged: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
+
+        logger.info(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
     else:
         logger.error(f"Edge {src_id} - {tgt_id} has no description")
        description = "(no description)"
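
Usage note (illustrative only, not part of the patch): since
_handle_entity_relation_summary now returns tuple[str, bool], every call
site unpacks the summary together with the LLM-usage flag and picks its
log prefix from that flag instead of predicting it from fragment counts.
A minimal sketch of the new calling convention, assuming entity_name,
description_list, global_config, llm_response_cache, GRAPH_FIELD_SEP, and
logger are in scope as in the hunks above:

    # Unpack the summary and whether the LLM actually ran.
    description, llm_was_used = await _handle_entity_relation_summary(
        entity_name,
        description_list,
        global_config["force_llm_summary_on_merge"],
        GRAPH_FIELD_SEP,
        global_config,
        llm_response_cache,
    )

    # The usage flag, not a fragment-count threshold, selects the prefix.
    prefix = "LLMmrg" if llm_was_used else "Merged"
    logger.info(f"{prefix}: `{entity_name}` | {len(description_list)} fragments")

Call sites that do not need the flag discard it with `_`, as the
_rebuild_single_entity and _rebuild_single_relationship hunks above do.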