From 51274972e8054ccd08e86a1f245e1340be9ed8f1 Mon Sep 17 00:00:00 2001 From: vasilije Date: Fri, 11 Jul 2025 19:31:14 +0200 Subject: [PATCH] small cleanup --- .../hybrid/falkordb/FalkorDBAdapter.py | 190 ++++---- .../utils/expand_with_nodes_and_edges.py | 425 ++++++++++-------- cognee/tests/test_falkordb.py | 11 +- 3 files changed, 359 insertions(+), 267 deletions(-) diff --git a/cognee/infrastructure/databases/hybrid/falkordb/FalkorDBAdapter.py b/cognee/infrastructure/databases/hybrid/falkordb/FalkorDBAdapter.py index affbb69aa..9d6d01df1 100644 --- a/cognee/infrastructure/databases/hybrid/falkordb/FalkorDBAdapter.py +++ b/cognee/infrastructure/databases/hybrid/falkordb/FalkorDBAdapter.py @@ -10,7 +10,13 @@ from typing import List, Dict, Any, Optional, Tuple, Type, Union from falkordb import FalkorDB from cognee.exceptions import InvalidValueError -from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface, record_graph_changes, NodeData, EdgeData, Node +from cognee.infrastructure.databases.graph.graph_db_interface import ( + GraphDBInterface, + record_graph_changes, + NodeData, + EdgeData, + Node, +) from cognee.infrastructure.databases.vector.embeddings import EmbeddingEngine from cognee.infrastructure.databases.vector.vector_db_interface import VectorDBInterface from cognee.infrastructure.engine import DataPoint @@ -176,7 +182,14 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): return f"'{json.dumps(value).replace(chr(39), chr(34))}'" if type(value) is str: # Escape single quotes and handle special characters - escaped_value = str(value).replace("'", "\\'").replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t') + escaped_value = ( + str(value) + .replace("'", "\\'") + .replace('"', '\\"') + .replace("\n", "\\n") + .replace("\r", "\\r") + .replace("\t", "\\t") + ) return f"'{escaped_value}'" return f"'{str(value)}'" @@ -215,7 +228,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): } ), } - + # Clean the properties - remove None values and handle special types clean_properties = {} for key, value in properties.items(): @@ -236,37 +249,35 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): SET node += $properties, node.updated_at = timestamp() """ ).strip() - - params = { - "node_id": str(data_point.id), - "properties": clean_properties - } + + params = {"node_id": str(data_point.id), "properties": clean_properties} return query, params def sanitize_relationship_name(self, relationship_name: str) -> str: """ Sanitize relationship name to be valid for Cypher queries. - + Parameters: ----------- - relationship_name (str): The original relationship name - + Returns: -------- - str: A sanitized relationship name valid for Cypher """ # Replace hyphens, spaces, and other special characters with underscores import re - sanitized = re.sub(r'[^\w]', '_', relationship_name) + + sanitized = re.sub(r"[^\w]", "_", relationship_name) # Remove consecutive underscores - sanitized = re.sub(r'_+', '_', sanitized) + sanitized = re.sub(r"_+", "_", sanitized) # Remove leading/trailing underscores - sanitized = sanitized.strip('_') + sanitized = sanitized.strip("_") # Ensure it starts with a letter or underscore - if sanitized and not sanitized[0].isalpha() and sanitized[0] != '_': - sanitized = '_' + sanitized - return sanitized or 'RELATIONSHIP' + if sanitized and not sanitized[0].isalpha() and sanitized[0] != "_": + sanitized = "_" + sanitized + return sanitized or "RELATIONSHIP" async def create_edge_query(self, edge: tuple[str, str, str, dict]) -> str: """ @@ -285,7 +296,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): """ # Sanitize the relationship name for Cypher compatibility sanitized_relationship = self.sanitize_relationship_name(edge[2]) - + # Add the original relationship name to properties edge_properties = {**edge[3], "relationship_name": edge[2]} properties = await self.stringify_properties(edge_properties) @@ -368,7 +379,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): else None for property_name in DataPoint.get_embeddable_property_names(data_point) ] - + query, params = await self.create_data_point_query(data_point, vectorized_data) self.query(query, params) @@ -460,13 +471,10 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): clean_properties[key] = f"vecf32({value})" else: clean_properties[key] = value - + query = "MERGE (node {id: $node_id}) SET node += $properties, node.updated_at = timestamp()" - params = { - "node_id": node_id, - "properties": clean_properties - } - + params = {"node_id": node_id, "properties": clean_properties} + self.query(query, params) # Keep the original create_data_points method for VectorDBInterface compatibility @@ -508,7 +516,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): else None for property_name in DataPoint.get_embeddable_property_names(data_point) ] - + query, params = await self.create_data_point_query(data_point, vectorized_data) self.query(query, params) @@ -550,11 +558,13 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): # Node is in (node_id, properties) format node_id, properties = node await self.add_node(node_id, properties) - elif hasattr(node, 'id') and hasattr(node, 'model_dump'): + elif hasattr(node, "id") and hasattr(node, "model_dump"): # Node is a DataPoint object await self.add_node(str(node.id), node.model_dump()) else: - raise ValueError(f"Invalid node format: {node}. Expected tuple (node_id, properties) or DataPoint object.") + raise ValueError( + f"Invalid node format: {node}. Expected tuple (node_id, properties) or DataPoint object." + ) async def add_edge( self, @@ -578,7 +588,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): """ if properties is None: properties = {} - + edge_tuple = (source_id, target_id, relationship_name, properties) query = await self.create_edge_query(edge_tuple) self.query(query) @@ -599,7 +609,9 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): source_id, target_id, relationship_name, properties = edge await self.add_edge(source_id, target_id, relationship_name, properties) else: - raise ValueError(f"Invalid edge format: {edge}. Expected tuple (source_id, target_id, relationship_name, properties).") + raise ValueError( + f"Invalid edge format: {edge}. Expected tuple (source_id, target_id, relationship_name, properties)." + ) async def has_edges(self, edges): """ @@ -782,7 +794,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): RETURN n, 1.0 as score LIMIT $limit """ - + params = {"query_text": query_text, "limit": limit} result = self.query(query, params) return result.result_set @@ -945,7 +957,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): "MATCH (node) WHERE node.id = $node_id RETURN node", {"node_id": node_id}, ) - + if result.result_set and len(result.result_set) > 0: # FalkorDB returns node objects as first element in the result list return result.result_set[0][0].properties @@ -964,7 +976,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): "MATCH (node) WHERE node.id IN $node_ids RETURN node", {"node_ids": node_ids}, ) - + nodes = [] if result.result_set: for record in result.result_set: @@ -985,7 +997,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): "MATCH (node)-[]-(neighbor) WHERE node.id = $node_id RETURN DISTINCT neighbor", {"node_id": node_id}, ) - + neighbors = [] if result.result_set: for record in result.result_set: @@ -1010,17 +1022,19 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): """, {"node_id": node_id}, ) - + edges = [] if result.result_set: for record in result.result_set: # FalkorDB returns values by index: source_id, target_id, relationship_name, properties - edges.append(( - record[0], # source_id - record[1], # target_id - record[2], # relationship_name - record[3] # properties - )) + edges.append( + ( + record[0], # source_id + record[1], # target_id + record[2], # relationship_name + record[3], # properties + ) + ) return edges async def has_edge(self, source_id: str, target_id: str, relationship_name: str) -> bool: @@ -1036,7 +1050,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): """ # Check both the sanitized relationship type and the original name in properties sanitized_relationship = self.sanitize_relationship_name(relationship_name) - + result = self.query( f""" MATCH (source)-[r:{sanitized_relationship}]->(target) @@ -1044,9 +1058,13 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): AND (r.relationship_name = $relationship_name OR NOT EXISTS(r.relationship_name)) RETURN COUNT(r) > 0 AS edge_exists """, - {"source_id": source_id, "target_id": target_id, "relationship_name": relationship_name}, + { + "source_id": source_id, + "target_id": target_id, + "relationship_name": relationship_name, + }, ) - + if result.result_set and len(result.result_set) > 0: # FalkorDB returns scalar results as a list, access by index instead of key return result.result_set[0][0] @@ -1065,11 +1083,11 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): # Get basic node and edge counts node_result = self.query("MATCH (n) RETURN count(n) AS node_count") edge_result = self.query("MATCH ()-[r]->() RETURN count(r) AS edge_count") - + # FalkorDB returns scalar results as a list, access by index instead of key num_nodes = node_result.result_set[0][0] if node_result.result_set else 0 num_edges = edge_result.result_set[0][0] if edge_result.result_set else 0 - + metrics = { "num_nodes": num_nodes, "num_edges": num_edges, @@ -1078,23 +1096,27 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): "num_connected_components": 1, # Simplified for now "sizes_of_connected_components": [num_nodes] if num_nodes > 0 else [], } - + if include_optional: # Add optional metrics - simplified implementation - metrics.update({ - "num_selfloops": 0, # Simplified - "diameter": -1, # Not implemented - "avg_shortest_path_length": -1, # Not implemented - "avg_clustering": -1, # Not implemented - }) + metrics.update( + { + "num_selfloops": 0, # Simplified + "diameter": -1, # Not implemented + "avg_shortest_path_length": -1, # Not implemented + "avg_clustering": -1, # Not implemented + } + ) else: - metrics.update({ - "num_selfloops": -1, - "diameter": -1, - "avg_shortest_path_length": -1, - "avg_clustering": -1, - }) - + metrics.update( + { + "num_selfloops": -1, + "diameter": -1, + "avg_shortest_path_length": -1, + "avg_clustering": -1, + } + ) + return metrics async def get_document_subgraph(self, content_hash: str): @@ -1117,9 +1139,9 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): COLLECT(DISTINCT c) AS made_from_nodes, COLLECT(DISTINCT et) AS orphan_types """ - + result = self.query(query, {"content_hash": f"text_{content_hash}"}) - + if not result.result_set or not result.result_set[0]: return None @@ -1154,7 +1176,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): RETURN n """ ) - + # FalkorDB returns node objects as first element in each record return [record[0] for record in result.result_set] if result.result_set else [] @@ -1171,7 +1193,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): - node_name (List[str]): A list of names of the nodes to include in the subgraph. """ label = node_type.__name__ - + # Find primary nodes of the specified type and names primary_query = f""" UNWIND $names AS wantedName @@ -1179,60 +1201,66 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): WHERE n.name = wantedName RETURN DISTINCT n.id, properties(n) AS properties """ - + primary_result = self.query(primary_query, {"names": node_name}) if not primary_result.result_set: return [], [] - - # FalkorDB returns values by index: id, properties + + # FalkorDB returns values by index: id, properties primary_ids = [record[0] for record in primary_result.result_set] - + # Find neighbors of primary nodes neighbor_query = """ MATCH (n)-[]-(neighbor) WHERE n.id IN $ids RETURN DISTINCT neighbor.id, properties(neighbor) AS properties """ - + neighbor_result = self.query(neighbor_query, {"ids": primary_ids}) # FalkorDB returns values by index: id, properties - neighbor_ids = [record[0] for record in neighbor_result.result_set] if neighbor_result.result_set else [] - + neighbor_ids = ( + [record[0] for record in neighbor_result.result_set] + if neighbor_result.result_set + else [] + ) + all_ids = list(set(primary_ids + neighbor_ids)) - + # Get all nodes in the subgraph nodes_query = """ MATCH (n) WHERE n.id IN $ids RETURN n.id, properties(n) AS properties """ - + nodes_result = self.query(nodes_query, {"ids": all_ids}) nodes = [] if nodes_result.result_set: for record in nodes_result.result_set: # FalkorDB returns values by index: id, properties nodes.append((record[0], record[1])) - + # Get edges between these nodes edges_query = """ MATCH (a)-[r]->(b) WHERE a.id IN $ids AND b.id IN $ids RETURN a.id AS source_id, b.id AS target_id, type(r) AS relationship_name, properties(r) AS properties """ - + edges_result = self.query(edges_query, {"ids": all_ids}) edges = [] if edges_result.result_set: for record in edges_result.result_set: # FalkorDB returns values by index: source_id, target_id, relationship_name, properties - edges.append(( - record[0], # source_id - record[1], # target_id - record[2], # relationship_name - record[3] # properties - )) - + edges.append( + ( + record[0], # source_id + record[1], # target_id + record[2], # relationship_name + record[3], # properties + ) + ) + return nodes, edges async def prune(self): diff --git a/cognee/modules/graph/utils/expand_with_nodes_and_edges.py b/cognee/modules/graph/utils/expand_with_nodes_and_edges.py index f56e6d0ca..14f3e4a54 100644 --- a/cognee/modules/graph/utils/expand_with_nodes_and_edges.py +++ b/cognee/modules/graph/utils/expand_with_nodes_and_edges.py @@ -10,6 +10,202 @@ from cognee.modules.engine.utils import ( from cognee.shared.data_models import KnowledgeGraph from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver +# Constants for node key suffixes +TYPE_SUFFIX = "_type" +ENTITY_SUFFIX = "_entity" + + +def _create_ontology_node(ont_node, category: str, data_chunk: DocumentChunk): + """Create an ontology node based on category (classes or individuals).""" + ont_node_id = generate_node_id(ont_node.name) + ont_node_name = generate_node_name(ont_node.name) + + if category == "classes": + return f"{ont_node_id}{TYPE_SUFFIX}", EntityType( + id=ont_node_id, + name=ont_node_name, + description=ont_node_name, + ontology_valid=True, + ) + elif category == "individuals": + return f"{ont_node_id}{ENTITY_SUFFIX}", Entity( + id=ont_node_id, + name=ont_node_name, + description=ont_node_name, + ontology_valid=True, + belongs_to_set=data_chunk.belongs_to_set, + ) + return None, None + + +def _process_ontology_nodes( + ontology_nodes, data_chunk: DocumentChunk, added_nodes_map: dict, added_ontology_nodes_map: dict +): + """Process and add ontology nodes to the appropriate maps.""" + for ont_node in ontology_nodes: + ont_node_key, ont_node_obj = _create_ontology_node(ont_node, ont_node.category, data_chunk) + if ( + ont_node_key + and ont_node_obj + and ont_node_key not in added_nodes_map + and ont_node_key not in added_ontology_nodes_map + ): + added_ontology_nodes_map[ont_node_key] = ont_node_obj + + +def _process_ontology_edges( + ontology_edges, + existing_edges_map: dict, + ontology_relationships: list, + ontology_valid: bool = True, +): + """Process ontology edges and add them to relationships if not already existing.""" + for source, relation, target in ontology_edges: + source_node_id = generate_node_id(source) + target_node_id = generate_node_id(target) + relationship_name = generate_edge_name(relation) + edge_key = f"{source_node_id}_{target_node_id}_{relationship_name}" + + if edge_key not in existing_edges_map: + ontology_relationships.append( + ( + source_node_id, + target_node_id, + relationship_name, + { + "relationship_name": relationship_name, + "source_node_id": source_node_id, + "target_node_id": target_node_id, + "ontology_valid": ontology_valid, + }, + ) + ) + existing_edges_map[edge_key] = True + + +def _resolve_ontology_mapping(node_name: str, node_type: str, ontology_resolver: OntologyResolver): + """Resolve ontology mapping for a node and return validation result and closest match.""" + ontology_nodes, ontology_edges, closest_node = ontology_resolver.get_subgraph( + node_name=node_name, node_type=node_type + ) + + ontology_validated = bool(closest_node) + mapped_name = closest_node.name if closest_node else node_name + + return ontology_nodes, ontology_edges, ontology_validated, mapped_name + + +def _get_or_create_type_node( + node, + data_chunk: DocumentChunk, + ontology_resolver: OntologyResolver, + added_nodes_map: dict, + added_ontology_nodes_map: dict, + name_mapping: dict, + key_mapping: dict, + existing_edges_map: dict, + ontology_relationships: list, +): + """Get or create a type node with ontology validation.""" + node_id = generate_node_id(node.id) + node_name = generate_node_name(node.name) + type_node_id = generate_node_id(node.type) + type_node_name = generate_node_name(node.type) + type_node_key = f"{type_node_id}{TYPE_SUFFIX}" + + # Check if node already exists + if type_node_key in added_nodes_map or type_node_key in key_mapping: + return added_nodes_map.get(type_node_key) or added_nodes_map.get( + key_mapping.get(type_node_key) + ) + + # Resolve ontology for type + ontology_nodes, ontology_edges, ontology_validated, mapped_type_name = ( + _resolve_ontology_mapping(type_node_name, "classes", ontology_resolver) + ) + + # Update mappings if ontology validation succeeded + if ontology_validated: + name_mapping[type_node_name] = mapped_type_name + old_key = type_node_key + type_node_id = generate_node_id(mapped_type_name) + type_node_key = f"{type_node_id}{TYPE_SUFFIX}" + type_node_name = generate_node_name(mapped_type_name) + key_mapping[old_key] = type_node_key + + # Create type node + type_node = EntityType( + id=type_node_id, + name=type_node_name, + type=type_node_name, + description=type_node_name, + ontology_valid=ontology_validated, + ) + added_nodes_map[type_node_key] = type_node + + # Process ontology nodes and edges + _process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map) + _process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships) + + return type_node + + +def _get_or_create_entity_node( + node, + type_node, + data_chunk: DocumentChunk, + ontology_resolver: OntologyResolver, + added_nodes_map: dict, + added_ontology_nodes_map: dict, + name_mapping: dict, + key_mapping: dict, + existing_edges_map: dict, + ontology_relationships: list, +): + """Get or create an entity node with ontology validation.""" + node_id = generate_node_id(node.id) + node_name = generate_node_name(node.name) + entity_node_key = f"{node_id}{ENTITY_SUFFIX}" + + # Check if node already exists + if entity_node_key in added_nodes_map or entity_node_key in key_mapping: + return added_nodes_map.get(entity_node_key) or added_nodes_map.get( + key_mapping.get(entity_node_key) + ) + + # Resolve ontology for entity + ontology_nodes, ontology_edges, ontology_validated, mapped_entity_name = ( + _resolve_ontology_mapping(node_name, "individuals", ontology_resolver) + ) + + # Update mappings if ontology validation succeeded + if ontology_validated: + name_mapping[node_name] = mapped_entity_name + old_key = entity_node_key + node_id = generate_node_id(mapped_entity_name) + entity_node_key = f"{node_id}{ENTITY_SUFFIX}" + node_name = generate_node_name(mapped_entity_name) + key_mapping[old_key] = entity_node_key + + # Create entity node + entity_node = Entity( + id=node_id, + name=node_name, + is_a=type_node, + description=node.description, + ontology_valid=ontology_validated, + belongs_to_set=data_chunk.belongs_to_set, + ) + added_nodes_map[entity_node_key] = entity_node + + # Process ontology nodes and edges + _process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map) + _process_ontology_edges( + ontology_edges, existing_edges_map, ontology_relationships, ontology_valid=True + ) + + return entity_node + def expand_with_nodes_and_edges( data_chunks: list[DocumentChunk], @@ -17,17 +213,25 @@ def expand_with_nodes_and_edges( ontology_resolver: OntologyResolver = None, existing_edges_map: Optional[dict[str, bool]] = None, ): - if existing_edges_map is None: - existing_edges_map = {} + """ + Expand chunk graphs with nodes and edges, applying ontology validation. - if ontology_resolver is None: - ontology_resolver = OntologyResolver() + Args: + data_chunks: List of document chunks + chunk_graphs: List of knowledge graphs corresponding to chunks + ontology_resolver: Optional ontology resolver for validation + existing_edges_map: Optional map of existing edges to avoid duplicates + + Returns: + Tuple of (graph_nodes, graph_edges) + """ + existing_edges_map = existing_edges_map or {} + ontology_resolver = ontology_resolver or OntologyResolver() added_nodes_map = {} added_ontology_nodes_map = {} relationships = [] ontology_relationships = [] - name_mapping = {} key_mapping = {} @@ -35,184 +239,41 @@ def expand_with_nodes_and_edges( if not graph: continue + # Process nodes for node in graph.nodes: - node_id = generate_node_id(node.id) - node_name = generate_node_name(node.name) - type_node_id = generate_node_id(node.type) - type_node_name = generate_node_name(node.type) + # Get or create type node + type_node = _get_or_create_type_node( + node, + data_chunk, + ontology_resolver, + added_nodes_map, + added_ontology_nodes_map, + name_mapping, + key_mapping, + existing_edges_map, + ontology_relationships, + ) - ontology_validated_source_type = False - ontology_validated_source_ent = False - - type_node_key = f"{type_node_id}_type" - - if type_node_key not in added_nodes_map and type_node_key not in key_mapping: - ( - ontology_entity_type_nodes, - ontology_entity_type_edges, - ontology_closest_class_node, - ) = ontology_resolver.get_subgraph(node_name=type_node_name, node_type="classes") - - if ontology_closest_class_node: - name_mapping[type_node_name] = ontology_closest_class_node.name - ontology_validated_source_type = True - old_key = type_node_key - type_node_id = generate_node_id(ontology_closest_class_node.name) - type_node_key = f"{type_node_id}_type" - type_node_name = generate_node_name(ontology_closest_class_node.name) - key_mapping[old_key] = type_node_key - - type_node = EntityType( - id=type_node_id, - name=type_node_name, - type=type_node_name, - description=type_node_name, - ontology_valid=ontology_validated_source_type, - ) - added_nodes_map[type_node_key] = type_node - - for ontology_node_to_store in ontology_entity_type_nodes: - ont_node_id = generate_node_id(ontology_node_to_store.name) - ont_node_name = generate_node_name(ontology_node_to_store.name) - - if ontology_node_to_store.category == "classes": - ont_node_key = f"{ont_node_id}_type" - if (ont_node_key not in added_nodes_map) and ( - ont_node_key not in added_ontology_nodes_map - ): - added_ontology_nodes_map[ont_node_key] = EntityType( - id=ont_node_id, - name=ont_node_name, - description=ont_node_name, - ontology_valid=True, - ) - - elif ontology_node_to_store.category == "individuals": - ont_node_key = f"{ont_node_id}_entity" - if (ont_node_key not in added_nodes_map) and ( - ont_node_key not in added_ontology_nodes_map - ): - added_ontology_nodes_map[ont_node_key] = Entity( - id=ont_node_id, - name=ont_node_name, - description=ont_node_name, - ontology_valid=True, - belongs_to_set=data_chunk.belongs_to_set, - ) - - for source, relation, target in ontology_entity_type_edges: - source_node_id = generate_node_id(source) - target_node_id = generate_node_id(target) - relationship_name = generate_edge_name(relation) - edge_key = f"{source_node_id}_{target_node_id}_{relationship_name}" - - if edge_key not in existing_edges_map: - ontology_relationships.append( - ( - source_node_id, - target_node_id, - relationship_name, - dict( - relationship_name=relationship_name, - source_node_id=source_node_id, - target_node_id=target_node_id, - ), - ) - ) - existing_edges_map[edge_key] = True - else: - type_node = added_nodes_map.get(type_node_key) or added_nodes_map.get( - key_mapping.get(type_node_key) - ) - - entity_node_key = f"{node_id}_entity" - - if entity_node_key not in added_nodes_map and entity_node_key not in key_mapping: - ontology_entity_nodes, ontology_entity_edges, start_ent_ont = ( - ontology_resolver.get_subgraph(node_name=node_name, node_type="individuals") - ) - - if start_ent_ont: - name_mapping[node_name] = start_ent_ont.name - ontology_validated_source_ent = True - old_key = entity_node_key - node_id = generate_node_id(start_ent_ont.name) - entity_node_key = f"{node_id}_entity" - node_name = generate_node_name(start_ent_ont.name) - key_mapping[old_key] = entity_node_key - - entity_node = Entity( - id=node_id, - name=node_name, - is_a=type_node, - description=node.description, - ontology_valid=ontology_validated_source_ent, - belongs_to_set=data_chunk.belongs_to_set, - ) - - added_nodes_map[entity_node_key] = entity_node - - for ontology_node_to_store in ontology_entity_nodes: - ont_node_id = generate_node_id(ontology_node_to_store.name) - ont_node_name = generate_node_name(ontology_node_to_store.name) - - if ontology_node_to_store.category == "classes": - ont_node_key = f"{ont_node_id}_type" - if (ont_node_key not in added_nodes_map) and ( - ont_node_key not in added_ontology_nodes_map - ): - added_ontology_nodes_map[ont_node_key] = EntityType( - id=ont_node_id, - name=ont_node_name, - description=ont_node_name, - ontology_valid=True, - ) - - elif ontology_node_to_store.category == "individuals": - ont_node_key = f"{ont_node_id}_entity" - if (ont_node_key not in added_nodes_map) and ( - ont_node_key not in added_ontology_nodes_map - ): - added_ontology_nodes_map[ont_node_key] = Entity( - id=ont_node_id, - name=ont_node_name, - description=ont_node_name, - ontology_valid=True, - belongs_to_set=data_chunk.belongs_to_set, - ) - - for source, relation, target in ontology_entity_edges: - source_node_id = generate_node_id(source) - target_node_id = generate_node_id(target) - relationship_name = generate_edge_name(relation) - edge_key = f"{source_node_id}_{target_node_id}_{relationship_name}" - - if edge_key not in existing_edges_map: - ontology_relationships.append( - ( - source_node_id, - target_node_id, - relationship_name, - dict( - relationship_name=relationship_name, - source_node_id=source_node_id, - target_node_id=target_node_id, - ontology_valid=True, - ), - ) - ) - existing_edges_map[edge_key] = True - - else: - entity_node = added_nodes_map.get(entity_node_key) or added_nodes_map.get( - key_mapping.get(entity_node_key) - ) + # Get or create entity node + entity_node = _get_or_create_entity_node( + node, + type_node, + data_chunk, + ontology_resolver, + added_nodes_map, + added_ontology_nodes_map, + name_mapping, + key_mapping, + existing_edges_map, + ontology_relationships, + ) + # Add entity to chunk if data_chunk.contains is None: data_chunk.contains = [] - data_chunk.contains.append(entity_node) + # Process edges for edge in graph.edges: source_node_id = generate_node_id( name_mapping.get(edge.source_node_id, edge.source_node_id) @@ -229,12 +290,12 @@ def expand_with_nodes_and_edges( source_node_id, target_node_id, relationship_name, - dict( - relationship_name=relationship_name, - source_node_id=source_node_id, - target_node_id=target_node_id, - ontology_valid=False, - ), + { + "relationship_name": relationship_name, + "source_node_id": source_node_id, + "target_node_id": target_node_id, + "ontology_valid": False, + }, ) ) existing_edges_map[edge_key] = True diff --git a/cognee/tests/test_falkordb.py b/cognee/tests/test_falkordb.py index b17848ca0..7c3ceef64 100755 --- a/cognee/tests/test_falkordb.py +++ b/cognee/tests/test_falkordb.py @@ -14,6 +14,7 @@ async def check_falkordb_connection(): """Check if FalkorDB is available at localhost:6379""" try: from falkordb import FalkorDB + client = FalkorDB(host="localhost", port=6379) # Try to list graphs to check connection client.list_graphs() @@ -130,7 +131,7 @@ async def main(): # For FalkorDB vector engine, check if collections are empty # Since FalkorDB is a hybrid adapter, we can check if the graph is empty # as the vector data is stored in the same graph - if hasattr(vector_engine, 'driver'): + if hasattr(vector_engine, "driver"): # This is FalkorDB - check if graphs exist collections = vector_engine.driver.list_graphs() # The graph should be deleted, so either no graphs or empty graph @@ -138,7 +139,9 @@ async def main(): # Graph exists but should be empty vector_graph_data = await vector_engine.get_graph_data() vector_nodes, vector_edges = vector_graph_data - assert len(vector_nodes) == 0 and len(vector_edges) == 0, "FalkorDB vector database is not empty" + assert len(vector_nodes) == 0 and len(vector_edges) == 0, ( + "FalkorDB vector database is not empty" + ) else: # Fallback for other vector engines like LanceDB connection = await vector_engine.get_connection() @@ -153,7 +156,7 @@ async def main(): # For FalkorDB, check if the graph database is empty from cognee.infrastructure.databases.graph import get_graph_engine - + graph_engine = get_graph_engine() graph_data = await graph_engine.get_graph_data() nodes, edges = graph_data @@ -161,7 +164,7 @@ async def main(): print("🎉 FalkorDB test completed successfully!") print(" ✓ Data ingestion worked") - print(" ✓ Cognify processing worked") + print(" ✓ Cognify processing worked") print(" ✓ Search operations worked") print(" ✓ Cleanup worked")