This commit is contained in:
Raphaël MANSUY 2025-12-04 19:18:14 +08:00
parent 3ae2043e7b
commit 593b277945
2 changed files with 267 additions and 47 deletions

View file

@ -588,6 +588,69 @@ You can test the API endpoints using the provided curl commands or through the S
4. Query the system using the query endpoints 4. Query the system using the query endpoints
5. Trigger document scan if new files are put into the inputs directory 5. Trigger document scan if new files are put into the inputs directory
### Graph Manipulation Endpoints
LightRAG provides REST API endpoints for direct knowledge graph manipulation:
#### Create Entity
Create a new entity in the knowledge graph:
```bash
curl -X POST "http://localhost:9621/graph/entity/create" \
-H "Content-Type: application/json" \
-d '{
"entity_name": "Tesla",
"entity_data": {
"description": "Electric vehicle manufacturer",
"entity_type": "ORGANIZATION"
}
}'
```
#### Create Relationship
Create a new relationship between two existing entities:
```bash
curl -X POST "http://localhost:9621/graph/relation/create" \
-H "Content-Type: application/json" \
-d '{
"source_entity": "Elon Musk",
"target_entity": "Tesla",
"relation_data": {
"description": "Elon Musk is the CEO of Tesla",
"keywords": "CEO, founder",
"weight": 1.0
}
}'
```
#### Merge Entities
Consolidate duplicate or misspelled entities while preserving all relationships:
```bash
curl -X POST "http://localhost:9621/graph/entities/merge" \
-H "Content-Type: application/json" \
-d '{
"entities_to_change": ["Elon Msk", "Ellon Musk"],
"entity_to_change_into": "Elon Musk"
}'
```
**What the merge operation does:**
- Deletes the specified source entities
- Transfers all relationships from source entities to the target entity
- Intelligently merges duplicate relationships
- Updates vector embeddings for accurate retrieval
- Preserves the entire graph structure
This is particularly useful for:
- Fixing spelling errors in entity names
- Consolidating duplicate entities discovered after document processing
- Cleaning up the knowledge graph for better query performance
## Asynchronous Document Indexing with Progress Tracking ## Asynchronous Document Indexing with Progress Tracking
LightRAG implements asynchronous document indexing to enable frontend monitoring and querying of document processing progress. Upon uploading files or inserting text through designated endpoints, a unique Track ID is returned to facilitate real-time progress monitoring. LightRAG implements asynchronous document indexing to enable frontend monitoring and querying of document processing progress. Upon uploading files or inserting text through designated endpoints, a unique Track ID is returned to facilitate real-time progress monitoring.

View file

@ -7,12 +7,8 @@ import traceback
from fastapi import APIRouter, Depends, Query, HTTPException from fastapi import APIRouter, Depends, Query, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from lightrag import LightRAG
from lightrag.utils import logger from lightrag.utils import logger
from lightrag.models.tenant import TenantContext
from lightrag.tenant_rag_manager import TenantRAGManager
from ..utils_api import get_combined_auth_dependency from ..utils_api import get_combined_auth_dependency
from ..dependencies import get_tenant_context_optional
router = APIRouter(tags=["graph"]) router = APIRouter(tags=["graph"])
@ -29,29 +25,35 @@ class RelationUpdateRequest(BaseModel):
updated_data: Dict[str, Any] updated_data: Dict[str, Any]
def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optional[TenantRAGManager] = None): class EntityMergeRequest(BaseModel):
entities_to_change: list[str]
entity_to_change_into: str
class EntityCreateRequest(BaseModel):
    """Request body for POST /graph/entity/create."""

    # Name of the entity to create.
    entity_name: str
    # Free-form entity properties (e.g. description, entity_type).
    entity_data: dict[str, Any]
