Refactor entity/relation merge to consolidate VDB operations within functions
• Move VDB upserts into merge functions
• Fix early-return data structure issues
• Update status messages (IGNORE_NEW → KEEP)
• Consolidate error-handling paths
• Improve relationship content format
This commit is contained in:
parent e01c998ee9
commit 665f60b90f

1 changed file with 117 additions and 97 deletions
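Note for readers: the new code routes every vector-DB write through safe_vdb_operation_with_exception, but that helper's body is not part of this diff. The sketch below only illustrates the retry-then-raise contract its call sites suggest (parameter names are copied from the calls in the diff; the behavior is an assumption, not the project's actual implementation).

    # Illustrative sketch only: NOT the real safe_vdb_operation_with_exception,
    # whose implementation lives elsewhere in the codebase and is not shown here.
    import asyncio
    import logging

    logger = logging.getLogger(__name__)

    async def safe_vdb_operation_with_exception(
        operation,            # async zero-arg callable, e.g. lambda: entity_vdb.upsert(payload)
        operation_name: str,  # label used in log messages ("entity_upsert", ...)
        entity_name: str,     # entity or relation key, for diagnostics only
        max_retries: int = 3,
        retry_delay: float = 0.1,
    ):
        """Retry an async vector-DB write; re-raise the last error if all attempts fail."""
        last_error = None
        for attempt in range(1, max_retries + 1):
            try:
                return await operation()
            except Exception as e:
                last_error = e
                logger.warning(
                    "%s failed for `%s` (attempt %d/%d): %s",
                    operation_name, entity_name, attempt, max_retries, e,
                )
                if attempt < max_retries:
                    await asyncio.sleep(retry_delay)
        raise last_error

Design-wise, consolidating these upserts inside _merge_nodes_then_upsert and _merge_edges_then_upsert means the graph write and the vector write for one entity or relation share a single error path, instead of being duplicated in merge_nodes_and_edges.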
@@ -1470,6 +1470,7 @@ async def _merge_nodes_then_upsert(
     entity_name: str,
     nodes_data: list[dict],
     knowledge_graph_inst: BaseGraphStorage,
+    entity_vdb: BaseVectorStorage | None,
     global_config: dict,
     pipeline_status: dict = None,
     pipeline_status_lock=None,
@@ -1595,13 +1596,25 @@ async def _merge_nodes_then_upsert(
             if already_node
             else "(no description)"
         )
-        llm_was_used = False
-        status_message = f"Skip merge for `{entity_name}`: IGNORE_NEW limit reached"
+        status_message = f"Skip merge for `{entity_name}`: KEEP limit reached"
         logger.debug(status_message)
         if pipeline_status is not None and pipeline_status_lock is not None:
             async with pipeline_status_lock:
                 pipeline_status["latest_message"] = status_message
                 pipeline_status["history_messages"].append(status_message)
+        existing_node_data = dict(already_node or {})
+        if not existing_node_data:
+            existing_node_data = {
+                "entity_id": entity_name,
+                "entity_type": entity_type,
+                "description": description,
+                "source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids),
+                "file_path": GRAPH_FIELD_SEP.join(already_file_paths),
+                "created_at": int(time.time()),
+                "truncate": "",
+            }
+        existing_node_data["entity_name"] = entity_name
+        return existing_node_data
     elif num_fragment > 0:
         # Get summary and LLM usage status
         description, llm_was_used = await _handle_entity_relation_summary(
@@ -1726,6 +1739,25 @@ async def _merge_nodes_then_upsert(
         node_data=node_data,
     )
     node_data["entity_name"] = entity_name
+    if entity_vdb is not None:
+        entity_vdb_id = compute_mdhash_id(str(entity_name), prefix="ent-")
+        entity_content = f"{entity_name}\n{description}"
+        data_for_vdb = {
+            entity_vdb_id: {
+                "entity_name": entity_name,
+                "entity_type": entity_type,
+                "content": entity_content,
+                "source_id": source_id,
+                "file_path": file_path,
+            }
+        }
+        await safe_vdb_operation_with_exception(
+            operation=lambda payload=data_for_vdb: entity_vdb.upsert(payload),
+            operation_name="entity_upsert",
+            entity_name=entity_name,
+            max_retries=3,
+            retry_delay=0.1,
+        )
     return node_data
@@ -1734,6 +1766,8 @@ async def _merge_edges_then_upsert(
     tgt_id: str,
     edges_data: list[dict],
     knowledge_graph_inst: BaseGraphStorage,
+    relationships_vdb: BaseVectorStorage | None,
+    entity_vdb: BaseVectorStorage | None,
     global_config: dict,
     pipeline_status: dict = None,
     pipeline_status_lock=None,
@@ -1744,6 +1778,7 @@ async def _merge_edges_then_upsert(
     if src_id == tgt_id:
         return None

+    already_edge = None
     already_weights = []
     already_source_ids = []
     already_description = []
@@ -1825,13 +1860,13 @@ async def _merge_edges_then_upsert(
         global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
     )

-    # Only apply filtering in IGNORE_NEW mode
+    # Only apply filtering in KEEP(ignore new) mode
     if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
         allowed_source_ids = set(source_ids)
         filtered_edges = []
         for dp in edges_data:
             source_id = dp.get("source_id")
-            # Skip relationship fragments sourced from chunks dropped by the IGNORE_NEW cap
+            # Skip relationship fragments sourced from chunks dropped by keep oldest cap
             if (
                 source_id
                 and source_id not in allowed_source_ids
@@ -1889,15 +1924,29 @@ async def _merge_edges_then_upsert(
             if already_edge
             else "(no description)"
         )
-        llm_was_used = False
         status_message = (
-            f"Skip merge for `{src_id}`~`{tgt_id}`: IGNORE_NEW limit reached"
+            f"Skip merge for `{src_id}`~`{tgt_id}`: KEEP limit reached"
         )
         logger.debug(status_message)
         if pipeline_status is not None and pipeline_status_lock is not None:
             async with pipeline_status_lock:
                 pipeline_status["latest_message"] = status_message
                 pipeline_status["history_messages"].append(status_message)
+        existing_edge_data = dict(already_edge or {})
+        if not existing_edge_data:
+            existing_edge_data = {
+                "description": description,
+                "keywords": GRAPH_FIELD_SEP.join(already_keywords),
+                "source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids),
+                "file_path": GRAPH_FIELD_SEP.join(already_file_paths),
+                "weight": sum(already_weights) if already_weights else 0.0,
+                "truncate": "",
+                "created_at": int(time.time()),
+            }
+        existing_edge_data.setdefault("created_at", int(time.time()))
+        existing_edge_data["src_id"] = src_id
+        existing_edge_data["tgt_id"] = tgt_id
+        return existing_edge_data
     elif num_fragment > 0:
         # Get summary and LLM usage status
         description, llm_was_used = await _handle_entity_relation_summary(
@@ -2025,17 +2074,38 @@ async def _merge_edges_then_upsert(

     for need_insert_id in [src_id, tgt_id]:
         if not (await knowledge_graph_inst.has_node(need_insert_id)):
+            node_created_at = int(time.time())
             node_data = {
                 "entity_id": need_insert_id,
                 "source_id": source_id,
                 "description": description,
                 "entity_type": "UNKNOWN",
                 "file_path": file_path,
-                "created_at": int(time.time()),
+                "created_at": node_created_at,
                 "truncate": "",
             }
             await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)

+            if entity_vdb is not None:
+                entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
+                entity_content = f"{need_insert_id}\n{description}"
+                vdb_data = {
+                    entity_vdb_id: {
+                        "content": entity_content,
+                        "entity_name": need_insert_id,
+                        "source_id": source_id,
+                        "entity_type": "UNKNOWN",
+                        "file_path": file_path,
+                    }
+                }
+                await safe_vdb_operation_with_exception(
+                    operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
+                    operation_name="added_entity_upsert",
+                    entity_name=need_insert_id,
+                    max_retries=3,
+                    retry_delay=0.1,
+                )
+
             # Track entities added during edge processing
             if added_entities is not None:
                 entity_data = {
@@ -2044,10 +2114,11 @@ async def _merge_edges_then_upsert(
                     "description": description,
                     "source_id": source_id,
                     "file_path": file_path,
-                    "created_at": int(time.time()),
+                    "created_at": node_created_at,
                 }
                 added_entities.append(entity_data)

+    edge_created_at = int(time.time())
     await knowledge_graph_inst.upsert_edge(
         src_id,
         tgt_id,
@@ -2057,7 +2128,7 @@ async def _merge_edges_then_upsert(
             keywords=keywords,
             source_id=source_id,
             file_path=file_path,
-            created_at=int(time.time()),
+            created_at=edge_created_at,
             truncate=truncation_info,
         ),
     )
@@ -2069,10 +2140,41 @@ async def _merge_edges_then_upsert(
         keywords=keywords,
         source_id=source_id,
         file_path=file_path,
-        created_at=int(time.time()),
+        created_at=edge_created_at,
         truncate=truncation_info,
+        weight=weight,
     )

+    if relationships_vdb is not None:
+        rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-")
+        rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-")
+        try:
+            await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
+        except Exception as e:
+            logger.debug(
+                f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
+            )
+        rel_content = f"{keywords}\t{src_id}\n{tgt_id}\n{description}"
+        vdb_data = {
+            rel_vdb_id: {
+                "src_id": src_id,
+                "tgt_id": tgt_id,
+                "source_id": source_id,
+                "content": rel_content,
+                "keywords": keywords,
+                "description": description,
+                "weight": weight,
+                "file_path": file_path,
+            }
+        }
+        await safe_vdb_operation_with_exception(
+            operation=lambda payload=vdb_data: relationships_vdb.upsert(payload),
+            operation_name="relationship_upsert",
+            entity_name=f"{src_id}-{tgt_id}",
+            max_retries=3,
+            retry_delay=0.2,
+        )
+
     return edge_data
@@ -2162,12 +2264,12 @@ async def merge_nodes_and_edges(
             [entity_name], namespace=namespace, enable_logging=False
         ):
             try:
-                logger.debug(f"Inserting {entity_name} in Graph")
-                # Graph database operation (critical path, must succeed)
+                logger.debug(f"Processing entity {entity_name}")
                 entity_data = await _merge_nodes_then_upsert(
                     entity_name,
                     entities,
                     knowledge_graph_inst,
+                    entity_vdb,
                     global_config,
                     pipeline_status,
                     pipeline_status_lock,
@@ -2175,36 +2277,9 @@ async def merge_nodes_and_edges(
                     entity_chunks_storage,
                 )

-                # Vector database operation (equally critical, must succeed)
-                if entity_vdb is not None and entity_data:
-                    data_for_vdb = {
-                        compute_mdhash_id(
-                            str(entity_data["entity_name"]), prefix="ent-"
-                        ): {
-                            "entity_name": entity_data["entity_name"],
-                            "entity_type": entity_data["entity_type"],
-                            "content": f"{entity_data['entity_name']}\n{entity_data['description']}",
-                            "source_id": entity_data["source_id"],
-                            "file_path": entity_data.get(
-                                "file_path", "unknown_source"
-                            ),
-                        }
-                    }
-
-                    logger.debug(f"Inserting {entity_name} in Graph")
-                    # Use safe operation wrapper - VDB failure must throw exception
-                    await safe_vdb_operation_with_exception(
-                        operation=lambda: entity_vdb.upsert(data_for_vdb),
-                        operation_name="entity_upsert",
-                        entity_name=entity_name,
-                        max_retries=3,
-                        retry_delay=0.1,
-                    )
-
                 return entity_data

             except Exception as e:
-                # Any database operation failure is critical
                 error_msg = (
                     f"Critical error in entity processing for `{entity_name}`: {e}"
                 )
@@ -2294,12 +2369,14 @@ async def merge_nodes_and_edges(
             try:
                 added_entities = []  # Track entities added during edge processing

-                # Graph database operation (critical path, must succeed)
+                logger.debug(f"Processing relation {sorted_edge_key}")
                 edge_data = await _merge_edges_then_upsert(
                     edge_key[0],
                     edge_key[1],
                     edges,
                     knowledge_graph_inst,
+                    relationships_vdb,
+                    entity_vdb,
                     global_config,
                     pipeline_status,
                     pipeline_status_lock,
@@ -2311,66 +2388,9 @@ async def merge_nodes_and_edges(
                 if edge_data is None:
                     return None, []

-                # Vector database operation (equally critical, must succeed)
-                if relationships_vdb is not None:
-                    data_for_vdb = {
-                        compute_mdhash_id(
-                            edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
-                        ): {
-                            "src_id": edge_data["src_id"],
-                            "tgt_id": edge_data["tgt_id"],
-                            "keywords": edge_data["keywords"],
-                            "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
-                            "source_id": edge_data["source_id"],
-                            "file_path": edge_data.get(
-                                "file_path", "unknown_source"
-                            ),
-                            "weight": edge_data.get("weight", 1.0),
-                        }
-                    }
-
-                    # Use safe operation wrapper - VDB failure must throw exception
-                    await safe_vdb_operation_with_exception(
-                        operation=lambda: relationships_vdb.upsert(data_for_vdb),
-                        operation_name="relationship_upsert",
-                        entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}",
-                        max_retries=3,
-                        retry_delay=0.1,
-                    )
-
-                # Update added_entities to entity vector database using safe operation wrapper
-                if added_entities and entity_vdb is not None:
-                    for entity_data in added_entities:
-                        entity_vdb_id = compute_mdhash_id(
-                            entity_data["entity_name"], prefix="ent-"
-                        )
-                        entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}"
-
-                        vdb_data = {
-                            entity_vdb_id: {
-                                "content": entity_content,
-                                "entity_name": entity_data["entity_name"],
-                                "source_id": entity_data["source_id"],
-                                "entity_type": entity_data["entity_type"],
-                                "file_path": entity_data.get(
-                                    "file_path", "unknown_source"
-                                ),
-                            }
-                        }
-
-                        # Use safe operation wrapper - VDB failure must throw exception
-                        await safe_vdb_operation_with_exception(
-                            operation=lambda data=vdb_data: entity_vdb.upsert(data),
-                            operation_name="added_entity_upsert",
-                            entity_name=entity_data["entity_name"],
-                            max_retries=3,
-                            retry_delay=0.1,
-                        )
-
                 return edge_data, added_entities

             except Exception as e:
-                # Any database operation failure is critical
                 error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}"
                 logger.error(error_msg)