From f961f1aa7de1351c907ec9edd07f1abc783a03db Mon Sep 17 00:00:00 2001 From: DavIvek Date: Mon, 14 Jul 2025 14:26:23 +0200 Subject: [PATCH] remove fallback query --- lightrag/kg/memgraph_impl.py | 174 +---------------------------------- 1 file changed, 1 insertion(+), 173 deletions(-) diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py index 01465413..aff53429 100644 --- a/lightrag/kg/memgraph_impl.py +++ b/lightrag/kg/memgraph_impl.py @@ -900,177 +900,5 @@ class MemgraphStorage(BaseGraphStorage): except Exception as e: logger.warning(f"Memgraph error during subgraph query: {str(e)}") - if node_label != "*": - logger.warning( - "Memgraph: falling back to basic Cypher recursive search..." - ) - return await self._robust_fallback(node_label, max_depth, max_nodes) - else: - logger.warning( - "Memgraph: Mage plugin error with wildcard query, returning empty result" - ) - return result - - async def _robust_fallback( - self, node_label: str, max_depth: int, max_nodes: int - ) -> KnowledgeGraph: - """ - Fallback implementation when MAGE plugin is not available or incompatible. - This method implements the same functionality as get_knowledge_graph but uses - only basic Cypher queries and true breadth-first traversal instead of MAGE procedures. - """ - from collections import deque - - result = KnowledgeGraph() - visited_nodes = set() - visited_edges = set() - visited_edge_pairs = set() - - # Get the starting node's data - workspace_label = self._get_workspace_label() - async with self._driver.session( - database=self._DATABASE, default_access_mode="READ" - ) as session: - query = f""" - MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) - RETURN id(n) as node_id, n - """ - node_result = await session.run(query, entity_id=node_label) - try: - node_record = await node_result.single() - if not node_record: - return result - - # Create initial KnowledgeGraphNode - start_node = KnowledgeGraphNode( - id=f"{node_record['n'].get('entity_id')}", - labels=[node_record["n"].get("entity_id")], - properties=dict(node_record["n"]._properties), - ) - finally: - await node_result.consume() # Ensure results are consumed - - # Initialize queue for BFS with (node, depth) tuples - queue = deque([(start_node, 0)]) - - # Keep track of all nodes we've discovered (including those we might not add due to limits) - discovered_nodes = {} # node_id -> KnowledgeGraphNode - discovered_nodes[start_node.id] = start_node - - # True BFS implementation using a queue - while queue: - # Dequeue the next node to process - current_node, current_depth = queue.popleft() - - # Skip if already processed or exceeds max depth - if current_node.id in visited_nodes: - continue - - if current_depth > max_depth: - logger.debug( - f"Skipping node at depth {current_depth} (max_depth: {max_depth})" - ) - continue - - # Check if we've reached the node limit - if len(visited_nodes) >= max_nodes: - result.is_truncated = True - logger.info( - f"Graph truncated: breadth-first search limited to: {max_nodes} nodes" - ) - break - - # Add current node to result - result.nodes.append(current_node) - visited_nodes.add(current_node.id) - - # Only continue exploring if we haven't reached max depth - if current_depth < max_depth: - # Get all edges and target nodes for the current node - async with self._driver.session( - database=self._DATABASE, default_access_mode="READ" - ) as session: - workspace_label = self._get_workspace_label() - query = f""" - MATCH (a:`{workspace_label}` {{entity_id: $entity_id}})-[r]-(b:`{workspace_label}`) - WHERE b.entity_id IS NOT NULL - RETURN r, b, id(r) as edge_id - """ - results = await session.run(query, entity_id=current_node.id) - - # Get all records and release database connection - records = await results.fetch( - 1000 - ) # Max neighbor nodes we can handle - await results.consume() # Ensure results are consumed - - # Process all neighbors - for record in records: - rel = record["r"] - edge_id = str(record["edge_id"]) - b_node = record["b"] - target_id = b_node.get("entity_id") - - if target_id and edge_id not in visited_edges: - # Create KnowledgeGraphNode for target if not already discovered - if target_id not in discovered_nodes: - target_node = KnowledgeGraphNode( - id=f"{target_id}", - labels=[target_id], - properties=dict(b_node._properties), - ) - discovered_nodes[target_id] = target_node - - # Add to queue for further exploration - queue.append((target_node, current_depth + 1)) - - # Second pass: Add edges only between nodes that are actually in the result - final_node_ids = {node.id for node in result.nodes} - - # Now collect all edges between the nodes we actually included - async with self._driver.session( - database=self._DATABASE, default_access_mode="READ" - ) as session: - # Use a parameterized query to get all edges between our final nodes - query = f""" - UNWIND $node_ids AS node_id - MATCH (a:`{workspace_label}` {{entity_id: node_id}})-[r]-(b:`{workspace_label}`) - WHERE b.entity_id IN $node_ids - RETURN DISTINCT r, a.entity_id AS source_id, b.entity_id AS target_id, id(r) AS edge_id - """ - results = await session.run(query, node_ids=list(final_node_ids)) - - edges_to_add = [] - async for record in results: - rel = record["r"] - edge_id = str(record["edge_id"]) - source_id = record["source_id"] - target_id = record["target_id"] - - if edge_id not in visited_edges: - # Create edge pair for deduplication (undirected) - sorted_pair = tuple(sorted([source_id, target_id])) - - if sorted_pair not in visited_edge_pairs: - edges_to_add.append( - KnowledgeGraphEdge( - id=f"{edge_id}", - type=rel.type, - source=f"{source_id}", - target=f"{target_id}", - properties=dict(rel), - ) - ) - visited_edges.add(edge_id) - visited_edge_pairs.add(sorted_pair) - - await results.consume() - - # Add all valid edges to the result - result.edges.extend(edges_to_add) - - logger.info( - f"BFS subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}" - ) - return result + return result \ No newline at end of file