Improve error handling and add cancellation checks in pipeline

(cherry picked from commit 77336e50b6)
yangdx 2025-10-24 17:54:17 +08:00 committed by Raphaël MANSUY
parent a471f1ca0e
commit 687d2b6b13
2 changed files with 386 additions and 331 deletions
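Below is a minimal sketch of the cancellation-aware error handling pattern this commit applies. The scaffolding (logger setup, the simplified process_document signature) is assumed for illustration only; PipelineCancelledException, the pipeline_status bookkeeping, and the brief-message-versus-traceback branching mirror the actual changes in the diff.

import asyncio
import logging
import traceback

logger = logging.getLogger(__name__)


class PipelineCancelledException(Exception):
    """Raised when the user requests cancellation of the pipeline."""


async def process_document(doc_id: str, pipeline_status: dict, lock: asyncio.Lock) -> None:
    try:
        ...  # extraction / merge work that may raise PipelineCancelledException
    except Exception as e:
        if isinstance(e, PipelineCancelledException):
            # User cancellation: brief message only, no traceback
            msg = f"User cancelled: {doc_id}"
            logger.warning(msg)
            async with lock:
                pipeline_status["latest_message"] = msg
                pipeline_status["history_messages"].append(msg)
        else:
            # Unexpected failure: keep the full traceback in the history
            logger.error(traceback.format_exc())
            msg = f"Failed to process document: {doc_id}"
            logger.error(msg)
            async with lock:
                pipeline_status["latest_message"] = msg
                pipeline_status["history_messages"].append(traceback.format_exc())
                pipeline_status["history_messages"].append(msg)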


@@ -1699,10 +1699,16 @@ class LightRAG:
semaphore: asyncio.Semaphore,
) -> None:
"""Process single document"""
# Initialize variables at the start to prevent UnboundLocalError in error handling
file_path = "unknown_source"
current_file_number = 0
file_extraction_stage_ok = False
processing_start_time = int(time.time())
first_stage_tasks = []
entity_relation_task = None
async with semaphore:
nonlocal processed_count
current_file_number = 0
# Initialize to prevent UnboundLocalError in error handling
first_stage_tasks = []
entity_relation_task = None
@@ -1833,16 +1839,29 @@ class LightRAG:
file_extraction_stage_ok = True
except Exception as e:
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(error_msg)
# Check if this is a user cancellation
if isinstance(e, PipelineCancelledException):
# User cancellation - log brief message only, no traceback
error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
logger.warning(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
error_msg
)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Cancel tasks that are not yet completed
all_tasks = first_stage_tasks + (
@@ -1951,18 +1970,29 @@ class LightRAG:
)
except Exception as e:
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Check if this is a user cancellation
if isinstance(e, PipelineCancelledException):
# User cancellation - log brief message only, no traceback
error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
logger.warning(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
error_msg
)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Persistent llm cache
if self.llm_response_cache:


@@ -1,5 +1,6 @@
from __future__ import annotations
from functools import partial
from pathlib import Path
import asyncio
import json
@@ -58,6 +59,8 @@ from lightrag.constants import (
SOURCE_IDS_LIMIT_METHOD_KEEP,
SOURCE_IDS_LIMIT_METHOD_FIFO,
DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
DEFAULT_MAX_FILE_PATHS,
DEFAULT_ENTITY_NAME_MAX_LENGTH,
)
from lightrag.kg.shared_storage import get_storage_keyed_lock
import time
@@ -66,7 +69,28 @@ from dotenv import load_dotenv
# use the .env that is inside the current folder
# allows to use different .env file for each lightrag instance
# the OS environment variables take precedence over the .env file
load_dotenv(dotenv_path=".env", override=False)
load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False)
def _truncate_entity_identifier(
identifier: str, limit: int, chunk_key: str, identifier_role: str
) -> str:
"""Truncate entity identifiers that exceed the configured length limit."""
if len(identifier) <= limit:
return identifier
display_value = identifier[:limit]
preview = identifier[:20] # Show first 20 characters as preview
logger.warning(
"%s: %s len %d > %d chars (Name: '%s...')",
chunk_key,
identifier_role,
len(identifier),
limit,
preview,
)
return display_value
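# Illustrative usage sketch (the concrete limit and arguments below are assumptions, not
# part of this file): calling
#     _truncate_entity_identifier("A" * 600, 500, "chunk-1", "Entity name")
# logs a warning like "chunk-1: Entity name len 600 > 500 chars (Name: 'AAAAAAAAAAAAAAAAAAAA...')"
# and returns the first 500 characters; identifiers within the limit pass through unchanged.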
def chunking_by_token_size(
@@ -952,7 +976,14 @@ async def _process_extraction_result(
record_attributes, chunk_key, timestamp, file_path
)
if entity_data is not None:
maybe_nodes[entity_data["entity_name"]].append(entity_data)
truncated_name = _truncate_entity_identifier(
entity_data["entity_name"],
DEFAULT_ENTITY_NAME_MAX_LENGTH,
chunk_key,
"Entity name",
)
entity_data["entity_name"] = truncated_name
maybe_nodes[truncated_name].append(entity_data)
continue
# Try to parse as relationship
@@ -960,9 +991,21 @@
record_attributes, chunk_key, timestamp, file_path
)
if relationship_data is not None:
maybe_edges[
(relationship_data["src_id"], relationship_data["tgt_id"])
].append(relationship_data)
truncated_source = _truncate_entity_identifier(
relationship_data["src_id"],
DEFAULT_ENTITY_NAME_MAX_LENGTH,
chunk_key,
"Relation entity",
)
truncated_target = _truncate_entity_identifier(
relationship_data["tgt_id"],
DEFAULT_ENTITY_NAME_MAX_LENGTH,
chunk_key,
"Relation entity",
)
relationship_data["src_id"] = truncated_source
relationship_data["tgt_id"] = truncated_target
maybe_edges[(truncated_source, truncated_target)].append(relationship_data)
return dict(maybe_nodes), dict(maybe_edges)
@@ -1026,7 +1069,7 @@ async def _rebuild_single_entity(
async def _update_entity_storage(
final_description: str,
entity_type: str,
file_paths: set[str],
file_paths: list[str],
source_chunk_ids: list[str],
truncation_info: str = "",
):
@@ -1189,14 +1232,12 @@ async def _rebuild_single_entity(
file_paths_list = file_paths_list[:max_file_paths]
file_paths_list.append(
f"...{file_path_placeholder}({limit_method}:{max_file_paths}/{original_count})..."
f"...{file_path_placeholder}...({limit_method} {max_file_paths}/{original_count})"
)
logger.info(
f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
)
file_paths = set(file_paths_list)
# Remove duplicates while preserving order
description_list = list(dict.fromkeys(descriptions))
entity_types = list(dict.fromkeys(entity_types))
@@ -1223,7 +1264,7 @@ async def _rebuild_single_entity(
if len(limited_chunk_ids) < len(normalized_chunk_ids):
truncation_info = (
f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}"
f"{limit_method} {len(limited_chunk_ids)}/{len(normalized_chunk_ids)}"
)
else:
truncation_info = ""
@@ -1231,7 +1272,7 @@ async def _rebuild_single_entity(
await _update_entity_storage(
final_description,
entity_type,
file_paths,
file_paths_list,
limited_chunk_ids,
truncation_info,
)
@@ -1348,14 +1389,12 @@ async def _rebuild_single_relationship(
file_paths_list = file_paths_list[:max_file_paths]
file_paths_list.append(
f"...{file_path_placeholder}({limit_method}:{max_file_paths}/{original_count})..."
f"...{file_path_placeholder}...({limit_method} {max_file_paths}/{original_count})"
)
logger.info(
f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
)
file_paths = set(file_paths_list)
# Remove duplicates while preserving order
description_list = list(dict.fromkeys(descriptions))
keywords = list(dict.fromkeys(keywords))
@@ -1384,7 +1423,7 @@ async def _rebuild_single_relationship(
if len(limited_chunk_ids) < len(normalized_chunk_ids):
truncation_info = (
f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}"
f"{limit_method} {len(limited_chunk_ids)}/{len(normalized_chunk_ids)}"
)
else:
truncation_info = ""
@@ -1398,8 +1437,8 @@ async def _rebuild_single_relationship(
"keywords": combined_keywords,
"weight": weight,
"source_id": GRAPH_FIELD_SEP.join(limited_chunk_ids),
"file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp])
if file_paths
"file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths_list if fp])
if file_paths_list
else current_relationship.get("file_path", "unknown_source"),
"truncate": truncation_info,
}
@@ -1484,6 +1523,7 @@ async def _merge_nodes_then_upsert(
already_description = []
already_file_paths = []
# 1. Get existing node data from knowledge graph
already_node = await knowledge_graph_inst.get_node(entity_name)
if already_node:
already_entity_types.append(already_node["entity_type"])
@@ -1491,14 +1531,6 @@
already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP))
already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP))
entity_type = sorted(
Counter(
[dp["entity_type"] for dp in nodes_data] + already_entity_types
).items(),
key=lambda x: x[1],
reverse=True,
)[0][0] # Get the entity type with the highest count
new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")]
existing_full_source_ids = []
@@ -1514,6 +1546,7 @@
chunk_id for chunk_id in already_source_ids if chunk_id
]
# 2. Merging new source ids with existing ones
full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids)
if entity_chunks_storage is not None and full_source_ids:
@@ -1526,6 +1559,7 @@
}
)
# 3. Finalize source_id by applying source ids limit
limit_method = global_config.get("source_ids_limit_method")
max_source_limit = global_config.get("max_source_ids_per_entity")
source_ids = apply_source_ids_limit(
@@ -1535,7 +1569,7 @@
identifier=f"`{entity_name}`",
)
# Only apply filtering in KEEP(ignore new) mode
# 4. Only keep nodes not filtered out by apply_source_ids_limit when limit_method is KEEP
if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
allowed_source_ids = set(source_ids)
filtered_nodes = []
@@ -1550,18 +1584,40 @@
continue
filtered_nodes.append(dp)
nodes_data = filtered_nodes
else:
# In FIFO mode, keep all node descriptions - truncation happens at source_ids level only
else: # In FIFO mode, keep all nodes - truncation happens at source_ids level only
nodes_data = list(nodes_data)
skip_summary_due_to_limit = (
# 5. Check if we need to skip summary due to source_ids limit
if (
limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
and len(existing_full_source_ids) >= max_source_limit
and not nodes_data
and already_description
)
):
if already_node:
logger.info(
f"Skipped `{entity_name}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}"
)
existing_node_data = dict(already_node)
return existing_node_data
else:
logger.error(f"Internal Error: already_node missing for `{entity_name}`")
raise ValueError(
f"Internal Error: already_node missing for `{entity_name}`"
)
# Deduplicate by description, keeping first occurrence
# 6.1 Finalize source_id
source_id = GRAPH_FIELD_SEP.join(source_ids)
# 6.2 Finalize entity type by highest count
entity_type = sorted(
Counter(
[dp["entity_type"] for dp in nodes_data] + already_entity_types
).items(),
key=lambda x: x[1],
reverse=True,
)[0][0]
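# Illustrative example (assumed inputs): entity types ["person", "person"] from nodes_data
# plus already_entity_types ["organization"] give counts person=2, organization=1, so
# "person" is selected as the entity type with the highest count.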
# 7. Deduplicate nodes by description, keeping first occurrence in the same document
unique_nodes = {}
for dp in nodes_data:
desc = dp.get("description")
@@ -1570,154 +1626,128 @@
if desc not in unique_nodes:
unique_nodes[desc] = dp
# Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same
# Sort description by timestamp, then by description length when timestamps are the same
sorted_nodes = sorted(
unique_nodes.values(),
key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))),
)
sorted_descriptions = [dp["description"] for dp in sorted_nodes]
truncation_info = ""
dd_message = ""
has_placeholder = False # Initialize to track placeholder in file paths
# Combine already_description with sorted new descriptions
description_list = already_description + sorted_descriptions
num_fragment = len(description_list)
already_fragment = len(already_description)
deduplicated_num = already_fragment + len(nodes_data) - num_fragment
if deduplicated_num > 0:
dd_message = f"dd:{deduplicated_num}"
if skip_summary_due_to_limit:
logger.info(f"Skipped `{entity_name}`: KEEP old chunks")
description = (
already_node.get("description", "(no description)")
if already_node
else "(no description)"
)
existing_node_data = dict(already_node or {})
if not existing_node_data:
existing_node_data = {
"entity_id": entity_name,
"entity_type": entity_type,
"description": description,
"source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids),
"file_path": GRAPH_FIELD_SEP.join(already_file_paths),
"created_at": int(time.time()),
"truncate": "",
}
existing_node_data["entity_name"] = entity_name
return existing_node_data
elif num_fragment > 0:
# Get summary and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Entity",
entity_name,
description_list,
GRAPH_FIELD_SEP,
global_config,
llm_response_cache,
)
# Log based on actual LLM usage
if llm_was_used:
status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
else:
status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
# Add truncation info from apply_source_ids_limit if truncation occurred
if len(source_ids) < len(full_source_ids):
# Add + sign if has_placeholder is True, indicating actual file count is higher
full_source_count_str = (
f"{len(full_source_ids)}+"
if has_placeholder
else str(len(full_source_ids))
)
truncation_info = (
f"{limit_method}:{len(source_ids)}/{full_source_count_str}"
)
if dd_message or truncation_info:
status_message += (
f" ({', '.join(filter(None, [truncation_info, dd_message]))})"
)
if already_fragment > 0 or llm_was_used:
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
else:
logger.debug(status_message)
else:
if not description_list:
logger.error(f"Entity {entity_name} has no description")
description = "(no description)"
raise ValueError(f"Entity {entity_name} has no description")
source_id = GRAPH_FIELD_SEP.join(source_ids)
# Check for cancellation before LLM summary
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled during entity summary")
# Build file_path with count limit
if skip_summary_due_to_limit:
# Skip limit, keep original file_path
file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
else:
# Collect and apply limit
file_paths_list = []
seen_paths = set()
has_placeholder = False # Track if already_file_paths contains placeholder
# 8. Get summary description and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Entity",
entity_name,
description_list,
GRAPH_FIELD_SEP,
global_config,
llm_response_cache,
)
# Get placeholder to filter it out
# 9. Build file_path within MAX_FILE_PATHS
file_paths_list = []
seen_paths = set()
has_placeholder = False # Indicating file_path has been truncated before
max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS)
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
# Collect from already_file_paths, excluding placeholder
for fp in already_file_paths:
if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders
has_placeholder = True
continue
if fp and fp not in seen_paths:
file_paths_list.append(fp)
seen_paths.add(fp)
# Collect from new data
for dp in nodes_data:
file_path_item = dp.get("file_path")
if file_path_item and file_path_item not in seen_paths:
file_paths_list.append(file_path_item)
seen_paths.add(file_path_item)
# Apply count limit
if len(file_paths_list) > max_file_paths:
limit_method = global_config.get(
"source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
)
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
# Add + sign to indicate actual file count is higher
original_count_str = (
f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list))
)
# Collect from already_file_paths, excluding placeholder
for fp in already_file_paths:
# Check if this is a placeholder record
if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders
has_placeholder = True
continue
if fp and fp not in seen_paths:
file_paths_list.append(fp)
seen_paths.add(fp)
if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
# FIFO: keep tail (newest), discard head
file_paths_list = file_paths_list[-max_file_paths:]
file_paths_list.append(f"...{file_path_placeholder}...(FIFO)")
else:
# KEEP: keep head (earliest), discard tail
file_paths_list = file_paths_list[:max_file_paths]
file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)")
# Collect from new data
for dp in nodes_data:
file_path_item = dp.get("file_path")
if file_path_item and file_path_item not in seen_paths:
file_paths_list.append(file_path_item)
seen_paths.add(file_path_item)
logger.info(
f"Limited `{entity_name}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})"
)
# Finalize file_path
file_path = GRAPH_FIELD_SEP.join(file_paths_list)
# Apply count limit
max_file_paths = global_config.get("max_file_paths")
# 10. Log based on actual LLM usage
num_fragment = len(description_list)
already_fragment = len(already_description)
if llm_was_used:
status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
else:
status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
if len(file_paths_list) > max_file_paths:
limit_method = global_config.get(
"source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
)
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
original_count = len(file_paths_list)
truncation_info = truncation_info_log = ""
if len(source_ids) < len(full_source_ids):
# Add truncation info from apply_source_ids_limit if truncation occurred
truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}"
if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
truncation_info = truncation_info_log
else:
truncation_info = "KEEP Old"
if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
# FIFO: keep tail (newest), discard head
file_paths_list = file_paths_list[-max_file_paths:]
else:
# KEEP: keep head (earliest), discard tail
file_paths_list = file_paths_list[:max_file_paths]
deduplicated_num = already_fragment + len(nodes_data) - num_fragment
dd_message = ""
if deduplicated_num > 0:
# Duplicate descriptions detected across multiple chunks for the same entity
dd_message = f"dd {deduplicated_num}"
file_paths_list.append(
f"...{file_path_placeholder}({limit_method}:{max_file_paths}/{original_count})..."
)
logger.info(
f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
)
if dd_message or truncation_info_log:
status_message += (
f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})"
)
file_path = GRAPH_FIELD_SEP.join(file_paths_list)
# Add message to pipeline status when merge happens
if already_fragment > 0 or llm_was_used:
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
else:
logger.debug(status_message)
# 11. Update both graph and vector db
node_data = dict(
entity_id=entity_name,
entity_type=entity_type,
@@ -1778,6 +1808,7 @@ async def _merge_edges_then_upsert(
already_keywords = []
already_file_paths = []
# 1. Get existing edge data from graph storage
if await knowledge_graph_inst.has_edge(src_id, tgt_id):
already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
# Handle the case where get_edge returns None or missing fields
@@ -1827,6 +1858,7 @@
chunk_id for chunk_id in already_source_ids if chunk_id
]
# 2. Merge new source ids with existing ones
full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids)
if relation_chunks_storage is not None and full_source_ids:
@@ -1839,6 +1871,7 @@
}
)
# 3. Finalize source_id by applying source ids limit
limit_method = global_config.get("source_ids_limit_method")
max_source_limit = global_config.get("max_source_ids_per_relation")
source_ids = apply_source_ids_limit(
@@ -1851,7 +1884,7 @@
global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
)
# Only apply filtering in KEEP(ignore new) mode
# 4. Only keep edges with source_id in the final source_ids list if in KEEP mode
if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
allowed_source_ids = set(source_ids)
filtered_edges = []
@@ -1866,21 +1899,51 @@
continue
filtered_edges.append(dp)
edges_data = filtered_edges
else:
# In FIFO mode, keep all edge descriptions - truncation happens at source_ids level only
else: # In FIFO mode, keep all edges - truncation happens at source_ids level only
edges_data = list(edges_data)
skip_summary_due_to_limit = (
# 5. Check if we need to skip summary due to source_ids limit
if (
limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
and len(existing_full_source_ids) >= max_source_limit
and not edges_data
and already_description
)
):
if already_edge:
logger.info(
f"Skipped `{src_id}`~`{tgt_id}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}"
)
existing_edge_data = dict(already_edge)
return existing_edge_data
else:
logger.error(
f"Internal Error: already_node missing for `{src_id}`~`{tgt_id}`"
)
raise ValueError(
f"Internal Error: already_node missing for `{src_id}`~`{tgt_id}`"
)
# Process edges_data with None checks
# 6.1 Finalize source_id
source_id = GRAPH_FIELD_SEP.join(source_ids)
# 6.2 Finalize weight by summing new edge weights and existing weights
weight = sum([dp["weight"] for dp in edges_data] + already_weights)
# Deduplicate by description, keeping first occurrence
# 6.3 Finalize keywords by merging existing and new keywords
all_keywords = set()
# Process already_keywords (which are comma-separated)
for keyword_str in already_keywords:
if keyword_str: # Skip empty strings
all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip())
# Process new keywords from edges_data
for edge in edges_data:
if edge.get("keywords"):
all_keywords.update(
k.strip() for k in edge["keywords"].split(",") if k.strip()
)
# Join all unique keywords with commas
keywords = ",".join(sorted(all_keywords))
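# Illustrative example (assumed inputs): already_keywords = ["alpha, beta"] and one new
# edge with keywords "beta, gamma" produce all_keywords = {"alpha", "beta", "gamma"},
# so keywords == "alpha,beta,gamma" (split on commas, stripped, deduplicated, sorted).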
# 7. Deduplicate by description, keeping first occurrence in the same document
unique_edges = {}
for dp in edges_data:
description_value = dp.get("description")
@@ -1896,165 +1959,127 @@
)
sorted_descriptions = [dp["description"] for dp in sorted_edges]
truncation_info = ""
dd_message = ""
has_placeholder = False # Initialize to track placeholder in file paths
# Combine already_description with sorted new descriptions
description_list = already_description + sorted_descriptions
if not description_list:
logger.error(f"Relation {src_id}~{tgt_id} has no description")
raise ValueError(f"Relation {src_id}~{tgt_id} has no description")
num_fragment = len(description_list)
already_fragment = len(already_description)
deduplicated_num = already_fragment + len(edges_data) - num_fragment
if deduplicated_num > 0:
dd_message = f"dd:{deduplicated_num}"
# Check for cancellation before LLM summary
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during relation summary"
)
if skip_summary_due_to_limit:
logger.info(f"Skipped `{src_id}`~`{tgt_id}`: KEEP old chunks")
description = (
already_edge.get("description", "(no description)")
if already_edge
else "(no description)"
# 8. Get summary description and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Relation",
f"({src_id}, {tgt_id})",
description_list,
GRAPH_FIELD_SEP,
global_config,
llm_response_cache,
)
# 9. Build file_path within MAX_FILE_PATHS limit
file_paths_list = []
seen_paths = set()
has_placeholder = False # Track if already_file_paths contains placeholder
max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS)
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
# Collect from already_file_paths, excluding placeholder
for fp in already_file_paths:
# Check if this is a placeholder record
if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders
has_placeholder = True
continue
if fp and fp not in seen_paths:
file_paths_list.append(fp)
seen_paths.add(fp)
# Collect from new data
for dp in edges_data:
file_path_item = dp.get("file_path")
if file_path_item and file_path_item not in seen_paths:
file_paths_list.append(file_path_item)
seen_paths.add(file_path_item)
# Apply count limit
max_file_paths = global_config.get("max_file_paths")
if len(file_paths_list) > max_file_paths:
limit_method = global_config.get(
"source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
)
existing_edge_data = dict(already_edge or {})
if not existing_edge_data:
existing_edge_data = {
"description": description,
"keywords": GRAPH_FIELD_SEP.join(already_keywords),
"source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids),
"file_path": GRAPH_FIELD_SEP.join(already_file_paths),
"weight": sum(already_weights) if already_weights else 0.0,
"truncate": "",
"created_at": int(time.time()),
}
existing_edge_data.setdefault("created_at", int(time.time()))
existing_edge_data["src_id"] = src_id
existing_edge_data["tgt_id"] = tgt_id
return existing_edge_data
elif num_fragment > 0:
# Get summary and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Relation",
f"({src_id}, {tgt_id})",
description_list,
GRAPH_FIELD_SEP,
global_config,
llm_response_cache,
)
# Log based on actual LLM usage
if llm_was_used:
status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
else:
status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
# Add truncation info from apply_source_ids_limit if truncation occurred
if len(source_ids) < len(full_source_ids):
# Add + sign if has_placeholder is True, indicating actual file count is higher
full_source_count_str = (
f"{len(full_source_ids)}+"
if has_placeholder
else str(len(full_source_ids))
)
truncation_info = (
f"{limit_method}:{len(source_ids)}/{full_source_count_str}"
)
if dd_message or truncation_info:
status_message += (
f" ({', '.join(filter(None, [truncation_info, dd_message]))})"
)
if already_fragment > 0 or llm_was_used:
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
else:
logger.debug(status_message)
else:
logger.error(f"Edge {src_id} - {tgt_id} has no description")
description = "(no description)"
# Split all existing and new keywords into individual terms, then combine and deduplicate
all_keywords = set()
# Process already_keywords (which are comma-separated)
for keyword_str in already_keywords:
if keyword_str: # Skip empty strings
all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip())
# Process new keywords from edges_data
for edge in edges_data:
if edge.get("keywords"):
all_keywords.update(
k.strip() for k in edge["keywords"].split(",") if k.strip()
)
# Join all unique keywords with commas
keywords = ",".join(sorted(all_keywords))
source_id = GRAPH_FIELD_SEP.join(source_ids)
# Build file_path with count limit
if skip_summary_due_to_limit:
# Skip limit, keep original file_path
file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
else:
# Collect and apply limit
file_paths_list = []
seen_paths = set()
has_placeholder = False # Track if already_file_paths contains placeholder
# Get placeholder to filter it out
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
# Collect from already_file_paths, excluding placeholder
for fp in already_file_paths:
# Check if this is a placeholder record
if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders
has_placeholder = True
continue
if fp and fp not in seen_paths:
file_paths_list.append(fp)
seen_paths.add(fp)
# Add + sign to indicate actual file count is higher
original_count_str = (
f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list))
)
# Collect from new data
for dp in edges_data:
file_path_item = dp.get("file_path")
if file_path_item and file_path_item not in seen_paths:
file_paths_list.append(file_path_item)
seen_paths.add(file_path_item)
if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
# FIFO: keep tail (newest), discard head
file_paths_list = file_paths_list[-max_file_paths:]
file_paths_list.append(f"...{file_path_placeholder}...(FIFO)")
else:
# KEEP: keep head (earliest), discard tail
file_paths_list = file_paths_list[:max_file_paths]
file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)")
# Apply count limit
max_file_paths = global_config.get("max_file_paths")
logger.info(
f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})"
)
# Finalize file_path
file_path = GRAPH_FIELD_SEP.join(file_paths_list)
if len(file_paths_list) > max_file_paths:
limit_method = global_config.get(
"source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
)
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
original_count = len(file_paths_list)
# 10. Log based on actual LLM usage
num_fragment = len(description_list)
already_fragment = len(already_description)
if llm_was_used:
status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
else:
status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
# FIFO: keep tail (newest), discard head
file_paths_list = file_paths_list[-max_file_paths:]
else:
# KEEP: keep head (earliest), discard tail
file_paths_list = file_paths_list[:max_file_paths]
truncation_info = truncation_info_log = ""
if len(source_ids) < len(full_source_ids):
# Add truncation info from apply_source_ids_limit if truncation occurred
truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}"
if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
truncation_info = truncation_info_log
else:
truncation_info = "KEEP Old"
file_paths_list.append(
f"...{file_path_placeholder}({limit_method}:{max_file_paths}/{original_count})..."
)
logger.info(
f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
)
deduplicated_num = already_fragment + len(edges_data) - num_fragment
dd_message = ""
if deduplicated_num > 0:
# Duplicate descriptions detected across multiple chunks for the same relation
dd_message = f"dd {deduplicated_num}"
file_path = GRAPH_FIELD_SEP.join(file_paths_list)
if dd_message or truncation_info_log:
status_message += (
f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})"
)
# Add message to pipeline status when merge happens
if already_fragment > 0 or llm_was_used:
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
else:
logger.debug(status_message)
# 11. Update both graph and vector db
for need_insert_id in [src_id, tgt_id]:
if not (await knowledge_graph_inst.has_node(need_insert_id)):
node_created_at = int(time.time())
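For reference, a compact standalone sketch of the FIFO/KEEP file_path limiting used by the merge functions above. The function name, the placeholder format, and the simplified inputs are assumptions for illustration; the real code also deduplicates paths and skips a previously appended placeholder entry.

def limit_file_paths(paths: list[str], max_file_paths: int, limit_method: str,
                     placeholder: str) -> list[str]:
    """Truncate a file-path list and append a marker describing the truncation."""
    if len(paths) <= max_file_paths:
        return paths
    original_count = len(paths)
    if limit_method == "FIFO":
        # FIFO: keep the tail (newest entries), discard the head
        kept = paths[-max_file_paths:]
    else:
        # KEEP: keep the head (earliest entries), discard the tail
        kept = paths[:max_file_paths]
    # Marker lets later merges detect that truncation already happened
    kept.append(f"...{placeholder}...({limit_method} {max_file_paths}/{original_count})")
    return kept


# Example: limit_file_paths(["a.txt", "b.txt", "c.txt"], 2, "FIFO", "MORE")
# -> ["b.txt", "c.txt", "...MORE...(FIFO 2/3)"]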