diff --git a/cognee/infrastructure/databases/hybrid/falkordb/FalkorDBAdapter.py b/cognee/infrastructure/databases/hybrid/falkordb/FalkorDBAdapter.py index affbb69aa..9d6d01df1 100644 --- a/cognee/infrastructure/databases/hybrid/falkordb/FalkorDBAdapter.py +++ b/cognee/infrastructure/databases/hybrid/falkordb/FalkorDBAdapter.py @@ -10,7 +10,13 @@ from typing import List, Dict, Any, Optional, Tuple, Type, Union from falkordb import FalkorDB from cognee.exceptions import InvalidValueError -from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface, record_graph_changes, NodeData, EdgeData, Node +from cognee.infrastructure.databases.graph.graph_db_interface import ( + GraphDBInterface, + record_graph_changes, + NodeData, + EdgeData, + Node, +) from cognee.infrastructure.databases.vector.embeddings import EmbeddingEngine from cognee.infrastructure.databases.vector.vector_db_interface import VectorDBInterface from cognee.infrastructure.engine import DataPoint @@ -176,7 +182,14 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): return f"'{json.dumps(value).replace(chr(39), chr(34))}'" if type(value) is str: # Escape single quotes and handle special characters - escaped_value = str(value).replace("'", "\\'").replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t') + escaped_value = ( + str(value) + .replace("'", "\\'") + .replace('"', '\\"') + .replace("\n", "\\n") + .replace("\r", "\\r") + .replace("\t", "\\t") + ) return f"'{escaped_value}'" return f"'{str(value)}'" @@ -215,7 +228,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): } ), } - + # Clean the properties - remove None values and handle special types clean_properties = {} for key, value in properties.items(): @@ -236,37 +249,35 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): SET node += $properties, node.updated_at = timestamp() """ ).strip() - - params = { - "node_id": str(data_point.id), - "properties": clean_properties - } + + params = {"node_id": str(data_point.id), "properties": clean_properties} return query, params def sanitize_relationship_name(self, relationship_name: str) -> str: """ Sanitize relationship name to be valid for Cypher queries. - + Parameters: ----------- - relationship_name (str): The original relationship name - + Returns: -------- - str: A sanitized relationship name valid for Cypher """ # Replace hyphens, spaces, and other special characters with underscores import re - sanitized = re.sub(r'[^\w]', '_', relationship_name) + + sanitized = re.sub(r"[^\w]", "_", relationship_name) # Remove consecutive underscores - sanitized = re.sub(r'_+', '_', sanitized) + sanitized = re.sub(r"_+", "_", sanitized) # Remove leading/trailing underscores - sanitized = sanitized.strip('_') + sanitized = sanitized.strip("_") # Ensure it starts with a letter or underscore - if sanitized and not sanitized[0].isalpha() and sanitized[0] != '_': - sanitized = '_' + sanitized - return sanitized or 'RELATIONSHIP' + if sanitized and not sanitized[0].isalpha() and sanitized[0] != "_": + sanitized = "_" + sanitized + return sanitized or "RELATIONSHIP" async def create_edge_query(self, edge: tuple[str, str, str, dict]) -> str: """ @@ -285,7 +296,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): """ # Sanitize the relationship name for Cypher compatibility sanitized_relationship = self.sanitize_relationship_name(edge[2]) - + # Add the original relationship name to properties edge_properties = {**edge[3], "relationship_name": edge[2]} properties = await self.stringify_properties(edge_properties) @@ -368,7 +379,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): else None for property_name in DataPoint.get_embeddable_property_names(data_point) ] - + query, params = await self.create_data_point_query(data_point, vectorized_data) self.query(query, params) @@ -460,13 +471,10 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): clean_properties[key] = f"vecf32({value})" else: clean_properties[key] = value - + query = "MERGE (node {id: $node_id}) SET node += $properties, node.updated_at = timestamp()" - params = { - "node_id": node_id, - "properties": clean_properties - } - + params = {"node_id": node_id, "properties": clean_properties} + self.query(query, params) # Keep the original create_data_points method for VectorDBInterface compatibility @@ -508,7 +516,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): else None for property_name in DataPoint.get_embeddable_property_names(data_point) ] - + query, params = await self.create_data_point_query(data_point, vectorized_data) self.query(query, params) @@ -550,11 +558,13 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): # Node is in (node_id, properties) format node_id, properties = node await self.add_node(node_id, properties) - elif hasattr(node, 'id') and hasattr(node, 'model_dump'): + elif hasattr(node, "id") and hasattr(node, "model_dump"): # Node is a DataPoint object await self.add_node(str(node.id), node.model_dump()) else: - raise ValueError(f"Invalid node format: {node}. Expected tuple (node_id, properties) or DataPoint object.") + raise ValueError( + f"Invalid node format: {node}. Expected tuple (node_id, properties) or DataPoint object." + ) async def add_edge( self, @@ -578,7 +588,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): """ if properties is None: properties = {} - + edge_tuple = (source_id, target_id, relationship_name, properties) query = await self.create_edge_query(edge_tuple) self.query(query) @@ -599,7 +609,9 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): source_id, target_id, relationship_name, properties = edge await self.add_edge(source_id, target_id, relationship_name, properties) else: - raise ValueError(f"Invalid edge format: {edge}. Expected tuple (source_id, target_id, relationship_name, properties).") + raise ValueError( + f"Invalid edge format: {edge}. Expected tuple (source_id, target_id, relationship_name, properties)." + ) async def has_edges(self, edges): """ @@ -782,7 +794,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): RETURN n, 1.0 as score LIMIT $limit """ - + params = {"query_text": query_text, "limit": limit} result = self.query(query, params) return result.result_set @@ -945,7 +957,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): "MATCH (node) WHERE node.id = $node_id RETURN node", {"node_id": node_id}, ) - + if result.result_set and len(result.result_set) > 0: # FalkorDB returns node objects as first element in the result list return result.result_set[0][0].properties @@ -964,7 +976,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): "MATCH (node) WHERE node.id IN $node_ids RETURN node", {"node_ids": node_ids}, ) - + nodes = [] if result.result_set: for record in result.result_set: @@ -985,7 +997,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): "MATCH (node)-[]-(neighbor) WHERE node.id = $node_id RETURN DISTINCT neighbor", {"node_id": node_id}, ) - + neighbors = [] if result.result_set: for record in result.result_set: @@ -1010,17 +1022,19 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): """, {"node_id": node_id}, ) - + edges = [] if result.result_set: for record in result.result_set: # FalkorDB returns values by index: source_id, target_id, relationship_name, properties - edges.append(( - record[0], # source_id - record[1], # target_id - record[2], # relationship_name - record[3] # properties - )) + edges.append( + ( + record[0], # source_id + record[1], # target_id + record[2], # relationship_name + record[3], # properties + ) + ) return edges async def has_edge(self, source_id: str, target_id: str, relationship_name: str) -> bool: @@ -1036,7 +1050,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): """ # Check both the sanitized relationship type and the original name in properties sanitized_relationship = self.sanitize_relationship_name(relationship_name) - + result = self.query( f""" MATCH (source)-[r:{sanitized_relationship}]->(target) @@ -1044,9 +1058,13 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): AND (r.relationship_name = $relationship_name OR NOT EXISTS(r.relationship_name)) RETURN COUNT(r) > 0 AS edge_exists """, - {"source_id": source_id, "target_id": target_id, "relationship_name": relationship_name}, + { + "source_id": source_id, + "target_id": target_id, + "relationship_name": relationship_name, + }, ) - + if result.result_set and len(result.result_set) > 0: # FalkorDB returns scalar results as a list, access by index instead of key return result.result_set[0][0] @@ -1065,11 +1083,11 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): # Get basic node and edge counts node_result = self.query("MATCH (n) RETURN count(n) AS node_count") edge_result = self.query("MATCH ()-[r]->() RETURN count(r) AS edge_count") - + # FalkorDB returns scalar results as a list, access by index instead of key num_nodes = node_result.result_set[0][0] if node_result.result_set else 0 num_edges = edge_result.result_set[0][0] if edge_result.result_set else 0 - + metrics = { "num_nodes": num_nodes, "num_edges": num_edges, @@ -1078,23 +1096,27 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): "num_connected_components": 1, # Simplified for now "sizes_of_connected_components": [num_nodes] if num_nodes > 0 else [], } - + if include_optional: # Add optional metrics - simplified implementation - metrics.update({ - "num_selfloops": 0, # Simplified - "diameter": -1, # Not implemented - "avg_shortest_path_length": -1, # Not implemented - "avg_clustering": -1, # Not implemented - }) + metrics.update( + { + "num_selfloops": 0, # Simplified + "diameter": -1, # Not implemented + "avg_shortest_path_length": -1, # Not implemented + "avg_clustering": -1, # Not implemented + } + ) else: - metrics.update({ - "num_selfloops": -1, - "diameter": -1, - "avg_shortest_path_length": -1, - "avg_clustering": -1, - }) - + metrics.update( + { + "num_selfloops": -1, + "diameter": -1, + "avg_shortest_path_length": -1, + "avg_clustering": -1, + } + ) + return metrics async def get_document_subgraph(self, content_hash: str): @@ -1117,9 +1139,9 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): COLLECT(DISTINCT c) AS made_from_nodes, COLLECT(DISTINCT et) AS orphan_types """ - + result = self.query(query, {"content_hash": f"text_{content_hash}"}) - + if not result.result_set or not result.result_set[0]: return None @@ -1154,7 +1176,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): RETURN n """ ) - + # FalkorDB returns node objects as first element in each record return [record[0] for record in result.result_set] if result.result_set else [] @@ -1171,7 +1193,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): - node_name (List[str]): A list of names of the nodes to include in the subgraph. """ label = node_type.__name__ - + # Find primary nodes of the specified type and names primary_query = f""" UNWIND $names AS wantedName @@ -1179,60 +1201,66 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface): WHERE n.name = wantedName RETURN DISTINCT n.id, properties(n) AS properties """ - + primary_result = self.query(primary_query, {"names": node_name}) if not primary_result.result_set: return [], [] - - # FalkorDB returns values by index: id, properties + + # FalkorDB returns values by index: id, properties primary_ids = [record[0] for record in primary_result.result_set] - + # Find neighbors of primary nodes neighbor_query = """ MATCH (n)-[]-(neighbor) WHERE n.id IN $ids RETURN DISTINCT neighbor.id, properties(neighbor) AS properties """ - + neighbor_result = self.query(neighbor_query, {"ids": primary_ids}) # FalkorDB returns values by index: id, properties - neighbor_ids = [record[0] for record in neighbor_result.result_set] if neighbor_result.result_set else [] - + neighbor_ids = ( + [record[0] for record in neighbor_result.result_set] + if neighbor_result.result_set + else [] + ) + all_ids = list(set(primary_ids + neighbor_ids)) - + # Get all nodes in the subgraph nodes_query = """ MATCH (n) WHERE n.id IN $ids RETURN n.id, properties(n) AS properties """ - + nodes_result = self.query(nodes_query, {"ids": all_ids}) nodes = [] if nodes_result.result_set: for record in nodes_result.result_set: # FalkorDB returns values by index: id, properties nodes.append((record[0], record[1])) - + # Get edges between these nodes edges_query = """ MATCH (a)-[r]->(b) WHERE a.id IN $ids AND b.id IN $ids RETURN a.id AS source_id, b.id AS target_id, type(r) AS relationship_name, properties(r) AS properties """ - + edges_result = self.query(edges_query, {"ids": all_ids}) edges = [] if edges_result.result_set: for record in edges_result.result_set: # FalkorDB returns values by index: source_id, target_id, relationship_name, properties - edges.append(( - record[0], # source_id - record[1], # target_id - record[2], # relationship_name - record[3] # properties - )) - + edges.append( + ( + record[0], # source_id + record[1], # target_id + record[2], # relationship_name + record[3], # properties + ) + ) + return nodes, edges async def prune(self): diff --git a/cognee/tests/test_falkordb.py b/cognee/tests/test_falkordb.py index b17848ca0..7c3ceef64 100755 --- a/cognee/tests/test_falkordb.py +++ b/cognee/tests/test_falkordb.py @@ -14,6 +14,7 @@ async def check_falkordb_connection(): """Check if FalkorDB is available at localhost:6379""" try: from falkordb import FalkorDB + client = FalkorDB(host="localhost", port=6379) # Try to list graphs to check connection client.list_graphs() @@ -130,7 +131,7 @@ async def main(): # For FalkorDB vector engine, check if collections are empty # Since FalkorDB is a hybrid adapter, we can check if the graph is empty # as the vector data is stored in the same graph - if hasattr(vector_engine, 'driver'): + if hasattr(vector_engine, "driver"): # This is FalkorDB - check if graphs exist collections = vector_engine.driver.list_graphs() # The graph should be deleted, so either no graphs or empty graph @@ -138,7 +139,9 @@ async def main(): # Graph exists but should be empty vector_graph_data = await vector_engine.get_graph_data() vector_nodes, vector_edges = vector_graph_data - assert len(vector_nodes) == 0 and len(vector_edges) == 0, "FalkorDB vector database is not empty" + assert len(vector_nodes) == 0 and len(vector_edges) == 0, ( + "FalkorDB vector database is not empty" + ) else: # Fallback for other vector engines like LanceDB connection = await vector_engine.get_connection() @@ -153,7 +156,7 @@ async def main(): # For FalkorDB, check if the graph database is empty from cognee.infrastructure.databases.graph import get_graph_engine - + graph_engine = get_graph_engine() graph_data = await graph_engine.get_graph_data() nodes, edges = graph_data @@ -161,7 +164,7 @@ async def main(): print("šŸŽ‰ FalkorDB test completed successfully!") print(" āœ“ Data ingestion worked") - print(" āœ“ Cognify processing worked") + print(" āœ“ Cognify processing worked") print(" āœ“ Search operations worked") print(" āœ“ Cleanup worked") diff --git a/debug_falkordb.py b/debug_falkordb.py index 7f065d0a4..6e1a201db 100644 --- a/debug_falkordb.py +++ b/debug_falkordb.py @@ -6,10 +6,11 @@ import asyncio async def debug_falkordb(): """Debug script to see what's actually stored in FalkorDB""" - + # Check if FalkorDB is available try: from falkordb import FalkorDB + client = FalkorDB(host="localhost", port=6379) client.list_graphs() print("āœ… FalkorDB connection successful") @@ -18,22 +19,26 @@ async def debug_falkordb(): return # Configure FalkorDB - cognee.config.set_graph_db_config({ - "graph_database_url": "localhost", - "graph_database_port": 6379, - "graph_database_provider": "falkordb", - }) - - cognee.config.set_vector_db_config({ - "vector_db_url": "localhost", - "vector_db_port": 6379, - "vector_db_provider": "falkordb", - }) + cognee.config.set_graph_db_config( + { + "graph_database_url": "localhost", + "graph_database_port": 6379, + "graph_database_provider": "falkordb", + } + ) + + cognee.config.set_vector_db_config( + { + "vector_db_url": "localhost", + "vector_db_port": 6379, + "vector_db_provider": "falkordb", + } + ) # Set up directories data_directory_path = str(pathlib.Path("./debug_data").resolve()) cognee_directory_path = str(pathlib.Path("./debug_cognee").resolve()) - + cognee.config.data_root_directory(data_directory_path) cognee.config.system_root_directory(cognee_directory_path) @@ -44,78 +49,80 @@ async def debug_falkordb(): # Add simple text simple_text = "Artificial Intelligence (AI) is a fascinating technology." dataset_name = "test_dataset" - + print("šŸ“ Adding data...") await cognee.add([simple_text], dataset_name) - + print("🧠 Running cognify...") await cognee.cognify([dataset_name]) - + # Debug: Check what's in the database print("\nšŸ” Checking what's in the database...") - + from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.databases.graph import get_graph_engine - + vector_engine = get_vector_engine() graph_engine = await get_graph_engine() - + # Get all graph data print("\nšŸ“Š Graph data:") graph_data = await graph_engine.get_graph_data() nodes, edges = graph_data - + print(f"Total nodes: {len(nodes)}") print(f"Total edges: {len(edges)}") - + if nodes: print("\nšŸ·ļø Sample nodes:") for i, (node_id, node_props) in enumerate(nodes[:3]): - print(f" Node {i+1}: ID={node_id}") + print(f" Node {i + 1}: ID={node_id}") print(f" Properties: {node_props}") - + if edges: print("\nšŸ”— Sample edges:") for i, edge in enumerate(edges[:3]): - print(f" Edge {i+1}: {edge}") - + print(f" Edge {i + 1}: {edge}") + # Try different search variations print("\nšŸ” Testing different search queries...") - + # Get available graphs and collections - if hasattr(vector_engine, 'driver'): + if hasattr(vector_engine, "driver"): graphs = vector_engine.driver.list_graphs() print(f"Available graphs: {graphs}") - + # Try to query directly to see node labels try: result = vector_engine.query("MATCH (n) RETURN DISTINCT labels(n) as labels LIMIT 10") print(f"Node labels found: {result.result_set}") - + result = vector_engine.query("MATCH (n) RETURN n LIMIT 5") print(f"Sample nodes raw: {result.result_set}") - + except Exception as e: print(f"Direct query error: {e}") - + # Try searching with different queries search_queries = [ ("entity.name + AI", "entity.name", "AI"), - ("Entity.name + AI", "Entity.name", "AI"), + ("Entity.name + AI", "Entity.name", "AI"), ("text + AI", "text", "AI"), ("content + AI", "content", "AI"), ("name + AI", "name", "AI"), ] - + for query_desc, collection_name, query_text in search_queries: try: - results = await vector_engine.search(collection_name=collection_name, query_text=query_text) + results = await vector_engine.search( + collection_name=collection_name, query_text=query_text + ) print(f" {query_desc}: {len(results)} results") if results: print(f" First result: {results[0]}") except Exception as e: print(f" {query_desc}: Error - {e}") - + # Clean up await cognee.prune.prune_data() await cognee.prune.prune_system(metadata=True) @@ -123,4 +130,4 @@ async def debug_falkordb(): if __name__ == "__main__": - asyncio.run(debug_falkordb()) \ No newline at end of file + asyncio.run(debug_falkordb())