From ab32456a799c79fbeef401d2b353ecf7891a11d0 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 27 Oct 2025 00:04:17 +0800 Subject: [PATCH] Refactor entity merging with unified attribute merge function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Update GRAPH_FIELD_SEP comment clarity • Deprecate merge_strategy parameter • Unify entity/relation merge logic • Add join_unique_comma strategy --- lightrag/constants.py | 2 +- lightrag/utils_graph.py | 137 +++++++++++++++++----------------------- 2 files changed, 58 insertions(+), 81 deletions(-) diff --git a/lightrag/constants.py b/lightrag/constants.py index c040e0ac..eedecd65 100644 --- a/lightrag/constants.py +++ b/lightrag/constants.py @@ -38,7 +38,7 @@ DEFAULT_ENTITY_TYPES = [ "NaturalObject", ] -# Separator for graph fields +# Separator for: description, source_id and relation-key fields(Can not be changed after data inserted) GRAPH_FIELD_SEP = "" # Query and retrieval configuration defaults diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py index 6e3a52f8..da7da5a9 100644 --- a/lightrag/utils_graph.py +++ b/lightrag/utils_graph.py @@ -1050,12 +1050,8 @@ async def amerge_entities( relationships_vdb: Vector database storage for relationships source_entities: List of source entity names to merge target_entity: Name of the target entity after merging - merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"} - Supported strategies: - - "concatenate": Concatenate all values (for text fields) - - "keep_first": Keep the first non-empty value - - "keep_last": Keep the last non-empty value - - "join_unique": Join all unique values (for fields separated by delimiter) + merge_strategy: Deprecated (Each field uses its own default strategy). If provided, + customizations are applied but a warning is logged. target_entity_data: Dictionary of specific values to set for the target entity, overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"} @@ -1066,18 +1062,23 @@ async def amerge_entities( # Use graph database lock to ensure atomic graph and vector db operations async with graph_db_lock: try: - # Default merge strategy - default_strategy = { + # Default merge strategy for entities + default_entity_merge_strategy = { "description": "concatenate", "entity_type": "keep_first", "source_id": "join_unique", + "file_path": "join_unique", } - - merge_strategy = ( - default_strategy - if merge_strategy is None - else {**default_strategy, **merge_strategy} - ) + effective_entity_merge_strategy = default_entity_merge_strategy + if merge_strategy: + logger.warning( + "merge_strategy parameter is deprecated and will be ignored in a future " + "release. Provided overrides will be applied for now." + ) + effective_entity_merge_strategy = { + **default_entity_merge_strategy, + **merge_strategy, + } target_entity_data = ( {} if target_entity_data is None else target_entity_data ) @@ -1103,10 +1104,11 @@ async def amerge_entities( ) # 3. Merge entity data - merged_entity_data = _merge_entity_attributes( + merged_entity_data = _merge_attributes( list(source_entities_data.values()) + ([existing_target_entity_data] if target_exists else []), - merge_strategy, + effective_entity_merge_strategy, + filter_none_only=False, # Use entity behavior: filter falsy values ) # Apply any explicitly provided target entity data (overrides merged data) @@ -1168,14 +1170,16 @@ async def amerge_entities( if relation_key in relation_updates: # Merge relationship data existing_data = relation_updates[relation_key]["data"] - merged_relation = _merge_relation_attributes( + merged_relation = _merge_attributes( [existing_data, edge_data], { "description": "concatenate", - "keywords": "join_unique", + "keywords": "join_unique_comma", "source_id": "join_unique", + "file_path": "join_unique", "weight": "max", }, + filter_none_only=True, # Use relation behavior: only filter None ) relation_updates[relation_key]["data"] = merged_relation logger.info( @@ -1299,81 +1303,45 @@ async def amerge_entities( raise -def _merge_entity_attributes( - entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str] +def _merge_attributes( + data_list: list[dict[str, Any]], + merge_strategy: dict[str, str], + filter_none_only: bool = False, ) -> dict[str, Any]: - """Merge attributes from multiple entities. + """Merge attributes from multiple entities or relationships. + + This unified function handles merging of both entity and relationship attributes, + applying different merge strategies per field. Args: - entity_data_list: List of dictionaries containing entity data - merge_strategy: Merge strategy for each field + data_list: List of dictionaries containing entity or relationship data + merge_strategy: Merge strategy for each field. Supported strategies: + - "concatenate": Join all values with GRAPH_FIELD_SEP + - "keep_first": Keep the first non-empty value + - "keep_last": Keep the last non-empty value + - "join_unique": Join unique items separated by GRAPH_FIELD_SEP + - "join_unique_comma": Join unique items separated by comma and space + - "max": Keep the maximum numeric value (for numeric fields) + filter_none_only: If True, only filter None values (keep empty strings, 0, etc.). + If False, filter all falsy values. Default is False for backward compatibility. Returns: - Dictionary containing merged entity data + Dictionary containing merged data """ merged_data = {} # Collect all possible keys all_keys = set() - for data in entity_data_list: + for data in data_list: all_keys.update(data.keys()) # Merge values for each key for key in all_keys: - # Get all values for this key - values = [data.get(key) for data in entity_data_list if data.get(key)] - - if not values: - continue - - # Merge values according to strategy - strategy = merge_strategy.get(key, "keep_first") - - if strategy == "concatenate": - merged_data[key] = "\n\n".join(values) - elif strategy == "keep_first": - merged_data[key] = values[0] - elif strategy == "keep_last": - merged_data[key] = values[-1] - elif strategy == "join_unique": - # Handle fields separated by GRAPH_FIELD_SEP - unique_items = set() - for value in values: - items = value.split(GRAPH_FIELD_SEP) - unique_items.update(items) - merged_data[key] = GRAPH_FIELD_SEP.join(unique_items) + # Get all values for this key based on filtering mode + if filter_none_only: + values = [data.get(key) for data in data_list if data.get(key) is not None] else: - # Default strategy - merged_data[key] = values[0] - - return merged_data - - -def _merge_relation_attributes( - relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str] -) -> dict[str, Any]: - """Merge attributes from multiple relationships. - - Args: - relation_data_list: List of dictionaries containing relationship data - merge_strategy: Merge strategy for each field - - Returns: - Dictionary containing merged relationship data - """ - merged_data = {} - - # Collect all possible keys - all_keys = set() - for data in relation_data_list: - all_keys.update(data.keys()) - - # Merge values for each key - for key in all_keys: - # Get all values for this key - values = [ - data.get(key) for data in relation_data_list if data.get(key) is not None - ] + values = [data.get(key) for data in data_list if data.get(key)] if not values: continue @@ -1382,7 +1350,8 @@ def _merge_relation_attributes( strategy = merge_strategy.get(key, "keep_first") if strategy == "concatenate": - merged_data[key] = "\n\n".join(str(v) for v in values) + # Convert all values to strings and join with GRAPH_FIELD_SEP + merged_data[key] = GRAPH_FIELD_SEP.join(str(v) for v in values) elif strategy == "keep_first": merged_data[key] = values[0] elif strategy == "keep_last": @@ -1394,14 +1363,22 @@ def _merge_relation_attributes( items = str(value).split(GRAPH_FIELD_SEP) unique_items.update(items) merged_data[key] = GRAPH_FIELD_SEP.join(unique_items) + elif strategy == "join_unique_comma": + # Handle fields separated by comma, join unique items with comma + unique_items = set() + for value in values: + items = str(value).split(",") + unique_items.update(item.strip() for item in items if item.strip()) + merged_data[key] = ",".join(sorted(unique_items)) elif strategy == "max": # For numeric fields like weight try: merged_data[key] = max(float(v) for v in values) except (ValueError, TypeError): + # Fallback to first value if conversion fails merged_data[key] = values[0] else: - # Default strategy + # Default strategy: keep first value merged_data[key] = values[0] return merged_data