LightRAG/lightrag/api/routers/graph_routes.py
2025-10-09 16:52:22 -04:00

415 lines
15 KiB
Python

"""
This module contains all graph-related routes for the LightRAG API.
"""
from typing import Optional, Dict, Any
import traceback
from fastapi import APIRouter, Depends, Query, HTTPException
from pydantic import BaseModel
from lightrag.utils import logger
from ..utils_api import get_combined_auth_dependency
router = APIRouter(tags=["graph"])
class EntityUpdateRequest(BaseModel):
entity_name: str
updated_data: Dict[str, Any]
allow_rename: bool = False
class RelationUpdateRequest(BaseModel):
source_id: str
target_id: str
updated_data: Dict[str, Any]
class EntityMergeRequest(BaseModel):
entities_to_change: list[str]
entity_to_change_into: str
class EntityCreateRequest(BaseModel):
entity_name: str
entity_data: Dict[str, Any]
class RelationCreateRequest(BaseModel):
source_entity: str
target_entity: str
relation_data: Dict[str, Any]
def create_graph_routes(rag, api_key: Optional[str] = None):
combined_auth = get_combined_auth_dependency(api_key)
@router.get("/graph/label/list", dependencies=[Depends(combined_auth)])
async def get_graph_labels():
"""
Get all graph labels
Returns:
List[str]: List of graph labels
"""
try:
return await rag.get_graph_labels()
except Exception as e:
logger.error(f"Error getting graph labels: {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(
status_code=500, detail=f"Error getting graph labels: {str(e)}"
)
@router.get("/graph/label/popular", dependencies=[Depends(combined_auth)])
async def get_popular_labels(
limit: int = Query(
300, description="Maximum number of popular labels to return", ge=1, le=1000
),
):
"""
Get popular labels by node degree (most connected entities)
Args:
limit (int): Maximum number of labels to return (default: 300, max: 1000)
Returns:
List[str]: List of popular labels sorted by degree (highest first)
"""
try:
return await rag.chunk_entity_relation_graph.get_popular_labels(limit)
except Exception as e:
logger.error(f"Error getting popular labels: {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(
status_code=500, detail=f"Error getting popular labels: {str(e)}"
)
@router.get("/graph/label/search", dependencies=[Depends(combined_auth)])
async def search_labels(
q: str = Query(..., description="Search query string"),
limit: int = Query(
50, description="Maximum number of search results to return", ge=1, le=100
),
):
"""
Search labels with fuzzy matching
Args:
q (str): Search query string
limit (int): Maximum number of results to return (default: 50, max: 100)
Returns:
List[str]: List of matching labels sorted by relevance
"""
try:
return await rag.chunk_entity_relation_graph.search_labels(q, limit)
except Exception as e:
logger.error(f"Error searching labels with query '{q}': {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(
status_code=500, detail=f"Error searching labels: {str(e)}"
)
@router.get("/graphs", dependencies=[Depends(combined_auth)])
async def get_knowledge_graph(
label: str = Query(..., description="Label to get knowledge graph for"),
max_depth: int = Query(3, description="Maximum depth of graph", ge=1),
max_nodes: int = Query(1000, description="Maximum nodes to return", ge=1),
):
"""
Retrieve a connected subgraph of nodes where the label includes the specified label.
When reducing the number of nodes, the prioritization criteria are as follows:
1. Hops(path) to the staring node take precedence
2. Followed by the degree of the nodes
Args:
label (str): Label of the starting node
max_depth (int, optional): Maximum depth of the subgraph,Defaults to 3
max_nodes: Maxiumu nodes to return
Returns:
Dict[str, List[str]]: Knowledge graph for label
"""
try:
# Log the label parameter to check for leading spaces
logger.debug(
f"get_knowledge_graph called with label: '{label}' (length: {len(label)}, repr: {repr(label)})"
)
return await rag.get_knowledge_graph(
node_label=label,
max_depth=max_depth,
max_nodes=max_nodes,
)
except Exception as e:
logger.error(f"Error getting knowledge graph for label '{label}': {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(
status_code=500, detail=f"Error getting knowledge graph: {str(e)}"
)
@router.get("/graph/entity/exists", dependencies=[Depends(combined_auth)])
async def check_entity_exists(
name: str = Query(..., description="Entity name to check"),
):
"""
Check if an entity with the given name exists in the knowledge graph
Args:
name (str): Name of the entity to check
Returns:
Dict[str, bool]: Dictionary with 'exists' key indicating if entity exists
"""
try:
exists = await rag.chunk_entity_relation_graph.has_node(name)
return {"exists": exists}
except Exception as e:
logger.error(f"Error checking entity existence for '{name}': {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(
status_code=500, detail=f"Error checking entity existence: {str(e)}"
)
@router.post("/graph/entity/edit", dependencies=[Depends(combined_auth)])
async def update_entity(request: EntityUpdateRequest):
"""
Update an entity's properties in the knowledge graph
Args:
request (EntityUpdateRequest): Request containing entity name, updated data, and rename flag
Returns:
Dict: Updated entity information
"""
try:
result = await rag.aedit_entity(
entity_name=request.entity_name,
updated_data=request.updated_data,
allow_rename=request.allow_rename,
)
return {
"status": "success",
"message": "Entity updated successfully",
"data": result,
}
except ValueError as ve:
logger.error(
f"Validation error updating entity '{request.entity_name}': {str(ve)}"
)
raise HTTPException(status_code=400, detail=str(ve))
except Exception as e:
logger.error(f"Error updating entity '{request.entity_name}': {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(
status_code=500, detail=f"Error updating entity: {str(e)}"
)
@router.post("/graph/relation/edit", dependencies=[Depends(combined_auth)])
async def update_relation(request: RelationUpdateRequest):
"""Update a relation's properties in the knowledge graph
Args:
request (RelationUpdateRequest): Request containing source ID, target ID and updated data
Returns:
Dict: Updated relation information
"""
try:
result = await rag.aedit_relation(
source_entity=request.source_id,
target_entity=request.target_id,
updated_data=request.updated_data,
)
return {
"status": "success",
"message": "Relation updated successfully",
"data": result,
}
except ValueError as ve:
logger.error(
f"Validation error updating relation between '{request.source_id}' and '{request.target_id}': {str(ve)}"
)
raise HTTPException(status_code=400, detail=str(ve))
except Exception as e:
logger.error(
f"Error updating relation between '{request.source_id}' and '{request.target_id}': {str(e)}"
)
logger.error(traceback.format_exc())
raise HTTPException(
status_code=500, detail=f"Error updating relation: {str(e)}"
)
@router.post("/graph/entity/create", dependencies=[Depends(combined_auth)])
async def create_entity(request: EntityCreateRequest):
"""
Create a new entity in the knowledge graph
Args:
request (EntityCreateRequest): Request containing:
- entity_name: Name of the entity
- entity_data: Dictionary of entity properties (e.g., description, entity_type)
Returns:
Dict: Created entity information
Example:
{
"entity_name": "Tesla",
"entity_data": {
"description": "Electric vehicle manufacturer",
"entity_type": "ORGANIZATION"
}
}
"""
try:
# Check if entity already exists
exists = await rag.chunk_entity_relation_graph.has_node(request.entity_name)
if exists:
raise ValueError(f"Entity '{request.entity_name}' already exists")
# Prepare entity data
entity_data = request.entity_data.copy()
entity_data["entity_id"] = request.entity_name
# Create the entity
await rag.chunk_entity_relation_graph.upsert_node(
request.entity_name, entity_data
)
return {
"status": "success",
"message": f"Entity '{request.entity_name}' created successfully",
"data": entity_data,
}
except ValueError as ve:
logger.error(
f"Validation error creating entity '{request.entity_name}': {str(ve)}"
)
raise HTTPException(status_code=400, detail=str(ve))
except Exception as e:
logger.error(f"Error creating entity '{request.entity_name}': {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(
status_code=500, detail=f"Error creating entity: {str(e)}"
)
@router.post("/graph/relation/create", dependencies=[Depends(combined_auth)])
async def create_relation(request: RelationCreateRequest):
"""
Create a new relationship between two entities in the knowledge graph
Args:
request (RelationCreateRequest): Request containing:
- source_entity: Source entity name
- target_entity: Target entity name
- relation_data: Dictionary of relation properties (e.g., description, keywords, weight)
Returns:
Dict: Created relation information
Example:
{
"source_entity": "Elon Musk",
"target_entity": "Tesla",
"relation_data": {
"description": "Elon Musk is the CEO of Tesla",
"keywords": "CEO, founder",
"weight": 1.0
}
}
"""
try:
# Check if both entities exist
source_exists = await rag.chunk_entity_relation_graph.has_node(
request.source_entity
)
target_exists = await rag.chunk_entity_relation_graph.has_node(
request.target_entity
)
if not source_exists:
raise ValueError(
f"Source entity '{request.source_entity}' does not exist"
)
if not target_exists:
raise ValueError(
f"Target entity '{request.target_entity}' does not exist"
)
# Create the relationship
await rag.chunk_entity_relation_graph.upsert_edge(
request.source_entity, request.target_entity, request.relation_data
)
return {
"status": "success",
"message": f"Relation created successfully between '{request.source_entity}' and '{request.target_entity}'",
"data": {
"source": request.source_entity,
"target": request.target_entity,
**request.relation_data,
},
}
except ValueError as ve:
logger.error(
f"Validation error creating relation between '{request.source_entity}' and '{request.target_entity}': {str(ve)}"
)
raise HTTPException(status_code=400, detail=str(ve))
except Exception as e:
logger.error(
f"Error creating relation between '{request.source_entity}' and '{request.target_entity}': {str(e)}"
)
logger.error(traceback.format_exc())
raise HTTPException(
status_code=500, detail=f"Error creating relation: {str(e)}"
)
@router.post("/graph/entities/merge", dependencies=[Depends(combined_auth)])
async def merge_entities(request: EntityMergeRequest):
"""
Merge multiple entities into a single entity, preserving all relationships.
This endpoint is useful for consolidating duplicate or misspelled entities.
All relationships from the source entities will be transferred to the target entity.
Args:
request (EntityMergeRequest): Request containing:
- entities_to_change: List of entity names to be removed
- entity_to_change_into: Name of the target entity to merge into
Returns:
Dict: Result of the merge operation with merged entity information
Example:
{
"entities_to_change": ["Elon Msk", "Ellon Musk"],
"entity_to_change_into": "Elon Musk"
}
"""
try:
result = await rag.amerge_entities(
source_entities=request.entities_to_change,
target_entity=request.entity_to_change_into,
)
return {
"status": "success",
"message": f"Successfully merged {len(request.entities_to_change)} entities into '{request.entity_to_change_into}'",
"data": result,
}
except ValueError as ve:
logger.error(
f"Validation error merging entities {request.entities_to_change} into '{request.entity_to_change_into}': {str(ve)}"
)
raise HTTPException(status_code=400, detail=str(ve))
except Exception as e:
logger.error(
f"Error merging entities {request.entities_to_change} into '{request.entity_to_change_into}': {str(e)}"
)
logger.error(traceback.format_exc())
raise HTTPException(
status_code=500, detail=f"Error merging entities: {str(e)}"
)
return router