Refactor entity/relation merge to consolidate VDB operations within functions

• Move VDB upserts into merge functions
• Fix early return data structure issues
• Update status messages (IGNORE_NEW → KEEP)
• Consolidate error handling paths
• Improve relationship content format
This commit is contained in:
yangdx 2025-10-21 03:19:34 +08:00
parent e01c998ee9
commit 665f60b90f

View file

@ -1470,6 +1470,7 @@ async def _merge_nodes_then_upsert(
entity_name: str,
nodes_data: list[dict],
knowledge_graph_inst: BaseGraphStorage,
entity_vdb: BaseVectorStorage | None,
global_config: dict,
pipeline_status: dict = None,
pipeline_status_lock=None,
@ -1595,13 +1596,25 @@ async def _merge_nodes_then_upsert(
if already_node
else "(no description)"
)
llm_was_used = False
status_message = f"Skip merge for `{entity_name}`: IGNORE_NEW limit reached"
status_message = f"Skip merge for `{entity_name}`: KEEP limit reached"
logger.debug(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
existing_node_data = dict(already_node or {})
if not existing_node_data:
existing_node_data = {
"entity_id": entity_name,
"entity_type": entity_type,
"description": description,
"source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids),
"file_path": GRAPH_FIELD_SEP.join(already_file_paths),
"created_at": int(time.time()),
"truncate": "",
}
existing_node_data["entity_name"] = entity_name
return existing_node_data
elif num_fragment > 0:
# Get summary and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
@ -1726,6 +1739,25 @@ async def _merge_nodes_then_upsert(
node_data=node_data,
)
node_data["entity_name"] = entity_name
if entity_vdb is not None:
entity_vdb_id = compute_mdhash_id(str(entity_name), prefix="ent-")
entity_content = f"{entity_name}\n{description}"
data_for_vdb = {
entity_vdb_id: {
"entity_name": entity_name,
"entity_type": entity_type,
"content": entity_content,
"source_id": source_id,
"file_path": file_path,
}
}
await safe_vdb_operation_with_exception(
operation=lambda payload=data_for_vdb: entity_vdb.upsert(payload),
operation_name="entity_upsert",
entity_name=entity_name,
max_retries=3,
retry_delay=0.1,
)
return node_data
@ -1734,6 +1766,8 @@ async def _merge_edges_then_upsert(
tgt_id: str,
edges_data: list[dict],
knowledge_graph_inst: BaseGraphStorage,
relationships_vdb: BaseVectorStorage | None,
entity_vdb: BaseVectorStorage | None,
global_config: dict,
pipeline_status: dict = None,
pipeline_status_lock=None,
@ -1744,6 +1778,7 @@ async def _merge_edges_then_upsert(
if src_id == tgt_id:
return None
already_edge = None
already_weights = []
already_source_ids = []
already_description = []
@ -1825,13 +1860,13 @@ async def _merge_edges_then_upsert(
global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
)
# Only apply filtering in IGNORE_NEW mode
# Only apply filtering in KEEP(ignore new) mode
if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
allowed_source_ids = set(source_ids)
filtered_edges = []
for dp in edges_data:
source_id = dp.get("source_id")
# Skip relationship fragments sourced from chunks dropped by the IGNORE_NEW cap
# Skip relationship fragments sourced from chunks dropped by keep oldest cap
if (
source_id
and source_id not in allowed_source_ids
@ -1889,15 +1924,29 @@ async def _merge_edges_then_upsert(
if already_edge
else "(no description)"
)
llm_was_used = False
status_message = (
f"Skip merge for `{src_id}`~`{tgt_id}`: IGNORE_NEW limit reached"
f"Skip merge for `{src_id}`~`{tgt_id}`: KEEP limit reached"
)
logger.debug(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
existing_edge_data = dict(already_edge or {})
if not existing_edge_data:
existing_edge_data = {
"description": description,
"keywords": GRAPH_FIELD_SEP.join(already_keywords),
"source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids),
"file_path": GRAPH_FIELD_SEP.join(already_file_paths),
"weight": sum(already_weights) if already_weights else 0.0,
"truncate": "",
"created_at": int(time.time()),
}
existing_edge_data.setdefault("created_at", int(time.time()))
existing_edge_data["src_id"] = src_id
existing_edge_data["tgt_id"] = tgt_id
return existing_edge_data
elif num_fragment > 0:
# Get summary and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
@ -2025,17 +2074,38 @@ async def _merge_edges_then_upsert(
for need_insert_id in [src_id, tgt_id]:
if not (await knowledge_graph_inst.has_node(need_insert_id)):
node_created_at = int(time.time())
node_data = {
"entity_id": need_insert_id,
"source_id": source_id,
"description": description,
"entity_type": "UNKNOWN",
"file_path": file_path,
"created_at": int(time.time()),
"created_at": node_created_at,
"truncate": "",
}
await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
if entity_vdb is not None:
entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
entity_content = f"{need_insert_id}\n{description}"
vdb_data = {
entity_vdb_id: {
"content": entity_content,
"entity_name": need_insert_id,
"source_id": source_id,
"entity_type": "UNKNOWN",
"file_path": file_path,
}
}
await safe_vdb_operation_with_exception(
operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
operation_name="added_entity_upsert",
entity_name=need_insert_id,
max_retries=3,
retry_delay=0.1,
)
# Track entities added during edge processing
if added_entities is not None:
entity_data = {
@ -2044,10 +2114,11 @@ async def _merge_edges_then_upsert(
"description": description,
"source_id": source_id,
"file_path": file_path,
"created_at": int(time.time()),
"created_at": node_created_at,
}
added_entities.append(entity_data)
edge_created_at = int(time.time())
await knowledge_graph_inst.upsert_edge(
src_id,
tgt_id,
@ -2057,7 +2128,7 @@ async def _merge_edges_then_upsert(
keywords=keywords,
source_id=source_id,
file_path=file_path,
created_at=int(time.time()),
created_at=edge_created_at,
truncate=truncation_info,
),
)
@ -2069,10 +2140,41 @@ async def _merge_edges_then_upsert(
keywords=keywords,
source_id=source_id,
file_path=file_path,
created_at=int(time.time()),
created_at=edge_created_at,
truncate=truncation_info,
weight=weight,
)
if relationships_vdb is not None:
rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-")
rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-")
try:
await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
except Exception as e:
logger.debug(
f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
)
rel_content = f"{keywords}\t{src_id}\n{tgt_id}\n{description}"
vdb_data = {
rel_vdb_id: {
"src_id": src_id,
"tgt_id": tgt_id,
"source_id": source_id,
"content": rel_content,
"keywords": keywords,
"description": description,
"weight": weight,
"file_path": file_path,
}
}
await safe_vdb_operation_with_exception(
operation=lambda payload=vdb_data: relationships_vdb.upsert(payload),
operation_name="relationship_upsert",
entity_name=f"{src_id}-{tgt_id}",
max_retries=3,
retry_delay=0.2,
)
return edge_data
@ -2162,12 +2264,12 @@ async def merge_nodes_and_edges(
[entity_name], namespace=namespace, enable_logging=False
):
try:
logger.debug(f"Inserting {entity_name} in Graph")
# Graph database operation (critical path, must succeed)
logger.debug(f"Processing entity {entity_name}")
entity_data = await _merge_nodes_then_upsert(
entity_name,
entities,
knowledge_graph_inst,
entity_vdb,
global_config,
pipeline_status,
pipeline_status_lock,
@ -2175,36 +2277,9 @@ async def merge_nodes_and_edges(
entity_chunks_storage,
)
# Vector database operation (equally critical, must succeed)
if entity_vdb is not None and entity_data:
data_for_vdb = {
compute_mdhash_id(
str(entity_data["entity_name"]), prefix="ent-"
): {
"entity_name": entity_data["entity_name"],
"entity_type": entity_data["entity_type"],
"content": f"{entity_data['entity_name']}\n{entity_data['description']}",
"source_id": entity_data["source_id"],
"file_path": entity_data.get(
"file_path", "unknown_source"
),
}
}
logger.debug(f"Inserting {entity_name} in Graph")
# Use safe operation wrapper - VDB failure must throw exception
await safe_vdb_operation_with_exception(
operation=lambda: entity_vdb.upsert(data_for_vdb),
operation_name="entity_upsert",
entity_name=entity_name,
max_retries=3,
retry_delay=0.1,
)
return entity_data
except Exception as e:
# Any database operation failure is critical
error_msg = (
f"Critical error in entity processing for `{entity_name}`: {e}"
)
@ -2294,12 +2369,14 @@ async def merge_nodes_and_edges(
try:
added_entities = [] # Track entities added during edge processing
# Graph database operation (critical path, must succeed)
logger.debug(f"Processing relation {sorted_edge_key}")
edge_data = await _merge_edges_then_upsert(
edge_key[0],
edge_key[1],
edges,
knowledge_graph_inst,
relationships_vdb,
entity_vdb,
global_config,
pipeline_status,
pipeline_status_lock,
@ -2311,66 +2388,9 @@ async def merge_nodes_and_edges(
if edge_data is None:
return None, []
# Vector database operation (equally critical, must succeed)
if relationships_vdb is not None:
data_for_vdb = {
compute_mdhash_id(
edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
): {
"src_id": edge_data["src_id"],
"tgt_id": edge_data["tgt_id"],
"keywords": edge_data["keywords"],
"content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
"source_id": edge_data["source_id"],
"file_path": edge_data.get(
"file_path", "unknown_source"
),
"weight": edge_data.get("weight", 1.0),
}
}
# Use safe operation wrapper - VDB failure must throw exception
await safe_vdb_operation_with_exception(
operation=lambda: relationships_vdb.upsert(data_for_vdb),
operation_name="relationship_upsert",
entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}",
max_retries=3,
retry_delay=0.1,
)
# Update added_entities to entity vector database using safe operation wrapper
if added_entities and entity_vdb is not None:
for entity_data in added_entities:
entity_vdb_id = compute_mdhash_id(
entity_data["entity_name"], prefix="ent-"
)
entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}"
vdb_data = {
entity_vdb_id: {
"content": entity_content,
"entity_name": entity_data["entity_name"],
"source_id": entity_data["source_id"],
"entity_type": entity_data["entity_type"],
"file_path": entity_data.get(
"file_path", "unknown_source"
),
}
}
# Use safe operation wrapper - VDB failure must throw exception
await safe_vdb_operation_with_exception(
operation=lambda data=vdb_data: entity_vdb.upsert(data),
operation_name="added_entity_upsert",
entity_name=entity_data["entity_name"],
max_retries=3,
retry_delay=0.1,
)
return edge_data, added_entities
except Exception as e:
# Any database operation failure is critical
error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}"
logger.error(error_msg)