diff --git a/lightrag/operate.py b/lightrag/operate.py
index a5e168be..290f19b6 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -1470,6 +1470,7 @@ async def _merge_nodes_then_upsert(
     entity_name: str,
     nodes_data: list[dict],
     knowledge_graph_inst: BaseGraphStorage,
+    entity_vdb: BaseVectorStorage | None,
     global_config: dict,
     pipeline_status: dict = None,
     pipeline_status_lock=None,
@@ -1595,13 +1596,25 @@ async def _merge_nodes_then_upsert(
             if already_node
             else "(no description)"
         )
-        llm_was_used = False
-        status_message = f"Skip merge for `{entity_name}`: IGNORE_NEW limit reached"
+        status_message = f"Skip merge for `{entity_name}`: KEEP limit reached"
         logger.debug(status_message)
         if pipeline_status is not None and pipeline_status_lock is not None:
             async with pipeline_status_lock:
                 pipeline_status["latest_message"] = status_message
                 pipeline_status["history_messages"].append(status_message)
+        existing_node_data = dict(already_node or {})
+        if not existing_node_data:
+            existing_node_data = {
+                "entity_id": entity_name,
+                "entity_type": entity_type,
+                "description": description,
+                "source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids),
+                "file_path": GRAPH_FIELD_SEP.join(already_file_paths),
+                "created_at": int(time.time()),
+                "truncate": "",
+            }
+        existing_node_data["entity_name"] = entity_name
+        return existing_node_data
     elif num_fragment > 0:
         # Get summary and LLM usage status
         description, llm_was_used = await _handle_entity_relation_summary(
@@ -1726,6 +1739,25 @@ async def _merge_nodes_then_upsert(
         node_data=node_data,
     )
     node_data["entity_name"] = entity_name
+    if entity_vdb is not None:
+        entity_vdb_id = compute_mdhash_id(str(entity_name), prefix="ent-")
+        entity_content = f"{entity_name}\n{description}"
+        data_for_vdb = {
+            entity_vdb_id: {
+                "entity_name": entity_name,
+                "entity_type": entity_type,
+                "content": entity_content,
+                "source_id": source_id,
+                "file_path": file_path,
+            }
+        }
+        await safe_vdb_operation_with_exception(
+            operation=lambda payload=data_for_vdb: entity_vdb.upsert(payload),
+            operation_name="entity_upsert",
+            entity_name=entity_name,
+            max_retries=3,
+            retry_delay=0.1,
+        )

     return node_data

@@ -1734,6 +1766,8 @@ async def _merge_edges_then_upsert(
     tgt_id: str,
     edges_data: list[dict],
     knowledge_graph_inst: BaseGraphStorage,
+    relationships_vdb: BaseVectorStorage | None,
+    entity_vdb: BaseVectorStorage | None,
     global_config: dict,
     pipeline_status: dict = None,
     pipeline_status_lock=None,
@@ -1744,6 +1778,7 @@ async def _merge_edges_then_upsert(
     if src_id == tgt_id:
         return None

+    already_edge = None
     already_weights = []
     already_source_ids = []
     already_description = []
@@ -1825,13 +1860,13 @@ async def _merge_edges_then_upsert(
         global_config.get("source_ids_limit_method")
         or SOURCE_IDS_LIMIT_METHOD_KEEP
     )
-    # Only apply filtering in IGNORE_NEW mode
+    # Only apply filtering in KEEP (ignore-new) mode
     if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
         allowed_source_ids = set(source_ids)
         filtered_edges = []
         for dp in edges_data:
             source_id = dp.get("source_id")
-            # Skip relationship fragments sourced from chunks dropped by the IGNORE_NEW cap
+            # Skip relationship fragments sourced from chunks dropped by the keep-oldest cap
             if (
                 source_id
                 and source_id not in allowed_source_ids
@@ -1889,15 +1924,29 @@ async def _merge_edges_then_upsert(
             if already_edge
             else "(no description)"
         )
-        llm_was_used = False
         status_message = (
-            f"Skip merge for `{src_id}`~`{tgt_id}`: IGNORE_NEW limit reached"
+            f"Skip merge for `{src_id}`~`{tgt_id}`: KEEP limit reached"
         )
        logger.debug(status_message)
         if pipeline_status is not None and pipeline_status_lock is not None:
             async with pipeline_status_lock:
                 pipeline_status["latest_message"] = status_message
                 pipeline_status["history_messages"].append(status_message)
+        existing_edge_data = dict(already_edge or {})
+        if not existing_edge_data:
+            existing_edge_data = {
+                "description": description,
+                "keywords": GRAPH_FIELD_SEP.join(already_keywords),
+                "source_id": GRAPH_FIELD_SEP.join(existing_full_source_ids),
+                "file_path": GRAPH_FIELD_SEP.join(already_file_paths),
+                "weight": sum(already_weights) if already_weights else 0.0,
+                "truncate": "",
+                "created_at": int(time.time()),
+            }
+        existing_edge_data.setdefault("created_at", int(time.time()))
+        existing_edge_data["src_id"] = src_id
+        existing_edge_data["tgt_id"] = tgt_id
+        return existing_edge_data
     elif num_fragment > 0:
         # Get summary and LLM usage status
         description, llm_was_used = await _handle_entity_relation_summary(
@@ -2025,17 +2074,38 @@ async def _merge_edges_then_upsert(

     for need_insert_id in [src_id, tgt_id]:
         if not (await knowledge_graph_inst.has_node(need_insert_id)):
+            node_created_at = int(time.time())
             node_data = {
                 "entity_id": need_insert_id,
                 "source_id": source_id,
                 "description": description,
                 "entity_type": "UNKNOWN",
                 "file_path": file_path,
-                "created_at": int(time.time()),
+                "created_at": node_created_at,
                 "truncate": "",
             }
             await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
+            if entity_vdb is not None:
+                entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
+                entity_content = f"{need_insert_id}\n{description}"
+                vdb_data = {
+                    entity_vdb_id: {
+                        "content": entity_content,
+                        "entity_name": need_insert_id,
+                        "source_id": source_id,
+                        "entity_type": "UNKNOWN",
+                        "file_path": file_path,
+                    }
+                }
+                await safe_vdb_operation_with_exception(
+                    operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
+                    operation_name="added_entity_upsert",
+                    entity_name=need_insert_id,
+                    max_retries=3,
+                    retry_delay=0.1,
+                )
+
             # Track entities added during edge processing
             if added_entities is not None:
                 entity_data = {
@@ -2044,10 +2114,11 @@ async def _merge_edges_then_upsert(
                     "description": description,
                     "source_id": source_id,
                     "file_path": file_path,
-                    "created_at": int(time.time()),
+                    "created_at": node_created_at,
                 }
                 added_entities.append(entity_data)

+    edge_created_at = int(time.time())
     await knowledge_graph_inst.upsert_edge(
         src_id,
         tgt_id,
@@ -2057,7 +2128,7 @@ async def _merge_edges_then_upsert(
             keywords=keywords,
             source_id=source_id,
             file_path=file_path,
-            created_at=int(time.time()),
+            created_at=edge_created_at,
             truncate=truncation_info,
         ),
     )
@@ -2069,10 +2140,41 @@ async def _merge_edges_then_upsert(
         keywords=keywords,
         source_id=source_id,
         file_path=file_path,
-        created_at=int(time.time()),
+        created_at=edge_created_at,
         truncate=truncation_info,
+        weight=weight,
     )

+    if relationships_vdb is not None:
+        rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-")
+        rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-")
+        try:
+            await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse])
+        except Exception as e:
+            logger.debug(
+                f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}"
+            )
+        rel_content = f"{keywords}\t{src_id}\n{tgt_id}\n{description}"
+        vdb_data = {
+            rel_vdb_id: {
+                "src_id": src_id,
+                "tgt_id": tgt_id,
+                "source_id": source_id,
+                "content": rel_content,
+                "keywords": keywords,
+                "description": description,
+                "weight": weight,
+                "file_path": file_path,
+            }
+        }
+        await safe_vdb_operation_with_exception(
+            operation=lambda payload=vdb_data: relationships_vdb.upsert(payload),
+            operation_name="relationship_upsert",
+            entity_name=f"{src_id}-{tgt_id}",
+            max_retries=3,
+            retry_delay=0.2,
+        )
+
     return edge_data

@@ -2162,12 +2264,12 @@ async def merge_nodes_and_edges(
                     [entity_name], namespace=namespace, enable_logging=False
                 ):
                     try:
-                        logger.debug(f"Inserting {entity_name} in Graph")
-                        # Graph database operation (critical path, must succeed)
+                        logger.debug(f"Processing entity {entity_name}")
                         entity_data = await _merge_nodes_then_upsert(
                             entity_name,
                             entities,
                             knowledge_graph_inst,
+                            entity_vdb,
                             global_config,
                             pipeline_status,
                             pipeline_status_lock,
@@ -2175,36 +2277,9 @@ async def merge_nodes_and_edges(
                             entity_chunks_storage,
                         )

-                        # Vector database operation (equally critical, must succeed)
-                        if entity_vdb is not None and entity_data:
-                            data_for_vdb = {
-                                compute_mdhash_id(
-                                    str(entity_data["entity_name"]), prefix="ent-"
-                                ): {
-                                    "entity_name": entity_data["entity_name"],
-                                    "entity_type": entity_data["entity_type"],
-                                    "content": f"{entity_data['entity_name']}\n{entity_data['description']}",
-                                    "source_id": entity_data["source_id"],
-                                    "file_path": entity_data.get(
-                                        "file_path", "unknown_source"
-                                    ),
-                                }
-                            }
-
-                            logger.debug(f"Inserting {entity_name} in Graph")
-                            # Use safe operation wrapper - VDB failure must throw exception
-                            await safe_vdb_operation_with_exception(
-                                operation=lambda: entity_vdb.upsert(data_for_vdb),
-                                operation_name="entity_upsert",
-                                entity_name=entity_name,
-                                max_retries=3,
-                                retry_delay=0.1,
-                            )
-
                         return entity_data
                     except Exception as e:
-                        # Any database operation failure is critical
                         error_msg = (
                             f"Critical error in entity processing for `{entity_name}`: {e}"
                         )
@@ -2294,12 +2369,14 @@ async def merge_nodes_and_edges(
                     try:
                         added_entities = []  # Track entities added during edge processing

-                        # Graph database operation (critical path, must succeed)
+                        logger.debug(f"Processing relation {sorted_edge_key}")
                         edge_data = await _merge_edges_then_upsert(
                             edge_key[0],
                             edge_key[1],
                             edges,
                             knowledge_graph_inst,
+                            relationships_vdb,
+                            entity_vdb,
                             global_config,
                             pipeline_status,
                             pipeline_status_lock,
@@ -2311,66 +2388,9 @@ async def merge_nodes_and_edges(

                         if edge_data is None:
                             return None, []

-                        # Vector database operation (equally critical, must succeed)
-                        if relationships_vdb is not None:
-                            data_for_vdb = {
-                                compute_mdhash_id(
-                                    edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
-                                ): {
-                                    "src_id": edge_data["src_id"],
-                                    "tgt_id": edge_data["tgt_id"],
-                                    "keywords": edge_data["keywords"],
-                                    "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
-                                    "source_id": edge_data["source_id"],
-                                    "file_path": edge_data.get(
-                                        "file_path", "unknown_source"
-                                    ),
-                                    "weight": edge_data.get("weight", 1.0),
-                                }
-                            }
-
-                            # Use safe operation wrapper - VDB failure must throw exception
-                            await safe_vdb_operation_with_exception(
-                                operation=lambda: relationships_vdb.upsert(data_for_vdb),
-                                operation_name="relationship_upsert",
-                                entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}",
-                                max_retries=3,
-                                retry_delay=0.1,
-                            )
-
-                        # Update added_entities to entity vector database using safe operation wrapper
-                        if added_entities and entity_vdb is not None:
-                            for entity_data in added_entities:
-                                entity_vdb_id = compute_mdhash_id(
-                                    entity_data["entity_name"], prefix="ent-"
-                                )
-                                entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}"
-
-                                vdb_data = {
-                                    entity_vdb_id: {
{ - "content": entity_content, - "entity_name": entity_data["entity_name"], - "source_id": entity_data["source_id"], - "entity_type": entity_data["entity_type"], - "file_path": entity_data.get( - "file_path", "unknown_source" - ), - } - } - - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda data=vdb_data: entity_vdb.upsert(data), - operation_name="added_entity_upsert", - entity_name=entity_data["entity_name"], - max_retries=3, - retry_delay=0.1, - ) - return edge_data, added_entities except Exception as e: - # Any database operation failure is critical error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}" logger.error(error_msg)