diff --git a/lightrag/operate.py b/lightrag/operate.py index 59a6eee7..c5f370f3 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -57,6 +57,7 @@ from lightrag.constants import ( SOURCE_IDS_LIMIT_METHOD_KEEP, SOURCE_IDS_LIMIT_METHOD_FIFO, DEFAULT_FILE_PATH_MORE_PLACEHOLDER, + DEFAULT_MAX_FILE_PATHS, ) from lightrag.kg.shared_storage import get_storage_keyed_lock import time @@ -1188,7 +1189,7 @@ async def _rebuild_single_entity( file_paths_list = file_paths_list[:max_file_paths] file_paths_list.append( - f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..." + f"...{file_path_placeholder}({limit_method}:{max_file_paths}/{original_count})..." ) logger.info( f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})" @@ -1347,7 +1348,7 @@ async def _rebuild_single_relationship( file_paths_list = file_paths_list[:max_file_paths] file_paths_list.append( - f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..." + f"...{file_path_placeholder}({limit_method}:{max_file_paths}/{original_count})..." ) logger.info( f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})" @@ -1470,6 +1471,7 @@ async def _merge_nodes_then_upsert( entity_name: str, nodes_data: list[dict], knowledge_graph_inst: BaseGraphStorage, + entity_vdb: BaseVectorStorage | None, global_config: dict, pipeline_status: dict = None, pipeline_status_lock=None, @@ -1482,6 +1484,7 @@ async def _merge_nodes_then_upsert( already_description = [] already_file_paths = [] + # 1. Get existing node data from knowledge graph already_node = await knowledge_graph_inst.get_node(entity_name) if already_node: already_entity_types.append(already_node["entity_type"]) @@ -1489,16 +1492,6 @@ async def _merge_nodes_then_upsert( already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP)) already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP)) - entity_type = sorted( - Counter( - [dp["entity_type"] for dp in nodes_data] + already_entity_types - ).items(), - key=lambda x: x[1], - reverse=True, - )[0][0] # Get the entity type with the highest count - - original_nodes_count = len(nodes_data) - new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")] existing_full_source_ids = [] @@ -1514,6 +1507,7 @@ async def _merge_nodes_then_upsert( chunk_id for chunk_id in already_source_ids if chunk_id ] + # 2. Merging new source ids with existing ones full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) if entity_chunks_storage is not None and full_source_ids: @@ -1526,6 +1520,7 @@ async def _merge_nodes_then_upsert( } ) + # 3. Finalize source_id by applying source ids limit limit_method = global_config.get("source_ids_limit_method") max_source_limit = global_config.get("max_source_ids_per_entity") source_ids = apply_source_ids_limit( @@ -1535,7 +1530,7 @@ async def _merge_nodes_then_upsert( identifier=f"`{entity_name}`", ) - # Only apply filtering in KEEP(ignore new) mode + # 4. 
Only keep nodes not filtered out by apply_source_ids_limit when limit_method is KEEP
     if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
         allowed_source_ids = set(source_ids)
         filtered_nodes = []
@@ -1550,18 +1545,40 @@ async def _merge_nodes_then_upsert(
                 continue
             filtered_nodes.append(dp)
         nodes_data = filtered_nodes
-    else:
-        # In FIFO mode, keep all node descriptions - truncation happens at source_ids level only
+    else:  # In FIFO mode, keep all nodes - truncation happens at source_ids level only
         nodes_data = list(nodes_data)

-    skip_summary_due_to_limit = (
+    # 5. Check if we need to skip summary due to source_ids limit
+    if (
         limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
         and len(existing_full_source_ids) >= max_source_limit
         and not nodes_data
-        and already_description
-    )
+    ):
+        if already_node:
+            logger.info(
+                f"Skipped `{entity_name}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}"
+            )
+            existing_node_data = dict(already_node)
+            return existing_node_data
+        else:
+            logger.error(f"Internal Error: already_node missing for `{entity_name}`")
+            raise ValueError(
+                f"Internal Error: already_node missing for `{entity_name}`"
+            )

-    # Deduplicate by description, keeping first occurrence
+    # 6.1 Finalize source_id
+    source_id = GRAPH_FIELD_SEP.join(source_ids)
+
+    # 6.2 Finalize entity type by highest count
+    entity_type = sorted(
+        Counter(
+            [dp["entity_type"] for dp in nodes_data] + already_entity_types
+        ).items(),
+        key=lambda x: x[1],
+        reverse=True,
+    )[0][0]
+
+    # 7. Deduplicate nodes by description, keeping first occurrence in the same document
     unique_nodes = {}
     for dp in nodes_data:
         desc = dp.get("description")
@@ -1570,146 +1587,122 @@ async def _merge_nodes_then_upsert(
         if desc not in unique_nodes:
             unique_nodes[desc] = dp

-    # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same
+    # Sort description by timestamp, then by description length when timestamps are the same
     sorted_nodes = sorted(
         unique_nodes.values(),
         key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))),
     )
     sorted_descriptions = [dp["description"] for dp in sorted_nodes]

-    truncation_info = ""
-    dd_message = ""
-    has_placeholder = False  # Initialize to track placeholder in file paths
-    # Combine already_description with sorted new sorted descriptions
     description_list = already_description + sorted_descriptions
-    deduplicated_num = original_nodes_count - len(sorted_descriptions)
-    if deduplicated_num > 0:
-        dd_message = f"dd:{deduplicated_num}"
+    if not description_list:
+        logger.error(f"Entity {entity_name} has no description")
+        raise ValueError(f"Entity {entity_name} has no description")

+    # 8. Get summary description and LLM usage status
+    description, llm_was_used = await _handle_entity_relation_summary(
+        "Entity",
+        entity_name,
+        description_list,
+        GRAPH_FIELD_SEP,
+        global_config,
+        llm_response_cache,
+    )
+
+    # 9. Build file_path within MAX_FILE_PATHS
+    file_paths_list = []
+    seen_paths = set()
+    has_placeholder = False  # Indicates file_path was truncated previously
+
+    max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS)
+    file_path_placeholder = global_config.get(
+        "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+    )
+
+    # Collect from already_file_paths, excluding placeholder
+    for fp in already_file_paths:
+        if fp and fp.startswith(f"...{file_path_placeholder}"):  # Skip placeholders
+            has_placeholder = True
+            continue
+        if fp and fp not in seen_paths:
+            file_paths_list.append(fp)
+            seen_paths.add(fp)
+
+    # Collect from new data
+    for dp in nodes_data:
+        file_path_item = dp.get("file_path")
+        if file_path_item and file_path_item not in seen_paths:
+            file_paths_list.append(file_path_item)
+            seen_paths.add(file_path_item)
+
+    # Apply count limit
+    if len(file_paths_list) > max_file_paths:
+        limit_method = global_config.get(
+            "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+        )
+        file_path_placeholder = global_config.get(
+            "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+        )
+        # Add + sign to indicate actual file count is higher
+        original_count_str = (
+            f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list))
+        )
+
+        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+            # FIFO: keep tail (newest), discard head
+            file_paths_list = file_paths_list[-max_file_paths:]
+            file_paths_list.append(f"...{file_path_placeholder}...(FIFO)")
+        else:
+            # KEEP: keep head (earliest), discard tail
+            file_paths_list = file_paths_list[:max_file_paths]
+            file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)")
+
+        logger.info(
+            f"Limited `{entity_name}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})"
+        )
+    # Finalize file_path
+    file_path = GRAPH_FIELD_SEP.join(file_paths_list)
+
+    # 10. Log based on actual LLM usage
     num_fragment = len(description_list)
     already_fragment = len(already_description)

-    if skip_summary_due_to_limit:
-        description = (
-            already_node.get("description", "(no description)")
-            if already_node
-            else "(no description)"
+    if llm_was_used:
+        status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
+    else:
+        status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
+
+    truncation_info = truncation_info_log = ""
+    if len(source_ids) < len(full_source_ids):
+        # Add truncation info from apply_source_ids_limit if truncation occurred
+        truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}"
+        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+            truncation_info = truncation_info_log
+        else:
+            truncation_info = "KEEP Old"
+
+    deduplicated_num = already_fragment + len(nodes_data) - num_fragment
+    dd_message = ""
+    if deduplicated_num > 0:
+        # Duplicate descriptions detected across multiple chunks for the same entity
+        dd_message = f"dd {deduplicated_num}"
+
+    if dd_message or truncation_info_log:
+        status_message += (
+            f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})"
-        llm_was_used = False
-        status_message = f"Skip merge for `{entity_name}`: IGNORE_NEW limit reached"
-        logger.debug(status_message)
+        )
+
+    # Add message to pipeline status when merge happens
+    if already_fragment > 0 or llm_was_used:
+        logger.info(status_message)
         if pipeline_status is not None and pipeline_status_lock is not None:
             async with pipeline_status_lock:
                 pipeline_status["latest_message"] =
status_message pipeline_status["history_messages"].append(status_message) - elif num_fragment > 0: - # Get summary and LLM usage status - description, llm_was_used = await _handle_entity_relation_summary( - "Entity", - entity_name, - description_list, - GRAPH_FIELD_SEP, - global_config, - llm_response_cache, - ) - - # Log based on actual LLM usage - if llm_was_used: - status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" - else: - status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" - - # Add truncation info from apply_source_ids_limit if truncation occurred - if len(source_ids) < len(full_source_ids): - # Add + sign if has_placeholder is True, indicating actual file count is higher - full_source_count_str = ( - f"{len(full_source_ids)}+" - if has_placeholder - else str(len(full_source_ids)) - ) - truncation_info = ( - f"{limit_method}:{len(source_ids)}/{full_source_count_str}" - ) - - if dd_message or truncation_info: - status_message += f" ({', '.join([truncation_info, dd_message])})" - - if already_fragment > 0 or llm_was_used: - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - else: - logger.debug(status_message) - else: - logger.error(f"Entity {entity_name} has no description") - description = "(no description)" - - source_id = GRAPH_FIELD_SEP.join(source_ids) - - # Build file_path with count limit - if skip_summary_due_to_limit: - # Skip limit, keep original file_path - file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp) - else: - # Collect and apply limit - file_paths_list = [] - seen_paths = set() - has_placeholder = False # Track if already_file_paths contains placeholder - - # Get placeholder to filter it out - file_path_placeholder = global_config.get( - "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER - ) - - # Collect from already_file_paths, excluding placeholder - for fp in already_file_paths: - # Check if this is a placeholder record - if fp and fp.startswith(f"...{file_path_placeholder}"): - has_placeholder = True - continue - # Skip placeholders (format: "...{placeholder}(showing X of Y)...") - if fp and fp not in seen_paths: - file_paths_list.append(fp) - seen_paths.add(fp) - - # Collect from new data - for dp in nodes_data: - file_path_item = dp.get("file_path") - if file_path_item and file_path_item not in seen_paths: - file_paths_list.append(file_path_item) - seen_paths.add(file_path_item) - - # Apply count limit - max_file_paths = global_config.get("max_file_paths") - - if len(file_paths_list) > max_file_paths: - limit_method = global_config.get( - "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP - ) - file_path_placeholder = global_config.get( - "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER - ) - original_count = len(file_paths_list) - - if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: - # FIFO: keep tail (newest), discard head - file_paths_list = file_paths_list[-max_file_paths:] - else: - # KEEP: keep head (earliest), discard tail - file_paths_list = file_paths_list[:max_file_paths] - - file_paths_list.append( - f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..." 
- ) - logger.info( - f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})" - ) - - file_path = GRAPH_FIELD_SEP.join(file_paths_list) + logger.debug(status_message) + # 11. Update both graph and vector db node_data = dict( entity_id=entity_name, entity_type=entity_type, @@ -1724,6 +1717,25 @@ async def _merge_nodes_then_upsert( node_data=node_data, ) node_data["entity_name"] = entity_name + if entity_vdb is not None: + entity_vdb_id = compute_mdhash_id(str(entity_name), prefix="ent-") + entity_content = f"{entity_name}\n{description}" + data_for_vdb = { + entity_vdb_id: { + "entity_name": entity_name, + "entity_type": entity_type, + "content": entity_content, + "source_id": source_id, + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=data_for_vdb: entity_vdb.upsert(payload), + operation_name="entity_upsert", + entity_name=entity_name, + max_retries=3, + retry_delay=0.1, + ) return node_data @@ -1732,6 +1744,8 @@ async def _merge_edges_then_upsert( tgt_id: str, edges_data: list[dict], knowledge_graph_inst: BaseGraphStorage, + relationships_vdb: BaseVectorStorage | None, + entity_vdb: BaseVectorStorage | None, global_config: dict, pipeline_status: dict = None, pipeline_status_lock=None, @@ -1742,12 +1756,14 @@ async def _merge_edges_then_upsert( if src_id == tgt_id: return None + already_edge = None already_weights = [] already_source_ids = [] already_description = [] already_keywords = [] already_file_paths = [] + # 1. Get existing edge data from graph storage if await knowledge_graph_inst.has_edge(src_id, tgt_id): already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id) # Handle the case where get_edge returns None or missing fields @@ -1781,8 +1797,6 @@ async def _merge_edges_then_upsert( ) ) - original_edges_count = len(edges_data) - new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")] storage_key = make_relation_chunk_key(src_id, tgt_id) @@ -1799,6 +1813,7 @@ async def _merge_edges_then_upsert( chunk_id for chunk_id in already_source_ids if chunk_id ] + # 2. Merge new source ids with existing ones full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) if relation_chunks_storage is not None and full_source_ids: @@ -1811,6 +1826,7 @@ async def _merge_edges_then_upsert( } ) + # 3. Finalize source_id by applying source ids limit limit_method = global_config.get("source_ids_limit_method") max_source_limit = global_config.get("max_source_ids_per_relation") source_ids = apply_source_ids_limit( @@ -1823,13 +1839,13 @@ async def _merge_edges_then_upsert( global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP ) - # Only apply filtering in IGNORE_NEW mode + # 4. 
Only keep edges whose source_id is in the final source_ids list when limit_method is KEEP
     if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
         allowed_source_ids = set(source_ids)
         filtered_edges = []
         for dp in edges_data:
             source_id = dp.get("source_id")
-            # Skip relationship fragments sourced from chunks dropped by the IGNORE_NEW cap
+            # Skip relationship fragments sourced from chunks dropped by the keep-oldest cap
             if (
                 source_id
                 and source_id not in allowed_source_ids
@@ -1838,21 +1854,51 @@ async def _merge_edges_then_upsert(
                 continue
             filtered_edges.append(dp)
         edges_data = filtered_edges
-    else:
-        # In FIFO mode, keep all edge descriptions - truncation happens at source_ids level only
+    else:  # In FIFO mode, keep all edges - truncation happens at source_ids level only
        edges_data = list(edges_data)

-    skip_summary_due_to_limit = (
+    # 5. Check if we need to skip summary due to source_ids limit
+    if (
         limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
         and len(existing_full_source_ids) >= max_source_limit
         and not edges_data
-        and already_description
-    )
+    ):
+        if already_edge:
+            logger.info(
+                f"Skipped `{src_id}`~`{tgt_id}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}"
+            )
+            existing_edge_data = dict(already_edge)
+            return existing_edge_data
+        else:
+            logger.error(
+                f"Internal Error: already_edge missing for `{src_id}`~`{tgt_id}`"
+            )
+            raise ValueError(
+                f"Internal Error: already_edge missing for `{src_id}`~`{tgt_id}`"
+            )

-    # Process edges_data with None checks
+    # 6.1 Finalize source_id
+    source_id = GRAPH_FIELD_SEP.join(source_ids)
+
+    # 6.2 Finalize weight by summing new edges and existing weights
     weight = sum([dp["weight"] for dp in edges_data] + already_weights)

-    # Deduplicate by description, keeping first occurrence
+    # 6.3 Finalize keywords by merging existing and new keywords
+    all_keywords = set()
+    # Process already_keywords (which are comma-separated)
+    for keyword_str in already_keywords:
+        if keyword_str:  # Skip empty strings
+            all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip())
+    # Process new keywords from edges_data
+    for edge in edges_data:
+        if edge.get("keywords"):
+            all_keywords.update(
+                k.strip() for k in edge["keywords"].split(",") if k.strip()
+            )
+    # Join all unique keywords with commas
+    keywords = ",".join(sorted(all_keywords))
+
+    # 7. Deduplicate by description, keeping first occurrence in the same document
     unique_edges = {}
     for dp in edges_data:
         description_value = dp.get("description")
@@ -1868,170 +1914,153 @@ async def _merge_edges_then_upsert(
     )
     sorted_descriptions = [dp["description"] for dp in sorted_edges]

-    truncation_info = ""
-    dd_message = ""
-    has_placeholder = False  # Initialize to track placeholder in file paths
-    # Combine already_description with sorted new descriptions
     description_list = already_description + sorted_descriptions
-    deduplicated_num = original_edges_count - len(sorted_descriptions)
-    if deduplicated_num > 0:
-        dd_message = f"dd:{deduplicated_num}"
+    if not description_list:
+        logger.error(f"Relation {src_id}~{tgt_id} has no description")
+        raise ValueError(f"Relation {src_id}~{tgt_id} has no description")

-    num_fragment = len(description_list)
-    already_fragment = len(already_description)
+    # 8. 
Get summary description an LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + "Relation", + f"({src_id}, {tgt_id})", + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, + ) - if skip_summary_due_to_limit: - description = ( - already_edge.get("description", "(no description)") - if already_edge - else "(no description)" + # 9. Build file_path within MAX_FILE_PATHS limit + file_paths_list = [] + seen_paths = set() + has_placeholder = False # Track if already_file_paths contains placeholder + + max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS) + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + + # Collect from already_file_paths, excluding placeholder + for fp in already_file_paths: + # Check if this is a placeholder record + if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders + has_placeholder = True + continue + if fp and fp not in seen_paths: + file_paths_list.append(fp) + seen_paths.add(fp) + + # Collect from new data + for dp in edges_data: + file_path_item = dp.get("file_path") + if file_path_item and file_path_item not in seen_paths: + file_paths_list.append(file_path_item) + seen_paths.add(file_path_item) + + # Apply count limit + max_file_paths = global_config.get("max_file_paths") + + if len(file_paths_list) > max_file_paths: + limit_method = global_config.get( + "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP ) - llm_was_used = False - status_message = ( - f"Skip merge for `{src_id}`~`{tgt_id}`: IGNORE_NEW limit reached" - ) - logger.debug(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - elif num_fragment > 0: - # Get summary and LLM usage status - description, llm_was_used = await _handle_entity_relation_summary( - "Relation", - f"({src_id}, {tgt_id})", - description_list, - GRAPH_FIELD_SEP, - global_config, - llm_response_cache, - ) - - # Log based on actual LLM usage - if llm_was_used: - status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" - else: - status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" - - # Add truncation info from apply_source_ids_limit if truncation occurred - if len(source_ids) < len(full_source_ids): - # Add + sign if has_placeholder is True, indicating actual file count is higher - full_source_count_str = ( - f"{len(full_source_ids)}+" - if has_placeholder - else str(len(full_source_ids)) - ) - truncation_info = ( - f"{limit_method}:{len(source_ids)}/{full_source_count_str}" - ) - - if dd_message or truncation_info: - status_message += f" ({', '.join([truncation_info, dd_message])})" - - if already_fragment > 0 or llm_was_used: - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - else: - logger.debug(status_message) - - else: - logger.error(f"Edge {src_id} - {tgt_id} has no description") - description = "(no description)" - - # Split all existing and new keywords into individual terms, then combine and deduplicate - all_keywords = set() - # Process already_keywords (which are 
comma-separated) - for keyword_str in already_keywords: - if keyword_str: # Skip empty strings - all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip()) - # Process new keywords from edges_data - for edge in edges_data: - if edge.get("keywords"): - all_keywords.update( - k.strip() for k in edge["keywords"].split(",") if k.strip() - ) - # Join all unique keywords with commas - keywords = ",".join(sorted(all_keywords)) - - source_id = GRAPH_FIELD_SEP.join(source_ids) - - # Build file_path with count limit - if skip_summary_due_to_limit: - # Skip limit, keep original file_path - file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp) - else: - # Collect and apply limit - file_paths_list = [] - seen_paths = set() - has_placeholder = False # Track if already_file_paths contains placeholder - - # Get placeholder to filter it out file_path_placeholder = global_config.get( "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER ) - # Collect from already_file_paths, excluding placeholder - for fp in already_file_paths: - # Check if this is a placeholder record - if fp and fp.startswith(f"...{file_path_placeholder}"): - has_placeholder = True - continue - # Skip placeholders (format: "...{placeholder}(showing X of Y)...") - if fp and fp not in seen_paths: - file_paths_list.append(fp) - seen_paths.add(fp) + # Add + sign to indicate actual file count is higher + original_count_str = ( + f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list)) + ) - # Collect from new data - for dp in edges_data: - file_path_item = dp.get("file_path") - if file_path_item and file_path_item not in seen_paths: - file_paths_list.append(file_path_item) - seen_paths.add(file_path_item) + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail (newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + file_paths_list.append(f"...{file_path_placeholder}...(FIFO)") + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)") - # Apply count limit - max_file_paths = global_config.get("max_file_paths") + logger.info( + f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})" + ) + # Finalize file_path + file_path = GRAPH_FIELD_SEP.join(file_paths_list) - if len(file_paths_list) > max_file_paths: - limit_method = global_config.get( - "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP - ) - file_path_placeholder = global_config.get( - "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER - ) - original_count = len(file_paths_list) + # 10. 
Log based on actual LLM usage
+    num_fragment = len(description_list)
+    already_fragment = len(already_description)
+    if llm_was_used:
+        status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
+    else:
+        status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"

-        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
-            # FIFO: keep tail (newest), discard head
-            file_paths_list = file_paths_list[-max_file_paths:]
-        else:
-            # KEEP: keep head (earliest), discard tail
-            file_paths_list = file_paths_list[:max_file_paths]
+    truncation_info = truncation_info_log = ""
+    if len(source_ids) < len(full_source_ids):
+        # Add truncation info from apply_source_ids_limit if truncation occurred
+        truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}"
+        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+            truncation_info = truncation_info_log
+        else:
+            truncation_info = "KEEP Old"

-            file_paths_list.append(
-                f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
-            )
-            logger.info(
-                f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
-            )
+    deduplicated_num = already_fragment + len(edges_data) - num_fragment
+    dd_message = ""
+    if deduplicated_num > 0:
+        # Duplicate descriptions detected across multiple chunks for the same relation
+        dd_message = f"dd {deduplicated_num}"

-        file_path = GRAPH_FIELD_SEP.join(file_paths_list)
+    if dd_message or truncation_info_log:
+        status_message += (
+            f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})"
+        )

+    # Add message to pipeline status when merge happens
+    if already_fragment > 0 or llm_was_used:
+        logger.info(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
+    else:
+        logger.debug(status_message)
+
+    # 11. 
Update both graph and vector db for need_insert_id in [src_id, tgt_id]: if not (await knowledge_graph_inst.has_node(need_insert_id)): + node_created_at = int(time.time()) node_data = { "entity_id": need_insert_id, "source_id": source_id, "description": description, "entity_type": "UNKNOWN", "file_path": file_path, - "created_at": int(time.time()), + "created_at": node_created_at, "truncate": "", } await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data) + if entity_vdb is not None: + entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-") + entity_content = f"{need_insert_id}\n{description}" + vdb_data = { + entity_vdb_id: { + "content": entity_content, + "entity_name": need_insert_id, + "source_id": source_id, + "entity_type": "UNKNOWN", + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: entity_vdb.upsert(payload), + operation_name="added_entity_upsert", + entity_name=need_insert_id, + max_retries=3, + retry_delay=0.1, + ) + # Track entities added during edge processing if added_entities is not None: entity_data = { @@ -2040,10 +2069,11 @@ async def _merge_edges_then_upsert( "description": description, "source_id": source_id, "file_path": file_path, - "created_at": int(time.time()), + "created_at": node_created_at, } added_entities.append(entity_data) + edge_created_at = int(time.time()) await knowledge_graph_inst.upsert_edge( src_id, tgt_id, @@ -2053,7 +2083,7 @@ async def _merge_edges_then_upsert( keywords=keywords, source_id=source_id, file_path=file_path, - created_at=int(time.time()), + created_at=edge_created_at, truncate=truncation_info, ), ) @@ -2065,10 +2095,41 @@ async def _merge_edges_then_upsert( keywords=keywords, source_id=source_id, file_path=file_path, - created_at=int(time.time()), + created_at=edge_created_at, truncate=truncation_info, + weight=weight, ) + if relationships_vdb is not None: + rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-") + rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-") + try: + await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse]) + except Exception as e: + logger.debug( + f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}" + ) + rel_content = f"{keywords}\t{src_id}\n{tgt_id}\n{description}" + vdb_data = { + rel_vdb_id: { + "src_id": src_id, + "tgt_id": tgt_id, + "source_id": source_id, + "content": rel_content, + "keywords": keywords, + "description": description, + "weight": weight, + "file_path": file_path, + } + } + await safe_vdb_operation_with_exception( + operation=lambda payload=vdb_data: relationships_vdb.upsert(payload), + operation_name="relationship_upsert", + entity_name=f"{src_id}-{tgt_id}", + max_retries=3, + retry_delay=0.2, + ) + return edge_data @@ -2158,12 +2219,12 @@ async def merge_nodes_and_edges( [entity_name], namespace=namespace, enable_logging=False ): try: - logger.debug(f"Inserting {entity_name} in Graph") - # Graph database operation (critical path, must succeed) + logger.debug(f"Processing entity {entity_name}") entity_data = await _merge_nodes_then_upsert( entity_name, entities, knowledge_graph_inst, + entity_vdb, global_config, pipeline_status, pipeline_status_lock, @@ -2171,36 +2232,9 @@ async def merge_nodes_and_edges( entity_chunks_storage, ) - # Vector database operation (equally critical, must succeed) - if entity_vdb is not None and entity_data: - data_for_vdb = { - compute_mdhash_id( - str(entity_data["entity_name"]), prefix="ent-" - ): 
{ - "entity_name": entity_data["entity_name"], - "entity_type": entity_data["entity_type"], - "content": f"{entity_data['entity_name']}\n{entity_data['description']}", - "source_id": entity_data["source_id"], - "file_path": entity_data.get( - "file_path", "unknown_source" - ), - } - } - - logger.debug(f"Inserting {entity_name} in Graph") - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda: entity_vdb.upsert(data_for_vdb), - operation_name="entity_upsert", - entity_name=entity_name, - max_retries=3, - retry_delay=0.1, - ) - return entity_data except Exception as e: - # Any database operation failure is critical error_msg = ( f"Critical error in entity processing for `{entity_name}`: {e}" ) @@ -2290,12 +2324,14 @@ async def merge_nodes_and_edges( try: added_entities = [] # Track entities added during edge processing - # Graph database operation (critical path, must succeed) + logger.debug(f"Processing relation {sorted_edge_key}") edge_data = await _merge_edges_then_upsert( edge_key[0], edge_key[1], edges, knowledge_graph_inst, + relationships_vdb, + entity_vdb, global_config, pipeline_status, pipeline_status_lock, @@ -2307,66 +2343,9 @@ async def merge_nodes_and_edges( if edge_data is None: return None, [] - # Vector database operation (equally critical, must succeed) - if relationships_vdb is not None: - data_for_vdb = { - compute_mdhash_id( - edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-" - ): { - "src_id": edge_data["src_id"], - "tgt_id": edge_data["tgt_id"], - "keywords": edge_data["keywords"], - "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}", - "source_id": edge_data["source_id"], - "file_path": edge_data.get( - "file_path", "unknown_source" - ), - "weight": edge_data.get("weight", 1.0), - } - } - - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda: relationships_vdb.upsert(data_for_vdb), - operation_name="relationship_upsert", - entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}", - max_retries=3, - retry_delay=0.1, - ) - - # Update added_entities to entity vector database using safe operation wrapper - if added_entities and entity_vdb is not None: - for entity_data in added_entities: - entity_vdb_id = compute_mdhash_id( - entity_data["entity_name"], prefix="ent-" - ) - entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}" - - vdb_data = { - entity_vdb_id: { - "content": entity_content, - "entity_name": entity_data["entity_name"], - "source_id": entity_data["source_id"], - "entity_type": entity_data["entity_type"], - "file_path": entity_data.get( - "file_path", "unknown_source" - ), - } - } - - # Use safe operation wrapper - VDB failure must throw exception - await safe_vdb_operation_with_exception( - operation=lambda data=vdb_data: entity_vdb.upsert(data), - operation_name="added_entity_upsert", - entity_name=entity_data["entity_name"], - max_retries=3, - retry_delay=0.1, - ) - return edge_data, added_entities except Exception as e: - # Any database operation failure is critical error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}" logger.error(error_msg)
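
Reviewer note: both merge helpers now cap the de-duplicated file_path list and append a marker naming the trim strategy (FIFO keeps the newest paths, KEEP keeps the oldest). The following is a minimal, runnable sketch of that behavior for review only; trim_file_paths and the module-level constants are illustrative stand-ins, not lightrag APIs. The real code inlines this logic and reads max_file_paths, source_ids_limit_method, and file_path_more_placeholder from global_config.

# Illustrative sketch of the KEEP/FIFO file-path capping used in
# _merge_nodes_then_upsert and _merge_edges_then_upsert.
# Names below are hypothetical stand-ins, not part of the diff.
FIFO = "FIFO"         # assumed stand-in for SOURCE_IDS_LIMIT_METHOD_FIFO
KEEP = "KEEP"         # assumed stand-in for SOURCE_IDS_LIMIT_METHOD_KEEP
PLACEHOLDER = "more"  # assumed stand-in for DEFAULT_FILE_PATH_MORE_PLACEHOLDER


def trim_file_paths(paths: list[str], max_paths: int, method: str) -> list[str]:
    """Cap a de-duplicated path list and append a marker describing the cut."""
    if len(paths) <= max_paths:
        return list(paths)
    if method == FIFO:
        # FIFO: keep the tail (newest entries), discard the head
        return paths[-max_paths:] + [f"...{PLACEHOLDER}...(FIFO)"]
    # KEEP: keep the head (earliest entries), discard the tail
    return paths[:max_paths] + [f"...{PLACEHOLDER}...(KEEP Old)"]


if __name__ == "__main__":
    docs = [f"doc_{i}.md" for i in range(5)]
    print(trim_file_paths(docs, 3, FIFO))  # ['doc_2.md', 'doc_3.md', 'doc_4.md', '...more...(FIFO)']
    print(trim_file_paths(docs, 3, KEEP))  # ['doc_0.md', 'doc_1.md', 'doc_2.md', '...more...(KEEP Old)']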
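
Reviewer note: every vector-DB write in the merge path now goes through safe_vdb_operation_with_exception with an explicit max_retries/retry_delay (and operation_name/entity_name for logging). That helper's implementation is outside this diff; as a rough mental model only, and an assumption rather than the actual lightrag code, a retrying wrapper with this call shape typically looks like:

import asyncio
from typing import Awaitable, Callable

# Hypothetical sketch of a retrying async write wrapper; not the lightrag implementation.
async def retry_vdb_operation(
    operation: Callable[[], Awaitable[None]],
    max_retries: int = 3,
    retry_delay: float = 0.1,
) -> None:
    for attempt in range(1, max_retries + 1):
        try:
            await operation()
            return
        except Exception:
            if attempt == max_retries:
                # Exhausted retries: propagate so the caller treats it as a critical failure
                raise
            await asyncio.sleep(retry_delay * attempt)

Note that the lambdas in the diff bind their payload as a default argument (lambda payload=data_for_vdb: entity_vdb.upsert(payload)), which avoids late-binding surprises if the wrapper invokes the callable after the loop variable has moved on.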