Refactor deduplication calculation and remove unused variables

This commit is contained in:
yangdx 2025-10-21 04:41:15 +08:00
parent 665f60b90f
commit 1154c5683f

View file

@ -1498,8 +1498,6 @@ async def _merge_nodes_then_upsert(
reverse=True, reverse=True,
)[0][0] # Get the entity type with the highest count )[0][0] # Get the entity type with the highest count
original_nodes_count = len(nodes_data)
new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")] new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")]
existing_full_source_ids = [] existing_full_source_ids = []
@ -1584,12 +1582,12 @@ async def _merge_nodes_then_upsert(
# Combine already_description with sorted new sorted descriptions # Combine already_description with sorted new sorted descriptions
description_list = already_description + sorted_descriptions description_list = already_description + sorted_descriptions
deduplicated_num = original_nodes_count - len(sorted_descriptions) num_fragment = len(description_list)
already_fragment = len(already_description)
deduplicated_num = already_fragment + len(nodes_data) - num_fragment
if deduplicated_num > 0: if deduplicated_num > 0:
dd_message = f"dd:{deduplicated_num}" dd_message = f"dd:{deduplicated_num}"
num_fragment = len(description_list)
already_fragment = len(already_description)
if skip_summary_due_to_limit: if skip_summary_due_to_limit:
description = ( description = (
already_node.get("description", "(no description)") already_node.get("description", "(no description)")
@ -1818,8 +1816,6 @@ async def _merge_edges_then_upsert(
) )
) )
original_edges_count = len(edges_data)
new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")] new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")]
storage_key = make_relation_chunk_key(src_id, tgt_id) storage_key = make_relation_chunk_key(src_id, tgt_id)
@ -1911,12 +1907,12 @@ async def _merge_edges_then_upsert(
# Combine already_description with sorted new descriptions # Combine already_description with sorted new descriptions
description_list = already_description + sorted_descriptions description_list = already_description + sorted_descriptions
deduplicated_num = original_edges_count - len(sorted_descriptions)
if deduplicated_num > 0:
dd_message = f"dd:{deduplicated_num}"
num_fragment = len(description_list) num_fragment = len(description_list)
already_fragment = len(already_description) already_fragment = len(already_description)
deduplicated_num = already_fragment + len(edges_data) - num_fragment
if deduplicated_num > 0:
dd_message = f"dd:{deduplicated_num}"
if skip_summary_due_to_limit: if skip_summary_due_to_limit:
description = ( description = (
@ -1924,9 +1920,7 @@ async def _merge_edges_then_upsert(
if already_edge if already_edge
else "(no description)" else "(no description)"
) )
status_message = ( status_message = f"Skip merge for `{src_id}`~`{tgt_id}`: KEEP limit reached"
f"Skip merge for `{src_id}`~`{tgt_id}`: KEEP limit reached"
)
logger.debug(status_message) logger.debug(status_message)
if pipeline_status is not None and pipeline_status_lock is not None: if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock: async with pipeline_status_lock: