From a0c4d88b0d2dcb372a7cbf1e006bec30d39d9f47 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Thu, 10 Jul 2025 16:56:44 +0200
Subject: [PATCH 01/11] wip fix Memgraph get_knowledge_graph issues

---
 lightrag/api/README.md       |  5 +++--
 lightrag/kg/memgraph_impl.py | 36 +++++++++++++++++++++---------------
 2 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/lightrag/api/README.md b/lightrag/api/README.md
index 915ad7f3..4517a318 100644
--- a/lightrag/api/README.md
+++ b/lightrag/api/README.md
@@ -179,9 +179,9 @@ The command-line `workspace` argument and the `WORKSPACE` environment variable i
 - **For local file-based databases, data isolation is achieved through workspace subdirectories:** `JsonKVStorage`, `JsonDocStatusStorage`, `NetworkXStorage`, `NanoVectorDBStorage`, `FaissVectorDBStorage`.
 - **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `QdrantVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`.
 - **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`.
-- **For the Neo4j graph database, logical data isolation is achieved through labels:** `Neo4JStorage`
+- **For graph databases, logical data isolation is achieved through labels:** `Neo4JStorage`, `MemgraphStorage`
 
-To maintain compatibility with legacy data, the default workspace for PostgreSQL is `default` and for Neo4j is `base` when no workspace is configured. For all external storages, the system provides dedicated workspace environment variables to override the common `WORKSPACE` environment variable configuration. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`.
+To maintain compatibility with legacy data, the default workspace for PostgreSQL is `default` and for Neo4j is `base` when no workspace is configured. For all external storages, the system provides dedicated workspace environment variables to override the common `WORKSPACE` environment variable configuration. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`, `MEMGRAPH_WORKSPACE`.
 
 ### Multiple workers for Gunicorn + Uvicorn
 
@@ -394,6 +394,7 @@ MongoKVStorage    MongoDB
 NetworkXStorage   NetworkX (default)
 Neo4JStorage      Neo4J
 PGGraphStorage    PostgreSQL with AGE plugin
+MemgraphStorage   Memgraph
 ```
 
 > Testing has shown that Neo4J delivers superior performance in production environments compared to PostgreSQL with AGE plugin.

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 8c6d6574..77e45a06 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -435,7 +435,7 @@ class MemgraphStorage(BaseGraphStorage):
 
     async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
         """
-        Upsert a node in the Neo4j database.
+        Upsert a node in the Memgraph database.
 
         Args:
             node_id: The unique identifier for the node (used as label)
@@ -448,7 +448,7 @@ class MemgraphStorage(BaseGraphStorage):
         properties = node_data
         entity_type = properties["entity_type"]
         if "entity_id" not in properties:
-            raise ValueError("Neo4j: node properties must contain an 'entity_id' field")
+            raise ValueError("Memgraph: node properties must contain an 'entity_id' field")
 
         try:
             async with self._driver.session(database=self._DATABASE) as session:
@@ -817,28 +817,34 @@ class MemgraphStorage(BaseGraphStorage):
                 WITH start
                 CALL {{
                     WITH start
-                    MATCH path = (start)-[*0..{max_depth}]-(node)
+                    MATCH path = (start)-[*BFS 0..{max_depth}]-(node)
                     WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
                     UNWIND path_nodes AS n
                     WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
                     WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
                     RETURN all_nodes, all_rels
                 }}
-                WITH all_nodes AS nodes, all_rels AS relationships, size(all_nodes) AS total_nodes
+                WITH all_nodes AS nodes, all_rels AS relationships, size(all_nodes) AS total_nodes_found
                 WITH
-                    CASE
-                        WHEN total_nodes <= {max_nodes} THEN nodes
-                        ELSE nodes[0..{max_nodes}]
-                    END AS limited_nodes,
-                    relationships,
-                    total_nodes,
-                    total_nodes > {max_nodes} AS is_truncated
+                    CASE
+                        WHEN total_nodes_found <= {max_nodes} THEN nodes
+                        ELSE nodes[0..{max_nodes}]
+                    END AS limited_nodes,
+                    relationships,
+                    total_nodes_found,
+                    total_nodes_found > {max_nodes} AS is_truncated
+
+                UNWIND relationships AS rel
+                WITH limited_nodes, rel, total_nodes_found, is_truncated
+                WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
+                WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships, total_nodes_found, is_truncated
                 RETURN
-                    [node IN limited_nodes | {{node: node}}] AS node_info,
-                    relationships,
-                    total_nodes,
-                    is_truncated
+                    [node IN limited_nodes | {{node: node}}] AS node_info,
+                    limited_relationships AS relationships,
+                    total_nodes_found,
+                    is_truncated
                 """
+                result_set = None
                 try:
                     result_set = await session.run(
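The substantive change in this first patch is swapping Cypher's generic variable-length expansion `[*0..n]` for Memgraph's built-in breadth-first expansion `[*BFS 0..n]`, and filtering edges so both endpoints stay inside the truncated node set. The query can be exercised directly with the standard `neo4j` Bolt driver, which Memgraph supports. A minimal sketch, assuming a local Memgraph on `bolt://localhost:7687` with no auth and the `base` default workspace label; the entity id is illustrative:

```python
import asyncio
from neo4j import AsyncGraphDatabase  # Memgraph speaks the Bolt protocol

URI = "bolt://localhost:7687"  # assumed local Memgraph instance

# Depth bounds of a variable-length pattern cannot be Bolt parameters,
# which is why the implementation interpolates {max_depth} into the f-string.
QUERY = """
MATCH (start:base)
WHERE start.entity_id = $entity_id
MATCH path = (start)-[*BFS 0..3]-(node)
RETURN count(DISTINCT node) AS reachable
"""

async def main() -> None:
    driver = AsyncGraphDatabase.driver(URI, auth=("", ""))
    try:
        async with driver.session() as session:
            result = await session.run(QUERY, {"entity_id": "some-entity"})
            record = await result.single()
            print(record["reachable"] if record else 0)
            await result.consume()  # release the connection, as the patch does
    finally:
        await driver.close()

asyncio.run(main())
```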
 
From 63d554bad8dc18bbe4ec361367099a565ff0afc0 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Thu, 10 Jul 2025 16:57:13 +0200
Subject: [PATCH 02/11] run pre-commit

---
 lightrag/kg/memgraph_impl.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 77e45a06..dc78ca19 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -448,7 +448,9 @@ class MemgraphStorage(BaseGraphStorage):
         properties = node_data
         entity_type = properties["entity_type"]
         if "entity_id" not in properties:
-            raise ValueError("Memgraph: node properties must contain an 'entity_id' field")
+            raise ValueError(
+                "Memgraph: node properties must contain an 'entity_id' field"
+            )
 
         try:
             async with self._driver.session(database=self._DATABASE) as session:
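Patch 03 below reworks `get_knowledge_graph` so the node cap comes from `global_config` instead of the `MAX_GRAPH_NODES` module constant. The clamping rule it implements is small enough to state on its own — a sketch with an illustrative function name, mirroring the patch's logic:

```python
def resolve_max_nodes(requested: int | None, global_config: dict) -> int:
    """Clamp a caller-supplied node cap to the configured ceiling."""
    ceiling = global_config.get("max_graph_nodes", 1000)
    if requested is None:
        return ceiling              # no explicit request: use the configured ceiling
    return min(requested, ceiling)  # an explicit request can never exceed it

assert resolve_max_nodes(None, {}) == 1000
assert resolve_max_nodes(5000, {"max_graph_nodes": 1000}) == 1000
assert resolve_max_nodes(200, {"max_graph_nodes": 1000}) == 200
```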
From 8d295bd2941b8350d0bd4b0b4ad2fef5dad1134b Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Fri, 11 Jul 2025 11:55:53 +0200
Subject: [PATCH 03/11] wip fix Memgraph get_knowledge_graph issues

---
 lightrag/kg/memgraph_impl.py | 189 ++++++++++++++++++++---------------
 1 file changed, 109 insertions(+), 80 deletions(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index dc78ca19..d0044499 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -734,7 +734,7 @@ class MemgraphStorage(BaseGraphStorage):
         self,
         node_label: str,
         max_depth: int = 3,
-        max_nodes: int = MAX_GRAPH_NODES,
+        max_nodes: int = None,
     ) -> KnowledgeGraph:
         """
         Retrieve a connected subgraph of nodes where the label includes the specified `node_label`.
@@ -747,137 +747,166 @@ class MemgraphStorage(BaseGraphStorage):
         Returns:
             KnowledgeGraph object containing nodes and edges, with an is_truncated flag
             indicating whether the graph was truncated due to max_nodes limit
-
-        Raises:
-            Exception: If there is an error executing the query
         """
-        if self._driver is None:
-            raise RuntimeError(
-                "Memgraph driver is not initialized. Call 'await initialize()' first."
-            )
+        # Get max_nodes from global_config if not provided
+        if max_nodes is None:
+            max_nodes = self.global_config.get("max_graph_nodes", 1000)
+        else:
+            # Limit max_nodes to not exceed global_config max_graph_nodes
+            max_nodes = min(max_nodes, self.global_config.get("max_graph_nodes", 1000))
+
+        workspace_label = self._get_workspace_label()
         result = KnowledgeGraph()
         seen_nodes = set()
         seen_edges = set()
-        workspace_label = self._get_workspace_label()
+
         async with self._driver.session(
             database=self._DATABASE, default_access_mode="READ"
         ) as session:
             try:
                 if node_label == "*":
-                    # First check if database has any nodes
-                    count_query = "MATCH (n) RETURN count(n) as total"
+                    # First check total node count to determine if graph is truncated
+                    count_query = (
+                        f"MATCH (n:`{workspace_label}`) RETURN count(n) as total"
+                    )
                     count_result = None
-                    total_count = 0
                     try:
                         count_result = await session.run(count_query)
                         count_record = await count_result.single()
-                        if count_record:
-                            total_count = count_record["total"]
-                        if total_count == 0:
-                            logger.debug("No nodes found in database")
-                            return result
-                        if total_count > max_nodes:
-                            result.is_truncated = True
-                            logger.info(
-                                f"Graph truncated: {total_count} nodes found, limited to {max_nodes}"
-                            )
+
+                        if count_record and count_record["total"] > max_nodes:
+                            result.is_truncated = True
+                            logger.info(
+                                f"Graph truncated: {count_record['total']} nodes found, limited to {max_nodes}"
+                            )
                     finally:
                         if count_result:
                             await count_result.consume()
 
-                    # Run the main query to get nodes with highest degree
+                    # Run main query to get nodes with highest degree
                     main_query = f"""
                     MATCH (n:`{workspace_label}`)
                     OPTIONAL MATCH (n)-[r]-()
                     WITH n, COALESCE(count(r), 0) AS degree
                     ORDER BY degree DESC
                     LIMIT $max_nodes
-                    WITH collect(n) AS kept_nodes
-                    MATCH (a)-[r]-(b)
+                    WITH collect({{node: n}}) AS filtered_nodes
+                    UNWIND filtered_nodes AS node_info
+                    WITH collect(node_info.node) AS kept_nodes, filtered_nodes
+                    OPTIONAL MATCH (a)-[r]-(b)
                     WHERE a IN kept_nodes AND b IN kept_nodes
-                    RETURN [node IN kept_nodes | {{node: node}}] AS node_info,
-                           collect(DISTINCT r) AS relationships
+                    RETURN filtered_nodes AS node_info,
+                           collect(DISTINCT r) AS relationships
                     """
                     result_set = None
                     try:
                         result_set = await session.run(
-                            main_query, {"max_nodes": max_nodes}
+                            main_query,
+                            {"max_nodes": max_nodes},
                        )
                         record = await result_set.single()
-                        if not record:
-                            logger.debug("No record returned from main query")
-                            return result
                     finally:
                         if result_set:
                             await result_set.consume()
 
                 else:
-                    bfs_query = f"""
+                    # return await self._robust_fallback(node_label, max_depth, max_nodes)
+                    # First try without limit to check if we need to truncate
+                    full_query = f"""
                     MATCH (start:`{workspace_label}`)
                     WHERE start.entity_id = $entity_id
                     WITH start
-                    CALL {{
-                        WITH start
-                        MATCH path = (start)-[*BFS 0..{max_depth}]-(node)
-                        WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
-                        UNWIND path_nodes AS n
-                        WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
-                        WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
-                        RETURN all_nodes, all_rels
-                    }}
-                    WITH all_nodes AS nodes, all_rels AS relationships, size(all_nodes) AS total_nodes_found
-                    WITH
-                        CASE
-                            WHEN total_nodes_found <= {max_nodes} THEN nodes
-                            ELSE nodes[0..{max_nodes}]
-                        END AS limited_nodes,
-                        relationships,
-                        total_nodes_found,
-                        total_nodes_found > {max_nodes} AS is_truncated
-
-                    UNWIND relationships AS rel
-                    WITH limited_nodes, rel, total_nodes_found, is_truncated
-                    WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
-                    WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships, total_nodes_found, is_truncated
+                    MATCH path = (start)-[*BFS ..{max_depth}]-(node:`{workspace_label}`)
+                    WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
+                    UNWIND path_nodes AS n
+                    WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
+                    WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
                     RETURN
-                        [node IN limited_nodes | {{node: node}}] AS node_info,
-                        limited_relationships AS relationships,
-                        total_nodes_found,
-                        is_truncated
+                        [node IN all_nodes | {{node: node}}] AS node_info,
+                        all_rels AS relationships,
+                        size(all_nodes) AS total_nodes
                     """
-                    result_set = None
+                    # Try to get full result
+                    full_result = None
                     try:
-                        result_set = await session.run(
-                            bfs_query,
+                        full_result = await session.run(
+                            full_query,
                             {
                                 "entity_id": node_label,
                             },
                         )
-                        record = await result_set.single()
-                        if not record:
+                        full_record = await full_result.single()
+
+                        # If no record found, return empty KnowledgeGraph
+                        if not full_record:
                             logger.debug(f"No nodes found for entity_id: {node_label}")
                             return result
 
-                        # Check if the query indicates truncation
-                        if "is_truncated" in record and record["is_truncated"]:
+                        # If record found, check node count
+                        total_nodes = full_record["total_nodes"]
+
+                        if total_nodes <= max_nodes:
+                            # If node count is within limit, use full result directly
+                            logger.debug(
+                                f"Using full result with {total_nodes} nodes (no truncation needed)"
+                            )
+                            record = full_record
+                        else:
+                            # If node count exceeds limit, set truncated flag and run limited query
                             result.is_truncated = True
                             logger.info(
-                                f"Graph truncated: breadth-first search limited to {max_nodes} nodes"
+                                f"Graph truncated: {total_nodes} nodes found, breadth-first search limited to {max_nodes}"
                             )
-                    finally:
-                        if result_set:
-                            await result_set.consume()
+                            # Run limited query
+                            limited_query = f"""
+                            MATCH (start:`{workspace_label}`)
+                            WHERE start.entity_id = $entity_id
+                            WITH start
+                            MATCH path = (start)-[*BFS ..{max_depth}]-(node:`{workspace_label}`)
+                            WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
+                            UNWIND path_nodes AS n
+                            WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
+                            WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
+                            WITH
+                                CASE
+                                    WHEN size(all_nodes) <= $max_nodes THEN all_nodes
+                                    ELSE all_nodes[0..$max_nodes]
+                                END AS limited_nodes,
+                                all_rels
+                            UNWIND all_rels AS rel
+                            WITH limited_nodes, rel
+                            WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
+                            WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships
+                            RETURN
+                                [node IN limited_nodes | {{node: node}}] AS node_info,
+                                limited_relationships AS relationships
+                            """
+
+                            result_set = None
+                            try:
+                                result_set = await session.run(
+                                    limited_query,
+                                    {
+                                        "entity_id": node_label,
+                                        "max_nodes": max_nodes,
+                                    },
+                                )
+                                record = await result_set.single()
+                            finally:
+                                if result_set:
+                                    await result_set.consume()
+                    finally:
+                        if full_result:
+                            await full_result.consume()
 
-                # Process the record if it exists
-                if record and record["node_info"]:
+                if record:
+                    # Handle nodes (compatible with multi-label cases)
                     for node_info in record["node_info"]:
                         node = node_info["node"]
                         node_id = node.id
                         if node_id not in seen_nodes:
-                            seen_nodes.add(node_id)
                             result.nodes.append(
                                 KnowledgeGraphNode(
                                     id=f"{node_id}",
                                     labels=[node.get("entity_id")],
                                     properties=dict(node),
                                 )
                             )
+                            seen_nodes.add(node_id)
 
+                    # Handle relationships (including direction information)
                    for rel in record["relationships"]:
                         edge_id = rel.id
                         if edge_id not in seen_edges:
-                            seen_edges.add(edge_id)
                             start = rel.start_node
                             end = rel.end_node
                             result.edges.append(
                                 KnowledgeGraphEdge(
                                     id=f"{edge_id}",
                                     type=rel.type,
                                     source=f"{start.id}",
                                     target=f"{end.id}",
                                     properties=dict(rel),
                                 )
                             )
+                            seen_edges.add(edge_id)
 
-                logger.info(
-                    f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
-                )
+                    logger.info(
+                        f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
+                    )
 
         except Exception as e:
-            logger.error(f"Error getting knowledge graph: {str(e)}")
-            # Return empty but properly initialized KnowledgeGraph on error
-            return KnowledgeGraph()
+            logger.error(f"Error during subgraph query for {node_label}: {str(e)}")
 
         return result
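After this patch, the non-wildcard path runs a full BFS first and only re-runs a capped query when the subgraph is too big. The truncation semantics — keep the first `max_nodes` nodes, then keep only edges whose two endpoints both survived — are easy to model outside Cypher. A small self-contained sketch of that invariant:

```python
def truncate_subgraph(nodes, edges, max_nodes):
    """Model of the query's CASE/UNWIND logic: cap the node list, then drop
    any edge whose endpoints are not both inside the kept node set."""
    kept = nodes[:max_nodes]
    kept_set = set(kept)
    kept_edges = [(u, v) for (u, v) in edges if u in kept_set and v in kept_set]
    is_truncated = len(nodes) > max_nodes
    return kept, kept_edges, is_truncated

nodes = ["a", "b", "c", "d"]
edges = [("a", "b"), ("b", "c"), ("c", "d")]
kept, kept_edges, truncated = truncate_subgraph(nodes, edges, max_nodes=3)
assert kept == ["a", "b", "c"]
assert kept_edges == [("a", "b"), ("b", "c")]  # ("c", "d") lost an endpoint
assert truncated
```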
From 15c4bac87f975e5192592e8959f1523daec52cc1 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Fri, 11 Jul 2025 17:02:42 +0200
Subject: [PATCH 04/11] wip fix Memgraph get_knowledge_graph issue by using mage function

---
 lightrag/kg/memgraph_impl.py | 284 ++++++++++++++++++++++---------
 1 file changed, 210 insertions(+), 74 deletions(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index d0044499..67c8a63f 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -742,7 +742,7 @@ class MemgraphStorage(BaseGraphStorage):
         Args:
             node_label: Label of the starting node, * means all nodes
             max_depth: Maximum depth of the subgraph, Defaults to 3
-            max_nodes: Maxiumu nodes to return by BFS, Defaults to 1000
+            max_nodes: Maximum nodes to return by BFS, Defaults to 1000
 
         Returns:
             KnowledgeGraph object containing nodes and edges, with an is_truncated flag
@@ -796,7 +796,7 @@ class MemgraphStorage(BaseGraphStorage):
                     OPTIONAL MATCH (a)-[r]-(b)
                     WHERE a IN kept_nodes AND b IN kept_nodes
                     RETURN filtered_nodes AS node_info,
-                           collect(DISTINCT r) AS relationships
+                        collect(DISTINCT r) AS relationships
                     """
                     result_set = None
                     try:
                 else:
-                    # return await self._robust_fallback(node_label, max_depth, max_nodes)
-                    # First try without limit to check if we need to truncate
-                    full_query = f"""
+                    # For specific node queries, use path.subgraph_all with the refined query pattern
+                    subgraph_query = f"""
                     MATCH (start:`{workspace_label}`)
                     WHERE start.entity_id = $entity_id
                     WITH start
-                    MATCH path = (start)-[*BFS ..{max_depth}]-(node:`{workspace_label}`)
-                    WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
-                    UNWIND path_nodes AS n
-                    WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
-                    WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
+                    CALL path.subgraph_all(start, {{
+                        relationshipFilter: [],
+                        labelFilter: ['{workspace_label}'],
+                        minHops: 0,
+                        maxHops: $max_depth
+                    }})
+                    YIELD nodes, rels
+                    WITH
+                        CASE
+                            WHEN size(nodes) <= $max_nodes THEN nodes
+                            ELSE nodes[0..$max_nodes]
+                        END AS limited_nodes,
+                        rels,
+                        size(nodes) > $max_nodes AS is_truncated
+                    UNWIND rels AS rel
+                    WITH limited_nodes, rel, is_truncated
+                    WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
+                    WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships, is_truncated
                     RETURN
-                        [node IN all_nodes | {{node: node}}] AS node_info,
-                        all_rels AS relationships,
-                        size(all_nodes) AS total_nodes
+                        [node IN limited_nodes | {{node: node}}] AS node_info,
+                        limited_relationships AS relationships,
+                        is_truncated
                     """
-                    # Try to get full result
-                    full_result = None
+                    result_set = None
                     try:
-                        full_result = await session.run(
-                            full_query,
+                        result_set = await session.run(
+                            subgraph_query,
                             {
                                 "entity_id": node_label,
+                                "max_depth": max_depth,
+                                "max_nodes": max_nodes,
                             },
                         )
-                        full_record = await full_result.single()
+                        record = await result_set.single()
 
                         # If no record found, return empty KnowledgeGraph
-                        if not full_record:
+                        if not record:
                             logger.debug(f"No nodes found for entity_id: {node_label}")
                             return result
 
-                        # If record found, check node count
-                        total_nodes = full_record["total_nodes"]
-
-                        if total_nodes <= max_nodes:
-                            # If node count is within limit, use full result directly
-                            logger.debug(
-                                f"Using full result with {total_nodes} nodes (no truncation needed)"
-                            )
-                            record = full_record
-                        else:
-                            # If node count exceeds limit, set truncated flag and run limited query
+                        # Check if the result was truncated
+                        if record.get("is_truncated"):
                             result.is_truncated = True
                             logger.info(
-                                f"Graph truncated: {total_nodes} nodes found, breadth-first search limited to {max_nodes}"
+                                f"Graph truncated: breadth-first search limited to {max_nodes} nodes"
                             )
-                            # Run limited query
-                            limited_query = f"""
-                            MATCH (start:`{workspace_label}`)
-                            WHERE start.entity_id = $entity_id
-                            WITH start
-                            MATCH path = (start)-[*BFS ..{max_depth}]-(node:`{workspace_label}`)
-                            WITH nodes(path) AS path_nodes, relationships(path) AS path_rels
-                            UNWIND path_nodes AS n
-                            WITH collect(DISTINCT n) AS all_nodes, collect(DISTINCT path_rels) AS all_rel_lists
-                            WITH all_nodes, reduce(r = [], x IN all_rel_lists | r + x) AS all_rels
-                            WITH
-                                CASE
-                                    WHEN size(all_nodes) <= $max_nodes THEN all_nodes
-                                    ELSE all_nodes[0..$max_nodes]
-                                END AS limited_nodes,
-                                all_rels
-                            UNWIND all_rels AS rel
-                            WITH limited_nodes, rel
-                            WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
-                            WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships
-                            RETURN
-                                [node IN limited_nodes | {{node: node}}] AS node_info,
-                                limited_relationships AS relationships
-                            """
-
-                            result_set = None
-                            try:
-                                result_set = await session.run(
-                                    limited_query,
-                                    {
-                                        "entity_id": node_label,
-                                        "max_nodes": max_nodes,
-                                    },
-                                )
-                                record = await result_set.single()
-                            finally:
-                                if result_set:
-                                    await result_set.consume()
                     finally:
-                        if full_result:
-                            await full_result.consume()
+                        if result_set:
+                            await result_set.consume()
 
                 if record:
-                    # Handle nodes (compatible with multi-label cases)
                     for node_info in record["node_info"]:
                         node = node_info["node"]
                         node_id = node.id
                         if node_id not in seen_nodes:
@@ -916,7 +881,6 @@ class MemgraphStorage(BaseGraphStorage):
                                 )
                             )
                             seen_nodes.add(node_id)
 
-                    # Handle relationships (including direction information)
                     for rel in record["relationships"]:
                         edge_id = rel.id
                         if edge_id not in seen_edges:
@@ -938,6 +902,178 @@ class MemgraphStorage(BaseGraphStorage):
                     )
 
         except Exception as e:
-            logger.error(f"Error during subgraph query for {node_label}: {str(e)}")
+            logger.warning(f"Memgraph error during subgraph query: {str(e)}")
+            if node_label != "*":
+                logger.warning(
+ "Memgraph: falling back to basic Cypher recursive search..." + ) + return await self._robust_fallback(node_label, max_depth, max_nodes) + else: + logger.warning( + "Memgraph: Mage plugin error with wildcard query, returning empty result" + ) return result + + async def _robust_fallback( + self, node_label: str, max_depth: int, max_nodes: int + ) -> KnowledgeGraph: + """ + Fallback implementation when MAGE plugin is not available or incompatible. + This method implements the same functionality as get_knowledge_graph but uses + only basic Cypher queries and true breadth-first traversal instead of MAGE procedures. + """ + from collections import deque + + result = KnowledgeGraph() + visited_nodes = set() + visited_edges = set() + visited_edge_pairs = set() + + # Get the starting node's data + workspace_label = self._get_workspace_label() + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + query = f""" + MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) + RETURN id(n) as node_id, n + """ + node_result = await session.run(query, entity_id=node_label) + try: + node_record = await node_result.single() + if not node_record: + return result + + # Create initial KnowledgeGraphNode + start_node = KnowledgeGraphNode( + id=f"{node_record['n'].get('entity_id')}", + labels=[node_record["n"].get("entity_id")], + properties=dict(node_record["n"]._properties), + ) + finally: + await node_result.consume() # Ensure results are consumed + + # Initialize queue for BFS with (node, depth) tuples + queue = deque([(start_node, 0)]) + + # Keep track of all nodes we've discovered (including those we might not add due to limits) + discovered_nodes = {} # node_id -> KnowledgeGraphNode + discovered_nodes[start_node.id] = start_node + + # True BFS implementation using a queue + while queue: + # Dequeue the next node to process + current_node, current_depth = queue.popleft() + + # Skip if already processed or exceeds max depth + if current_node.id in visited_nodes: + continue + + if current_depth > max_depth: + logger.debug( + f"Skipping node at depth {current_depth} (max_depth: {max_depth})" + ) + continue + + # Check if we've reached the node limit + if len(visited_nodes) >= max_nodes: + result.is_truncated = True + logger.info( + f"Graph truncated: breadth-first search limited to: {max_nodes} nodes" + ) + break + + # Add current node to result + result.nodes.append(current_node) + visited_nodes.add(current_node.id) + + # Only continue exploring if we haven't reached max depth + if current_depth < max_depth: + # Get all edges and target nodes for the current node + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + workspace_label = self._get_workspace_label() + query = f""" + MATCH (a:`{workspace_label}` {{entity_id: $entity_id}})-[r]-(b:`{workspace_label}`) + WHERE b.entity_id IS NOT NULL + RETURN r, b, id(r) as edge_id + """ + results = await session.run(query, entity_id=current_node.id) + + # Get all records and release database connection + records = await results.fetch( + 1000 + ) # Max neighbor nodes we can handle + await results.consume() # Ensure results are consumed + + # Process all neighbors + for record in records: + rel = record["r"] + edge_id = str(record["edge_id"]) + b_node = record["b"] + target_id = b_node.get("entity_id") + + if target_id and edge_id not in visited_edges: + # Create KnowledgeGraphNode for target if not already discovered + if target_id not in discovered_nodes: + 
From 81c93f695087693ec0bb5b261ac1a036276361bf Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 14:16:20 +0200
Subject: [PATCH 05/11] dont use mage procedure

---
 lightrag/kg/memgraph_impl.py | 39 +++++++++++++++++------------------
 1 file changed, 18 insertions(+), 21 deletions(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 67c8a63f..01465413 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -814,29 +814,26 @@ class MemgraphStorage(BaseGraphStorage):
                     subgraph_query = f"""
                     MATCH (start:`{workspace_label}`)
                     WHERE start.entity_id = $entity_id
-                    WITH start
-                    CALL path.subgraph_all(start, {{
-                        relationshipFilter: [],
-                        labelFilter: ['{workspace_label}'],
-                        minHops: 0,
-                        maxHops: $max_depth
-                    }})
-                    YIELD nodes, rels
+
+                    MATCH path = (start)-[*BFS 0..{max_depth}]-(end:`{workspace_label}`)
+                    WHERE ALL(n IN nodes(path) WHERE '{workspace_label}' IN labels(n))
+                    WITH collect(DISTINCT end) + start AS all_nodes_unlimited
                     WITH
-                        CASE
-                            WHEN size(nodes) <= $max_nodes THEN nodes
-                            ELSE nodes[0..$max_nodes]
-                        END AS limited_nodes,
-                        rels,
-                        size(nodes) > $max_nodes AS is_truncated
-                    UNWIND rels AS rel
-                    WITH limited_nodes, rel, is_truncated
-                    WHERE startNode(rel) IN limited_nodes AND endNode(rel) IN limited_nodes
-                    WITH limited_nodes, collect(DISTINCT rel) AS limited_relationships, is_truncated
+                        CASE
+                            WHEN size(all_nodes_unlimited) <= $max_nodes THEN all_nodes_unlimited
+                            ELSE all_nodes_unlimited[0..$max_nodes]
+                        END AS limited_nodes,
+                        size(all_nodes_unlimited) > $max_nodes AS is_truncated
+
+                    UNWIND limited_nodes AS n1
+                    UNWIND limited_nodes AS n2
+                    MATCH (n1)-[r]-(n2)
+                    WITH DISTINCT r, limited_nodes, is_truncated
+
                     RETURN
-                        [node IN limited_nodes | {{node: node}}] AS node_info,
-                        limited_relationships AS relationships,
-                        is_truncated
+                        [node IN limited_nodes | {{node: node}}] AS node_info,
+                        collect(DISTINCT r) AS relationships,
+                        is_truncated
                     """
 
                     result_set = None
                     try:
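With the MAGE procedure gone, everything again runs on Memgraph's built-in `[*BFS ...]` expansion. One detail worth noting in the rewritten query: `$max_nodes` stays a Bolt parameter, while the depth bound is interpolated into the query text — the bounds of a variable-length pattern cannot, as far as Cypher allows, be parameterized. If the depth ever comes from user input it should be validated before interpolation; a defensive sketch (function name is illustrative):

```python
def build_bfs_query(workspace_label: str, max_depth: int) -> str:
    """Interpolate only validated values into the query text."""
    if not isinstance(max_depth, int) or not (0 <= max_depth <= 10):
        raise ValueError(f"max_depth must be a small non-negative int, got {max_depth!r}")
    # Backticks quote the label; entity_id and max_nodes remain Bolt parameters.
    return f"""
    MATCH (start:`{workspace_label}`)
    WHERE start.entity_id = $entity_id
    MATCH path = (start)-[*BFS 0..{max_depth}]-(end:`{workspace_label}`)
    RETURN count(DISTINCT end) AS n
    """

print(build_bfs_query("base", 3))
```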
From f961f1aa7de1351c907ec9edd07f1abc783a03db Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 14:26:23 +0200
Subject: [PATCH 06/11] remove fallback query

---
 lightrag/kg/memgraph_impl.py | 174 +----------------------------------
 1 file changed, 1 insertion(+), 173 deletions(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 01465413..aff53429 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -900,177 +900,5 @@ class MemgraphStorage(BaseGraphStorage):
 
         except Exception as e:
             logger.warning(f"Memgraph error during subgraph query: {str(e)}")
-            if node_label != "*":
-                logger.warning(
-                    "Memgraph: falling back to basic Cypher recursive search..."
-                )
-                return await self._robust_fallback(node_label, max_depth, max_nodes)
-            else:
-                logger.warning(
-                    "Memgraph: Mage plugin error with wildcard query, returning empty result"
-                )
 
-        return result
-
-    async def _robust_fallback(
-        self, node_label: str, max_depth: int, max_nodes: int
-    ) -> KnowledgeGraph:
-        """
-        Fallback implementation when MAGE plugin is not available or incompatible.
-        This method implements the same functionality as get_knowledge_graph but uses
-        only basic Cypher queries and true breadth-first traversal instead of MAGE procedures.
-        """
-        from collections import deque
-
-        result = KnowledgeGraph()
-        visited_nodes = set()
-        visited_edges = set()
-        visited_edge_pairs = set()
-
-        # Get the starting node's data
-        workspace_label = self._get_workspace_label()
-        async with self._driver.session(
-            database=self._DATABASE, default_access_mode="READ"
-        ) as session:
-            query = f"""
-            MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
-            RETURN id(n) as node_id, n
-            """
-            node_result = await session.run(query, entity_id=node_label)
-            try:
-                node_record = await node_result.single()
-                if not node_record:
-                    return result
-
-                # Create initial KnowledgeGraphNode
-                start_node = KnowledgeGraphNode(
-                    id=f"{node_record['n'].get('entity_id')}",
-                    labels=[node_record["n"].get("entity_id")],
-                    properties=dict(node_record["n"]._properties),
-                )
-            finally:
-                await node_result.consume()  # Ensure results are consumed
-
-        # Initialize queue for BFS with (node, depth) tuples
-        queue = deque([(start_node, 0)])
-
-        # Keep track of all nodes we've discovered (including those we might not add due to limits)
-        discovered_nodes = {}  # node_id -> KnowledgeGraphNode
-        discovered_nodes[start_node.id] = start_node
-
-        # True BFS implementation using a queue
-        while queue:
-            # Dequeue the next node to process
-            current_node, current_depth = queue.popleft()
-
-            # Skip if already processed or exceeds max depth
-            if current_node.id in visited_nodes:
-                continue
-
-            if current_depth > max_depth:
-                logger.debug(
-                    f"Skipping node at depth {current_depth} (max_depth: {max_depth})"
-                )
-                continue
-
-            # Check if we've reached the node limit
-            if len(visited_nodes) >= max_nodes:
-                result.is_truncated = True
-                logger.info(
-                    f"Graph truncated: breadth-first search limited to: {max_nodes} nodes"
-                )
-                break
-
-            # Add current node to result
-            result.nodes.append(current_node)
-            visited_nodes.add(current_node.id)
-
-            # Only continue exploring if we haven't reached max depth
-            if current_depth < max_depth:
-                # Get all edges and target nodes for the current node
-                async with self._driver.session(
-                    database=self._DATABASE, default_access_mode="READ"
-                ) as session:
-                    workspace_label = self._get_workspace_label()
-                    query = f"""
-                    MATCH (a:`{workspace_label}` {{entity_id: $entity_id}})-[r]-(b:`{workspace_label}`)
-                    WHERE b.entity_id IS NOT NULL
-                    RETURN r, b, id(r) as edge_id
-                    """
-                    results = await session.run(query, entity_id=current_node.id)
-
-                    # Get all records and release database connection
-                    records = await results.fetch(
-                        1000
-                    )  # Max neighbor nodes we can handle
-                    await results.consume()  # Ensure results are consumed
-
-                    # Process all neighbors
-                    for record in records:
-                        rel = record["r"]
-                        edge_id = str(record["edge_id"])
-                        b_node = record["b"]
-                        target_id = b_node.get("entity_id")
-
-                        if target_id and edge_id not in visited_edges:
-                            # Create KnowledgeGraphNode for target if not already discovered
-                            if target_id not in discovered_nodes:
-                                target_node = KnowledgeGraphNode(
-                                    id=f"{target_id}",
-                                    labels=[target_id],
-                                    properties=dict(b_node._properties),
-                                )
-                                discovered_nodes[target_id] = target_node
-
-                                # Add to queue for further exploration
-                                queue.append((target_node, current_depth + 1))
-
-        # Second pass: Add edges only between nodes that are actually in the result
-        final_node_ids = {node.id for node in result.nodes}
-
-        # Now collect all edges between the nodes we actually included
-        async with self._driver.session(
-            database=self._DATABASE, default_access_mode="READ"
-        ) as session:
-            # Use a parameterized query to get all edges between our final nodes
-            query = f"""
-            UNWIND $node_ids AS node_id
-            MATCH (a:`{workspace_label}` {{entity_id: node_id}})-[r]-(b:`{workspace_label}`)
-            WHERE b.entity_id IN $node_ids
-            RETURN DISTINCT r, a.entity_id AS source_id, b.entity_id AS target_id, id(r) AS edge_id
-            """
-            results = await session.run(query, node_ids=list(final_node_ids))
-
-            edges_to_add = []
-            async for record in results:
-                rel = record["r"]
-                edge_id = str(record["edge_id"])
-                source_id = record["source_id"]
-                target_id = record["target_id"]
-
-                if edge_id not in visited_edges:
-                    # Create edge pair for deduplication (undirected)
-                    sorted_pair = tuple(sorted([source_id, target_id]))
-
-                    if sorted_pair not in visited_edge_pairs:
-                        edges_to_add.append(
-                            KnowledgeGraphEdge(
-                                id=f"{edge_id}",
-                                type=rel.type,
-                                source=f"{source_id}",
-                                target=f"{target_id}",
-                                properties=dict(rel),
-                            )
-                        )
-                        visited_edges.add(edge_id)
-                        visited_edge_pairs.add(sorted_pair)
-
-            await results.consume()
-
-        # Add all valid edges to the result
-        result.edges.extend(edges_to_add)
-
-        logger.info(
-            f"BFS subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
-        )
-
-        return result
+        return result
\ No newline at end of file
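With the fallback removed, the error path is simply log-and-return: callers always get a `KnowledgeGraph`, possibly empty, never an exception. A hypothetical pytest sketch of that contract — it assumes a reachable Memgraph instance, the `pytest-asyncio` plugin, and a `storage` fixture that yields an initialized `MemgraphStorage`:

```python
import pytest

@pytest.mark.asyncio
async def test_unknown_entity_returns_empty_graph(storage):  # `storage` is an assumed fixture
    graph = await storage.get_knowledge_graph("no-such-entity", max_depth=3)
    assert graph.nodes == [] and graph.edges == []
    assert graph.is_truncated is False
```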
From 593ce552afb0a86086bc0edb92c2719d7a93f459 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 14:26:39 +0200
Subject: [PATCH 07/11] run pre-commit

---
 lightrag/kg/memgraph_impl.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index aff53429..4b640d2d 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -901,4 +901,4 @@ class MemgraphStorage(BaseGraphStorage):
         except Exception as e:
             logger.warning(f"Memgraph error during subgraph query: {str(e)}")
 
-        return result
\ No newline at end of file
+        return result

From 45815f1eaeb44b8f671ec39f858b003e4c7d0aa5 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 15:39:39 +0200
Subject: [PATCH 08/11] remove redundant UNWIND

---
 lightrag/kg/memgraph_impl.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 4b640d2d..3683a8e3 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -825,14 +825,14 @@ class MemgraphStorage(BaseGraphStorage):
                         END AS limited_nodes,
                         size(all_nodes_unlimited) > $max_nodes AS is_truncated
 
-                    UNWIND limited_nodes AS n1
-                    UNWIND limited_nodes AS n2
-                    MATCH (n1)-[r]-(n2)
-                    WITH DISTINCT r, limited_nodes, is_truncated
+                    UNWIND limited_nodes AS n
+                    MATCH (n)-[r]-(m)
+                    WHERE m IN limited_nodes
+                    WITH collect(DISTINCT n) AS limited_nodes, collect(DISTINCT r) AS relationships, is_truncated
 
                     RETURN
                         [node IN limited_nodes | {{node: node}}] AS node_info,
-                        collect(DISTINCT r) AS relationships,
+                        relationships,
                         is_truncated
                     """
From 9beb2456ece43fc396d9fa1941c88fa20dda4d99 Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 16:25:17 +0200
Subject: [PATCH 09/11] update subgraph query comment

---
 lightrag/kg/memgraph_impl.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 3683a8e3..5838343d 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -810,7 +810,7 @@ class MemgraphStorage(BaseGraphStorage):
                         await result_set.consume()
 
                 else:
-                    # For specific node queries, use path.subgraph_all with the refined query pattern
+                    # Run subgraph query for specific node_label
                     subgraph_query = f"""
                     MATCH (start:`{workspace_label}`)
                     WHERE start.entity_id = $entity_id

From 2914b21b3404ffbe628805cd0ea57a6afc27448b Mon Sep 17 00:00:00 2001
From: DavIvek
Date: Mon, 14 Jul 2025 16:25:58 +0200
Subject: [PATCH 10/11] remove unused query parameter

---
 lightrag/kg/memgraph_impl.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py
index 5838343d..3d2c131e 100644
--- a/lightrag/kg/memgraph_impl.py
+++ b/lightrag/kg/memgraph_impl.py
@@ -842,7 +842,6 @@ class MemgraphStorage(BaseGraphStorage):
                         subgraph_query,
                         {
                             "entity_id": node_label,
-                            "max_depth": max_depth,
                             "max_nodes": max_nodes,
                         },
                     )
From ccc2a200712359869d165ebcae29bd4e6ceef3fe Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 15 Jul 2025 12:26:33 +0800
Subject: [PATCH 11/11] feat: remove deprecated MAX_TOKEN_SUMMARY parameter to prevent LLM output truncation

- Remove MAX_TOKEN_SUMMARY parameter and related configurations
- Eliminate forced token-based truncation in entity/relationship descriptions
- Switch to fragment-count based summarization logic using FORCE_LLM_SUMMARY_ON_MERGE
- Update FORCE_LLM_SUMMARY_ON_MERGE default from 6 to 4 for better summarization
- Clean up documentation, environment examples, and API display code
- Preserve backward compatibility by graceful parameter removal

This change resolves issues where LLMs were forcibly truncating entity
relationship descriptions mid-sentence, leading to incomplete and
potentially inaccurate knowledge graph content. The new approach allows
LLMs to generate complete descriptions while still providing
summarization when multiple fragments need to be merged.

Breaking Change: None - parameter removal is backward compatible
Fixes: Entity relationship description truncation issues
---
 README-zh.md              | 1 -
 README.md                 | 1 -
 env.example               | 2 --
 lightrag/api/utils_api.py | 5 -----
 lightrag/constants.py     | 3 +--
 lightrag/lightrag.py      | 5 -----
 lightrag/operate.py       | 1 -
 7 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/README-zh.md b/README-zh.md
index 8b377e0e..9f7f314e 100644
--- a/README-zh.md
+++ b/README-zh.md
@@ -242,7 +242,6 @@ if __name__ == "__main__":
 | **tokenizer** | `Tokenizer` | 用于将文本转换为 tokens(数字)以及使用遵循 TokenizerInterface 协议的 .encode() 和 .decode() 函数将 tokens 转换回文本的函数。 如果您不指定,它将使用默认的 Tiktoken tokenizer。 | `TiktokenTokenizer` |
 | **tiktoken_model_name** | `str` | 如果您使用的是默认的 Tiktoken tokenizer,那么这是要使用的特定 Tiktoken 模型的名称。如果您提供自己的 tokenizer,则忽略此设置。 | `gpt-4o-mini` |
 | **entity_extract_max_gleaning** | `int` | 实体提取过程中的循环次数,附加历史消息 | `1` |
-| **entity_summary_to_max_tokens** | `int` | 每个实体摘要的最大令牌大小 | `500` |
 | **node_embedding_algorithm** | `str` | 节点嵌入算法(当前未使用) | `node2vec` |
 | **node2vec_params** | `dict` | 节点嵌入的参数 | `{"dimensions": 1536,"num_walks": 10,"walk_length": 40,"window_size": 2,"iterations": 3,"random_seed": 3,}` |
 | **embedding_func** | `EmbeddingFunc` | 从文本生成嵌入向量的函数 | `openai_embed` |

diff --git a/README.md b/README.md
index 5d8a642f..fa2b5924 100644
--- a/README.md
+++ b/README.md
@@ -249,7 +249,6 @@ A full list of LightRAG init parameters:
 | **tokenizer** | `Tokenizer` | The function used to convert text into tokens (numbers) and back using .encode() and .decode() functions following `TokenizerInterface` protocol. If you don't specify one, it will use the default Tiktoken tokenizer. | `TiktokenTokenizer` |
 | **tiktoken_model_name** | `str` | If you're using the default Tiktoken tokenizer, this is the name of the specific Tiktoken model to use. This setting is ignored if you provide your own tokenizer. | `gpt-4o-mini` |
 | **entity_extract_max_gleaning** | `int` | Number of loops in the entity extraction process, appending history messages | `1` |
-| **entity_summary_to_max_tokens** | `int` | Maximum token size for each entity summary | `500` |
 | **node_embedding_algorithm** | `str` | Algorithm for node embedding (currently not used) | `node2vec` |
 | **node2vec_params** | `dict` | Parameters for node embedding | `{"dimensions": 1536,"num_walks": 10,"walk_length": 40,"window_size": 2,"iterations": 3,"random_seed": 3,}` |
 | **embedding_func** | `EmbeddingFunc` | Function to generate embedding vectors from text | `openai_embed` |

diff --git a/env.example b/env.example
index f8f6d614..828c6d24 100644
--- a/env.example
+++ b/env.example
@@ -72,8 +72,6 @@ OLLAMA_EMULATING_MODEL_TAG=latest
 SUMMARY_LANGUAGE=English
 ### Number of duplicated entities/edges to trigger LLM re-summary on merge ( at least 3 is recommented)
 # FORCE_LLM_SUMMARY_ON_MERGE=6
-### Max tokens for entity/relations description after merge
-# MAX_TOKEN_SUMMARY=500
 ### Maximum number of entity extraction attempts for ambiguous content
 # MAX_GLEANING=1

diff --git a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py
index a724069d..b7099bb3 100644
--- a/lightrag/api/utils_api.py
+++ b/lightrag/api/utils_api.py
@@ -10,7 +10,6 @@ from ascii_colors import ASCIIColors
 from lightrag.api import __api_version__ as api_version
 from lightrag import __version__ as core_version
 from lightrag.constants import (
-    DEFAULT_MAX_TOKEN_SUMMARY,
     DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
 )
 from fastapi import HTTPException, Security, Request, status
@@ -280,9 +279,5 @@ def display_splash_screen(args: argparse.Namespace) -> None:
     ASCIIColors.white("    ├─ Top-K: ", end="")
     ASCIIColors.yellow(f"{args.top_k}")
-    ASCIIColors.white("    ├─ Max Token Summary: ", end="")
-    ASCIIColors.yellow(
-        f"{get_env_value('MAX_TOKEN_SUMMARY', DEFAULT_MAX_TOKEN_SUMMARY, int)}"
-    )
     ASCIIColors.white("    └─ Force LLM Summary on Merge: ", end="")
     ASCIIColors.yellow(
         f"{get_env_value('FORCE_LLM_SUMMARY_ON_MERGE', DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE, int)}"
     )

diff --git a/lightrag/constants.py b/lightrag/constants.py
index 82451a36..c3fd6531 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -8,8 +8,7 @@ consistency and makes maintenance easier.
 
 # Default values for environment variables
 DEFAULT_MAX_GLEANING = 1
-DEFAULT_MAX_TOKEN_SUMMARY = 500
-DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 6
+DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 4
 DEFAULT_WOKERS = 2
 DEFAULT_TIMEOUT = 150
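The pattern the remaining fields follow — an environment variable overriding a constant default, coerced to the right type — is easy to read in isolation. A stand-in for `get_env_value` with the new default (the real helper lives in `lightrag.utils`; this is a sketch):

```python
import os

DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 4  # new default, down from 6

def get_env_value(name, default, cast=str):
    """Return the env var coerced with `cast`, or the default when unset."""
    raw = os.environ.get(name)
    return default if raw is None else cast(raw)

force_merge = get_env_value(
    "FORCE_LLM_SUMMARY_ON_MERGE", DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE, int
)
assert isinstance(force_merge, int)
```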
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index b6cca32a..6ee61e2d 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -23,7 +23,6 @@ from typing import (
 )
 from lightrag.constants import (
     DEFAULT_MAX_GLEANING,
-    DEFAULT_MAX_TOKEN_SUMMARY,
     DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
 )
 from lightrag.utils import get_env_value
@@ -134,10 +133,6 @@ class LightRAG:
     )
     """Maximum number of entity extraction attempts for ambiguous content."""
 
-    summary_to_max_tokens: int = field(
-        default=get_env_value("MAX_TOKEN_SUMMARY", DEFAULT_MAX_TOKEN_SUMMARY, int)
-    )
-
     force_llm_summary_on_merge: int = field(
         default=get_env_value(
             "FORCE_LLM_SUMMARY_ON_MERGE", DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE, int

diff --git a/lightrag/operate.py b/lightrag/operate.py
index 49de3c71..4bf579d1 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -118,7 +118,6 @@ async def _handle_entity_relation_summary(
 
     tokenizer: Tokenizer = global_config["tokenizer"]
     llm_max_tokens = global_config["llm_model_max_token_size"]
-    # summary_max_tokens = global_config["summary_to_max_tokens"]
 
     language = global_config["addon_params"].get(
         "language", PROMPTS["DEFAULT_LANGUAGE"]
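The behavior the series ends on — summarize on fragment count rather than truncate on token count — reduces, in outline, to a single branch: join merged description fragments verbatim until their number reaches `force_llm_summary_on_merge`, and only then ask the LLM to rewrite them as one complete description. A schematic sketch; the separator constant matches LightRAG's `GRAPH_FIELD_SEP`, but the function names and signature are illustrative, not the exact `operate.py` internals:

```python
GRAPH_FIELD_SEP = "<SEP>"  # LightRAG's fragment separator

async def merge_descriptions(fragments, force_llm_summary_on_merge, summarize_llm):
    """Join few fragments verbatim; summarize with the LLM once there are many.

    No token-based truncation is applied, so the LLM is free to produce a
    complete description instead of one cut off mid-sentence."""
    if len(fragments) < force_llm_summary_on_merge:
        return GRAPH_FIELD_SEP.join(fragments)
    return await summarize_llm("\n".join(fragments))
```

With the default threshold of 4, an entity seen in three chunks keeps its three descriptions concatenated; a fourth occurrence triggers a single LLM rewrite of the accumulated fragments.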