diff --git a/graphiti_core/utils/bulk_utils.py b/graphiti_core/utils/bulk_utils.py index 2c5eaf5b..4538da53 100644 --- a/graphiti_core/utils/bulk_utils.py +++ b/graphiti_core/utils/bulk_utils.py @@ -356,6 +356,10 @@ async def dedupe_nodes_bulk( union_pairs.extend(duplicate_pairs) compressed_map: dict[str, str] = compress_uuid_map(union_pairs) + # We pass directed edges (extracted -> canonical) to the compressor, but the utility treats + # them as undirected pairs and picks the lexicographically smaller UUID as the component root. + # Re-write the entries using the original direction so that each source maps to the canonical + # target returned by the first/second pass even if its UUID sorts before the canonical one. for source_uuid, target_uuid in union_pairs: canonical_uuid = compressed_map.get(target_uuid, target_uuid) compressed_map[source_uuid] = canonical_uuid @@ -369,7 +373,14 @@ async def dedupe_nodes_bulk( if canonical_uuid in seen: continue seen.add(canonical_uuid) - canonical_node = canonical_nodes.get(canonical_uuid, node) + canonical_node = canonical_nodes.get(canonical_uuid) + if canonical_node is None: + logger.warning( + 'Canonical node %s missing during batch dedupe; falling back to %s', + canonical_uuid, + node.uuid, + ) + canonical_node = node deduped_nodes.append(canonical_node) nodes_by_episode[episode_uuid] = deduped_nodes