graphiti/graphiti_core/utils/maintenance/node_operations.py

"""
Copyright 2024, Zep Software, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import logging
from time import time
from typing import Any

import pydantic
from pydantic import BaseModel, Field

from graphiti_core.graphiti_types import GraphitiClients
from graphiti_core.helpers import MAX_REFLEXION_ITERATIONS, semaphore_gather
from graphiti_core.llm_client import LLMClient
from graphiti_core.nodes import EntityNode, EpisodeType, EpisodicNode, create_entity_node_embeddings
from graphiti_core.prompts import prompt_library
from graphiti_core.prompts.dedupe_nodes import NodeDuplicate
from graphiti_core.prompts.extract_nodes import (
    ExtractedEntities,
    ExtractedEntity,
    MissedEntities,
)
from graphiti_core.search.search_filters import SearchFilters
from graphiti_core.search.search_utils import get_relevant_nodes
from graphiti_core.utils.datetime_utils import utc_now

logger = logging.getLogger(__name__)

async def extract_nodes_reflexion(
llm_client: LLMClient,
episode: EpisodicNode,
previous_episodes: list[EpisodicNode],
node_names: list[str],
) -> list[str]:
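    """Ask the LLM which entities the extraction pass missed.

    Given the episode, its prior context, and the names extracted so far,
    returns the names of any overlooked entities (an empty list when none).
    """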
# Prepare context for LLM
context = {
'episode_content': episode.content,
'previous_episodes': [ep.content for ep in previous_episodes],
'extracted_entities': node_names,
}
llm_response = await llm_client.generate_response(
prompt_library.extract_nodes.reflexion(context), MissedEntities
)
missed_entities = llm_response.get('missed_entities', [])
return missed_entities


async def extract_nodes(
clients: GraphitiClients,
episode: EpisodicNode,
previous_episodes: list[EpisodicNode],
entity_types: dict[str, BaseModel] | None = None,
) -> list[EntityNode]:
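    """Extract entity nodes from an episode, with reflexion-based retries.

    Runs the extraction prompt matching the episode source, asks a reflexion
    pass which entities were missed, and repeats until nothing is missed or
    MAX_REFLEXION_ITERATIONS is reached. Entities with empty names are dropped
    before EntityNode objects are created and embedded.
    """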
start = time()
llm_client = clients.llm_client
embedder = clients.embedder
llm_response = {}
custom_prompt = ''
entities_missed = True
reflexion_iterations = 0
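    # Entity type id 0 is always the default 'Entity' classification; any
    # custom types supplied by the caller follow with ids 1..n.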
entity_types_context = [
{
'entity_type_id': 0,
'entity_type_name': 'Entity',
'entity_type_description': 'Default entity classification. Use this entity type if the entity is not one of the other listed types.',
}
]
entity_types_context += (
[
{
'entity_type_id': i + 1,
'entity_type_name': type_name,
'entity_type_description': type_model.__doc__,
}
for i, (type_name, type_model) in enumerate(entity_types.items())
]
if entity_types is not None
else []
)
context = {
'episode_content': episode.content,
'episode_timestamp': episode.valid_at.isoformat(),
'previous_episodes': [ep.content for ep in previous_episodes],
'custom_prompt': custom_prompt,
'entity_types': entity_types_context,
'source_description': episode.source_description,
}
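    # Re-prompt until the reflexion pass reports no missed entities or the
    # iteration cap is reached.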
while entities_missed and reflexion_iterations <= MAX_REFLEXION_ITERATIONS:
if episode.source == EpisodeType.message:
llm_response = await llm_client.generate_response(
prompt_library.extract_nodes.extract_message(context),
response_model=ExtractedEntities,
)
elif episode.source == EpisodeType.text:
llm_response = await llm_client.generate_response(
prompt_library.extract_nodes.extract_text(context), response_model=ExtractedEntities
)
elif episode.source == EpisodeType.json:
llm_response = await llm_client.generate_response(
prompt_library.extract_nodes.extract_json(context), response_model=ExtractedEntities
)
        extracted_entities: list[ExtractedEntity] = [
            ExtractedEntity(**extracted_entity_data)
            for extracted_entity_data in llm_response.get('extracted_entities', [])
        ]
reflexion_iterations += 1
if reflexion_iterations < MAX_REFLEXION_ITERATIONS:
missing_entities = await extract_nodes_reflexion(
llm_client,
episode,
previous_episodes,
[entity.name for entity in extracted_entities],
)
entities_missed = len(missing_entities) != 0
            custom_prompt = 'Make sure that the following entities are extracted: '
            for entity in missing_entities:
                custom_prompt += f'\n{entity},'
            # Write the feedback back into the shared prompt context so the
            # next extraction pass sees the missed entities.
            context['custom_prompt'] = custom_prompt
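    # Discard entities whose names are empty or whitespace-only.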
filtered_extracted_entities = [entity for entity in extracted_entities if entity.name.strip()]
end = time()
logger.debug(f'Extracted new nodes: {filtered_extracted_entities} in {(end - start) * 1000} ms')
# Convert the extracted data into EntityNode objects
extracted_nodes = []
for extracted_entity in filtered_extracted_entities:
entity_type_name = entity_types_context[extracted_entity.entity_type_id].get(
'entity_type_name'
)
labels: list[str] = list({'Entity', str(entity_type_name)})
new_node = EntityNode(
name=extracted_entity.name,
group_id=episode.group_id,
labels=labels,
summary='',
created_at=utc_now(),
)
extracted_nodes.append(new_node)
logger.debug(f'Created new node: {new_node.name} (UUID: {new_node.uuid})')
await create_entity_node_embeddings(embedder, extracted_nodes)
logger.debug(f'Extracted nodes: {[(n.name, n.uuid) for n in extracted_nodes]}')
return extracted_nodes


async def dedupe_extracted_nodes(
llm_client: LLMClient,
extracted_nodes: list[EntityNode],
existing_nodes: list[EntityNode],
) -> tuple[list[EntityNode], dict[str, str]]:
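    """Deduplicate extracted nodes against a set of existing graph nodes.

    Returns the resolved node list plus a map from each duplicate extracted
    node's UUID to the UUID of the existing node it collapses into.
    """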
start = time()
# build existing node map
node_map: dict[str, EntityNode] = {}
for node in existing_nodes:
node_map[node.uuid] = node
# Prepare context for LLM
existing_nodes_context = [
{'uuid': node.uuid, 'name': node.name, 'summary': node.summary} for node in existing_nodes
]
extracted_nodes_context = [
{'uuid': node.uuid, 'name': node.name, 'summary': node.summary} for node in extracted_nodes
]
context = {
'existing_nodes': existing_nodes_context,
'extracted_nodes': extracted_nodes_context,
}
llm_response = await llm_client.generate_response(prompt_library.dedupe_nodes.node(context))
duplicate_data = llm_response.get('duplicates', [])
end = time()
logger.debug(f'Deduplicated nodes: {duplicate_data} in {(end - start) * 1000} ms')
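    # Map each duplicate extracted-node UUID to the canonical existing UUID.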
uuid_map: dict[str, str] = {}
for duplicate in duplicate_data:
uuid_value = duplicate['duplicate_of']
uuid_map[duplicate['uuid']] = uuid_value
nodes: list[EntityNode] = []
for node in extracted_nodes:
if node.uuid in uuid_map:
existing_uuid = uuid_map[node.uuid]
existing_node = node_map[existing_uuid]
nodes.append(existing_node)
else:
nodes.append(node)
return nodes, uuid_map


async def resolve_extracted_nodes(
clients: GraphitiClients,
extracted_nodes: list[EntityNode],
episode: EpisodicNode | None = None,
previous_episodes: list[EpisodicNode] | None = None,
entity_types: dict[str, BaseModel] | None = None,
) -> tuple[list[EntityNode], dict[str, str]]:
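    """Resolve extracted nodes against relevant nodes already in the graph.

    Returns the resolved nodes plus a UUID map; an extracted node that is not
    a duplicate simply maps to its own UUID.
    """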
llm_client = clients.llm_client
driver = clients.driver
# Find relevant nodes already in the graph
existing_nodes_lists: list[list[EntityNode]] = await get_relevant_nodes(
driver, extracted_nodes, SearchFilters()
)
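    # Resolve all nodes concurrently; a node's custom entity type is looked up
    # from its first non-'Entity' label, when custom types were provided.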
resolved_nodes: list[EntityNode] = await semaphore_gather(
*[
resolve_extracted_node(
llm_client,
extracted_node,
existing_nodes,
episode,
previous_episodes,
entity_types.get(
next((item for item in extracted_node.labels if item != 'Entity'), '')
)
if entity_types is not None
else None,
)
for extracted_node, existing_nodes in zip(
extracted_nodes, existing_nodes_lists, strict=True
)
]
)
uuid_map: dict[str, str] = {}
for extracted_node, resolved_node in zip(extracted_nodes, resolved_nodes, strict=True):
uuid_map[extracted_node.uuid] = resolved_node.uuid
logger.debug(f'Resolved nodes: {[(n.name, n.uuid) for n in resolved_nodes]}')
return resolved_nodes, uuid_map


async def resolve_extracted_node(
llm_client: LLMClient,
extracted_node: EntityNode,
existing_nodes: list[EntityNode],
episode: EpisodicNode | None = None,
previous_episodes: list[EpisodicNode] | None = None,
entity_type: BaseModel | None = None,
) -> EntityNode:
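    """Decide whether an extracted node duplicates one of the candidate nodes.

    If the LLM identifies a valid duplicate, the existing node is returned;
    otherwise the extracted node is kept as a new entity.
    """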
start = time()
if len(existing_nodes) == 0:
return extracted_node
# Prepare context for LLM
existing_nodes_context = [
{
**{
'id': i,
'name': node.name,
'entity_types': node.labels,
'summary': node.summary,
},
**node.attributes,
}
for i, node in enumerate(existing_nodes)
]
extracted_node_context = {
'name': extracted_node.name,
'entity_type': entity_type.__name__ if entity_type is not None else 'Entity', # type: ignore
'entity_type_description': entity_type.__doc__
if entity_type is not None
else 'Default Entity Type',
}
context = {
'existing_nodes': existing_nodes_context,
'extracted_node': extracted_node_context,
'episode_content': episode.content if episode is not None else '',
'previous_episodes': [ep.content for ep in previous_episodes]
if previous_episodes is not None
else [],
}
llm_response = await llm_client.generate_response(
prompt_library.dedupe_nodes.node(context), response_model=NodeDuplicate
)
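    # The prompt returns the id of the matching existing node, or -1 (the
    # default here) when no duplicate was found.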
duplicate_id: int = llm_response.get('duplicate_node_id', -1)
node = (
existing_nodes[duplicate_id] if 0 <= duplicate_id < len(existing_nodes) else extracted_node
)
end = time()
logger.debug(
f'Resolved node: {extracted_node.name} is {node.name}, in {(end - start) * 1000} ms'
)
return node


async def extract_attributes_from_nodes(
clients: GraphitiClients,
nodes: list[EntityNode],
episode: EpisodicNode | None = None,
previous_episodes: list[EpisodicNode] | None = None,
entity_types: dict[str, BaseModel] | None = None,
) -> list[EntityNode]:
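    """Populate summaries and typed attributes for a batch of nodes, then re-embed them."""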
llm_client = clients.llm_client
embedder = clients.embedder
updated_nodes: list[EntityNode] = await semaphore_gather(
*[
extract_attributes_from_node(
llm_client,
node,
episode,
previous_episodes,
entity_types.get(next((item for item in node.labels if item != 'Entity'), ''))
if entity_types is not None
else None,
)
for node in nodes
]
)
await create_entity_node_embeddings(embedder, updated_nodes)
return updated_nodes


async def extract_attributes_from_node(
llm_client: LLMClient,
node: EntityNode,
episode: EpisodicNode | None = None,
previous_episodes: list[EpisodicNode] | None = None,
entity_type: BaseModel | None = None,
) -> EntityNode:
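    """Fill in a node's name, summary, and typed attributes via a structured LLM call.

    A pydantic response model is built on the fly from the entity type's
    fields, so the response is validated against the custom attribute schema.
    """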
node_context: dict[str, Any] = {
'name': node.name,
'summary': node.summary,
'entity_types': node.labels,
'attributes': node.attributes,
}
attributes_definitions: dict[str, Any] = {
'summary': (
str,
Field(
description='Summary containing the important information about the entity. Under 200 words',
),
),
'name': (
str,
Field(description='Name of the ENTITY'),
),
}
if entity_type is not None:
for field_name, field_info in entity_type.model_fields.items():
attributes_definitions[field_name] = (
field_info.annotation,
Field(description=field_info.description),
)
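    # Build an ad-hoc response model: the base name/summary fields plus any
    # custom fields declared on the entity type.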
entity_attributes_model = pydantic.create_model('EntityAttributes', **attributes_definitions)
summary_context: dict[str, Any] = {
'node': node_context,
'episode_content': episode.content if episode is not None else '',
'previous_episodes': [ep.content for ep in previous_episodes]
if previous_episodes is not None
else [],
}
llm_response = await llm_client.generate_response(
prompt_library.extract_nodes.extract_attributes(summary_context),
response_model=entity_attributes_model,
)
node.summary = llm_response.get('summary', node.summary)
node.name = llm_response.get('name', node.name)
    node_attributes = dict(llm_response)
    # Strip the core fields so only custom entity attributes are merged below;
    # pop() tolerates responses that omit either key.
    node_attributes.pop('summary', None)
    node_attributes.pop('name', None)
node.attributes.update(node_attributes)
return node


async def dedupe_node_list(
llm_client: LLMClient,
nodes: list[EntityNode],
) -> tuple[list[EntityNode], dict[str, str]]:
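    """Collapse duplicates within a single list of nodes.

    Returns the unique nodes plus a map from each merged node's UUID to the
    UUID of the node kept as canonical.
    """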
start = time()
# build node map
node_map = {}
for node in nodes:
node_map[node.uuid] = node
# Prepare context for LLM
    nodes_context = [
        {'uuid': node.uuid, 'name': node.name, 'summary': node.summary, **node.attributes}
        for node in nodes
    ]
context = {
'nodes': nodes_context,
}
llm_response = await llm_client.generate_response(
prompt_library.dedupe_nodes.node_list(context)
)
nodes_data = llm_response.get('nodes', [])
end = time()
logger.debug(f'Deduplicated nodes: {nodes_data} in {(end - start) * 1000} ms')
# Get full node data
unique_nodes = []
uuid_map: dict[str, str] = {}
for node_data in nodes_data:
node_instance: EntityNode | None = node_map.get(node_data['uuids'][0])
if node_instance is None:
logger.warning(f'Node {node_data["uuids"][0]} not found in node map')
continue
node_instance.summary = node_data['summary']
unique_nodes.append(node_instance)
        for uuid in node_data['uuids'][1:]:
            uuid_map[uuid] = node_instance.uuid
return unique_nodes, uuid_map