Refactor entity merging with unified attribute merge function
• Update GRAPH_FIELD_SEP comment clarity • Deprecate merge_strategy parameter • Unify entity/relation merge logic • Add join_unique_comma strategy
This commit is contained in:
parent
38559373b3
commit
ab32456a79
2 changed files with 58 additions and 81 deletions
|
|
@ -38,7 +38,7 @@ DEFAULT_ENTITY_TYPES = [
|
|||
"NaturalObject",
|
||||
]
|
||||
|
||||
# Separator for graph fields
|
||||
# Separator for: description, source_id and relation-key fields (cannot be changed after data has been inserted)
|
||||
GRAPH_FIELD_SEP = "<SEP>"
|
||||
|
||||
# Query and retrieval configuration defaults
|
||||
|
|
|
|||
|
|
@ -1050,12 +1050,8 @@ async def amerge_entities(
|
|||
relationships_vdb: Vector database storage for relationships
|
||||
source_entities: List of source entity names to merge
|
||||
target_entity: Name of the target entity after merging
|
||||
merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
|
||||
Supported strategies:
|
||||
- "concatenate": Concatenate all values (for text fields)
|
||||
- "keep_first": Keep the first non-empty value
|
||||
- "keep_last": Keep the last non-empty value
|
||||
- "join_unique": Join all unique values (for fields separated by delimiter)
|
||||
merge_strategy: Deprecated (Each field uses its own default strategy). If provided,
|
||||
customizations are applied but a warning is logged.
|
||||
target_entity_data: Dictionary of specific values to set for the target entity,
|
||||
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
|
||||
|
||||
|
|
@ -1066,18 +1062,23 @@ async def amerge_entities(
|
|||
# Use graph database lock to ensure atomic graph and vector db operations
|
||||
async with graph_db_lock:
|
||||
try:
|
||||
# Default merge strategy
|
||||
default_strategy = {
|
||||
# Default merge strategy for entities
|
||||
default_entity_merge_strategy = {
|
||||
"description": "concatenate",
|
||||
"entity_type": "keep_first",
|
||||
"source_id": "join_unique",
|
||||
"file_path": "join_unique",
|
||||
}
|
||||
|
||||
merge_strategy = (
|
||||
default_strategy
|
||||
if merge_strategy is None
|
||||
else {**default_strategy, **merge_strategy}
|
||||
)
|
||||
effective_entity_merge_strategy = default_entity_merge_strategy
|
||||
if merge_strategy:
|
||||
logger.warning(
|
||||
"merge_strategy parameter is deprecated and will be ignored in a future "
|
||||
"release. Provided overrides will be applied for now."
|
||||
)
|
||||
effective_entity_merge_strategy = {
|
||||
**default_entity_merge_strategy,
|
||||
**merge_strategy,
|
||||
}
|
||||
target_entity_data = (
|
||||
{} if target_entity_data is None else target_entity_data
|
||||
)
|
||||
|
|
@ -1103,10 +1104,11 @@ async def amerge_entities(
|
|||
)
|
||||
|
||||
# 3. Merge entity data
|
||||
merged_entity_data = _merge_entity_attributes(
|
||||
merged_entity_data = _merge_attributes(
|
||||
list(source_entities_data.values())
|
||||
+ ([existing_target_entity_data] if target_exists else []),
|
||||
merge_strategy,
|
||||
effective_entity_merge_strategy,
|
||||
filter_none_only=False, # Use entity behavior: filter falsy values
|
||||
)
|
||||
|
||||
# Apply any explicitly provided target entity data (overrides merged data)
|
||||
|
|
@ -1168,14 +1170,16 @@ async def amerge_entities(
|
|||
if relation_key in relation_updates:
|
||||
# Merge relationship data
|
||||
existing_data = relation_updates[relation_key]["data"]
|
||||
merged_relation = _merge_relation_attributes(
|
||||
merged_relation = _merge_attributes(
|
||||
[existing_data, edge_data],
|
||||
{
|
||||
"description": "concatenate",
|
||||
"keywords": "join_unique",
|
||||
"keywords": "join_unique_comma",
|
||||
"source_id": "join_unique",
|
||||
"file_path": "join_unique",
|
||||
"weight": "max",
|
||||
},
|
||||
filter_none_only=True, # Use relation behavior: only filter None
|
||||
)
|
||||
relation_updates[relation_key]["data"] = merged_relation
|
||||
logger.info(
|
||||
|
|
@ -1299,81 +1303,45 @@ async def amerge_entities(
|
|||
raise
|
||||
|
||||
|
||||
def _merge_entity_attributes(
|
||||
entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
|
||||
def _merge_attributes(
|
||||
data_list: list[dict[str, Any]],
|
||||
merge_strategy: dict[str, str],
|
||||
filter_none_only: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
"""Merge attributes from multiple entities.
|
||||
"""Merge attributes from multiple entities or relationships.
|
||||
|
||||
This unified function handles merging of both entity and relationship attributes,
|
||||
applying different merge strategies per field.
|
||||
|
||||
Args:
|
||||
entity_data_list: List of dictionaries containing entity data
|
||||
merge_strategy: Merge strategy for each field
|
||||
data_list: List of dictionaries containing entity or relationship data
|
||||
merge_strategy: Merge strategy for each field. Supported strategies:
|
||||
- "concatenate": Join all values with GRAPH_FIELD_SEP
|
||||
- "keep_first": Keep the first non-empty value
|
||||
- "keep_last": Keep the last non-empty value
|
||||
- "join_unique": Join unique items separated by GRAPH_FIELD_SEP
|
||||
- "join_unique_comma": Split values on commas, strip whitespace, and join the sorted unique items with "," (no space)
|
||||
- "max": Keep the maximum numeric value (for numeric fields)
|
||||
filter_none_only: If True, only filter None values (keep empty strings, 0, etc.).
|
||||
If False, filter all falsy values. Default is False for backward compatibility.
|
||||
|
||||
Returns:
|
||||
Dictionary containing merged entity data
|
||||
Dictionary containing merged data
|
||||
"""
|
||||
merged_data = {}
|
||||
|
||||
# Collect all possible keys
|
||||
all_keys = set()
|
||||
for data in entity_data_list:
|
||||
for data in data_list:
|
||||
all_keys.update(data.keys())
|
||||
|
||||
# Merge values for each key
|
||||
for key in all_keys:
|
||||
# Get all values for this key
|
||||
values = [data.get(key) for data in entity_data_list if data.get(key)]
|
||||
|
||||
if not values:
|
||||
continue
|
||||
|
||||
# Merge values according to strategy
|
||||
strategy = merge_strategy.get(key, "keep_first")
|
||||
|
||||
if strategy == "concatenate":
|
||||
merged_data[key] = "\n\n".join(values)
|
||||
elif strategy == "keep_first":
|
||||
merged_data[key] = values[0]
|
||||
elif strategy == "keep_last":
|
||||
merged_data[key] = values[-1]
|
||||
elif strategy == "join_unique":
|
||||
# Handle fields separated by GRAPH_FIELD_SEP
|
||||
unique_items = set()
|
||||
for value in values:
|
||||
items = value.split(GRAPH_FIELD_SEP)
|
||||
unique_items.update(items)
|
||||
merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
|
||||
# Get all values for this key based on filtering mode
|
||||
if filter_none_only:
|
||||
values = [data.get(key) for data in data_list if data.get(key) is not None]
|
||||
else:
|
||||
# Default strategy
|
||||
merged_data[key] = values[0]
|
||||
|
||||
return merged_data
|
||||
|
||||
|
||||
def _merge_relation_attributes(
|
||||
relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
|
||||
) -> dict[str, Any]:
|
||||
"""Merge attributes from multiple relationships.
|
||||
|
||||
Args:
|
||||
relation_data_list: List of dictionaries containing relationship data
|
||||
merge_strategy: Merge strategy for each field
|
||||
|
||||
Returns:
|
||||
Dictionary containing merged relationship data
|
||||
"""
|
||||
merged_data = {}
|
||||
|
||||
# Collect all possible keys
|
||||
all_keys = set()
|
||||
for data in relation_data_list:
|
||||
all_keys.update(data.keys())
|
||||
|
||||
# Merge values for each key
|
||||
for key in all_keys:
|
||||
# Get all values for this key
|
||||
values = [
|
||||
data.get(key) for data in relation_data_list if data.get(key) is not None
|
||||
]
|
||||
values = [data.get(key) for data in data_list if data.get(key)]
|
||||
|
||||
if not values:
|
||||
continue
|
||||
|
|
@ -1382,7 +1350,8 @@ def _merge_relation_attributes(
|
|||
strategy = merge_strategy.get(key, "keep_first")
|
||||
|
||||
if strategy == "concatenate":
|
||||
merged_data[key] = "\n\n".join(str(v) for v in values)
|
||||
# Convert all values to strings and join with GRAPH_FIELD_SEP
|
||||
merged_data[key] = GRAPH_FIELD_SEP.join(str(v) for v in values)
|
||||
elif strategy == "keep_first":
|
||||
merged_data[key] = values[0]
|
||||
elif strategy == "keep_last":
|
||||
|
|
@ -1394,14 +1363,22 @@ def _merge_relation_attributes(
|
|||
items = str(value).split(GRAPH_FIELD_SEP)
|
||||
unique_items.update(items)
|
||||
merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
|
||||
elif strategy == "join_unique_comma":
|
||||
# Handle fields separated by comma, join unique items with comma
|
||||
unique_items = set()
|
||||
for value in values:
|
||||
items = str(value).split(",")
|
||||
unique_items.update(item.strip() for item in items if item.strip())
|
||||
merged_data[key] = ",".join(sorted(unique_items))
|
||||
elif strategy == "max":
|
||||
# For numeric fields like weight
|
||||
try:
|
||||
merged_data[key] = max(float(v) for v in values)
|
||||
except (ValueError, TypeError):
|
||||
# Fallback to first value if conversion fails
|
||||
merged_data[key] = values[0]
|
||||
else:
|
||||
# Default strategy
|
||||
# Default strategy: keep first value
|
||||
merged_data[key] = values[0]
|
||||
|
||||
return merged_data
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue