diff --git a/lightrag/operate.py b/lightrag/operate.py
index 91d1ee68..453e647d 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -451,6 +451,58 @@ async def _rebuild_single_entity(
     if not current_entity:
         return

+    # Helper function to update entity in both graph and vector storage
+    async def _update_entity_storage(
+        final_description: str,
+        entity_type: str,
+        file_paths: set[str]
+    ):
+        # Update entity in graph storage
+        updated_entity_data = {
+            **current_entity,
+            "description": final_description,
+            "entity_type": entity_type,
+            "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
+            "file_path": GRAPH_FIELD_SEP.join(file_paths) if file_paths else current_entity.get("file_path", "unknown_source"),
+        }
+        await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
+
+        # Update entity in vector database
+        entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
+
+        # Delete old vector record first
+        try:
+            await entities_vdb.delete([entity_vdb_id])
+        except Exception as e:
+            logger.debug(f"Could not delete old entity vector record {entity_vdb_id}: {e}")
+
+        # Insert new vector record
+        entity_content = f"{entity_name}\n{final_description}"
+        await entities_vdb.upsert(
+            {
+                entity_vdb_id: {
+                    "content": entity_content,
+                    "entity_name": entity_name,
+                    "source_id": updated_entity_data["source_id"],
+                    "description": final_description,
+                    "entity_type": entity_type,
+                    "file_path": updated_entity_data["file_path"],
+                }
+            }
+        )
+
+    # Helper function to generate final description with optional LLM summary
+    async def _generate_final_description(combined_description: str) -> str:
+        if len(combined_description) > global_config["summary_to_max_tokens"]:
+            return await _handle_entity_relation_summary(
+                entity_name,
+                combined_description,
+                global_config,
+                llm_response_cache=llm_response_cache,
+            )
+        else:
+            return combined_description
+
     # Collect all entity data from relevant chunks
     all_entity_data = []
     for chunk_id in chunk_ids:
@@ -458,10 +510,41 @@ async def _rebuild_single_entity(
             all_entity_data.extend(chunk_entities[chunk_id][entity_name])

     if not all_entity_data:
-        logger.warning(f"No cached entity data found for {entity_name}")
+        logger.warning(f"No cached entity data found for {entity_name}, trying to rebuild from relationships")
+
+        # Get all edges connected to this entity
+        edges = await knowledge_graph_inst.get_node_edges(entity_name)
+        if not edges:
+            logger.warning(f"No relationships found for entity {entity_name}")
+            return
+
+        # Collect relationship data to extract entity information
+        relationship_descriptions = []
+        file_paths = set()
+
+        # Get edge data for all connected relationships
+        for src_id, tgt_id in edges:
+            edge_data = await knowledge_graph_inst.get_edge(src_id, tgt_id)
+            if edge_data:
+                if edge_data.get("description"):
+                    relationship_descriptions.append(edge_data["description"])
+
+                if edge_data.get("file_path"):
+                    edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP)
+                    file_paths.update(edge_file_paths)
+
+        # Generate description from relationships or fallback to current
+        if relationship_descriptions:
+            combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions)
+            final_description = await _generate_final_description(combined_description)
+        else:
+            final_description = current_entity.get("description", "")
+
+        entity_type = current_entity.get("entity_type", "UNKNOWN")
+        await _update_entity_storage(final_description, entity_type, file_paths)
         return

-    # Merge descriptions and get the most common entity type
+    # Process cached entity data
     descriptions = []
     entity_types = []
     file_paths = set()
@@ -488,52 +571,9 @@ async def _rebuild_single_entity(
         else current_entity.get("entity_type", "UNKNOWN")
     )

-    # Use summary if description is too long
-    if len(combined_description) > global_config["summary_to_max_tokens"]:
-        final_description = await _handle_entity_relation_summary(
-            entity_name,
-            combined_description,
-            global_config,
-            llm_response_cache=llm_response_cache,
-        )
-    else:
-        final_description = combined_description
-
-    # Update entity in graph storage
-    updated_entity_data = {
-        **current_entity,
-        "description": final_description,
-        "entity_type": entity_type,
-        "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
-        "file_path": GRAPH_FIELD_SEP.join(file_paths)
-        if file_paths
-        else current_entity.get("file_path", "unknown_source"),
-    }
-    await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
-
-    # Update entity in vector database
-    entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
-
-    # Delete old vector record first
-    try:
-        await entities_vdb.delete([entity_vdb_id])
-    except Exception as e:
-        logger.debug(f"Could not delete old entity vector record {entity_vdb_id}: {e}")
-
-    # Insert new vector record
-    entity_content = f"{entity_name}\n{final_description}"
-    await entities_vdb.upsert(
-        {
-            entity_vdb_id: {
-                "content": entity_content,
-                "entity_name": entity_name,
-                "source_id": updated_entity_data["source_id"],
-                "description": final_description,
-                "entity_type": entity_type,
-                "file_path": updated_entity_data["file_path"],
-            }
-        }
-    )
+    # Generate final description and update storage
+    final_description = await _generate_final_description(combined_description)
+    await _update_entity_storage(final_description, entity_type, file_paths)


 async def _rebuild_single_relationship(
@@ -798,7 +838,8 @@ async def _merge_edges_then_upsert(
     )

     # Process edges_data with None checks
-    weight = sum([dp["weight"] for dp in edges_data] + already_weights)
+    all_weights = [dp["weight"] for dp in edges_data] + already_weights
+    weight = sum(all_weights) / len(all_weights)
     description = GRAPH_FIELD_SEP.join(
         sorted(
             set(
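Taken together, the first two hunks give `_rebuild_single_entity` a three-tier source for the rebuilt description: cached per-chunk extraction results, then the descriptions of connected edges, then whatever the node already holds. A condensed sketch of that precedence (a standalone illustration, not the patched function itself; `SEP` and the argument names are hypothetical stand-ins for `GRAPH_FIELD_SEP` and the values the real code reads from its storages, and the LLM summarization step is elided):

```python
SEP = "<SEP>"  # stand-in for GRAPH_FIELD_SEP

def choose_description(cached_descriptions: list[str],
                       edge_descriptions: list[str],
                       current_description: str) -> str:
    # 1) Prefer fresh per-chunk extraction results when they exist.
    if cached_descriptions:
        return SEP.join(cached_descriptions)
    # 2) Otherwise fall back to the descriptions of connected edges.
    if edge_descriptions:
        return SEP.join(edge_descriptions)
    # 3) Otherwise keep the description already stored on the node.
    return current_description

assert choose_description([], ["A works at B"], "old text") == "A works at B"
assert choose_description([], [], "old text") == "old text"
```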
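A note on the last hunk: `_merge_edges_then_upsert` previously summed every observed weight for an edge, so re-processing the same relationship inflated its weight without bound; it now takes the mean over the new and already-stored weights. A minimal standalone sketch of the before/after arithmetic, with made-up weights standing in for the real `edges_data` and `already_weights`:

```python
# Hypothetical inputs: two freshly extracted edge records plus one
# weight already stored in the graph for the same (src, tgt) pair.
edges_data = [{"weight": 2.0}, {"weight": 3.0}]
already_weights = [5.0]

# Old behavior: a plain sum, which grows every time the same
# relationship is re-extracted or rebuilt.
old_weight = sum([dp["weight"] for dp in edges_data] + already_weights)
assert old_weight == 10.0

# New behavior: the mean keeps the weight on a stable scale no matter
# how many times the relationship has been observed.
all_weights = [dp["weight"] for dp in edges_data] + already_weights
new_weight = sum(all_weights) / len(all_weights)
assert abs(new_weight - 10.0 / 3) < 1e-9
```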