From 5d45d712595f0510298704d7a7b06bc53fe89859 Mon Sep 17 00:00:00 2001 From: Preston Rasmussen <109292228+prasmussen15@users.noreply.github.com> Date: Wed, 16 Jul 2025 02:26:33 -0400 Subject: [PATCH] Bulk updates (#732) * updates * update * update * typo * linter --- examples/podcast/podcast_runner.py | 4 +- graphiti_core/graphiti.py | 109 ++++++++++++++++++++++++-- graphiti_core/prompts/dedupe_edges.py | 8 +- graphiti_core/utils/bulk_utils.py | 7 ++ uv.lock | 2 +- 5 files changed, 116 insertions(+), 14 deletions(-) diff --git a/examples/podcast/podcast_runner.py b/examples/podcast/podcast_runner.py index e4573a70..cf98655f 100644 --- a/examples/podcast/podcast_runner.py +++ b/examples/podcast/podcast_runner.py @@ -78,7 +78,7 @@ async def main(use_bulk: bool = False): group_id = str(uuid4()) raw_episodes: list[RawEpisode] = [] - for i, message in enumerate(messages[3:10]): + for i, message in enumerate(messages[3:14]): raw_episodes.append( RawEpisode( name=f'Message {i}', @@ -94,7 +94,7 @@ async def main(use_bulk: bool = False): group_id=group_id, entity_types={'Person': Person}, edge_types={'IS_PRESIDENT_OF': IsPresidentOf}, - edge_type_map={('Person', 'Entity'): ['PRESIDENT_OF']}, + edge_type_map={('Person', 'Entity'): ['IS_PRESIDENT_OF']}, ) else: for i, message in enumerate(messages[3:14]): diff --git a/graphiti_core/graphiti.py b/graphiti_core/graphiti.py index da5a86c1..2ba8e6da 100644 --- a/graphiti_core/graphiti.py +++ b/graphiti_core/graphiti.py @@ -640,6 +640,7 @@ class Graphiti: self.clients, extracted_nodes_bulk, episode_context, entity_types ) + # Create Episodic Edges episodic_edges: list[EpisodicEdge] = [] for episode_uuid, nodes in nodes_by_episode.items(): episodic_edges.extend(build_episodic_edges(nodes, episode_uuid, now)) @@ -695,18 +696,112 @@ class Graphiti: hydrated_nodes = [node for nodes in new_hydrated_nodes for node in nodes] - # TODO: Resolve nodes and edges against the existing graph - edges_by_uuid: dict[str, EntityEdge] = { - edge.uuid: edge for edges in edges_by_episode.values() for edge in edges - } + # Update nodes_by_uuid map with the hydrated nodes + for hydrated_node in hydrated_nodes: + nodes_by_uuid[hydrated_node.uuid] = hydrated_node + + # Resolve nodes and edges against the existing graph + nodes_by_episode_unique: dict[str, list[EntityNode]] = {} + nodes_uuid_set: set[str] = set() + for episode, _ in episode_context: + nodes_by_episode_unique[episode.uuid] = [] + nodes = [nodes_by_uuid[node.uuid] for node in nodes_by_episode[episode.uuid]] + for node in nodes: + if node.uuid not in nodes_uuid_set: + nodes_by_episode_unique[episode.uuid].append(node) + nodes_uuid_set.add(node.uuid) + + node_results = await semaphore_gather( + *[ + resolve_extracted_nodes( + self.clients, + nodes_by_episode_unique[episode.uuid], + episode, + previous_episodes, + entity_types, + ) + for episode, previous_episodes in episode_context + ] + ) + + resolved_nodes: list[EntityNode] = [] + uuid_map: dict[str, str] = {} + node_duplicates: list[tuple[EntityNode, EntityNode]] = [] + for result in node_results: + resolved_nodes.extend(result[0]) + uuid_map.update(result[1]) + node_duplicates.extend(result[2]) + + # Update nodes_by_uuid map with the resolved nodes + for resolved_node in resolved_nodes: + nodes_by_uuid[resolved_node.uuid] = resolved_node + + # update nodes_by_episode_unique mapping + for episode_uuid, nodes in nodes_by_episode_unique.items(): + updated_nodes: list[EntityNode] = [] + for node in nodes: + updated_node_uuid = uuid_map.get(node.uuid, node.uuid) + updated_node = nodes_by_uuid[updated_node_uuid] + updated_nodes.append(updated_node) + + nodes_by_episode_unique[episode_uuid] = updated_nodes + + hydrated_nodes_results: list[list[EntityNode]] = await semaphore_gather( + *[ + extract_attributes_from_nodes( + self.clients, + nodes_by_episode_unique[episode.uuid], + episode, + previous_episodes, + entity_types, + ) + for episode, previous_episodes in episode_context + ] + ) + + final_hydrated_nodes = [node for nodes in hydrated_nodes_results for node in nodes] + + edges_by_episode_unique: dict[str, list[EntityEdge]] = {} + edges_uuid_set: set[str] = set() + for episode_uuid, edges in edges_by_episode.items(): + edges_with_updated_pointers = resolve_edge_pointers(edges, uuid_map) + edges_by_episode_unique[episode_uuid] = [] + + for edge in edges_with_updated_pointers: + if edge.uuid not in edges_uuid_set: + edges_by_episode_unique[episode_uuid].append(edge) + edges_uuid_set.add(edge.uuid) + + edge_results = await semaphore_gather( + *[ + resolve_extracted_edges( + self.clients, + edges_by_episode_unique[episode.uuid], + episode, + hydrated_nodes, + edge_types or {}, + edge_type_map or edge_type_map_default, + ) + for episode in episodes + ] + ) + + resolved_edges: list[EntityEdge] = [] + invalidated_edges: list[EntityEdge] = [] + for result in edge_results: + resolved_edges.extend(result[0]) + invalidated_edges.extend(result[1]) + + # Resolved pointers for episodic edges + resolved_episodic_edges = resolve_edge_pointers(episodic_edges, uuid_map) # save data to KG await add_nodes_and_edges_bulk( self.driver, episodes, - episodic_edges, - hydrated_nodes, - list(edges_by_uuid.values()), + resolved_episodic_edges, + final_hydrated_nodes, + resolved_edges + invalidated_edges, self.embedder, ) diff --git a/graphiti_core/prompts/dedupe_edges.py b/graphiti_core/prompts/dedupe_edges.py index 2177280a..36dbccd1 100644 --- a/graphiti_core/prompts/dedupe_edges.py +++ b/graphiti_core/prompts/dedupe_edges.py @@ -141,9 +141,9 @@ def resolve_edge(context: dict[str, Any]) -> list[Message]: Task: - If the NEW FACT represents the same factual information as any fact in EXISTING FACTS, return the idx of the duplicate fact. + If the NEW FACT represents identical factual information of one or more in EXISTING FACTS, return the idx of the duplicate facts. Facts with similar information that contain key differences should not be marked as duplicates. - If the NEW FACT is not a duplicate of any of the EXISTING FACTS, return -1. + If the NEW FACT is not a duplicate of any of the EXISTING FACTS, return an empty list. Given the predefined FACT TYPES, determine if the NEW FACT should be classified as one of these types. Return the fact type as fact_type or DEFAULT if NEW FACT is not one of the FACT TYPES. @@ -153,8 +153,8 @@ def resolve_edge(context: dict[str, Any]) -> list[Message]: If there are no contradicted facts, return an empty list. Guidelines: - 1. The facts do not need to be completely identical to be duplicates, they just need to express the same information. - 2. Some facts may be very similar but will have key differences, particularly around numeric values in the facts. + 1. Some facts may be very similar but will have key differences, particularly around numeric values in the facts. + Do not mark these facts as duplicates. """, ), ] diff --git a/graphiti_core/utils/bulk_utils.py b/graphiti_core/utils/bulk_utils.py index c10c070d..8d5e8a68 100644 --- a/graphiti_core/utils/bulk_utils.py +++ b/graphiti_core/utils/bulk_utils.py @@ -317,6 +317,12 @@ async def dedupe_edges_bulk( for existing_edge in existing_edges: # Approximate BM25 by checking for word overlaps (this is faster than creating many in-memory indices) # This approach will cast a wider net than BM25, which is ideal for this use case + if ( + edge.source_node_uuid != existing_edge.source_node_uuid + or edge.target_node_uuid != existing_edge.target_node_uuid + ): + continue + edge_words = set(edge.fact.lower().split()) existing_edge_words = set(existing_edge.fact.lower().split()) has_overlap = not edge_words.isdisjoint(existing_edge_words) @@ -345,6 +351,7 @@ async def dedupe_edges_bulk( ] ) + # For now we won't track edge invalidation duplicate_pairs: list[tuple[str, str]] = [] for i, (_, _, duplicates) in enumerate(bulk_edge_resolutions): episode, edge, candidates = dedupe_tuples[i] diff --git a/uv.lock b/uv.lock index dff9047d..ca86c214 100644 --- a/uv.lock +++ b/uv.lock @@ -746,7 +746,7 @@ wheels = [ [[package]] name = "graphiti-core" -version = "0.17.3" +version = "0.17.4" source = { editable = "." } dependencies = [ { name = "diskcache" },