diff --git a/cognee/modules/graph/utils/expand_with_nodes_and_edges.py b/cognee/modules/graph/utils/expand_with_nodes_and_edges.py index f56e6d0ca..fb54770b3 100644 --- a/cognee/modules/graph/utils/expand_with_nodes_and_edges.py +++ b/cognee/modules/graph/utils/expand_with_nodes_and_edges.py @@ -10,6 +10,200 @@ from cognee.modules.engine.utils import ( from cognee.shared.data_models import KnowledgeGraph from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver +# Constants for node key suffixes +TYPE_SUFFIX = "_type" +ENTITY_SUFFIX = "_entity" + + +def _create_ontology_node(ont_node, category: str, data_chunk: DocumentChunk): + """Create an ontology node based on category (classes or individuals).""" + ont_node_id = generate_node_id(ont_node.name) + ont_node_name = generate_node_name(ont_node.name) + + if category == "classes": + return f"{ont_node_id}{TYPE_SUFFIX}", EntityType( + id=ont_node_id, + name=ont_node_name, + description=ont_node_name, + ontology_valid=True, + ) + elif category == "individuals": + return f"{ont_node_id}{ENTITY_SUFFIX}", Entity( + id=ont_node_id, + name=ont_node_name, + description=ont_node_name, + ontology_valid=True, + belongs_to_set=data_chunk.belongs_to_set, + ) + return None, None + + +def _process_ontology_nodes( + ontology_nodes, data_chunk: DocumentChunk, added_nodes_map: dict, added_ontology_nodes_map: dict +): + """Process and add ontology nodes to the appropriate maps.""" + for ont_node in ontology_nodes: + ont_node_key, ont_node_obj = _create_ontology_node(ont_node, ont_node.category, data_chunk) + if ( + ont_node_key + and ont_node_obj + and ont_node_key not in added_nodes_map + and ont_node_key not in added_ontology_nodes_map + ): + added_ontology_nodes_map[ont_node_key] = ont_node_obj + + +def _process_ontology_edges( + ontology_edges, + existing_edges_map: dict, + ontology_relationships: list, + ontology_valid: bool = True, +): + """Process ontology edges and add them to relationships if not already existing.""" + for source, relation, target in ontology_edges: + source_node_id = generate_node_id(source) + target_node_id = generate_node_id(target) + relationship_name = generate_edge_name(relation) + edge_key = f"{source_node_id}_{target_node_id}_{relationship_name}" + + if edge_key not in existing_edges_map: + ontology_relationships.append( + ( + source_node_id, + target_node_id, + relationship_name, + { + "relationship_name": relationship_name, + "source_node_id": source_node_id, + "target_node_id": target_node_id, + "ontology_valid": ontology_valid, + }, + ) + ) + existing_edges_map[edge_key] = True + + +def _resolve_ontology_mapping(node_name: str, node_type: str, ontology_resolver: OntologyResolver): + """Resolve ontology mapping for a node and return validation result and closest match.""" + ontology_nodes, ontology_edges, closest_node = ontology_resolver.get_subgraph( + node_name=node_name, node_type=node_type + ) + + ontology_validated = bool(closest_node) + mapped_name = closest_node.name if closest_node else node_name + + return ontology_nodes, ontology_edges, ontology_validated, mapped_name + + +def _get_or_create_type_node( + node, + data_chunk: DocumentChunk, + ontology_resolver: OntologyResolver, + added_nodes_map: dict, + added_ontology_nodes_map: dict, + name_mapping: dict, + key_mapping: dict, + existing_edges_map: dict, + ontology_relationships: list, +): + """Get or create a type node with ontology validation.""" + type_node_id = generate_node_id(node.type) + type_node_name = generate_node_name(node.type) + type_node_key = f"{type_node_id}{TYPE_SUFFIX}" + + # Check if node already exists + if type_node_key in added_nodes_map or type_node_key in key_mapping: + return added_nodes_map.get(type_node_key) or added_nodes_map.get( + key_mapping.get(type_node_key) + ) + + # Resolve ontology for type + ontology_nodes, ontology_edges, ontology_validated, mapped_type_name = ( + _resolve_ontology_mapping(type_node_name, "classes", ontology_resolver) + ) + + # Update mappings if ontology validation succeeded + if ontology_validated: + name_mapping[type_node_name] = mapped_type_name + old_key = type_node_key + type_node_id = generate_node_id(mapped_type_name) + type_node_key = f"{type_node_id}{TYPE_SUFFIX}" + type_node_name = generate_node_name(mapped_type_name) + key_mapping[old_key] = type_node_key + + # Create type node + type_node = EntityType( + id=type_node_id, + name=type_node_name, + type=type_node_name, + description=type_node_name, + ontology_valid=ontology_validated, + ) + added_nodes_map[type_node_key] = type_node + + # Process ontology nodes and edges + _process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map) + _process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships) + + return type_node + + +def _get_or_create_entity_node( + node, + type_node, + data_chunk: DocumentChunk, + ontology_resolver: OntologyResolver, + added_nodes_map: dict, + added_ontology_nodes_map: dict, + name_mapping: dict, + key_mapping: dict, + existing_edges_map: dict, + ontology_relationships: list, +): + """Get or create an entity node with ontology validation.""" + node_id = generate_node_id(node.id) + node_name = generate_node_name(node.name) + entity_node_key = f"{node_id}{ENTITY_SUFFIX}" + + # Check if node already exists + if entity_node_key in added_nodes_map or entity_node_key in key_mapping: + return added_nodes_map.get(entity_node_key) or added_nodes_map.get( + key_mapping.get(entity_node_key) + ) + + # Resolve ontology for entity + ontology_nodes, ontology_edges, ontology_validated, mapped_entity_name = ( + _resolve_ontology_mapping(node_name, "individuals", ontology_resolver) + ) + + # Update mappings if ontology validation succeeded + if ontology_validated: + name_mapping[node_name] = mapped_entity_name + old_key = entity_node_key + node_id = generate_node_id(mapped_entity_name) + entity_node_key = f"{node_id}{ENTITY_SUFFIX}" + node_name = generate_node_name(mapped_entity_name) + key_mapping[old_key] = entity_node_key + + # Create entity node + entity_node = Entity( + id=node_id, + name=node_name, + is_a=type_node, + description=node.description, + ontology_valid=ontology_validated, + belongs_to_set=data_chunk.belongs_to_set, + ) + added_nodes_map[entity_node_key] = entity_node + + # Process ontology nodes and edges + _process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map) + _process_ontology_edges( + ontology_edges, existing_edges_map, ontology_relationships, ontology_valid=True + ) + + return entity_node + def expand_with_nodes_and_edges( data_chunks: list[DocumentChunk], @@ -17,17 +211,25 @@ def expand_with_nodes_and_edges( ontology_resolver: OntologyResolver = None, existing_edges_map: Optional[dict[str, bool]] = None, ): - if existing_edges_map is None: - existing_edges_map = {} + """ + Expand chunk graphs with nodes and edges, applying ontology validation. - if ontology_resolver is None: - ontology_resolver = OntologyResolver() + Args: + data_chunks: List of document chunks + chunk_graphs: List of knowledge graphs corresponding to chunks + ontology_resolver: Optional ontology resolver for validation + existing_edges_map: Optional map of existing edges to avoid duplicates + + Returns: + Tuple of (graph_nodes, graph_edges) + """ + existing_edges_map = existing_edges_map or {} + ontology_resolver = ontology_resolver or OntologyResolver() added_nodes_map = {} added_ontology_nodes_map = {} relationships = [] ontology_relationships = [] - name_mapping = {} key_mapping = {} @@ -35,184 +237,41 @@ def expand_with_nodes_and_edges( if not graph: continue + # Process nodes for node in graph.nodes: - node_id = generate_node_id(node.id) - node_name = generate_node_name(node.name) - type_node_id = generate_node_id(node.type) - type_node_name = generate_node_name(node.type) + # Get or create type node + type_node = _get_or_create_type_node( + node, + data_chunk, + ontology_resolver, + added_nodes_map, + added_ontology_nodes_map, + name_mapping, + key_mapping, + existing_edges_map, + ontology_relationships, + ) - ontology_validated_source_type = False - ontology_validated_source_ent = False - - type_node_key = f"{type_node_id}_type" - - if type_node_key not in added_nodes_map and type_node_key not in key_mapping: - ( - ontology_entity_type_nodes, - ontology_entity_type_edges, - ontology_closest_class_node, - ) = ontology_resolver.get_subgraph(node_name=type_node_name, node_type="classes") - - if ontology_closest_class_node: - name_mapping[type_node_name] = ontology_closest_class_node.name - ontology_validated_source_type = True - old_key = type_node_key - type_node_id = generate_node_id(ontology_closest_class_node.name) - type_node_key = f"{type_node_id}_type" - type_node_name = generate_node_name(ontology_closest_class_node.name) - key_mapping[old_key] = type_node_key - - type_node = EntityType( - id=type_node_id, - name=type_node_name, - type=type_node_name, - description=type_node_name, - ontology_valid=ontology_validated_source_type, - ) - added_nodes_map[type_node_key] = type_node - - for ontology_node_to_store in ontology_entity_type_nodes: - ont_node_id = generate_node_id(ontology_node_to_store.name) - ont_node_name = generate_node_name(ontology_node_to_store.name) - - if ontology_node_to_store.category == "classes": - ont_node_key = f"{ont_node_id}_type" - if (ont_node_key not in added_nodes_map) and ( - ont_node_key not in added_ontology_nodes_map - ): - added_ontology_nodes_map[ont_node_key] = EntityType( - id=ont_node_id, - name=ont_node_name, - description=ont_node_name, - ontology_valid=True, - ) - - elif ontology_node_to_store.category == "individuals": - ont_node_key = f"{ont_node_id}_entity" - if (ont_node_key not in added_nodes_map) and ( - ont_node_key not in added_ontology_nodes_map - ): - added_ontology_nodes_map[ont_node_key] = Entity( - id=ont_node_id, - name=ont_node_name, - description=ont_node_name, - ontology_valid=True, - belongs_to_set=data_chunk.belongs_to_set, - ) - - for source, relation, target in ontology_entity_type_edges: - source_node_id = generate_node_id(source) - target_node_id = generate_node_id(target) - relationship_name = generate_edge_name(relation) - edge_key = f"{source_node_id}_{target_node_id}_{relationship_name}" - - if edge_key not in existing_edges_map: - ontology_relationships.append( - ( - source_node_id, - target_node_id, - relationship_name, - dict( - relationship_name=relationship_name, - source_node_id=source_node_id, - target_node_id=target_node_id, - ), - ) - ) - existing_edges_map[edge_key] = True - else: - type_node = added_nodes_map.get(type_node_key) or added_nodes_map.get( - key_mapping.get(type_node_key) - ) - - entity_node_key = f"{node_id}_entity" - - if entity_node_key not in added_nodes_map and entity_node_key not in key_mapping: - ontology_entity_nodes, ontology_entity_edges, start_ent_ont = ( - ontology_resolver.get_subgraph(node_name=node_name, node_type="individuals") - ) - - if start_ent_ont: - name_mapping[node_name] = start_ent_ont.name - ontology_validated_source_ent = True - old_key = entity_node_key - node_id = generate_node_id(start_ent_ont.name) - entity_node_key = f"{node_id}_entity" - node_name = generate_node_name(start_ent_ont.name) - key_mapping[old_key] = entity_node_key - - entity_node = Entity( - id=node_id, - name=node_name, - is_a=type_node, - description=node.description, - ontology_valid=ontology_validated_source_ent, - belongs_to_set=data_chunk.belongs_to_set, - ) - - added_nodes_map[entity_node_key] = entity_node - - for ontology_node_to_store in ontology_entity_nodes: - ont_node_id = generate_node_id(ontology_node_to_store.name) - ont_node_name = generate_node_name(ontology_node_to_store.name) - - if ontology_node_to_store.category == "classes": - ont_node_key = f"{ont_node_id}_type" - if (ont_node_key not in added_nodes_map) and ( - ont_node_key not in added_ontology_nodes_map - ): - added_ontology_nodes_map[ont_node_key] = EntityType( - id=ont_node_id, - name=ont_node_name, - description=ont_node_name, - ontology_valid=True, - ) - - elif ontology_node_to_store.category == "individuals": - ont_node_key = f"{ont_node_id}_entity" - if (ont_node_key not in added_nodes_map) and ( - ont_node_key not in added_ontology_nodes_map - ): - added_ontology_nodes_map[ont_node_key] = Entity( - id=ont_node_id, - name=ont_node_name, - description=ont_node_name, - ontology_valid=True, - belongs_to_set=data_chunk.belongs_to_set, - ) - - for source, relation, target in ontology_entity_edges: - source_node_id = generate_node_id(source) - target_node_id = generate_node_id(target) - relationship_name = generate_edge_name(relation) - edge_key = f"{source_node_id}_{target_node_id}_{relationship_name}" - - if edge_key not in existing_edges_map: - ontology_relationships.append( - ( - source_node_id, - target_node_id, - relationship_name, - dict( - relationship_name=relationship_name, - source_node_id=source_node_id, - target_node_id=target_node_id, - ontology_valid=True, - ), - ) - ) - existing_edges_map[edge_key] = True - - else: - entity_node = added_nodes_map.get(entity_node_key) or added_nodes_map.get( - key_mapping.get(entity_node_key) - ) + # Get or create entity node + entity_node = _get_or_create_entity_node( + node, + type_node, + data_chunk, + ontology_resolver, + added_nodes_map, + added_ontology_nodes_map, + name_mapping, + key_mapping, + existing_edges_map, + ontology_relationships, + ) + # Add entity to chunk if data_chunk.contains is None: data_chunk.contains = [] - data_chunk.contains.append(entity_node) + # Process edges for edge in graph.edges: source_node_id = generate_node_id( name_mapping.get(edge.source_node_id, edge.source_node_id) @@ -229,12 +288,12 @@ def expand_with_nodes_and_edges( source_node_id, target_node_id, relationship_name, - dict( - relationship_name=relationship_name, - source_node_id=source_node_id, - target_node_id=target_node_id, - ontology_valid=False, - ), + { + "relationship_name": relationship_name, + "source_node_id": source_node_id, + "target_node_id": target_node_id, + "ontology_valid": False, + }, ) ) existing_edges_map[edge_key] = True