diff --git a/README-zh.md b/README-zh.md
index 8b7239ec..d3403b35 100644
--- a/README-zh.md
+++ b/README-zh.md
@@ -268,7 +268,8 @@ if __name__ == "__main__":
 | **embedding_func_max_async** | `int` | 最大并发异步嵌入进程数 | `16` |
 | **llm_model_func** | `callable` | LLM生成的函数 | `gpt_4o_mini_complete` |
 | **llm_model_name** | `str` | 用于生成的LLM模型名称 | `meta-llama/Llama-3.2-1B-Instruct` |
-| **summary_max_tokens** | `int` | 生成实体关系摘要时送给LLM的最大令牌数 | `30000`(由环境变量 SUMMARY_MAX_TOKENS 设置) |
+| **summary_context_size** | `int` | 合并实体关系摘要时送给LLM的最大令牌数 | `12000`(由环境变量 SUMMARY_CONTEXT_SIZE 设置) |
+| **summary_max_tokens** | `int` | 合并实体关系描述的最大令牌数 | `500`(由环境变量 SUMMARY_MAX_TOKENS 设置) |
 | **llm_model_max_async** | `int` | 最大并发异步LLM进程数 | `4`(默认值由环境变量MAX_ASYNC更改) |
 | **llm_model_kwargs** | `dict` | LLM生成的附加参数 | |
 | **vector_db_storage_cls_kwargs** | `dict` | 向量数据库的附加参数,如设置节点和关系检索的阈值 | cosine_better_than_threshold: 0.2(默认值由环境变量COSINE_THRESHOLD更改) |
@@ -598,9 +599,9 @@ if __name__ == "__main__":
 为了提高检索质量,可以根据更有效的相关性评分模型对文档进行重排序。`rerank.py`文件提供了三个Reranker提供商的驱动函数:
-* **Cohere / vLLM**: `cohere_rerank`
-* **Jina AI**: `jina_rerank`
-* **Aliyun阿里云**: `ali_rerank`
+* **Cohere / vLLM**: `cohere_rerank`
+* **Jina AI**: `jina_rerank`
+* **Aliyun阿里云**: `ali_rerank`
 您可以将这些函数之一注入到LightRAG对象的`rerank_model_func`属性中。这将使LightRAG的查询功能能够使用注入的函数对检索到的文本块进行重新排序。有关详细用法,请参阅`examples/rerank_example.py`文件。
diff --git a/README.md b/README.md
index eacb4982..5ad37f01 100644
--- a/README.md
+++ b/README.md
@@ -275,7 +275,8 @@ A full list of LightRAG init parameters:
 | **embedding_func_max_async** | `int` | Maximum number of concurrent asynchronous embedding processes | `16` |
 | **llm_model_func** | `callable` | Function for LLM generation | `gpt_4o_mini_complete` |
 | **llm_model_name** | `str` | LLM model name for generation | `meta-llama/Llama-3.2-1B-Instruct` |
-| **summary_max_tokens** | `int` | Maximum tokens send to LLM to generate entity relation summaries | `30000`(configured by env var SUMMARY_MAX_TOKENS) |
+| **summary_context_size** | `int` | Maximum tokens sent to the LLM when generating summaries for entity/relation merging | `12000`(configured by env var SUMMARY_CONTEXT_SIZE) |
+| **summary_max_tokens** | `int` | Maximum token size of an entity/relation description | `500`(configured by env var SUMMARY_MAX_TOKENS) |
 | **llm_model_max_async** | `int` | Maximum number of concurrent asynchronous LLM processes | `4`(default value changed by env var MAX_ASYNC) |
 | **llm_model_kwargs** | `dict` | Additional parameters for LLM generation | |
 | **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for vector database, like setting the threshold for nodes and relations retrieval | cosine_better_than_threshold: 0.2(default value changed by env var COSINE_THRESHOLD) |
diff --git a/env.example b/env.example
index 41c77ede..3ce6d1d9 100644
--- a/env.example
+++ b/env.example
@@ -125,12 +125,15 @@ ENABLE_LLM_CACHE_FOR_EXTRACT=true
 ### Chunk size for document splitting, 500~1500 is recommended
 # CHUNK_SIZE=1200
 # CHUNK_OVERLAP_SIZE=100
-### Entity and relation summarization configuration
-### Number of duplicated entities/edges to trigger LLM re-summary on merge (at least 3 is recommented), and max tokens send to LLM
-# FORCE_LLM_SUMMARY_ON_MERGE=4
-# SUMMARY_MAX_TOKENS=30000
-### Maximum number of entity extraction attempts for ambiguous content
-# MAX_GLEANING=1
+
+### Number of summary segments or tokens to trigger LLM summary on entity/relation merge (at least 3 is recommended)
+# FORCE_LLM_SUMMARY_ON_MERGE=8
+### Max description token size to trigger LLM summary
+# SUMMARY_MAX_TOKENS=1200
+### Recommended LLM summary output length in tokens
+# SUMMARY_LENGTH_RECOMMENDED=600
+### Maximum context size sent to LLM for description summary
+# SUMMARY_CONTEXT_SIZE=12000
 ###############################
 ### Concurrency Configuration
diff --git a/lightrag/api/config.py b/lightrag/api/config.py
index a5e352dc..a6badec4 100644
--- a/lightrag/api/config.py
+++ b/lightrag/api/config.py
@@ -30,6 +30,8 @@ from lightrag.constants import (
     DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
     DEFAULT_MAX_ASYNC,
     DEFAULT_SUMMARY_MAX_TOKENS,
+    DEFAULT_SUMMARY_LENGTH_RECOMMENDED,
+    DEFAULT_SUMMARY_CONTEXT_SIZE,
     DEFAULT_SUMMARY_LANGUAGE,
     DEFAULT_EMBEDDING_FUNC_MAX_ASYNC,
     DEFAULT_EMBEDDING_BATCH_NUM,
@@ -119,10 +121,26 @@ def parse_args() -> argparse.Namespace:
         help=f"Maximum async operations (default: from env or {DEFAULT_MAX_ASYNC})",
     )
     parser.add_argument(
-        "--max-tokens",
+        "--summary-max-tokens",
         type=int,
         default=get_env_value("SUMMARY_MAX_TOKENS", DEFAULT_SUMMARY_MAX_TOKENS, int),
-        help=f"Maximum token size (default: from env or {DEFAULT_SUMMARY_MAX_TOKENS})",
+        help=f"Maximum token size for entity/relation summary (default: from env or {DEFAULT_SUMMARY_MAX_TOKENS})",
+    )
+    parser.add_argument(
+        "--summary-context-size",
+        type=int,
+        default=get_env_value(
+            "SUMMARY_CONTEXT_SIZE", DEFAULT_SUMMARY_CONTEXT_SIZE, int
+        ),
+        help=f"LLM Summary Context size (default: from env or {DEFAULT_SUMMARY_CONTEXT_SIZE})",
+    )
+    parser.add_argument(
+        "--summary-length-recommended",
+        type=int,
+        default=get_env_value(
+            "SUMMARY_LENGTH_RECOMMENDED", DEFAULT_SUMMARY_LENGTH_RECOMMENDED, int
+        ),
+        help=f"Recommended LLM summary output length in tokens (default: from env or {DEFAULT_SUMMARY_LENGTH_RECOMMENDED})",
     )

     # Logging configuration
diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index ec1d38d5..28e13614 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -2,7 +2,7 @@
 LightRAG FastAPI Server
 """

-from fastapi import FastAPI, Depends, HTTPException, status
+from fastapi import FastAPI, Depends, HTTPException
 import asyncio
 import os
 import logging
@@ -472,7 +472,8 @@ def create_app(args):
             ),
             llm_model_name=args.llm_model,
             llm_model_max_async=args.max_async,
-            summary_max_tokens=args.max_tokens,
+            summary_max_tokens=args.summary_max_tokens,
+            summary_context_size=args.summary_context_size,
             chunk_token_size=int(args.chunk_size),
             chunk_overlap_token_size=int(args.chunk_overlap_size),
             llm_model_kwargs=(
@@ -510,7 +511,8 @@ def create_app(args):
             chunk_overlap_token_size=int(args.chunk_overlap_size),
             llm_model_name=args.llm_model,
             llm_model_max_async=args.max_async,
-            summary_max_tokens=args.max_tokens,
+            summary_max_tokens=args.summary_max_tokens,
+            summary_context_size=args.summary_context_size,
             embedding_func=embedding_func,
             kv_storage=args.kv_storage,
             graph_storage=args.graph_storage,
@@ -597,9 +599,7 @@ def create_app(args):
             }
         username = form_data.username
         if auth_handler.accounts.get(username) != form_data.password:
-            raise HTTPException(
-                status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect credentials"
-            )
+            raise HTTPException(status_code=401, detail="Incorrect credentials")

         # Regular user login
         user_token = auth_handler.create_token(
@@ -642,7 +642,8 @@ def create_app(args):
             "embedding_binding": args.embedding_binding,
             "embedding_binding_host": args.embedding_binding_host,
             "embedding_model": args.embedding_model,
-            "max_tokens": args.max_tokens,
+            "summary_max_tokens": args.summary_max_tokens,
+            "summary_context_size": args.summary_context_size,
"summary_context_size": args.summary_context_size, "kv_storage": args.kv_storage, "doc_status_storage": args.doc_status_storage, "graph_storage": args.graph_storage, diff --git a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py index fc05716c..a53f8bee 100644 --- a/lightrag/api/utils_api.py +++ b/lightrag/api/utils_api.py @@ -242,8 +242,8 @@ def display_splash_screen(args: argparse.Namespace) -> None: ASCIIColors.yellow(f"{args.llm_model}") ASCIIColors.white(" ├─ Max Async for LLM: ", end="") ASCIIColors.yellow(f"{args.max_async}") - ASCIIColors.white(" ├─ Max Tokens: ", end="") - ASCIIColors.yellow(f"{args.max_tokens}") + ASCIIColors.white(" ├─ Summary Context Size: ", end="") + ASCIIColors.yellow(f"{args.summary_context_size}") ASCIIColors.white(" ├─ LLM Cache Enabled: ", end="") ASCIIColors.yellow(f"{args.enable_llm_cache}") ASCIIColors.white(" └─ LLM Cache for Extraction Enabled: ", end="") diff --git a/lightrag/constants.py b/lightrag/constants.py index 9445872e..2f493277 100644 --- a/lightrag/constants.py +++ b/lightrag/constants.py @@ -12,9 +12,16 @@ DEFAULT_MAX_GRAPH_NODES = 1000 # Default values for extraction settings DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for summaries -DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 4 DEFAULT_MAX_GLEANING = 1 -DEFAULT_SUMMARY_MAX_TOKENS = 30000 # Default maximum token size + +# Number of description fragments to trigger LLM summary +DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8 +# Max description token size to trigger LLM summary +DEFAULT_SUMMARY_MAX_TOKENS = 1200 +# Recommended LLM summary output length in tokens +DEFAULT_SUMMARY_LENGTH_RECOMMENDED = 600 +# Maximum token size sent to LLM for summary +DEFAULT_SUMMARY_CONTEXT_SIZE = 12000 # Separator for graph fields GRAPH_FIELD_SEP = "" diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 721181d5..8dbff74f 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -34,6 +34,8 @@ from lightrag.constants import ( DEFAULT_KG_CHUNK_PICK_METHOD, DEFAULT_MIN_RERANK_SCORE, DEFAULT_SUMMARY_MAX_TOKENS, + DEFAULT_SUMMARY_CONTEXT_SIZE, + DEFAULT_SUMMARY_LENGTH_RECOMMENDED, DEFAULT_MAX_ASYNC, DEFAULT_MAX_PARALLEL_INSERT, DEFAULT_MAX_GRAPH_NODES, @@ -285,8 +287,20 @@ class LightRAG: summary_max_tokens: int = field( default=int(os.getenv("SUMMARY_MAX_TOKENS", DEFAULT_SUMMARY_MAX_TOKENS)) ) + """Maximum tokens allowed for entity/relation description.""" + + summary_context_size: int = field( + default=int(os.getenv("SUMMARY_CONTEXT_SIZE", DEFAULT_SUMMARY_CONTEXT_SIZE)) + ) """Maximum number of tokens allowed per LLM response.""" + summary_length_recommended: int = field( + default=int( + os.getenv("SUMMARY_LENGTH_RECOMMENDED", DEFAULT_SUMMARY_LENGTH_RECOMMENDED) + ) + ) + """Recommended length of LLM summary output.""" + llm_model_max_async: int = field( default=int(os.getenv("MAX_ASYNC", DEFAULT_MAX_ASYNC)) ) @@ -416,6 +430,20 @@ class LightRAG: if self.ollama_server_infos is None: self.ollama_server_infos = OllamaServerInfos() + # Validate config + if self.force_llm_summary_on_merge < 3: + logger.warning( + f"force_llm_summary_on_merge should be at least 3, got {self.force_llm_summary_on_merge}" + ) + if self.summary_context_size > self.max_total_tokens: + logger.warning( + f"summary_context_size({self.summary_context_size}) should no greater than max_total_tokens({self.max_total_tokens})" + ) + if self.summary_length_recommended > self.summary_max_tokens: + logger.warning( + f"max_total_tokens({self.summary_max_tokens}) should greater than 
summary_length_recommended({self.summary_length_recommended})" + ) + # Fix global_config now global_config = asdict(self) @@ -2272,117 +2300,111 @@ class LightRAG: relationships_to_delete = set() relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids - # Use graph database lock to ensure atomic merges and updates + try: + # Get affected entities and relations from full_entities and full_relations storage + doc_entities_data = await self.full_entities.get_by_id(doc_id) + doc_relations_data = await self.full_relations.get_by_id(doc_id) + + affected_nodes = [] + affected_edges = [] + + # Get entity data from graph storage using entity names from full_entities + if doc_entities_data and "entity_names" in doc_entities_data: + entity_names = doc_entities_data["entity_names"] + # get_nodes_batch returns dict[str, dict], need to convert to list[dict] + nodes_dict = await self.chunk_entity_relation_graph.get_nodes_batch( + entity_names + ) + for entity_name in entity_names: + node_data = nodes_dict.get(entity_name) + if node_data: + # Ensure compatibility with existing logic that expects "id" field + if "id" not in node_data: + node_data["id"] = entity_name + affected_nodes.append(node_data) + + # Get relation data from graph storage using relation pairs from full_relations + if doc_relations_data and "relation_pairs" in doc_relations_data: + relation_pairs = doc_relations_data["relation_pairs"] + edge_pairs_dicts = [ + {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs + ] + # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict] + edges_dict = await self.chunk_entity_relation_graph.get_edges_batch( + edge_pairs_dicts + ) + + for pair in relation_pairs: + src, tgt = pair[0], pair[1] + edge_key = (src, tgt) + edge_data = edges_dict.get(edge_key) + if edge_data: + # Ensure compatibility with existing logic that expects "source" and "target" fields + if "source" not in edge_data: + edge_data["source"] = src + if "target" not in edge_data: + edge_data["target"] = tgt + affected_edges.append(edge_data) + + except Exception as e: + logger.error(f"Failed to analyze affected graph elements: {e}") + raise Exception(f"Failed to analyze graph dependencies: {e}") from e + + try: + # Process entities + for node_data in affected_nodes: + node_label = node_data.get("entity_id") + if node_label and "source_id" in node_data: + sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) + remaining_sources = sources - chunk_ids + + if not remaining_sources: + entities_to_delete.add(node_label) + elif remaining_sources != sources: + entities_to_rebuild[node_label] = remaining_sources + + async with pipeline_status_lock: + log_message = f"Found {len(entities_to_rebuild)} affected entities" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + # Process relationships + for edge_data in affected_edges: + src = edge_data.get("source") + tgt = edge_data.get("target") + + if src and tgt and "source_id" in edge_data: + edge_tuple = tuple(sorted((src, tgt))) + if ( + edge_tuple in relationships_to_delete + or edge_tuple in relationships_to_rebuild + ): + continue + + sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) + remaining_sources = sources - chunk_ids + + if not remaining_sources: + relationships_to_delete.add(edge_tuple) + elif remaining_sources != sources: + relationships_to_rebuild[edge_tuple] = remaining_sources + + async with pipeline_status_lock: + log_message = ( + 
f"Found {len(relationships_to_rebuild)} affected relations" + ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + except Exception as e: + logger.error(f"Failed to process graph analysis results: {e}") + raise Exception(f"Failed to process graph dependencies: {e}") from e + + # Use graph database lock to prevent dirty read graph_db_lock = get_graph_db_lock(enable_logging=False) async with graph_db_lock: - try: - # Get affected entities and relations from full_entities and full_relations storage - doc_entities_data = await self.full_entities.get_by_id(doc_id) - doc_relations_data = await self.full_relations.get_by_id(doc_id) - - affected_nodes = [] - affected_edges = [] - - # Get entity data from graph storage using entity names from full_entities - if doc_entities_data and "entity_names" in doc_entities_data: - entity_names = doc_entities_data["entity_names"] - # get_nodes_batch returns dict[str, dict], need to convert to list[dict] - nodes_dict = ( - await self.chunk_entity_relation_graph.get_nodes_batch( - entity_names - ) - ) - for entity_name in entity_names: - node_data = nodes_dict.get(entity_name) - if node_data: - # Ensure compatibility with existing logic that expects "id" field - if "id" not in node_data: - node_data["id"] = entity_name - affected_nodes.append(node_data) - - # Get relation data from graph storage using relation pairs from full_relations - if doc_relations_data and "relation_pairs" in doc_relations_data: - relation_pairs = doc_relations_data["relation_pairs"] - edge_pairs_dicts = [ - {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs - ] - # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict] - edges_dict = ( - await self.chunk_entity_relation_graph.get_edges_batch( - edge_pairs_dicts - ) - ) - - for pair in relation_pairs: - src, tgt = pair[0], pair[1] - edge_key = (src, tgt) - edge_data = edges_dict.get(edge_key) - if edge_data: - # Ensure compatibility with existing logic that expects "source" and "target" fields - if "source" not in edge_data: - edge_data["source"] = src - if "target" not in edge_data: - edge_data["target"] = tgt - affected_edges.append(edge_data) - - except Exception as e: - logger.error(f"Failed to analyze affected graph elements: {e}") - raise Exception(f"Failed to analyze graph dependencies: {e}") from e - - try: - # Process entities - for node_data in affected_nodes: - node_label = node_data.get("entity_id") - if node_label and "source_id" in node_data: - sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) - remaining_sources = sources - chunk_ids - - if not remaining_sources: - entities_to_delete.add(node_label) - elif remaining_sources != sources: - entities_to_rebuild[node_label] = remaining_sources - - async with pipeline_status_lock: - log_message = ( - f"Found {len(entities_to_rebuild)} affected entities" - ) - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - # Process relationships - for edge_data in affected_edges: - src = edge_data.get("source") - tgt = edge_data.get("target") - - if src and tgt and "source_id" in edge_data: - edge_tuple = tuple(sorted((src, tgt))) - if ( - edge_tuple in relationships_to_delete - or edge_tuple in relationships_to_rebuild - ): - continue - - sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) - remaining_sources = sources - chunk_ids - - if not remaining_sources: - 
relationships_to_delete.add(edge_tuple) - elif remaining_sources != sources: - relationships_to_rebuild[edge_tuple] = remaining_sources - - async with pipeline_status_lock: - log_message = ( - f"Found {len(relationships_to_rebuild)} affected relations" - ) - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - except Exception as e: - logger.error(f"Failed to process graph analysis results: {e}") - raise Exception(f"Failed to process graph dependencies: {e}") from e - # 5. Delete chunks from storage if chunk_ids: try: @@ -2453,27 +2475,28 @@ class LightRAG: logger.error(f"Failed to delete relationships: {e}") raise Exception(f"Failed to delete relationships: {e}") from e - # 8. Rebuild entities and relationships from remaining chunks - if entities_to_rebuild or relationships_to_rebuild: - try: - await _rebuild_knowledge_from_chunks( - entities_to_rebuild=entities_to_rebuild, - relationships_to_rebuild=relationships_to_rebuild, - knowledge_graph_inst=self.chunk_entity_relation_graph, - entities_vdb=self.entities_vdb, - relationships_vdb=self.relationships_vdb, - text_chunks_storage=self.text_chunks, - llm_response_cache=self.llm_response_cache, - global_config=asdict(self), - pipeline_status=pipeline_status, - pipeline_status_lock=pipeline_status_lock, - ) + # Persist changes to graph database before releasing graph database lock + await self._insert_done() - except Exception as e: - logger.error(f"Failed to rebuild knowledge from chunks: {e}") - raise Exception( - f"Failed to rebuild knowledge graph: {e}" - ) from e + # 8. Rebuild entities and relationships from remaining chunks + if entities_to_rebuild or relationships_to_rebuild: + try: + await _rebuild_knowledge_from_chunks( + entities_to_rebuild=entities_to_rebuild, + relationships_to_rebuild=relationships_to_rebuild, + knowledge_graph_inst=self.chunk_entity_relation_graph, + entities_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + text_chunks_storage=self.text_chunks, + llm_response_cache=self.llm_response_cache, + global_config=asdict(self), + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, + ) + + except Exception as e: + logger.error(f"Failed to rebuild knowledge from chunks: {e}") + raise Exception(f"Failed to rebuild knowledge graph: {e}") from e # 9. Delete from full_entities and full_relations storage try: diff --git a/lightrag/operate.py b/lightrag/operate.py index cc06801c..486f7e69 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -114,48 +114,197 @@ def chunking_by_token_size( async def _handle_entity_relation_summary( + description_type: str, entity_or_relation_name: str, - description: str, + description_list: list[str], + seperator: str, + global_config: dict, + llm_response_cache: BaseKVStorage | None = None, +) -> tuple[str, bool]: + """Handle entity relation description summary using map-reduce approach. + + This function summarizes a list of descriptions using a map-reduce strategy: + 1. If total tokens < summary_context_size and len(description_list) < force_llm_summary_on_merge, no need to summarize + 2. If total tokens < summary_max_tokens, summarize with LLM directly + 3. Otherwise, split descriptions into chunks that fit within token limits + 4. Summarize each chunk, then recursively process the summaries + 5. 
Continue until the final summary fits within the token limits or the number of descriptions is less than force_llm_summary_on_merge
+
+    Args:
+        entity_or_relation_name: Name of the entity or relation being summarized
+        description_list: List of description strings to summarize
+        global_config: Global configuration containing tokenizer and limits
+        llm_response_cache: Optional cache for LLM responses
+
+    Returns:
+        Tuple of (final_summarized_description_string, llm_was_used_boolean)
+    """
+    # Handle empty input
+    if not description_list:
+        return "", False
+
+    # If only one description, return it directly (no need for LLM call)
+    if len(description_list) == 1:
+        return description_list[0], False
+
+    # Get configuration
+    tokenizer: Tokenizer = global_config["tokenizer"]
+    summary_context_size = global_config["summary_context_size"]
+    summary_max_tokens = global_config["summary_max_tokens"]
+    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
+
+    current_list = description_list[:]  # Copy the list to avoid modifying original
+    llm_was_used = False  # Track whether LLM was used during the entire process
+
+    # Iterative map-reduce process
+    while True:
+        # Calculate total tokens in current list
+        total_tokens = sum(len(tokenizer.encode(desc)) for desc in current_list)
+
+        # If total length is within limits, perform final summarization
+        if total_tokens <= summary_context_size or len(current_list) <= 2:
+            if (
+                len(current_list) < force_llm_summary_on_merge
+                and total_tokens < summary_max_tokens
+            ):
+                # no LLM needed, just join the descriptions
+                final_description = seperator.join(current_list)
+                return final_description if final_description else "", llm_was_used
+            else:
+                if total_tokens > summary_context_size and len(current_list) <= 2:
+                    logger.warning(
+                        f"Summarizing {entity_or_relation_name}: Oversized description found"
+                    )
+                # Final summarization of remaining descriptions - LLM will be used
+                final_summary = await _summarize_descriptions(
+                    description_type,
+                    entity_or_relation_name,
+                    current_list,
+                    global_config,
+                    llm_response_cache,
+                )
+                return final_summary, True  # LLM was used for final summarization
+
+        # Need to split into chunks - Map phase
+        # Ensure each chunk has minimum 2 descriptions to guarantee progress
+        chunks = []
+        current_chunk = []
+        current_tokens = 0
+
+        # At this point current_list has at least 3 descriptions
+        for i, desc in enumerate(current_list):
+            desc_tokens = len(tokenizer.encode(desc))
+
+            # If adding current description would exceed limit, finalize current chunk
+            if current_tokens + desc_tokens > summary_context_size and current_chunk:
+                # Ensure we have at least 2 descriptions in the chunk (when possible)
+                if len(current_chunk) == 1:
+                    # Force add one more description to ensure minimum 2 per chunk
+                    current_chunk.append(desc)
+                    chunks.append(current_chunk)
+                    logger.warning(
+                        f"Summarizing {entity_or_relation_name}: Oversized description found"
+                    )
+                    current_chunk = []  # next group is empty
+                    current_tokens = 0
+                else:  # current_chunk is ready for summary in reduce phase
+                    chunks.append(current_chunk)
+                    current_chunk = [desc]  # leave it for next group
+                    current_tokens = desc_tokens
+            else:
+                current_chunk.append(desc)
+                current_tokens += desc_tokens
+
+        # Add the last chunk if it exists
+        if current_chunk:
+            chunks.append(current_chunk)
+
+        logger.info(
+            f" Summarizing {entity_or_relation_name}: Map {len(current_list)} descriptions into {len(chunks)} groups"
+        )
+
+        # Reduce phase: summarize each group from chunks
+        new_summaries = []
+        for chunk
in chunks: + if len(chunk) == 1: + # Optimization: single description chunks don't need LLM summarization + new_summaries.append(chunk[0]) + else: + # Multiple descriptions need LLM summarization + summary = await _summarize_descriptions( + description_type, + entity_or_relation_name, + chunk, + global_config, + llm_response_cache, + ) + new_summaries.append(summary) + llm_was_used = True # Mark that LLM was used in reduce phase + + # Update current list with new summaries for next iteration + current_list = new_summaries + + +async def _summarize_descriptions( + description_type: str, + description_name: str, + description_list: list[str], global_config: dict, llm_response_cache: BaseKVStorage | None = None, ) -> str: - """Handle entity relation summary - For each entity or relation, input is the combined description of already existing description and new description. - If too long, use LLM to summarize. + """Helper function to summarize a list of descriptions using LLM. + + Args: + entity_or_relation_name: Name of the entity or relation being summarized + descriptions: List of description strings to summarize + global_config: Global configuration containing LLM function and settings + llm_response_cache: Optional cache for LLM responses + + Returns: + Summarized description string """ use_llm_func: callable = global_config["llm_model_func"] # Apply higher priority (8) to entity/relation summary tasks use_llm_func = partial(use_llm_func, _priority=8) - tokenizer: Tokenizer = global_config["tokenizer"] - llm_max_tokens = global_config["summary_max_tokens"] - language = global_config["addon_params"].get( "language", PROMPTS["DEFAULT_LANGUAGE"] ) - tokens = tokenizer.encode(description) - - ### summarize is not determined here anymore (It's determined by num_fragment now) - # if len(tokens) < summary_max_tokens: # No need for summary - # return description + summary_length_recommended = global_config["summary_length_recommended"] prompt_template = PROMPTS["summarize_entity_descriptions"] - use_description = tokenizer.decode(tokens[:llm_max_tokens]) + + # Join descriptions and apply token-based truncation if necessary + joined_descriptions = "\n\n".join(description_list) + tokenizer = global_config["tokenizer"] + summary_context_size = global_config["summary_context_size"] + + # Token-based truncation to ensure input fits within limits + tokens = tokenizer.encode(joined_descriptions) + if len(tokens) > summary_context_size: + truncated_tokens = tokens[:summary_context_size] + joined_descriptions = tokenizer.decode(truncated_tokens) + + # Prepare context for the prompt context_base = dict( - entity_name=entity_or_relation_name, - description_list=use_description.split(GRAPH_FIELD_SEP), + description_type=description_type, + description_name=description_name, + description_list=joined_descriptions, + summary_length=summary_length_recommended, language=language, ) use_prompt = prompt_template.format(**context_base) - logger.debug(f"Trigger summary: {entity_or_relation_name}") + + logger.debug( + f"Summarizing {len(description_list)} descriptions for: {description_name}" + ) # Use LLM function with cache (higher priority for summary generation) summary = await use_llm_func_with_cache( use_prompt, use_llm_func, llm_response_cache=llm_response_cache, - # max_tokens=summary_max_tokens, cache_type="extract", ) return summary @@ -413,7 +562,7 @@ async def _rebuild_knowledge_from_chunks( ) rebuilt_entities_count += 1 status_message = ( - f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks" + 
f"Rebuilt `{entity_name}` from {len(chunk_ids)} chunks" ) logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: @@ -422,7 +571,7 @@ async def _rebuild_knowledge_from_chunks( pipeline_status["history_messages"].append(status_message) except Exception as e: failed_entities_count += 1 - status_message = f"Failed to rebuild entity {entity_name}: {e}" + status_message = f"Failed to rebuild `{entity_name}`: {e}" logger.info(status_message) # Per requirement, change to info if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -453,7 +602,9 @@ async def _rebuild_knowledge_from_chunks( global_config=global_config, ) rebuilt_relationships_count += 1 - status_message = f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks" + status_message = ( + f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks" + ) logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -461,7 +612,7 @@ async def _rebuild_knowledge_from_chunks( pipeline_status["history_messages"].append(status_message) except Exception as e: failed_relationships_count += 1 - status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}" + status_message = f"Failed to rebuild `{src} - {tgt}`: {e}" logger.info(status_message) # Per requirement, change to info if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -525,14 +676,20 @@ async def _get_cached_extraction_results( ) -> dict[str, list[str]]: """Get cached extraction results for specific chunk IDs + This function retrieves cached LLM extraction results for the given chunk IDs and returns + them sorted by creation time. The results are sorted at two levels: + 1. Individual extraction results within each chunk are sorted by create_time (earliest first) + 2. 
Chunks themselves are sorted by the create_time of their earliest extraction result + Args: llm_response_cache: LLM response cache storage chunk_ids: Set of chunk IDs to get cached results for - text_chunks_data: Pre-loaded chunk data (optional, for performance) - text_chunks_storage: Text chunks storage (fallback if text_chunks_data is None) + text_chunks_storage: Text chunks storage for retrieving chunk data and LLM cache references Returns: - Dict mapping chunk_id -> list of extraction_result_text + Dict mapping chunk_id -> list of extraction_result_text, where: + - Keys (chunk_ids) are ordered by the create_time of their first extraction result + - Values (extraction results) are ordered by create_time within each chunk """ cached_results = {} @@ -541,15 +698,13 @@ async def _get_cached_extraction_results( # Read from storage chunk_data_list = await text_chunks_storage.get_by_ids(list(chunk_ids)) - for chunk_id, chunk_data in zip(chunk_ids, chunk_data_list): + for chunk_data in chunk_data_list: if chunk_data and isinstance(chunk_data, dict): llm_cache_list = chunk_data.get("llm_cache_list", []) if llm_cache_list: all_cache_ids.update(llm_cache_list) else: - logger.warning( - f"Chunk {chunk_id} data is invalid or None: {type(chunk_data)}" - ) + logger.warning(f"Chunk data is invalid or None: {chunk_data}") if not all_cache_ids: logger.warning(f"No LLM cache IDs found for {len(chunk_ids)} chunk IDs") @@ -560,7 +715,7 @@ async def _get_cached_extraction_results( # Process cache entries and group by chunk_id valid_entries = 0 - for cache_id, cache_entry in zip(all_cache_ids, cache_data_list): + for cache_entry in cache_data_list: if ( cache_entry is not None and isinstance(cache_entry, dict) @@ -580,16 +735,30 @@ async def _get_cached_extraction_results( # Store tuple with extraction result and creation time for sorting cached_results[chunk_id].append((extraction_result, create_time)) - # Sort extraction results by create_time for each chunk + # Sort extraction results by create_time for each chunk and collect earliest times + chunk_earliest_times = {} for chunk_id in cached_results: # Sort by create_time (x[1]), then extract only extraction_result (x[0]) cached_results[chunk_id].sort(key=lambda x: x[1]) + # Store the earliest create_time for this chunk (first item after sorting) + chunk_earliest_times[chunk_id] = cached_results[chunk_id][0][1] + # Extract only extraction_result (x[0]) cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]] - logger.info( - f"Found {valid_entries} valid cache entries, {len(cached_results)} chunks with results" + # Sort cached_results by the earliest create_time of each chunk + sorted_chunk_ids = sorted( + chunk_earliest_times.keys(), key=lambda chunk_id: chunk_earliest_times[chunk_id] ) - return cached_results + + # Rebuild cached_results in sorted order + sorted_cached_results = {} + for chunk_id in sorted_chunk_ids: + sorted_cached_results[chunk_id] = cached_results[chunk_id] + + logger.info( + f"Found {valid_entries} valid cache entries, {len(sorted_cached_results)} chunks with results" + ) + return sorted_cached_results async def _parse_extraction_result( @@ -690,15 +859,6 @@ async def _rebuild_single_entity( # Update entity in vector database entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-") - # Delete old vector record first - try: - await entities_vdb.delete([entity_vdb_id]) - except Exception as e: - logger.debug( - f"Could not delete old entity vector record {entity_vdb_id}: {e}" - ) - - # Insert new vector record 
entity_content = f"{entity_name}\n{final_description}" await entities_vdb.upsert( { @@ -713,21 +873,6 @@ async def _rebuild_single_entity( } ) - # Helper function to generate final description with optional LLM summary - async def _generate_final_description(combined_description: str) -> str: - force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1 - - if num_fragment >= force_llm_summary_on_merge: - return await _handle_entity_relation_summary( - entity_name, - combined_description, - global_config, - llm_response_cache=llm_response_cache, - ) - else: - return combined_description - # Collect all entity data from relevant chunks all_entity_data = [] for chunk_id in chunk_ids: @@ -736,13 +881,13 @@ async def _rebuild_single_entity( if not all_entity_data: logger.warning( - f"No cached entity data found for {entity_name}, trying to rebuild from relationships" + f"No entity data found for `{entity_name}`, trying to rebuild from relationships" ) # Get all edges connected to this entity edges = await knowledge_graph_inst.get_node_edges(entity_name) if not edges: - logger.warning(f"No relationships found for entity {entity_name}") + logger.warning(f"No relations attached to entity `{entity_name}`") return # Collect relationship data to extract entity information @@ -760,10 +905,19 @@ async def _rebuild_single_entity( edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP) file_paths.update(edge_file_paths) - # Generate description from relationships or fallback to current - if relationship_descriptions: - combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions) - final_description = await _generate_final_description(combined_description) + # deduplicate descriptions + description_list = list(dict.fromkeys(relationship_descriptions)) + + # Generate final description from relationships or fallback to current + if description_list: + final_description, _ = await _handle_entity_relation_summary( + "Entity", + entity_name, + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache=llm_response_cache, + ) else: final_description = current_entity.get("description", "") @@ -784,12 +938,9 @@ async def _rebuild_single_entity( if entity_data.get("file_path"): file_paths.add(entity_data["file_path"]) - # Combine all descriptions - combined_description = ( - GRAPH_FIELD_SEP.join(descriptions) - if descriptions - else current_entity.get("description", "") - ) + # Remove duplicates while preserving order + description_list = list(dict.fromkeys(descriptions)) + entity_types = list(dict.fromkeys(entity_types)) # Get most common entity type entity_type = ( @@ -798,8 +949,19 @@ async def _rebuild_single_entity( else current_entity.get("entity_type", "UNKNOWN") ) - # Generate final description and update storage - final_description = await _generate_final_description(combined_description) + # Generate final description from entities or fallback to current + if description_list: + final_description, _ = await _handle_entity_relation_summary( + "Entity", + entity_name, + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache=llm_response_cache, + ) + else: + final_description = current_entity.get("description", "") + await _update_entity_storage(final_description, entity_type, file_paths) @@ -836,7 +998,7 @@ async def _rebuild_single_relationship( ) if not all_relationship_data: - logger.warning(f"No cached relationship data found for {src}-{tgt}") + logger.warning(f"No 
relation data found for `{src}-{tgt}`") return # Merge descriptions and keywords @@ -855,42 +1017,38 @@ async def _rebuild_single_relationship( if rel_data.get("file_path"): file_paths.add(rel_data["file_path"]) - # Combine descriptions and keywords - combined_description = ( - GRAPH_FIELD_SEP.join(descriptions) - if descriptions - else current_relationship.get("description", "") - ) + # Remove duplicates while preserving order + description_list = list(dict.fromkeys(descriptions)) + keywords = list(dict.fromkeys(keywords)) + combined_keywords = ( ", ".join(set(keywords)) if keywords else current_relationship.get("keywords", "") ) - # weight = ( - # sum(weights) / len(weights) - # if weights - # else current_relationship.get("weight", 1.0) - # ) + weight = sum(weights) if weights else current_relationship.get("weight", 1.0) - # Use summary if description has too many fragments - force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1 - - if num_fragment >= force_llm_summary_on_merge: - final_description = await _handle_entity_relation_summary( + # Generate final description from relations or fallback to current + if description_list: + final_description, _ = await _handle_entity_relation_summary( + "Relation", f"{src}-{tgt}", - combined_description, + description_list, + GRAPH_FIELD_SEP, global_config, llm_response_cache=llm_response_cache, ) else: - final_description = combined_description + # fallback to keep current(unchanged) + final_description = current_relationship.get("description", "") # Update relationship in graph storage updated_relationship_data = { **current_relationship, - "description": final_description, + "description": final_description + if final_description + else current_relationship.get("description", ""), "keywords": combined_keywords, "weight": weight, "source_id": GRAPH_FIELD_SEP.join(chunk_ids), @@ -948,13 +1106,9 @@ async def _merge_nodes_then_upsert( already_node = await knowledge_graph_inst.get_node(entity_name) if already_node: already_entity_types.append(already_node["entity_type"]) - already_source_ids.extend( - split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP]) - ) - already_file_paths.extend( - split_string_by_multi_markers(already_node["file_path"], [GRAPH_FIELD_SEP]) - ) - already_description.append(already_node["description"]) + already_source_ids.extend(already_node["source_id"].split(GRAPH_FIELD_SEP)) + already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP)) + already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP)) entity_type = sorted( Counter( @@ -962,42 +1116,54 @@ async def _merge_nodes_then_upsert( ).items(), key=lambda x: x[1], reverse=True, - )[0][0] - description = GRAPH_FIELD_SEP.join( - sorted(set([dp["description"] for dp in nodes_data] + already_description)) + )[0][0] # Get the entity type with the highest count + + # merge and deduplicate description + description_list = list( + dict.fromkeys( + already_description + + [dp["description"] for dp in nodes_data if dp.get("description")] + ) ) + + num_fragment = len(description_list) + already_fragment = len(already_description) + deduplicated_num = already_fragment + len(nodes_data) - num_fragment + if deduplicated_num > 0: + dd_message = f"(dd:{deduplicated_num})" + else: + dd_message = "" + if num_fragment > 0: + # Get summary and LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + "Entity", + 
entity_name, + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, + ) + + # Log based on actual LLM usage + if llm_was_used: + status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}" + else: + status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}" + + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + else: + logger.error(f"Entity {entity_name} has no description") + description = "(no description)" + source_id = GRAPH_FIELD_SEP.join( set([dp["source_id"] for dp in nodes_data] + already_source_ids) ) file_path = build_file_path(already_file_paths, nodes_data, entity_name) - force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - - num_fragment = description.count(GRAPH_FIELD_SEP) + 1 - num_new_fragment = len(set([dp["description"] for dp in nodes_data])) - - if num_fragment > 1: - if num_fragment >= force_llm_summary_on_merge: - status_message = f"LLM merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - description = await _handle_entity_relation_summary( - entity_name, - description, - global_config, - llm_response_cache, - ) - else: - status_message = f"Merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - node_data = dict( entity_id=entity_name, entity_type=entity_type, @@ -1044,22 +1210,20 @@ async def _merge_edges_then_upsert( # Get source_id with empty string default if missing or None if already_edge.get("source_id") is not None: already_source_ids.extend( - split_string_by_multi_markers( - already_edge["source_id"], [GRAPH_FIELD_SEP] - ) + already_edge["source_id"].split(GRAPH_FIELD_SEP) ) # Get file_path with empty string default if missing or None if already_edge.get("file_path") is not None: already_file_paths.extend( - split_string_by_multi_markers( - already_edge["file_path"], [GRAPH_FIELD_SEP] - ) + already_edge["file_path"].split(GRAPH_FIELD_SEP) ) # Get description with empty string default if missing or None if already_edge.get("description") is not None: - already_description.append(already_edge["description"]) + already_description.extend( + already_edge["description"].split(GRAPH_FIELD_SEP) + ) # Get keywords with empty string default if missing or None if already_edge.get("keywords") is not None: @@ -1071,15 +1235,47 @@ async def _merge_edges_then_upsert( # Process edges_data with None checks weight = sum([dp["weight"] for dp in edges_data] + already_weights) - description = GRAPH_FIELD_SEP.join( - sorted( - set( - [dp["description"] for dp in edges_data if dp.get("description")] - + already_description - ) + + description_list = list( + dict.fromkeys( + already_description + + [dp["description"] for dp in edges_data if dp.get("description")] ) ) + num_fragment = len(description_list) 
+ already_fragment = len(already_description) + deduplicated_num = already_fragment + len(edges_data) - num_fragment + if deduplicated_num > 0: + dd_message = f"(dd:{deduplicated_num})" + else: + dd_message = "" + if num_fragment > 0: + # Get summary and LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + "Relation", + f"({src_id}, {tgt_id})", + description_list, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, + ) + + # Log based on actual LLM usage + if llm_was_used: + status_message = f"LLMmrg: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}" + else: + status_message = f"Merged: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}" + + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + else: + logger.error(f"Edge {src_id} - {tgt_id} has no description") + description = "(no description)" + # Split all existing and new keywords into individual terms, then combine and deduplicate all_keywords = set() # Process already_keywords (which are comma-separated) @@ -1127,35 +1323,6 @@ async def _merge_edges_then_upsert( } added_entities.append(entity_data) - force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - - num_fragment = description.count(GRAPH_FIELD_SEP) + 1 - num_new_fragment = len( - set([dp["description"] for dp in edges_data if dp.get("description")]) - ) - - if num_fragment > 1: - if num_fragment >= force_llm_summary_on_merge: - status_message = f"LLM merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - description = await _handle_entity_relation_summary( - f"({src_id}, {tgt_id})", - description, - global_config, - llm_response_cache, - ) - else: - status_message = f"Merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - await knowledge_graph_inst.upsert_edge( src_id, tgt_id, @@ -1463,7 +1630,7 @@ async def merge_nodes_and_edges( ) # Don't raise exception to avoid affecting main flow - log_message = f"Completed merging: {len(processed_entities)} entities, {len(all_added_entities)} added entities, {len(processed_edges)} relations" + log_message = f"Completed merging: {len(processed_entities)} entities, {len(all_added_entities)} extra entities, {len(processed_edges)} relations" logger.info(log_message) async with pipeline_status_lock: pipeline_status["latest_message"] = log_message diff --git a/lightrag/prompt.py b/lightrag/prompt.py index 32666bb5..586f7ae6 100644 --- a/lightrag/prompt.py +++ b/lightrag/prompt.py @@ -40,22 +40,19 @@ Format the content-level key words as ("content_keywords"{tuple_delimiter}