cherry-pick 3ed2abd8
This commit is contained in:
parent 6a4300d0bf
commit ab6e8a9cf4
1 changed file with 357 additions and 378 deletions
@@ -57,6 +57,7 @@ from lightrag.constants import (
     SOURCE_IDS_LIMIT_METHOD_KEEP,
     SOURCE_IDS_LIMIT_METHOD_FIFO,
     DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
+    DEFAULT_MAX_FILE_PATHS,
 )
 from lightrag.kg.shared_storage import get_storage_keyed_lock
 import time
@@ -1188,7 +1189,7 @@ async def _rebuild_single_entity(
         file_paths_list = file_paths_list[:max_file_paths]

         file_paths_list.append(
-            f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
+            f"...{file_path_placeholder}({limit_method}:{max_file_paths}/{original_count})..."
         )
         logger.info(
             f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
@@ -1347,7 +1348,7 @@ async def _rebuild_single_relationship(
         file_paths_list = file_paths_list[:max_file_paths]

         file_paths_list.append(
-            f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
+            f"...{file_path_placeholder}({limit_method}:{max_file_paths}/{original_count})..."
         )
         logger.info(
             f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
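Note on the marker change above: both rebuild helpers now encode the limit method and the kept/total counts in the truncation placeholder instead of the older "showing X of Y" wording. A minimal sketch of the two formats (placeholder text and counts are illustrative assumptions; the real placeholder comes from the configured `file_path_more_placeholder`):

```python
# Illustrative values only, not the library's defaults.
file_path_placeholder = "more_files"
limit_method = "KEEP"      # or "FIFO"
max_file_paths = 10
original_count = 37

old_marker = f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
new_marker = f"...{file_path_placeholder}({limit_method}:{max_file_paths}/{original_count})..."
print(old_marker)  # ...more_files(showing 10 of 37)...
print(new_marker)  # ...more_files(KEEP:10/37)...
```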
@@ -1470,6 +1471,7 @@ async def _merge_nodes_then_upsert(
     entity_name: str,
     nodes_data: list[dict],
     knowledge_graph_inst: BaseGraphStorage,
+    entity_vdb: BaseVectorStorage | None,
     global_config: dict,
     pipeline_status: dict = None,
     pipeline_status_lock=None,
@@ -1482,6 +1484,7 @@ async def _merge_nodes_then_upsert(
     already_description = []
     already_file_paths = []

+    # 1. Get existing node data from knowledge graph
     already_node = await knowledge_graph_inst.get_node(entity_name)
     if already_node:
         already_entity_types.append(already_node["entity_type"])
@@ -1489,16 +1492,6 @@ async def _merge_nodes_then_upsert(
         already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP))
         already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP))

-    entity_type = sorted(
-        Counter(
-            [dp["entity_type"] for dp in nodes_data] + already_entity_types
-        ).items(),
-        key=lambda x: x[1],
-        reverse=True,
-    )[0][0]  # Get the entity type with the highest count
-
-    original_nodes_count = len(nodes_data)
-
     new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")]

     existing_full_source_ids = []
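The majority-vote computation deleted here is not gone: it reappears below as step 6.2, after the early-return guard. Distilled to a runnable snippet (example inputs are hypothetical):

```python
from collections import Counter

# Step 6.2 distilled: pick the most frequent entity type across new fragments
# and the previously stored type(s).
new_types = ["PERSON", "PERSON", "ORG"]
already_entity_types = ["PERSON"]
entity_type = sorted(
    Counter(new_types + already_entity_types).items(),
    key=lambda x: x[1],
    reverse=True,
)[0][0]
print(entity_type)  # PERSON (3 votes vs. 1)
```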
@@ -1514,6 +1507,7 @@ async def _merge_nodes_then_upsert(
             chunk_id for chunk_id in already_source_ids if chunk_id
         ]

+    # 2. Merge new source ids with existing ones
     full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids)

     if entity_chunks_storage is not None and full_source_ids:
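`merge_source_ids` itself is not shown in this diff; read as a sketch, an order-preserving union that keeps existing chunk ids ahead of unseen new ones would satisfy the call sites here — an assumption, not the actual implementation:

```python
def merge_source_ids_sketch(existing: list[str], new: list[str]) -> list[str]:
    """Assumed behavior: order-preserving union, existing ids first."""
    seen: set[str] = set()
    merged: list[str] = []
    for chunk_id in existing + new:
        if chunk_id and chunk_id not in seen:
            seen.add(chunk_id)
            merged.append(chunk_id)
    return merged

# A re-seen chunk is not duplicated; a genuinely new one lands at the tail.
print(merge_source_ids_sketch(["chunk-1", "chunk-2"], ["chunk-2", "chunk-9"]))
# ['chunk-1', 'chunk-2', 'chunk-9']
```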
@@ -1526,6 +1520,7 @@ async def _merge_nodes_then_upsert(
             }
         )

+    # 3. Finalize source_id by applying source ids limit
     limit_method = global_config.get("source_ids_limit_method")
     max_source_limit = global_config.get("max_source_ids_per_entity")
     source_ids = apply_source_ids_limit(
@@ -1535,7 +1530,7 @@ async def _merge_nodes_then_upsert(
         identifier=f"`{entity_name}`",
     )

-    # Only apply filtering in KEEP(ignore new) mode
+    # 4. Only keep nodes not filtered out by apply_source_ids_limit when limit_method is KEEP
     if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
         allowed_source_ids = set(source_ids)
         filtered_nodes = []
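Step 4 only prunes fragments in KEEP mode because only there can a new chunk fall outside the capped id list. The two policies, as this diff uses them, amount to head-vs-tail slicing; a sketch with assumed semantics for `apply_source_ids_limit` (the real helper also logs against the `identifier` argument):

```python
SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"   # assumed constant values
SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"

def apply_source_ids_limit_sketch(source_ids: list[str], limit: int, method: str) -> list[str]:
    """Assumed semantics: KEEP retains the oldest ids (head of the merged
    list), FIFO retains the newest ids (tail)."""
    if len(source_ids) <= limit:
        return list(source_ids)
    if method == SOURCE_IDS_LIMIT_METHOD_KEEP:
        return source_ids[:limit]
    return source_ids[-limit:]

ids = [f"chunk-{i}" for i in range(6)]
print(apply_source_ids_limit_sketch(ids, 3, "KEEP"))  # ['chunk-0', 'chunk-1', 'chunk-2']
print(apply_source_ids_limit_sketch(ids, 3, "FIFO"))  # ['chunk-3', 'chunk-4', 'chunk-5']
```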
@@ -1550,18 +1545,40 @@ async def _merge_nodes_then_upsert(
                 continue
             filtered_nodes.append(dp)
         nodes_data = filtered_nodes
-    else:
-        # In FIFO mode, keep all node descriptions - truncation happens at source_ids level only
+    else:  # In FIFO mode, keep all nodes - truncation happens at source_ids level only
         nodes_data = list(nodes_data)

-    skip_summary_due_to_limit = (
+    # 5. Check if we need to skip summary due to source_ids limit
+    if (
         limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
         and len(existing_full_source_ids) >= max_source_limit
         and not nodes_data
-        and already_description
-    )
+    ):
+        if already_node:
+            logger.info(
+                f"Skipped `{entity_name}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}"
+            )
+            existing_node_data = dict(already_node)
+            return existing_node_data
+        else:
+            logger.error(f"Internal Error: already_node missing for `{entity_name}`")
+            raise ValueError(
+                f"Internal Error: already_node missing for `{entity_name}`"
+            )

-    # Deduplicate by description, keeping first occurrence
+    # 6.1 Finalize source_id
+    source_id = GRAPH_FIELD_SEP.join(source_ids)
+
+    # 6.2 Finalize entity type by highest count
+    entity_type = sorted(
+        Counter(
+            [dp["entity_type"] for dp in nodes_data] + already_entity_types
+        ).items(),
+        key=lambda x: x[1],
+        reverse=True,
+    )[0][0]
+
+    # 7. Deduplicate nodes by description, keeping first occurrence in the same document
     unique_nodes = {}
     for dp in nodes_data:
         desc = dp.get("description")
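The refactor above replaces the old `skip_summary_due_to_limit` flag with an early return: once a KEEP-capped entity filters out every incoming fragment, the stored node is returned untouched and no LLM summary runs. Condensed control flow (a sketch of step 5, not the full function):

```python
def keep_short_circuit_sketch(limit_method, existing_full_source_ids,
                              max_source_limit, nodes_data, already_node):
    """Mirrors step 5: return the stored node unchanged when the KEEP cap
    is already full and step 4 filtered out every new fragment."""
    if (
        limit_method == "KEEP"
        and len(existing_full_source_ids) >= max_source_limit
        and not nodes_data
    ):
        if already_node:
            return dict(already_node)  # skip summary and re-upsert entirely
        # Cap reached but nothing stored: an internal inconsistency.
        raise ValueError("Internal Error: already_node missing")
    return None  # fall through to the normal merge path
```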
@@ -1570,146 +1587,122 @@ async def _merge_nodes_then_upsert(
         if desc not in unique_nodes:
             unique_nodes[desc] = dp

-    # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same
+    # Sort description by timestamp, then by description length when timestamps are the same
     sorted_nodes = sorted(
         unique_nodes.values(),
         key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))),
     )
     sorted_descriptions = [dp["description"] for dp in sorted_nodes]

-    truncation_info = ""
-    dd_message = ""
-    has_placeholder = False  # Initialize to track placeholder in file paths
-
     # Combine already_description with sorted new descriptions
     description_list = already_description + sorted_descriptions
-    deduplicated_num = original_nodes_count - len(sorted_descriptions)
-    if deduplicated_num > 0:
-        dd_message = f"dd:{deduplicated_num}"
+    if not description_list:
+        logger.error(f"Entity {entity_name} has no description")
+        raise ValueError(f"Entity {entity_name} has no description")

+    # 8. Get summary description and LLM usage status
+    description, llm_was_used = await _handle_entity_relation_summary(
+        "Entity",
+        entity_name,
+        description_list,
+        GRAPH_FIELD_SEP,
+        global_config,
+        llm_response_cache,
+    )
+
+    # 9. Build file_path within MAX_FILE_PATHS
+    file_paths_list = []
+    seen_paths = set()
+    has_placeholder = False  # Indicating file_path has been truncated before
+
+    max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS)
+    file_path_placeholder = global_config.get(
+        "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+    )
+
+    # Collect from already_file_paths, excluding placeholder
+    for fp in already_file_paths:
+        if fp and fp.startswith(f"...{file_path_placeholder}"):  # Skip placeholders
+            has_placeholder = True
+            continue
+        if fp and fp not in seen_paths:
+            file_paths_list.append(fp)
+            seen_paths.add(fp)
+
+    # Collect from new data
+    for dp in nodes_data:
+        file_path_item = dp.get("file_path")
+        if file_path_item and file_path_item not in seen_paths:
+            file_paths_list.append(file_path_item)
+            seen_paths.add(file_path_item)
+
+    # Apply count limit
+    if len(file_paths_list) > max_file_paths:
+        limit_method = global_config.get(
+            "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+        )
+        file_path_placeholder = global_config.get(
+            "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+        )
+        # Add + sign to indicate actual file count is higher
+        original_count_str = (
+            f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list))
+        )
+
+        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+            # FIFO: keep tail (newest), discard head
+            file_paths_list = file_paths_list[-max_file_paths:]
+            file_paths_list.append(f"...{file_path_placeholder}...(FIFO)")
+        else:
+            # KEEP: keep head (earliest), discard tail
+            file_paths_list = file_paths_list[:max_file_paths]
+            file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)")
+
+        logger.info(
+            f"Limited `{entity_name}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})"
+        )
+    # Finalize file_path
+    file_path = GRAPH_FIELD_SEP.join(file_paths_list)
+
+    # 10. Log based on actual LLM usage
     num_fragment = len(description_list)
     already_fragment = len(already_description)
-    if skip_summary_due_to_limit:
-        description = (
-            already_node.get("description", "(no description)")
-            if already_node
-            else "(no description)"
-        )
-        llm_was_used = False
-        status_message = f"Skip merge for `{entity_name}`: IGNORE_NEW limit reached"
-        logger.debug(status_message)
+    if llm_was_used:
+        status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
+    else:
+        status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
+
+    truncation_info = truncation_info_log = ""
+    if len(source_ids) < len(full_source_ids):
+        # Add truncation info from apply_source_ids_limit if truncation occurred
+        truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}"
+        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+            truncation_info = truncation_info_log
+        else:
+            truncation_info = "KEEP Old"
+
+    deduplicated_num = already_fragment + len(nodes_data) - num_fragment
+    dd_message = ""
+    if deduplicated_num > 0:
+        # Duplicated description detected across multiple chunks for the same entity
+        dd_message = f"dd {deduplicated_num}"
+
+    if dd_message or truncation_info_log:
+        status_message += (
+            f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})"
+        )
+
+    # Add message to pipeline status when merge happens
+    if already_fragment > 0 or llm_was_used:
+        logger.info(status_message)
         if pipeline_status is not None and pipeline_status_lock is not None:
             async with pipeline_status_lock:
                 pipeline_status["latest_message"] = status_message
                 pipeline_status["history_messages"].append(status_message)
-    elif num_fragment > 0:
-        # Get summary and LLM usage status
-        description, llm_was_used = await _handle_entity_relation_summary(
-            "Entity",
-            entity_name,
-            description_list,
-            GRAPH_FIELD_SEP,
-            global_config,
-            llm_response_cache,
-        )
-
-        # Log based on actual LLM usage
-        if llm_was_used:
-            status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
-        else:
-            status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
-
-        # Add truncation info from apply_source_ids_limit if truncation occurred
-        if len(source_ids) < len(full_source_ids):
-            # Add + sign if has_placeholder is True, indicating actual file count is higher
-            full_source_count_str = (
-                f"{len(full_source_ids)}+"
-                if has_placeholder
-                else str(len(full_source_ids))
-            )
-            truncation_info = (
-                f"{limit_method}:{len(source_ids)}/{full_source_count_str}"
-            )
-
-        if dd_message or truncation_info:
-            status_message += f" ({', '.join([truncation_info, dd_message])})"
-
-        if already_fragment > 0 or llm_was_used:
-            logger.info(status_message)
-            if pipeline_status is not None and pipeline_status_lock is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = status_message
-                    pipeline_status["history_messages"].append(status_message)
-            else:
-                logger.debug(status_message)
-
     else:
-        logger.error(f"Entity {entity_name} has no description")
-        description = "(no description)"
-
-    source_id = GRAPH_FIELD_SEP.join(source_ids)
-
-    # Build file_path with count limit
-    if skip_summary_due_to_limit:
-        # Skip limit, keep original file_path
-        file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
-    else:
-        # Collect and apply limit
-        file_paths_list = []
-        seen_paths = set()
-        has_placeholder = False  # Track if already_file_paths contains placeholder
-
-        # Get placeholder to filter it out
-        file_path_placeholder = global_config.get(
-            "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
-        )
-
-        # Collect from already_file_paths, excluding placeholder
-        for fp in already_file_paths:
-            # Check if this is a placeholder record
-            if fp and fp.startswith(f"...{file_path_placeholder}"):
-                has_placeholder = True
-                continue
-            # Skip placeholders (format: "...{placeholder}(showing X of Y)...")
-            if fp and fp not in seen_paths:
-                file_paths_list.append(fp)
-                seen_paths.add(fp)
-
-        # Collect from new data
-        for dp in nodes_data:
-            file_path_item = dp.get("file_path")
-            if file_path_item and file_path_item not in seen_paths:
-                file_paths_list.append(file_path_item)
-                seen_paths.add(file_path_item)
-
-        # Apply count limit
-        max_file_paths = global_config.get("max_file_paths")
-
-        if len(file_paths_list) > max_file_paths:
-            limit_method = global_config.get(
-                "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
-            )
-            file_path_placeholder = global_config.get(
-                "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
-            )
-            original_count = len(file_paths_list)
-
-            if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
-                # FIFO: keep tail (newest), discard head
-                file_paths_list = file_paths_list[-max_file_paths:]
-            else:
-                # KEEP: keep head (earliest), discard tail
-                file_paths_list = file_paths_list[:max_file_paths]
-
-            file_paths_list.append(
-                f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
-            )
-            logger.info(
-                f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
-            )
-
-        file_path = GRAPH_FIELD_SEP.join(file_paths_list)
+        logger.debug(status_message)

+    # 11. Update both graph and vector db
     node_data = dict(
         entity_id=entity_name,
         entity_type=entity_type,
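Step 9's cap mirrors the source-id policies: FIFO keeps the newest tail, KEEP keeps the oldest head, and the appended marker records which policy fired. A runnable distillation (the default placeholder string is an assumption):

```python
def cap_file_paths_sketch(paths: list[str], max_file_paths: int,
                          limit_method: str, placeholder: str = "more_files") -> list[str]:
    """Same shape as step 9: slice, then append a policy marker."""
    if len(paths) <= max_file_paths:
        return list(paths)
    if limit_method == "FIFO":
        capped = paths[-max_file_paths:]   # keep tail (newest)
        capped.append(f"...{placeholder}...(FIFO)")
    else:
        capped = paths[:max_file_paths]    # keep head (earliest)
        capped.append(f"...{placeholder}...(KEEP Old)")
    return capped

print(cap_file_paths_sketch([f"doc{i}.md" for i in range(5)], 3, "FIFO"))
# ['doc2.md', 'doc3.md', 'doc4.md', '...more_files...(FIFO)']
```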
@@ -1724,6 +1717,25 @@ async def _merge_nodes_then_upsert(
         node_data=node_data,
     )
     node_data["entity_name"] = entity_name
+    if entity_vdb is not None:
+        entity_vdb_id = compute_mdhash_id(str(entity_name), prefix="ent-")
+        entity_content = f"{entity_name}\n{description}"
+        data_for_vdb = {
+            entity_vdb_id: {
+                "entity_name": entity_name,
+                "entity_type": entity_type,
+                "content": entity_content,
+                "source_id": source_id,
+                "file_path": file_path,
+            }
+        }
+        await safe_vdb_operation_with_exception(
+            operation=lambda payload=data_for_vdb: entity_vdb.upsert(payload),
+            operation_name="entity_upsert",
+            entity_name=entity_name,
+            max_retries=3,
+            retry_delay=0.1,
+        )
     return node_data
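With this hunk the entity's vector record is written inside `_merge_nodes_then_upsert` itself, guarded by `safe_vdb_operation_with_exception`. Only the wrapper's call signature appears in the diff; a retry-then-reraise body along these lines would fit it (an assumption, not the library's code):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def safe_vdb_operation_with_exception_sketch(
    operation, operation_name: str, entity_name: str,
    max_retries: int = 3, retry_delay: float = 0.1,
):
    """Assumed behavior: retry the async VDB operation, then re-raise on
    exhaustion so a failure surfaces instead of being swallowed."""
    for attempt in range(1, max_retries + 1):
        try:
            return await operation()
        except Exception as e:
            if attempt == max_retries:
                logger.error(f"{operation_name} failed for {entity_name}: {e}")
                raise
            await asyncio.sleep(retry_delay * attempt)
```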
@@ -1732,6 +1744,8 @@ async def _merge_edges_then_upsert(
     tgt_id: str,
     edges_data: list[dict],
     knowledge_graph_inst: BaseGraphStorage,
+    relationships_vdb: BaseVectorStorage | None,
+    entity_vdb: BaseVectorStorage | None,
     global_config: dict,
     pipeline_status: dict = None,
     pipeline_status_lock=None,
@@ -1742,12 +1756,14 @@ async def _merge_edges_then_upsert(
     if src_id == tgt_id:
         return None

+    already_edge = None
     already_weights = []
     already_source_ids = []
     already_description = []
     already_keywords = []
     already_file_paths = []

+    # 1. Get existing edge data from graph storage
     if await knowledge_graph_inst.has_edge(src_id, tgt_id):
         already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
         # Handle the case where get_edge returns None or missing fields
@@ -1781,8 +1797,6 @@ async def _merge_edges_then_upsert(
         )
     )

-    original_edges_count = len(edges_data)
-
     new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")]

     storage_key = make_relation_chunk_key(src_id, tgt_id)
@@ -1799,6 +1813,7 @@ async def _merge_edges_then_upsert(
             chunk_id for chunk_id in already_source_ids if chunk_id
         ]

+    # 2. Merge new source ids with existing ones
     full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids)

     if relation_chunks_storage is not None and full_source_ids:
@@ -1811,6 +1826,7 @@ async def _merge_edges_then_upsert(
             }
         )

+    # 3. Finalize source_id by applying source ids limit
     limit_method = global_config.get("source_ids_limit_method")
     max_source_limit = global_config.get("max_source_ids_per_relation")
     source_ids = apply_source_ids_limit(
@@ -1823,13 +1839,13 @@ async def _merge_edges_then_upsert(
         global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
     )

-    # Only apply filtering in IGNORE_NEW mode
+    # 4. Only keep edges with source_id in the final source_ids list in KEEP mode
     if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
         allowed_source_ids = set(source_ids)
         filtered_edges = []
         for dp in edges_data:
             source_id = dp.get("source_id")
-            # Skip relationship fragments sourced from chunks dropped by the IGNORE_NEW cap
+            # Skip relationship fragments sourced from chunks dropped by the keep-oldest cap
             if (
                 source_id
                 and source_id not in allowed_source_ids
@@ -1838,21 +1854,51 @@ async def _merge_edges_then_upsert(
                 continue
             filtered_edges.append(dp)
         edges_data = filtered_edges
-    else:
-        # In FIFO mode, keep all edge descriptions - truncation happens at source_ids level only
+    else:  # In FIFO mode, keep all edges - truncation happens at source_ids level only
         edges_data = list(edges_data)

-    skip_summary_due_to_limit = (
+    # 5. Check if we need to skip summary due to source_ids limit
+    if (
         limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
         and len(existing_full_source_ids) >= max_source_limit
         and not edges_data
-        and already_description
-    )
+    ):
+        if already_edge:
+            logger.info(
+                f"Skipped `{src_id}`~`{tgt_id}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}"
+            )
+            existing_edge_data = dict(already_edge)
+            return existing_edge_data
+        else:
+            logger.error(
+                f"Internal Error: already_node missing for `{src_id}`~`{tgt_id}`"
+            )
+            raise ValueError(
+                f"Internal Error: already_node missing for `{src_id}`~`{tgt_id}`"
+            )

-    # Process edges_data with None checks
+    # 6.1 Finalize source_id
+    source_id = GRAPH_FIELD_SEP.join(source_ids)
+
+    # 6.2 Finalize weight by summing new edges and existing weights
     weight = sum([dp["weight"] for dp in edges_data] + already_weights)

-    # Deduplicate by description, keeping first occurrence
+    # 6.3 Finalize keywords by merging existing and new keywords
+    all_keywords = set()
+    # Process already_keywords (which are comma-separated)
+    for keyword_str in already_keywords:
+        if keyword_str:  # Skip empty strings
+            all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip())
+    # Process new keywords from edges_data
+    for edge in edges_data:
+        if edge.get("keywords"):
+            all_keywords.update(
+                k.strip() for k in edge["keywords"].split(",") if k.strip()
+            )
+    # Join all unique keywords with commas
+    keywords = ",".join(sorted(all_keywords))
+
+    # 7. Deduplicate by description, keeping first occurrence in the same document
     unique_edges = {}
     for dp in edges_data:
         description_value = dp.get("description")
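Step 6.3 folds the stored comma-separated keyword string and the new fragments' keywords into one sorted, deduplicated string. Extracted as a standalone function it behaves like this:

```python
def merge_keywords_sketch(already_keywords: list[str], new_keywords: list[str]) -> str:
    """Distilled from step 6.3: split on commas, strip, dedupe, sort, rejoin."""
    all_keywords: set[str] = set()
    for keyword_str in already_keywords + new_keywords:
        if keyword_str:  # skip empty strings
            all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip())
    return ",".join(sorted(all_keywords))

print(merge_keywords_sketch(["alpha, beta"], ["beta,gamma", " delta "]))
# alpha,beta,delta,gamma
```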
@@ -1868,170 +1914,153 @@ async def _merge_edges_then_upsert(
     )
     sorted_descriptions = [dp["description"] for dp in sorted_edges]

-    truncation_info = ""
-    dd_message = ""
-    has_placeholder = False  # Initialize to track placeholder in file paths
-
     # Combine already_description with sorted new descriptions
     description_list = already_description + sorted_descriptions
-    deduplicated_num = original_edges_count - len(sorted_descriptions)
-    if deduplicated_num > 0:
-        dd_message = f"dd:{deduplicated_num}"
-
-    num_fragment = len(description_list)
-    already_fragment = len(already_description)
-
-    if skip_summary_due_to_limit:
-        description = (
-            already_edge.get("description", "(no description)")
-            if already_edge
-            else "(no description)"
-        )
-        llm_was_used = False
-        status_message = (
-            f"Skip merge for `{src_id}`~`{tgt_id}`: IGNORE_NEW limit reached"
-        )
-        logger.debug(status_message)
-        if pipeline_status is not None and pipeline_status_lock is not None:
-            async with pipeline_status_lock:
-                pipeline_status["latest_message"] = status_message
-                pipeline_status["history_messages"].append(status_message)
-    elif num_fragment > 0:
-        # Get summary and LLM usage status
-        description, llm_was_used = await _handle_entity_relation_summary(
-            "Relation",
-            f"({src_id}, {tgt_id})",
-            description_list,
-            GRAPH_FIELD_SEP,
-            global_config,
-            llm_response_cache,
-        )
-
-        # Log based on actual LLM usage
-        if llm_was_used:
-            status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
-        else:
-            status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
-
-        # Add truncation info from apply_source_ids_limit if truncation occurred
-        if len(source_ids) < len(full_source_ids):
-            # Add + sign if has_placeholder is True, indicating actual file count is higher
-            full_source_count_str = (
-                f"{len(full_source_ids)}+"
-                if has_placeholder
-                else str(len(full_source_ids))
-            )
-            truncation_info = (
-                f"{limit_method}:{len(source_ids)}/{full_source_count_str}"
-            )
-
-        if dd_message or truncation_info:
-            status_message += f" ({', '.join([truncation_info, dd_message])})"
-
-        if already_fragment > 0 or llm_was_used:
-            logger.info(status_message)
-            if pipeline_status is not None and pipeline_status_lock is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = status_message
-                    pipeline_status["history_messages"].append(status_message)
-            else:
-                logger.debug(status_message)
-
-    else:
-        logger.error(f"Edge {src_id} - {tgt_id} has no description")
-        description = "(no description)"
-
-    # Split all existing and new keywords into individual terms, then combine and deduplicate
-    all_keywords = set()
-    # Process already_keywords (which are comma-separated)
-    for keyword_str in already_keywords:
-        if keyword_str:  # Skip empty strings
-            all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip())
-    # Process new keywords from edges_data
-    for edge in edges_data:
-        if edge.get("keywords"):
-            all_keywords.update(
-                k.strip() for k in edge["keywords"].split(",") if k.strip()
-            )
-    # Join all unique keywords with commas
-    keywords = ",".join(sorted(all_keywords))
-
-    source_id = GRAPH_FIELD_SEP.join(source_ids)
-
-    # Build file_path with count limit
-    if skip_summary_due_to_limit:
-        # Skip limit, keep original file_path
-        file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
-    else:
-        # Collect and apply limit
-        file_paths_list = []
-        seen_paths = set()
-        has_placeholder = False  # Track if already_file_paths contains placeholder
-
-        # Get placeholder to filter it out
-        file_path_placeholder = global_config.get(
-            "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
-        )
-
-        # Collect from already_file_paths, excluding placeholder
-        for fp in already_file_paths:
-            # Check if this is a placeholder record
-            if fp and fp.startswith(f"...{file_path_placeholder}"):
-                has_placeholder = True
-                continue
-            # Skip placeholders (format: "...{placeholder}(showing X of Y)...")
-            if fp and fp not in seen_paths:
-                file_paths_list.append(fp)
-                seen_paths.add(fp)
-
-        # Collect from new data
-        for dp in edges_data:
-            file_path_item = dp.get("file_path")
-            if file_path_item and file_path_item not in seen_paths:
-                file_paths_list.append(file_path_item)
-                seen_paths.add(file_path_item)
-
-        # Apply count limit
-        max_file_paths = global_config.get("max_file_paths")
-
-        if len(file_paths_list) > max_file_paths:
-            limit_method = global_config.get(
-                "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
-            )
-            file_path_placeholder = global_config.get(
-                "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
-            )
-            original_count = len(file_paths_list)
-
-            if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
-                # FIFO: keep tail (newest), discard head
-                file_paths_list = file_paths_list[-max_file_paths:]
-            else:
-                # KEEP: keep head (earliest), discard tail
-                file_paths_list = file_paths_list[:max_file_paths]
-
-            file_paths_list.append(
-                f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
-            )
-            logger.info(
-                f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
-            )
-
-        file_path = GRAPH_FIELD_SEP.join(file_paths_list)
+    if not description_list:
+        logger.error(f"Relation {src_id}~{tgt_id} has no description")
+        raise ValueError(f"Relation {src_id}~{tgt_id} has no description")

+    # 8. Get summary description and LLM usage status
+    description, llm_was_used = await _handle_entity_relation_summary(
+        "Relation",
+        f"({src_id}, {tgt_id})",
+        description_list,
+        GRAPH_FIELD_SEP,
+        global_config,
+        llm_response_cache,
+    )
+
+    # 9. Build file_path within MAX_FILE_PATHS limit
+    file_paths_list = []
+    seen_paths = set()
+    has_placeholder = False  # Track if already_file_paths contains placeholder
+
+    max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS)
+    file_path_placeholder = global_config.get(
+        "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+    )
+
+    # Collect from already_file_paths, excluding placeholder
+    for fp in already_file_paths:
+        # Check if this is a placeholder record
+        if fp and fp.startswith(f"...{file_path_placeholder}"):  # Skip placeholders
+            has_placeholder = True
+            continue
+        if fp and fp not in seen_paths:
+            file_paths_list.append(fp)
+            seen_paths.add(fp)
+
+    # Collect from new data
+    for dp in edges_data:
+        file_path_item = dp.get("file_path")
+        if file_path_item and file_path_item not in seen_paths:
+            file_paths_list.append(file_path_item)
+            seen_paths.add(file_path_item)
+
+    # Apply count limit
+    max_file_paths = global_config.get("max_file_paths")
+
+    if len(file_paths_list) > max_file_paths:
+        limit_method = global_config.get(
+            "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+        )
+        file_path_placeholder = global_config.get(
+            "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+        )
+        # Add + sign to indicate actual file count is higher
+        original_count_str = (
+            f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list))
+        )
+
+        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+            # FIFO: keep tail (newest), discard head
+            file_paths_list = file_paths_list[-max_file_paths:]
+            file_paths_list.append(f"...{file_path_placeholder}...(FIFO)")
+        else:
+            # KEEP: keep head (earliest), discard tail
+            file_paths_list = file_paths_list[:max_file_paths]
+            file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)")
+
+        logger.info(
+            f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})"
+        )
+    # Finalize file_path
+    file_path = GRAPH_FIELD_SEP.join(file_paths_list)
+
+    # 10. Log based on actual LLM usage
+    num_fragment = len(description_list)
+    already_fragment = len(already_description)
+    if llm_was_used:
+        status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
+    else:
+        status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
+
+    truncation_info = truncation_info_log = ""
+    if len(source_ids) < len(full_source_ids):
+        # Add truncation info from apply_source_ids_limit if truncation occurred
+        truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}"
+        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+            truncation_info = truncation_info_log
+        else:
+            truncation_info = "KEEP Old"
+
+    deduplicated_num = already_fragment + len(edges_data) - num_fragment
+    dd_message = ""
+    if deduplicated_num > 0:
+        # Duplicated description detected across multiple chunks for the same relation
+        dd_message = f"dd {deduplicated_num}"
+
+    if dd_message or truncation_info_log:
+        status_message += (
+            f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})"
+        )
+
+    # Add message to pipeline status when merge happens
+    if already_fragment > 0 or llm_was_used:
+        logger.info(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
+    else:
+        logger.debug(status_message)

+    # 11. Update both graph and vector db
     for need_insert_id in [src_id, tgt_id]:
         if not (await knowledge_graph_inst.has_node(need_insert_id)):
+            node_created_at = int(time.time())
             node_data = {
                 "entity_id": need_insert_id,
                 "source_id": source_id,
                 "description": description,
                 "entity_type": "UNKNOWN",
                 "file_path": file_path,
-                "created_at": int(time.time()),
+                "created_at": node_created_at,
                 "truncate": "",
             }
             await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)

+            if entity_vdb is not None:
+                entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
+                entity_content = f"{need_insert_id}\n{description}"
+                vdb_data = {
+                    entity_vdb_id: {
+                        "content": entity_content,
+                        "entity_name": need_insert_id,
+                        "source_id": source_id,
+                        "entity_type": "UNKNOWN",
+                        "file_path": file_path,
+                    }
+                }
+                await safe_vdb_operation_with_exception(
+                    operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
+                    operation_name="added_entity_upsert",
+                    entity_name=need_insert_id,
+                    max_retries=3,
+                    retry_delay=0.1,
+                )
+
     # Track entities added during edge processing
     if added_entities is not None:
         entity_data = {
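A behavioral note on the hunk above: where the old code fell back to storing the literal string "(no description)", the merge now fails fast with a ValueError as soon as `description_list` is empty. Illustration (identifiers are examples):

```python
description_list: list[str] = []  # no stored fragments, none survived filtering
try:
    if not description_list:
        raise ValueError("Relation Alice~Bob has no description")
except ValueError as e:
    print(e)  # surfaced to the caller instead of writing "(no description)"
```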
@@ -2040,10 +2069,11 @@ async def _merge_edges_then_upsert(
                 "description": description,
                 "source_id": source_id,
                 "file_path": file_path,
-                "created_at": int(time.time()),
+                "created_at": node_created_at,
             }
             added_entities.append(entity_data)

+    edge_created_at = int(time.time())
     await knowledge_graph_inst.upsert_edge(
         src_id,
         tgt_id,
@@ -2053,7 +2083,7 @@ async def _merge_edges_then_upsert(
             keywords=keywords,
             source_id=source_id,
             file_path=file_path,
-            created_at=int(time.time()),
+            created_at=edge_created_at,
             truncate=truncation_info,
         ),
     )
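The `node_created_at` / `edge_created_at` variables fix a subtle skew: calling `int(time.time())` at each write site could straddle a second boundary, leaving the graph record and its companion records with different timestamps. Capturing the value once removes that:

```python
import time

# After the change: one capture, reused by every record written in the pass.
edge_created_at = int(time.time())
edge_record = {"created_at": edge_created_at}
tracking_record = {"created_at": edge_created_at}
assert edge_record["created_at"] == tracking_record["created_at"]  # always equal now
```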
@@ -2065,10 +2095,41 @@ async def _merge_edges_then_upsert(
         keywords=keywords,
         source_id=source_id,
         file_path=file_path,
-        created_at=int(time.time()),
+        created_at=edge_created_at,
         truncate=truncation_info,
+        weight=weight,
     )

+    if relationships_vdb is not None:
+        rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-")
+        rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-")
+        try:
+            await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
+        except Exception as e:
+            logger.debug(
+                f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
+            )
+        rel_content = f"{keywords}\t{src_id}\n{tgt_id}\n{description}"
+        vdb_data = {
+            rel_vdb_id: {
+                "src_id": src_id,
+                "tgt_id": tgt_id,
+                "source_id": source_id,
+                "content": rel_content,
+                "keywords": keywords,
+                "description": description,
+                "weight": weight,
+                "file_path": file_path,
+            }
+        }
+        await safe_vdb_operation_with_exception(
+            operation=lambda payload=vdb_data: relationships_vdb.upsert(payload),
+            operation_name="relationship_upsert",
+            entity_name=f"{src_id}-{tgt_id}",
+            max_retries=3,
+            retry_delay=0.2,
+        )
+
     return edge_data
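Why the upsert above deletes two ids first: the relationship id is a hash of the concatenated endpoint names, so `src+tgt` and `tgt+src` hash differently, and a stale record may exist under either ordering. A sketch assuming `compute_mdhash_id` is a prefixed md5 digest:

```python
import hashlib

def compute_mdhash_id_sketch(content: str, prefix: str = "") -> str:
    """Assumed to mirror compute_mdhash_id: prefix + md5 hex digest."""
    return prefix + hashlib.md5(content.encode()).hexdigest()

src_id, tgt_id = "Alice", "Bob"
forward = compute_mdhash_id_sketch(src_id + tgt_id, prefix="rel-")
reverse = compute_mdhash_id_sketch(tgt_id + src_id, prefix="rel-")
print(forward != reverse)  # True: both must be cleared before the fresh upsert
```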
@@ -2158,12 +2219,12 @@ async def merge_nodes_and_edges(
             [entity_name], namespace=namespace, enable_logging=False
         ):
             try:
-                logger.debug(f"Inserting {entity_name} in Graph")
-                # Graph database operation (critical path, must succeed)
+                logger.debug(f"Processing entity {entity_name}")
                 entity_data = await _merge_nodes_then_upsert(
                     entity_name,
                     entities,
                     knowledge_graph_inst,
+                    entity_vdb,
                     global_config,
                     pipeline_status,
                     pipeline_status_lock,
@@ -2171,36 +2232,9 @@ async def merge_nodes_and_edges(
                     entity_chunks_storage,
                 )

-                # Vector database operation (equally critical, must succeed)
-                if entity_vdb is not None and entity_data:
-                    data_for_vdb = {
-                        compute_mdhash_id(
-                            str(entity_data["entity_name"]), prefix="ent-"
-                        ): {
-                            "entity_name": entity_data["entity_name"],
-                            "entity_type": entity_data["entity_type"],
-                            "content": f"{entity_data['entity_name']}\n{entity_data['description']}",
-                            "source_id": entity_data["source_id"],
-                            "file_path": entity_data.get(
-                                "file_path", "unknown_source"
-                            ),
-                        }
-                    }
-
-                    logger.debug(f"Inserting {entity_name} in Graph")
-                    # Use safe operation wrapper - VDB failure must throw exception
-                    await safe_vdb_operation_with_exception(
-                        operation=lambda: entity_vdb.upsert(data_for_vdb),
-                        operation_name="entity_upsert",
-                        entity_name=entity_name,
-                        max_retries=3,
-                        retry_delay=0.1,
-                    )
-
                 return entity_data

             except Exception as e:
-                # Any database operation failure is critical
                 error_msg = (
                     f"Critical error in entity processing for `{entity_name}`: {e}"
                 )
@@ -2290,12 +2324,14 @@ async def merge_nodes_and_edges(
             try:
                 added_entities = []  # Track entities added during edge processing

-                # Graph database operation (critical path, must succeed)
+                logger.debug(f"Processing relation {sorted_edge_key}")
                 edge_data = await _merge_edges_then_upsert(
                     edge_key[0],
                     edge_key[1],
                     edges,
                     knowledge_graph_inst,
+                    relationships_vdb,
+                    entity_vdb,
                     global_config,
                     pipeline_status,
                     pipeline_status_lock,
||||||
|
|
@ -2307,66 +2343,9 @@ async def merge_nodes_and_edges(
|
||||||
if edge_data is None:
|
if edge_data is None:
|
||||||
return None, []
|
return None, []
|
||||||
|
|
||||||
# Vector database operation (equally critical, must succeed)
|
|
||||||
if relationships_vdb is not None:
|
|
||||||
data_for_vdb = {
|
|
||||||
compute_mdhash_id(
|
|
||||||
edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
|
|
||||||
): {
|
|
||||||
"src_id": edge_data["src_id"],
|
|
||||||
"tgt_id": edge_data["tgt_id"],
|
|
||||||
"keywords": edge_data["keywords"],
|
|
||||||
"content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
|
|
||||||
"source_id": edge_data["source_id"],
|
|
||||||
"file_path": edge_data.get(
|
|
||||||
"file_path", "unknown_source"
|
|
||||||
),
|
|
||||||
"weight": edge_data.get("weight", 1.0),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Use safe operation wrapper - VDB failure must throw exception
|
|
||||||
await safe_vdb_operation_with_exception(
|
|
||||||
operation=lambda: relationships_vdb.upsert(data_for_vdb),
|
|
||||||
operation_name="relationship_upsert",
|
|
||||||
entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}",
|
|
||||||
max_retries=3,
|
|
||||||
retry_delay=0.1,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Update added_entities to entity vector database using safe operation wrapper
|
|
||||||
if added_entities and entity_vdb is not None:
|
|
||||||
for entity_data in added_entities:
|
|
||||||
entity_vdb_id = compute_mdhash_id(
|
|
||||||
entity_data["entity_name"], prefix="ent-"
|
|
||||||
)
|
|
||||||
entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}"
|
|
||||||
|
|
||||||
vdb_data = {
|
|
||||||
entity_vdb_id: {
|
|
||||||
"content": entity_content,
|
|
||||||
"entity_name": entity_data["entity_name"],
|
|
||||||
"source_id": entity_data["source_id"],
|
|
||||||
"entity_type": entity_data["entity_type"],
|
|
||||||
"file_path": entity_data.get(
|
|
||||||
"file_path", "unknown_source"
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Use safe operation wrapper - VDB failure must throw exception
|
|
||||||
await safe_vdb_operation_with_exception(
|
|
||||||
operation=lambda data=vdb_data: entity_vdb.upsert(data),
|
|
||||||
operation_name="added_entity_upsert",
|
|
||||||
entity_name=entity_data["entity_name"],
|
|
||||||
max_retries=3,
|
|
||||||
retry_delay=0.1,
|
|
||||||
)
|
|
||||||
|
|
||||||
return edge_data, added_entities
|
return edge_data, added_entities
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Any database operation failure is critical
|
|
||||||
error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}"
|
error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}"
|
||||||
logger.error(error_msg)
|
logger.error(error_msg)
|
||||||
|
|
||||||