Fix code formatting with pre-commit hooks

This commit is contained in:
Naseem Ali 2025-08-26 13:03:42 +03:00
parent e00b2a1334
commit c5ef544f8b
4 changed files with 156 additions and 110 deletions

View file

@ -849,7 +849,7 @@ see test_neo4j.py for a working example.
```python
export FALKORDB_HOST="localhost"
export FALKORDB_PORT="6379"
export FALKORDB_PORT="6379"
export FALKORDB_PASSWORD="password" # optional
export FALKORDB_USERNAME="username" # optional
export FALKORDB_GRAPH_NAME="lightrag_graph" # optional, defaults to namespace

View file

@ -20,88 +20,88 @@ from lightrag.kg.shared_storage import initialize_pipeline_status
# Load environment variables
load_dotenv()
async def main():
"""Example usage of LightRAG with FalkorDB"""
# Set up environment for FalkorDB
os.environ.setdefault("FALKORDB_HOST", "localhost")
os.environ.setdefault("FALKORDB_PORT", "6379")
os.environ.setdefault("FALKORDB_GRAPH_NAME", "lightrag_example")
os.environ.setdefault("FALKORDB_WORKSPACE", "example_workspace")
# Initialize LightRAG with FalkorDB
rag = LightRAG(
working_dir="./falkordb_example",
llm_model_func=gpt_4o_mini_complete, # Updated function name
embedding_func=openai_embed, # Updated function name
graph_storage="FalkorDBStorage", # Specify FalkorDB backend
embedding_func=openai_embed, # Updated function name
graph_storage="FalkorDBStorage", # Specify FalkorDB backend
)
# Initialize storage connections
await rag.initialize_storages()
await initialize_pipeline_status()
# Example text to process
sample_text = """
FalkorDB is a high-performance graph database built on Redis.
FalkorDB is a high-performance graph database built on Redis.
It supports OpenCypher queries and provides excellent performance for graph operations.
LightRAG can now use FalkorDB as its graph storage backend, enabling scalable
knowledge graph operations with Redis-based persistence. This integration
allows developers to leverage both the speed of Redis and the power of
allows developers to leverage both the speed of Redis and the power of
graph databases for advanced AI applications.
"""
print("Inserting text into LightRAG with FalkorDB backend...")
await rag.ainsert(sample_text)
# Check what was created
storage = rag.chunk_entity_relation_graph
nodes = await storage.get_all_nodes()
edges = await storage.get_all_edges()
print(f"Knowledge graph created: {len(nodes)} nodes, {len(edges)} edges")
print("\nQuerying the knowledge graph...")
# Test different query modes
questions = [
"What is FalkorDB and how does it relate to LightRAG?",
"What are the benefits of using Redis with graph databases?",
"How does FalkorDB support OpenCypher queries?"
"How does FalkorDB support OpenCypher queries?",
]
for i, question in enumerate(questions, 1):
print(f"\n--- Question {i} ---")
print(f"Q: {question}")
try:
response = await rag.aquery(
question,
param=QueryParam(mode="hybrid", top_k=3)
question, param=QueryParam(mode="hybrid", top_k=3)
)
print(f"A: {response}")
except Exception as e:
print(f"Error querying: {e}")
# Show some graph statistics
print(f"\n--- Graph Statistics ---")
print("\n--- Graph Statistics ---")
try:
all_labels = await storage.get_all_labels()
print(f"Unique entities: {len(all_labels)}")
if nodes:
print(f"Sample entities:")
print("Sample entities:")
for i, node in enumerate(nodes[:3]):
entity_id = node.get('entity_id', 'Unknown')
entity_type = node.get('entity_type', 'Unknown')
entity_id = node.get("entity_id", "Unknown")
entity_type = node.get("entity_type", "Unknown")
print(f" {i+1}. {entity_id} ({entity_type})")
if edges:
print(f"Sample relationships:")
print("Sample relationships:")
for i, edge in enumerate(edges[:2]):
source = edge.get('source', 'Unknown')
target = edge.get('target', 'Unknown')
source = edge.get("source", "Unknown")
target = edge.get("target", "Unknown")
print(f" {i+1}. {source}{target}")
except Exception as e:
print(f"Error getting statistics: {e}")
@ -110,19 +110,21 @@ if __name__ == "__main__":
print("LightRAG with FalkorDB Example")
print("==============================")
print("Note: This requires FalkorDB running on localhost:6379")
print("You can start FalkorDB with: docker run -p 6379:6379 falkordb/falkordb:latest")
print(
"You can start FalkorDB with: docker run -p 6379:6379 falkordb/falkordb:latest"
)
print()
# Check OpenAI API key
if not os.getenv("OPENAI_API_KEY"):
print("❌ Please set your OpenAI API key in .env file!")
print(" Create a .env file with: OPENAI_API_KEY=your-actual-api-key")
exit(1)
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n👋 Example interrupted. Goodbye!")
except Exception as e:
print(f"\n💥 Unexpected error: {e}")
print("🔧 Make sure FalkorDB is running and your .env file is configured")
print("🔧 Make sure FalkorDB is running and your .env file is configured")

View file

@ -1,5 +1,4 @@
import os
import json
import xml.etree.ElementTree as ET
import falkordb
@ -130,12 +129,18 @@ def insert_nodes_and_edges_to_falkordb(data):
# Print some statistics
node_count_result = graph.query("MATCH (n:Entity) RETURN count(n) AS count")
edge_count_result = graph.query("MATCH ()-[r:DIRECTED]-() RETURN count(r) AS count")
edge_count_result = graph.query(
"MATCH ()-[r:DIRECTED]-() RETURN count(r) AS count"
)
node_count = node_count_result.result_set[0][0] if node_count_result.result_set else 0
edge_count = edge_count_result.result_set[0][0] if edge_count_result.result_set else 0
node_count = (
node_count_result.result_set[0][0] if node_count_result.result_set else 0
)
edge_count = (
edge_count_result.result_set[0][0] if edge_count_result.result_set else 0
)
print(f"Final statistics:")
print("Final statistics:")
print(f"- Nodes in database: {node_count}")
print(f"- Edges in database: {edge_count}")
@ -153,7 +158,9 @@ def query_graph_data():
print("\n=== Sample Graph Data ===")
# Get some sample nodes
query = "MATCH (n:Entity) RETURN n.entity_id, n.entity_type, n.description LIMIT 5"
query = (
"MATCH (n:Entity) RETURN n.entity_id, n.entity_type, n.description LIMIT 5"
)
result = graph.query(query)
print("\nSample Nodes:")
@ -163,8 +170,8 @@ def query_graph_data():
# Get some sample edges
query = """
MATCH (a:Entity)-[r:DIRECTED]-(b:Entity)
RETURN a.entity_id, b.entity_id, r.weight, r.description
MATCH (a:Entity)-[r:DIRECTED]-(b:Entity)
RETURN a.entity_id, b.entity_id, r.weight, r.description
LIMIT 5
"""
result = graph.query(query)
@ -172,7 +179,9 @@ def query_graph_data():
print("\nSample Edges:")
if result.result_set:
for record in result.result_set:
print(f"- {record[0]} -> {record[1]} (weight: {record[2]}): {record[3][:100]}...")
print(
f"- {record[0]} -> {record[1]} (weight: {record[2]}): {record[3][:100]}..."
)
# Get node degree statistics
query = """
@ -212,8 +221,12 @@ def main():
xml_file = os.path.join(WORKING_DIR, "graph_chunk_entity_relation.graphml")
if not os.path.exists(xml_file):
print(f"Error: File {xml_file} not found. Please ensure the GraphML file exists.")
print("This file is typically generated by LightRAG after processing documents.")
print(
f"Error: File {xml_file} not found. Please ensure the GraphML file exists."
)
print(
"This file is typically generated by LightRAG after processing documents."
)
return
print("FalkorDB Graph Visualization Example")
@ -263,4 +276,4 @@ def main():
if __name__ == "__main__":
main()
main()

View file

@ -66,13 +66,27 @@ class FalkorDBStorage(BaseGraphStorage):
return workspace if workspace else "base"
async def initialize(self):
HOST = os.environ.get("FALKORDB_HOST", config.get("falkordb", "host", fallback="localhost"))
PORT = int(os.environ.get("FALKORDB_PORT", config.get("falkordb", "port", fallback=6379)))
PASSWORD = os.environ.get("FALKORDB_PASSWORD", config.get("falkordb", "password", fallback=None))
USERNAME = os.environ.get("FALKORDB_USERNAME", config.get("falkordb", "username", fallback=None))
HOST = os.environ.get(
"FALKORDB_HOST", config.get("falkordb", "host", fallback="localhost")
)
PORT = int(
os.environ.get(
"FALKORDB_PORT", config.get("falkordb", "port", fallback=6379)
)
)
PASSWORD = os.environ.get(
"FALKORDB_PASSWORD", config.get("falkordb", "password", fallback=None)
)
USERNAME = os.environ.get(
"FALKORDB_USERNAME", config.get("falkordb", "username", fallback=None)
)
GRAPH_NAME = os.environ.get(
"FALKORDB_GRAPH_NAME",
config.get("falkordb", "graph_name", fallback=re.sub(r"[^a-zA-Z0-9-]", "-", self.namespace))
"FALKORDB_GRAPH_NAME",
config.get(
"falkordb",
"graph_name",
fallback=re.sub(r"[^a-zA-Z0-9-]", "-", self.namespace),
),
)
try:
@ -83,25 +97,29 @@ class FalkorDBStorage(BaseGraphStorage):
password=PASSWORD,
username=USERNAME,
)
# Select the graph (creates if doesn't exist)
self._graph = self._db.select_graph(GRAPH_NAME)
# Test connection with a simple query
await self._run_query("RETURN 1")
# Create index for workspace nodes on entity_id if it doesn't exist
workspace_label = self._get_workspace_label()
try:
index_query = f"CREATE INDEX FOR (n:`{workspace_label}`) ON (n.entity_id)"
index_query = (
f"CREATE INDEX FOR (n:`{workspace_label}`) ON (n.entity_id)"
)
await self._run_query(index_query)
logger.info(f"Created index for {workspace_label} nodes on entity_id in FalkorDB")
logger.info(
f"Created index for {workspace_label} nodes on entity_id in FalkorDB"
)
except Exception as e:
# Index may already exist, which is not an error
logger.debug(f"Index creation may have failed or already exists: {e}")
logger.info(f"Connected to FalkorDB at {HOST}:{PORT}, graph: {GRAPH_NAME}")
except Exception as e:
logger.error(f"Failed to connect to FalkorDB at {HOST}:{PORT}: {e}")
raise
@ -124,8 +142,7 @@ class FalkorDBStorage(BaseGraphStorage):
"""Run a query asynchronously using thread pool"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self._executor,
lambda: self._graph.query(query, params or {})
self._executor, lambda: self._graph.query(query, params or {})
)
async def index_done_callback(self) -> None:
@ -176,10 +193,13 @@ class FalkorDBStorage(BaseGraphStorage):
f"MATCH (a:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(b:`{workspace_label}` {{entity_id: $target_entity_id}}) "
"RETURN COUNT(r) > 0 AS edgeExists"
)
result = await self._run_query(query, {
"source_entity_id": source_node_id,
"target_entity_id": target_node_id,
})
result = await self._run_query(
query,
{
"source_entity_id": source_node_id,
"target_entity_id": target_node_id,
},
)
return result.result_set[0][0] if result.result_set else False
except Exception as e:
logger.error(
@ -205,7 +225,7 @@ class FalkorDBStorage(BaseGraphStorage):
try:
query = f"MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) RETURN n"
result = await self._run_query(query, {"entity_id": node_id})
if result.result_set and len(result.result_set) > 0:
node = result.result_set[0][0] # Get the first node
# Convert FalkorDB node to dictionary
@ -234,14 +254,14 @@ class FalkorDBStorage(BaseGraphStorage):
"""
result = await self._run_query(query, {"node_ids": node_ids})
nodes = {}
if result.result_set and len(result.result_set) > 0:
for record in result.result_set:
entity_id = record[0]
node = record[1]
node_dict = {key: value for key, value in node.properties.items()}
nodes[entity_id] = node_dict
return nodes
async def node_degree(self, node_id: str) -> int:
@ -267,7 +287,7 @@ class FalkorDBStorage(BaseGraphStorage):
RETURN COUNT(r) AS degree
"""
result = await self._run_query(query, {"entity_id": node_id})
if result.result_set and len(result.result_set) > 0:
degree = result.result_set[0][0]
return degree
@ -298,7 +318,7 @@ class FalkorDBStorage(BaseGraphStorage):
"""
result = await self._run_query(query, {"node_ids": node_ids})
degrees = {}
if result.result_set and len(result.result_set) > 0:
for record in result.result_set:
entity_id = record[0]
@ -380,14 +400,17 @@ class FalkorDBStorage(BaseGraphStorage):
MATCH (start:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(end:`{workspace_label}` {{entity_id: $target_entity_id}})
RETURN properties(r) as edge_properties
"""
result = await self._run_query(query, {
"source_entity_id": source_node_id,
"target_entity_id": target_node_id,
})
result = await self._run_query(
query,
{
"source_entity_id": source_node_id,
"target_entity_id": target_node_id,
},
)
if result.result_set and len(result.result_set) > 0:
edge_result = result.result_set[0][0] # Get properties dict
# Ensure required keys exist with defaults
required_keys = {
"weight": 1.0,
@ -404,7 +427,7 @@ class FalkorDBStorage(BaseGraphStorage):
)
return edge_result
# Return None when no edge found
return None
@ -426,7 +449,7 @@ class FalkorDBStorage(BaseGraphStorage):
Returns:
A dictionary mapping (src, tgt) tuples to their edge properties.
"""
workspace_label = self._get_workspace_label()
query = f"""
UNWIND $pairs AS pair
@ -435,14 +458,14 @@ class FalkorDBStorage(BaseGraphStorage):
"""
result = await self._run_query(query, {"pairs": pairs})
edges_dict = {}
if result.result_set and len(result.result_set) > 0:
for record in result.result_set:
if record and len(record) >= 3:
src = record[0]
tgt = record[1]
edge_props = record[2] if record[2] else {}
edge_result = {}
for key, default in {
"weight": 1.0,
@ -451,9 +474,9 @@ class FalkorDBStorage(BaseGraphStorage):
"keywords": None,
}.items():
edge_result[key] = edge_props.get(key, default)
edges_dict[(src, tgt)] = edge_result
# Add default properties for pairs not found
for pair_dict in pairs:
src = pair_dict["src"]
@ -465,7 +488,7 @@ class FalkorDBStorage(BaseGraphStorage):
"description": None,
"keywords": None,
}
return edges_dict
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
@ -574,9 +597,11 @@ class FalkorDBStorage(BaseGraphStorage):
WHERE n.source_id IS NOT NULL AND chunk_id IN split(n.source_id, $sep)
RETURN DISTINCT n
"""
result = await self._run_query(query, {"chunk_ids": chunk_ids, "sep": GRAPH_FIELD_SEP})
result = await self._run_query(
query, {"chunk_ids": chunk_ids, "sep": GRAPH_FIELD_SEP}
)
nodes = []
if result.result_set:
for record in result.result_set:
node = record[0]
@ -584,7 +609,7 @@ class FalkorDBStorage(BaseGraphStorage):
# Add node id (entity_id) to the dictionary for easier access
node_dict["id"] = node_dict.get("entity_id")
nodes.append(node_dict)
return nodes
async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
@ -595,16 +620,18 @@ class FalkorDBStorage(BaseGraphStorage):
WHERE r.source_id IS NOT NULL AND chunk_id IN split(r.source_id, $sep)
RETURN DISTINCT a.entity_id AS source, b.entity_id AS target, properties(r) AS properties
"""
result = await self._run_query(query, {"chunk_ids": chunk_ids, "sep": GRAPH_FIELD_SEP})
result = await self._run_query(
query, {"chunk_ids": chunk_ids, "sep": GRAPH_FIELD_SEP}
)
edges = []
if result.result_set:
for record in result.result_set:
edge_properties = record[2]
edge_properties["source"] = record[0]
edge_properties["target"] = record[1]
edges.append(edge_properties)
return edges
@retry(
@ -624,7 +651,9 @@ class FalkorDBStorage(BaseGraphStorage):
properties = node_data
entity_type = properties["entity_type"]
if "entity_id" not in properties:
raise ValueError("FalkorDB: node properties must contain an 'entity_id' field")
raise ValueError(
"FalkorDB: node properties must contain an 'entity_id' field"
)
try:
query = f"""
@ -632,7 +661,9 @@ class FalkorDBStorage(BaseGraphStorage):
SET n += $properties
SET n:`{entity_type}`
"""
await self._run_query(query, {"entity_id": node_id, "properties": properties})
await self._run_query(
query, {"entity_id": node_id, "properties": properties}
)
except Exception as e:
logger.error(f"Error during upsert: {str(e)}")
raise
@ -669,11 +700,14 @@ class FalkorDBStorage(BaseGraphStorage):
SET r += $properties
RETURN r, source, target
"""
await self._run_query(query, {
"source_entity_id": source_node_id,
"target_entity_id": target_node_id,
"properties": edge_properties,
})
await self._run_query(
query,
{
"source_entity_id": source_node_id,
"target_entity_id": target_node_id,
"properties": edge_properties,
},
)
except Exception as e:
logger.error(f"Error during edge upsert: {str(e)}")
raise
@ -768,7 +802,7 @@ class FalkorDBStorage(BaseGraphStorage):
# Get start and end node IDs
start_node_id = str(rel.src_node)
end_node_id = str(rel.dest_node)
result.edges.append(
KnowledgeGraphEdge(
id=edge_id,
@ -806,11 +840,11 @@ class FalkorDBStorage(BaseGraphStorage):
"""
result = await self._run_query(query)
labels = []
if result.result_set:
for record in result.result_set:
labels.append(record[0])
return labels
@retry(
@ -868,10 +902,9 @@ class FalkorDBStorage(BaseGraphStorage):
MATCH (source:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(target:`{workspace_label}` {{entity_id: $target_entity_id}})
DELETE r
"""
await self._run_query(query, {
"source_entity_id": source,
"target_entity_id": target
})
await self._run_query(
query, {"source_entity_id": source, "target_entity_id": target}
)
logger.debug(f"Deleted edge from '{source}' to '{target}'")
except Exception as e:
logger.error(f"Error during edge deletion: {str(e)}")
@ -890,7 +923,7 @@ class FalkorDBStorage(BaseGraphStorage):
"""
result = await self._run_query(query)
nodes = []
if result.result_set:
for record in result.result_set:
node = record[0]
@ -898,7 +931,7 @@ class FalkorDBStorage(BaseGraphStorage):
# Add node id (entity_id) to the dictionary for easier access
node_dict["id"] = node_dict.get("entity_id")
nodes.append(node_dict)
return nodes
async def get_all_edges(self) -> list[dict]:
@ -914,14 +947,14 @@ class FalkorDBStorage(BaseGraphStorage):
"""
result = await self._run_query(query)
edges = []
if result.result_set:
for record in result.result_set:
edge_properties = record[2]
edge_properties["source"] = record[0]
edge_properties["target"] = record[1]
edges.append(edge_properties)
return edges
async def drop(self) -> dict[str, str]:
@ -948,7 +981,5 @@ class FalkorDBStorage(BaseGraphStorage):
"message": f"workspace '{workspace_label}' data dropped",
}
except Exception as e:
logger.error(
f"Error dropping FalkorDB workspace '{workspace_label}': {e}"
)
return {"status": "error", "message": str(e)}
logger.error(f"Error dropping FalkorDB workspace '{workspace_label}': {e}")
return {"status": "error", "message": str(e)}