Refactor entity/relation merge to consolidate VDB operations within functions
• Move VDB upserts into merge functions • Fix early return data structure issues • Update status messages (IGNORE_NEW → KEEP) • Consolidate error handling paths • Improve relationship content format
This commit is contained in:
parent
e01c998ee9
commit
665f60b90f
1 changed files with 117 additions and 97 deletions
|
|
@ -1470,6 +1470,7 @@ async def _merge_nodes_then_upsert(
|
|||
entity_name: str,
|
||||
nodes_data: list[dict],
|
||||
knowledge_graph_inst: BaseGraphStorage,
|
||||
entity_vdb: BaseVectorStorage | None,
|
||||
global_config: dict,
|
||||
pipeline_status: dict = None,
|
||||
pipeline_status_lock=None,
|
||||
|
|
@ -1595,13 +1596,25 @@ async def _merge_nodes_then_upsert(
|
|||
if already_node
|
||||
else "(no description)"
|
||||
)
|
||||
llm_was_used = False
|
||||
status_message = f"Skip merge for `{entity_name}`: IGNORE_NEW limit reached"
|
||||
status_message = f"Skip merge for `{entity_name}`: KEEP limit reached"
|
||||
logger.debug(status_message)
|
||||
if pipeline_status is not None and pipeline_status_lock is not None:
|
||||
async with pipeline_status_lock:
|
||||
pipeline_status["latest_message"] = status_message
|
||||
pipeline_status["history_messages"].append(status_message)
|
||||
existing_node_data = dict(already_node or {})
|
||||
if not existing_node_data:
|
||||
existing_node_data = {
|
||||
"entity_id": entity_name,
|
||||
"entity_type": entity_type,
|
||||
"description": description,
|
||||
"source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids),
|
||||
"file_path": GRAPH_FIELD_SEP.join(already_file_paths),
|
||||
"created_at": int(time.time()),
|
||||
"truncate": "",
|
||||
}
|
||||
existing_node_data["entity_name"] = entity_name
|
||||
return existing_node_data
|
||||
elif num_fragment > 0:
|
||||
# Get summary and LLM usage status
|
||||
description, llm_was_used = await _handle_entity_relation_summary(
|
||||
|
|
@ -1726,6 +1739,25 @@ async def _merge_nodes_then_upsert(
|
|||
node_data=node_data,
|
||||
)
|
||||
node_data["entity_name"] = entity_name
|
||||
if entity_vdb is not None:
|
||||
entity_vdb_id = compute_mdhash_id(str(entity_name), prefix="ent-")
|
||||
entity_content = f"{entity_name}\n{description}"
|
||||
data_for_vdb = {
|
||||
entity_vdb_id: {
|
||||
"entity_name": entity_name,
|
||||
"entity_type": entity_type,
|
||||
"content": entity_content,
|
||||
"source_id": source_id,
|
||||
"file_path": file_path,
|
||||
}
|
||||
}
|
||||
await safe_vdb_operation_with_exception(
|
||||
operation=lambda payload=data_for_vdb: entity_vdb.upsert(payload),
|
||||
operation_name="entity_upsert",
|
||||
entity_name=entity_name,
|
||||
max_retries=3,
|
||||
retry_delay=0.1,
|
||||
)
|
||||
return node_data
|
||||
|
||||
|
||||
|
|
@ -1734,6 +1766,8 @@ async def _merge_edges_then_upsert(
|
|||
tgt_id: str,
|
||||
edges_data: list[dict],
|
||||
knowledge_graph_inst: BaseGraphStorage,
|
||||
relationships_vdb: BaseVectorStorage | None,
|
||||
entity_vdb: BaseVectorStorage | None,
|
||||
global_config: dict,
|
||||
pipeline_status: dict = None,
|
||||
pipeline_status_lock=None,
|
||||
|
|
@ -1744,6 +1778,7 @@ async def _merge_edges_then_upsert(
|
|||
if src_id == tgt_id:
|
||||
return None
|
||||
|
||||
already_edge = None
|
||||
already_weights = []
|
||||
already_source_ids = []
|
||||
already_description = []
|
||||
|
|
@ -1825,13 +1860,13 @@ async def _merge_edges_then_upsert(
|
|||
global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
|
||||
)
|
||||
|
||||
# Only apply filtering in IGNORE_NEW mode
|
||||
# Only apply filtering in KEEP(ignore new) mode
|
||||
if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
|
||||
allowed_source_ids = set(source_ids)
|
||||
filtered_edges = []
|
||||
for dp in edges_data:
|
||||
source_id = dp.get("source_id")
|
||||
# Skip relationship fragments sourced from chunks dropped by the IGNORE_NEW cap
|
||||
# Skip relationship fragments sourced from chunks dropped by keep oldest cap
|
||||
if (
|
||||
source_id
|
||||
and source_id not in allowed_source_ids
|
||||
|
|
@ -1889,15 +1924,29 @@ async def _merge_edges_then_upsert(
|
|||
if already_edge
|
||||
else "(no description)"
|
||||
)
|
||||
llm_was_used = False
|
||||
status_message = (
|
||||
f"Skip merge for `{src_id}`~`{tgt_id}`: IGNORE_NEW limit reached"
|
||||
f"Skip merge for `{src_id}`~`{tgt_id}`: KEEP limit reached"
|
||||
)
|
||||
logger.debug(status_message)
|
||||
if pipeline_status is not None and pipeline_status_lock is not None:
|
||||
async with pipeline_status_lock:
|
||||
pipeline_status["latest_message"] = status_message
|
||||
pipeline_status["history_messages"].append(status_message)
|
||||
existing_edge_data = dict(already_edge or {})
|
||||
if not existing_edge_data:
|
||||
existing_edge_data = {
|
||||
"description": description,
|
||||
"keywords": GRAPH_FIELD_SEP.join(already_keywords),
|
||||
"source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids),
|
||||
"file_path": GRAPH_FIELD_SEP.join(already_file_paths),
|
||||
"weight": sum(already_weights) if already_weights else 0.0,
|
||||
"truncate": "",
|
||||
"created_at": int(time.time()),
|
||||
}
|
||||
existing_edge_data.setdefault("created_at", int(time.time()))
|
||||
existing_edge_data["src_id"] = src_id
|
||||
existing_edge_data["tgt_id"] = tgt_id
|
||||
return existing_edge_data
|
||||
elif num_fragment > 0:
|
||||
# Get summary and LLM usage status
|
||||
description, llm_was_used = await _handle_entity_relation_summary(
|
||||
|
|
@ -2025,17 +2074,38 @@ async def _merge_edges_then_upsert(
|
|||
|
||||
for need_insert_id in [src_id, tgt_id]:
|
||||
if not (await knowledge_graph_inst.has_node(need_insert_id)):
|
||||
node_created_at = int(time.time())
|
||||
node_data = {
|
||||
"entity_id": need_insert_id,
|
||||
"source_id": source_id,
|
||||
"description": description,
|
||||
"entity_type": "UNKNOWN",
|
||||
"file_path": file_path,
|
||||
"created_at": int(time.time()),
|
||||
"created_at": node_created_at,
|
||||
"truncate": "",
|
||||
}
|
||||
await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
|
||||
|
||||
if entity_vdb is not None:
|
||||
entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
|
||||
entity_content = f"{need_insert_id}\n{description}"
|
||||
vdb_data = {
|
||||
entity_vdb_id: {
|
||||
"content": entity_content,
|
||||
"entity_name": need_insert_id,
|
||||
"source_id": source_id,
|
||||
"entity_type": "UNKNOWN",
|
||||
"file_path": file_path,
|
||||
}
|
||||
}
|
||||
await safe_vdb_operation_with_exception(
|
||||
operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
|
||||
operation_name="added_entity_upsert",
|
||||
entity_name=need_insert_id,
|
||||
max_retries=3,
|
||||
retry_delay=0.1,
|
||||
)
|
||||
|
||||
# Track entities added during edge processing
|
||||
if added_entities is not None:
|
||||
entity_data = {
|
||||
|
|
@ -2044,10 +2114,11 @@ async def _merge_edges_then_upsert(
|
|||
"description": description,
|
||||
"source_id": source_id,
|
||||
"file_path": file_path,
|
||||
"created_at": int(time.time()),
|
||||
"created_at": node_created_at,
|
||||
}
|
||||
added_entities.append(entity_data)
|
||||
|
||||
edge_created_at = int(time.time())
|
||||
await knowledge_graph_inst.upsert_edge(
|
||||
src_id,
|
||||
tgt_id,
|
||||
|
|
@ -2057,7 +2128,7 @@ async def _merge_edges_then_upsert(
|
|||
keywords=keywords,
|
||||
source_id=source_id,
|
||||
file_path=file_path,
|
||||
created_at=int(time.time()),
|
||||
created_at=edge_created_at,
|
||||
truncate=truncation_info,
|
||||
),
|
||||
)
|
||||
|
|
@ -2069,10 +2140,41 @@ async def _merge_edges_then_upsert(
|
|||
keywords=keywords,
|
||||
source_id=source_id,
|
||||
file_path=file_path,
|
||||
created_at=int(time.time()),
|
||||
created_at=edge_created_at,
|
||||
truncate=truncation_info,
|
||||
weight=weight,
|
||||
)
|
||||
|
||||
if relationships_vdb is not None:
|
||||
rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-")
|
||||
rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-")
|
||||
try:
|
||||
await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
|
||||
)
|
||||
rel_content = f"{keywords}\t{src_id}\n{tgt_id}\n{description}"
|
||||
vdb_data = {
|
||||
rel_vdb_id: {
|
||||
"src_id": src_id,
|
||||
"tgt_id": tgt_id,
|
||||
"source_id": source_id,
|
||||
"content": rel_content,
|
||||
"keywords": keywords,
|
||||
"description": description,
|
||||
"weight": weight,
|
||||
"file_path": file_path,
|
||||
}
|
||||
}
|
||||
await safe_vdb_operation_with_exception(
|
||||
operation=lambda payload=vdb_data: relationships_vdb.upsert(payload),
|
||||
operation_name="relationship_upsert",
|
||||
entity_name=f"{src_id}-{tgt_id}",
|
||||
max_retries=3,
|
||||
retry_delay=0.2,
|
||||
)
|
||||
|
||||
return edge_data
|
||||
|
||||
|
||||
|
|
@ -2162,12 +2264,12 @@ async def merge_nodes_and_edges(
|
|||
[entity_name], namespace=namespace, enable_logging=False
|
||||
):
|
||||
try:
|
||||
logger.debug(f"Inserting {entity_name} in Graph")
|
||||
# Graph database operation (critical path, must succeed)
|
||||
logger.debug(f"Processing entity {entity_name}")
|
||||
entity_data = await _merge_nodes_then_upsert(
|
||||
entity_name,
|
||||
entities,
|
||||
knowledge_graph_inst,
|
||||
entity_vdb,
|
||||
global_config,
|
||||
pipeline_status,
|
||||
pipeline_status_lock,
|
||||
|
|
@ -2175,36 +2277,9 @@ async def merge_nodes_and_edges(
|
|||
entity_chunks_storage,
|
||||
)
|
||||
|
||||
# Vector database operation (equally critical, must succeed)
|
||||
if entity_vdb is not None and entity_data:
|
||||
data_for_vdb = {
|
||||
compute_mdhash_id(
|
||||
str(entity_data["entity_name"]), prefix="ent-"
|
||||
): {
|
||||
"entity_name": entity_data["entity_name"],
|
||||
"entity_type": entity_data["entity_type"],
|
||||
"content": f"{entity_data['entity_name']}\n{entity_data['description']}",
|
||||
"source_id": entity_data["source_id"],
|
||||
"file_path": entity_data.get(
|
||||
"file_path", "unknown_source"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug(f"Inserting {entity_name} in Graph")
|
||||
# Use safe operation wrapper - VDB failure must throw exception
|
||||
await safe_vdb_operation_with_exception(
|
||||
operation=lambda: entity_vdb.upsert(data_for_vdb),
|
||||
operation_name="entity_upsert",
|
||||
entity_name=entity_name,
|
||||
max_retries=3,
|
||||
retry_delay=0.1,
|
||||
)
|
||||
|
||||
return entity_data
|
||||
|
||||
except Exception as e:
|
||||
# Any database operation failure is critical
|
||||
error_msg = (
|
||||
f"Critical error in entity processing for `{entity_name}`: {e}"
|
||||
)
|
||||
|
|
@ -2294,12 +2369,14 @@ async def merge_nodes_and_edges(
|
|||
try:
|
||||
added_entities = [] # Track entities added during edge processing
|
||||
|
||||
# Graph database operation (critical path, must succeed)
|
||||
logger.debug(f"Processing relation {sorted_edge_key}")
|
||||
edge_data = await _merge_edges_then_upsert(
|
||||
edge_key[0],
|
||||
edge_key[1],
|
||||
edges,
|
||||
knowledge_graph_inst,
|
||||
relationships_vdb,
|
||||
entity_vdb,
|
||||
global_config,
|
||||
pipeline_status,
|
||||
pipeline_status_lock,
|
||||
|
|
@ -2311,66 +2388,9 @@ async def merge_nodes_and_edges(
|
|||
if edge_data is None:
|
||||
return None, []
|
||||
|
||||
# Vector database operation (equally critical, must succeed)
|
||||
if relationships_vdb is not None:
|
||||
data_for_vdb = {
|
||||
compute_mdhash_id(
|
||||
edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
|
||||
): {
|
||||
"src_id": edge_data["src_id"],
|
||||
"tgt_id": edge_data["tgt_id"],
|
||||
"keywords": edge_data["keywords"],
|
||||
"content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
|
||||
"source_id": edge_data["source_id"],
|
||||
"file_path": edge_data.get(
|
||||
"file_path", "unknown_source"
|
||||
),
|
||||
"weight": edge_data.get("weight", 1.0),
|
||||
}
|
||||
}
|
||||
|
||||
# Use safe operation wrapper - VDB failure must throw exception
|
||||
await safe_vdb_operation_with_exception(
|
||||
operation=lambda: relationships_vdb.upsert(data_for_vdb),
|
||||
operation_name="relationship_upsert",
|
||||
entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}",
|
||||
max_retries=3,
|
||||
retry_delay=0.1,
|
||||
)
|
||||
|
||||
# Update added_entities to entity vector database using safe operation wrapper
|
||||
if added_entities and entity_vdb is not None:
|
||||
for entity_data in added_entities:
|
||||
entity_vdb_id = compute_mdhash_id(
|
||||
entity_data["entity_name"], prefix="ent-"
|
||||
)
|
||||
entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}"
|
||||
|
||||
vdb_data = {
|
||||
entity_vdb_id: {
|
||||
"content": entity_content,
|
||||
"entity_name": entity_data["entity_name"],
|
||||
"source_id": entity_data["source_id"],
|
||||
"entity_type": entity_data["entity_type"],
|
||||
"file_path": entity_data.get(
|
||||
"file_path", "unknown_source"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
# Use safe operation wrapper - VDB failure must throw exception
|
||||
await safe_vdb_operation_with_exception(
|
||||
operation=lambda data=vdb_data: entity_vdb.upsert(data),
|
||||
operation_name="added_entity_upsert",
|
||||
entity_name=entity_data["entity_name"],
|
||||
max_retries=3,
|
||||
retry_delay=0.1,
|
||||
)
|
||||
|
||||
return edge_data, added_entities
|
||||
|
||||
except Exception as e:
|
||||
# Any database operation failure is critical
|
||||
error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}"
|
||||
logger.error(error_msg)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue