From a0c4d88b0d2dcb372a7cbf1e006bec30d39d9f47 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Thu, 10 Jul 2025 16:56:44 +0200
Subject: [PATCH 01/10] wip fix Memgraph get_knowledge_graph issues

---
 lightrag/api/README.md       |  5 +++--
 lightrag/kg/memgraph_impl.py | 36 +++++++++++++++++++++---------------
 2 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/lightrag/api/README.md b/lightrag/api/README.md
index 915ad7f3..4517a318 100644
--- a/lightrag/api/README.md
+++ b/lightrag/api/README.md
@@ -179,9 +179,9 @@ The command-line `workspace` argument and the `WORKSPACE` environment variable i
 - **For local file-based databases, data isolation is achieved through workspace subdirectories:** `JsonKVStorage`, `JsonDocStatusStorage`, `NetworkXStorage`, `NanoVectorDBStorage`, `FaissVectorDBStorage`.
 - **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `QdrantVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`.
 - **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`.
-- **For the Neo4j graph database, logical data isolation is achieved through labels:** `Neo4JStorage`
+- **For graph databases, logical data isolation is achieved through labels:** `Neo4JStorage`, `MemgraphStorage`
 
-To maintain compatibility with legacy data, the default workspace for PostgreSQL is `default` and for Neo4j is `base` when no workspace is configured. For all external storages, the system provides dedicated workspace environment variables to override the common `WORKSPACE` environment variable configuration. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`.
+To maintain compatibility with legacy data, the default workspace for PostgreSQL is `default` and for Neo4j is `base` when no workspace is configured. For all external storages, the system provides dedicated workspace environment variables to override the common `WORKSPACE` environment variable configuration. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`, `MEMGRAPH_WORKSPACE`.
 
 ### Multiple workers for Gunicorn + Uvicorn
@@ -394,6 +394,7 @@
 MongoKVStorage      MongoDB
 NetworkXStorage     NetworkX (default)
 Neo4JStorage        Neo4J
 PGGraphStorage      PostgreSQL with AGE plugin
+MemgraphStorage     Memgraph
 ```
 > Testing has shown that Neo4J delivers superior performance in production environments compared to PostgreSQL with AGE plugin.

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 8c6d6574..77e45a06 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -435,7 +435,7 @@ class MemgraphStorage(BaseGraphStorage):
 
     async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
         """
-        Upsert a node in the Neo4j database.
+        Upsert a node in the Memgraph database.
 
         Args:
             node_id: The unique identifier for the node (used as label)
@@ -448,7 +448,7 @@ class MemgraphStorage(BaseGraphStorage):
         properties = node_data
         entity_type = properties["entity_type"]
         if "entity_id" not in properties:
-            raise ValueError("Neo4j: node properties must contain an 'entity_id' field")
+            raise ValueError("Memgraph: node properties must contain an 'entity_id' field")
 
         try:
             async with self._driver.session(database=self._DATABASE) as session:
@@ -817,28 +817,34 @@ class MemgraphStorage(BaseGraphStorage):
                     WITH start
                     CALL {{
                         WITH start
-                        MATCH path = (start)-[*0..{max_depth}]-(node)
+                        MATCH path = (start)-[*BFS 0..{max_depth}]-(node)
                         WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
                         UNWIND path_nodes AS n
                         WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
                         WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
                         RETURN all_nodes, all_rels
                     }}
-                    WITH all_nodes AS nodes, all_rels AS relationships, size(all_nodes) AS total_nodes
+                    WITH all_nodes AS nodes, all_rels AS relationships, size(all_nodes) AS total_nodes_found
                     WITH
-                        CASE
-                            WHEN total_nodes <= {max_nodes} THEN nodes
-                            ELSE nodes[0..{max_nodes}]
-                        END AS limited_nodes,
-                        relationships,
-                        total_nodes,
-                        total_nodes > {max_nodes} AS is_truncated
+                        CASE
+                            WHEN total_nodes_found <= {max_nodes} THEN nodes
+                            ELSE nodes[0..{max_nodes}]
+                        END AS limited_nodes,
+                        relationships,
+                        total_nodes_found,
+                        total_nodes_found > {max_nodes} AS is_truncated
+
+                    UNWIND relationships AS rel
+                    WITH limited_nodes, rel, total_nodes_found, is_truncated
+                    WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
+                    WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships, total_nodes_found, is_truncated
                     RETURN
-                        [node IN limited_nodes | {{node: node}}] AS node_info,
-                        relationships,
-                        total_nodes,
-                        is_truncated
+                        [node IN limited_nodes | {{node: node}}] AS node_info,
+                        limited_relationships AS relationships,
+                        total_nodes_found,
+                        is_truncated
                     """
+                    result_set = None
                     try:
                         result_set = await session.run(

From 63d554bad8dc18bbe4ec361367099a565ff0afc0 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Thu, 10 Jul 2025 16:57:13 +0200
Subject: [PATCH 02/10] run pre-commit

---
 lightrag/kg/memgraph_impl.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 77e45a06..dc78ca19 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -448,7 +448,9 @@ class MemgraphStorage(BaseGraphStorage):
         properties = node_data
         entity_type = properties["entity_type"]
         if "entity_id" not in properties:
-            raise ValueError("Memgraph: node properties must contain an 'entity_id' field")
+            raise ValueError(
+                "Memgraph: node properties must contain an 'entity_id' field"
+            )
 
         try:
             async with self._driver.session(database=self._DATABASE) as session:

From 8d295bd2941b8350d0bd4b0b4ad2fef5dad1134b Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Fri, 11 Jul 2025 11:55:53 +0200
Subject: [PATCH 03/10] wip fix Memgraph get_knowledge_graph issues

---
 lightrag/kg/memgraph_impl.py | 189 ++++++++++++++++++++---------------
 1 file changed, 109 insertions(+), 80 deletions(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index dc78ca19..d0044499 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -734,7 +734,7 @@ class MemgraphStorage(BaseGraphStorage):
         self,
         node_label: str,
         max_depth: int = 3,
-        max_nodes: int = MAX_GRAPH_NODES,
+        max_nodes: int | None = None,
     ) -> KnowledgeGraph:
         """
         Retrieve a connected subgraph of nodes where the label includes the specified `node_label`.
@@ -747,137 +747,166 @@ class MemgraphStorage(BaseGraphStorage):
         Returns:
             KnowledgeGraph object containing nodes and edges, with an is_truncated flag
             indicating whether the graph was truncated due to max_nodes limit
-
-        Raises:
-            Exception: If there is an error executing the query
         """
-        if self._driver is None:
-            raise RuntimeError(
-                "Memgraph driver is not initialized. Call 'await initialize()' first."
-            )
+        # Get max_nodes from global_config if not provided
+        if max_nodes is None:
+            max_nodes = self.global_config.get("max_graph_nodes", 1000)
+        else:
+            # Limit max_nodes to not exceed global_config max_graph_nodes
+            max_nodes = min(max_nodes, self.global_config.get("max_graph_nodes", 1000))
+
+        workspace_label = self._get_workspace_label()
         result = KnowledgeGraph()
         seen_nodes = set()
         seen_edges = set()
-        workspace_label = self._get_workspace_label()
+
         async with self._driver.session(
             database=self._DATABASE, default_access_mode="READ"
         ) as session:
             try:
                 if node_label == "*":
-                    # First check if database has any nodes
-                    count_query = "MATCH (n) RETURN count(n) as total"
+                    # First check total node count to determine if graph is truncated
+                    count_query = (
+                        f"MATCH (n:`{workspace_label}`) RETURN count(n) as total"
+                    )
                     count_result = None
-                    total_count = 0
                     try:
                         count_result = await session.run(count_query)
                         count_record = await count_result.single()
-                        if count_record:
-                            total_count = count_record["total"]
-                            if total_count == 0:
-                                logger.debug("No nodes found in database")
-                                return result
-                            if total_count > max_nodes:
-                                result.is_truncated = True
-                                logger.info(
-                                    f"Graph truncated: {total_count} nodes found, limited to {max_nodes}"
-                                )
+
+                        if count_record and count_record["total"] > max_nodes:
+                            result.is_truncated = True
+                            logger.info(
+                                f"Graph truncated: {count_record['total']} nodes found, limited to {max_nodes}"
+                            )
                     finally:
                         if count_result:
                             await count_result.consume()
 
-                    # Run the main query to get nodes with highest degree
+                    # Run main query to get nodes with highest degree
                     main_query = f"""
                     MATCH (n:`{workspace_label}`)
                     OPTIONAL MATCH (n)-[r]-()
                     WITH n, COALESCE(count(r), 0) AS degree
                     ORDER BY degree DESC
                     LIMIT $max_nodes
-                    WITH collect(n) AS kept_nodes
-                    MATCH (a)-[r]-(b)
+                    WITH collect({{node: n}}) AS filtered_nodes
+                    UNWIND filtered_nodes AS node_info
+                    WITH collect(node_info.node) AS kept_nodes, filtered_nodes
+                    OPTIONAL MATCH (a)-[r]-(b)
                     WHERE a IN kept_nodes AND b IN kept_nodes
-                    RETURN [node IN kept_nodes | {{node: node}}] AS node_info,
-                           collect(DISTINCT r) AS relationships
+                    RETURN filtered_nodes AS node_info,
+                           collect(DISTINCT r) AS relationships
                     """
                     result_set = None
                     try:
                         result_set = await session.run(
-                            main_query, {"max_nodes": max_nodes}
+                            main_query,
+                            {"max_nodes": max_nodes},
                         )
                         record = await result_set.single()
-                        if not record:
-                            logger.debug("No record returned from main query")
-                            return result
                     finally:
                         if result_set:
                             await result_set.consume()
 
                 else:
-                    bfs_query = f"""
+                    # return await self._robust_fallback(node_label, max_depth, max_nodes)
+                    # First try without limit to check if we need to truncate
+                    full_query = f"""
                     MATCH (start:`{workspace_label}`)
                     WHERE start.entity_id = $entity_id
                     WITH start
-                    CALL {{
-                        WITH start
-                        MATCH path = (start)-[*BFS 0..{max_depth}]-(node)
-                        WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
-                        UNWIND path_nodes AS n
-                        WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
-                        WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
-                        RETURN all_nodes, all_rels
-                    }}
-                    WITH all_nodes AS nodes, all_rels AS relationships, size(all_nodes) AS total_nodes_found
-                    WITH
-                        CASE
-                            WHEN total_nodes_found <= {max_nodes} THEN nodes
-                            ELSE nodes[0..{max_nodes}]
-                        END AS limited_nodes,
-                        relationships,
-                        total_nodes_found,
-                        total_nodes_found > {max_nodes} AS is_truncated
-
-                    UNWIND relationships AS rel
-                    WITH limited_nodes, rel, total_nodes_found, is_truncated
-                    WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
-                    WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships, total_nodes_found, is_truncated
+                    MATCH path = (start)-[*BFS ..{max_depth}]-(node:`{workspace_label}`)
+                    WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
+                    UNWIND path_nodes AS n
+                    WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
+                    WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
                     RETURN
-                        [node IN limited_nodes | {{node: node}}] AS node_info,
-                        limited_relationships AS relationships,
-                        total_nodes_found,
-                        is_truncated
+                        [node IN all_nodes | {{node: node}}] AS node_info,
+                        all_rels AS relationships,
+                        size(all_nodes) AS total_nodes
                     """
-                    result_set = None
+                    # Try to get full result
+                    full_result = None
                     try:
-                        result_set = await session.run(
-                            bfs_query,
+                        full_result = await session.run(
+                            full_query,
                             {
                                 "entity_id": node_label,
                             },
                         )
-                        record = await result_set.single()
-                        if not record:
+                        full_record = await full_result.single()
+
+                        # If no record found, return empty KnowledgeGraph
+                        if not full_record:
                             logger.debug(f"No nodes found for entity_id: {node_label}")
                             return result
 
-                        # Check if the query indicates truncation
-                        if "is_truncated" in record and record["is_truncated"]:
+                        # If record found, check node count
+                        total_nodes = full_record["total_nodes"]
+
+                        if total_nodes <= max_nodes:
+                            # If node count is within limit, use full result directly
+                            logger.debug(
+                                f"Using full result with {total_nodes} nodes (no truncation needed)"
+                            )
+                            record = full_record
+                        else:
+                            # If node count exceeds limit, set truncated flag and run limited query
                             result.is_truncated = True
                             logger.info(
-                                f"Graph truncated: breadth-first search limited to {max_nodes} nodes"
+                                f"Graph truncated: {total_nodes} nodes found, breadth-first search limited to {max_nodes}"
                             )
-                    finally:
-                        if result_set:
-                            await result_set.consume()
 
-                # Process the record if it exists
-                if record and record["node_info"]:
+                            # Run limited query
+                            limited_query = f"""
+                            MATCH (start:`{workspace_label}`)
+                            WHERE start.entity_id = $entity_id
+                            WITH start
+                            MATCH path = (start)-[*BFS ..{max_depth}]-(node:`{workspace_label}`)
+                            WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
+                            UNWIND path_nodes AS n
+                            WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
+                            WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
+                            WITH
+                                CASE
+                                    WHEN size(all_nodes) <= $max_nodes THEN all_nodes
+                                    ELSE all_nodes[0..$max_nodes]
+                                END AS limited_nodes,
+                                all_rels
+                            UNWIND all_rels AS rel
+                            WITH limited_nodes, rel
+                            WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
+                            WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships
+                            RETURN
+                                [node IN limited_nodes | {{node: node}}] AS node_info,
+                                limited_relationships AS relationships
+                            """
+
+                            result_set = None
+                            try:
+                                result_set = await session.run(
+                                    limited_query,
+                                    {
+                                        "entity_id": node_label,
+                                        "max_nodes": max_nodes,
+                                    },
+                                )
+                                record = await result_set.single()
+                            finally:
+                                if result_set:
+                                    await result_set.consume()
+                    finally:
+                        if full_result:
+                            await full_result.consume()
+
+                if record:
+                    # Handle nodes (compatible with multi-label cases)
                     for node_info in record["node_info"]:
                         node = node_info["node"]
                         node_id = node.id
                         if node_id not in seen_nodes:
-                            seen_nodes.add(node_id)
                             result.nodes.append(
                                 KnowledgeGraphNode(
                                     id=f"{node_id}",
                                     labels=[node.get("entity_id")],
                                     properties=dict(node),
                                 )
                             )
+                            seen_nodes.add(node_id)
 
+                    # Handle relationships (including direction information)
                     for rel in record["relationships"]:
                         edge_id = rel.id
                         if edge_id not in seen_edges:
-                            seen_edges.add(edge_id)
                             start = rel.start_node
                             end = rel.end_node
                             result.edges.append(
                                 KnowledgeGraphEdge(
                                     id=f"{edge_id}",
                                     type=rel.type,
                                     source=f"{start.get('entity_id')}",
                                     target=f"{end.get('entity_id')}",
                                     properties=dict(rel),
                                 )
                             )
+                            seen_edges.add(edge_id)
 
-                logger.info(
-                    f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
-                )
+                logger.info(
+                    f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
+                )
 
             except Exception as e:
-                logger.error(f"Error getting knowledge graph: {str(e)}")
-                # Return empty but properly initialized KnowledgeGraph on error
-                return KnowledgeGraph()
+                logger.error(f"Error during subgraph query for {node_label}: {str(e)}")
 
         return result

From 15c4bac87f975e5192592e8959f1523daec52cc1 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Fri, 11 Jul 2025 17:02:42 +0200
Subject: [PATCH 04/10] wip fix Memgraph get_knowledge_graph issue by using mage function

---
 lightrag/kg/memgraph_impl.py | 284 ++++++++++++++++++++++++++---------
 1 file changed, 210 insertions(+), 74 deletions(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index d0044499..67c8a63f 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -742,7 +742,7 @@ class MemgraphStorage(BaseGraphStorage):
         Args:
             node_label: Label of the starting node, * means all nodes
             max_depth: Maximum depth of the subgraph, Defaults to 3
-            max_nodes: Maxiumu nodes to return by BFS, Defaults to 1000
+            max_nodes: Maximum nodes to return by BFS, Defaults to 1000
 
         Returns:
             KnowledgeGraph object containing nodes and edges, with an is_truncated flag
@@ -796,7 +796,7 @@ class MemgraphStorage(BaseGraphStorage):
                     OPTIONAL MATCH (a)-[r]-(b)
                     WHERE a IN kept_nodes AND b IN kept_nodes
                     RETURN filtered_nodes AS node_info,
-                           collect(DISTINCT r) AS relationships
+                        collect(DISTINCT r) AS relationships
                     """
                     result_set = None
                     try:
@@ -810,84 +810,220 @@ class MemgraphStorage(BaseGraphStorage):
                 else:
-                    # return await self._robust_fallback(node_label, max_depth, max_nodes)
-                    # First try without limit to check if we need to truncate
-                    full_query = f"""
+                    # For specific node queries, use path.subgraph_all with the refined query pattern
+                    subgraph_query = f"""
                     MATCH (start:`{workspace_label}`)
                     WHERE start.entity_id = $entity_id
                     WITH start
-                    MATCH path = (start)-[*BFS ..{max_depth}]-(node:`{workspace_label}`)
-                    WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
-                    UNWIND path_nodes AS n
-                    WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
-                    WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
+                    CALL path.subgraph_all(start, {{
+                        relationshipFilter: [],
+                        labelFilter: ['{workspace_label}'],
+                        minHops: 0,
+                        maxHops: $max_depth
+                    }})
+                    YIELD nodes, rels
+                    WITH
+                        CASE
+                            WHEN size(nodes) <= $max_nodes THEN nodes
+                            ELSE nodes[0..$max_nodes]
+                        END AS limited_nodes,
+                        rels,
+                        size(nodes) > $max_nodes AS is_truncated
+                    UNWIND rels AS rel
+                    WITH limited_nodes, rel, is_truncated
+                    WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
+                    WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships, is_truncated
                     RETURN
-                        [node IN all_nodes | {{node: node}}] AS node_info,
-                        all_rels AS relationships,
-                        size(all_nodes) AS total_nodes
+                        [node IN limited_nodes | {{node: node}}] AS node_info,
+                        limited_relationships AS relationships,
+                        is_truncated
                     """
-                    # Try to get full result
-                    full_result = None
+                    result_set = None
                     try:
-                        full_result = await session.run(
-                            full_query,
+                        result_set = await session.run(
+                            subgraph_query,
                             {
                                 "entity_id": node_label,
+                                "max_depth": max_depth,
+                                "max_nodes": max_nodes,
                             },
                         )
-                        full_record = await full_result.single()
+                        record = await result_set.single()
 
                         # If no record found, return empty KnowledgeGraph
-                        if not full_record:
+                        if not record:
                             logger.debug(f"No nodes found for entity_id: {node_label}")
                             return result
 
-                        # If record found, check node count
-                        total_nodes = full_record["total_nodes"]
-
-                        if total_nodes <= max_nodes:
-                            # If node count is within limit, use full result directly
-                            logger.debug(
-                                f"Using full result with {total_nodes} nodes (no truncation needed)"
-                            )
-                            record = full_record
-                        else:
-                            # If node count exceeds limit, set truncated flag and run limited query
+                        # Check if the result was truncated
+                        if record.get("is_truncated"):
                             result.is_truncated = True
                             logger.info(
-                                f"Graph truncated: {total_nodes} nodes found, breadth-first search limited to {max_nodes}"
+                                f"Graph truncated: breadth-first search limited to {max_nodes} nodes"
                             )
-
-                            # Run limited query
-                            limited_query = f"""
-                            MATCH (start:`{workspace_label}`)
-                            WHERE start.entity_id = $entity_id
-                            WITH start
-                            MATCH path = (start)-[*BFS ..{max_depth}]-(node:`{workspace_label}`)
-                            WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
-                            UNWIND path_nodes AS n
-                            WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
-                            WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
-                            WITH
-                                CASE
-                                    WHEN size(all_nodes) <= $max_nodes THEN all_nodes
-                                    ELSE all_nodes[0..$max_nodes]
-                                END AS limited_nodes,
-                                all_rels
-                            UNWIND all_rels AS rel
-                            WITH limited_nodes, rel
-                            WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
-                            WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships
-                            RETURN
-                                [node IN limited_nodes | {{node: node}}] AS node_info,
-                                limited_relationships AS relationships
-                            """
-
-                            result_set = None
-                            try:
-                                result_set = await session.run(
-                                    limited_query,
-                                    {
-                                        "entity_id": node_label,
-                                        "max_nodes": max_nodes,
-                                    },
-                                )
-                                record = await result_set.single()
-                            finally:
-                                if result_set:
-                                    await result_set.consume()
                     finally:
-                        if full_result:
-                            await full_result.consume()
+                        if result_set:
+                            await result_set.consume()
 
                 if record:
-                    # Handle nodes (compatible with multi-label cases)
                     for node_info in record["node_info"]:
                         node = node_info["node"]
                         node_id = node.id
                         if node_id not in seen_nodes:
                             result.nodes.append(
                                 KnowledgeGraphNode(
                                     id=f"{node_id}",
                                     labels=[node.get("entity_id")],
                                     properties=dict(node),
                                 )
                             )
                             seen_nodes.add(node_id)
 
-                    # Handle relationships (including direction information)
                     for rel in record["relationships"]:
                         edge_id = rel.id
                         if edge_id not in seen_edges:
                             start = rel.start_node
                             end = rel.end_node
                             result.edges.append(
                                 KnowledgeGraphEdge(
                                     id=f"{edge_id}",
                                     type=rel.type,
                                     source=f"{start.get('entity_id')}",
                                     target=f"{end.get('entity_id')}",
                                     properties=dict(rel),
                                 )
                             )
                             seen_edges.add(edge_id)
 
                 logger.info(
                     f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
                 )
 
             except Exception as e:
-                logger.error(f"Error during subgraph query for {node_label}: {str(e)}")
+                logger.warning(f"Memgraph error during subgraph query: {str(e)}")
+                if node_label != "*":
+                    logger.warning(
+ "Memgraph: falling back to basic Cypher recursive search..." + ) + return await self._robust_fallback(node_label, max_depth, max_nodes) + else: + logger.warning( + "Memgraph: Mage plugin error with wildcard query, returning empty result" + ) return result + + async def _robust_fallback( + self, node_label: str, max_depth: int, max_nodes: int + ) -> KnowledgeGraph: + """ + Fallback implementation when MAGE plugin is not available or incompatible. + This method implements the same functionality as get_knowledge_graph but uses + only basic Cypher queries and true breadth-first traversal instead of MAGE procedures. + """ + from collections import deque + + result = KnowledgeGraph() + visited_nodes = set() + visited_edges = set() + visited_edge_pairs = set() + + # Get the starting node's data + workspace_label = self._get_workspace_label() + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + query = f""" + MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) + RETURN id(n) as node_id, n + """ + node_result = await session.run(query, entity_id=node_label) + try: + node_record = await node_result.single() + if not node_record: + return result + + # Create initial KnowledgeGraphNode + start_node = KnowledgeGraphNode( + id=f"{node_record['n'].get('entity_id')}", + labels=[node_record["n"].get("entity_id")], + properties=dict(node_record["n"]._properties), + ) + finally: + await node_result.consume() # Ensure results are consumed + + # Initialize queue for BFS with (node, depth) tuples + queue = deque([(start_node, 0)]) + + # Keep track of all nodes we've discovered (including those we might not add due to limits) + discovered_nodes = {} # node_id -> KnowledgeGraphNode + discovered_nodes[start_node.id] = start_node + + # True BFS implementation using a queue + while queue: + # Dequeue the next node to process + current_node, current_depth = queue.popleft() + + # Skip if already processed or exceeds max depth + if current_node.id in visited_nodes: + continue + + if current_depth > max_depth: + logger.debug( + f"Skipping node at depth {current_depth} (max_depth: {max_depth})" + ) + continue + + # Check if we've reached the node limit + if len(visited_nodes) >= max_nodes: + result.is_truncated = True + logger.info( + f"Graph truncated: breadth-first search limited to: {max_nodes} nodes" + ) + break + + # Add current node to result + result.nodes.append(current_node) + visited_nodes.add(current_node.id) + + # Only continue exploring if we haven't reached max depth + if current_depth < max_depth: + # Get all edges and target nodes for the current node + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + workspace_label = self._get_workspace_label() + query = f""" + MATCH (a:`{workspace_label}` {{entity_id: $entity_id}})-[r]-(b:`{workspace_label}`) + WHERE b.entity_id IS NOT NULL + RETURN r, b, id(r) as edge_id + """ + results = await session.run(query, entity_id=current_node.id) + + # Get all records and release database connection + records = await results.fetch( + 1000 + ) # Max neighbor nodes we can handle + await results.consume() # Ensure results are consumed + + # Process all neighbors + for record in records: + rel = record["r"] + edge_id = str(record["edge_id"]) + b_node = record["b"] + target_id = b_node.get("entity_id") + + if target_id and edge_id not in visited_edges: + # Create KnowledgeGraphNode for target if not already discovered + if target_id not in discovered_nodes: + 
+                                target_node = KnowledgeGraphNode(
+                                    id=f"{target_id}",
+                                    labels=[target_id],
+                                    properties=dict(b_node._properties),
+                                )
+                                discovered_nodes[target_id] = target_node
+
+                            # Add to queue for further exploration
+                            queue.append((target_node, current_depth + 1))
+
+        # Second pass: Add edges only between nodes that are actually in the result
+        final_node_ids = {node.id for node in result.nodes}
+
+        # Now collect all edges between the nodes we actually included
+        async with self._driver.session(
+            database=self._DATABASE, default_access_mode="READ"
+        ) as session:
+            # Use a parameterized query to get all edges between our final nodes
+            query = f"""
+                UNWIND $node_ids AS node_id
+                MATCH (a:`{workspace_label}` {{entity_id: node_id}})-[r]-(b:`{workspace_label}`)
+                WHERE b.entity_id IN $node_ids
+                RETURN DISTINCT r, a.entity_id AS source_id, b.entity_id AS target_id, id(r) AS edge_id
+            """
+            results = await session.run(query, node_ids=list(final_node_ids))
+
+            edges_to_add = []
+            async for record in results:
+                rel = record["r"]
+                edge_id = str(record["edge_id"])
+                source_id = record["source_id"]
+                target_id = record["target_id"]
+
+                if edge_id not in visited_edges:
+                    # Create edge pair for deduplication (undirected)
+                    sorted_pair = tuple(sorted([source_id, target_id]))
+
+                    if sorted_pair not in visited_edge_pairs:
+                        edges_to_add.append(
+                            KnowledgeGraphEdge(
+                                id=f"{edge_id}",
+                                type=rel.type,
+                                source=f"{source_id}",
+                                target=f"{target_id}",
+                                properties=dict(rel),
+                            )
+                        )
+                        visited_edges.add(edge_id)
+                        visited_edge_pairs.add(sorted_pair)
+
+            await results.consume()
+
+        # Add all valid edges to the result
+        result.edges.extend(edges_to_add)
+
+        logger.info(
+            f"BFS subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
+        )
+
+        return result

From 81c93f695087693ec0bb5b261ac1a036276361bf Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 14:16:20 +0200
Subject: [PATCH 05/10] don't use mage procedure

---
 lightrag/kg/memgraph_impl.py | 39 +++++++++++++++++------------------
 1 file changed, 18 insertions(+), 21 deletions(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 67c8a63f..01465413 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -814,29 +814,26 @@ class MemgraphStorage(BaseGraphStorage):
                     subgraph_query = f"""
                     MATCH (start:`{workspace_label}`)
                     WHERE start.entity_id = $entity_id
-                    WITH start
-                    CALL path.subgraph_all(start, {{
-                        relationshipFilter: [],
-                        labelFilter: ['{workspace_label}'],
-                        minHops: 0,
-                        maxHops: $max_depth
-                    }})
-                    YIELD nodes, rels
+
+                    MATCH path = (start)-[*BFS 0..{max_depth}]-(end:`{workspace_label}`)
+                    WHERE ALL(n IN nodes(path) WHERE '{workspace_label}' IN labels(n))
+
+                    WITH collect(DISTINCT end) + start AS all_nodes_unlimited
                     WITH
-                        CASE
-                            WHEN size(nodes) <= $max_nodes THEN nodes
-                            ELSE nodes[0..$max_nodes]
-                        END AS limited_nodes,
-                        rels,
-                        size(nodes) > $max_nodes AS is_truncated
+                        CASE
+                            WHEN size(all_nodes_unlimited) <= $max_nodes THEN all_nodes_unlimited
+                            ELSE all_nodes_unlimited[0..$max_nodes]
+                        END AS limited_nodes,
+                        size(all_nodes_unlimited) > $max_nodes AS is_truncated
-                    UNWIND rels AS rel
-                    WITH limited_nodes, rel, is_truncated
-                    WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
-                    WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships, is_truncated
+
+                    UNWIND limited_nodes AS n1
+                    UNWIND limited_nodes AS n2
+                    MATCH (n1)-[r]-(n2)
+                    WITH DISTINCT r, limited_nodes, is_truncated
+
                     RETURN
-                        [node IN limited_nodes | {{node: node}}] AS node_info,
-                        limited_relationships AS relationships,
-                        is_truncated
+                        [node IN limited_nodes | {{node: node}}] AS node_info,
+                        collect(DISTINCT r) AS relationships,
+                        is_truncated
                     """

From f961f1aa7de1351c907ec9edd07f1abc783a03db Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 14:26:23 +0200
Subject: [PATCH 06/10] remove fallback query

---
 lightrag/kg/memgraph_impl.py | 174 +----------------------------------
 1 file changed, 1 insertion(+), 173 deletions(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 01465413..aff53429 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -900,177 +900,5 @@ class MemgraphStorage(BaseGraphStorage):
 
         except Exception as e:
             logger.warning(f"Memgraph error during subgraph query: {str(e)}")
-            if node_label != "*":
-                logger.warning(
-                    "Memgraph: falling back to basic Cypher recursive search..."
-                )
-                return await self._robust_fallback(node_label, max_depth, max_nodes)
-            else:
-                logger.warning(
-                    "Memgraph: Mage plugin error with wildcard query, returning empty result"
-                )
 
-        return result
-
-    async def _robust_fallback(
-        self, node_label: str, max_depth: int, max_nodes: int
-    ) -> KnowledgeGraph:
-        """
-        Fallback implementation when MAGE plugin is not available or incompatible.
-        This method implements the same functionality as get_knowledge_graph but uses
-        only basic Cypher queries and true breadth-first traversal instead of MAGE procedures.
-        """
-        from collections import deque
-
-        result = KnowledgeGraph()
-        visited_nodes = set()
-        visited_edges = set()
-        visited_edge_pairs = set()
-
-        # Get the starting node's data
-        workspace_label = self._get_workspace_label()
-        async with self._driver.session(
-            database=self._DATABASE, default_access_mode="READ"
-        ) as session:
-            query = f"""
-                MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
-                RETURN id(n) as node_id, n
-            """
-            node_result = await session.run(query, entity_id=node_label)
-            try:
-                node_record = await node_result.single()
-                if not node_record:
-                    return result
-
-                # Create initial KnowledgeGraphNode
-                start_node = KnowledgeGraphNode(
-                    id=f"{node_record['n'].get('entity_id')}",
-                    labels=[node_record["n"].get("entity_id")],
-                    properties=dict(node_record["n"]._properties),
-                )
-            finally:
-                await node_result.consume()  # Ensure results are consumed
-
-        # Initialize queue for BFS with (node, depth) tuples
-        queue = deque([(start_node, 0)])
-
-        # Keep track of all nodes we've discovered (including those we might not add due to limits)
-        discovered_nodes = {}  # node_id -> KnowledgeGraphNode
-        discovered_nodes[start_node.id] = start_node
-
-        # True BFS implementation using a queue
-        while queue:
-            # Dequeue the next node to process
-            current_node, current_depth = queue.popleft()
-
-            # Skip if already processed or exceeds max depth
-            if current_node.id in visited_nodes:
-                continue
-
-            if current_depth > max_depth:
-                logger.debug(
-                    f"Skipping node at depth {current_depth} (max_depth: {max_depth})"
-                )
-                continue
-
-            # Check if we've reached the node limit
-            if len(visited_nodes) >= max_nodes:
-                result.is_truncated = True
-                logger.info(
-                    f"Graph truncated: breadth-first search limited to: {max_nodes} nodes"
-                )
-                break
-
-            # Add current node to result
-            result.nodes.append(current_node)
-            visited_nodes.add(current_node.id)
-
-            # Only continue exploring if we haven't reached max depth
-            if current_depth < max_depth:
-                # Get all edges and target nodes for the current node
-                async with self._driver.session(
-                    database=self._DATABASE, default_access_mode="READ"
-                ) as session:
-                    workspace_label = self._get_workspace_label()
-                    query = f"""
-                    MATCH (a:`{workspace_label}` {{entity_id: $entity_id}})-[r]-(b:`{workspace_label}`)
-                    WHERE b.entity_id IS NOT NULL
-                    RETURN r, b, id(r) as edge_id
-                    """
-                    results = await session.run(query, entity_id=current_node.id)
-
-                    # Get all records and release database connection
-                    records = await results.fetch(
-                        1000
-                    )  # Max neighbor nodes we can handle
-                    await results.consume()  # Ensure results are consumed
-
-                    # Process all neighbors
-                    for record in records:
-                        rel = record["r"]
-                        edge_id = str(record["edge_id"])
-                        b_node = record["b"]
-                        target_id = b_node.get("entity_id")
-
-                        if target_id and edge_id not in visited_edges:
-                            # Create KnowledgeGraphNode for target if not already discovered
-                            if target_id not in discovered_nodes:
-                                target_node = KnowledgeGraphNode(
-                                    id=f"{target_id}",
-                                    labels=[target_id],
-                                    properties=dict(b_node._properties),
-                                )
-                                discovered_nodes[target_id] = target_node
-
-                            # Add to queue for further exploration
-                            queue.append((target_node, current_depth + 1))
-
-        # Second pass: Add edges only between nodes that are actually in the result
-        final_node_ids = {node.id for node in result.nodes}
-
-        # Now collect all edges between the nodes we actually included
-        async with self._driver.session(
-            database=self._DATABASE, default_access_mode="READ"
-        ) as session:
-            # Use a parameterized query to get all edges between our final nodes
-            query = f"""
-                UNWIND $node_ids AS node_id
-                MATCH (a:`{workspace_label}` {{entity_id: node_id}})-[r]-(b:`{workspace_label}`)
-                WHERE b.entity_id IN $node_ids
-                RETURN DISTINCT r, a.entity_id AS source_id, b.entity_id AS target_id, id(r) AS edge_id
-            """
-            results = await session.run(query, node_ids=list(final_node_ids))
-
-            edges_to_add = []
-            async for record in results:
-                rel = record["r"]
-                edge_id = str(record["edge_id"])
-                source_id = record["source_id"]
-                target_id = record["target_id"]
-
-                if edge_id not in visited_edges:
-                    # Create edge pair for deduplication (undirected)
-                    sorted_pair = tuple(sorted([source_id, target_id]))
-
-                    if sorted_pair not in visited_edge_pairs:
-                        edges_to_add.append(
-                            KnowledgeGraphEdge(
-                                id=f"{edge_id}",
-                                type=rel.type,
-                                source=f"{source_id}",
-                                target=f"{target_id}",
-                                properties=dict(rel),
-                            )
-                        )
-                        visited_edges.add(edge_id)
-                        visited_edge_pairs.add(sorted_pair)
-
-            await results.consume()
-
-        # Add all valid edges to the result
-        result.edges.extend(edges_to_add)
-
-        logger.info(
-            f"BFS subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
-        )
-
-        return result
+        return result
\ No newline at end of file

From 593ce552afb0a86086bc0edb92c2719d7a93f459 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 14:26:39 +0200
Subject: [PATCH 07/10] run pre-commit

---
 lightrag/kg/memgraph_impl.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index aff53429..4b640d2d 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -901,4 +901,4 @@ class MemgraphStorage(BaseGraphStorage):
 
         except Exception as e:
             logger.warning(f"Memgraph error during subgraph query: {str(e)}")
-        return result
\ No newline at end of file
+        return result

From 45815f1eaeb44b8f671ec39f858b003e4c7d0aa5 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 15:39:39 +0200
Subject: [PATCH 08/10] remove redundant UNWIND

---
 lightrag/kg/memgraph_impl.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 4b640d2d..3683a8e3 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -825,14 +825,14 @@ class MemgraphStorage(BaseGraphStorage):
                         END AS limited_nodes,
                         size(all_nodes_unlimited) > $max_nodes AS is_truncated
 
-                    UNWIND limited_nodes AS n1
-                    UNWIND limited_nodes AS n2
-                    MATCH (n1)-[r]-(n2)
-                    WITH DISTINCT r, limited_nodes, is_truncated
+                    UNWIND limited_nodes AS n
+                    MATCH (n)-[r]-(m)
+                    WHERE m IN limited_nodes
+                    WITH collect(DISTINCT n) AS limited_nodes, collect(DISTINCT r) AS relationships, is_truncated
 
                     RETURN
                         [node IN limited_nodes | {{node: node}}] AS node_info,
-                        collect(DISTINCT r) AS relationships,
+                        relationships,
                         is_truncated
                     """

From 9beb2456ece43fc396d9fa1941c88fa20dda4d99 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 16:25:17 +0200
Subject: [PATCH 09/10] update subgraph query comment

---
 lightrag/kg/memgraph_impl.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 3683a8e3..5838343d 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -810,7 +810,7 @@ class MemgraphStorage(BaseGraphStorage):
                         await result_set.consume()
 
                 else:
-                    # For specific node queries, use path.subgraph_all with the refined query pattern
+                    # Run subgraph query for specific node_label
                     subgraph_query = f"""
                     MATCH (start:`{workspace_label}`)
                     WHERE start.entity_id = $entity_id

From 2914b21b3404ffbe628805cd0ea57a6afc27448b Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 16:25:58 +0200
Subject: [PATCH 10/10] remove unused query parameter

---
 lightrag/kg/memgraph_impl.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 5838343d..3d2c131e 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -842,7 +842,6 @@ class MemgraphStorage(BaseGraphStorage):
                         subgraph_query,
                         {
                             "entity_id": node_label,
-                            "max_depth": max_depth,
                             "max_nodes": max_nodes,
                         },
                     )
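
Postscript (not part of the series): a minimal smoke test for the patched `get_knowledge_graph`, exercising both the wildcard path and the entity-rooted BFS path that the final query shape implements. This is a sketch under stated assumptions: it presumes a Memgraph instance reachable through the connection environment variables that `MemgraphStorage` reads, that the storage dataclass accepts the `namespace`/`global_config`/`embedding_func` fields shown, that `finalize()` is the counterpart of `initialize()`, and that an entity with `entity_id` "Alice" exists — check `lightrag/kg/memgraph_impl.py` for the authoritative constructor and method signatures.

```python
import asyncio

from lightrag.kg.memgraph_impl import MemgraphStorage


async def main() -> None:
    # Assumed constructor fields; verify against the BaseGraphStorage /
    # MemgraphStorage dataclass definitions before relying on this.
    storage = MemgraphStorage(
        namespace="chunk_entity_relation",
        global_config={"max_graph_nodes": 1000},  # cap used when max_nodes is None
        embedding_func=None,
    )
    await storage.initialize()
    try:
        # Wildcard path: highest-degree nodes first, truncated at max_nodes,
        # with is_truncated set when the workspace holds more nodes than the cap.
        graph = await storage.get_knowledge_graph("*", max_depth=3, max_nodes=50)
        print(len(graph.nodes), len(graph.edges), graph.is_truncated)

        # Entity-rooted path: breadth-first search out to max_depth hops from
        # the given entity_id ("Alice" is a placeholder for a real entity).
        graph = await storage.get_knowledge_graph("Alice", max_depth=2)
        print([node.id for node in graph.nodes])
    finally:
        await storage.finalize()


asyncio.run(main())
```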