diff --git a/lightrag/operate.py b/lightrag/operate.py index c6724974..cd8d8a64 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1,6 +1,5 @@ from __future__ import annotations from functools import partial -from pathlib import Path import asyncio import json @@ -8,11 +7,7 @@ import json_repair from typing import Any, AsyncIterator, overload, Literal from collections import Counter, defaultdict -from lightrag.exceptions import ( - PipelineCancelledException, - ChunkTokenLimitExceededError, -) -from lightrag.utils import ( +from .utils import ( logger, compute_mdhash_id, Tokenizer, @@ -31,16 +26,14 @@ from lightrag.utils import ( pick_by_weighted_polling, pick_by_vector_similarity, process_chunks_unified, + build_file_path, safe_vdb_operation_with_exception, create_prefixed_exception, fix_tuple_delimiter_corruption, convert_to_user_format, generate_reference_list_from_chunks, - apply_source_ids_limit, - merge_source_ids, - make_relation_chunk_key, ) -from lightrag.base import ( +from .base import ( BaseGraphStorage, BaseKVStorage, BaseVectorStorage, @@ -49,8 +42,8 @@ from lightrag.base import ( QueryResult, QueryContextResult, ) -from lightrag.prompt import PROMPTS -from lightrag.constants import ( +from .prompt import PROMPTS +from .constants import ( GRAPH_FIELD_SEP, DEFAULT_MAX_ENTITY_TOKENS, DEFAULT_MAX_RELATION_TOKENS, @@ -59,41 +52,15 @@ from lightrag.constants import ( DEFAULT_KG_CHUNK_PICK_METHOD, DEFAULT_ENTITY_TYPES, DEFAULT_SUMMARY_LANGUAGE, - SOURCE_IDS_LIMIT_METHOD_KEEP, - SOURCE_IDS_LIMIT_METHOD_FIFO, - DEFAULT_FILE_PATH_MORE_PLACEHOLDER, - DEFAULT_MAX_FILE_PATHS, - DEFAULT_ENTITY_NAME_MAX_LENGTH, ) -from lightrag.kg.shared_storage import get_storage_keyed_lock +from .kg.shared_storage import get_storage_keyed_lock import time from dotenv import load_dotenv # use the .env that is inside the current folder # allows to use different .env file for each lightrag instance # the OS environment variables take precedence over the .env file -load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False) - - -def _truncate_entity_identifier( - identifier: str, limit: int, chunk_key: str, identifier_role: str -) -> str: - """Truncate entity identifiers that exceed the configured length limit.""" - - if len(identifier) <= limit: - return identifier - - display_value = identifier[:limit] - preview = identifier[:20] # Show first 20 characters as preview - logger.warning( - "%s: %s len %d > %d chars (Name: '%s...')", - chunk_key, - identifier_role, - len(identifier), - limit, - preview, - ) - return display_value +load_dotenv(dotenv_path=".env", override=False) def chunking_by_token_size( @@ -101,8 +68,8 @@ def chunking_by_token_size( content: str, split_by_character: str | None = None, split_by_character_only: bool = False, - chunk_overlap_token_size: int = 100, - chunk_token_size: int = 1200, + overlap_token_size: int = 128, + max_token_size: int = 1024, ) -> list[dict[str, Any]]: tokens = tokenizer.encode(content) results: list[dict[str, Any]] = [] @@ -112,30 +79,19 @@ def chunking_by_token_size( if split_by_character_only: for chunk in raw_chunks: _tokens = tokenizer.encode(chunk) - if len(_tokens) > chunk_token_size: - logger.warning( - "Chunk split_by_character exceeds token limit: len=%d limit=%d", - len(_tokens), - chunk_token_size, - ) - raise ChunkTokenLimitExceededError( - chunk_tokens=len(_tokens), - chunk_token_limit=chunk_token_size, - chunk_preview=chunk[:120], - ) new_chunks.append((len(_tokens), chunk)) else: for chunk in raw_chunks: _tokens = 
tokenizer.encode(chunk) - if len(_tokens) > chunk_token_size: + if len(_tokens) > max_token_size: for start in range( - 0, len(_tokens), chunk_token_size - chunk_overlap_token_size + 0, len(_tokens), max_token_size - overlap_token_size ): chunk_content = tokenizer.decode( - _tokens[start : start + chunk_token_size] + _tokens[start : start + max_token_size] ) new_chunks.append( - (min(chunk_token_size, len(_tokens) - start), chunk_content) + (min(max_token_size, len(_tokens) - start), chunk_content) ) else: new_chunks.append((len(_tokens), chunk)) @@ -149,12 +105,12 @@ def chunking_by_token_size( ) else: for index, start in enumerate( - range(0, len(tokens), chunk_token_size - chunk_overlap_token_size) + range(0, len(tokens), max_token_size - overlap_token_size) ): - chunk_content = tokenizer.decode(tokens[start : start + chunk_token_size]) + chunk_content = tokenizer.decode(tokens[start : start + max_token_size]) results.append( { - "tokens": min(chunk_token_size, len(tokens) - start), + "tokens": min(max_token_size, len(tokens) - start), "content": chunk_content.strip(), "chunk_order_index": index, } @@ -359,20 +315,6 @@ async def _summarize_descriptions( llm_response_cache=llm_response_cache, cache_type="summary", ) - - # Check summary token length against embedding limit - embedding_token_limit = global_config.get("embedding_token_limit") - if embedding_token_limit is not None and summary: - tokenizer = global_config["tokenizer"] - summary_token_count = len(tokenizer.encode(summary)) - threshold = int(embedding_token_limit * 0.9) - - if summary_token_count > threshold: - logger.warning( - f"Summary tokens ({summary_token_count}) exceeds 90% of embedding limit " - f"({embedding_token_limit}) for {description_type}: {description_name}" - ) - return summary @@ -397,8 +339,8 @@ async def _handle_single_entity_extraction( # Validate entity name after all cleaning steps if not entity_name or not entity_name.strip(): - logger.info( - f"Empty entity name found after sanitization. Original: '{record_attributes[1]}'" + logger.warning( + f"Entity extraction error: entity name became empty after cleaning. Original: '{record_attributes[1]}'" ) return None @@ -459,7 +401,7 @@ async def _handle_single_relationship_extraction( ): # treat "relationship" and "relation" interchangeable if len(record_attributes) > 1 and "relation" in record_attributes[0]: logger.warning( - f"{chunk_key}: LLM output format error; found {len(record_attributes)}/5 fields on REALTION `{record_attributes[1]}`~`{record_attributes[2] if len(record_attributes) > 2 else 'N/A'}`" + f"{chunk_key}: LLM output format error; found {len(record_attributes)}/5 fields on REALTION `{record_attributes[1]}`~`{record_attributes[2] if len(record_attributes) >2 else 'N/A'}`" ) logger.debug(record_attributes) return None @@ -474,14 +416,14 @@ async def _handle_single_relationship_extraction( # Validate entity names after all cleaning steps if not source: - logger.info( - f"Empty source entity found after sanitization. Original: '{record_attributes[1]}'" + logger.warning( + f"Relationship extraction error: source entity became empty after cleaning. Original: '{record_attributes[1]}'" ) return None if not target: - logger.info( - f"Empty target entity found after sanitization. Original: '{record_attributes[2]}'" + logger.warning( + f"Relationship extraction error: target entity became empty after cleaning. 
Original: '{record_attributes[2]}'" ) return None @@ -530,9 +472,9 @@ async def _handle_single_relationship_extraction( return None -async def rebuild_knowledge_from_chunks( - entities_to_rebuild: dict[str, list[str]], - relationships_to_rebuild: dict[tuple[str, str], list[str]], +async def _rebuild_knowledge_from_chunks( + entities_to_rebuild: dict[str, set[str]], + relationships_to_rebuild: dict[tuple[str, str], set[str]], knowledge_graph_inst: BaseGraphStorage, entities_vdb: BaseVectorStorage, relationships_vdb: BaseVectorStorage, @@ -541,8 +483,6 @@ async def rebuild_knowledge_from_chunks( global_config: dict[str, str], pipeline_status: dict | None = None, pipeline_status_lock=None, - entity_chunks_storage: BaseKVStorage | None = None, - relation_chunks_storage: BaseKVStorage | None = None, ) -> None: """Rebuild entity and relationship descriptions from cached extraction results with parallel processing @@ -551,8 +491,8 @@ async def rebuild_knowledge_from_chunks( controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency. Args: - entities_to_rebuild: Dict mapping entity_name -> list of remaining chunk_ids - relationships_to_rebuild: Dict mapping (src, tgt) -> list of remaining chunk_ids + entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids + relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids knowledge_graph_inst: Knowledge graph storage entities_vdb: Entity vector database relationships_vdb: Relationship vector database @@ -561,8 +501,6 @@ async def rebuild_knowledge_from_chunks( global_config: Global configuration containing llm_model_max_async pipeline_status: Pipeline status dictionary pipeline_status_lock: Lock for pipeline status - entity_chunks_storage: KV storage maintaining full chunk IDs per entity - relation_chunks_storage: KV storage maintaining full chunk IDs per relation """ if not entities_to_rebuild and not relationships_to_rebuild: return @@ -702,9 +640,16 @@ async def rebuild_knowledge_from_chunks( chunk_entities=chunk_entities, llm_response_cache=llm_response_cache, global_config=global_config, - entity_chunks_storage=entity_chunks_storage, ) rebuilt_entities_count += 1 + status_message = ( + f"Rebuilt `{entity_name}` from {len(chunk_ids)} chunks" + ) + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) except Exception as e: failed_entities_count += 1 status_message = f"Failed to rebuild `{entity_name}`: {e}" @@ -730,22 +675,25 @@ async def rebuild_knowledge_from_chunks( await _rebuild_single_relationship( knowledge_graph_inst=knowledge_graph_inst, relationships_vdb=relationships_vdb, - entities_vdb=entities_vdb, src=src, tgt=tgt, chunk_ids=chunk_ids, chunk_relationships=chunk_relationships, llm_response_cache=llm_response_cache, global_config=global_config, - relation_chunks_storage=relation_chunks_storage, - entity_chunks_storage=entity_chunks_storage, - pipeline_status=pipeline_status, - pipeline_status_lock=pipeline_status_lock, ) rebuilt_relationships_count += 1 + status_message = ( + f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks" + ) + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) 
except Exception as e: failed_relationships_count += 1 - status_message = f"Failed to rebuild `{src}`~`{tgt}`: {e}" + status_message = f"Failed to rebuild `{src} - {tgt}`: {e}" logger.info(status_message) # Per requirement, change to info if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -998,14 +946,7 @@ async def _process_extraction_result( record_attributes, chunk_key, timestamp, file_path ) if entity_data is not None: - truncated_name = _truncate_entity_identifier( - entity_data["entity_name"], - DEFAULT_ENTITY_NAME_MAX_LENGTH, - chunk_key, - "Entity name", - ) - entity_data["entity_name"] = truncated_name - maybe_nodes[truncated_name].append(entity_data) + maybe_nodes[entity_data["entity_name"]].append(entity_data) continue # Try to parse as relationship @@ -1013,21 +954,9 @@ async def _process_extraction_result( record_attributes, chunk_key, timestamp, file_path ) if relationship_data is not None: - truncated_source = _truncate_entity_identifier( - relationship_data["src_id"], - DEFAULT_ENTITY_NAME_MAX_LENGTH, - chunk_key, - "Relation entity", - ) - truncated_target = _truncate_entity_identifier( - relationship_data["tgt_id"], - DEFAULT_ENTITY_NAME_MAX_LENGTH, - chunk_key, - "Relation entity", - ) - relationship_data["src_id"] = truncated_source - relationship_data["tgt_id"] = truncated_target - maybe_edges[(truncated_source, truncated_target)].append(relationship_data) + maybe_edges[ + (relationship_data["src_id"], relationship_data["tgt_id"]) + ].append(relationship_data) return dict(maybe_nodes), dict(maybe_edges) @@ -1072,13 +1001,10 @@ async def _rebuild_single_entity( knowledge_graph_inst: BaseGraphStorage, entities_vdb: BaseVectorStorage, entity_name: str, - chunk_ids: list[str], + chunk_ids: set[str], chunk_entities: dict, llm_response_cache: BaseKVStorage, global_config: dict[str, str], - entity_chunks_storage: BaseKVStorage | None = None, - pipeline_status: dict | None = None, - pipeline_status_lock=None, ) -> None: """Rebuild a single entity from cached extraction results""" @@ -1089,11 +1015,7 @@ async def _rebuild_single_entity( # Helper function to update entity in both graph and vector storage async def _update_entity_storage( - final_description: str, - entity_type: str, - file_paths: list[str], - source_chunk_ids: list[str], - truncation_info: str = "", + final_description: str, entity_type: str, file_paths: set[str] ): try: # Update entity in graph storage (critical path) @@ -1101,12 +1023,10 @@ async def _rebuild_single_entity( **current_entity, "description": final_description, "entity_type": entity_type, - "source_id": GRAPH_FIELD_SEP.join(source_chunk_ids), + "source_id": GRAPH_FIELD_SEP.join(chunk_ids), "file_path": GRAPH_FIELD_SEP.join(file_paths) if file_paths else current_entity.get("file_path", "unknown_source"), - "created_at": int(time.time()), - "truncate": truncation_info, } await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) @@ -1139,33 +1059,9 @@ async def _rebuild_single_entity( logger.error(error_msg) raise # Re-raise exception - # normalized_chunk_ids = merge_source_ids([], chunk_ids) - normalized_chunk_ids = chunk_ids - - if entity_chunks_storage is not None and normalized_chunk_ids: - await entity_chunks_storage.upsert( - { - entity_name: { - "chunk_ids": normalized_chunk_ids, - "count": len(normalized_chunk_ids), - } - } - ) - - limit_method = ( - global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP - ) - - limited_chunk_ids = 
apply_source_ids_limit( - normalized_chunk_ids, - global_config["max_source_ids_per_entity"], - limit_method, - identifier=f"`{entity_name}`", - ) - - # Collect all entity data from relevant (limited) chunks + # Collect all entity data from relevant chunks all_entity_data = [] - for chunk_id in limited_chunk_ids: + for chunk_id in chunk_ids: if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]: all_entity_data.extend(chunk_entities[chunk_id][entity_name]) @@ -1212,19 +1108,13 @@ async def _rebuild_single_entity( final_description = current_entity.get("description", "") entity_type = current_entity.get("entity_type", "UNKNOWN") - await _update_entity_storage( - final_description, - entity_type, - file_paths, - limited_chunk_ids, - ) + await _update_entity_storage(final_description, entity_type, file_paths) return # Process cached entity data descriptions = [] entity_types = [] - file_paths_list = [] - seen_paths = set() + file_paths = set() for entity_data in all_entity_data: if entity_data.get("description"): @@ -1232,33 +1122,7 @@ async def _rebuild_single_entity( if entity_data.get("entity_type"): entity_types.append(entity_data["entity_type"]) if entity_data.get("file_path"): - file_path = entity_data["file_path"] - if file_path and file_path not in seen_paths: - file_paths_list.append(file_path) - seen_paths.add(file_path) - - # Apply MAX_FILE_PATHS limit - max_file_paths = global_config.get("max_file_paths") - file_path_placeholder = global_config.get( - "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER - ) - limit_method = global_config.get("source_ids_limit_method") - - original_count = len(file_paths_list) - if original_count > max_file_paths: - if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: - # FIFO: keep tail (newest), discard head - file_paths_list = file_paths_list[-max_file_paths:] - else: - # KEEP: keep head (earliest), discard tail - file_paths_list = file_paths_list[:max_file_paths] - - file_paths_list.append( - f"...{file_path_placeholder}...({limit_method} {max_file_paths}/{original_count})" - ) - logger.info( - f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})" - ) + file_paths.add(entity_data["file_path"]) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1284,47 +1148,18 @@ async def _rebuild_single_entity( else: final_description = current_entity.get("description", "") - if len(limited_chunk_ids) < len(normalized_chunk_ids): - truncation_info = ( - f"{limit_method} {len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" - ) - else: - truncation_info = "" - - await _update_entity_storage( - final_description, - entity_type, - file_paths_list, - limited_chunk_ids, - truncation_info, - ) - - # Log rebuild completion with truncation info - status_message = f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks" - if truncation_info: - status_message += f" ({truncation_info})" - logger.info(status_message) - # Update pipeline status - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) + await _update_entity_storage(final_description, entity_type, file_paths) async def _rebuild_single_relationship( knowledge_graph_inst: BaseGraphStorage, relationships_vdb: BaseVectorStorage, - entities_vdb: BaseVectorStorage, src: str, tgt: str, - chunk_ids: list[str], + chunk_ids: set[str], 
chunk_relationships: dict, llm_response_cache: BaseKVStorage, global_config: dict[str, str], - relation_chunks_storage: BaseKVStorage | None = None, - entity_chunks_storage: BaseKVStorage | None = None, - pipeline_status: dict | None = None, - pipeline_status_lock=None, ) -> None: """Rebuild a single relationship from cached extraction results @@ -1337,33 +1172,9 @@ async def _rebuild_single_relationship( if not current_relationship: return - # normalized_chunk_ids = merge_source_ids([], chunk_ids) - normalized_chunk_ids = chunk_ids - - if relation_chunks_storage is not None and normalized_chunk_ids: - storage_key = make_relation_chunk_key(src, tgt) - await relation_chunks_storage.upsert( - { - storage_key: { - "chunk_ids": normalized_chunk_ids, - "count": len(normalized_chunk_ids), - } - } - ) - - limit_method = ( - global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP - ) - limited_chunk_ids = apply_source_ids_limit( - normalized_chunk_ids, - global_config["max_source_ids_per_relation"], - limit_method, - identifier=f"`{src}`~`{tgt}`", - ) - # Collect all relationship data from relevant chunks all_relationship_data = [] - for chunk_id in limited_chunk_ids: + for chunk_id in chunk_ids: if chunk_id in chunk_relationships: # Check both (src, tgt) and (tgt, src) since relationships can be bidirectional for edge_key in [(src, tgt), (tgt, src)]: @@ -1380,8 +1191,7 @@ async def _rebuild_single_relationship( descriptions = [] keywords = [] weights = [] - file_paths_list = [] - seen_paths = set() + file_paths = set() for rel_data in all_relationship_data: if rel_data.get("description"): @@ -1391,33 +1201,7 @@ async def _rebuild_single_relationship( if rel_data.get("weight"): weights.append(rel_data["weight"]) if rel_data.get("file_path"): - file_path = rel_data["file_path"] - if file_path and file_path not in seen_paths: - file_paths_list.append(file_path) - seen_paths.add(file_path) - - # Apply count limit - max_file_paths = global_config.get("max_file_paths") - file_path_placeholder = global_config.get( - "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER - ) - limit_method = global_config.get("source_ids_limit_method") - - original_count = len(file_paths_list) - if original_count > max_file_paths: - if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: - # FIFO: keep tail (newest), discard head - file_paths_list = file_paths_list[-max_file_paths:] - else: - # KEEP: keep head (earliest), discard tail - file_paths_list = file_paths_list[:max_file_paths] - - file_paths_list.append( - f"...{file_path_placeholder}...({limit_method} {max_file_paths}/{original_count})" - ) - logger.info( - f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})" - ) + file_paths.add(rel_data["file_path"]) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1445,13 +1229,6 @@ async def _rebuild_single_relationship( # fallback to keep current(unchanged) final_description = current_relationship.get("description", "") - if len(limited_chunk_ids) < len(normalized_chunk_ids): - truncation_info = ( - f"{limit_method} {len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" - ) - else: - truncation_info = "" - # Update relationship in graph storage updated_relationship_data = { **current_relationship, @@ -1460,75 +1237,14 @@ async def _rebuild_single_relationship( else current_relationship.get("description", ""), "keywords": combined_keywords, "weight": weight, - "source_id": 
GRAPH_FIELD_SEP.join(limited_chunk_ids), - "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths_list if fp]) - if file_paths_list + "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp]) + if file_paths else current_relationship.get("file_path", "unknown_source"), - "truncate": truncation_info, } - - # Ensure both endpoint nodes exist before writing the edge back - # (certain storage backends require pre-existing nodes). - node_description = ( - updated_relationship_data["description"] - if updated_relationship_data.get("description") - else current_relationship.get("description", "") - ) - node_source_id = updated_relationship_data.get("source_id", "") - node_file_path = updated_relationship_data.get("file_path", "unknown_source") - - for node_id in {src, tgt}: - if not (await knowledge_graph_inst.has_node(node_id)): - node_created_at = int(time.time()) - node_data = { - "entity_id": node_id, - "source_id": node_source_id, - "description": node_description, - "entity_type": "UNKNOWN", - "file_path": node_file_path, - "created_at": node_created_at, - "truncate": "", - } - await knowledge_graph_inst.upsert_node(node_id, node_data=node_data) - - # Update entity_chunks_storage for the newly created entity - if entity_chunks_storage is not None and limited_chunk_ids: - await entity_chunks_storage.upsert( - { - node_id: { - "chunk_ids": limited_chunk_ids, - "count": len(limited_chunk_ids), - } - } - ) - - # Update entity_vdb for the newly created entity - if entities_vdb is not None: - entity_vdb_id = compute_mdhash_id(node_id, prefix="ent-") - entity_content = f"{node_id}\n{node_description}" - vdb_data = { - entity_vdb_id: { - "content": entity_content, - "entity_name": node_id, - "source_id": node_source_id, - "entity_type": "UNKNOWN", - "file_path": node_file_path, - } - } - await safe_vdb_operation_with_exception( - operation=lambda payload=vdb_data: entities_vdb.upsert(payload), - operation_name="rebuild_added_entity_upsert", - entity_name=node_id, - max_retries=3, - retry_delay=0.1, - ) - await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data) # Update relationship in vector database - # Sort src and tgt to ensure consistent ordering (smaller string first) - if src > tgt: - src, tgt = tgt, src try: rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-") rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-") @@ -1570,36 +1286,15 @@ async def _rebuild_single_relationship( logger.error(error_msg) raise # Re-raise exception - # Log rebuild completion with truncation info - status_message = f"Rebuild `{src}`~`{tgt}` from {len(chunk_ids)} chunks" - if truncation_info: - status_message += f" ({truncation_info})" - # Add truncation info from apply_source_ids_limit if truncation occurred - if len(limited_chunk_ids) < len(normalized_chunk_ids): - truncation_info = ( - f" ({limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)})" - ) - status_message += truncation_info - - logger.info(status_message) - - # Update pipeline status - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - async def _merge_nodes_then_upsert( entity_name: str, nodes_data: list[dict], knowledge_graph_inst: BaseGraphStorage, - entity_vdb: BaseVectorStorage | None, global_config: dict, pipeline_status: dict = None, pipeline_status_lock=None, 
llm_response_cache: BaseKVStorage | None = None, - entity_chunks_storage: BaseKVStorage | None = None, ): """Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert.""" already_entity_types = [] @@ -1607,7 +1302,6 @@ async def _merge_nodes_then_upsert( already_description = [] already_file_paths = [] - # 1. Get existing node data from knowledge graph already_node = await knowledge_graph_inst.get_node(entity_name) if already_node: already_entity_types.append(already_node["entity_type"]) @@ -1615,102 +1309,22 @@ async def _merge_nodes_then_upsert( already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP)) already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP)) - new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")] - - existing_full_source_ids = [] - if entity_chunks_storage is not None: - stored_chunks = await entity_chunks_storage.get_by_id(entity_name) - if stored_chunks and isinstance(stored_chunks, dict): - existing_full_source_ids = [ - chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id - ] - - if not existing_full_source_ids: - existing_full_source_ids = [ - chunk_id for chunk_id in already_source_ids if chunk_id - ] - - # 2. Merging new source ids with existing ones - full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) - - if entity_chunks_storage is not None and full_source_ids: - await entity_chunks_storage.upsert( - { - entity_name: { - "chunk_ids": full_source_ids, - "count": len(full_source_ids), - } - } - ) - - # 3. Finalize source_id by applying source ids limit - limit_method = global_config.get("source_ids_limit_method") - max_source_limit = global_config.get("max_source_ids_per_entity") - source_ids = apply_source_ids_limit( - full_source_ids, - max_source_limit, - limit_method, - identifier=f"`{entity_name}`", - ) - - # 4. Only keep nodes not filter by apply_source_ids_limit if limit_method is KEEP - if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP: - allowed_source_ids = set(source_ids) - filtered_nodes = [] - for dp in nodes_data: - source_id = dp.get("source_id") - # Skip descriptions sourced from chunks dropped by the limitation cap - if ( - source_id - and source_id not in allowed_source_ids - and source_id not in existing_full_source_ids - ): - continue - filtered_nodes.append(dp) - nodes_data = filtered_nodes - else: # In FIFO mode, keep all nodes - truncation happens at source_ids level only - nodes_data = list(nodes_data) - - # 5. Check if we need to skip summary due to source_ids limit - if ( - limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP - and len(existing_full_source_ids) >= max_source_limit - and not nodes_data - ): - if already_node: - logger.info( - f"Skipped `{entity_name}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}" - ) - existing_node_data = dict(already_node) - return existing_node_data - else: - logger.error(f"Internal Error: already_node missing for `{entity_name}`") - raise ValueError( - f"Internal Error: already_node missing for `{entity_name}`" - ) - - # 6.1 Finalize source_id - source_id = GRAPH_FIELD_SEP.join(source_ids) - - # 6.2 Finalize entity type by highest count entity_type = sorted( Counter( [dp["entity_type"] for dp in nodes_data] + already_entity_types ).items(), key=lambda x: x[1], reverse=True, - )[0][0] + )[0][0] # Get the entity type with the highest count - # 7. 
Deduplicate nodes by description, keeping first occurrence in the same document + # Deduplicate by description, keeping first occurrence unique_nodes = {} for dp in nodes_data: - desc = dp.get("description") - if not desc: - continue + desc = dp["description"] if desc not in unique_nodes: unique_nodes[desc] = dp - # Sort description by timestamp, then by description length when timestamps are the same + # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same sorted_nodes = sorted( unique_nodes.values(), key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))), @@ -1719,119 +1333,49 @@ async def _merge_nodes_then_upsert( # Combine already_description with sorted new sorted descriptions description_list = already_description + sorted_descriptions - if not description_list: - logger.error(f"Entity {entity_name} has no description") - raise ValueError(f"Entity {entity_name} has no description") - # Check for cancellation before LLM summary - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - raise PipelineCancelledException("User cancelled during entity summary") - - # 8. Get summary description an LLM usage status - description, llm_was_used = await _handle_entity_relation_summary( - "Entity", - entity_name, - description_list, - GRAPH_FIELD_SEP, - global_config, - llm_response_cache, - ) - - # 9. Build file_path within MAX_FILE_PATHS - file_paths_list = [] - seen_paths = set() - has_placeholder = False # Indicating file_path has been truncated before - - max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS) - file_path_placeholder = global_config.get( - "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER - ) - - # Collect from already_file_paths, excluding placeholder - for fp in already_file_paths: - if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders - has_placeholder = True - continue - if fp and fp not in seen_paths: - file_paths_list.append(fp) - seen_paths.add(fp) - - # Collect from new data - for dp in nodes_data: - file_path_item = dp.get("file_path") - if file_path_item and file_path_item not in seen_paths: - file_paths_list.append(file_path_item) - seen_paths.add(file_path_item) - - # Apply count limit - if len(file_paths_list) > max_file_paths: - limit_method = global_config.get( - "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP - ) - file_path_placeholder = global_config.get( - "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER - ) - # Add + sign to indicate actual file count is higher - original_count_str = ( - f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list)) - ) - - if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: - # FIFO: keep tail (newest), discard head - file_paths_list = file_paths_list[-max_file_paths:] - file_paths_list.append(f"...{file_path_placeholder}...(FIFO)") - else: - # KEEP: keep head (earliest), discard tail - file_paths_list = file_paths_list[:max_file_paths] - file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)") - - logger.info( - f"Limited `{entity_name}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})" - ) - # Finalize file_path - file_path = GRAPH_FIELD_SEP.join(file_paths_list) - - # 10.Log based on actual LLM usage num_fragment = len(description_list) already_fragment = len(already_description) - if llm_was_used: - status_message = 
f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" - else: - status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}" - - truncation_info = truncation_info_log = "" - if len(source_ids) < len(full_source_ids): - # Add truncation info from apply_source_ids_limit if truncation occurred - truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}" - if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: - truncation_info = truncation_info_log - else: - truncation_info = "KEEP Old" - deduplicated_num = already_fragment + len(nodes_data) - num_fragment - dd_message = "" if deduplicated_num > 0: - # Duplicated description detected across multiple trucks for the same entity - dd_message = f"dd {deduplicated_num}" - - if dd_message or truncation_info_log: - status_message += ( - f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})" + dd_message = f"(dd:{deduplicated_num})" + else: + dd_message = "" + if num_fragment > 0: + # Get summary and LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + "Entity", + entity_name, + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, ) - # Add message to pipeline satus when merge happens - if already_fragment > 0 or llm_was_used: - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - else: - logger.debug(status_message) + # Log based on actual LLM usage + if llm_was_used: + status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + else: + status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + + if already_fragment > 0 or llm_was_used: + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + else: + logger.debug(status_message) + + else: + logger.error(f"Entity {entity_name} has no description") + description = "(no description)" + + source_id = GRAPH_FIELD_SEP.join( + set([dp["source_id"] for dp in nodes_data] + already_source_ids) + ) + file_path = build_file_path(already_file_paths, nodes_data, entity_name) - # 11. 
Update both graph and vector db node_data = dict( entity_id=entity_name, entity_type=entity_type, @@ -1839,32 +1383,12 @@ async def _merge_nodes_then_upsert( source_id=source_id, file_path=file_path, created_at=int(time.time()), - truncate=truncation_info, ) await knowledge_graph_inst.upsert_node( entity_name, node_data=node_data, ) node_data["entity_name"] = entity_name - if entity_vdb is not None: - entity_vdb_id = compute_mdhash_id(str(entity_name), prefix="ent-") - entity_content = f"{entity_name}\n{description}" - data_for_vdb = { - entity_vdb_id: { - "entity_name": entity_name, - "entity_type": entity_type, - "content": entity_content, - "source_id": source_id, - "file_path": file_path, - } - } - await safe_vdb_operation_with_exception( - operation=lambda payload=data_for_vdb: entity_vdb.upsert(payload), - operation_name="entity_upsert", - entity_name=entity_name, - max_retries=3, - retry_delay=0.1, - ) return node_data @@ -1873,27 +1397,21 @@ async def _merge_edges_then_upsert( tgt_id: str, edges_data: list[dict], knowledge_graph_inst: BaseGraphStorage, - relationships_vdb: BaseVectorStorage | None, - entity_vdb: BaseVectorStorage | None, global_config: dict, pipeline_status: dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, added_entities: list = None, # New parameter to track entities added during edge processing - relation_chunks_storage: BaseKVStorage | None = None, - entity_chunks_storage: BaseKVStorage | None = None, ): if src_id == tgt_id: return None - already_edge = None already_weights = [] already_source_ids = [] already_description = [] already_keywords = [] already_file_paths = [] - # 1. Get existing edge data from graph storage if await knowledge_graph_inst.has_edge(src_id, tgt_id): already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id) # Handle the case where get_edge returns None or missing fields @@ -1927,93 +1445,65 @@ async def _merge_edges_then_upsert( ) ) - new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")] - - storage_key = make_relation_chunk_key(src_id, tgt_id) - existing_full_source_ids = [] - if relation_chunks_storage is not None: - stored_chunks = await relation_chunks_storage.get_by_id(storage_key) - if stored_chunks and isinstance(stored_chunks, dict): - existing_full_source_ids = [ - chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id - ] - - if not existing_full_source_ids: - existing_full_source_ids = [ - chunk_id for chunk_id in already_source_ids if chunk_id - ] - - # 2. Merge new source ids with existing ones - full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids) - - if relation_chunks_storage is not None and full_source_ids: - await relation_chunks_storage.upsert( - { - storage_key: { - "chunk_ids": full_source_ids, - "count": len(full_source_ids), - } - } - ) - - # 3. Finalize source_id by applying source ids limit - limit_method = global_config.get("source_ids_limit_method") - max_source_limit = global_config.get("max_source_ids_per_relation") - source_ids = apply_source_ids_limit( - full_source_ids, - max_source_limit, - limit_method, - identifier=f"`{src_id}`~`{tgt_id}`", - ) - limit_method = ( - global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP - ) - - # 4. 
Only keep edges with source_id in the final source_ids list if in KEEP mode - if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP: - allowed_source_ids = set(source_ids) - filtered_edges = [] - for dp in edges_data: - source_id = dp.get("source_id") - # Skip relationship fragments sourced from chunks dropped by keep oldest cap - if ( - source_id - and source_id not in allowed_source_ids - and source_id not in existing_full_source_ids - ): - continue - filtered_edges.append(dp) - edges_data = filtered_edges - else: # In FIFO mode, keep all edges - truncation happens at source_ids level only - edges_data = list(edges_data) - - # 5. Check if we need to skip summary due to source_ids limit - if ( - limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP - and len(existing_full_source_ids) >= max_source_limit - and not edges_data - ): - if already_edge: - logger.info( - f"Skipped `{src_id}`~`{tgt_id}`: KEEP old chunks {already_source_ids}/{len(full_source_ids)}" - ) - existing_edge_data = dict(already_edge) - return existing_edge_data - else: - logger.error( - f"Internal Error: already_node missing for `{src_id}`~`{tgt_id}`" - ) - raise ValueError( - f"Internal Error: already_node missing for `{src_id}`~`{tgt_id}`" - ) - - # 6.1 Finalize source_id - source_id = GRAPH_FIELD_SEP.join(source_ids) - - # 6.2 Finalize weight by summing new edges and existing weights + # Process edges_data with None checks weight = sum([dp["weight"] for dp in edges_data] + already_weights) - # 6.2 Finalize keywords by merging existing and new keywords + # Deduplicate by description, keeping first occurrence + unique_edges = {} + for dp in edges_data: + if dp.get("description"): + desc = dp["description"] + if desc not in unique_edges: + unique_edges[desc] = dp + + # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same + sorted_edges = sorted( + unique_edges.values(), + key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))), + ) + sorted_descriptions = [dp["description"] for dp in sorted_edges] + + # Combine already_description with sorted new descriptions + description_list = already_description + sorted_descriptions + + num_fragment = len(description_list) + already_fragment = len(already_description) + deduplicated_num = already_fragment + len(edges_data) - num_fragment + if deduplicated_num > 0: + dd_message = f"(dd:{deduplicated_num})" + else: + dd_message = "" + if num_fragment > 0: + # Get summary and LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + "Relation", + f"({src_id}, {tgt_id})", + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, + ) + + # Log based on actual LLM usage + if llm_was_used: + status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + else: + status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" + + if already_fragment > 0 or llm_was_used: + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + else: + logger.debug(status_message) + + else: + logger.error(f"Edge {src_id} - {tgt_id} has no description") + description = "(no description)" + + # Split all existing and new keywords into individual terms, then combine and deduplicate all_keywords = set() # Process 
already_keywords (which are comma-separated) for keyword_str in already_keywords: @@ -2028,194 +1518,26 @@ async def _merge_edges_then_upsert( # Join all unique keywords with commas keywords = ",".join(sorted(all_keywords)) - # 7. Deduplicate by description, keeping first occurrence in the same document - unique_edges = {} - for dp in edges_data: - description_value = dp.get("description") - if not description_value: - continue - if description_value not in unique_edges: - unique_edges[description_value] = dp - - # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same - sorted_edges = sorted( - unique_edges.values(), - key=lambda x: (x.get("timestamp", 0), -len(x.get("description", ""))), + source_id = GRAPH_FIELD_SEP.join( + set( + [dp["source_id"] for dp in edges_data if dp.get("source_id")] + + already_source_ids + ) ) - sorted_descriptions = [dp["description"] for dp in sorted_edges] + file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}") - # Combine already_description with sorted new descriptions - description_list = already_description + sorted_descriptions - if not description_list: - logger.error(f"Relation {src_id}~{tgt_id} has no description") - raise ValueError(f"Relation {src_id}~{tgt_id} has no description") - - # Check for cancellation before LLM summary - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - raise PipelineCancelledException( - "User cancelled during relation summary" - ) - - # 8. Get summary description an LLM usage status - description, llm_was_used = await _handle_entity_relation_summary( - "Relation", - f"({src_id}, {tgt_id})", - description_list, - GRAPH_FIELD_SEP, - global_config, - llm_response_cache, - ) - - # 9. 
Build file_path within MAX_FILE_PATHS limit - file_paths_list = [] - seen_paths = set() - has_placeholder = False # Track if already_file_paths contains placeholder - - max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS) - file_path_placeholder = global_config.get( - "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER - ) - - # Collect from already_file_paths, excluding placeholder - for fp in already_file_paths: - # Check if this is a placeholder record - if fp and fp.startswith(f"...{file_path_placeholder}"): # Skip placeholders - has_placeholder = True - continue - if fp and fp not in seen_paths: - file_paths_list.append(fp) - seen_paths.add(fp) - - # Collect from new data - for dp in edges_data: - file_path_item = dp.get("file_path") - if file_path_item and file_path_item not in seen_paths: - file_paths_list.append(file_path_item) - seen_paths.add(file_path_item) - - # Apply count limit - max_file_paths = global_config.get("max_file_paths") - - if len(file_paths_list) > max_file_paths: - limit_method = global_config.get( - "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP - ) - file_path_placeholder = global_config.get( - "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER - ) - - # Add + sign to indicate actual file count is higher - original_count_str = ( - f"{len(file_paths_list)}+" if has_placeholder else str(len(file_paths_list)) - ) - - if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: - # FIFO: keep tail (newest), discard head - file_paths_list = file_paths_list[-max_file_paths:] - file_paths_list.append(f"...{file_path_placeholder}...(FIFO)") - else: - # KEEP: keep head (earliest), discard tail - file_paths_list = file_paths_list[:max_file_paths] - file_paths_list.append(f"...{file_path_placeholder}...(KEEP Old)") - - logger.info( - f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count_str} -> {max_file_paths} ({limit_method})" - ) - # Finalize file_path - file_path = GRAPH_FIELD_SEP.join(file_paths_list) - - # 10. Log based on actual LLM usage - num_fragment = len(description_list) - already_fragment = len(already_description) - if llm_was_used: - status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" - else: - status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}" - - truncation_info = truncation_info_log = "" - if len(source_ids) < len(full_source_ids): - # Add truncation info from apply_source_ids_limit if truncation occurred - truncation_info_log = f"{limit_method} {len(source_ids)}/{len(full_source_ids)}" - if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: - truncation_info = truncation_info_log - else: - truncation_info = "KEEP Old" - - deduplicated_num = already_fragment + len(edges_data) - num_fragment - dd_message = "" - if deduplicated_num > 0: - # Duplicated description detected across multiple trucks for the same entity - dd_message = f"dd {deduplicated_num}" - - if dd_message or truncation_info_log: - status_message += ( - f" ({', '.join(filter(None, [truncation_info_log, dd_message]))})" - ) - - # Add message to pipeline satus when merge happens - if already_fragment > 0 or llm_was_used: - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - else: - logger.debug(status_message) - - # 11. 
Update both graph and vector db for need_insert_id in [src_id, tgt_id]: - # Optimization: Use get_node instead of has_node + get_node - existing_node = await knowledge_graph_inst.get_node(need_insert_id) - - if existing_node is None: - # Node doesn't exist - create new node - node_created_at = int(time.time()) + if not (await knowledge_graph_inst.has_node(need_insert_id)): node_data = { "entity_id": need_insert_id, "source_id": source_id, "description": description, "entity_type": "UNKNOWN", "file_path": file_path, - "created_at": node_created_at, - "truncate": "", + "created_at": int(time.time()), } await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data) - # Update entity_chunks_storage for the newly created entity - if entity_chunks_storage is not None: - chunk_ids = [chunk_id for chunk_id in full_source_ids if chunk_id] - if chunk_ids: - await entity_chunks_storage.upsert( - { - need_insert_id: { - "chunk_ids": chunk_ids, - "count": len(chunk_ids), - } - } - ) - - if entity_vdb is not None: - entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-") - entity_content = f"{need_insert_id}\n{description}" - vdb_data = { - entity_vdb_id: { - "content": entity_content, - "entity_name": need_insert_id, - "source_id": source_id, - "entity_type": "UNKNOWN", - "file_path": file_path, - } - } - await safe_vdb_operation_with_exception( - operation=lambda payload=vdb_data: entity_vdb.upsert(payload), - operation_name="added_entity_upsert", - entity_name=need_insert_id, - max_retries=3, - retry_delay=0.1, - ) - # Track entities added during edge processing if added_entities is not None: entity_data = { @@ -2224,114 +1546,10 @@ async def _merge_edges_then_upsert( "description": description, "source_id": source_id, "file_path": file_path, - "created_at": node_created_at, + "created_at": int(time.time()), } added_entities.append(entity_data) - else: - # Node exists - update its source_ids by merging with new source_ids - updated = False # Track if any update occurred - # 1. Get existing full source_ids from entity_chunks_storage - existing_full_source_ids = [] - if entity_chunks_storage is not None: - stored_chunks = await entity_chunks_storage.get_by_id(need_insert_id) - if stored_chunks and isinstance(stored_chunks, dict): - existing_full_source_ids = [ - chunk_id - for chunk_id in stored_chunks.get("chunk_ids", []) - if chunk_id - ] - - # If not in entity_chunks_storage, get from graph database - if not existing_full_source_ids: - if existing_node.get("source_id"): - existing_full_source_ids = existing_node["source_id"].split( - GRAPH_FIELD_SEP - ) - - # 2. Merge with new source_ids from this relationship - new_source_ids_from_relation = [ - chunk_id for chunk_id in source_ids if chunk_id - ] - merged_full_source_ids = merge_source_ids( - existing_full_source_ids, new_source_ids_from_relation - ) - - # 3. Save merged full list to entity_chunks_storage (conditional) - if ( - entity_chunks_storage is not None - and merged_full_source_ids != existing_full_source_ids - ): - updated = True - await entity_chunks_storage.upsert( - { - need_insert_id: { - "chunk_ids": merged_full_source_ids, - "count": len(merged_full_source_ids), - } - } - ) - - # 4. 
Apply source_ids limit for graph and vector db - limit_method = global_config.get( - "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP - ) - max_source_limit = global_config.get("max_source_ids_per_entity") - limited_source_ids = apply_source_ids_limit( - merged_full_source_ids, - max_source_limit, - limit_method, - identifier=f"`{need_insert_id}`", - ) - - # 5. Update graph database and vector database with limited source_ids (conditional) - limited_source_id_str = GRAPH_FIELD_SEP.join(limited_source_ids) - - if limited_source_id_str != existing_node.get("source_id", ""): - updated = True - updated_node_data = { - **existing_node, - "source_id": limited_source_id_str, - } - await knowledge_graph_inst.upsert_node( - need_insert_id, node_data=updated_node_data - ) - - # Update vector database - if entity_vdb is not None: - entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-") - entity_content = ( - f"{need_insert_id}\n{existing_node.get('description', '')}" - ) - vdb_data = { - entity_vdb_id: { - "content": entity_content, - "entity_name": need_insert_id, - "source_id": limited_source_id_str, - "entity_type": existing_node.get("entity_type", "UNKNOWN"), - "file_path": existing_node.get( - "file_path", "unknown_source" - ), - } - } - await safe_vdb_operation_with_exception( - operation=lambda payload=vdb_data: entity_vdb.upsert(payload), - operation_name="existing_entity_update", - entity_name=need_insert_id, - max_retries=3, - retry_delay=0.1, - ) - - # 6. Log once at the end if any update occurred - if updated: - status_message = f"Chunks appended from relation: `{need_insert_id}`" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - - edge_created_at = int(time.time()) await knowledge_graph_inst.upsert_edge( src_id, tgt_id, @@ -2341,8 +1559,7 @@ async def _merge_edges_then_upsert( keywords=keywords, source_id=source_id, file_path=file_path, - created_at=edge_created_at, - truncate=truncation_info, + created_at=int(time.time()), ), ) @@ -2353,45 +1570,9 @@ async def _merge_edges_then_upsert( keywords=keywords, source_id=source_id, file_path=file_path, - created_at=edge_created_at, - truncate=truncation_info, - weight=weight, + created_at=int(time.time()), ) - # Sort src_id and tgt_id to ensure consistent ordering (smaller string first) - if src_id > tgt_id: - src_id, tgt_id = tgt_id, src_id - - if relationships_vdb is not None: - rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-") - rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-") - try: - await relationships_vdb.delete([rel_vdb_id, rel_vdb_id_reverse]) - except Exception as e: - logger.debug( - f"Could not delete old relationship vector records {rel_vdb_id}, {rel_vdb_id_reverse}: {e}" - ) - rel_content = f"{keywords}\t{src_id}\n{tgt_id}\n{description}" - vdb_data = { - rel_vdb_id: { - "src_id": src_id, - "tgt_id": tgt_id, - "source_id": source_id, - "content": rel_content, - "keywords": keywords, - "description": description, - "weight": weight, - "file_path": file_path, - } - } - await safe_vdb_operation_with_exception( - operation=lambda payload=vdb_data: relationships_vdb.upsert(payload), - operation_name="relationship_upsert", - entity_name=f"{src_id}-{tgt_id}", - max_retries=3, - retry_delay=0.2, - ) - return edge_data @@ -2407,8 +1588,6 @@ async def merge_nodes_and_edges( pipeline_status: 
dict = None, pipeline_status_lock=None, llm_response_cache: BaseKVStorage | None = None, - entity_chunks_storage: BaseKVStorage | None = None, - relation_chunks_storage: BaseKVStorage | None = None, current_file_number: int = 0, total_files: int = 0, file_path: str = "unknown_source", @@ -2432,19 +1611,11 @@ async def merge_nodes_and_edges( pipeline_status: Pipeline status dictionary pipeline_status_lock: Lock for pipeline status llm_response_cache: LLM response cache - entity_chunks_storage: Storage tracking full chunk lists per entity - relation_chunks_storage: Storage tracking full chunk lists per relation current_file_number: Current file number for logging total_files: Total files for logging file_path: File path for logging """ - # Check for cancellation at the start of merge - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - raise PipelineCancelledException("User cancelled during merge phase") - # Collect all nodes and edges from all chunks all_nodes = defaultdict(list) all_edges = defaultdict(list) @@ -2481,37 +1652,55 @@ async def merge_nodes_and_edges( async def _locked_process_entity_name(entity_name, entities): async with semaphore: - # Check for cancellation before processing entity - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - raise PipelineCancelledException( - "User cancelled during entity merge" - ) - workspace = global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" async with get_storage_keyed_lock( [entity_name], namespace=namespace, enable_logging=False ): try: - logger.debug(f"Processing entity {entity_name}") + # Graph database operation (critical path, must succeed) entity_data = await _merge_nodes_then_upsert( entity_name, entities, knowledge_graph_inst, - entity_vdb, global_config, pipeline_status, pipeline_status_lock, llm_response_cache, - entity_chunks_storage, ) + # Vector database operation (equally critical, must succeed) + if entity_vdb is not None and entity_data: + data_for_vdb = { + compute_mdhash_id( + entity_data["entity_name"], prefix="ent-" + ): { + "entity_name": entity_data["entity_name"], + "entity_type": entity_data["entity_type"], + "content": f"{entity_data['entity_name']}\n{entity_data['description']}", + "source_id": entity_data["source_id"], + "file_path": entity_data.get( + "file_path", "unknown_source" + ), + } + } + + # Use safe operation wrapper - VDB failure must throw exception + await safe_vdb_operation_with_exception( + operation=lambda: entity_vdb.upsert(data_for_vdb), + operation_name="entity_upsert", + entity_name=entity_name, + max_retries=3, + retry_delay=0.1, + ) + return entity_data except Exception as e: - error_msg = f"Error processing entity `{entity_name}`: {e}" + # Any database operation failure is critical + error_msg = ( + f"Critical error in entity processing for `{entity_name}`: {e}" + ) logger.error(error_msg) # Try to update pipeline status, but don't let status update failure affect main exception @@ -2547,32 +1736,36 @@ async def merge_nodes_and_edges( entity_tasks, return_when=asyncio.FIRST_EXCEPTION ) + # Check if any task raised an exception and ensure all exceptions are retrieved first_exception = None - processed_entities = [] + successful_results = [] for task in done: try: - result = task.result() - except BaseException as e: + 
exception = task.exception() + if exception is not None: + if first_exception is None: + first_exception = exception + else: + successful_results.append(task.result()) + except Exception as e: if first_exception is None: first_exception = e - else: - processed_entities.append(result) - - if pending: - for task in pending: - task.cancel() - pending_results = await asyncio.gather(*pending, return_exceptions=True) - for result in pending_results: - if isinstance(result, BaseException): - if first_exception is None: - first_exception = result - else: - processed_entities.append(result) + # If any task failed, cancel all pending tasks and raise the first exception if first_exception is not None: + # Cancel all pending tasks + for pending_task in pending: + pending_task.cancel() + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) + # Re-raise the first exception to notify the caller raise first_exception + # If all tasks completed successfully, collect results + processed_entities = [task.result() for task in entity_tasks] + # ===== Phase 2: Process all relationships concurrently ===== log_message = f"Phase 2: Processing {total_relations_count} relations from {doc_id} (async: {graph_max_async})" logger.info(log_message) @@ -2582,14 +1775,6 @@ async def merge_nodes_and_edges( async def _locked_process_edges(edge_key, edges): async with semaphore: - # Check for cancellation before processing edges - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - raise PipelineCancelledException( - "User cancelled during relation merge" - ) - workspace = global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" sorted_edge_key = sorted([edge_key[0], edge_key[1]]) @@ -2602,30 +1787,83 @@ async def merge_nodes_and_edges( try: added_entities = [] # Track entities added during edge processing - logger.debug(f"Processing relation {sorted_edge_key}") + # Graph database operation (critical path, must succeed) edge_data = await _merge_edges_then_upsert( edge_key[0], edge_key[1], edges, knowledge_graph_inst, - relationships_vdb, - entity_vdb, global_config, pipeline_status, pipeline_status_lock, llm_response_cache, added_entities, # Pass list to collect added entities - relation_chunks_storage, - entity_chunks_storage, # Add entity_chunks_storage parameter ) if edge_data is None: return None, [] + # Vector database operation (equally critical, must succeed) + if relationships_vdb is not None: + data_for_vdb = { + compute_mdhash_id( + edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-" + ): { + "src_id": edge_data["src_id"], + "tgt_id": edge_data["tgt_id"], + "keywords": edge_data["keywords"], + "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}", + "source_id": edge_data["source_id"], + "file_path": edge_data.get( + "file_path", "unknown_source" + ), + "weight": edge_data.get("weight", 1.0), + } + } + + # Use safe operation wrapper - VDB failure must throw exception + await safe_vdb_operation_with_exception( + operation=lambda: relationships_vdb.upsert(data_for_vdb), + operation_name="relationship_upsert", + entity_name=f"{edge_data['src_id']}-{edge_data['tgt_id']}", + max_retries=3, + retry_delay=0.1, + ) + + # Update added_entities to entity vector database using safe operation wrapper + if added_entities and entity_vdb is not None: + for entity_data in added_entities: + 
entity_vdb_id = compute_mdhash_id( + entity_data["entity_name"], prefix="ent-" + ) + entity_content = f"{entity_data['entity_name']}\n{entity_data['description']}" + + vdb_data = { + entity_vdb_id: { + "content": entity_content, + "entity_name": entity_data["entity_name"], + "source_id": entity_data["source_id"], + "entity_type": entity_data["entity_type"], + "file_path": entity_data.get( + "file_path", "unknown_source" + ), + } + } + + # Use safe operation wrapper - VDB failure must throw exception + await safe_vdb_operation_with_exception( + operation=lambda data=vdb_data: entity_vdb.upsert(data), + operation_name="added_entity_upsert", + entity_name=entity_data["entity_name"], + max_retries=3, + retry_delay=0.1, + ) + return edge_data, added_entities except Exception as e: - error_msg = f"Error processing relation `{sorted_edge_key}`: {e}" + # Any database operation failure is critical + error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}" logger.error(error_msg) # Try to update pipeline status, but don't let status update failure affect main exception @@ -2663,36 +1901,40 @@ async def merge_nodes_and_edges( edge_tasks, return_when=asyncio.FIRST_EXCEPTION ) + # Check if any task raised an exception and ensure all exceptions are retrieved first_exception = None + successful_results = [] for task in done: try: - edge_data, added_entities = task.result() - except BaseException as e: + exception = task.exception() + if exception is not None: + if first_exception is None: + first_exception = exception + else: + successful_results.append(task.result()) + except Exception as e: if first_exception is None: first_exception = e - else: - if edge_data is not None: - processed_edges.append(edge_data) - all_added_entities.extend(added_entities) - - if pending: - for task in pending: - task.cancel() - pending_results = await asyncio.gather(*pending, return_exceptions=True) - for result in pending_results: - if isinstance(result, BaseException): - if first_exception is None: - first_exception = result - else: - edge_data, added_entities = result - if edge_data is not None: - processed_edges.append(edge_data) - all_added_entities.extend(added_entities) + # If any task failed, cancel all pending tasks and raise the first exception if first_exception is not None: + # Cancel all pending tasks + for pending_task in pending: + pending_task.cancel() + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) + # Re-raise the first exception to notify the caller raise first_exception + # If all tasks completed successfully, collect results + for task in edge_tasks: + edge_data, added_entities = task.result() + if edge_data is not None: + processed_edges.append(edge_data) + all_added_entities.extend(added_entities) + # ===== Phase 3: Update full_entities and full_relations storage ===== if full_entities_storage and full_relations_storage and doc_id: try: @@ -2773,14 +2015,6 @@ async def extract_entities( llm_response_cache: BaseKVStorage | None = None, text_chunks_storage: BaseKVStorage | None = None, ) -> list: - # Check for cancellation at the start of entity extraction - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - raise PipelineCancelledException( - "User cancelled during entity extraction" - ) - use_llm_func: callable = global_config["llm_model_func"] entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"] @@ 
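# Illustrative note on the `lambda data=vdb_data: entity_vdb.upsert(data)` form used
# in the added-entities loop above: binding the loop variable as a default argument
# freezes its current value, whereas a bare closure created in a loop would see only
# the final value once the loop has finished. Minimal standalone demonstration:
def make_callbacks_late_binding():
    return [lambda: i for i in range(3)]


def make_callbacks_default_arg():
    return [lambda i=i: i for i in range(3)]


print([f() for f in make_callbacks_late_binding()])  # [2, 2, 2]
print([f() for f in make_callbacks_default_arg()])   # [0, 1, 2]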
-2948,14 +2182,6 @@ async def extract_entities( async def _process_with_semaphore(chunk): async with semaphore: - # Check for cancellation before processing chunk - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - if pipeline_status.get("cancellation_requested", False): - raise PipelineCancelledException( - "User cancelled during chunk processing" - ) - try: return await _process_single_content(chunk) except Exception as e: @@ -2999,7 +2225,7 @@ async def extract_entities( await asyncio.wait(pending) # Add progress prefix to the exception message - progress_prefix = f"C[{processed_chunks + 1}/{total_chunks}]" + progress_prefix = f"C[{processed_chunks+1}/{total_chunks}]" # Re-raise the original exception with a prefix prefixed_exception = create_prefixed_exception(first_exception, progress_prefix) @@ -3021,7 +2247,7 @@ async def kg_query( hashing_kv: BaseKVStorage | None = None, system_prompt: str | None = None, chunks_vdb: BaseVectorStorage = None, -) -> QueryResult | None: +) -> QueryResult: """ Execute knowledge graph query and return unified QueryResult object. @@ -3038,7 +2264,7 @@ async def kg_query( chunks_vdb: Document chunks vector database Returns: - QueryResult | None: Unified query result object containing: + QueryResult: Unified query result object containing: - content: Non-streaming response text content - response_iterator: Streaming response iterator - raw_data: Complete structured data (including references and metadata) @@ -3049,8 +2275,6 @@ async def kg_query( - only_need_prompt=True: content contains complete prompt - stream=True: response_iterator contains streaming response, raw_data contains complete data - default: content contains LLM response text, raw_data contains complete data - - Returns None when no relevant context could be constructed for the query. 
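# Illustrative caller-side sketch for the QueryResult contract documented in the
# kg_query docstring above. The fields content / response_iterator / raw_data /
# is_streaming come from the diff; handle_query_result itself is a hypothetical
# helper, and the "entities" key is taken from the debug logging later in this file.
async def handle_query_result(result) -> str:
    # Streaming result: drain the async iterator into a single string.
    if result.is_streaming and result.response_iterator is not None:
        parts = []
        async for delta in result.response_iterator:
            parts.append(delta)
        return "".join(parts)

    # Non-streaming result: the text lives in `content`; structured data
    # (entities, relationships, chunks, references) rides along in `raw_data`.
    if result.raw_data:
        entities = result.raw_data.get("entities", [])
        print(f"retrieved {len(entities)} entities")
    return result.content or ""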
""" if not query: return QueryResult(content=PROMPTS["fail_response"]) @@ -3098,8 +2322,7 @@ async def kg_query( ) if context_result is None: - logger.info("[kg_query] No query context could be built; returning no-result.") - return None + return QueryResult(content=PROMPTS["fail_response"]) # Return different content based on query parameters if query_param.only_need_context and not query_param.only_need_prompt: @@ -3453,10 +2676,10 @@ async def _perform_kg_search( ) query_embedding = None if query and (kg_chunk_pick_method == "VECTOR" or chunks_vdb): - actual_embedding_func = text_chunks_db.embedding_func - if actual_embedding_func: + embedding_func_config = text_chunks_db.embedding_func + if embedding_func_config and embedding_func_config.func: try: - query_embedding = await actual_embedding_func([query]) + query_embedding = await embedding_func_config.func([query]) query_embedding = query_embedding[ 0 ] # Extract first embedding from batch result @@ -3860,7 +3083,7 @@ async def _merge_all_chunks( return merged_chunks -async def _build_context_str( +async def _build_llm_context( entities_context: list[dict], relations_context: list[dict], merged_chunks: list[dict], @@ -3960,32 +3183,23 @@ async def _build_context_str( truncated_chunks ) - # Rebuild chunks_context with truncated chunks + # Rebuild text_units_context with truncated chunks # The actual tokens may be slightly less than available_chunk_tokens due to deduplication logic - chunks_context = [] + text_units_context = [] for i, chunk in enumerate(truncated_chunks): - chunks_context.append( + text_units_context.append( { "reference_id": chunk["reference_id"], "content": chunk["content"], } ) - text_units_str = "\n".join( - json.dumps(text_unit, ensure_ascii=False) for text_unit in chunks_context - ) - reference_list_str = "\n".join( - f"[{ref['reference_id']}] {ref['file_path']}" - for ref in reference_list - if ref["reference_id"] - ) - logger.info( - f"Final context: {len(entities_context)} entities, {len(relations_context)} relations, {len(chunks_context)} chunks" + f"Final context: {len(entities_context)} entities, {len(relations_context)} relations, {len(text_units_context)} chunks" ) # not necessary to use LLM to generate a response - if not entities_context and not relations_context and not chunks_context: + if not entities_context and not relations_context: # Return empty raw data structure when no entities/relations empty_raw_data = convert_to_user_format( [], @@ -4014,7 +3228,16 @@ async def _build_context_str( chunk_tracking_log.append("?0/0") if chunk_tracking_log: - logger.info(f"Final chunks S+F/O: {' '.join(chunk_tracking_log)}") + logger.info(f"chunks S+F/O: {' '.join(chunk_tracking_log)}") + + text_units_str = "\n".join( + json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context + ) + reference_list_str = "\n".join( + f"[{ref['reference_id']}] {ref['file_path']}" + for ref in reference_list + if ref["reference_id"] + ) result = kg_context_template.format( entities_str=entities_str, @@ -4025,7 +3248,7 @@ async def _build_context_str( # Always return both context and complete data structure (unified approach) logger.debug( - f"[_build_context_str] Converting to user format: {len(entities_context)} entities, {len(relations_context)} relations, {len(truncated_chunks)} chunks" + f"[_build_llm_context] Converting to user format: {len(entities_context)} entities, {len(relations_context)} relations, {len(truncated_chunks)} chunks" ) final_data = convert_to_user_format( entities_context, @@ -4037,7 
+3260,7 @@ async def _build_context_str( relation_id_to_original, ) logger.debug( - f"[_build_context_str] Final data after conversion: {len(final_data.get('entities', []))} entities, {len(final_data.get('relationships', []))} relationships, {len(final_data.get('chunks', []))} chunks" + f"[_build_llm_context] Final data after conversion: {len(final_data.get('entities', []))} entities, {len(final_data.get('relationships', []))} relationships, {len(final_data.get('chunks', []))} chunks" ) return result, final_data @@ -4106,16 +3329,12 @@ async def _build_query_context( query_embedding=search_result["query_embedding"], ) - if ( - not merged_chunks - and not truncation_result["entities_context"] - and not truncation_result["relations_context"] - ): + if not merged_chunks: return None # Stage 4: Build final LLM context with dynamic token processing - # _build_context_str now always returns tuple[str, dict] - context, raw_data = await _build_context_str( + # _build_llm_context now always returns tuple[str, dict] + context, raw_data = await _build_llm_context( entities_context=truncation_result["entities_context"], relations_context=truncation_result["relations_context"], merged_chunks=merged_chunks, @@ -4364,21 +3583,25 @@ async def _find_related_text_unit_from_entities( num_of_chunks = int(max_related_chunks * len(entities_with_chunks) / 2) # Get embedding function from global config - actual_embedding_func = text_chunks_db.embedding_func - if not actual_embedding_func: + embedding_func_config = text_chunks_db.embedding_func + if not embedding_func_config: logger.warning("No embedding function found, falling back to WEIGHT method") kg_chunk_pick_method = "WEIGHT" else: try: - selected_chunk_ids = await pick_by_vector_similarity( - query=query, - text_chunks_storage=text_chunks_db, - chunks_vdb=chunks_vdb, - num_of_chunks=num_of_chunks, - entity_info=entities_with_chunks, - embedding_func=actual_embedding_func, - query_embedding=query_embedding, - ) + actual_embedding_func = embedding_func_config.func + + selected_chunk_ids = None + if actual_embedding_func: + selected_chunk_ids = await pick_by_vector_similarity( + query=query, + text_chunks_storage=text_chunks_db, + chunks_vdb=chunks_vdb, + num_of_chunks=num_of_chunks, + entity_info=entities_with_chunks, + embedding_func=actual_embedding_func, + query_embedding=query_embedding, + ) if selected_chunk_ids == []: kg_chunk_pick_method = "WEIGHT" @@ -4653,21 +3876,24 @@ async def _find_related_text_unit_from_relations( num_of_chunks = int(max_related_chunks * len(relations_with_chunks) / 2) # Get embedding function from global config - actual_embedding_func = text_chunks_db.embedding_func - if not actual_embedding_func: + embedding_func_config = text_chunks_db.embedding_func + if not embedding_func_config: logger.warning("No embedding function found, falling back to WEIGHT method") kg_chunk_pick_method = "WEIGHT" else: try: - selected_chunk_ids = await pick_by_vector_similarity( - query=query, - text_chunks_storage=text_chunks_db, - chunks_vdb=chunks_vdb, - num_of_chunks=num_of_chunks, - entity_info=relations_with_chunks, - embedding_func=actual_embedding_func, - query_embedding=query_embedding, - ) + actual_embedding_func = embedding_func_config.func + + if actual_embedding_func: + selected_chunk_ids = await pick_by_vector_similarity( + query=query, + text_chunks_storage=text_chunks_db, + chunks_vdb=chunks_vdb, + num_of_chunks=num_of_chunks, + entity_info=relations_with_chunks, + embedding_func=actual_embedding_func, + 
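# Illustrative sketch of the chunk-pick fallback logic surrounding
# pick_by_vector_similarity above: when no usable embedding function exists, or the
# vector selection comes back empty, the code falls back to the WEIGHT method.
# select_by_vector / select_by_weight are hypothetical stand-ins for
# pick_by_vector_similarity / pick_by_weighted_polling.
from typing import Callable, Sequence


def choose_chunk_ids(
    method: str,
    embedding_func: Callable | None,
    select_by_vector: Callable[[], Sequence[str]],
    select_by_weight: Callable[[], Sequence[str]],
) -> Sequence[str]:
    if method == "VECTOR" and embedding_func is not None:
        selected = select_by_vector()
        if selected:  # an empty result means vector selection found nothing useful
            return selected
    # WEIGHT fallback: weighted polling over chunk occurrence counts
    return select_by_weight()


print(choose_chunk_ids("VECTOR", None, lambda: ["c1"], lambda: ["c2", "c3"]))  # ['c2', 'c3']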
query_embedding=query_embedding, + ) if selected_chunk_ids == []: kg_chunk_pick_method = "WEIGHT" @@ -4759,7 +3985,7 @@ async def naive_query( global_config: dict[str, str], hashing_kv: BaseKVStorage | None = None, system_prompt: str | None = None, -) -> QueryResult | None: +) -> QueryResult: """ Execute naive query and return unified QueryResult object. @@ -4772,13 +3998,11 @@ async def naive_query( system_prompt: System prompt Returns: - QueryResult | None: Unified query result object containing: + QueryResult: Unified query result object containing: - content: Non-streaming response text content - response_iterator: Streaming response iterator - raw_data: Complete structured data (including references and metadata) - is_streaming: Whether this is a streaming result - - Returns None when no relevant chunks are retrieved. """ if not query: @@ -4799,10 +4023,16 @@ async def naive_query( chunks = await _get_vector_context(query, chunks_vdb, query_param, None) if chunks is None or len(chunks) == 0: - logger.info( - "[naive_query] No relevant document chunks found; returning no-result." + # Build empty raw data structure for naive mode + empty_raw_data = convert_to_user_format( + [], # naive mode has no entities + [], # naive mode has no relationships + [], # no chunks + [], # no references + "naive", ) - return None + empty_raw_data["message"] = "No relevant document chunks found." + return QueryResult(content=PROMPTS["fail_response"], raw_data=empty_raw_data) # Calculate dynamic token limit for chunks max_total_tokens = getattr( @@ -4881,10 +4111,10 @@ async def naive_query( "final_chunks_count": len(processed_chunks_with_ref_ids), } - # Build chunks_context from processed chunks with reference IDs - chunks_context = [] + # Build text_units_context from processed chunks with reference IDs + text_units_context = [] for i, chunk in enumerate(processed_chunks_with_ref_ids): - chunks_context.append( + text_units_context.append( { "reference_id": chunk["reference_id"], "content": chunk["content"], @@ -4892,7 +4122,7 @@ async def naive_query( ) text_units_str = "\n".join( - json.dumps(text_unit, ensure_ascii=False) for text_unit in chunks_context + json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context ) reference_list_str = "\n".join( f"[{ref['reference_id']}] {ref['file_path']}"
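# Illustrative sketch: detecting the "no relevant chunks" outcome that naive_query
# now signals by returning the fail response together with an empty-but-structured
# raw_data (empty entities/relationships/chunks plus a "message" field) instead of
# returning None. is_empty_naive_result is a hypothetical caller-side helper; the
# key names mirror the empty_raw_data structure built in the hunk above.
def is_empty_naive_result(result) -> bool:
    raw = result.raw_data or {}
    has_message = bool(raw.get("message"))
    has_chunks = bool(raw.get("chunks"))
    return has_message and not has_chunks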