cherry-pick ab32456a
This commit is contained in:
parent
a7857bcdde
commit
01e3c409c2
2 changed files with 92 additions and 242 deletions
|
|
@ -38,7 +38,7 @@ DEFAULT_ENTITY_TYPES = [
|
||||||
"NaturalObject",
|
"NaturalObject",
|
||||||
]
|
]
|
||||||
|
|
||||||
# Separator for graph fields
|
# Separator for: description, source_id and relation-key fields(Can not be changed after data inserted)
|
||||||
GRAPH_FIELD_SEP = "<SEP>"
|
GRAPH_FIELD_SEP = "<SEP>"
|
||||||
|
|
||||||
# Query and retrieval configuration defaults
|
# Query and retrieval configuration defaults
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ import asyncio
|
||||||
from typing import Any, cast
|
from typing import Any, cast
|
||||||
|
|
||||||
from .base import DeletionResult
|
from .base import DeletionResult
|
||||||
from .kg.shared_storage import get_storage_keyed_lock
|
from .kg.shared_storage import get_graph_db_lock
|
||||||
from .constants import GRAPH_FIELD_SEP
|
from .constants import GRAPH_FIELD_SEP
|
||||||
from .utils import compute_mdhash_id, logger
|
from .utils import compute_mdhash_id, logger
|
||||||
from .base import StorageNameSpace
|
from .base import StorageNameSpace
|
||||||
|
|
@ -74,12 +74,9 @@ async def adelete_by_entity(
|
||||||
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
|
entity_chunks_storage: Optional KV storage for tracking chunks that reference this entity
|
||||||
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
|
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
|
||||||
"""
|
"""
|
||||||
# Use keyed lock for entity to ensure atomic graph and vector db operations
|
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
||||||
workspace = entities_vdb.global_config.get("workspace", "")
|
# Use graph database lock to ensure atomic graph and vector db operations
|
||||||
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
async with graph_db_lock:
|
||||||
async with get_storage_keyed_lock(
|
|
||||||
[entity_name], namespace=namespace, enable_logging=False
|
|
||||||
):
|
|
||||||
try:
|
try:
|
||||||
# Check if the entity exists
|
# Check if the entity exists
|
||||||
if not await chunk_entity_relation_graph.has_node(entity_name):
|
if not await chunk_entity_relation_graph.has_node(entity_name):
|
||||||
|
|
@ -170,18 +167,14 @@ async def adelete_by_relation(
|
||||||
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
|
relation_chunks_storage: Optional KV storage for tracking chunks that reference this relation
|
||||||
"""
|
"""
|
||||||
relation_str = f"{source_entity} -> {target_entity}"
|
relation_str = f"{source_entity} -> {target_entity}"
|
||||||
# Normalize entity order for undirected graph (ensures consistent key generation)
|
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
||||||
if source_entity > target_entity:
|
# Use graph database lock to ensure atomic graph and vector db operations
|
||||||
source_entity, target_entity = target_entity, source_entity
|
async with graph_db_lock:
|
||||||
|
|
||||||
# Use keyed lock for relation to ensure atomic graph and vector db operations
|
|
||||||
workspace = relationships_vdb.global_config.get("workspace", "")
|
|
||||||
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
|
||||||
sorted_edge_key = sorted([source_entity, target_entity])
|
|
||||||
async with get_storage_keyed_lock(
|
|
||||||
sorted_edge_key, namespace=namespace, enable_logging=False
|
|
||||||
):
|
|
||||||
try:
|
try:
|
||||||
|
# Normalize entity order for undirected graph (ensures consistent key generation)
|
||||||
|
if source_entity > target_entity:
|
||||||
|
source_entity, target_entity = target_entity, source_entity
|
||||||
|
|
||||||
# Check if the relation exists
|
# Check if the relation exists
|
||||||
edge_exists = await chunk_entity_relation_graph.has_edge(
|
edge_exists = await chunk_entity_relation_graph.has_edge(
|
||||||
source_entity, target_entity
|
source_entity, target_entity
|
||||||
|
|
@ -274,19 +267,9 @@ async def aedit_entity(
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing updated entity information
|
Dictionary containing updated entity information
|
||||||
"""
|
"""
|
||||||
# Determine entities to lock
|
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
||||||
new_entity_name = updated_data.get("entity_name", entity_name)
|
# Use graph database lock to ensure atomic graph and vector db operations
|
||||||
is_renaming = new_entity_name != entity_name
|
async with graph_db_lock:
|
||||||
|
|
||||||
# Lock both original and new entity names if renaming
|
|
||||||
lock_keys = sorted([entity_name, new_entity_name]) if is_renaming else [entity_name]
|
|
||||||
|
|
||||||
# Use keyed lock for entity to ensure atomic graph and vector db operations
|
|
||||||
workspace = entities_vdb.global_config.get("workspace", "")
|
|
||||||
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
|
||||||
async with get_storage_keyed_lock(
|
|
||||||
lock_keys, namespace=namespace, enable_logging=False
|
|
||||||
):
|
|
||||||
try:
|
try:
|
||||||
# Save original entity name for chunk tracking updates
|
# Save original entity name for chunk tracking updates
|
||||||
original_entity_name = entity_name
|
original_entity_name = entity_name
|
||||||
|
|
@ -297,6 +280,10 @@ async def aedit_entity(
|
||||||
raise ValueError(f"Entity '{entity_name}' does not exist")
|
raise ValueError(f"Entity '{entity_name}' does not exist")
|
||||||
node_data = await chunk_entity_relation_graph.get_node(entity_name)
|
node_data = await chunk_entity_relation_graph.get_node(entity_name)
|
||||||
|
|
||||||
|
# Check if entity is being renamed
|
||||||
|
new_entity_name = updated_data.get("entity_name", entity_name)
|
||||||
|
is_renaming = new_entity_name != entity_name
|
||||||
|
|
||||||
# If renaming, check if new name already exists
|
# If renaming, check if new name already exists
|
||||||
if is_renaming:
|
if is_renaming:
|
||||||
if not allow_rename:
|
if not allow_rename:
|
||||||
|
|
@ -632,18 +619,14 @@ async def aedit_relation(
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing updated relation information
|
Dictionary containing updated relation information
|
||||||
"""
|
"""
|
||||||
# Normalize entity order for undirected graph (ensures consistent key generation)
|
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
||||||
if source_entity > target_entity:
|
# Use graph database lock to ensure atomic graph and vector db operations
|
||||||
source_entity, target_entity = target_entity, source_entity
|
async with graph_db_lock:
|
||||||
|
|
||||||
# Use keyed lock for relation to ensure atomic graph and vector db operations
|
|
||||||
workspace = relationships_vdb.global_config.get("workspace", "")
|
|
||||||
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
|
||||||
sorted_edge_key = sorted([source_entity, target_entity])
|
|
||||||
async with get_storage_keyed_lock(
|
|
||||||
sorted_edge_key, namespace=namespace, enable_logging=False
|
|
||||||
):
|
|
||||||
try:
|
try:
|
||||||
|
# Normalize entity order for undirected graph (ensures consistent key generation)
|
||||||
|
if source_entity > target_entity:
|
||||||
|
source_entity, target_entity = target_entity, source_entity
|
||||||
|
|
||||||
# 1. Get current relation information
|
# 1. Get current relation information
|
||||||
edge_exists = await chunk_entity_relation_graph.has_edge(
|
edge_exists = await chunk_entity_relation_graph.has_edge(
|
||||||
source_entity, target_entity
|
source_entity, target_entity
|
||||||
|
|
@ -816,12 +799,9 @@ async def acreate_entity(
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing created entity information
|
Dictionary containing created entity information
|
||||||
"""
|
"""
|
||||||
# Use keyed lock for entity to ensure atomic graph and vector db operations
|
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
||||||
workspace = entities_vdb.global_config.get("workspace", "")
|
# Use graph database lock to ensure atomic graph and vector db operations
|
||||||
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
async with graph_db_lock:
|
||||||
async with get_storage_keyed_lock(
|
|
||||||
[entity_name], namespace=namespace, enable_logging=False
|
|
||||||
):
|
|
||||||
try:
|
try:
|
||||||
# Check if entity already exists
|
# Check if entity already exists
|
||||||
existing_node = await chunk_entity_relation_graph.has_node(entity_name)
|
existing_node = await chunk_entity_relation_graph.has_node(entity_name)
|
||||||
|
|
@ -930,13 +910,9 @@ async def acreate_relation(
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing created relation information
|
Dictionary containing created relation information
|
||||||
"""
|
"""
|
||||||
# Use keyed lock for relation to ensure atomic graph and vector db operations
|
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
||||||
workspace = relationships_vdb.global_config.get("workspace", "")
|
# Use graph database lock to ensure atomic graph and vector db operations
|
||||||
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
async with graph_db_lock:
|
||||||
sorted_edge_key = sorted([source_entity, target_entity])
|
|
||||||
async with get_storage_keyed_lock(
|
|
||||||
sorted_edge_key, namespace=namespace, enable_logging=False
|
|
||||||
):
|
|
||||||
try:
|
try:
|
||||||
# Check if both entities exist
|
# Check if both entities exist
|
||||||
source_exists = await chunk_entity_relation_graph.has_node(source_entity)
|
source_exists = await chunk_entity_relation_graph.has_node(source_entity)
|
||||||
|
|
@ -1062,14 +1038,11 @@ async def amerge_entities(
|
||||||
target_entity: str,
|
target_entity: str,
|
||||||
merge_strategy: dict[str, str] = None,
|
merge_strategy: dict[str, str] = None,
|
||||||
target_entity_data: dict[str, Any] = None,
|
target_entity_data: dict[str, Any] = None,
|
||||||
entity_chunks_storage=None,
|
|
||||||
relation_chunks_storage=None,
|
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Asynchronously merge multiple entities into one entity.
|
"""Asynchronously merge multiple entities into one entity.
|
||||||
|
|
||||||
Merges multiple source entities into a target entity, handling all relationships,
|
Merges multiple source entities into a target entity, handling all relationships,
|
||||||
and updating both the knowledge graph and vector database.
|
and updating both the knowledge graph and vector database.
|
||||||
Also merges chunk tracking information from entity_chunks_storage and relation_chunks_storage.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
chunk_entity_relation_graph: Graph storage instance
|
chunk_entity_relation_graph: Graph storage instance
|
||||||
|
|
@ -1081,23 +1054,13 @@ async def amerge_entities(
|
||||||
customizations are applied but a warning is logged.
|
customizations are applied but a warning is logged.
|
||||||
target_entity_data: Dictionary of specific values to set for the target entity,
|
target_entity_data: Dictionary of specific values to set for the target entity,
|
||||||
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
|
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
|
||||||
entity_chunks_storage: Optional KV storage for tracking chunks that reference entities
|
|
||||||
relation_chunks_storage: Optional KV storage for tracking chunks that reference relations
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary containing the merged entity information
|
Dictionary containing the merged entity information
|
||||||
"""
|
"""
|
||||||
# Collect all entities involved (source + target) and lock them all in sorted order
|
graph_db_lock = get_graph_db_lock(enable_logging=False)
|
||||||
all_entities = set(source_entities)
|
# Use graph database lock to ensure atomic graph and vector db operations
|
||||||
all_entities.add(target_entity)
|
async with graph_db_lock:
|
||||||
lock_keys = sorted(all_entities)
|
|
||||||
|
|
||||||
# Use keyed lock for all entities to ensure atomic graph and vector db operations
|
|
||||||
workspace = entities_vdb.global_config.get("workspace", "")
|
|
||||||
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
|
|
||||||
async with get_storage_keyed_lock(
|
|
||||||
lock_keys, namespace=namespace, enable_logging=False
|
|
||||||
):
|
|
||||||
try:
|
try:
|
||||||
# Default merge strategy for entities
|
# Default merge strategy for entities
|
||||||
default_entity_merge_strategy = {
|
default_entity_merge_strategy = {
|
||||||
|
|
@ -1109,7 +1072,8 @@ async def amerge_entities(
|
||||||
effective_entity_merge_strategy = default_entity_merge_strategy
|
effective_entity_merge_strategy = default_entity_merge_strategy
|
||||||
if merge_strategy:
|
if merge_strategy:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Entity Merge: merge_strategy parameter is deprecated and will be ignored in a future release."
|
"merge_strategy parameter is deprecated and will be ignored in a future "
|
||||||
|
"release. Provided overrides will be applied for now."
|
||||||
)
|
)
|
||||||
effective_entity_merge_strategy = {
|
effective_entity_merge_strategy = {
|
||||||
**default_entity_merge_strategy,
|
**default_entity_merge_strategy,
|
||||||
|
|
@ -1136,7 +1100,7 @@ async def amerge_entities(
|
||||||
await chunk_entity_relation_graph.get_node(target_entity)
|
await chunk_entity_relation_graph.get_node(target_entity)
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
"Entity Merge: target entity already exists, source and target entities will be merged"
|
f"Target entity '{target_entity}' already exists, will merge data"
|
||||||
)
|
)
|
||||||
|
|
||||||
# 3. Merge entity data
|
# 3. Merge entity data
|
||||||
|
|
@ -1154,11 +1118,11 @@ async def amerge_entities(
|
||||||
# 4. Get all relationships of the source entities and target entity (if exists)
|
# 4. Get all relationships of the source entities and target entity (if exists)
|
||||||
all_relations = []
|
all_relations = []
|
||||||
entities_to_collect = source_entities.copy()
|
entities_to_collect = source_entities.copy()
|
||||||
|
|
||||||
# If target entity exists and not already in source_entities, add it
|
# If target entity exists, also collect its relationships for merging
|
||||||
if target_exists and target_entity not in source_entities:
|
if target_exists:
|
||||||
entities_to_collect.append(target_entity)
|
entities_to_collect.append(target_entity)
|
||||||
|
|
||||||
for entity_name in entities_to_collect:
|
for entity_name in entities_to_collect:
|
||||||
# Get all relationships of the entities
|
# Get all relationships of the entities
|
||||||
edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
|
edges = await chunk_entity_relation_graph.get_node_edges(entity_name)
|
||||||
|
|
@ -1177,75 +1141,32 @@ async def amerge_entities(
|
||||||
await chunk_entity_relation_graph.upsert_node(
|
await chunk_entity_relation_graph.upsert_node(
|
||||||
target_entity, merged_entity_data
|
target_entity, merged_entity_data
|
||||||
)
|
)
|
||||||
logger.info(f"Entity Merge: created target '{target_entity}'")
|
logger.info(f"Created new target entity '{target_entity}'")
|
||||||
else:
|
else:
|
||||||
await chunk_entity_relation_graph.upsert_node(
|
await chunk_entity_relation_graph.upsert_node(
|
||||||
target_entity, merged_entity_data
|
target_entity, merged_entity_data
|
||||||
)
|
)
|
||||||
logger.info(f"Entity Merge: Updated target '{target_entity}'")
|
logger.info(f"Updated existing target entity '{target_entity}'")
|
||||||
|
|
||||||
# 6. Recreate all relations pointing to the target entity in KG
|
# 6. Recreate all relationships, pointing to the target entity
|
||||||
# Also collect chunk tracking information in the same loop
|
|
||||||
relation_updates = {} # Track relationships that need to be merged
|
relation_updates = {} # Track relationships that need to be merged
|
||||||
relations_to_delete = []
|
relations_to_delete = []
|
||||||
|
|
||||||
# Initialize chunk tracking variables
|
|
||||||
relation_chunk_tracking = {} # key: storage_key, value: list of chunk_ids
|
|
||||||
old_relation_keys_to_delete = []
|
|
||||||
|
|
||||||
for src, tgt, edge_data in all_relations:
|
for src, tgt, edge_data in all_relations:
|
||||||
relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-"))
|
relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-"))
|
||||||
relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-"))
|
relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-"))
|
||||||
|
|
||||||
# Collect old chunk tracking key for deletion
|
|
||||||
if relation_chunks_storage is not None:
|
|
||||||
from .utils import make_relation_chunk_key
|
|
||||||
|
|
||||||
old_storage_key = make_relation_chunk_key(src, tgt)
|
|
||||||
old_relation_keys_to_delete.append(old_storage_key)
|
|
||||||
|
|
||||||
new_src = target_entity if src in source_entities else src
|
new_src = target_entity if src in source_entities else src
|
||||||
new_tgt = target_entity if tgt in source_entities else tgt
|
new_tgt = target_entity if tgt in source_entities else tgt
|
||||||
|
|
||||||
# Skip relationships between source entities to avoid self-loops
|
# Skip relationships between source entities to avoid self-loops
|
||||||
if new_src == new_tgt:
|
if new_src == new_tgt:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Entity Merge: skipping `{src}`~`{tgt}` to avoid self-loop"
|
f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop"
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Normalize entity order for consistent duplicate detection (undirected relationships)
|
# Check if the same relationship already exists
|
||||||
normalized_src, normalized_tgt = sorted([new_src, new_tgt])
|
relation_key = f"{new_src}|{new_tgt}"
|
||||||
relation_key = f"{normalized_src}|{normalized_tgt}"
|
|
||||||
|
|
||||||
# Process chunk tracking for this relation
|
|
||||||
if relation_chunks_storage is not None:
|
|
||||||
storage_key = make_relation_chunk_key(
|
|
||||||
normalized_src, normalized_tgt
|
|
||||||
)
|
|
||||||
|
|
||||||
# Get chunk_ids from storage for this original relation
|
|
||||||
stored = await relation_chunks_storage.get_by_id(old_storage_key)
|
|
||||||
|
|
||||||
if stored is not None and isinstance(stored, dict):
|
|
||||||
chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid]
|
|
||||||
else:
|
|
||||||
# Fallback to source_id from graph
|
|
||||||
source_id = edge_data.get("source_id", "")
|
|
||||||
chunk_ids = [
|
|
||||||
cid for cid in source_id.split(GRAPH_FIELD_SEP) if cid
|
|
||||||
]
|
|
||||||
|
|
||||||
# Accumulate chunk_ids with ordered deduplication
|
|
||||||
if storage_key not in relation_chunk_tracking:
|
|
||||||
relation_chunk_tracking[storage_key] = []
|
|
||||||
|
|
||||||
existing_chunks = set(relation_chunk_tracking[storage_key])
|
|
||||||
for chunk_id in chunk_ids:
|
|
||||||
if chunk_id not in existing_chunks:
|
|
||||||
existing_chunks.add(chunk_id)
|
|
||||||
relation_chunk_tracking[storage_key].append(chunk_id)
|
|
||||||
|
|
||||||
if relation_key in relation_updates:
|
if relation_key in relation_updates:
|
||||||
# Merge relationship data
|
# Merge relationship data
|
||||||
existing_data = relation_updates[relation_key]["data"]
|
existing_data = relation_updates[relation_key]["data"]
|
||||||
|
|
@ -1262,53 +1183,57 @@ async def amerge_entities(
|
||||||
)
|
)
|
||||||
relation_updates[relation_key]["data"] = merged_relation
|
relation_updates[relation_key]["data"] = merged_relation
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Entity Merge: deduplicating relation `{normalized_src}`~`{normalized_tgt}`"
|
f"Merged duplicate relationship: {new_src} -> {new_tgt}"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
relation_updates[relation_key] = {
|
relation_updates[relation_key] = {
|
||||||
"graph_src": new_src,
|
"src": new_src,
|
||||||
"graph_tgt": new_tgt,
|
"tgt": new_tgt,
|
||||||
"norm_src": normalized_src,
|
|
||||||
"norm_tgt": normalized_tgt,
|
|
||||||
"data": edge_data.copy(),
|
"data": edge_data.copy(),
|
||||||
}
|
}
|
||||||
|
|
||||||
# Apply relationship updates
|
# Apply relationship updates
|
||||||
for rel_data in relation_updates.values():
|
for rel_data in relation_updates.values():
|
||||||
await chunk_entity_relation_graph.upsert_edge(
|
await chunk_entity_relation_graph.upsert_edge(
|
||||||
rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"]
|
rel_data["src"], rel_data["tgt"], rel_data["data"]
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Entity Merge: updating relation `{rel_data['graph_src']}`->`{rel_data['graph_tgt']}`"
|
f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Update relation chunk tracking storage
|
# Delete relationships records from vector database
|
||||||
if relation_chunks_storage is not None and all_relations:
|
await relationships_vdb.delete(relations_to_delete)
|
||||||
if old_relation_keys_to_delete:
|
logger.info(
|
||||||
await relation_chunks_storage.delete(old_relation_keys_to_delete)
|
f"Deleted {len(relations_to_delete)} relation records for entity from vector database"
|
||||||
|
)
|
||||||
|
|
||||||
if relation_chunk_tracking:
|
# 7. Update entity vector representation
|
||||||
updates = {}
|
description = merged_entity_data.get("description", "")
|
||||||
for storage_key, chunk_ids in relation_chunk_tracking.items():
|
source_id = merged_entity_data.get("source_id", "")
|
||||||
updates[storage_key] = {
|
entity_type = merged_entity_data.get("entity_type", "")
|
||||||
"chunk_ids": chunk_ids,
|
content = target_entity + "\n" + description
|
||||||
"count": len(chunk_ids),
|
|
||||||
}
|
|
||||||
|
|
||||||
await relation_chunks_storage.upsert(updates)
|
entity_id = compute_mdhash_id(target_entity, prefix="ent-")
|
||||||
logger.info(
|
entity_data_for_vdb = {
|
||||||
f"Entity Merge: merged chunk tracking for {len(updates)} relations"
|
entity_id: {
|
||||||
)
|
"content": content,
|
||||||
|
"entity_name": target_entity,
|
||||||
|
"source_id": source_id,
|
||||||
|
"description": description,
|
||||||
|
"entity_type": entity_type,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
# 7. Update relationship vector representations
|
await entities_vdb.upsert(entity_data_for_vdb)
|
||||||
logger.info(
|
|
||||||
f"Entity Merge: deleting {len(relations_to_delete)} relations from vdb"
|
# 8. Update relationship vector representations
|
||||||
)
|
|
||||||
await relationships_vdb.delete(relations_to_delete)
|
|
||||||
for rel_data in relation_updates.values():
|
for rel_data in relation_updates.values():
|
||||||
|
src = rel_data["src"]
|
||||||
|
tgt = rel_data["tgt"]
|
||||||
edge_data = rel_data["data"]
|
edge_data = rel_data["data"]
|
||||||
normalized_src = rel_data["norm_src"]
|
|
||||||
normalized_tgt = rel_data["norm_tgt"]
|
# Normalize entity order for consistent vector storage
|
||||||
|
normalized_src, normalized_tgt = sorted([src, tgt])
|
||||||
|
|
||||||
description = edge_data.get("description", "")
|
description = edge_data.get("description", "")
|
||||||
keywords = edge_data.get("keywords", "")
|
keywords = edge_data.get("keywords", "")
|
||||||
|
|
@ -1334,112 +1259,37 @@ async def amerge_entities(
|
||||||
"weight": weight,
|
"weight": weight,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await relationships_vdb.upsert(relation_data_for_vdb)
|
await relationships_vdb.upsert(relation_data_for_vdb)
|
||||||
logger.info(
|
|
||||||
f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`"
|
|
||||||
)
|
|
||||||
|
|
||||||
# 8. Update entity vector representation
|
# 9. Delete source entities
|
||||||
description = merged_entity_data.get("description", "")
|
|
||||||
source_id = merged_entity_data.get("source_id", "")
|
|
||||||
entity_type = merged_entity_data.get("entity_type", "")
|
|
||||||
content = target_entity + "\n" + description
|
|
||||||
|
|
||||||
entity_id = compute_mdhash_id(target_entity, prefix="ent-")
|
|
||||||
entity_data_for_vdb = {
|
|
||||||
entity_id: {
|
|
||||||
"content": content,
|
|
||||||
"entity_name": target_entity,
|
|
||||||
"source_id": source_id,
|
|
||||||
"description": description,
|
|
||||||
"entity_type": entity_type,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
await entities_vdb.upsert(entity_data_for_vdb)
|
|
||||||
logger.info(f"Entity Merge: updating vdb `{target_entity}`")
|
|
||||||
|
|
||||||
# 9. Merge entity chunk tracking (source entities first, then target entity)
|
|
||||||
if entity_chunks_storage is not None:
|
|
||||||
all_chunk_id_lists = []
|
|
||||||
|
|
||||||
# Build list of entities to process (source entities first, then target entity)
|
|
||||||
entities_to_process = []
|
|
||||||
|
|
||||||
# Add source entities first (excluding target if it's already in source list)
|
|
||||||
for entity_name in source_entities:
|
|
||||||
if entity_name != target_entity:
|
|
||||||
entities_to_process.append(entity_name)
|
|
||||||
|
|
||||||
# Add target entity last (if it exists)
|
|
||||||
if target_exists:
|
|
||||||
entities_to_process.append(target_entity)
|
|
||||||
|
|
||||||
# Process all entities in order with unified logic
|
|
||||||
for entity_name in entities_to_process:
|
|
||||||
stored = await entity_chunks_storage.get_by_id(entity_name)
|
|
||||||
if stored and isinstance(stored, dict):
|
|
||||||
chunk_ids = [cid for cid in stored.get("chunk_ids", []) if cid]
|
|
||||||
if chunk_ids:
|
|
||||||
all_chunk_id_lists.append(chunk_ids)
|
|
||||||
|
|
||||||
# Merge chunk_ids with ordered deduplication (preserves order, source entities first)
|
|
||||||
merged_chunk_ids = []
|
|
||||||
seen = set()
|
|
||||||
for chunk_id_list in all_chunk_id_lists:
|
|
||||||
for chunk_id in chunk_id_list:
|
|
||||||
if chunk_id not in seen:
|
|
||||||
seen.add(chunk_id)
|
|
||||||
merged_chunk_ids.append(chunk_id)
|
|
||||||
|
|
||||||
# Delete source entities' chunk tracking records
|
|
||||||
entity_keys_to_delete = [
|
|
||||||
e for e in source_entities if e != target_entity
|
|
||||||
]
|
|
||||||
if entity_keys_to_delete:
|
|
||||||
await entity_chunks_storage.delete(entity_keys_to_delete)
|
|
||||||
|
|
||||||
# Update target entity's chunk tracking
|
|
||||||
if merged_chunk_ids:
|
|
||||||
await entity_chunks_storage.upsert(
|
|
||||||
{
|
|
||||||
target_entity: {
|
|
||||||
"chunk_ids": merged_chunk_ids,
|
|
||||||
"count": len(merged_chunk_ids),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
f"Entity Merge: find {len(merged_chunk_ids)} chunks related to '{target_entity}'"
|
|
||||||
)
|
|
||||||
|
|
||||||
# 10. Delete source entities
|
|
||||||
for entity_name in source_entities:
|
for entity_name in source_entities:
|
||||||
if entity_name == target_entity:
|
if entity_name == target_entity:
|
||||||
logger.warning(
|
logger.info(
|
||||||
f"Entity Merge: source entity'{entity_name}' is same as target entity"
|
f"Skipping deletion of '{entity_name}' as it's also the target entity"
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
logger.info(f"Entity Merge: deleting '{entity_name}' from KG and vdb")
|
# Delete entity node from knowledge graph
|
||||||
|
|
||||||
# Delete entity node and related edges from knowledge graph
|
|
||||||
await chunk_entity_relation_graph.delete_node(entity_name)
|
await chunk_entity_relation_graph.delete_node(entity_name)
|
||||||
|
|
||||||
# Delete entity record from vector database
|
# Delete entity record from vector database
|
||||||
entity_id = compute_mdhash_id(entity_name, prefix="ent-")
|
entity_id = compute_mdhash_id(entity_name, prefix="ent-")
|
||||||
await entities_vdb.delete([entity_id])
|
await entities_vdb.delete([entity_id])
|
||||||
|
|
||||||
# 11. Save changes
|
logger.info(
|
||||||
|
f"Deleted source entity '{entity_name}' and its vector embedding from database"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 10. Save changes
|
||||||
await _persist_graph_updates(
|
await _persist_graph_updates(
|
||||||
entities_vdb=entities_vdb,
|
entities_vdb=entities_vdb,
|
||||||
relationships_vdb=relationships_vdb,
|
relationships_vdb=relationships_vdb,
|
||||||
chunk_entity_relation_graph=chunk_entity_relation_graph,
|
chunk_entity_relation_graph=chunk_entity_relation_graph,
|
||||||
entity_chunks_storage=entity_chunks_storage,
|
|
||||||
relation_chunks_storage=relation_chunks_storage,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Entity Merge: successfully merged {len(source_entities)} entities into '{target_entity}'"
|
f"Successfully merged {len(source_entities)} entities into '{target_entity}'"
|
||||||
)
|
)
|
||||||
return await get_entity_info(
|
return await get_entity_info(
|
||||||
chunk_entity_relation_graph,
|
chunk_entity_relation_graph,
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue