diff --git a/env.example b/env.example
index d5ae9a91..73f2d7b7 100644
--- a/env.example
+++ b/env.example
@@ -1,22 +1,5 @@
 ### This is sample file of .env
-###############################################################################
-### ⚡️ QUICK START: OpenAI Configuration (Recommended)
-###############################################################################
-### To get started with OpenAI, you only need:
-### 1. Set your OpenAI API key (get from https://platform.openai.com/api-keys)
-###    export OPENAI_API_KEY="sk-your-actual-api-key"
-### 2. Then you can start the server with default OpenAI configuration:
-###    lightrag-server
-###
-### The default configuration will use:
-### - LLM: gpt-4o-mini (entity/relation extraction, graph merging, query answering)
-### - Embedding: text-embedding-3-small (vector embeddings)
-### No additional configuration needed!
-###
-### See LLM and Embedding Configuration sections below to customize models
-###############################################################################
-
 ###########################
 ### Server Configuration
 ###########################
@@ -40,13 +23,13 @@ WEBUI_DESCRIPTION="Simple and Fast Graph Based RAG System"
 # WORKING_DIR=
 ### Tiktoken cache directory (Store cached files in this folder for offline deployment)
-# TIKTOKEN_CACHE_DIR=./temp/tiktoken
+# TIKTOKEN_CACHE_DIR=/app/data/tiktoken
-### Ollama Emulating Model and Tag (used only when EMBEDDING_BINDING=ollama)
+### Ollama Emulating Model and Tag
 # OLLAMA_EMULATING_MODEL_NAME=lightrag
 OLLAMA_EMULATING_MODEL_TAG=latest
-### Max nodes return from grap retrieval in webui
+### Max nodes returned from graph retrieval in webui
 # MAX_GRAPH_NODES=1000
 ### Logging level
@@ -67,44 +50,30 @@ OLLAMA_EMULATING_MODEL_TAG=latest
 # JWT_ALGORITHM=HS256
 ### API-Key to access LightRAG Server API
-### Use this key in HTTP requests with the 'X-API-Key' header
-### Example: curl -H "X-API-Key: your-secure-api-key-here" http://localhost:9621/query
 # LIGHTRAG_API_KEY=your-secure-api-key-here
 # WHITELIST_PATHS=/health,/api/*
-###############################################################################
-### OpenAI API Key (Required for OpenAI LLM and Embedding)
-### Get your key from: https://platform.openai.com/api-keys
-### This is the PRIMARY way to configure OpenAI (environment variable takes precedence)
-###############################################################################
-# OPENAI_API_KEY=sk-your-actual-openai-api-key-here
-
 ######################################################################################
 ### Query Configuration
 ###
-### How to control the context lenght sent to LLM:
+### How to control the context length sent to LLM:
 ### MAX_ENTITY_TOKENS + MAX_RELATION_TOKENS < MAX_TOTAL_TOKENS
-### Chunk_Tokens = MAX_TOTAL_TOKENS - Actual_Entity_Tokens - Actual_Reation_Tokens
+### Chunk_Tokens = MAX_TOTAL_TOKENS - Actual_Entity_Tokens - Actual_Relation_Tokens
 ######################################################################################
-# LLM responde cache for query (Not valid for streaming response)
+# LLM response cache for query (Not valid for streaming response)
 ENABLE_LLM_CACHE=true
 # COSINE_THRESHOLD=0.2
 ### Number of entities or relations retrieved from KG
 # TOP_K=40
-### Maxmium number or chunks for naive vector search
+### Maximum number of chunks for naive vector search
 # CHUNK_TOP_K=20
-### control the actual enties send to LLM
+### control the actual entities sent to LLM
 # MAX_ENTITY_TOKENS=6000
 ### control the actual relations send to LLM
 # MAX_RELATION_TOKENS=8000
-### control the maximum tokens send to LLM (include entities, raltions and chunks)
+### control the maximum tokens sent to LLM (including entities, relations and chunks)
 # MAX_TOTAL_TOKENS=30000
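The budgeting rule above is worth making concrete. A minimal sketch of the arithmetic, with illustrative values (not LightRAG's actual code):

```python
# Static caps from .env; entity + relation caps must leave room for chunks.
MAX_TOTAL_TOKENS = 30000
MAX_ENTITY_TOKENS = 6000
MAX_RELATION_TOKENS = 8000
assert MAX_ENTITY_TOKENS + MAX_RELATION_TOKENS < MAX_TOTAL_TOKENS

# At query time the tokens actually consumed are typically below the caps;
# whatever remains of the total budget is spent on text chunks.
actual_entity_tokens = 4500    # assumed measured value
actual_relation_tokens = 6200  # assumed measured value
chunk_tokens = MAX_TOTAL_TOKENS - actual_entity_tokens - actual_relation_tokens
print(chunk_tokens)  # 19300 tokens left for chunk content
```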
-### maximum number of related chunks per source entity or relation
-### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph)
-### Higher values increase re-ranking time
-# RELATED_CHUNK_NUMBER=5
-
 ### chunk selection strategies
 ###     VECTOR: Pick KG chunks by vector similarity, delivered chunks to the LLM aligning more closely with naive retrieval
 ###     WEIGHT: Pick KG chunks by entity and chunk weight, delivered more solely KG related chunks to the LLM
@@ -119,7 +88,7 @@ ENABLE_LLM_CACHE=true
 RERANK_BINDING=null
 ### Enable rerank by default in query params when RERANK_BINDING is not null
 # RERANK_BY_DEFAULT=True
-### rerank score chunk filter(set to 0.0 to keep all chunks, 0.6 or above if LLM is not strong enought)
+### rerank score chunk filter (set to 0.0 to keep all chunks, 0.6 or above if LLM is not strong enough)
 # MIN_RERANK_SCORE=0.0
 ### For local deployment with vLLM
@@ -157,7 +126,7 @@ SUMMARY_LANGUAGE=English
 # CHUNK_SIZE=1200
 # CHUNK_OVERLAP_SIZE=100
-### Number of summary semgments or tokens to trigger LLM summary on entity/relation merge (at least 3 is recommented)
+### Number of summary segments or tokens to trigger LLM summary on entity/relation merge (at least 3 is recommended)
 # FORCE_LLM_SUMMARY_ON_MERGE=8
 ### Max description token size to trigger LLM summary
 # SUMMARY_MAX_TOKENS = 1200
@@ -166,6 +135,19 @@ SUMMARY_LANGUAGE=English
 ### Maximum context size sent to LLM for description summary
 # SUMMARY_CONTEXT_SIZE=12000
+### control the maximum chunk_ids stored in vector and graph db
+# MAX_SOURCE_IDS_PER_ENTITY=300
+# MAX_SOURCE_IDS_PER_RELATION=300
+### control chunk_ids limitation method: KEEP (keep oldest) or FIFO (first in first out, keep newest)
+# SOURCE_IDS_LIMIT_METHOD=KEEP
+### Maximum number of file paths stored in entity/relation file_path field
+# MAX_FILE_PATHS=30
+
+### maximum number of related chunks per source entity or relation
+### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph)
+### Higher values increase re-ranking time
+# RELATED_CHUNK_NUMBER=5
+
 ###############################
 ### Concurrency Configuration
 ###############################
@@ -185,17 +167,10 @@ MAX_PARALLEL_INSERT=2
 ### LLM request timeout setting for all llm (0 means no timeout for Ollma)
 # LLM_TIMEOUT=180
-# PRIMARY CONFIGURATION: OpenAI (Recommended for production)
 LLM_BINDING=openai
-LLM_MODEL=gpt-4o-mini
+LLM_MODEL=gpt-4o
 LLM_BINDING_HOST=https://api.openai.com/v1
 LLM_BINDING_API_KEY=your_api_key
-# Note: By default, uses OPENAI_API_KEY environment variable
-
-### ALTERNATIVE: Using gpt-4o for higher quality (higher cost)
-# LLM_BINDING=openai
-# LLM_MODEL=gpt-4o
-# LLM_BINDING_HOST=https://api.openai.com/v1
 ### Optional for Azure
 # AZURE_OPENAI_API_VERSION=2024-08-01-preview
@@ -212,7 +187,7 @@ LLM_BINDING_API_KEY=your_api_key
 # OPENAI_LLM_TEMPERATURE=0.9
 ### Set the max_tokens to mitigate endless output of some LLM (less than LLM_TIMEOUT * llm_output_tokens/second, i.e. 9000 = 180s * 50 tokens/s)
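The sizing rule in the comment above is simple throughput arithmetic; with an assumed decode speed for whatever model is deployed:

```python
# max_tokens should stay below what the model can emit within LLM_TIMEOUT.
llm_timeout_s = 180       # LLM_TIMEOUT
tokens_per_second = 50    # assumed output speed of the deployed model
max_tokens = llm_timeout_s * tokens_per_second
print(max_tokens)         # 9000, the value used for OPENAI_LLM_MAX_TOKENS below
```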
 ### Typically, max_tokens does not include prompt content, though some models, such as Gemini Models, are exceptions
-### For vLLM/SGLang doployed models, or most of OpenAI compatible API provider
+### For vLLM/SGLang deployed models, or most OpenAI-compatible API providers
 # OPENAI_LLM_MAX_TOKENS=9000
 ### For OpenAI o1-mini or newer modles
 OPENAI_LLM_MAX_COMPLETION_TOKENS=9000
@@ -226,7 +201,7 @@ OPENAI_LLM_MAX_COMPLETION_TOKENS=9000
 # OPENAI_LLM_REASONING_EFFORT=minimal
 ### OpenRouter Specific Parameters
 # OPENAI_LLM_EXTRA_BODY='{"reasoning": {"enabled": false}}'
-### Qwen3 Specific Parameters depoly by vLLM
+### Qwen3 Specific Parameters deployed by vLLM
 # OPENAI_LLM_EXTRA_BODY='{"chat_template_kwargs": {"enable_thinking": false}}'
 ### use the following command to see all support options for Ollama LLM
@@ -244,52 +219,44 @@ OLLAMA_LLM_NUM_CTX=32768
 ####################################################################################
 ### Embedding Configuration (Should not be changed after the first file processed)
-### EMBEDDING_BINDING: openai, ollama, azure_openai, jina, lollms, aws_bedrock
-### PRIMARY CONFIGURATION: OpenAI (Recommended)
+### EMBEDDING_BINDING: ollama, openai, azure_openai, jina, lollms, aws_bedrock
 ####################################################################################
 # EMBEDDING_TIMEOUT=30
-EMBEDDING_BINDING=openai
-EMBEDDING_MODEL=text-embedding-3-small
-EMBEDDING_DIM=1536
-EMBEDDING_BINDING_HOST=https://api.openai.com/v1
-# EMBEDDING_BINDING_API_KEY=your_openai_api_key (uses OPENAI_API_KEY env var by default)
+EMBEDDING_BINDING=ollama
+EMBEDDING_MODEL=bge-m3:latest
+EMBEDDING_DIM=1024
+EMBEDDING_BINDING_API_KEY=your_api_key
+# If the embedding service is deployed within the same Docker stack, use host.docker.internal instead of localhost
+EMBEDDING_BINDING_HOST=http://localhost:11434
-### ALTERNATIVE: Text-embedding-3-large (higher quality, higher cost)
+### OpenAI compatible (VoyageAI embedding is OpenAI compatible)
 # EMBEDDING_BINDING=openai
 # EMBEDDING_MODEL=text-embedding-3-large
 # EMBEDDING_DIM=3072
 # EMBEDDING_BINDING_HOST=https://api.openai.com/v1
-
-### ALTERNATIVE: Local Ollama embedding (no API key required, requires Ollama service)
-# EMBEDDING_BINDING=ollama
-# EMBEDDING_MODEL=bge-m3:latest
-# EMBEDDING_DIM=1024
-# EMBEDDING_BINDING_HOST=http://localhost:11434
 # EMBEDDING_BINDING_API_KEY=your_api_key
-# If the embedding service is deployed within the same Docker stack, use host.docker.internal instead of localhost
-### ALTERNATIVE: Azure OpenAI embedding
-# EMBEDDING_BINDING=azure_openai
-# AZURE_EMBEDDING_DEPLOYMENT=text-embedding-3-small
+### Optional for Azure
+# AZURE_EMBEDDING_DEPLOYMENT=text-embedding-3-large
 # AZURE_EMBEDDING_API_VERSION=2023-05-15
 # AZURE_EMBEDDING_ENDPOINT=your_endpoint
 # AZURE_EMBEDDING_API_KEY=your_api_key
-### ALTERNATIVE: Jina AI Embedding
+### Jina AI Embedding
 # EMBEDDING_BINDING=jina
 # EMBEDDING_BINDING_HOST=https://api.jina.ai/v1/embeddings
 # EMBEDDING_MODEL=jina-embeddings-v4
 # EMBEDDING_DIM=2048
 # EMBEDDING_BINDING_API_KEY=your_api_key
-### Ollama embedding options (only used when EMBEDDING_BINDING=ollama)
+### Optional for Ollama embedding
 OLLAMA_EMBEDDING_NUM_CTX=8192
 ### use the following command to see all support options for Ollama embedding
 ### lightrag-server --embedding-binding ollama --help
 ####################################################################
-### WORKSPACE setting workspace name for all storage types
-### in the purpose of isolating data from LightRAG instances.
+### WORKSPACE sets workspace name for all storage types
+### for the purpose of isolating data from LightRAG instances.
 ### Valid workspace name constraints: a-z, A-Z, 0-9, and _
 ####################################################################
 # WORKSPACE=space1
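Since the embedding settings must not change once documents are ingested (stored vectors were produced by the old model), a startup guard along these lines can catch mismatches early. This is an illustrative sketch, not LightRAG's actual check:

```python
import os

stored_dim = 1024  # hypothetical: dimension recorded when the workspace was first built
configured_dim = int(os.environ.get("EMBEDDING_DIM", "1024"))

if configured_dim != stored_dim:
    raise RuntimeError(
        f"EMBEDDING_DIM changed from {stored_dim} to {configured_dim}; "
        "stored vectors are incompatible - revert the setting or re-ingest."
    )
```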
diff --git a/lightrag/constants.py b/lightrag/constants.py
index 14584559..ad12cccf 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -57,8 +57,24 @@ DEFAULT_HISTORY_TURNS = 0
 DEFAULT_MIN_RERANK_SCORE = 0.0
 DEFAULT_RERANK_BINDING = "null"
-# File path configuration for vector and graph database(Should not be changed, used in Milvus Schema)
+# Default source ids limit in metadata for entity and relation
+DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3
+DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3
+SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"  # Keep oldest
+SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"  # First In First Out (Keep newest)
+DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_KEEP
+VALID_SOURCE_IDS_LIMIT_METHODS = {
+    SOURCE_IDS_LIMIT_METHOD_KEEP,
+    SOURCE_IDS_LIMIT_METHOD_FIFO,
+}
+# Default file_path limit in metadata for entity and relation (Use same limit method as source_ids)
+DEFAULT_MAX_FILE_PATHS = 2
+
+# Field length of file_path in Milvus Schema for entity and relation (Should not be changed)
+# file_path must store all file paths up to the DEFAULT_MAX_FILE_PATHS limit within the metadata.
 DEFAULT_MAX_FILE_PATH_LENGTH = 32768
+# Placeholder for more file paths in metadata for entity and relation (Should not be changed)
+DEFAULT_FILE_PATH_MORE_PLACEHOLDER = "truncated"
 # Default temperature for LLM
 DEFAULT_TEMPERATURE = 1.0
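The two limit methods differ only in which end of an oldest-first chunk-id list survives. A sketch of the assumed semantics (the real helper, `apply_source_ids_limit`, lives in `lightrag/utils.py`; this illustration is not its implementation):

```python
def limit_source_ids(chunk_ids: list[str], limit: int, method: str) -> list[str]:
    """Cap an oldest-first chunk-id list to `limit` entries."""
    if limit <= 0 or len(chunk_ids) <= limit:
        return chunk_ids
    if method == "FIFO":
        return chunk_ids[-limit:]  # first in, first out: drop oldest, keep newest
    return chunk_ids[:limit]       # KEEP: keep oldest, ignore newer arrivals

ids = ["c1", "c2", "c3", "c4"]
assert limit_source_ids(ids, 2, "KEEP") == ["c1", "c2"]
assert limit_source_ids(ids, 2, "FIFO") == ["c3", "c4"]
```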
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 15ba7696..3e889eb7 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -7,7 +7,7 @@ import json_repair
 from typing import Any, AsyncIterator, overload, Literal
 from collections import Counter, defaultdict
-from .utils import (
+from lightrag.utils import (
     logger,
     compute_mdhash_id,
     Tokenizer,
@@ -26,14 +26,16 @@ from .utils import (
     pick_by_weighted_polling,
     pick_by_vector_similarity,
     process_chunks_unified,
-    build_file_path,
     safe_vdb_operation_with_exception,
     create_prefixed_exception,
     fix_tuple_delimiter_corruption,
     convert_to_user_format,
     generate_reference_list_from_chunks,
+    apply_source_ids_limit,
+    merge_source_ids,
+    make_relation_chunk_key,
 )
-from .base import (
+from lightrag.base import (
     BaseGraphStorage,
     BaseKVStorage,
     BaseVectorStorage,
@@ -42,8 +44,8 @@ from .base import (
     QueryResult,
     QueryContextResult,
 )
-from .prompt import PROMPTS
-from .constants import (
+from lightrag.prompt import PROMPTS
+from lightrag.constants import (
     GRAPH_FIELD_SEP,
     DEFAULT_MAX_ENTITY_TOKENS,
     DEFAULT_MAX_RELATION_TOKENS,
@@ -52,8 +54,11 @@ from .constants import (
     DEFAULT_KG_CHUNK_PICK_METHOD,
     DEFAULT_ENTITY_TYPES,
     DEFAULT_SUMMARY_LANGUAGE,
+    SOURCE_IDS_LIMIT_METHOD_KEEP,
+    SOURCE_IDS_LIMIT_METHOD_FIFO,
+    DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
 )
-from .kg.shared_storage import get_storage_keyed_lock
+from lightrag.kg.shared_storage import get_storage_keyed_lock
 import time
 from dotenv import load_dotenv
@@ -401,7 +406,7 @@ async def _handle_single_relationship_extraction(
 ):  # treat "relationship" and "relation" interchangeable
     if len(record_attributes) > 1 and "relation" in record_attributes[0]:
         logger.warning(
-            f"{chunk_key}: LLM output format error; found {len(record_attributes)}/5 fields on REALTION `{record_attributes[1]}`~`{record_attributes[2] if len(record_attributes) >2 else 'N/A'}`"
+            f"{chunk_key}: LLM output format error; found {len(record_attributes)}/5 fields on RELATION `{record_attributes[1]}`~`{record_attributes[2] if len(record_attributes) > 2 else 'N/A'}`"
         )
         logger.debug(record_attributes)
         return None
@@ -473,8 +478,8 @@ async def _handle_single_relationship_extraction(
 async def _rebuild_knowledge_from_chunks(
-    entities_to_rebuild: dict[str, set[str]],
-    relationships_to_rebuild: dict[tuple[str, str], set[str]],
+    entities_to_rebuild: dict[str, list[str]],
+    relationships_to_rebuild: dict[tuple[str, str], list[str]],
     knowledge_graph_inst: BaseGraphStorage,
     entities_vdb: BaseVectorStorage,
     relationships_vdb: BaseVectorStorage,
@@ -483,6 +488,8 @@
     global_config: dict[str, str],
     pipeline_status: dict | None = None,
     pipeline_status_lock=None,
+    entity_chunks_storage: BaseKVStorage | None = None,
+    relation_chunks_storage: BaseKVStorage | None = None,
 ) -> None:
     """Rebuild entity and relationship descriptions from cached extraction results with parallel processing
@@ -491,8 +498,8 @@
     controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency.
 
     Args:
-        entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
-        relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
+        entities_to_rebuild: Dict mapping entity_name -> list of remaining chunk_ids
+        relationships_to_rebuild: Dict mapping (src, tgt) -> list of remaining chunk_ids
         knowledge_graph_inst: Knowledge graph storage
         entities_vdb: Entity vector database
         relationships_vdb: Relationship vector database
@@ -501,6 +508,8 @@
         global_config: Global configuration containing llm_model_max_async
         pipeline_status: Pipeline status dictionary
         pipeline_status_lock: Lock for pipeline status
+        entity_chunks_storage: KV storage maintaining full chunk IDs per entity
+        relation_chunks_storage: KV storage maintaining full chunk IDs per relation
     """
     if not entities_to_rebuild and not relationships_to_rebuild:
         return
@@ -640,10 +649,11 @@
                         chunk_entities=chunk_entities,
                         llm_response_cache=llm_response_cache,
                         global_config=global_config,
+                        entity_chunks_storage=entity_chunks_storage,
                     )
                     rebuilt_entities_count += 1
                     status_message = (
-                        f"Rebuilt `{entity_name}` from {len(chunk_ids)} chunks"
+                        f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks"
                     )
                     logger.info(status_message)
                     if pipeline_status is not None and pipeline_status_lock is not None:
@@ -681,16 +691,11 @@
                         chunk_relationships=chunk_relationships,
                         llm_response_cache=llm_response_cache,
                         global_config=global_config,
+                        relation_chunks_storage=relation_chunks_storage,
+                        pipeline_status=pipeline_status,
+                        pipeline_status_lock=pipeline_status_lock,
                     )
                     rebuilt_relationships_count += 1
-                    status_message = (
-                        f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks"
-                    )
-                    logger.info(status_message)
-                    if pipeline_status is not None and pipeline_status_lock is not None:
-                        async with pipeline_status_lock:
-                            pipeline_status["latest_message"] = status_message
-                            pipeline_status["history_messages"].append(status_message)
                 except Exception as e:
                     failed_relationships_count += 1
                     status_message = f"Failed to rebuild `{src} - {tgt}`: {e}"
@@ -1001,10 +1006,13 @@ async def _rebuild_single_entity(
     knowledge_graph_inst: BaseGraphStorage,
     entities_vdb: BaseVectorStorage,
     entity_name: str,
-    chunk_ids: set[str],
+    chunk_ids: list[str],
     chunk_entities:
dict, llm_response_cache: BaseKVStorage, global_config: dict[str, str], + entity_chunks_storage: BaseKVStorage | None = None, + pipeline_status: dict | None = None, + pipeline_status_lock=None, ) -> None: """Rebuild a single entity from cached extraction results""" @@ -1015,7 +1023,11 @@ async def _rebuild_single_entity( # Helper function to update entity in both graph and vector storage async def _update_entity_storage( - final_description: str, entity_type: str, file_paths: set[str] + final_description: str, + entity_type: str, + file_paths: set[str], + source_chunk_ids: list[str], + truncation_info: str = "", ): try: # Update entity in graph storage (critical path) @@ -1023,10 +1035,12 @@ async def _rebuild_single_entity( **current_entity, "description": final_description, "entity_type": entity_type, - "source_id": GRAPH_FIELD_SEP.join(chunk_ids), + "source_id": GRAPH_FIELD_SEP.join(source_chunk_ids), "file_path": GRAPH_FIELD_SEP.join(file_paths) if file_paths else current_entity.get("file_path", "unknown_source"), + "created_at": int(time.time()), + "truncate": truncation_info, } await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) @@ -1059,9 +1073,33 @@ async def _rebuild_single_entity( logger.error(error_msg) raise # Re-raise exception - # Collect all entity data from relevant chunks + # normalized_chunk_ids = merge_source_ids([], chunk_ids) + normalized_chunk_ids = chunk_ids + + if entity_chunks_storage is not None and normalized_chunk_ids: + await entity_chunks_storage.upsert( + { + entity_name: { + "chunk_ids": normalized_chunk_ids, + "count": len(normalized_chunk_ids), + } + } + ) + + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + + limited_chunk_ids = apply_source_ids_limit( + normalized_chunk_ids, + global_config["max_source_ids_per_entity"], + limit_method, + identifier=f"`{entity_name}`", + ) + + # Collect all entity data from relevant (limited) chunks all_entity_data = [] - for chunk_id in chunk_ids: + for chunk_id in limited_chunk_ids: if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]: all_entity_data.extend(chunk_entities[chunk_id][entity_name]) @@ -1108,13 +1146,19 @@ async def _rebuild_single_entity( final_description = current_entity.get("description", "") entity_type = current_entity.get("entity_type", "UNKNOWN") - await _update_entity_storage(final_description, entity_type, file_paths) + await _update_entity_storage( + final_description, + entity_type, + file_paths, + limited_chunk_ids, + ) return # Process cached entity data descriptions = [] entity_types = [] - file_paths = set() + file_paths_list = [] + seen_paths = set() for entity_data in all_entity_data: if entity_data.get("description"): @@ -1122,7 +1166,35 @@ async def _rebuild_single_entity( if entity_data.get("entity_type"): entity_types.append(entity_data["entity_type"]) if entity_data.get("file_path"): - file_paths.add(entity_data["file_path"]) + file_path = entity_data["file_path"] + if file_path and file_path not in seen_paths: + file_paths_list.append(file_path) + seen_paths.add(file_path) + + # Apply MAX_FILE_PATHS limit + max_file_paths = global_config.get("max_file_paths") + file_path_placeholder = global_config.get( + "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER + ) + limit_method = global_config.get("source_ids_limit_method") + + original_count = len(file_paths_list) + if original_count > max_file_paths: + if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO: + # FIFO: keep tail 
(newest), discard head + file_paths_list = file_paths_list[-max_file_paths:] + else: + # KEEP: keep head (earliest), discard tail + file_paths_list = file_paths_list[:max_file_paths] + + file_paths_list.append( + f"...{file_path_placeholder}({limit_method} {max_file_paths}/{original_count})..." + ) + logger.info( + f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})" + ) + + file_paths = set(file_paths_list) # Remove duplicates while preserving order description_list = list(dict.fromkeys(descriptions)) @@ -1148,7 +1220,31 @@ async def _rebuild_single_entity( else: final_description = current_entity.get("description", "") - await _update_entity_storage(final_description, entity_type, file_paths) + if len(limited_chunk_ids) < len(normalized_chunk_ids): + truncation_info = ( + f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}" + ) + else: + truncation_info = "" + + await _update_entity_storage( + final_description, + entity_type, + file_paths, + limited_chunk_ids, + truncation_info, + ) + + # Log rebuild completion with truncation info + status_message = f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks" + if truncation_info: + status_message += f" ({truncation_info})" + logger.info(status_message) + # Update pipeline status + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) async def _rebuild_single_relationship( @@ -1156,10 +1252,13 @@ async def _rebuild_single_relationship( relationships_vdb: BaseVectorStorage, src: str, tgt: str, - chunk_ids: set[str], + chunk_ids: list[str], chunk_relationships: dict, llm_response_cache: BaseKVStorage, global_config: dict[str, str], + relation_chunks_storage: BaseKVStorage | None = None, + pipeline_status: dict | None = None, + pipeline_status_lock=None, ) -> None: """Rebuild a single relationship from cached extraction results @@ -1172,9 +1271,33 @@ async def _rebuild_single_relationship( if not current_relationship: return + # normalized_chunk_ids = merge_source_ids([], chunk_ids) + normalized_chunk_ids = chunk_ids + + if relation_chunks_storage is not None and normalized_chunk_ids: + storage_key = make_relation_chunk_key(src, tgt) + await relation_chunks_storage.upsert( + { + storage_key: { + "chunk_ids": normalized_chunk_ids, + "count": len(normalized_chunk_ids), + } + } + ) + + limit_method = ( + global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP + ) + limited_chunk_ids = apply_source_ids_limit( + normalized_chunk_ids, + global_config["max_source_ids_per_relation"], + limit_method, + identifier=f"`{src}`~`{tgt}`", + ) + # Collect all relationship data from relevant chunks all_relationship_data = [] - for chunk_id in chunk_ids: + for chunk_id in limited_chunk_ids: if chunk_id in chunk_relationships: # Check both (src, tgt) and (tgt, src) since relationships can be bidirectional for edge_key in [(src, tgt), (tgt, src)]: @@ -1191,7 +1314,8 @@ async def _rebuild_single_relationship( descriptions = [] keywords = [] weights = [] - file_paths = set() + file_paths_list = [] + seen_paths = set() for rel_data in all_relationship_data: if rel_data.get("description"): @@ -1201,7 +1325,35 @@ async def _rebuild_single_relationship( if rel_data.get("weight"): weights.append(rel_data["weight"]) if rel_data.get("file_path"): - file_paths.add(rel_data["file_path"]) + file_path = rel_data["file_path"] + if 
file_path and file_path not in seen_paths:
+                file_paths_list.append(file_path)
+                seen_paths.add(file_path)
+
+    # Apply count limit
+    max_file_paths = global_config.get("max_file_paths")
+    file_path_placeholder = global_config.get(
+        "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+    )
+    limit_method = global_config.get("source_ids_limit_method")
+
+    original_count = len(file_paths_list)
+    if original_count > max_file_paths:
+        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+            # FIFO: keep tail (newest), discard head
+            file_paths_list = file_paths_list[-max_file_paths:]
+        else:
+            # KEEP: keep head (earliest), discard tail
+            file_paths_list = file_paths_list[:max_file_paths]
+
+        file_paths_list.append(
+            f"...{file_path_placeholder}({limit_method} {max_file_paths}/{original_count})..."
+        )
+        logger.info(
+            f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+        )
+
+    file_paths = set(file_paths_list)
 
     # Remove duplicates while preserving order
     description_list = list(dict.fromkeys(descriptions))
@@ -1229,6 +1381,13 @@
         # fallback to keep current(unchanged)
         final_description = current_relationship.get("description", "")
 
+    if len(limited_chunk_ids) < len(normalized_chunk_ids):
+        truncation_info = (
+            f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}"
+        )
+    else:
+        truncation_info = ""
+
     # Update relationship in graph storage
     updated_relationship_data = {
         **current_relationship,
         "description": final_description
         if all_relationship_data
         else current_relationship.get("description", ""),
         "keywords": combined_keywords,
         "weight": weight,
-        "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
+        "source_id": GRAPH_FIELD_SEP.join(limited_chunk_ids),
         "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp])
         if file_paths
         else current_relationship.get("file_path", "unknown_source"),
+        "truncate": truncation_info,
     }
     await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
@@ -1286,6 +1446,25 @@
             logger.error(error_msg)
             raise  # Re-raise exception
 
+    # Log rebuild completion with truncation info (truncation_info already
+    # reflects apply_source_ids_limit, so it is appended exactly once)
+    status_message = f"Rebuild `{src} - {tgt}` from {len(chunk_ids)} chunks"
+    if truncation_info:
+        status_message += f" ({truncation_info})"
+
+    logger.info(status_message)
+
+    # Update pipeline status
+    if pipeline_status is not None and pipeline_status_lock is not None:
+        async with pipeline_status_lock:
+            pipeline_status["latest_message"] = status_message
+            pipeline_status["history_messages"].append(status_message)
+
 
 async def _merge_nodes_then_upsert(
     entity_name: str,
@@ -1295,6 +1474,7 @@
     pipeline_status: dict = None,
     pipeline_status_lock=None,
     llm_response_cache: BaseKVStorage | None = None,
+    entity_chunks_storage: BaseKVStorage | None = None,
 ):
     """Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert."""
     already_entity_types = []
@@ -1317,10 +1497,76 @@
             reverse=True,
         )[0][0]  # Get the entity type with the highest count
 
+    original_nodes_count = len(nodes_data)
+
+    new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")]
+
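Throughout this diff, the uncapped chunk-id history is persisted in the new chunk-tracking KV storages, while graph and vector stores receive only the capped list. The record shape matches the upsert calls: entity records are keyed by entity name, relation records by `make_relation_chunk_key(src, tgt)` (exact key format is not shown in this diff):

```python
# Illustrative entity record in entity_chunks_storage:
entity_record = {
    "Alan Turing": {                                        # hypothetical entity
        "chunk_ids": ["chunk-a1", "chunk-b2", "chunk-c3"],  # full, uncapped history
        "count": 3,
    }
}
# A node's source_id keeps only the capped subset, so a later rebuild can
# re-apply (or relax) the cap from this complete list.
```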
+    existing_full_source_ids = []
+    if entity_chunks_storage is not None:
+        stored_chunks = await entity_chunks_storage.get_by_id(entity_name)
+        if stored_chunks and isinstance(stored_chunks, dict):
+            existing_full_source_ids = [
+                chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id
+            ]
+
+    if not existing_full_source_ids:
+        existing_full_source_ids = [
+            chunk_id for chunk_id in already_source_ids if chunk_id
+        ]
+
+    full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids)
+
+    if entity_chunks_storage is not None and full_source_ids:
+        await entity_chunks_storage.upsert(
+            {
+                entity_name: {
+                    "chunk_ids": full_source_ids,
+                    "count": len(full_source_ids),
+                }
+            }
+        )
+
+    limit_method = global_config.get("source_ids_limit_method")
+    max_source_limit = global_config.get("max_source_ids_per_entity")
+    source_ids = apply_source_ids_limit(
+        full_source_ids,
+        max_source_limit,
+        limit_method,
+        identifier=f"`{entity_name}`",
+    )
+
+    # Only apply filtering in KEEP (ignore-new) mode
+    if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
+        allowed_source_ids = set(source_ids)
+        filtered_nodes = []
+        for dp in nodes_data:
+            source_id = dp.get("source_id")
+            # Skip descriptions sourced from chunks dropped by the limitation cap
+            if (
+                source_id
+                and source_id not in allowed_source_ids
+                and source_id not in existing_full_source_ids
+            ):
+                continue
+            filtered_nodes.append(dp)
+        nodes_data = filtered_nodes
+    else:
+        # In FIFO mode, keep all node descriptions - truncation happens at source_ids level only
+        nodes_data = list(nodes_data)
+
+    skip_summary_due_to_limit = (
+        limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
+        and len(existing_full_source_ids) >= max_source_limit
+        and not nodes_data
+        and already_description
+    )
+
     # Deduplicate by description, keeping first occurrence
     unique_nodes = {}
     for dp in nodes_data:
-        desc = dp["description"]
+        desc = dp.get("description")
+        if not desc:
+            continue
         if desc not in unique_nodes:
             unique_nodes[desc] = dp
@@ -1331,17 +1577,31 @@
     )
     sorted_descriptions = [dp["description"] for dp in sorted_nodes]
 
+    truncation_info = ""
+    dd_message = ""
+
     # Combine already_description with sorted new descriptions
     description_list = already_description + sorted_descriptions
+    deduplicated_num = original_nodes_count - len(sorted_descriptions)
+    if deduplicated_num > 0:
+        dd_message = f"dd:{deduplicated_num}"
     num_fragment = len(description_list)
     already_fragment = len(already_description)
-    deduplicated_num = already_fragment + len(nodes_data) - num_fragment
-    if deduplicated_num > 0:
-        dd_message = f"(dd:{deduplicated_num})"
-    else:
-        dd_message = ""
 
-    if num_fragment > 0:
+    if skip_summary_due_to_limit:
+        description = (
+            already_node.get("description", "(no description)")
+            if already_node
+            else "(no description)"
+        )
+        llm_was_used = False
+        status_message = f"Skip merge for `{entity_name}`: KEEP limit reached"
+        logger.debug(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
+    elif num_fragment > 0:
         # Get summary and LLM usage status
         description, llm_was_used = await _handle_entity_relation_summary(
             "Entity",
@@ -1354,9 +1614,18 @@
 
         # Log based on actual LLM usage
         if llm_was_used:
-            status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}"
+            status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
         else:
-            status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}"
+            status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
+
+        # Add truncation info from apply_source_ids_limit if truncation occurred
+        if len(source_ids) < len(full_source_ids):
+            truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}"
+
+        if dd_message or truncation_info:
+            status_message += (
+                f" ({', '.join(filter(None, [truncation_info, dd_message]))})"
+            )
 
         if already_fragment > 0 or llm_was_used:
             logger.info(status_message)
@@ -1371,10 +1640,67 @@
         logger.error(f"Entity {entity_name} has no description")
         description = "(no description)"
 
-    source_id = GRAPH_FIELD_SEP.join(
-        set([dp["source_id"] for dp in nodes_data] + already_source_ids)
-    )
-    file_path = build_file_path(already_file_paths, nodes_data, entity_name)
+    source_id = GRAPH_FIELD_SEP.join(source_ids)
+
+    # Build file_path with count limit
+    if skip_summary_due_to_limit:
+        # Skip limit, keep original file_path
+        file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
+    else:
+        # Collect and apply limit
+        file_paths_list = []
+        seen_paths = set()
+
+        # Get placeholder to filter it out
+        file_path_placeholder = global_config.get(
+            "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+        )
+
+        # Collect from already_file_paths, excluding placeholder
+        for fp in already_file_paths:
+            # Skip placeholders (format: "...{placeholder}({method} kept/total)...")
+            if (
+                fp
+                and not fp.startswith(f"...{file_path_placeholder}")
+                and fp not in seen_paths
+            ):
+                file_paths_list.append(fp)
+                seen_paths.add(fp)
+
+        # Collect from new data
+        for dp in nodes_data:
+            file_path_item = dp.get("file_path")
+            if file_path_item and file_path_item not in seen_paths:
+                file_paths_list.append(file_path_item)
+                seen_paths.add(file_path_item)
+
+        # Apply count limit
+        max_file_paths = global_config.get("max_file_paths")
+
+        if len(file_paths_list) > max_file_paths:
+            limit_method = global_config.get(
+                "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+            )
+            file_path_placeholder = global_config.get(
+                "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+            )
+            original_count = len(file_paths_list)
+
+            if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+                # FIFO: keep tail (newest), discard head
+                file_paths_list = file_paths_list[-max_file_paths:]
+            else:
+                # KEEP: keep head (earliest), discard tail
+                file_paths_list = file_paths_list[:max_file_paths]
+
+            file_paths_list.append(
+                f"...{file_path_placeholder}({limit_method} {max_file_paths}/{original_count})..."
+            )
+            logger.info(
+                f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+            )
+
+        file_path = GRAPH_FIELD_SEP.join(file_paths_list)
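When the cap fires, a placeholder entry is appended so the drop is visible in the stored field. An illustrative result with MAX_FILE_PATHS=2 and the KEEP method ("<SEP>" stands in here for GRAPH_FIELD_SEP, whose actual value is defined in lightrag/constants.py):

```python
file_paths_list = ["a.pdf", "b.pdf"]                 # head kept by KEEP
file_paths_list.append("...truncated(KEEP 2/4)...")  # 2 of 4 paths retained
file_path = "<SEP>".join(file_paths_list)
# -> "a.pdf<SEP>b.pdf<SEP>...truncated(KEEP 2/4)..."
```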
 
     node_data = dict(
         entity_id=entity_name,
         entity_type=entity_type,
         description=description,
         source_id=source_id,
         file_path=file_path,
         created_at=int(time.time()),
+        truncate=truncation_info,
     )
     await knowledge_graph_inst.upsert_node(
         entity_name,
@@ -1402,6 +1729,7 @@ async def _merge_edges_then_upsert(
     pipeline_status_lock=None,
     llm_response_cache: BaseKVStorage | None = None,
     added_entities: list = None,  # New parameter to track entities added during edge processing
+    relation_chunks_storage: BaseKVStorage | None = None,
 ):
     if src_id == tgt_id:
         return None
@@ -1445,16 +1773,85 @@
             )
         )
 
+    original_edges_count = len(edges_data)
+
+    new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")]
+
+    storage_key = make_relation_chunk_key(src_id, tgt_id)
+    existing_full_source_ids = []
+    if relation_chunks_storage is not None:
+        stored_chunks = await relation_chunks_storage.get_by_id(storage_key)
+        if stored_chunks and isinstance(stored_chunks, dict):
+            existing_full_source_ids = [
+                chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id
+            ]
+
+    if not existing_full_source_ids:
+        existing_full_source_ids = [
+            chunk_id for chunk_id in already_source_ids if chunk_id
+        ]
+
+    full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids)
+
+    if relation_chunks_storage is not None and full_source_ids:
+        await relation_chunks_storage.upsert(
+            {
+                storage_key: {
+                    "chunk_ids": full_source_ids,
+                    "count": len(full_source_ids),
+                }
+            }
+        )
+
+    limit_method = (
+        global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
+    )
+    max_source_limit = global_config.get("max_source_ids_per_relation")
+    source_ids = apply_source_ids_limit(
+        full_source_ids,
+        max_source_limit,
+        limit_method,
+        identifier=f"`{src_id}`~`{tgt_id}`",
+    )
+
+    # Only apply filtering in KEEP (ignore-new) mode
+    if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
+        allowed_source_ids = set(source_ids)
+        filtered_edges = []
+        for dp in edges_data:
+            source_id = dp.get("source_id")
+            # Skip relationship fragments sourced from chunks dropped by the limitation cap
+            if (
+                source_id
+                and source_id not in allowed_source_ids
+                and source_id not in existing_full_source_ids
+            ):
+                continue
+            filtered_edges.append(dp)
+        edges_data = filtered_edges
+    else:
+        # In FIFO mode, keep all edge descriptions - truncation happens at source_ids level only
+        edges_data = list(edges_data)
+
+    skip_summary_due_to_limit = (
+        limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
+        and len(existing_full_source_ids) >= max_source_limit
+        and not edges_data
+        and already_description
+    )
+
     # Process edges_data with None checks
     weight = sum([dp["weight"] for dp in edges_data] + already_weights)
 
     # Deduplicate by description, keeping first occurrence
     unique_edges = {}
     for dp in edges_data:
-        if dp.get("description"):
-            desc = dp["description"]
-            if desc not in unique_edges:
-                unique_edges[desc] = dp
+        description_value = dp.get("description")
+        if not description_value:
+            continue
+        if description_value not in unique_edges:
+            unique_edges[description_value] = dp
 
     # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same
     sorted_edges = sorted(
@@ -1463,17 +1860,34 @@ async def _merge_edges_then_upsert(
     )
     sorted_descriptions = [dp["description"] for dp in sorted_edges]
 
+    truncation_info = ""
+    dd_message = ""
+
     # Combine already_description with sorted new descriptions
     description_list = already_description + sorted_descriptions
+    deduplicated_num = original_edges_count - len(sorted_descriptions)
+    if deduplicated_num > 0:
+        dd_message = f"dd:{deduplicated_num}"
     num_fragment = len(description_list)
     already_fragment = len(already_description)
-    deduplicated_num = already_fragment + len(edges_data) - num_fragment
-    if deduplicated_num > 0:
-        dd_message = f"(dd:{deduplicated_num})"
-    else:
-        dd_message = ""
 
-    if num_fragment > 0:
+    if skip_summary_due_to_limit:
+        description = (
+            already_edge.get("description", "(no description)")
+            if already_edge
+            else "(no description)"
+        )
+        llm_was_used = False
+        status_message = (
+            f"Skip merge for `{src_id}`~`{tgt_id}`: KEEP limit reached"
+        )
+        logger.debug(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
+    elif num_fragment > 0:
         # Get summary and LLM usage status
         description, llm_was_used = await _handle_entity_relation_summary(
             "Relation",
@@ -1486,9 +1900,18 @@
 
         # Log based on actual LLM usage
         if llm_was_used:
-            status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}"
+            status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
         else:
-            status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}"
+            status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
+
+        # Add truncation info from apply_source_ids_limit if truncation occurred
+        if len(source_ids) < len(full_source_ids):
+            truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}"
+
+        if dd_message or truncation_info:
+            status_message += (
+                f" ({', '.join(filter(None, [truncation_info, dd_message]))})"
+            )
 
         if already_fragment > 0 or llm_was_used:
             logger.info(status_message)
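The truncation marker has the form method:kept/total; it is stored in the new "truncate" field and appended to merge log lines together with the dedup counter:

```python
limit_method = "FIFO"
kept, total = 300, 412  # illustrative counts
truncation_info = f"{limit_method}:{kept}/{total}" if kept < total else ""
# -> "FIFO:300/412"; a merge log line then reads, e.g.:
#    Merged: `A`~`B` | 3+5 (FIFO:300/412, dd:2)
```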
@@ -1518,13 +1941,67 @@
     # Join all unique keywords with commas
     keywords = ",".join(sorted(all_keywords))
 
-    source_id = GRAPH_FIELD_SEP.join(
-        set(
-            [dp["source_id"] for dp in edges_data if dp.get("source_id")]
-            + already_source_ids
+    source_id = GRAPH_FIELD_SEP.join(source_ids)
+
+    # Build file_path with count limit
+    if skip_summary_due_to_limit:
+        # Skip limit, keep original file_path
+        file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
+    else:
+        # Collect and apply limit
+        file_paths_list = []
+        seen_paths = set()
+
+        # Get placeholder to filter it out
+        file_path_placeholder = global_config.get(
+            "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
         )
-    )
-    file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}")
+
+        # Collect from already_file_paths, excluding placeholder
+        for fp in already_file_paths:
+            # Skip placeholders (format: "...{placeholder}({method} kept/total)...")
+            if (
+                fp
+                and not fp.startswith(f"...{file_path_placeholder}")
+                and fp not in seen_paths
+            ):
+                file_paths_list.append(fp)
+                seen_paths.add(fp)
+
+        # Collect from new data
+        for dp in edges_data:
+            file_path_item = dp.get("file_path")
+            if file_path_item and file_path_item not in seen_paths:
+                file_paths_list.append(file_path_item)
+                seen_paths.add(file_path_item)
+
+        # Apply count limit
+        max_file_paths = global_config.get("max_file_paths")
+
+        if len(file_paths_list) > max_file_paths:
+            limit_method = global_config.get(
+                "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+            )
+            file_path_placeholder = global_config.get(
+                "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+            )
+            original_count = len(file_paths_list)
+
+            if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+                # FIFO: keep tail (newest), discard head
+                file_paths_list = file_paths_list[-max_file_paths:]
+            else:
+                # KEEP: keep head (earliest), discard tail
+                file_paths_list = file_paths_list[:max_file_paths]
+
+            file_paths_list.append(
+                f"...{file_path_placeholder}({limit_method} {max_file_paths}/{original_count})..."
+            )
+            logger.info(
+                f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+            )
+
+        file_path = GRAPH_FIELD_SEP.join(file_paths_list)
 
     for need_insert_id in [src_id, tgt_id]:
         if not (await knowledge_graph_inst.has_node(need_insert_id)):
@@ -1535,6 +2012,7 @@
                 "entity_type": "UNKNOWN",
                 "file_path": file_path,
                 "created_at": int(time.time()),
+                "truncate": "",
             }
             await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
@@ -1560,6 +2038,7 @@
             source_id=source_id,
             file_path=file_path,
             created_at=int(time.time()),
+            truncate=truncation_info,
         ),
     )
@@ -1571,6 +2050,7 @@
         source_id=source_id,
         file_path=file_path,
         created_at=int(time.time()),
+        truncate=truncation_info,
     )
     return edge_data
@@ -1588,6 +2068,8 @@ async def merge_nodes_and_edges(
     pipeline_status: dict = None,
     pipeline_status_lock=None,
     llm_response_cache: BaseKVStorage | None = None,
+    entity_chunks_storage: BaseKVStorage | None = None,
+    relation_chunks_storage: BaseKVStorage | None = None,
     current_file_number: int = 0,
     total_files: int = 0,
     file_path: str = "unknown_source",
@@ -1611,6 +2093,8 @@
         pipeline_status: Pipeline status dictionary
         pipeline_status_lock: Lock for pipeline status
        llm_response_cache: LLM response cache
+        entity_chunks_storage: Storage tracking full chunk lists per entity
+        relation_chunks_storage: Storage tracking full chunk lists per relation
        current_file_number: Current file number for logging
        total_files: Total files for logging
        file_path: File path for logging
@@ -1658,6 +2142,7 @@
             [entity_name], namespace=namespace, enable_logging=False
         ):
             try:
+                logger.debug(f"Inserting {entity_name} in Graph")
                 # Graph database operation (critical path, must succeed)
                 entity_data = await _merge_nodes_then_upsert(
                     entity_name,
@@ -1667,13 +2152,14 @@
                     pipeline_status,
                     pipeline_status_lock,
                     llm_response_cache,
+                    entity_chunks_storage,
                 )
 
                 # Vector database operation (equally critical, must succeed)
                 if entity_vdb is not None and entity_data:
                     data_for_vdb = {
                         compute_mdhash_id(
-                            entity_data["entity_name"], prefix="ent-"
+                            str(entity_data["entity_name"]), prefix="ent-"
                         ): {
                             "entity_name": entity_data["entity_name"],
                             "entity_type": entity_data["entity_type"],
@@ -1685,6 +2171,7 @@
                         }
                     }
 
+                    logger.debug(f"Inserting {entity_name} in VDB")
                     # Use safe operation wrapper - VDB failure must throw exception
                     await safe_vdb_operation_with_exception(
                         operation=lambda: entity_vdb.upsert(data_for_vdb),
@@ -1798,6 +2285,7 @@ async
def merge_nodes_and_edges( pipeline_status_lock, llm_response_cache, added_entities, # Pass list to collect added entities + relation_chunks_storage, ) if edge_data is None: @@ -2225,7 +2713,7 @@ async def extract_entities( await asyncio.wait(pending) # Add progress prefix to the exception message - progress_prefix = f"C[{processed_chunks+1}/{total_chunks}]" + progress_prefix = f"C[{processed_chunks + 1}/{total_chunks}]" # Re-raise the original exception with a prefix prefixed_exception = create_prefixed_exception(first_exception, progress_prefix) @@ -2247,7 +2735,7 @@ async def kg_query( hashing_kv: BaseKVStorage | None = None, system_prompt: str | None = None, chunks_vdb: BaseVectorStorage = None, -) -> QueryResult: +) -> QueryResult | None: """ Execute knowledge graph query and return unified QueryResult object. @@ -2264,7 +2752,7 @@ async def kg_query( chunks_vdb: Document chunks vector database Returns: - QueryResult: Unified query result object containing: + QueryResult | None: Unified query result object containing: - content: Non-streaming response text content - response_iterator: Streaming response iterator - raw_data: Complete structured data (including references and metadata) @@ -2275,6 +2763,8 @@ async def kg_query( - only_need_prompt=True: content contains complete prompt - stream=True: response_iterator contains streaming response, raw_data contains complete data - default: content contains LLM response text, raw_data contains complete data + + Returns None when no relevant context could be constructed for the query. """ if not query: return QueryResult(content=PROMPTS["fail_response"]) @@ -2322,7 +2812,8 @@ async def kg_query( ) if context_result is None: - return QueryResult(content=PROMPTS["fail_response"]) + logger.info("[kg_query] No query context could be built; returning no-result.") + return None # Return different content based on query parameters if query_param.only_need_context and not query_param.only_need_prompt: @@ -3228,12 +3719,12 @@ async def _build_llm_context( chunk_tracking_log.append("?0/0") if chunk_tracking_log: - logger.info(f"chunks S+F/O: {' '.join(chunk_tracking_log)}") + logger.info(f"Final chunks S+F/O: {' '.join(chunk_tracking_log)}") text_units_str = "\n".join( json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context ) - reference_list_str = "\n\n".join( + reference_list_str = "\n".join( f"[{ref['reference_id']}] {ref['file_path']}" for ref in reference_list if ref["reference_id"] @@ -3329,7 +3820,11 @@ async def _build_query_context( query_embedding=search_result["query_embedding"], ) - if not merged_chunks: + if ( + not merged_chunks + and not truncation_result["entities_context"] + and not truncation_result["relations_context"] + ): return None # Stage 4: Build final LLM context with dynamic token processing @@ -3985,7 +4480,7 @@ async def naive_query( global_config: dict[str, str], hashing_kv: BaseKVStorage | None = None, system_prompt: str | None = None, -) -> QueryResult: +) -> QueryResult | None: """ Execute naive query and return unified QueryResult object. 
@@ -3998,11 +4493,13 @@ async def naive_query( system_prompt: System prompt Returns: - QueryResult: Unified query result object containing: + QueryResult | None: Unified query result object containing: - content: Non-streaming response text content - response_iterator: Streaming response iterator - raw_data: Complete structured data (including references and metadata) - is_streaming: Whether this is a streaming result + + Returns None when no relevant chunks are retrieved. """ if not query: @@ -4023,16 +4520,10 @@ async def naive_query( chunks = await _get_vector_context(query, chunks_vdb, query_param, None) if chunks is None or len(chunks) == 0: - # Build empty raw data structure for naive mode - empty_raw_data = convert_to_user_format( - [], # naive mode has no entities - [], # naive mode has no relationships - [], # no chunks - [], # no references - "naive", + logger.info( + "[naive_query] No relevant document chunks found; returning no-result." ) - empty_raw_data["message"] = "No relevant document chunks found." - return QueryResult(content=PROMPTS["fail_response"], raw_data=empty_raw_data) + return None # Calculate dynamic token limit for chunks max_total_tokens = getattr( @@ -4124,7 +4615,7 @@ async def naive_query( text_units_str = "\n".join( json.dumps(text_unit, ensure_ascii=False) for text_unit in text_units_context ) - reference_list_str = "\n\n".join( + reference_list_str = "\n".join( f"[{ref['reference_id']}] {ref['file_path']}" for ref in reference_list if ref["reference_id"]