From 38406a0ab1b3d4d47f8d9fb4e95b4612cd3ce117 Mon Sep 17 00:00:00 2001 From: Daulet Amirkhanov Date: Wed, 15 Oct 2025 17:32:48 +0100 Subject: [PATCH] chore: remove memgraph from cognee repo --- .../databases/graph/get_graph_engine.py | 2 +- .../graph/memgraph/memgraph_adapter.py | 1116 ----------------- cognee/tests/test_memgraph.py | 105 -- notebooks/neptune-analytics-example.ipynb | 82 +- 4 files changed, 42 insertions(+), 1263 deletions(-) delete mode 100644 cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py delete mode 100644 cognee/tests/test_memgraph.py diff --git a/cognee/infrastructure/databases/graph/get_graph_engine.py b/cognee/infrastructure/databases/graph/get_graph_engine.py index 1861aa15c..1ea61d29f 100644 --- a/cognee/infrastructure/databases/graph/get_graph_engine.py +++ b/cognee/infrastructure/databases/graph/get_graph_engine.py @@ -162,5 +162,5 @@ def create_graph_engine( raise EnvironmentError( f"Unsupported graph database provider: {graph_database_provider}. " - f"Supported providers are: {', '.join(list(supported_databases.keys()) + ['neo4j', 'kuzu', 'kuzu-remote', 'memgraph', 'neptune', 'neptune_analytics'])}" + f"Supported providers are: {', '.join(list(supported_databases.keys()) + ['neo4j', 'kuzu', 'kuzu-remote', 'neptune', 'neptune_analytics'])}" ) diff --git a/cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py b/cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py deleted file mode 100644 index 3612e3277..000000000 --- a/cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py +++ /dev/null @@ -1,1116 +0,0 @@ -"""Memgraph Adapter for Graph Database""" - -import json -from cognee.shared.logging_utils import get_logger, ERROR -import asyncio -from textwrap import dedent -from typing import Optional, Any, List, Dict, Type, Tuple -from contextlib import asynccontextmanager -from uuid import UUID -from neo4j import AsyncSession -from neo4j import AsyncGraphDatabase -from neo4j.exceptions import Neo4jError -from cognee.infrastructure.engine import DataPoint -from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface -from cognee.modules.storage.utils import JSONEncoder -from cognee.infrastructure.databases.exceptions.exceptions import NodesetFilterNotSupportedError - -logger = get_logger("MemgraphAdapter", level=ERROR) - - -class MemgraphAdapter(GraphDBInterface): - """ - Handles interaction with a Memgraph database through various graph operations. 
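-
-    Example:
-    --------
-    A minimal usage sketch; the bolt URL and credentials are illustrative
-    placeholders, not cognee defaults:
-
-        import asyncio
-
-        async def main():
-            adapter = MemgraphAdapter(
-                "bolt://localhost:7687",
-                graph_database_username="memgraph",
-                graph_database_password="memgraph",
-            )
-            print(await adapter.has_node("some-node-id"))
-
-        asyncio.run(main())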
- - Public methods include: - - get_session - - query - - has_node - - add_node - - add_nodes - - extract_node - - extract_nodes - - delete_node - - delete_nodes - - has_edge - - has_edges - - add_edge - - add_edges - - get_edges - - get_disconnected_nodes - - get_predecessors - - get_successors - - get_neighbours - - get_connections - - remove_connection_to_predecessors_of - - remove_connection_to_successors_of - - delete_graph - - serialize_properties - - get_model_independent_graph_data - - get_graph_data - - get_nodeset_subgraph - - get_filtered_graph_data - - get_node_labels_string - - get_relationship_labels_string - - get_graph_metrics - """ - - def __init__( - self, - graph_database_url: str, - graph_database_username: Optional[str] = None, - graph_database_password: Optional[str] = None, - driver: Optional[Any] = None, - ): - # Only use auth if both username and password are provided - auth = None - if graph_database_username and graph_database_password: - auth = (graph_database_username, graph_database_password) - - self.driver = driver or AsyncGraphDatabase.driver( - graph_database_url, - auth=auth, - max_connection_lifetime=120, - ) - - @asynccontextmanager - async def get_session(self) -> AsyncSession: - """ - Manage a session with the database, yielding the session for use in operations. - """ - async with self.driver.session() as session: - yield session - - async def query( - self, - query: str, - params: Optional[Dict[str, Any]] = None, - ) -> List[Dict[str, Any]]: - """ - Execute a provided query on the Memgraph database and return the results. - - Parameters: - ----------- - - - query (str): The Cypher query to be executed against the database. - - params (Optional[Dict[str, Any]]): Optional parameters to be used in the query. - (default None) - - Returns: - -------- - - - List[Dict[str, Any]]: A list of dictionaries representing the result set of the - query. - """ - try: - async with self.get_session() as session: - result = await session.run(query, params) - data = await result.data() - return data - except Neo4jError as error: - logger.error("Memgraph query error: %s", error, exc_info=True) - raise error - - async def has_node(self, node_id: str) -> bool: - """ - Determine if a node with the given ID exists in the database. - - Parameters: - ----------- - - - node_id (str): The ID of the node to check for existence. - - Returns: - -------- - - - bool: True if the node exists; otherwise, False. - """ - results = await self.query( - """ - MATCH (n) - WHERE n.id = $node_id - RETURN COUNT(n) > 0 AS node_exists - """, - {"node_id": node_id}, - ) - return results[0]["node_exists"] if len(results) > 0 else False - - async def add_node(self, node: DataPoint): - """ - Add a new node to the database with specified properties. - - Parameters: - ----------- - - - node (DataPoint): The DataPoint object representing the node to add. - - Returns: - -------- - - The result of the node addition, including its internal ID and node ID. 
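-
-        Example:
-        --------
-        Illustrative sketch; assumes an initialised adapter and a hypothetical
-        ``TextDocument`` subclass of DataPoint defined elsewhere:
-
-            document = TextDocument(name="intro.txt")
-            result = await adapter.add_node(document)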
- """ - serialized_properties = self.serialize_properties(node.model_dump()) - - query = """ - MERGE (node {id: $node_id}) - ON CREATE SET node:$node_label, node += $properties, node.updated_at = timestamp() - ON MATCH SET node:$node_label, node += $properties, node.updated_at = timestamp() - RETURN ID(node) AS internal_id, node.id AS nodeId - """ - - params = { - "node_id": str(node.id), - "node_label": type(node).__name__, - "properties": serialized_properties, - } - return await self.query(query, params) - - async def add_nodes(self, nodes: list[DataPoint]) -> None: - """ - Add multiple nodes to the database in a single operation. - - Parameters: - ----------- - - - nodes (list[DataPoint]): A list of DataPoint objects representing the nodes to - add. - - Returns: - -------- - - - None: None. - """ - query = """ - UNWIND $nodes AS node - MERGE (n {id: node.node_id}) - ON CREATE SET n:node.label, n += node.properties, n.updated_at = timestamp() - ON MATCH SET n:node.label, n += node.properties, n.updated_at = timestamp() - RETURN ID(n) AS internal_id, n.id AS nodeId - """ - - nodes = [ - { - "node_id": str(node.id), - "label": type(node).__name__, - "properties": self.serialize_properties(node.model_dump()), - } - for node in nodes - ] - - results = await self.query(query, dict(nodes=nodes)) - return results - - async def extract_node(self, node_id: str): - """ - Retrieve a single node based on its ID. - - Parameters: - ----------- - - - node_id (str): The ID of the node to retrieve. - - Returns: - -------- - - The node corresponding to the provided ID, or None if not found. - """ - results = await self.extract_nodes([node_id]) - - return results[0] if len(results) > 0 else None - - async def extract_nodes(self, node_ids: List[str]): - """ - Retrieve multiple nodes based on their IDs. - - Parameters: - ----------- - - - node_ids (List[str]): A list of IDs for the nodes to retrieve. - - Returns: - -------- - - A list of nodes corresponding to the provided IDs. - """ - query = """ - UNWIND $node_ids AS id - MATCH (node {id: id}) - RETURN node""" - - params = {"node_ids": node_ids} - - results = await self.query(query, params) - - return [result["node"] for result in results] - - async def delete_node(self, node_id: str): - """ - Delete a node from the database based on its ID. - - Parameters: - ----------- - - - node_id (str): The ID of the node to delete. - - Returns: - -------- - - None. - """ - sanitized_id = node_id.replace(":", "_") - - query = "MATCH (node: {{id: $node_id}}) DETACH DELETE node" - params = {"node_id": sanitized_id} - - return await self.query(query, params) - - async def delete_nodes(self, node_ids: list[str]) -> None: - """ - Delete multiple nodes from the database based on their IDs. - - Parameters: - ----------- - - - node_ids (list[str]): A list of IDs for the nodes to delete. - - Returns: - -------- - - - None: None. - """ - query = """ - UNWIND $node_ids AS id - MATCH (node {id: id}) - DETACH DELETE node""" - - params = {"node_ids": node_ids} - - return await self.query(query, params) - - async def has_edge(self, from_node: UUID, to_node: UUID, edge_label: str) -> bool: - """ - Check if a directed edge exists between two nodes identified by their IDs. - - Parameters: - ----------- - - - from_node (UUID): The ID of the source node. - - to_node (UUID): The ID of the target node. - - edge_label (str): The label of the edge to check. - - Returns: - -------- - - - bool: True if the edge exists; otherwise, False. 
- """ - query = """ - MATCH (from_node)-[relationship]->(to_node) - WHERE from_node.id = $from_node_id AND to_node.id = $to_node_id AND type(relationship) = $edge_label - RETURN COUNT(relationship) > 0 AS edge_exists - """ - - params = { - "from_node_id": str(from_node), - "to_node_id": str(to_node), - "edge_label": edge_label, - } - - records = await self.query(query, params) - return records[0]["edge_exists"] if records else False - - async def has_edges(self, edges): - """ - Check for the existence of multiple edges based on provided criteria. - - Parameters: - ----------- - - - edges: A list of edges to verify existence for. - - Returns: - -------- - - A list of boolean values indicating the existence of each edge. - """ - query = """ - UNWIND $edges AS edge - MATCH (a)-[r]->(b) - WHERE id(a) = edge.from_node AND id(b) = edge.to_node AND type(r) = edge.relationship_name - RETURN edge.from_node AS from_node, edge.to_node AS to_node, edge.relationship_name AS relationship_name, count(r) > 0 AS edge_exists - """ - - try: - params = { - "edges": [ - { - "from_node": str(edge[0]), - "to_node": str(edge[1]), - "relationship_name": edge[2], - } - for edge in edges - ], - } - - results = await self.query(query, params) - return [result["edge_exists"] for result in results] - except Neo4jError as error: - logger.error("Memgraph query error: %s", error, exc_info=True) - raise error - - async def add_edge( - self, - from_node: UUID, - to_node: UUID, - relationship_name: str, - edge_properties: Optional[Dict[str, Any]] = None, - ): - """ - Add a directed edge between two nodes with optional properties. - - Parameters: - ----------- - - - from_node (UUID): The ID of the source node. - - to_node (UUID): The ID of the target node. - - relationship_name (str): The type/label of the relationship to create. - - edge_properties (Optional[Dict[str, Any]]): Optional properties associated with - the edge. (default None) - - Returns: - -------- - - The result of the edge addition operation, including relationship details. - """ - - exists = await asyncio.gather(self.has_node(str(from_node)), self.has_node(str(to_node))) - - if not all(exists): - return None - - serialized_properties = self.serialize_properties(edge_properties or {}) - - query = dedent( - f"""\ - MATCH (from_node {{id: $from_node}}), - (to_node {{id: $to_node}}) - WHERE from_node IS NOT NULL AND to_node IS NOT NULL - MERGE (from_node)-[r:{relationship_name}]->(to_node) - ON CREATE SET r += $properties, r.updated_at = timestamp() - ON MATCH SET r += $properties, r.updated_at = timestamp() - RETURN r - """ - ) - - params = { - "from_node": str(from_node), - "to_node": str(to_node), - "relationship_name": relationship_name, - "properties": serialized_properties, - } - - return await self.query(query, params) - - async def add_edges(self, edges: list[tuple[str, str, str, dict[str, Any]]]) -> None: - """ - Batch add multiple edges between nodes, enforcing specified relationships. - - Parameters: - ----------- - - - edges (list[tuple[str, str, str, dict[str, Any]]): A list of tuples containing - specifications for each edge to add. - - Returns: - -------- - - - None: None. 
- """ - query = """ - UNWIND $edges AS edge - MATCH (from_node {id: edge.from_node}) - MATCH (to_node {id: edge.to_node}) - CALL merge.relationship( - from_node, - edge.relationship_name, - { - source_node_id: edge.from_node, - target_node_id: edge.to_node - }, - edge.properties, - to_node, - {} - ) YIELD rel - RETURN rel""" - - edges = [ - { - "from_node": str(edge[0]), - "to_node": str(edge[1]), - "relationship_name": edge[2], - "properties": { - **(edge[3] if edge[3] else {}), - "source_node_id": str(edge[0]), - "target_node_id": str(edge[1]), - }, - } - for edge in edges - ] - - try: - results = await self.query(query, dict(edges=edges)) - return results - except Neo4jError as error: - logger.error("Memgraph query error: %s", error, exc_info=True) - raise error - - async def get_edges(self, node_id: str): - """ - Retrieve all edges connected to a specific node identified by its ID. - - Parameters: - ----------- - - - node_id (str): The ID of the node for which to retrieve connected edges. - - Returns: - -------- - - A list of tuples representing the edges connected to the node. - """ - query = """ - MATCH (n {id: $node_id})-[r]-(m) - RETURN n, r, m - """ - - results = await self.query(query, dict(node_id=node_id)) - - return [ - (result["n"]["id"], result["m"]["id"], {"relationship_name": result["r"][1]}) - for result in results - ] - - async def get_disconnected_nodes(self) -> list[str]: - """ - Identify nodes in the graph that do not belong to the largest connected component. - - Returns: - -------- - - - list[str]: A list of IDs representing the disconnected nodes. - """ - query = """ - // Step 1: Collect all nodes - MATCH (n) - WITH COLLECT(n) AS nodes - - // Step 2: Find all connected components - WITH nodes - CALL { - WITH nodes - UNWIND nodes AS startNode - MATCH path = (startNode)-[*]-(connectedNode) - WITH startNode, COLLECT(DISTINCT connectedNode) AS component - RETURN component - } - - // Step 3: Aggregate components - WITH COLLECT(component) AS components - - // Step 4: Identify the largest connected component - UNWIND components AS component - WITH component - ORDER BY SIZE(component) DESC - LIMIT 1 - WITH component AS largestComponent - - // Step 5: Find nodes not in the largest connected component - MATCH (n) - WHERE NOT n IN largestComponent - RETURN COLLECT(ID(n)) AS ids - """ - - results = await self.query(query) - return results[0]["ids"] if len(results) > 0 else [] - - async def get_predecessors(self, node_id: str, edge_label: str = None) -> list[str]: - """ - Retrieve all predecessors of a node based on its ID and optional edge label. - - Parameters: - ----------- - - - node_id (str): The ID of the node to find predecessors for. - - edge_label (str): Optional edge label to filter predecessors. (default None) - - Returns: - -------- - - - list[str]: A list of predecessor node IDs. 
- """ - if edge_label is not None: - query = """ - MATCH (node)<-[r]-(predecessor) - WHERE node.id = $node_id AND type(r) = $edge_label - RETURN predecessor - """ - - results = await self.query( - query, - dict( - node_id=node_id, - edge_label=edge_label, - ), - ) - - return [result["predecessor"] for result in results] - else: - query = """ - MATCH (node)<-[r]-(predecessor) - WHERE node.id = $node_id - RETURN predecessor - """ - - results = await self.query( - query, - dict( - node_id=node_id, - ), - ) - - return [result["predecessor"] for result in results] - - async def get_successors(self, node_id: str, edge_label: str = None) -> list[str]: - """ - Retrieve all successors of a node based on its ID and optional edge label. - - Parameters: - ----------- - - - node_id (str): The ID of the node to find successors for. - - edge_label (str): Optional edge label to filter successors. (default None) - - Returns: - -------- - - - list[str]: A list of successor node IDs. - """ - if edge_label is not None: - query = """ - MATCH (node)-[r]->(successor) - WHERE node.id = $node_id AND type(r) = $edge_label - RETURN successor - """ - - results = await self.query( - query, - dict( - node_id=node_id, - edge_label=edge_label, - ), - ) - - return [result["successor"] for result in results] - else: - query = """ - MATCH (node)-[r]->(successor) - WHERE node.id = $node_id - RETURN successor - """ - - results = await self.query( - query, - dict( - node_id=node_id, - ), - ) - - return [result["successor"] for result in results] - - async def get_neighbors(self, node_id: str) -> List[Dict[str, Any]]: - """ - Get both predecessors and successors of a node. - - Parameters: - ----------- - - - node_id (str): The ID of the node to find neighbors for. - - Returns: - -------- - - - List[Dict[str, Any]]: A combined list of neighbor node IDs. - """ - predecessors, successors = await asyncio.gather( - self.get_predecessors(node_id), self.get_successors(node_id) - ) - - return predecessors + successors - - async def get_node(self, node_id: str) -> Optional[Dict[str, Any]]: - """Get a single node by ID.""" - query = """ - MATCH (node {id: $node_id}) - RETURN node - """ - results = await self.query(query, {"node_id": node_id}) - return results[0]["node"] if results else None - - async def get_nodes(self, node_ids: List[str]) -> List[Dict[str, Any]]: - """Get multiple nodes by their IDs.""" - query = """ - UNWIND $node_ids AS id - MATCH (node {id: id}) - RETURN node - """ - results = await self.query(query, {"node_ids": node_ids}) - return [result["node"] for result in results] - - async def get_connections(self, node_id: UUID) -> list: - """ - Retrieve connections for a given node, including both predecessors and successors. - - Parameters: - ----------- - - - node_id (UUID): The ID of the node for which to retrieve connections. - - Returns: - -------- - - - list: A list of connections associated with the node. 
- """ - predecessors_query = """ - MATCH (node)<-[relation]-(neighbour) - WHERE node.id = $node_id - RETURN neighbour, relation, node - """ - successors_query = """ - MATCH (node)-[relation]->(neighbour) - WHERE node.id = $node_id - RETURN node, relation, neighbour - """ - - predecessors, successors = await asyncio.gather( - self.query(predecessors_query, dict(node_id=str(node_id))), - self.query(successors_query, dict(node_id=str(node_id))), - ) - - connections = [] - - for neighbour in predecessors: - neighbour = neighbour["relation"] - connections.append((neighbour[0], {"relationship_name": neighbour[1]}, neighbour[2])) - - for neighbour in successors: - neighbour = neighbour["relation"] - connections.append((neighbour[0], {"relationship_name": neighbour[1]}, neighbour[2])) - - return connections - - async def remove_connection_to_predecessors_of( - self, node_ids: list[str], edge_label: str - ) -> None: - """ - Remove specified connections to the predecessors of the given node IDs. - - Parameters: - ----------- - - - node_ids (list[str]): A list of node IDs from which to remove predecessor - connections. - - edge_label (str): The label of the edges to remove. - - Returns: - -------- - - - None: None. - """ - query = f""" - UNWIND $node_ids AS nid - MATCH (node {id: nid})-[r]->(predecessor) - WHERE type(r) = $edge_label - DELETE r; - """ - - params = {"node_ids": node_ids, "edge_label": edge_label} - - return await self.query(query, params) - - async def remove_connection_to_successors_of( - self, node_ids: list[str], edge_label: str - ) -> None: - """ - Remove specified connections to the successors of the given node IDs. - - Parameters: - ----------- - - - node_ids (list[str]): A list of node IDs from which to remove successor - connections. - - edge_label (str): The label of the edges to remove. - - Returns: - -------- - - - None: None. - """ - query = f""" - UNWIND $node_ids AS id - MATCH (node:`{id}`)<-[r:{edge_label}]-(successor) - DELETE r; - """ - - params = {"node_ids": node_ids} - - return await self.query(query, params) - - async def delete_graph(self): - """ - Completely delete the graph from the database, removing all nodes and edges. - - Returns: - -------- - - None. - """ - query = """MATCH (node) - DETACH DELETE node;""" - - return await self.query(query) - - def serialize_properties(self, properties=dict()): - """ - Convert property values to a suitable representation for storage. - - Parameters: - ----------- - - - properties: A dictionary of properties to serialize. (default dict()) - - Returns: - -------- - - A dictionary of serialized properties. - """ - serialized_properties = {} - - for property_key, property_value in properties.items(): - if isinstance(property_value, UUID): - serialized_properties[property_key] = str(property_value) - continue - - if isinstance(property_value, dict): - serialized_properties[property_key] = json.dumps(property_value, cls=JSONEncoder) - continue - - serialized_properties[property_key] = property_value - - return serialized_properties - - async def get_model_independent_graph_data(self): - """ - Fetch nodes and relationships without any specific model filtering. - - Returns: - -------- - - A tuple containing nodes and edges as collections. 
- """ - query_nodes = "MATCH (n) RETURN collect(n) AS nodes" - nodes = await self.query(query_nodes) - - query_edges = "MATCH (n)-[r]->(m) RETURN collect([n, r, m]) AS elements" - edges = await self.query(query_edges) - - return (nodes, edges) - - async def get_graph_data(self): - """ - Retrieve all nodes and edges from the graph, including their properties. - - Returns: - -------- - - A tuple containing lists of nodes and edges. - """ - query = "MATCH (n) RETURN ID(n) AS id, labels(n) AS labels, properties(n) AS properties" - - result = await self.query(query) - - nodes = [ - ( - record["id"], - record["properties"], - ) - for record in result - ] - - query = """ - MATCH (n)-[r]->(m) - RETURN ID(n) AS source, ID(m) AS target, TYPE(r) AS type, properties(r) AS properties - """ - result = await self.query(query) - edges = [ - ( - record["source"], - record["target"], - record["type"], - record["properties"], - ) - for record in result - ] - - return (nodes, edges) - - async def get_nodeset_subgraph( - self, node_type: Type[Any], node_name: List[str] - ) -> Tuple[List[Tuple[int, dict]], List[Tuple[int, int, str, dict]]]: - """ - Throw an error indicating that node set filtering is not supported. - - Parameters: - ----------- - - - node_type (Type[Any]): The type of nodes to filter. - - node_name (List[str]): A list of node names to filter. - """ - raise NodesetFilterNotSupportedError - - async def get_filtered_graph_data(self, attribute_filters): - """ - Fetch nodes and relationships based on specified attribute filters. - - Parameters: - ----------- - - - attribute_filters: A list of criteria to filter nodes and relationships. - - Returns: - -------- - - A tuple containing filtered nodes and edges. - """ - where_clauses = [] - for attribute, values in attribute_filters[0].items(): - values_str = ", ".join( - f"'{value}'" if isinstance(value, str) else str(value) for value in values - ) - where_clauses.append(f"n.{attribute} IN [{values_str}]") - - where_clause = " AND ".join(where_clauses) - - query_nodes = f""" - MATCH (n) - WHERE {where_clause} - RETURN ID(n) AS id, labels(n) AS labels, properties(n) AS properties - """ - result_nodes = await self.query(query_nodes) - - nodes = [ - ( - record["id"], - record["properties"], - ) - for record in result_nodes - ] - - query_edges = f""" - MATCH (n)-[r]->(m) - WHERE {where_clause} AND {where_clause.replace("n.", "m.")} - RETURN ID(n) AS source, ID(m) AS target, TYPE(r) AS type, properties(r) AS properties - """ - result_edges = await self.query(query_edges) - - edges = [ - ( - record["source"], - record["target"], - record["type"], - record["properties"], - ) - for record in result_edges - ] - - return (nodes, edges) - - async def get_node_labels_string(self): - """ - Retrieve a string representation of all unique node labels in the graph. - - Returns: - -------- - - A string containing unique node labels. - """ - node_labels_query = """ - MATCH (n) - WITH DISTINCT labels(n) AS labelList - UNWIND labelList AS label - RETURN collect(DISTINCT label) AS labels; - """ - node_labels_result = await self.query(node_labels_query) - node_labels = node_labels_result[0]["labels"] if node_labels_result else [] - - if not node_labels: - raise ValueError("No node labels found in the database") - - node_labels_str = "[" + ", ".join(f"'{label}'" for label in node_labels) + "]" - return node_labels_str - - async def get_relationship_labels_string(self): - """ - Retrieve a string representation of all unique relationship types in the graph. 
- - Returns: - -------- - - A string containing unique relationship types. - """ - relationship_types_query = ( - "MATCH ()-[r]->() RETURN collect(DISTINCT type(r)) AS relationships;" - ) - relationship_types_result = await self.query(relationship_types_query) - relationship_types = ( - relationship_types_result[0]["relationships"] if relationship_types_result else [] - ) - - if not relationship_types: - raise ValueError("No relationship types found in the database.") - - relationship_types_undirected_str = ( - "{" - + ", ".join(f"{rel}" + ": {orientation: 'UNDIRECTED'}" for rel in relationship_types) - + "}" - ) - return relationship_types_undirected_str - - async def get_graph_metrics(self, include_optional=False): - """ - Calculate and return various metrics of the graph, including mandatory and optional - metrics. - - Parameters: - ----------- - - - include_optional: Specify whether to include optional metrics in the results. - (default False) - - Returns: - -------- - - A dictionary containing calculated graph metrics. - """ - - try: - # Basic metrics - node_count = await self.query("MATCH (n) RETURN count(n)") - edge_count = await self.query("MATCH ()-[r]->() RETURN count(r)") - num_nodes = node_count[0][0] if node_count else 0 - num_edges = edge_count[0][0] if edge_count else 0 - - # Calculate mandatory metrics - mandatory_metrics = { - "num_nodes": num_nodes, - "num_edges": num_edges, - "mean_degree": (2 * num_edges) / num_nodes if num_nodes > 0 else 0, - "edge_density": (num_edges) / (num_nodes * (num_nodes - 1)) if num_nodes > 1 else 0, - } - - # Calculate connected components - components_query = """ - MATCH (n:Node) - WITH n.id AS node_id - MATCH path = (n)-[:EDGE*0..]-() - WITH COLLECT(DISTINCT node_id) AS component - RETURN COLLECT(component) AS components - """ - components_result = await self.query(components_query) - component_sizes = ( - [len(comp) for comp in components_result[0][0]] if components_result else [] - ) - - mandatory_metrics.update( - { - "num_connected_components": len(component_sizes), - "sizes_of_connected_components": component_sizes, - } - ) - - if include_optional: - # Self-loops - self_loops_query = """ - MATCH (n:Node)-[r:EDGE]->(n) - RETURN COUNT(r) - """ - self_loops = await self.query(self_loops_query) - num_selfloops = self_loops[0][0] if self_loops else 0 - - # Shortest paths (simplified for Kuzu) - paths_query = """ - MATCH (n:Node), (m:Node) - WHERE n.id < m.id - MATCH path = (n)-[:EDGE*]-(m) - RETURN MIN(LENGTH(path)) AS length - """ - paths = await self.query(paths_query) - path_lengths = [p[0] for p in paths if p[0] is not None] - - # Local clustering coefficient - clustering_query = """ - /// Step 1: Get each node with its neighbors and degree - MATCH (n:Node)-[:EDGE]-(neighbor) - WITH n, COLLECT(DISTINCT neighbor) AS neighbors, COUNT(DISTINCT neighbor) AS degree - - // Step 2: Pair up neighbors and check if they are connected - UNWIND neighbors AS n1 - UNWIND neighbors AS n2 - WITH n, degree, n1, n2 - WHERE id(n1) < id(n2) // avoid duplicate pairs - - // Step 3: Use OPTIONAL MATCH to see if n1 and n2 are connected - OPTIONAL MATCH (n1)-[:EDGE]-(n2) - WITH n, degree, COUNT(n2) AS triangle_count - - // Step 4: Compute local clustering coefficient - WITH n, degree, - CASE WHEN degree <= 1 THEN 0.0 - ELSE (1.0 * triangle_count) / (degree * (degree - 1) / 2.0) - END AS local_cc - - // Step 5: Compute average - RETURN AVG(local_cc) AS avg_clustering_coefficient - """ - clustering = await self.query(clustering_query) - - optional_metrics = { - 
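-                # Derived from the optional queries above: diameter is the longest
-                # pairwise shortest path; -1 is the sentinel for metrics that could
-                # not be computed (no path lengths or an empty clustering result).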
"num_selfloops": num_selfloops, - "diameter": max(path_lengths) if path_lengths else -1, - "avg_shortest_path_length": sum(path_lengths) / len(path_lengths) - if path_lengths - else -1, - "avg_clustering": clustering[0][0] if clustering and clustering[0][0] else -1, - } - else: - optional_metrics = { - "num_selfloops": -1, - "diameter": -1, - "avg_shortest_path_length": -1, - "avg_clustering": -1, - } - - return {**mandatory_metrics, **optional_metrics} - - except Exception as e: - logger.error(f"Failed to get graph metrics: {e}") - return { - "num_nodes": 0, - "num_edges": 0, - "mean_degree": 0, - "edge_density": 0, - "num_connected_components": 0, - "sizes_of_connected_components": [], - "num_selfloops": -1, - "diameter": -1, - "avg_shortest_path_length": -1, - "avg_clustering": -1, - } diff --git a/cognee/tests/test_memgraph.py b/cognee/tests/test_memgraph.py deleted file mode 100644 index d0d968fc4..000000000 --- a/cognee/tests/test_memgraph.py +++ /dev/null @@ -1,105 +0,0 @@ -import os - -import pathlib -import cognee -from cognee.infrastructure.files.storage import get_storage_config -from cognee.modules.search.operations import get_history -from cognee.modules.users.methods import get_default_user -from cognee.shared.logging_utils import get_logger -from cognee.modules.search.types import SearchType - - -logger = get_logger() - - -async def main(): - cognee.config.set_graph_database_provider("memgraph") - data_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_memgraph") - ).resolve() - ) - cognee.config.data_root_directory(data_directory_path) - cognee_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_memgraph") - ).resolve() - ) - cognee.config.system_root_directory(cognee_directory_path) - - await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata=True) - - dataset_name = "cs_explanations" - - explanation_file_path_nlp = os.path.join( - pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt" - ) - await cognee.add([explanation_file_path_nlp], dataset_name) - - explanation_file_path_quantum = os.path.join( - pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt" - ) - - await cognee.add([explanation_file_path_quantum], dataset_name) - - await cognee.cognify([dataset_name]) - - from cognee.infrastructure.databases.vector import get_vector_engine - - vector_engine = get_vector_engine() - random_node = (await vector_engine.search("Entity_name", "Quantum computer"))[0] - random_node_name = random_node.payload["text"] - - search_results = await cognee.search( - query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name - ) - assert len(search_results) != 0, "The search results list is empty." - print("\n\nExtracted sentences are:\n") - for result in search_results: - print(f"{result}\n") - - search_results = await cognee.search(query_type=SearchType.CHUNKS, query_text=random_node_name) - assert len(search_results) != 0, "The search results list is empty." - print("\n\nExtracted chunks are:\n") - for result in search_results: - print(f"{result}\n") - - search_results = await cognee.search( - query_type=SearchType.SUMMARIES, query_text=random_node_name - ) - assert len(search_results) != 0, "Query related summaries don't exist." 
- print("\nExtracted results are:\n") - for result in search_results: - print(f"{result}\n") - - search_results = await cognee.search( - query_type=SearchType.NATURAL_LANGUAGE, - query_text=f"Find nodes connected to node with name {random_node_name}", - ) - assert len(search_results) != 0, "Query related natural language don't exist." - print("\nExtracted results are:\n") - for result in search_results: - print(f"{result}\n") - - user = await get_default_user() - history = await get_history(user.id) - - assert len(history) == 8, "Search history is not correct." - - await cognee.prune.prune_data() - data_root_directory = get_storage_config()["data_root_directory"] - assert not os.path.isdir(data_root_directory), "Local data files are not deleted" - - await cognee.prune.prune_system(metadata=True) - from cognee.infrastructure.databases.graph import get_graph_engine - - graph_engine = await get_graph_engine() - nodes, edges = await graph_engine.get_graph_data() - assert len(nodes) == 0 and len(edges) == 0, "Memgraph graph database is not empty" - - -if __name__ == "__main__": - import asyncio - - asyncio.run(main()) diff --git a/notebooks/neptune-analytics-example.ipynb b/notebooks/neptune-analytics-example.ipynb index e80ea4dcb..c85ccf58a 100644 --- a/notebooks/neptune-analytics-example.ipynb +++ b/notebooks/neptune-analytics-example.ipynb @@ -83,16 +83,16 @@ ] }, { - "metadata": {}, "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "import os\n", "import pathlib\n", "from cognee import config, add, cognify, search, SearchType, prune, visualize_graph\n", "from dotenv import load_dotenv" - ], - "outputs": [], - "execution_count": null + ] }, { "cell_type": "markdown", @@ -106,7 +106,9 @@ }, { "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ "# load environment variables from file .env\n", "load_dotenv()\n", @@ -145,9 +147,7 @@ " \"vector_db_url\": f\"neptune-graph://{graph_identifier}\", # Neptune Analytics endpoint with the format neptune-graph://\n", " }\n", ")" - ], - "outputs": [], - "execution_count": null + ] }, { "cell_type": "markdown", @@ -159,19 +159,19 @@ ] }, { - "metadata": {}, "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "# Prune data and system metadata before running, only if we want \"fresh\" state.\n", "await prune.prune_data()\n", "await prune.prune_system(metadata=True)" - ], - "outputs": [], - "execution_count": null + ] }, { - "metadata": {}, "cell_type": "markdown", + "metadata": {}, "source": [ "## Setup data and cognify\n", "\n", @@ -180,7 +180,9 @@ }, { "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ "# Add sample text to the dataset\n", "sample_text_1 = \"\"\"Neptune Analytics is a memory-optimized graph database engine for analytics. 
With Neptune\n", @@ -205,9 +207,7 @@ "\n", "# Cognify the text data.\n", "await cognify([dataset_name])" - ], - "outputs": [], - "execution_count": null + ] }, { "cell_type": "markdown", @@ -215,14 +215,16 @@ "source": [ "## Graph Memory visualization\n", "\n", - "Initialize Memgraph as a Graph Memory store and save to .artefacts/graph_visualization.html\n", + "Initialize Neptune as a Graph Memory store and save to .artefacts/graph_visualization.html\n", "\n", "![visualization](./neptune_analytics_demo.png)" ] }, { "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ "# Get a graphistry url (Register for a free account at https://www.graphistry.com)\n", "# url = await render_graph()\n", @@ -235,9 +237,7 @@ " ).resolve()\n", ")\n", "await visualize_graph(graph_file_path)" - ], - "outputs": [], - "execution_count": null + ] }, { "cell_type": "markdown", @@ -250,19 +250,19 @@ }, { "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ "# Completion query that uses graph data to form context.\n", "graph_completion = await search(query_text=\"What is Neptune Analytics?\", query_type=SearchType.GRAPH_COMPLETION)\n", "print(\"\\nGraph completion result is:\")\n", "print(graph_completion)" - ], - "outputs": [], - "execution_count": null + ] }, { - "metadata": {}, "cell_type": "markdown", + "metadata": {}, "source": [ "## SEARCH: RAG Completion\n", "\n", @@ -271,19 +271,19 @@ }, { "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ "# Completion query that uses document chunks to form context.\n", "rag_completion = await search(query_text=\"What is Neptune Analytics?\", query_type=SearchType.RAG_COMPLETION)\n", "print(\"\\nRAG Completion result is:\")\n", "print(rag_completion)" - ], - "outputs": [], - "execution_count": null + ] }, { - "metadata": {}, "cell_type": "markdown", + "metadata": {}, "source": [ "## SEARCH: Graph Insights\n", "\n", @@ -291,8 +291,10 @@ ] }, { - "metadata": {}, "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "# Search graph insights\n", "insights_results = await search(query_text=\"Neptune Analytics\", query_type=SearchType.GRAPH_COMPLETION)\n", @@ -302,13 +304,11 @@ " tgt_node = result[2].get(\"name\", result[2][\"type\"])\n", " relationship = result[1].get(\"relationship_name\", \"__relationship__\")\n", " print(f\"- {src_node} -[{relationship}]-> {tgt_node}\")" - ], - "outputs": [], - "execution_count": null + ] }, { - "metadata": {}, "cell_type": "markdown", + "metadata": {}, "source": [ "## SEARCH: Entity Summaries\n", "\n", @@ -316,8 +316,10 @@ ] }, { - "metadata": {}, "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "# Query all summaries related to query.\n", "summaries = await search(query_text=\"Neptune Analytics\", query_type=SearchType.SUMMARIES)\n", @@ -326,13 +328,11 @@ " type = summary[\"type\"]\n", " text = summary[\"text\"]\n", " print(f\"- {type}: {text}\")" - ], - "outputs": [], - "execution_count": null + ] }, { - "metadata": {}, "cell_type": "markdown", + "metadata": {}, "source": [ "## SEARCH: Chunks\n", "\n", @@ -340,8 +340,10 @@ ] }, { - "metadata": {}, "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "chunks = await search(query_text=\"Neptune Analytics\", query_type=SearchType.CHUNKS)\n", "print(\"\\nChunk results are:\")\n", @@ -349,9 +351,7 @@ " type = chunk[\"type\"]\n", " text = 
chunk[\"text\"]\n", " print(f\"- {type}: {text}\")" - ], - "outputs": [], - "execution_count": null + ] } ], "metadata": {