Bulk updates (#732)

* updates

* update

* update

* typo

* linter
Preston Rasmussen 2025-07-16 02:26:33 -04:00 (committed by GitHub)
parent 0de2812eb6
commit 5d45d71259
5 changed files with 116 additions and 14 deletions


@@ -78,7 +78,7 @@ async def main(use_bulk: bool = False):
     group_id = str(uuid4())

     raw_episodes: list[RawEpisode] = []
-    for i, message in enumerate(messages[3:10]):
+    for i, message in enumerate(messages[3:14]):
         raw_episodes.append(
             RawEpisode(
                 name=f'Message {i}',
@@ -94,7 +94,7 @@ async def main(use_bulk: bool = False):
             group_id=group_id,
             entity_types={'Person': Person},
             edge_types={'IS_PRESIDENT_OF': IsPresidentOf},
-            edge_type_map={('Person', 'Entity'): ['PRESIDENT_OF']},
+            edge_type_map={('Person', 'Entity'): ['IS_PRESIDENT_OF']},
         )
     else:
         for i, message in enumerate(messages[3:14]):
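
The edge_type_map fix above is load-bearing: the names listed in edge_type_map are looked up against the keys of edge_types, so the stale 'PRESIDENT_OF' entry could never match the 'IS_PRESIDENT_OF' model. A minimal sketch of the corrected wiring (the Person and IsPresidentOf model bodies here are illustrative placeholders, not the example's actual definitions):

    from pydantic import BaseModel

    class Person(BaseModel):
        """Placeholder entity type for people mentioned in messages."""

    class IsPresidentOf(BaseModel):
        """Placeholder edge type for a person leading a country."""

    entity_types = {'Person': Person}
    edge_types = {'IS_PRESIDENT_OF': IsPresidentOf}
    # Each name on the right-hand side must be a key of edge_types;
    # otherwise the custom edge model would never be applied.
    edge_type_map = {('Person', 'Entity'): ['IS_PRESIDENT_OF']}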


@@ -640,6 +640,7 @@ class Graphiti:
                 self.clients, extracted_nodes_bulk, episode_context, entity_types
             )

             # Create Episodic Edges
+            episodic_edges: list[EpisodicEdge] = []
             for episode_uuid, nodes in nodes_by_episode.items():
                 episodic_edges.extend(build_episodic_edges(nodes, episode_uuid, now))
@@ -695,18 +696,112 @@ class Graphiti:
             hydrated_nodes = [node for nodes in new_hydrated_nodes for node in nodes]

-            # TODO: Resolve nodes and edges against the existing graph
-            edges_by_uuid: dict[str, EntityEdge] = {
-                edge.uuid: edge for edges in edges_by_episode.values() for edge in edges
-            }
+            # Update nodes_by_uuid map with the hydrated nodes
+            for hydrated_node in hydrated_nodes:
+                nodes_by_uuid[hydrated_node.uuid] = hydrated_node
+
+            # Resolve nodes and edges against the existing graph
+            nodes_by_episode_unique: dict[str, list[EntityNode]] = {}
+            nodes_uuid_set: set[str] = set()
+            for episode, _ in episode_context:
+                nodes_by_episode_unique[episode.uuid] = []
+                nodes = [nodes_by_uuid[node.uuid] for node in nodes_by_episode[episode.uuid]]
+                for node in nodes:
+                    if node.uuid not in nodes_uuid_set:
+                        nodes_by_episode_unique[episode.uuid].append(node)
+                        nodes_uuid_set.add(node.uuid)
+
+            node_results = await semaphore_gather(
+                *[
+                    resolve_extracted_nodes(
+                        self.clients,
+                        nodes_by_episode_unique[episode.uuid],
+                        episode,
+                        previous_episodes,
+                        entity_types,
+                    )
+                    for episode, previous_episodes in episode_context
+                ]
+            )
+
+            resolved_nodes: list[EntityNode] = []
+            uuid_map: dict[str, str] = {}
+            node_duplicates: list[tuple[EntityNode, EntityNode]] = []
+            for result in node_results:
+                resolved_nodes.extend(result[0])
+                uuid_map.update(result[1])
+                node_duplicates.extend(result[2])
+
+            # Update nodes_by_uuid map with the resolved nodes
+            for resolved_node in resolved_nodes:
+                nodes_by_uuid[resolved_node.uuid] = resolved_node
+
+            # update nodes_by_episode_unique mapping
+            for episode_uuid, nodes in nodes_by_episode_unique.items():
+                updated_nodes: list[EntityNode] = []
+                for node in nodes:
+                    updated_node_uuid = uuid_map.get(node.uuid, node.uuid)
+                    updated_node = nodes_by_uuid[updated_node_uuid]
+                    updated_nodes.append(updated_node)
+
+                nodes_by_episode_unique[episode_uuid] = updated_nodes
+
+            hydrated_nodes_results: list[list[EntityNode]] = await semaphore_gather(
+                *[
+                    extract_attributes_from_nodes(
+                        self.clients,
+                        nodes_by_episode_unique[episode.uuid],
+                        episode,
+                        previous_episodes,
+                        entity_types,
+                    )
+                    for episode, previous_episodes in episode_context
+                ]
+            )
+
+            final_hydrated_nodes = [node for nodes in hydrated_nodes_results for node in nodes]
+
+            edges_by_episode_unique: dict[str, list[EntityEdge]] = {}
+            edges_uuid_set: set[str] = set()
+            for episode_uuid, edges in edges_by_episode.items():
+                edges_with_updated_pointers = resolve_edge_pointers(edges, uuid_map)
+                edges_by_episode_unique[episode_uuid] = []
+                for edge in edges_with_updated_pointers:
+                    if edge.uuid not in edges_uuid_set:
+                        edges_by_episode_unique[episode_uuid].append(edge)
+                        edges_uuid_set.add(edge.uuid)
+
+            edge_results = await semaphore_gather(
+                *[
+                    resolve_extracted_edges(
+                        self.clients,
+                        edges_by_episode_unique[episode.uuid],
+                        episode,
+                        hydrated_nodes,
+                        edge_types or {},
+                        edge_type_map or edge_type_map_default,
+                    )
+                    for episode in episodes
+                ]
+            )
+
+            resolved_edges: list[EntityEdge] = []
+            invalidated_edges: list[EntityEdge] = []
+            for result in edge_results:
+                resolved_edges.extend(result[0])
+                invalidated_edges.extend(result[1])
+
+            # Resolved pointers for episodic edges
+            resolved_episodic_edges = resolve_edge_pointers(episodic_edges, uuid_map)

             # save data to KG
             await add_nodes_and_edges_bulk(
                 self.driver,
                 episodes,
-                episodic_edges,
-                hydrated_nodes,
-                list(edges_by_uuid.values()),
+                resolved_episodic_edges,
+                final_hydrated_nodes,
+                resolved_edges + invalidated_edges,
                 self.embedder,
             )
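
Most of the new bookkeeping in this hunk hinges on uuid_map, which maps each duplicate node's uuid to the uuid of its canonical node. resolve_edge_pointers then redirects edge endpoints through that map, so edges extracted against a duplicate node land on the surviving node before anything is persisted. A toy sketch of that redirection semantics (stub types for illustration, not the library's classes):

    from dataclasses import dataclass

    @dataclass
    class EdgeStub:
        uuid: str
        source_node_uuid: str
        target_node_uuid: str

    def resolve_pointers_sketch(edges: list[EdgeStub], uuid_map: dict[str, str]) -> list[EdgeStub]:
        # Redirect each endpoint to its canonical node; unmapped uuids pass through unchanged.
        for edge in edges:
            edge.source_node_uuid = uuid_map.get(edge.source_node_uuid, edge.source_node_uuid)
            edge.target_node_uuid = uuid_map.get(edge.target_node_uuid, edge.target_node_uuid)
        return edges

    # Node 'n2' was found to duplicate canonical node 'n1':
    edges = [EdgeStub('e1', 'n2', 'n3')]
    assert resolve_pointers_sketch(edges, {'n2': 'n1'})[0].source_node_uuid == 'n1'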


@@ -141,9 +141,9 @@ def resolve_edge(context: dict[str, Any]) -> list[Message]:
         Task:
-        If the NEW FACT represents the same factual information as any fact in EXISTING FACTS, return the idx of the duplicate fact.
-        If the NEW FACT is not a duplicate of any of the EXISTING FACTS, return -1.
+        If the NEW FACT represents identical factual information of one or more in EXISTING FACTS, return the idx of the duplicate facts.
+        Facts with similar information that contain key differences should not be marked as duplicates.
+        If the NEW FACT is not a duplicate of any of the EXISTING FACTS, return an empty list.

         Given the predefined FACT TYPES, determine if the NEW FACT should be classified as one of these types.
         Return the fact type as fact_type or DEFAULT if NEW FACT is not one of the FACT TYPES.
@@ -153,8 +153,8 @@ def resolve_edge(context: dict[str, Any]) -> list[Message]:
         If there are no contradicted facts, return an empty list.

         Guidelines:
-        1. The facts do not need to be completely identical to be duplicates, they just need to express the same information.
-        2. Some facts may be very similar but will have key differences, particularly around numeric values in the facts.
+        1. Some facts may be very similar but will have key differences, particularly around numeric values in the facts.
             Do not mark these facts as duplicates.
         """,
     ),
 ]
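
The reworded task changes the expected output shape: a list of duplicate indices (possibly empty) replaces the old single idx / -1 convention. A hypothetical response model consistent with the new wording — the field names here are assumptions for illustration, not necessarily the library's actual schema:

    from pydantic import BaseModel, Field

    class EdgeDuplicateSketch(BaseModel):
        # Indices ("idx") of EXISTING FACTS that express identical information;
        # empty when the NEW FACT duplicates none of them.
        duplicate_facts: list[int] = Field(default_factory=list)
        # Indices of EXISTING FACTS the NEW FACT contradicts; empty if none.
        contradicted_facts: list[int] = Field(default_factory=list)
        # One of the predefined FACT TYPES, or DEFAULT.
        fact_type: str = 'DEFAULT'

Under the revised guideline, facts like "Alice paid $40" and "Alice paid $45" share nearly every word yet differ on a key numeric value, so duplicate_facts should stay empty for that pair.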


@@ -317,6 +317,12 @@ async def dedupe_edges_bulk(
         for existing_edge in existing_edges:
             # Approximate BM25 by checking for word overlaps (this is faster than creating many in-memory indices)
             # This approach will cast a wider net than BM25, which is ideal for this use case
+            if (
+                edge.source_node_uuid != existing_edge.source_node_uuid
+                or edge.target_node_uuid != existing_edge.target_node_uuid
+            ):
+                continue
+
             edge_words = set(edge.fact.lower().split())
             existing_edge_words = set(existing_edge.fact.lower().split())
             has_overlap = not edge_words.isdisjoint(existing_edge_words)
@@ -345,6 +351,7 @@ async def dedupe_edges_bulk(
         ]
     )

     # For now we won't track edge invalidation
+    duplicate_pairs: list[tuple[str, str]] = []
     for i, (_, _, duplicates) in enumerate(bulk_edge_resolutions):
         episode, edge, candidates = dedupe_tuples[i]
uv.lock (generated)

@@ -746,7 +746,7 @@ wheels = [

 [[package]]
 name = "graphiti-core"
-version = "0.17.3"
+version = "0.17.4"
 source = { editable = "." }
 dependencies = [
     { name = "diskcache" },