From 9f44e89de75377ebf64b8a2d1cc06b84e446552a Mon Sep 17 00:00:00 2001 From: NeelM0906 Date: Wed, 8 Oct 2025 15:59:47 -0400 Subject: [PATCH 1/3] Add knowledge graph manipulation endpoints Added three new REST API endpoints for direct knowledge graph manipulation: - POST /graph/entity/create: Create new entities in the knowledge graph - POST /graph/relation/create: Create relationships between entities - POST /graph/entities/merge: Merge duplicate/misspelled entities while preserving relationships The merge endpoint is particularly useful for consolidating entities discovered after document processing, fixing spelling errors, and cleaning up the knowledge graph. All relationships from source entities are transferred to the target entity, with intelligent handling of duplicate relationships. Updated API documentation in lightrag/api/README.md with usage examples for all three endpoints. --- lightrag/api/README.md | 63 ++++++++++ lightrag/api/routers/graph_routes.py | 181 +++++++++++++++++++++++++++ 2 files changed, 244 insertions(+) diff --git a/lightrag/api/README.md b/lightrag/api/README.md index 60a1d2ab..6d8cecd4 100644 --- a/lightrag/api/README.md +++ b/lightrag/api/README.md @@ -543,6 +543,69 @@ You can test the API endpoints using the provided curl commands or through the S 4. Query the system using the query endpoints 5. Trigger document scan if new files are put into the inputs directory +### Graph Manipulation Endpoints + +LightRAG provides REST API endpoints for direct knowledge graph manipulation: + +#### Create Entity + +Create a new entity in the knowledge graph: + +```bash +curl -X POST "http://localhost:9621/graph/entity/create" \ + -H "Content-Type: application/json" \ + -d '{ + "entity_name": "Tesla", + "entity_data": { + "description": "Electric vehicle manufacturer", + "entity_type": "ORGANIZATION" + } + }' +``` + +#### Create Relationship + +Create a new relationship between two existing entities: + +```bash +curl -X POST "http://localhost:9621/graph/relation/create" \ + -H "Content-Type: application/json" \ + -d '{ + "source_entity": "Elon Musk", + "target_entity": "Tesla", + "relation_data": { + "description": "Elon Musk is the CEO of Tesla", + "keywords": "CEO, founder", + "weight": 1.0 + } + }' +``` + +#### Merge Entities + +Consolidate duplicate or misspelled entities while preserving all relationships: + +```bash +curl -X POST "http://localhost:9621/graph/entities/merge" \ + -H "Content-Type: application/json" \ + -d '{ + "entities_to_change": ["Elon Msk", "Ellon Musk"], + "entity_to_change_into": "Elon Musk" + }' +``` + +**What the merge operation does:** +- Deletes the specified source entities +- Transfers all relationships from source entities to the target entity +- Intelligently merges duplicate relationships +- Updates vector embeddings for accurate retrieval +- Preserves the entire graph structure + +This is particularly useful for: +- Fixing spelling errors in entity names +- Consolidating duplicate entities discovered after document processing +- Cleaning up the knowledge graph for better query performance + ## Asynchronous Document Indexing with Progress Tracking LightRAG implements asynchronous document indexing to enable frontend monitoring and querying of document processing progress. Upon uploading files or inserting text through designated endpoints, a unique Track ID is returned to facilitate real-time progress monitoring. diff --git a/lightrag/api/routers/graph_routes.py b/lightrag/api/routers/graph_routes.py index 0c1710fc..078fd452 100644 --- a/lightrag/api/routers/graph_routes.py +++ b/lightrag/api/routers/graph_routes.py @@ -25,6 +25,22 @@ class RelationUpdateRequest(BaseModel): updated_data: Dict[str, Any] +class EntityMergeRequest(BaseModel): + entities_to_change: list[str] + entity_to_change_into: str + + +class EntityCreateRequest(BaseModel): + entity_name: str + entity_data: Dict[str, Any] + + +class RelationCreateRequest(BaseModel): + source_entity: str + target_entity: str + relation_data: Dict[str, Any] + + def create_graph_routes(rag, api_key: Optional[str] = None): combined_auth = get_combined_auth_dependency(api_key) @@ -225,4 +241,169 @@ def create_graph_routes(rag, api_key: Optional[str] = None): status_code=500, detail=f"Error updating relation: {str(e)}" ) + @router.post("/graph/entity/create", dependencies=[Depends(combined_auth)]) + async def create_entity(request: EntityCreateRequest): + """ + Create a new entity in the knowledge graph + + Args: + request (EntityCreateRequest): Request containing: + - entity_name: Name of the entity + - entity_data: Dictionary of entity properties (e.g., description, entity_type) + + Returns: + Dict: Created entity information + + Example: + { + "entity_name": "Tesla", + "entity_data": { + "description": "Electric vehicle manufacturer", + "entity_type": "ORGANIZATION" + } + } + """ + try: + # Check if entity already exists + exists = await rag.chunk_entity_relation_graph.has_node(request.entity_name) + if exists: + raise ValueError(f"Entity '{request.entity_name}' already exists") + + # Prepare entity data + entity_data = request.entity_data.copy() + entity_data["entity_id"] = request.entity_name + + # Create the entity + await rag.chunk_entity_relation_graph.upsert_node( + request.entity_name, entity_data + ) + + return { + "status": "success", + "message": f"Entity '{request.entity_name}' created successfully", + "data": entity_data, + } + except ValueError as ve: + logger.error(f"Validation error creating entity '{request.entity_name}': {str(ve)}") + raise HTTPException(status_code=400, detail=str(ve)) + except Exception as e: + logger.error(f"Error creating entity '{request.entity_name}': {str(e)}") + logger.error(traceback.format_exc()) + raise HTTPException( + status_code=500, detail=f"Error creating entity: {str(e)}" + ) + + @router.post("/graph/relation/create", dependencies=[Depends(combined_auth)]) + async def create_relation(request: RelationCreateRequest): + """ + Create a new relationship between two entities in the knowledge graph + + Args: + request (RelationCreateRequest): Request containing: + - source_entity: Source entity name + - target_entity: Target entity name + - relation_data: Dictionary of relation properties (e.g., description, keywords, weight) + + Returns: + Dict: Created relation information + + Example: + { + "source_entity": "Elon Musk", + "target_entity": "Tesla", + "relation_data": { + "description": "Elon Musk is the CEO of Tesla", + "keywords": "CEO, founder", + "weight": 1.0 + } + } + """ + try: + # Check if both entities exist + source_exists = await rag.chunk_entity_relation_graph.has_node( + request.source_entity + ) + target_exists = await rag.chunk_entity_relation_graph.has_node( + request.target_entity + ) + + if not source_exists: + raise ValueError(f"Source entity '{request.source_entity}' does not exist") + if not target_exists: + raise ValueError(f"Target entity '{request.target_entity}' does not exist") + + # Create the relationship + await rag.chunk_entity_relation_graph.upsert_edge( + request.source_entity, request.target_entity, request.relation_data + ) + + return { + "status": "success", + "message": f"Relation created successfully between '{request.source_entity}' and '{request.target_entity}'", + "data": { + "source": request.source_entity, + "target": request.target_entity, + **request.relation_data, + }, + } + except ValueError as ve: + logger.error( + f"Validation error creating relation between '{request.source_entity}' and '{request.target_entity}': {str(ve)}" + ) + raise HTTPException(status_code=400, detail=str(ve)) + except Exception as e: + logger.error( + f"Error creating relation between '{request.source_entity}' and '{request.target_entity}': {str(e)}" + ) + logger.error(traceback.format_exc()) + raise HTTPException( + status_code=500, detail=f"Error creating relation: {str(e)}" + ) + + @router.post("/graph/entities/merge", dependencies=[Depends(combined_auth)]) + async def merge_entities(request: EntityMergeRequest): + """ + Merge multiple entities into a single entity, preserving all relationships. + + This endpoint is useful for consolidating duplicate or misspelled entities. + All relationships from the source entities will be transferred to the target entity. + + Args: + request (EntityMergeRequest): Request containing: + - entities_to_change: List of entity names to be removed + - entity_to_change_into: Name of the target entity to merge into + + Returns: + Dict: Result of the merge operation with merged entity information + + Example: + { + "entities_to_change": ["Elon Msk", "Ellon Musk"], + "entity_to_change_into": "Elon Musk" + } + """ + try: + result = await rag.amerge_entities( + source_entities=request.entities_to_change, + target_entity=request.entity_to_change_into, + ) + return { + "status": "success", + "message": f"Successfully merged {len(request.entities_to_change)} entities into '{request.entity_to_change_into}'", + "data": result, + } + except ValueError as ve: + logger.error( + f"Validation error merging entities {request.entities_to_change} into '{request.entity_to_change_into}': {str(ve)}" + ) + raise HTTPException(status_code=400, detail=str(ve)) + except Exception as e: + logger.error( + f"Error merging entities {request.entities_to_change} into '{request.entity_to_change_into}': {str(e)}" + ) + logger.error(traceback.format_exc()) + raise HTTPException( + status_code=500, detail=f"Error merging entities: {str(e)}" + ) + return router From f6d1fb98ac9ccb1b478f8dc7d44967bbc06dd5d5 Mon Sep 17 00:00:00 2001 From: NeelM0906 Date: Thu, 9 Oct 2025 16:52:22 -0400 Subject: [PATCH 2/3] Fix Linting errors --- lightrag/api/routers/graph_routes.py | 12 +++++++++--- lightrag/kg/postgres_impl.py | 4 ++-- lightrag/operate.py | 4 ++-- lightrag/utils.py | 3 +-- reproduce/batch_eval.py | 2 +- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/lightrag/api/routers/graph_routes.py b/lightrag/api/routers/graph_routes.py index 078fd452..46e225b6 100644 --- a/lightrag/api/routers/graph_routes.py +++ b/lightrag/api/routers/graph_routes.py @@ -284,7 +284,9 @@ def create_graph_routes(rag, api_key: Optional[str] = None): "data": entity_data, } except ValueError as ve: - logger.error(f"Validation error creating entity '{request.entity_name}': {str(ve)}") + logger.error( + f"Validation error creating entity '{request.entity_name}': {str(ve)}" + ) raise HTTPException(status_code=400, detail=str(ve)) except Exception as e: logger.error(f"Error creating entity '{request.entity_name}': {str(e)}") @@ -328,9 +330,13 @@ def create_graph_routes(rag, api_key: Optional[str] = None): ) if not source_exists: - raise ValueError(f"Source entity '{request.source_entity}' does not exist") + raise ValueError( + f"Source entity '{request.source_entity}' does not exist" + ) if not target_exists: - raise ValueError(f"Target entity '{request.target_entity}' does not exist") + raise ValueError( + f"Target entity '{request.target_entity}' does not exist" + ) # Create the relationship await rag.chunk_entity_relation_graph.upsert_edge( diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index ad271b15..96f4be94 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -828,8 +828,8 @@ class PostgreSQLDB: # Execute the migration alter_sql = f""" - ALTER TABLE {migration['table']} - ALTER COLUMN {migration['column']} TYPE {migration['new_type']} + ALTER TABLE {migration["table"]} + ALTER COLUMN {migration["column"]} TYPE {migration["new_type"]} """ await self.execute(alter_sql) diff --git a/lightrag/operate.py b/lightrag/operate.py index cd8d8a64..e11b039e 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -401,7 +401,7 @@ async def _handle_single_relationship_extraction( ): # treat "relationship" and "relation" interchangeable if len(record_attributes) > 1 and "relation" in record_attributes[0]: logger.warning( - f"{chunk_key}: LLM output format error; found {len(record_attributes)}/5 fields on REALTION `{record_attributes[1]}`~`{record_attributes[2] if len(record_attributes) >2 else 'N/A'}`" + f"{chunk_key}: LLM output format error; found {len(record_attributes)}/5 fields on REALTION `{record_attributes[1]}`~`{record_attributes[2] if len(record_attributes) > 2 else 'N/A'}`" ) logger.debug(record_attributes) return None @@ -2225,7 +2225,7 @@ async def extract_entities( await asyncio.wait(pending) # Add progress prefix to the exception message - progress_prefix = f"C[{processed_chunks+1}/{total_chunks}]" + progress_prefix = f"C[{processed_chunks + 1}/{total_chunks}]" # Re-raise the original exception with a prefix prefixed_exception = create_prefixed_exception(first_exception, progress_prefix) diff --git a/lightrag/utils.py b/lightrag/utils.py index 60542e43..83a3c394 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -1472,8 +1472,7 @@ async def aexport_data( else: raise ValueError( - f"Unsupported file format: {file_format}. " - f"Choose from: csv, excel, md, txt" + f"Unsupported file format: {file_format}. Choose from: csv, excel, md, txt" ) if file_format is not None: print(f"Data exported to: {output_path} with format: {file_format}") diff --git a/reproduce/batch_eval.py b/reproduce/batch_eval.py index 424b4f54..5a4cfc38 100644 --- a/reproduce/batch_eval.py +++ b/reproduce/batch_eval.py @@ -73,7 +73,7 @@ def batch_eval(query_file, result1_file, result2_file, output_file_path): """ request_data = { - "custom_id": f"request-{i+1}", + "custom_id": f"request-{i + 1}", "method": "POST", "url": "/v1/chat/completions", "body": { From b7c77396a0d49ab96df6afd31a4abf74ea36c11e Mon Sep 17 00:00:00 2001 From: NeelM0906 Date: Thu, 9 Oct 2025 17:02:17 -0400 Subject: [PATCH 3/3] Fix entity/relation creation endpoints to properly update vector stores - Changed create_entity to use rag.acreate_entity() instead of direct graph manipulation - Changed create_relation to use rag.acreate_relation() instead of direct graph manipulation - This ensures vector embeddings are created and entities/relations are searchable - Adds proper concurrency locks and metadata population --- lightrag/api/routers/graph_routes.py | 58 ++++++++++------------------ 1 file changed, 20 insertions(+), 38 deletions(-) diff --git a/lightrag/api/routers/graph_routes.py b/lightrag/api/routers/graph_routes.py index 46e225b6..bac3e104 100644 --- a/lightrag/api/routers/graph_routes.py +++ b/lightrag/api/routers/graph_routes.py @@ -264,24 +264,20 @@ def create_graph_routes(rag, api_key: Optional[str] = None): } """ try: - # Check if entity already exists - exists = await rag.chunk_entity_relation_graph.has_node(request.entity_name) - if exists: - raise ValueError(f"Entity '{request.entity_name}' already exists") - - # Prepare entity data - entity_data = request.entity_data.copy() - entity_data["entity_id"] = request.entity_name - - # Create the entity - await rag.chunk_entity_relation_graph.upsert_node( - request.entity_name, entity_data + # Use the proper acreate_entity method which handles: + # - Graph lock for concurrency + # - Vector embedding creation in entities_vdb + # - Metadata population and defaults + # - Index consistency via _edit_entity_done + result = await rag.acreate_entity( + entity_name=request.entity_name, + entity_data=request.entity_data, ) return { "status": "success", "message": f"Entity '{request.entity_name}' created successfully", - "data": entity_data, + "data": result, } except ValueError as ve: logger.error( @@ -321,36 +317,22 @@ def create_graph_routes(rag, api_key: Optional[str] = None): } """ try: - # Check if both entities exist - source_exists = await rag.chunk_entity_relation_graph.has_node( - request.source_entity - ) - target_exists = await rag.chunk_entity_relation_graph.has_node( - request.target_entity - ) - - if not source_exists: - raise ValueError( - f"Source entity '{request.source_entity}' does not exist" - ) - if not target_exists: - raise ValueError( - f"Target entity '{request.target_entity}' does not exist" - ) - - # Create the relationship - await rag.chunk_entity_relation_graph.upsert_edge( - request.source_entity, request.target_entity, request.relation_data + # Use the proper acreate_relation method which handles: + # - Graph lock for concurrency + # - Entity existence validation + # - Duplicate relation checks + # - Vector embedding creation in relationships_vdb + # - Index consistency via _edit_relation_done + result = await rag.acreate_relation( + source_entity=request.source_entity, + target_entity=request.target_entity, + relation_data=request.relation_data, ) return { "status": "success", "message": f"Relation created successfully between '{request.source_entity}' and '{request.target_entity}'", - "data": { - "source": request.source_entity, - "target": request.target_entity, - **request.relation_data, - }, + "data": result, } except ValueError as ve: logger.error(