feat: track actual LLM usage in entity/relation merging
- Modified _handle_entity_relation_summary to return tuple[str, bool]
- Updated merge functions to log "LLMmrg" vs "Merged" based on actual LLM usage
- Replaced hardcoded fragment count prediction with real-time LLM usage tracking
This commit is contained in:
parent
cb0fe38b9a
commit
9eb2be79b8
1 changed file with 91 additions and 80 deletions
|
|
@ -120,7 +120,7 @@ async def _handle_entity_relation_summary(
|
||||||
seperator: str,
|
seperator: str,
|
||||||
global_config: dict,
|
global_config: dict,
|
||||||
llm_response_cache: BaseKVStorage | None = None,
|
llm_response_cache: BaseKVStorage | None = None,
|
||||||
) -> str:
|
) -> tuple[str, bool]:
|
||||||
"""Handle entity relation description summary using map-reduce approach.
|
"""Handle entity relation description summary using map-reduce approach.
|
||||||
|
|
||||||
This function summarizes a list of descriptions using a map-reduce strategy:
|
This function summarizes a list of descriptions using a map-reduce strategy:
|
||||||
|
|
@ -137,15 +137,15 @@ async def _handle_entity_relation_summary(
|
||||||
llm_response_cache: Optional cache for LLM responses
|
llm_response_cache: Optional cache for LLM responses
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Final summarized description string
|
Tuple of (final_summarized_description_string, llm_was_used_boolean)
|
||||||
"""
|
"""
|
||||||
# Handle empty input
|
# Handle empty input
|
||||||
if not description_list:
|
if not description_list:
|
||||||
return ""
|
return "", False
|
||||||
|
|
||||||
# If only one description, return it directly (no need for LLM call)
|
# If only one description, return it directly (no need for LLM call)
|
||||||
if len(description_list) == 1:
|
if len(description_list) == 1:
|
||||||
return description_list[0]
|
return description_list[0], False
|
||||||
|
|
||||||
# Get configuration
|
# Get configuration
|
||||||
tokenizer: Tokenizer = global_config["tokenizer"]
|
tokenizer: Tokenizer = global_config["tokenizer"]
|
||||||
|
|
@ -153,6 +153,7 @@ async def _handle_entity_relation_summary(
|
||||||
summary_max_tokens = global_config["summary_max_tokens"]
|
summary_max_tokens = global_config["summary_max_tokens"]
|
||||||
|
|
||||||
current_list = description_list[:] # Copy the list to avoid modifying original
|
current_list = description_list[:] # Copy the list to avoid modifying original
|
||||||
|
llm_was_used = False # Track whether LLM was used during the entire process
|
||||||
|
|
||||||
# Iterative map-reduce process
|
# Iterative map-reduce process
|
||||||
while True:
|
while True:
|
||||||
|
|
@ -165,17 +166,18 @@ async def _handle_entity_relation_summary(
|
||||||
len(current_list) < force_llm_summary_on_merge
|
len(current_list) < force_llm_summary_on_merge
|
||||||
and total_tokens < summary_max_tokens
|
and total_tokens < summary_max_tokens
|
||||||
):
|
):
|
||||||
# Already the final result
|
# Already the final result - no LLM needed
|
||||||
final_description = seperator.join(current_list)
|
final_description = seperator.join(current_list)
|
||||||
return final_description if final_description else ""
|
return final_description if final_description else "", llm_was_used
|
||||||
else:
|
else:
|
||||||
# Final summarization of remaining descriptions
|
# Final summarization of remaining descriptions - LLM will be used
|
||||||
return await _summarize_descriptions(
|
final_summary = await _summarize_descriptions(
|
||||||
entity_or_relation_name,
|
entity_or_relation_name,
|
||||||
current_list,
|
current_list,
|
||||||
global_config,
|
global_config,
|
||||||
llm_response_cache,
|
llm_response_cache,
|
||||||
)
|
)
|
||||||
|
return final_summary, True # LLM was used for final summarization
|
||||||
|
|
||||||
# Need to split into chunks - Map phase
|
# Need to split into chunks - Map phase
|
||||||
chunks = []
|
chunks = []
|
||||||
|
|
@ -188,7 +190,7 @@ async def _handle_entity_relation_summary(
|
||||||
# If adding current description would exceed limit, finalize current chunk
|
# If adding current description would exceed limit, finalize current chunk
|
||||||
if current_tokens + desc_tokens > summary_context_size and current_chunk:
|
if current_tokens + desc_tokens > summary_context_size and current_chunk:
|
||||||
chunks.append(current_chunk)
|
chunks.append(current_chunk)
|
||||||
current_chunk = [desc] # Intial chunk for next group
|
current_chunk = [desc] # Initial chunk for next group
|
||||||
current_tokens = desc_tokens
|
current_tokens = desc_tokens
|
||||||
else:
|
else:
|
||||||
current_chunk.append(desc)
|
current_chunk.append(desc)
|
||||||
|
|
@ -214,6 +216,7 @@ async def _handle_entity_relation_summary(
|
||||||
entity_or_relation_name, chunk, global_config, llm_response_cache
|
entity_or_relation_name, chunk, global_config, llm_response_cache
|
||||||
)
|
)
|
||||||
new_summaries.append(summary)
|
new_summaries.append(summary)
|
||||||
|
llm_was_used = True # Mark that LLM was used in reduce phase
|
||||||
|
|
||||||
# Update current list with new summaries for next iteration
|
# Update current list with new summaries for next iteration
|
||||||
current_list = new_summaries
|
current_list = new_summaries
|
||||||
|
|
@ -529,7 +532,7 @@ async def _rebuild_knowledge_from_chunks(
|
||||||
pipeline_status["history_messages"].append(status_message)
|
pipeline_status["history_messages"].append(status_message)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
failed_entities_count += 1
|
failed_entities_count += 1
|
||||||
status_message = f"Failed to rebuild entity {entity_name}: {e}"
|
status_message = f"Failed to rebuild entity `{entity_name}`: {e}"
|
||||||
logger.info(status_message) # Per requirement, change to info
|
logger.info(status_message) # Per requirement, change to info
|
||||||
if pipeline_status is not None and pipeline_status_lock is not None:
|
if pipeline_status is not None and pipeline_status_lock is not None:
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
|
|
@ -560,7 +563,7 @@ async def _rebuild_knowledge_from_chunks(
|
||||||
global_config=global_config,
|
global_config=global_config,
|
||||||
)
|
)
|
||||||
rebuilt_relationships_count += 1
|
rebuilt_relationships_count += 1
|
||||||
status_message = f"Relationship `{src}->{tgt}` rebuilt from {len(chunk_ids)} chunks"
|
status_message = f"Relationship `{src} - {tgt}` rebuilt from {len(chunk_ids)} chunks"
|
||||||
logger.info(status_message)
|
logger.info(status_message)
|
||||||
if pipeline_status is not None and pipeline_status_lock is not None:
|
if pipeline_status is not None and pipeline_status_lock is not None:
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
|
|
@ -568,7 +571,9 @@ async def _rebuild_knowledge_from_chunks(
|
||||||
pipeline_status["history_messages"].append(status_message)
|
pipeline_status["history_messages"].append(status_message)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
failed_relationships_count += 1
|
failed_relationships_count += 1
|
||||||
status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
|
status_message = (
|
||||||
|
f"Failed to rebuild relationship `{src} - {tgt}`: {e}"
|
||||||
|
)
|
||||||
logger.info(status_message) # Per requirement, change to info
|
logger.info(status_message) # Per requirement, change to info
|
||||||
if pipeline_status is not None and pipeline_status_lock is not None:
|
if pipeline_status is not None and pipeline_status_lock is not None:
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
|
|
@ -867,7 +872,7 @@ async def _rebuild_single_entity(
|
||||||
# Generate description from relationships or fallback to current
|
# Generate description from relationships or fallback to current
|
||||||
if description_list:
|
if description_list:
|
||||||
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
|
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
|
||||||
final_description = await _handle_entity_relation_summary(
|
final_description, _ = await _handle_entity_relation_summary(
|
||||||
entity_name,
|
entity_name,
|
||||||
description_list,
|
description_list,
|
||||||
force_llm_summary_on_merge,
|
force_llm_summary_on_merge,
|
||||||
|
|
@ -909,7 +914,7 @@ async def _rebuild_single_entity(
|
||||||
# Generate final description and update storage
|
# Generate final description and update storage
|
||||||
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
|
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
|
||||||
if description_list:
|
if description_list:
|
||||||
final_description = await _handle_entity_relation_summary(
|
final_description, _ = await _handle_entity_relation_summary(
|
||||||
entity_name,
|
entity_name,
|
||||||
description_list,
|
description_list,
|
||||||
force_llm_summary_on_merge,
|
force_llm_summary_on_merge,
|
||||||
|
|
@ -990,7 +995,7 @@ async def _rebuild_single_relationship(
|
||||||
# Use summary if description has too many fragments
|
# Use summary if description has too many fragments
|
||||||
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
|
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
|
||||||
if description_list:
|
if description_list:
|
||||||
final_description = await _handle_entity_relation_summary(
|
final_description, _ = await _handle_entity_relation_summary(
|
||||||
f"{src}-{tgt}",
|
f"{src}-{tgt}",
|
||||||
description_list,
|
description_list,
|
||||||
force_llm_summary_on_merge,
|
force_llm_summary_on_merge,
|
||||||
|
|
@ -1065,13 +1070,9 @@ async def _merge_nodes_then_upsert(
|
||||||
already_node = await knowledge_graph_inst.get_node(entity_name)
|
already_node = await knowledge_graph_inst.get_node(entity_name)
|
||||||
if already_node:
|
if already_node:
|
||||||
already_entity_types.append(already_node["entity_type"])
|
already_entity_types.append(already_node["entity_type"])
|
||||||
already_source_ids.extend(
|
already_source_ids.extend(already_node["source_id"].split(GRAPH_FIELD_SEP))
|
||||||
split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
|
already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP))
|
||||||
)
|
already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP))
|
||||||
already_file_paths.extend(
|
|
||||||
split_string_by_multi_markers(already_node["file_path"], [GRAPH_FIELD_SEP])
|
|
||||||
)
|
|
||||||
already_description.append(already_node["description"])
|
|
||||||
|
|
||||||
entity_type = sorted(
|
entity_type = sorted(
|
||||||
Counter(
|
Counter(
|
||||||
|
|
@ -1079,39 +1080,45 @@ async def _merge_nodes_then_upsert(
|
||||||
).items(),
|
).items(),
|
||||||
key=lambda x: x[1],
|
key=lambda x: x[1],
|
||||||
reverse=True,
|
reverse=True,
|
||||||
)[0][0]
|
)[0][0] # Get the entity type with the highest count
|
||||||
|
|
||||||
description_list = already_description + list(
|
description_list = list(
|
||||||
dict.fromkeys([dp["description"] for dp in nodes_data if dp.get("description")])
|
dict.fromkeys(
|
||||||
|
already_description
|
||||||
|
+ [dp["description"] for dp in nodes_data if dp.get("description")]
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
|
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
|
||||||
num_fragment = len(description_list)
|
num_fragment = len(description_list)
|
||||||
already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1
|
already_fragment = len(already_description)
|
||||||
|
deduplicated_num = already_fragment + len(nodes_data) - num_fragment
|
||||||
|
if deduplicated_num > 0:
|
||||||
|
dd_message = f"(dd:{deduplicated_num})"
|
||||||
|
else:
|
||||||
|
dd_message = ""
|
||||||
if num_fragment > 0:
|
if num_fragment > 0:
|
||||||
if num_fragment >= force_llm_summary_on_merge:
|
# Get summary and LLM usage status
|
||||||
status_message = f"LLM merging `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}"
|
description, llm_was_used = await _handle_entity_relation_summary(
|
||||||
logger.info(status_message)
|
entity_name,
|
||||||
if pipeline_status is not None and pipeline_status_lock is not None:
|
description_list,
|
||||||
async with pipeline_status_lock:
|
force_llm_summary_on_merge,
|
||||||
pipeline_status["latest_message"] = status_message
|
GRAPH_FIELD_SEP,
|
||||||
pipeline_status["history_messages"].append(status_message)
|
global_config,
|
||||||
description = await _handle_entity_relation_summary(
|
llm_response_cache,
|
||||||
entity_name,
|
)
|
||||||
description_list,
|
|
||||||
force_llm_summary_on_merge,
|
# Log based on actual LLM usage
|
||||||
GRAPH_FIELD_SEP,
|
if llm_was_used:
|
||||||
global_config,
|
status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
|
||||||
llm_response_cache,
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
status_message = f"Merging `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}"
|
status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
|
||||||
logger.info(status_message)
|
|
||||||
if pipeline_status is not None and pipeline_status_lock is not None:
|
logger.info(status_message)
|
||||||
async with pipeline_status_lock:
|
if pipeline_status is not None and pipeline_status_lock is not None:
|
||||||
pipeline_status["latest_message"] = status_message
|
async with pipeline_status_lock:
|
||||||
pipeline_status["history_messages"].append(status_message)
|
pipeline_status["latest_message"] = status_message
|
||||||
description = GRAPH_FIELD_SEP.join(description_list)
|
pipeline_status["history_messages"].append(status_message)
|
||||||
else:
|
else:
|
||||||
logger.error(f"Entity {entity_name} has no description")
|
logger.error(f"Entity {entity_name} has no description")
|
||||||
description = "(no description)"
|
description = "(no description)"
|
||||||
|
|
@ -1167,22 +1174,20 @@ async def _merge_edges_then_upsert(
|
||||||
# Get source_id with empty string default if missing or None
|
# Get source_id with empty string default if missing or None
|
||||||
if already_edge.get("source_id") is not None:
|
if already_edge.get("source_id") is not None:
|
||||||
already_source_ids.extend(
|
already_source_ids.extend(
|
||||||
split_string_by_multi_markers(
|
already_edge["source_id"].split(GRAPH_FIELD_SEP)
|
||||||
already_edge["source_id"], [GRAPH_FIELD_SEP]
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Get file_path with empty string default if missing or None
|
# Get file_path with empty string default if missing or None
|
||||||
if already_edge.get("file_path") is not None:
|
if already_edge.get("file_path") is not None:
|
||||||
already_file_paths.extend(
|
already_file_paths.extend(
|
||||||
split_string_by_multi_markers(
|
already_edge["file_path"].split(GRAPH_FIELD_SEP)
|
||||||
already_edge["file_path"], [GRAPH_FIELD_SEP]
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Get description with empty string default if missing or None
|
# Get description with empty string default if missing or None
|
||||||
if already_edge.get("description") is not None:
|
if already_edge.get("description") is not None:
|
||||||
already_description.append(already_edge["description"])
|
already_description.extend(
|
||||||
|
already_edge["description"].split(GRAPH_FIELD_SEP)
|
||||||
|
)
|
||||||
|
|
||||||
# Get keywords with empty string default if missing or None
|
# Get keywords with empty string default if missing or None
|
||||||
if already_edge.get("keywords") is not None:
|
if already_edge.get("keywords") is not None:
|
||||||
|
|
@ -1195,37 +1200,43 @@ async def _merge_edges_then_upsert(
|
||||||
# Process edges_data with None checks
|
# Process edges_data with None checks
|
||||||
weight = sum([dp["weight"] for dp in edges_data] + already_weights)
|
weight = sum([dp["weight"] for dp in edges_data] + already_weights)
|
||||||
|
|
||||||
description_list = already_description + list(
|
description_list = list(
|
||||||
dict.fromkeys([dp["description"] for dp in edges_data if dp.get("description")])
|
dict.fromkeys(
|
||||||
|
already_description
|
||||||
|
+ [dp["description"] for dp in edges_data if dp.get("description")]
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
|
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
|
||||||
num_fragment = len(description_list)
|
num_fragment = len(description_list)
|
||||||
already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1
|
already_fragment = len(already_description)
|
||||||
|
deduplicated_num = already_fragment + len(edges_data) - num_fragment
|
||||||
|
if deduplicated_num > 0:
|
||||||
|
dd_message = f"(dd:{deduplicated_num})"
|
||||||
|
else:
|
||||||
|
dd_message = ""
|
||||||
if num_fragment > 0:
|
if num_fragment > 0:
|
||||||
if num_fragment >= force_llm_summary_on_merge:
|
# Get summary and LLM usage status
|
||||||
status_message = f"LLM merging `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}"
|
description, llm_was_used = await _handle_entity_relation_summary(
|
||||||
logger.info(status_message)
|
f"({src_id}, {tgt_id})",
|
||||||
if pipeline_status is not None and pipeline_status_lock is not None:
|
description_list,
|
||||||
async with pipeline_status_lock:
|
force_llm_summary_on_merge,
|
||||||
pipeline_status["latest_message"] = status_message
|
GRAPH_FIELD_SEP,
|
||||||
pipeline_status["history_messages"].append(status_message)
|
global_config,
|
||||||
description = await _handle_entity_relation_summary(
|
llm_response_cache,
|
||||||
f"({src_id}, {tgt_id})",
|
)
|
||||||
description_list,
|
|
||||||
force_llm_summary_on_merge,
|
# Log based on actual LLM usage
|
||||||
GRAPH_FIELD_SEP,
|
if llm_was_used:
|
||||||
global_config,
|
status_message = f"LLMmrg: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
|
||||||
llm_response_cache,
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
status_message = f"Merging `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}"
|
status_message = f"Merged: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
|
||||||
logger.info(status_message)
|
|
||||||
if pipeline_status is not None and pipeline_status_lock is not None:
|
logger.info(status_message)
|
||||||
async with pipeline_status_lock:
|
if pipeline_status is not None and pipeline_status_lock is not None:
|
||||||
pipeline_status["latest_message"] = status_message
|
async with pipeline_status_lock:
|
||||||
pipeline_status["history_messages"].append(status_message)
|
pipeline_status["latest_message"] = status_message
|
||||||
description = GRAPH_FIELD_SEP.join(description_list)
|
pipeline_status["history_messages"].append(status_message)
|
||||||
else:
|
else:
|
||||||
logger.error(f"Edge {src_id} - {tgt_id} has no description")
|
logger.error(f"Edge {src_id} - {tgt_id} has no description")
|
||||||
description = "(no description)"
|
description = "(no description)"
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue