From c5ef544f8ba61b44124d14a3b85b2f22a877138b Mon Sep 17 00:00:00 2001 From: Naseem Ali <34807727+Naseem77@users.noreply.github.com> Date: Tue, 26 Aug 2025 13:03:42 +0300 Subject: [PATCH] Fix code formatting with pre-commit hooks --- README.md | 2 +- examples/falkordb_example.py | 66 +++++----- examples/graph_visual_with_falkordb.py | 37 ++++-- lightrag/kg/falkordb_impl.py | 161 +++++++++++++++---------- 4 files changed, 156 insertions(+), 110 deletions(-) diff --git a/README.md b/README.md index 1d7086c7..335fa6f1 100644 --- a/README.md +++ b/README.md @@ -849,7 +849,7 @@ see test_neo4j.py for a working example. ```python export FALKORDB_HOST="localhost" -export FALKORDB_PORT="6379" +export FALKORDB_PORT="6379" export FALKORDB_PASSWORD="password" # optional export FALKORDB_USERNAME="username" # optional export FALKORDB_GRAPH_NAME="lightrag_graph" # optional, defaults to namespace diff --git a/examples/falkordb_example.py b/examples/falkordb_example.py index 5ddddc74..8e3aeb6a 100644 --- a/examples/falkordb_example.py +++ b/examples/falkordb_example.py @@ -20,88 +20,88 @@ from lightrag.kg.shared_storage import initialize_pipeline_status # Load environment variables load_dotenv() + async def main(): """Example usage of LightRAG with FalkorDB""" - + # Set up environment for FalkorDB os.environ.setdefault("FALKORDB_HOST", "localhost") os.environ.setdefault("FALKORDB_PORT", "6379") os.environ.setdefault("FALKORDB_GRAPH_NAME", "lightrag_example") os.environ.setdefault("FALKORDB_WORKSPACE", "example_workspace") - + # Initialize LightRAG with FalkorDB rag = LightRAG( working_dir="./falkordb_example", llm_model_func=gpt_4o_mini_complete, # Updated function name - embedding_func=openai_embed, # Updated function name - graph_storage="FalkorDBStorage", # Specify FalkorDB backend + embedding_func=openai_embed, # Updated function name + graph_storage="FalkorDBStorage", # Specify FalkorDB backend ) - + # Initialize storage connections await rag.initialize_storages() await initialize_pipeline_status() - + # Example text to process sample_text = """ - FalkorDB is a high-performance graph database built on Redis. + FalkorDB is a high-performance graph database built on Redis. It supports OpenCypher queries and provides excellent performance for graph operations. LightRAG can now use FalkorDB as its graph storage backend, enabling scalable knowledge graph operations with Redis-based persistence. This integration - allows developers to leverage both the speed of Redis and the power of + allows developers to leverage both the speed of Redis and the power of graph databases for advanced AI applications. """ - + print("Inserting text into LightRAG with FalkorDB backend...") await rag.ainsert(sample_text) - + # Check what was created storage = rag.chunk_entity_relation_graph nodes = await storage.get_all_nodes() edges = await storage.get_all_edges() print(f"Knowledge graph created: {len(nodes)} nodes, {len(edges)} edges") - + print("\nQuerying the knowledge graph...") - + # Test different query modes questions = [ "What is FalkorDB and how does it relate to LightRAG?", "What are the benefits of using Redis with graph databases?", - "How does FalkorDB support OpenCypher queries?" + "How does FalkorDB support OpenCypher queries?", ] - + for i, question in enumerate(questions, 1): print(f"\n--- Question {i} ---") print(f"Q: {question}") - + try: response = await rag.aquery( - question, - param=QueryParam(mode="hybrid", top_k=3) + question, param=QueryParam(mode="hybrid", top_k=3) ) print(f"A: {response}") except Exception as e: print(f"Error querying: {e}") - + # Show some graph statistics - print(f"\n--- Graph Statistics ---") + print("\n--- Graph Statistics ---") try: all_labels = await storage.get_all_labels() print(f"Unique entities: {len(all_labels)}") - + if nodes: - print(f"Sample entities:") + print("Sample entities:") for i, node in enumerate(nodes[:3]): - entity_id = node.get('entity_id', 'Unknown') - entity_type = node.get('entity_type', 'Unknown') + entity_id = node.get("entity_id", "Unknown") + entity_type = node.get("entity_type", "Unknown") print(f" {i+1}. {entity_id} ({entity_type})") - + if edges: - print(f"Sample relationships:") + print("Sample relationships:") for i, edge in enumerate(edges[:2]): - source = edge.get('source', 'Unknown') - target = edge.get('target', 'Unknown') + source = edge.get("source", "Unknown") + target = edge.get("target", "Unknown") print(f" {i+1}. {source} → {target}") - + except Exception as e: print(f"Error getting statistics: {e}") @@ -110,19 +110,21 @@ if __name__ == "__main__": print("LightRAG with FalkorDB Example") print("==============================") print("Note: This requires FalkorDB running on localhost:6379") - print("You can start FalkorDB with: docker run -p 6379:6379 falkordb/falkordb:latest") + print( + "You can start FalkorDB with: docker run -p 6379:6379 falkordb/falkordb:latest" + ) print() - + # Check OpenAI API key if not os.getenv("OPENAI_API_KEY"): print("āŒ Please set your OpenAI API key in .env file!") print(" Create a .env file with: OPENAI_API_KEY=your-actual-api-key") exit(1) - + try: asyncio.run(main()) except KeyboardInterrupt: print("\nšŸ‘‹ Example interrupted. Goodbye!") except Exception as e: print(f"\nšŸ’„ Unexpected error: {e}") - print("šŸ”§ Make sure FalkorDB is running and your .env file is configured") \ No newline at end of file + print("šŸ”§ Make sure FalkorDB is running and your .env file is configured") diff --git a/examples/graph_visual_with_falkordb.py b/examples/graph_visual_with_falkordb.py index aaffa3d3..6bce2a64 100644 --- a/examples/graph_visual_with_falkordb.py +++ b/examples/graph_visual_with_falkordb.py @@ -1,5 +1,4 @@ import os -import json import xml.etree.ElementTree as ET import falkordb @@ -130,12 +129,18 @@ def insert_nodes_and_edges_to_falkordb(data): # Print some statistics node_count_result = graph.query("MATCH (n:Entity) RETURN count(n) AS count") - edge_count_result = graph.query("MATCH ()-[r:DIRECTED]-() RETURN count(r) AS count") + edge_count_result = graph.query( + "MATCH ()-[r:DIRECTED]-() RETURN count(r) AS count" + ) - node_count = node_count_result.result_set[0][0] if node_count_result.result_set else 0 - edge_count = edge_count_result.result_set[0][0] if edge_count_result.result_set else 0 + node_count = ( + node_count_result.result_set[0][0] if node_count_result.result_set else 0 + ) + edge_count = ( + edge_count_result.result_set[0][0] if edge_count_result.result_set else 0 + ) - print(f"Final statistics:") + print("Final statistics:") print(f"- Nodes in database: {node_count}") print(f"- Edges in database: {edge_count}") @@ -153,7 +158,9 @@ def query_graph_data(): print("\n=== Sample Graph Data ===") # Get some sample nodes - query = "MATCH (n:Entity) RETURN n.entity_id, n.entity_type, n.description LIMIT 5" + query = ( + "MATCH (n:Entity) RETURN n.entity_id, n.entity_type, n.description LIMIT 5" + ) result = graph.query(query) print("\nSample Nodes:") @@ -163,8 +170,8 @@ def query_graph_data(): # Get some sample edges query = """ - MATCH (a:Entity)-[r:DIRECTED]-(b:Entity) - RETURN a.entity_id, b.entity_id, r.weight, r.description + MATCH (a:Entity)-[r:DIRECTED]-(b:Entity) + RETURN a.entity_id, b.entity_id, r.weight, r.description LIMIT 5 """ result = graph.query(query) @@ -172,7 +179,9 @@ def query_graph_data(): print("\nSample Edges:") if result.result_set: for record in result.result_set: - print(f"- {record[0]} -> {record[1]} (weight: {record[2]}): {record[3][:100]}...") + print( + f"- {record[0]} -> {record[1]} (weight: {record[2]}): {record[3][:100]}..." + ) # Get node degree statistics query = """ @@ -212,8 +221,12 @@ def main(): xml_file = os.path.join(WORKING_DIR, "graph_chunk_entity_relation.graphml") if not os.path.exists(xml_file): - print(f"Error: File {xml_file} not found. Please ensure the GraphML file exists.") - print("This file is typically generated by LightRAG after processing documents.") + print( + f"Error: File {xml_file} not found. Please ensure the GraphML file exists." + ) + print( + "This file is typically generated by LightRAG after processing documents." + ) return print("FalkorDB Graph Visualization Example") @@ -263,4 +276,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/lightrag/kg/falkordb_impl.py b/lightrag/kg/falkordb_impl.py index e3949535..00166973 100644 --- a/lightrag/kg/falkordb_impl.py +++ b/lightrag/kg/falkordb_impl.py @@ -66,13 +66,27 @@ class FalkorDBStorage(BaseGraphStorage): return workspace if workspace else "base" async def initialize(self): - HOST = os.environ.get("FALKORDB_HOST", config.get("falkordb", "host", fallback="localhost")) - PORT = int(os.environ.get("FALKORDB_PORT", config.get("falkordb", "port", fallback=6379))) - PASSWORD = os.environ.get("FALKORDB_PASSWORD", config.get("falkordb", "password", fallback=None)) - USERNAME = os.environ.get("FALKORDB_USERNAME", config.get("falkordb", "username", fallback=None)) + HOST = os.environ.get( + "FALKORDB_HOST", config.get("falkordb", "host", fallback="localhost") + ) + PORT = int( + os.environ.get( + "FALKORDB_PORT", config.get("falkordb", "port", fallback=6379) + ) + ) + PASSWORD = os.environ.get( + "FALKORDB_PASSWORD", config.get("falkordb", "password", fallback=None) + ) + USERNAME = os.environ.get( + "FALKORDB_USERNAME", config.get("falkordb", "username", fallback=None) + ) GRAPH_NAME = os.environ.get( - "FALKORDB_GRAPH_NAME", - config.get("falkordb", "graph_name", fallback=re.sub(r"[^a-zA-Z0-9-]", "-", self.namespace)) + "FALKORDB_GRAPH_NAME", + config.get( + "falkordb", + "graph_name", + fallback=re.sub(r"[^a-zA-Z0-9-]", "-", self.namespace), + ), ) try: @@ -83,25 +97,29 @@ class FalkorDBStorage(BaseGraphStorage): password=PASSWORD, username=USERNAME, ) - + # Select the graph (creates if doesn't exist) self._graph = self._db.select_graph(GRAPH_NAME) - + # Test connection with a simple query await self._run_query("RETURN 1") - + # Create index for workspace nodes on entity_id if it doesn't exist workspace_label = self._get_workspace_label() try: - index_query = f"CREATE INDEX FOR (n:`{workspace_label}`) ON (n.entity_id)" + index_query = ( + f"CREATE INDEX FOR (n:`{workspace_label}`) ON (n.entity_id)" + ) await self._run_query(index_query) - logger.info(f"Created index for {workspace_label} nodes on entity_id in FalkorDB") + logger.info( + f"Created index for {workspace_label} nodes on entity_id in FalkorDB" + ) except Exception as e: # Index may already exist, which is not an error logger.debug(f"Index creation may have failed or already exists: {e}") - + logger.info(f"Connected to FalkorDB at {HOST}:{PORT}, graph: {GRAPH_NAME}") - + except Exception as e: logger.error(f"Failed to connect to FalkorDB at {HOST}:{PORT}: {e}") raise @@ -124,8 +142,7 @@ class FalkorDBStorage(BaseGraphStorage): """Run a query asynchronously using thread pool""" loop = asyncio.get_event_loop() return await loop.run_in_executor( - self._executor, - lambda: self._graph.query(query, params or {}) + self._executor, lambda: self._graph.query(query, params or {}) ) async def index_done_callback(self) -> None: @@ -176,10 +193,13 @@ class FalkorDBStorage(BaseGraphStorage): f"MATCH (a:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(b:`{workspace_label}` {{entity_id: $target_entity_id}}) " "RETURN COUNT(r) > 0 AS edgeExists" ) - result = await self._run_query(query, { - "source_entity_id": source_node_id, - "target_entity_id": target_node_id, - }) + result = await self._run_query( + query, + { + "source_entity_id": source_node_id, + "target_entity_id": target_node_id, + }, + ) return result.result_set[0][0] if result.result_set else False except Exception as e: logger.error( @@ -205,7 +225,7 @@ class FalkorDBStorage(BaseGraphStorage): try: query = f"MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) RETURN n" result = await self._run_query(query, {"entity_id": node_id}) - + if result.result_set and len(result.result_set) > 0: node = result.result_set[0][0] # Get the first node # Convert FalkorDB node to dictionary @@ -234,14 +254,14 @@ class FalkorDBStorage(BaseGraphStorage): """ result = await self._run_query(query, {"node_ids": node_ids}) nodes = {} - + if result.result_set and len(result.result_set) > 0: for record in result.result_set: entity_id = record[0] node = record[1] node_dict = {key: value for key, value in node.properties.items()} nodes[entity_id] = node_dict - + return nodes async def node_degree(self, node_id: str) -> int: @@ -267,7 +287,7 @@ class FalkorDBStorage(BaseGraphStorage): RETURN COUNT(r) AS degree """ result = await self._run_query(query, {"entity_id": node_id}) - + if result.result_set and len(result.result_set) > 0: degree = result.result_set[0][0] return degree @@ -298,7 +318,7 @@ class FalkorDBStorage(BaseGraphStorage): """ result = await self._run_query(query, {"node_ids": node_ids}) degrees = {} - + if result.result_set and len(result.result_set) > 0: for record in result.result_set: entity_id = record[0] @@ -380,14 +400,17 @@ class FalkorDBStorage(BaseGraphStorage): MATCH (start:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(end:`{workspace_label}` {{entity_id: $target_entity_id}}) RETURN properties(r) as edge_properties """ - result = await self._run_query(query, { - "source_entity_id": source_node_id, - "target_entity_id": target_node_id, - }) - + result = await self._run_query( + query, + { + "source_entity_id": source_node_id, + "target_entity_id": target_node_id, + }, + ) + if result.result_set and len(result.result_set) > 0: edge_result = result.result_set[0][0] # Get properties dict - + # Ensure required keys exist with defaults required_keys = { "weight": 1.0, @@ -404,7 +427,7 @@ class FalkorDBStorage(BaseGraphStorage): ) return edge_result - + # Return None when no edge found return None @@ -426,7 +449,7 @@ class FalkorDBStorage(BaseGraphStorage): Returns: A dictionary mapping (src, tgt) tuples to their edge properties. """ - + workspace_label = self._get_workspace_label() query = f""" UNWIND $pairs AS pair @@ -435,14 +458,14 @@ class FalkorDBStorage(BaseGraphStorage): """ result = await self._run_query(query, {"pairs": pairs}) edges_dict = {} - + if result.result_set and len(result.result_set) > 0: for record in result.result_set: if record and len(record) >= 3: src = record[0] tgt = record[1] edge_props = record[2] if record[2] else {} - + edge_result = {} for key, default in { "weight": 1.0, @@ -451,9 +474,9 @@ class FalkorDBStorage(BaseGraphStorage): "keywords": None, }.items(): edge_result[key] = edge_props.get(key, default) - + edges_dict[(src, tgt)] = edge_result - + # Add default properties for pairs not found for pair_dict in pairs: src = pair_dict["src"] @@ -465,7 +488,7 @@ class FalkorDBStorage(BaseGraphStorage): "description": None, "keywords": None, } - + return edges_dict async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None: @@ -574,9 +597,11 @@ class FalkorDBStorage(BaseGraphStorage): WHERE n.source_id IS NOT NULL AND chunk_id IN split(n.source_id, $sep) RETURN DISTINCT n """ - result = await self._run_query(query, {"chunk_ids": chunk_ids, "sep": GRAPH_FIELD_SEP}) + result = await self._run_query( + query, {"chunk_ids": chunk_ids, "sep": GRAPH_FIELD_SEP} + ) nodes = [] - + if result.result_set: for record in result.result_set: node = record[0] @@ -584,7 +609,7 @@ class FalkorDBStorage(BaseGraphStorage): # Add node id (entity_id) to the dictionary for easier access node_dict["id"] = node_dict.get("entity_id") nodes.append(node_dict) - + return nodes async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]: @@ -595,16 +620,18 @@ class FalkorDBStorage(BaseGraphStorage): WHERE r.source_id IS NOT NULL AND chunk_id IN split(r.source_id, $sep) RETURN DISTINCT a.entity_id AS source, b.entity_id AS target, properties(r) AS properties """ - result = await self._run_query(query, {"chunk_ids": chunk_ids, "sep": GRAPH_FIELD_SEP}) + result = await self._run_query( + query, {"chunk_ids": chunk_ids, "sep": GRAPH_FIELD_SEP} + ) edges = [] - + if result.result_set: for record in result.result_set: edge_properties = record[2] edge_properties["source"] = record[0] edge_properties["target"] = record[1] edges.append(edge_properties) - + return edges @retry( @@ -624,7 +651,9 @@ class FalkorDBStorage(BaseGraphStorage): properties = node_data entity_type = properties["entity_type"] if "entity_id" not in properties: - raise ValueError("FalkorDB: node properties must contain an 'entity_id' field") + raise ValueError( + "FalkorDB: node properties must contain an 'entity_id' field" + ) try: query = f""" @@ -632,7 +661,9 @@ class FalkorDBStorage(BaseGraphStorage): SET n += $properties SET n:`{entity_type}` """ - await self._run_query(query, {"entity_id": node_id, "properties": properties}) + await self._run_query( + query, {"entity_id": node_id, "properties": properties} + ) except Exception as e: logger.error(f"Error during upsert: {str(e)}") raise @@ -669,11 +700,14 @@ class FalkorDBStorage(BaseGraphStorage): SET r += $properties RETURN r, source, target """ - await self._run_query(query, { - "source_entity_id": source_node_id, - "target_entity_id": target_node_id, - "properties": edge_properties, - }) + await self._run_query( + query, + { + "source_entity_id": source_node_id, + "target_entity_id": target_node_id, + "properties": edge_properties, + }, + ) except Exception as e: logger.error(f"Error during edge upsert: {str(e)}") raise @@ -768,7 +802,7 @@ class FalkorDBStorage(BaseGraphStorage): # Get start and end node IDs start_node_id = str(rel.src_node) end_node_id = str(rel.dest_node) - + result.edges.append( KnowledgeGraphEdge( id=edge_id, @@ -806,11 +840,11 @@ class FalkorDBStorage(BaseGraphStorage): """ result = await self._run_query(query) labels = [] - + if result.result_set: for record in result.result_set: labels.append(record[0]) - + return labels @retry( @@ -868,10 +902,9 @@ class FalkorDBStorage(BaseGraphStorage): MATCH (source:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(target:`{workspace_label}` {{entity_id: $target_entity_id}}) DELETE r """ - await self._run_query(query, { - "source_entity_id": source, - "target_entity_id": target - }) + await self._run_query( + query, {"source_entity_id": source, "target_entity_id": target} + ) logger.debug(f"Deleted edge from '{source}' to '{target}'") except Exception as e: logger.error(f"Error during edge deletion: {str(e)}") @@ -890,7 +923,7 @@ class FalkorDBStorage(BaseGraphStorage): """ result = await self._run_query(query) nodes = [] - + if result.result_set: for record in result.result_set: node = record[0] @@ -898,7 +931,7 @@ class FalkorDBStorage(BaseGraphStorage): # Add node id (entity_id) to the dictionary for easier access node_dict["id"] = node_dict.get("entity_id") nodes.append(node_dict) - + return nodes async def get_all_edges(self) -> list[dict]: @@ -914,14 +947,14 @@ class FalkorDBStorage(BaseGraphStorage): """ result = await self._run_query(query) edges = [] - + if result.result_set: for record in result.result_set: edge_properties = record[2] edge_properties["source"] = record[0] edge_properties["target"] = record[1] edges.append(edge_properties) - + return edges async def drop(self) -> dict[str, str]: @@ -948,7 +981,5 @@ class FalkorDBStorage(BaseGraphStorage): "message": f"workspace '{workspace_label}' data dropped", } except Exception as e: - logger.error( - f"Error dropping FalkorDB workspace '{workspace_label}': {e}" - ) - return {"status": "error", "message": str(e)} \ No newline at end of file + logger.error(f"Error dropping FalkorDB workspace '{workspace_label}': {e}") + return {"status": "error", "message": str(e)}