"""
Copyright 2024, Zep Software, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import logging
from time import time
from typing import Any

from pydantic import BaseModel

from graphiti_core.graphiti_types import GraphitiClients
from graphiti_core.helpers import MAX_REFLEXION_ITERATIONS, semaphore_gather
from graphiti_core.llm_client import LLMClient
from graphiti_core.llm_client.config import ModelSize
from graphiti_core.nodes import (
    EntityNode,
    EpisodeType,
    EpisodicNode,
    create_entity_node_embeddings,
)
from graphiti_core.prompts import prompt_library
from graphiti_core.prompts.dedupe_nodes import NodeDuplicate, NodeResolutions
from graphiti_core.prompts.extract_nodes import (
    EntitySummary,
    ExtractedEntities,
    ExtractedEntity,
    MissedEntities,
)
from graphiti_core.search.search import search
from graphiti_core.search.search_config import SearchResults
from graphiti_core.search.search_config_recipes import NODE_HYBRID_SEARCH_RRF
from graphiti_core.search.search_filters import SearchFilters
from graphiti_core.utils.datetime_utils import utc_now
from graphiti_core.utils.maintenance.dedup_helpers import (
    DedupCandidateIndexes,
    DedupResolutionState,
    _build_candidate_indexes,
    _resolve_with_similarity,
)
from graphiti_core.utils.maintenance.edge_operations import (
    filter_existing_duplicate_of_edges,
)

logger = logging.getLogger(__name__)


async def extract_nodes_reflexion(
    llm_client: LLMClient,
    episode: EpisodicNode,
    previous_episodes: list[EpisodicNode],
    node_names: list[str],
    ensure_ascii: bool = False,
) -> list[str]:
    """Ask the LLM which entities are missing from an extraction pass.

    Args:
        llm_client: Client used to run the reflexion prompt.
        episode: Episode whose content was extracted.
        previous_episodes: Earlier episodes whose content is passed as context.
        node_names: Names of the entities extracted so far.
        ensure_ascii: Forwarded to the prompt context unchanged.

    Returns:
        The ``missed_entities`` value of the LLM response, or an empty list
        when the response has no such key.
    """
    # Prepare context for LLM
    context = {
        'episode_content': episode.content,
        'previous_episodes': [ep.content for ep in previous_episodes],
        'extracted_entities': node_names,
        'ensure_ascii': ensure_ascii,
    }

    llm_response = await llm_client.generate_response(
        prompt_library.extract_nodes.reflexion(context), MissedEntities
    )
    missed_entities = llm_response.get('missed_entities', [])

    return missed_entities


async def extract_nodes(
    clients: GraphitiClients,
    episode: EpisodicNode,
    previous_episodes: list[EpisodicNode],
    entity_types: dict[str, type[BaseModel]] | None = None,
    excluded_entity_types: list[str] | None = None,
) -> list[EntityNode]:
    """Extract entity nodes from an episode, optionally refining via reflexion.

    Args:
        clients: Bundle providing ``llm_client`` and ``ensure_ascii``.
        episode: Episode to extract entities from.
        previous_episodes: Earlier episodes whose content is passed as context.
        entity_types: Optional mapping of custom type name to model class; each
            is offered to the LLM with id ``position + 1`` (id 0 is the default
            ``Entity`` type).
        excluded_entity_types: Type names whose extracted entities are dropped.

    Returns:
        Newly created ``EntityNode`` objects (empty summary) for every
        extracted entity with a non-blank name and a non-excluded type.
    """
    start = time()
    llm_client = clients.llm_client
    llm_response = {}
    custom_prompt = ''
    entities_missed = True
    reflexion_iterations = 0

    entity_types_context = [
        {
            'entity_type_id': 0,
            'entity_type_name': 'Entity',
            'entity_type_description': 'Default entity classification. Use this entity type if the entity is not one of the other listed types.',
        }
    ]

    entity_types_context += (
        [
            {
                'entity_type_id': i + 1,
                'entity_type_name': type_name,
                'entity_type_description': type_model.__doc__,
            }
            for i, (type_name, type_model) in enumerate(entity_types.items())
        ]
        if entity_types is not None
        else []
    )

    context = {
        'episode_content': episode.content,
        'episode_timestamp': episode.valid_at.isoformat(),
        'previous_episodes': [ep.content for ep in previous_episodes],
        'custom_prompt': custom_prompt,
        'entity_types': entity_types_context,
        'source_description': episode.source_description,
        'ensure_ascii': clients.ensure_ascii,
    }

    # NOTE(review): once the iteration budget is reached the reflexion step is
    # skipped, so `entities_missed` keeps its previous value and the loop can
    # run one further extraction with an unchanged prompt before exiting.
    while entities_missed and reflexion_iterations <= MAX_REFLEXION_ITERATIONS:
        if episode.source == EpisodeType.message:
            llm_response = await llm_client.generate_response(
                prompt_library.extract_nodes.extract_message(context),
                response_model=ExtractedEntities,
            )
        elif episode.source == EpisodeType.text:
            llm_response = await llm_client.generate_response(
                prompt_library.extract_nodes.extract_text(context),
                response_model=ExtractedEntities,
            )
        elif episode.source == EpisodeType.json:
            llm_response = await llm_client.generate_response(
                prompt_library.extract_nodes.extract_json(context),
                response_model=ExtractedEntities,
            )

        response_object = ExtractedEntities(**llm_response)

        extracted_entities: list[ExtractedEntity] = response_object.extracted_entities

        reflexion_iterations += 1
        if reflexion_iterations < MAX_REFLEXION_ITERATIONS:
            missing_entities = await extract_nodes_reflexion(
                llm_client,
                episode,
                previous_episodes,
                [entity.name for entity in extracted_entities],
                clients.ensure_ascii,
            )

            entities_missed = len(missing_entities) != 0

            custom_prompt = 'Make sure that the following entities are extracted: ' + ''.join(
                f'\n{entity},' for entity in missing_entities
            )
            # The prompt builders read from `context`, so the hint has to be
            # written back there; otherwise every re-extraction pass would send
            # the same prompt as the first one.
            context['custom_prompt'] = custom_prompt

    filtered_extracted_entities = [entity for entity in extracted_entities if entity.name.strip()]
    end = time()
    logger.debug(
        'Extracted new nodes: %s in %s ms', filtered_extracted_entities, (end - start) * 1000
    )

    # Convert the extracted data into EntityNode objects
    extracted_nodes = []
    for extracted_entity in filtered_extracted_entities:
        type_id = extracted_entity.entity_type_id
        if 0 <= type_id < len(entity_types_context):
            entity_type_name = entity_types_context[extracted_entity.entity_type_id].get(
                'entity_type_name'
            )
        else:
            # Ids outside the offered range fall back to the default type.
            entity_type_name = 'Entity'

        # Check if this entity type should be excluded
        if excluded_entity_types and entity_type_name in excluded_entity_types:
            logger.debug(
                'Excluding entity "%s" of type "%s"', extracted_entity.name, entity_type_name
            )
            continue

        # A set collapses the labels when the type is the default 'Entity'.
        labels: list[str] = list({'Entity', str(entity_type_name)})

        new_node = EntityNode(
            name=extracted_entity.name,
            group_id=episode.group_id,
            labels=labels,
            summary='',
            created_at=utc_now(),
        )
        extracted_nodes.append(new_node)
        logger.debug('Created new node: %s (UUID: %s)', new_node.name, new_node.uuid)

    logger.debug('Extracted nodes: %s', [(n.name, n.uuid) for n in extracted_nodes])
    return extracted_nodes


async def _collect_candidate_nodes(
    clients: GraphitiClients,
    extracted_nodes: list[EntityNode],
    existing_nodes_override: list[EntityNode] | None,
) -> list[EntityNode]:
    """Search per extracted name and return unique candidates with overrides honored in order."""
    search_results: list[SearchResults] = await semaphore_gather(
        *[
            search(
                clients=clients,
                query=node.name,
                group_ids=[node.group_id],
                search_filter=SearchFilters(),
                config=NODE_HYBRID_SEARCH_RRF,
            )
            for node in extracted_nodes
        ]
    )

    candidate_nodes: list[EntityNode] = [node for result in search_results for node in result.nodes]

    if existing_nodes_override is not None:
        candidate_nodes.extend(existing_nodes_override)

    # Keep the first occurrence of each UUID, preserving encounter order.
    seen_candidate_uuids: set[str] = set()
    ordered_candidates: list[EntityNode] = []
    for candidate in candidate_nodes:
        if candidate.uuid in seen_candidate_uuids:
            continue
        seen_candidate_uuids.add(candidate.uuid)
        ordered_candidates.append(candidate)

    return ordered_candidates


async def _resolve_with_llm(
    llm_client: LLMClient,
    extracted_nodes: list[EntityNode],
    indexes: DedupCandidateIndexes,
    state: DedupResolutionState,
    ensure_ascii: bool,
    episode: EpisodicNode | None,
    previous_episodes: list[EpisodicNode] | None,
    entity_types: dict[str, type[BaseModel]] | None,
) -> None:
    """Escalate unresolved nodes to the dedupe prompt so the LLM can select or reject duplicates.

    Results are written into ``state.resolved_nodes`` and ``state.uuid_map``.
    Resolutions whose ``id`` does not address an unresolved node are skipped
    with a warning and left unresolved.
    """
    if not state.unresolved_indices:
        return

    entity_types_dict: dict[str, type[BaseModel]] = entity_types if entity_types is not None else {}

    llm_extracted_nodes = [extracted_nodes[i] for i in state.unresolved_indices]

    extracted_nodes_context = []
    for i, node in enumerate(llm_extracted_nodes):
        type_label = next((item for item in node.labels if item != 'Entity'), '')
        type_model = entity_types_dict.get(type_label)
        # Only read the docstring from a registered model. Reading `__doc__`
        # off `None` would make the fallback depend on whether the interpreter
        # gives `None` a docstring.
        type_doc = type_model.__doc__ if type_model is not None else None
        extracted_nodes_context.append(
            {
                'id': i,
                'name': node.name,
                'entity_type': node.labels,
                'entity_type_description': type_doc or 'Default Entity Type',
            }
        )

    existing_nodes_context = [
        {
            'idx': i,
            'name': candidate.name,
            'entity_types': candidate.labels,
            # Spread last so attribute keys take precedence, as before.
            **candidate.attributes,
        }
        for i, candidate in enumerate(indexes.existing_nodes)
    ]

    context = {
        'extracted_nodes': extracted_nodes_context,
        'existing_nodes': existing_nodes_context,
        'episode_content': episode.content if episode is not None else '',
        'previous_episodes': (
            [ep.content for ep in previous_episodes] if previous_episodes is not None else []
        ),
        'ensure_ascii': ensure_ascii,
    }

    llm_response = await llm_client.generate_response(
        prompt_library.dedupe_nodes.nodes(context),
        response_model=NodeResolutions,
    )

    node_resolutions: list[NodeDuplicate] = NodeResolutions(**llm_response).entity_resolutions

    unresolved_count = len(state.unresolved_indices)
    for resolution in node_resolutions:
        relative_id: int = resolution.id
        duplicate_idx: int = resolution.duplicate_idx

        # The id comes from the LLM. An out-of-range value would raise
        # IndexError, and a negative one would silently address the wrong node.
        if not 0 <= relative_id < unresolved_count:
            logger.warning(
                'Skipping node resolution with invalid id %s (expected 0..%s)',
                relative_id,
                unresolved_count - 1,
            )
            continue

        original_index = state.unresolved_indices[relative_id]
        extracted_node = extracted_nodes[original_index]

        # An out-of-range duplicate index means "no duplicate": keep the new node.
        resolved_node = (
            indexes.existing_nodes[duplicate_idx]
            if 0 <= duplicate_idx < len(indexes.existing_nodes)
            else extracted_node
        )

        state.resolved_nodes[original_index] = resolved_node
        state.uuid_map[extracted_node.uuid] = resolved_node.uuid


async def resolve_extracted_nodes(
    clients: GraphitiClients,
    extracted_nodes: list[EntityNode],
    episode: EpisodicNode | None = None,
    previous_episodes: list[EpisodicNode] | None = None,
    entity_types: dict[str, type[BaseModel]] | None = None,
    existing_nodes_override: list[EntityNode] | None = None,
) -> tuple[list[EntityNode], dict[str, str], list[tuple[EntityNode, EntityNode]]]:
    """Search for existing nodes, resolve deterministic matches, then escalate holdouts to the LLM dedupe prompt.

    Returns:
        A tuple of (resolved nodes in extraction order, mapping of extracted
        UUID to resolved UUID, duplicate pairs returned by
        ``filter_existing_duplicate_of_edges``).
    """
    llm_client = clients.llm_client
    driver = clients.driver
    existing_nodes = await _collect_candidate_nodes(
        clients,
        extracted_nodes,
        existing_nodes_override,
    )

    indexes: DedupCandidateIndexes = _build_candidate_indexes(existing_nodes)

    state = DedupResolutionState(
        resolved_nodes=[None] * len(extracted_nodes),
        uuid_map={},
        unresolved_indices=[],
    )

    # NOTE(review): nothing in this function appends to `node_duplicates`, so
    # the filter below always receives an empty list. Confirm whether duplicate
    # pairs are meant to be collected during resolution.
    node_duplicates: list[tuple[EntityNode, EntityNode]] = []

    _resolve_with_similarity(extracted_nodes, indexes, state)

    await _resolve_with_llm(
        llm_client,
        extracted_nodes,
        indexes,
        state,
        clients.ensure_ascii,
        episode,
        previous_episodes,
        entity_types,
    )

    # Anything still unresolved is treated as a new node mapped to itself.
    for idx, node in enumerate(extracted_nodes):
        if state.resolved_nodes[idx] is None:
            state.resolved_nodes[idx] = node
            state.uuid_map[node.uuid] = node.uuid

    logger.debug(
        'Resolved nodes: %s',
        [(node.name, node.uuid) for node in state.resolved_nodes if node is not None],
    )

    new_node_duplicates: list[
        tuple[EntityNode, EntityNode]
    ] = await filter_existing_duplicate_of_edges(driver, node_duplicates)

    return (
        [node for node in state.resolved_nodes if node is not None],
        state.uuid_map,
        new_node_duplicates,
    )


async def extract_attributes_from_nodes(
    clients: GraphitiClients,
    nodes: list[EntityNode],
    episode: EpisodicNode | None = None,
    previous_episodes: list[EpisodicNode] | None = None,
    entity_types: dict[str, type[BaseModel]] | None = None,
) -> list[EntityNode]:
    """Extract attributes and summaries for each node, then create embeddings for them.

    Each node is processed with the model registered in ``entity_types`` under
    its first non-``'Entity'`` label, or with no model when none is found.
    """
    llm_client = clients.llm_client
    embedder = clients.embedder
    updated_nodes: list[EntityNode] = await semaphore_gather(
        *[
            extract_attributes_from_node(
                llm_client,
                node,
                episode,
                previous_episodes,
                (
                    entity_types.get(next((item for item in node.labels if item != 'Entity'), ''))
                    if entity_types is not None
                    else None
                ),
                clients.ensure_ascii,
            )
            for node in nodes
        ]
    )

    await create_entity_node_embeddings(embedder, updated_nodes)

    return updated_nodes


async def extract_attributes_from_node(
    llm_client: LLMClient,
    node: EntityNode,
    episode: EpisodicNode | None = None,
    previous_episodes: list[EpisodicNode] | None = None,
    entity_type: type[BaseModel] | None = None,
    ensure_ascii: bool = False,
) -> EntityNode:
    """Update a node's summary and, when its type declares fields, its attributes.

    Args:
        llm_client: Client used for the attribute and summary prompts.
        node: Node to update in place.
        episode: Optional episode whose content is passed as context.
        previous_episodes: Optional earlier episodes passed as context.
        entity_type: Optional model class; attributes are only requested when
            it has at least one field.
        ensure_ascii: Forwarded to the prompt contexts unchanged.

    Returns:
        The same ``node`` instance, mutated.

    Raises:
        Whatever ``entity_type(**response)`` raises when the attribute response
        does not validate against the model.
    """
    node_context: dict[str, Any] = {
        'name': node.name,
        'summary': node.summary,
        'entity_types': node.labels,
        'attributes': node.attributes,
    }

    shared_context: dict[str, Any] = {
        'node': node_context,
        'episode_content': episode.content if episode is not None else '',
        'previous_episodes': (
            [ep.content for ep in previous_episodes] if previous_episodes is not None else []
        ),
        'ensure_ascii': ensure_ascii,
    }
    # Each prompt gets its own top-level dict, as in the original code.
    attributes_context: dict[str, Any] = dict(shared_context)
    summary_context: dict[str, Any] = dict(shared_context)

    has_entity_attributes: bool = bool(
        entity_type is not None and len(entity_type.model_fields) != 0
    )

    llm_response = (
        (
            await llm_client.generate_response(
                prompt_library.extract_nodes.extract_attributes(attributes_context),
                response_model=entity_type,
                model_size=ModelSize.small,
            )
        )
        if has_entity_attributes
        else {}
    )

    summary_response = await llm_client.generate_response(
        prompt_library.extract_nodes.extract_summary(summary_context),
        response_model=EntitySummary,
        model_size=ModelSize.small,
    )

    # Instantiate the model purely to validate the response before the node
    # is touched; the instance itself is discarded.
    if has_entity_attributes and entity_type is not None:
        entity_type(**llm_response)

    node.summary = summary_response.get('summary', '')
    node.attributes.update(llm_response)

    return node