class RelationCreateRequest(BaseModel):
    """Request body for POST /graph/relation/create."""

    # Name of the entity the relation starts from.
    source_entity: str
    # Name of the entity the relation points to.
    target_entity: str
    # Free-form relation properties (e.g. description, keywords, weight).
    relation_data: dict[str, Any]
def create_graph_routes(rag, api_key: Optional[str] = None):
combined_auth = get_combined_auth_dependency(api_key) combined_auth = get_combined_auth_dependency(api_key)
async def get_tenant_rag(tenant_context: Optional[TenantContext] = Depends(get_tenant_context_optional)) -> LightRAG:
"""Dependency to get tenant-specific RAG instance for graph operations"""
if rag_manager and tenant_context and tenant_context.tenant_id and tenant_context.kb_id:
return await rag_manager.get_rag_instance(
tenant_context.tenant_id,
tenant_context.kb_id,
tenant_context.user_id # Pass user_id for security validation
)
return rag
@router.get("/graph/label/list", dependencies=[Depends(combined_auth)]) @router.get("/graph/label/list", dependencies=[Depends(combined_auth)])
async def get_graph_labels(tenant_rag: LightRAG = Depends(get_tenant_rag)): async def get_graph_labels():
""" """
Get all graph labels (tenant-scoped) Get all graph labels
Returns: Returns:
List[str]: List of graph labels for the selected tenant/KB List[str]: List of graph labels
""" """
try: try:
return await tenant_rag.get_graph_labels() return await rag.get_graph_labels()
except Exception as e: except Exception as e:
logger.error(f"Error getting graph labels: {str(e)}") logger.error(f"Error getting graph labels: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
@ -64,19 +66,18 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
limit: int = Query( limit: int = Query(
300, description="Maximum number of popular labels to return", ge=1, le=1000 300, description="Maximum number of popular labels to return", ge=1, le=1000
), ),
tenant_rag: LightRAG = Depends(get_tenant_rag),
): ):
""" """
Get popular labels by node degree (tenant-scoped) Get popular labels by node degree (most connected entities)
Args: Args:
limit (int): Maximum number of labels to return (default: 300, max: 1000) limit (int): Maximum number of labels to return (default: 300, max: 1000)
Returns: Returns:
List[str]: List of popular labels sorted by degree (highest first) for the selected tenant/KB List[str]: List of popular labels sorted by degree (highest first)
""" """
try: try:
return await tenant_rag.chunk_entity_relation_graph.get_popular_labels(limit) return await rag.chunk_entity_relation_graph.get_popular_labels(limit)
except Exception as e: except Exception as e:
logger.error(f"Error getting popular labels: {str(e)}") logger.error(f"Error getting popular labels: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
@ -90,20 +91,19 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
limit: int = Query( limit: int = Query(
50, description="Maximum number of search results to return", ge=1, le=100 50, description="Maximum number of search results to return", ge=1, le=100
), ),
tenant_rag: LightRAG = Depends(get_tenant_rag),
): ):
""" """
Search labels with fuzzy matching (tenant-scoped) Search labels with fuzzy matching
Args: Args:
q (str): Search query string q (str): Search query string
limit (int): Maximum number of results to return (default: 50, max: 100) limit (int): Maximum number of results to return (default: 50, max: 100)
Returns: Returns:
List[str]: List of matching labels sorted by relevance for the selected tenant/KB List[str]: List of matching labels sorted by relevance
""" """
try: try:
return await tenant_rag.chunk_entity_relation_graph.search_labels(q, limit) return await rag.chunk_entity_relation_graph.search_labels(q, limit)
except Exception as e: except Exception as e:
logger.error(f"Error searching labels with query '{q}': {str(e)}") logger.error(f"Error searching labels with query '{q}': {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
@ -116,10 +116,9 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
label: str = Query(..., description="Label to get knowledge graph for"), label: str = Query(..., description="Label to get knowledge graph for"),
max_depth: int = Query(3, description="Maximum depth of graph", ge=1), max_depth: int = Query(3, description="Maximum depth of graph", ge=1),
max_nodes: int = Query(1000, description="Maximum nodes to return", ge=1), max_nodes: int = Query(1000, description="Maximum nodes to return", ge=1),
tenant_rag: LightRAG = Depends(get_tenant_rag),
): ):
""" """
Retrieve a connected subgraph of nodes (tenant-scoped). Retrieve a connected subgraph of nodes where the label includes the specified label.
When reducing the number of nodes, the prioritization criteria are as follows: When reducing the number of nodes, the prioritization criteria are as follows:
1. Hops(path) to the starting node take precedence 1. Hops(path) to the starting node take precedence
2. Followed by the degree of the nodes 2. Followed by the degree of the nodes
@ -130,7 +129,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
max_nodes: Maximum nodes to return max_nodes: Maximum nodes to return
Returns: Returns:
Dict[str, List[str]]: Knowledge graph for label from the selected tenant/KB Dict[str, List[str]]: Knowledge graph for label
""" """
try: try:
# Log the label parameter to check for leading spaces # Log the label parameter to check for leading spaces
@ -138,7 +137,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
f"get_knowledge_graph called with label: '{label}' (length: {len(label)}, repr: {repr(label)})" f"get_knowledge_graph called with label: '{label}' (length: {len(label)}, repr: {repr(label)})"
) )
return await tenant_rag.get_knowledge_graph( return await rag.get_knowledge_graph(
node_label=label, node_label=label,
max_depth=max_depth, max_depth=max_depth,
max_nodes=max_nodes, max_nodes=max_nodes,
@ -153,19 +152,18 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
@router.get("/graph/entity/exists", dependencies=[Depends(combined_auth)]) @router.get("/graph/entity/exists", dependencies=[Depends(combined_auth)])
async def check_entity_exists( async def check_entity_exists(
name: str = Query(..., description="Entity name to check"), name: str = Query(..., description="Entity name to check"),
tenant_rag: LightRAG = Depends(get_tenant_rag),
): ):
""" """
Check if an entity with the given name exists in the knowledge graph (tenant-scoped) Check if an entity with the given name exists in the knowledge graph
Args: Args:
name (str): Name of the entity to check name (str): Name of the entity to check
Returns: Returns:
Dict[str, bool]: Dictionary with 'exists' key indicating if entity exists in the selected tenant/KB Dict[str, bool]: Dictionary with 'exists' key indicating if entity exists
""" """
try: try:
exists = await tenant_rag.chunk_entity_relation_graph.has_node(name) exists = await rag.chunk_entity_relation_graph.has_node(name)
return {"exists": exists} return {"exists": exists}
except Exception as e: except Exception as e:
logger.error(f"Error checking entity existence for '{name}': {str(e)}") logger.error(f"Error checking entity existence for '{name}': {str(e)}")
@ -175,12 +173,9 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
) )
@router.post("/graph/entity/edit", dependencies=[Depends(combined_auth)]) @router.post("/graph/entity/edit", dependencies=[Depends(combined_auth)])
async def update_entity( async def update_entity(request: EntityUpdateRequest):
request: EntityUpdateRequest,
tenant_rag: LightRAG = Depends(get_tenant_rag),
):
""" """
Update an entity's properties in the knowledge graph (tenant-scoped) Update an entity's properties in the knowledge graph
Args: Args:
request (EntityUpdateRequest): Request containing entity name, updated data, and rename flag request (EntityUpdateRequest): Request containing entity name, updated data, and rename flag
@ -189,7 +184,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
Dict: Updated entity information Dict: Updated entity information
""" """
try: try:
result = await tenant_rag.aedit_entity( result = await rag.aedit_entity(
entity_name=request.entity_name, entity_name=request.entity_name,
updated_data=request.updated_data, updated_data=request.updated_data,
allow_rename=request.allow_rename, allow_rename=request.allow_rename,
@ -212,11 +207,8 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
) )
@router.post("/graph/relation/edit", dependencies=[Depends(combined_auth)]) @router.post("/graph/relation/edit", dependencies=[Depends(combined_auth)])
async def update_relation( async def update_relation(request: RelationUpdateRequest):
request: RelationUpdateRequest, """Update a relation's properties in the knowledge graph
tenant_rag: LightRAG = Depends(get_tenant_rag),
):
"""Update a relation's properties in the knowledge graph (tenant-scoped)
Args: Args:
request (RelationUpdateRequest): Request containing source ID, target ID and updated data request (RelationUpdateRequest): Request containing source ID, target ID and updated data
@ -225,7 +217,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
Dict: Updated relation information Dict: Updated relation information
""" """
try: try:
result = await tenant_rag.aedit_relation( result = await rag.aedit_relation(
source_entity=request.source_id, source_entity=request.source_id,
target_entity=request.target_id, target_entity=request.target_id,
updated_data=request.updated_data, updated_data=request.updated_data,
@ -249,4 +241,169 @@ def create_graph_routes(rag, api_key: Optional[str] = None, rag_manager: Optiona
status_code=500, detail=f"Error updating relation: {str(e)}" status_code=500, detail=f"Error updating relation: {str(e)}"
) )
@router.post("/graph/entity/create", dependencies=[Depends(combined_auth)])
async def create_entity(request: EntityCreateRequest):
    """
    Create a new entity in the knowledge graph

    The entity is rejected when its name is blank or when a node with the
    same name is already present in the graph storage.

    Args:
        request (EntityCreateRequest): Request containing:
            - entity_name: Name of the entity
            - entity_data: Dictionary of entity properties (e.g., description, entity_type)

    Returns:
        Dict: ``status``, ``message`` and ``data``, where ``data`` is the
        stored property dictionary (the request's ``entity_data`` plus an
        ``entity_id`` key set to the entity name).

    Raises:
        HTTPException: 400 when the name is blank or the entity already
            exists; 500 for any other failure.

    Example:
        {
            "entity_name": "Tesla",
            "entity_data": {
                "description": "Electric vehicle manufacturer",
                "entity_type": "ORGANIZATION"
            }
        }
    """
    try:
        entity_name = request.entity_name
        graph = rag.chunk_entity_relation_graph

        # A blank name would otherwise be stored as an unnamed node.
        # The name itself is not stripped or normalised here.
        if not entity_name.strip():
            raise ValueError("Entity name must not be empty")

        # "Create" must not silently overwrite an existing node.
        if await graph.has_node(entity_name):
            raise ValueError(f"Entity '{entity_name}' already exists")

        # Build a new dict so the request payload is not mutated; entity_id
        # always mirrors the entity name, even if the caller supplied one.
        entity_data = {**request.entity_data, "entity_id": entity_name}

        # NOTE(review): only the graph storage is written here. Whether a
        # vector index also needs updating is not visible from this block -
        # confirm against the LightRAG entity-creation API.
        await graph.upsert_node(entity_name, entity_data)

        return {
            "status": "success",
            "message": f"Entity '{entity_name}' created successfully",
            "data": entity_data,
        }
    except ValueError as ve:
        logger.error(
            f"Validation error creating entity '{request.entity_name}': {str(ve)}"
        )
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        logger.error(f"Error creating entity '{request.entity_name}': {str(e)}")
        logger.error(traceback.format_exc())
        raise HTTPException(
            status_code=500, detail=f"Error creating entity: {str(e)}"
        )
@router.post("/graph/relation/create", dependencies=[Depends(combined_auth)])
async def create_relation(request: RelationCreateRequest):
    """
    Create a new relationship between two entities in the knowledge graph

    Both entities must already exist, and the relation must not exist yet;
    use the relation edit endpoint to change an existing relation.

    Args:
        request (RelationCreateRequest): Request containing:
            - source_entity: Source entity name
            - target_entity: Target entity name
            - relation_data: Dictionary of relation properties (e.g., description, keywords, weight)

    Returns:
        Dict: ``status``, ``message`` and ``data``, where ``data`` holds
        ``source``, ``target`` and the relation properties. ``source`` and
        ``target`` always reflect the request's entity names.

    Raises:
        HTTPException: 400 when either entity is missing or the relation
            already exists; 500 for any other failure.

    Example:
        {
            "source_entity": "Elon Musk",
            "target_entity": "Tesla",
            "relation_data": {
                "description": "Elon Musk is the CEO of Tesla",
                "keywords": "CEO, founder",
                "weight": 1.0
            }
        }
    """
    try:
        graph = rag.chunk_entity_relation_graph
        source = request.source_entity
        target = request.target_entity

        # Check if both entities exist
        source_exists = await graph.has_node(source)
        target_exists = await graph.has_node(target)
        if not source_exists:
            raise ValueError(f"Source entity '{source}' does not exist")
        if not target_exists:
            raise ValueError(f"Target entity '{target}' does not exist")

        # Mirror create_entity: a "create" call must not silently replace
        # the properties of a relation that is already stored.
        if await graph.has_edge(source, target):
            raise ValueError(
                f"Relation between '{source}' and '{target}' already exists"
            )

        # Create the relationship
        await graph.upsert_edge(source, target, request.relation_data)

        # Endpoint names come first and are never overridden by payload
        # keys that happen to be called "source" or "target".
        data = {"source": source, "target": target}
        data.update(
            (key, value)
            for key, value in request.relation_data.items()
            if key not in ("source", "target")
        )

        return {
            "status": "success",
            "message": f"Relation created successfully between '{source}' and '{target}'",
            "data": data,
        }
    except ValueError as ve:
        logger.error(
            f"Validation error creating relation between '{request.source_entity}' and '{request.target_entity}': {str(ve)}"
        )
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        logger.error(
            f"Error creating relation between '{request.source_entity}' and '{request.target_entity}': {str(e)}"
        )
        logger.error(traceback.format_exc())
        raise HTTPException(
            status_code=500, detail=f"Error creating relation: {str(e)}"
        )
@router.post("/graph/entities/merge", dependencies=[Depends(combined_auth)])
async def merge_entities(request: EntityMergeRequest):
    """
    Fold several entities into one target entity.

    Intended for consolidating duplicate or misspelled entities. The work
    is delegated to ``rag.amerge_entities``, which is expected to move the
    relationships of the source entities onto the target entity.

    Args:
        request (EntityMergeRequest): Request containing:
            - entities_to_change: List of entity names to be removed
            - entity_to_change_into: Name of the target entity to merge into

    Returns:
        Dict: ``status``, ``message`` and ``data``, where ``data`` is
        whatever ``rag.amerge_entities`` returned.

    Raises:
        HTTPException: 400 when the merge raises ``ValueError``; 500 for
            any other failure.

    Example:
        {
            "entities_to_change": ["Elon Msk", "Ellon Musk"],
            "entity_to_change_into": "Elon Musk"
        }
    """
    sources = request.entities_to_change
    target = request.entity_to_change_into

    try:
        merged = await rag.amerge_entities(
            source_entities=sources,
            target_entity=target,
        )
    except ValueError as err:
        # Validation problems reported by the merge map to a client error.
        logger.error(
            f"Validation error merging entities {sources} into '{target}': {str(err)}"
        )
        raise HTTPException(status_code=400, detail=str(err))
    except Exception as err:
        logger.error(f"Error merging entities {sources} into '{target}': {str(err)}")
        logger.error(traceback.format_exc())
        raise HTTPException(
            status_code=500, detail=f"Error merging entities: {str(err)}"
        )

    return {
        "status": "success",
        "message": f"Successfully merged {len(sources)} entities into '{target}'",
        "data": merged,
    }
return router return router