From 9b6de7512d647fd854560be467f924fbdf1eb078 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 25 Aug 2025 17:10:51 +0800 Subject: [PATCH 01/16] Optimize the stability of description merging order --- lightrag/operate.py | 138 +++++++++++++++++++++++++------------------- 1 file changed, 78 insertions(+), 60 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index cc06801c..74fdf197 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -713,21 +713,6 @@ async def _rebuild_single_entity( } ) - # Helper function to generate final description with optional LLM summary - async def _generate_final_description(combined_description: str) -> str: - force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1 - - if num_fragment >= force_llm_summary_on_merge: - return await _handle_entity_relation_summary( - entity_name, - combined_description, - global_config, - llm_response_cache=llm_response_cache, - ) - else: - return combined_description - # Collect all entity data from relevant chunks all_entity_data = [] for chunk_id in chunk_ids: @@ -763,7 +748,18 @@ async def _rebuild_single_entity( # Generate description from relationships or fallback to current if relationship_descriptions: combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions) - final_description = await _generate_final_description(combined_description) + force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] + num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1 + + if num_fragment >= force_llm_summary_on_merge: + final_description = await _handle_entity_relation_summary( + entity_name, + combined_description, + global_config, + llm_response_cache=llm_response_cache, + ) + else: + final_description = combined_description else: final_description = current_entity.get("description", "") @@ -784,6 +780,10 @@ async def _rebuild_single_entity( if entity_data.get("file_path"): file_paths.add(entity_data["file_path"]) + # Remove duplicates while preserving order + descriptions = list(dict.fromkeys(descriptions)) + entity_types = list(dict.fromkeys(entity_types)) + # Combine all descriptions combined_description = ( GRAPH_FIELD_SEP.join(descriptions) @@ -799,7 +799,19 @@ async def _rebuild_single_entity( ) # Generate final description and update storage - final_description = await _generate_final_description(combined_description) + force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] + num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1 + + if num_fragment >= force_llm_summary_on_merge: + final_description = await _handle_entity_relation_summary( + entity_name, + combined_description, + global_config, + llm_response_cache=llm_response_cache, + ) + else: + final_description = combined_description + await _update_entity_storage(final_description, entity_type, file_paths) @@ -855,7 +867,11 @@ async def _rebuild_single_relationship( if rel_data.get("file_path"): file_paths.add(rel_data["file_path"]) - # Combine descriptions and keywords + # Remove duplicates while preserving order + descriptions = list(dict.fromkeys(descriptions)) + keywords = list(dict.fromkeys(keywords)) + + # Combine descriptions and keywords (fallback to keep currunt unchanged) combined_description = ( GRAPH_FIELD_SEP.join(descriptions) if descriptions @@ -963,22 +979,22 @@ async def _merge_nodes_then_upsert( key=lambda x: x[1], reverse=True, )[0][0] + description = GRAPH_FIELD_SEP.join( - 
sorted(set([dp["description"] for dp in nodes_data] + already_description)) + already_description + + list( + dict.fromkeys( + [dp["description"] for dp in nodes_data if dp.get("description")] + ) + ) ) - source_id = GRAPH_FIELD_SEP.join( - set([dp["source_id"] for dp in nodes_data] + already_source_ids) - ) - file_path = build_file_path(already_file_paths, nodes_data, entity_name) force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - num_fragment = description.count(GRAPH_FIELD_SEP) + 1 - num_new_fragment = len(set([dp["description"] for dp in nodes_data])) - + already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1 if num_fragment > 1: if num_fragment >= force_llm_summary_on_merge: - status_message = f"LLM merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}" + status_message = f"LLM merge N: {entity_name} | {already_fragment}+{num_fragment-already_fragment}" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -991,13 +1007,18 @@ async def _merge_nodes_then_upsert( llm_response_cache, ) else: - status_message = f"Merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}" + status_message = f"Merge N: {entity_name} | {already_fragment}+{num_fragment-already_fragment}" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: pipeline_status["latest_message"] = status_message pipeline_status["history_messages"].append(status_message) + source_id = GRAPH_FIELD_SEP.join( + set([dp["source_id"] for dp in nodes_data] + already_source_ids) + ) + file_path = build_file_path(already_file_paths, nodes_data, entity_name) + node_data = dict( entity_id=entity_name, entity_type=entity_type, @@ -1071,15 +1092,41 @@ async def _merge_edges_then_upsert( # Process edges_data with None checks weight = sum([dp["weight"] for dp in edges_data] + already_weights) + description = GRAPH_FIELD_SEP.join( - sorted( - set( + already_description + + list( + dict.fromkeys( [dp["description"] for dp in edges_data if dp.get("description")] - + already_description ) ) ) + force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] + num_fragment = description.count(GRAPH_FIELD_SEP) + 1 + already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1 + if num_fragment > 1: + if num_fragment >= force_llm_summary_on_merge: + status_message = f"LLM merge E: {src_id} - {tgt_id} | {already_fragment}+{num_fragment-already_fragment}" + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + description = await _handle_entity_relation_summary( + f"({src_id}, {tgt_id})", + description, + global_config, + llm_response_cache, + ) + else: + status_message = f"Merge E: {src_id} - {tgt_id} | {already_fragment}+{num_fragment-already_fragment}" + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) + # Split all existing and new keywords into individual terms, then combine and deduplicate all_keywords = set() # Process already_keywords (which are comma-separated) @@ -1127,35 +1174,6 @@ async def _merge_edges_then_upsert( 
} added_entities.append(entity_data) - force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - - num_fragment = description.count(GRAPH_FIELD_SEP) + 1 - num_new_fragment = len( - set([dp["description"] for dp in edges_data if dp.get("description")]) - ) - - if num_fragment > 1: - if num_fragment >= force_llm_summary_on_merge: - status_message = f"LLM merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - description = await _handle_entity_relation_summary( - f"({src_id}, {tgt_id})", - description, - global_config, - llm_response_cache, - ) - else: - status_message = f"Merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - await knowledge_graph_inst.upsert_edge( src_id, tgt_id, From cac8e189e7002995450186e7896a7152cf4c6422 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 25 Aug 2025 17:18:51 +0800 Subject: [PATCH 02/16] Remove redundant entity vector deletion before upsert --- lightrag/operate.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index 74fdf197..5744a81f 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -690,15 +690,6 @@ async def _rebuild_single_entity( # Update entity in vector database entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-") - # Delete old vector record first - try: - await entities_vdb.delete([entity_vdb_id]) - except Exception as e: - logger.debug( - f"Could not delete old entity vector record {entity_vdb_id}: {e}" - ) - - # Insert new vector record entity_content = f"{entity_name}\n{final_description}" await entities_vdb.upsert( { From 0b1b264a5d6285ddc806483a8e600f8f79b617d0 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 25 Aug 2025 17:46:32 +0800 Subject: [PATCH 03/16] refactor: optimize graph lock scope in document deletion - Move dependency analysis outside graph database lock - Add persistence call before lock release to prevent dirty reads --- lightrag/lightrag.py | 253 +++++++++++++++++++++---------------------- 1 file changed, 124 insertions(+), 129 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 721181d5..fa529784 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -2272,117 +2272,111 @@ class LightRAG: relationships_to_delete = set() relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids - # Use graph database lock to ensure atomic merges and updates + try: + # Get affected entities and relations from full_entities and full_relations storage + doc_entities_data = await self.full_entities.get_by_id(doc_id) + doc_relations_data = await self.full_relations.get_by_id(doc_id) + + affected_nodes = [] + affected_edges = [] + + # Get entity data from graph storage using entity names from full_entities + if doc_entities_data and "entity_names" in doc_entities_data: + entity_names = doc_entities_data["entity_names"] + # get_nodes_batch returns dict[str, dict], need to convert to list[dict] + nodes_dict = await self.chunk_entity_relation_graph.get_nodes_batch( + 
entity_names + ) + for entity_name in entity_names: + node_data = nodes_dict.get(entity_name) + if node_data: + # Ensure compatibility with existing logic that expects "id" field + if "id" not in node_data: + node_data["id"] = entity_name + affected_nodes.append(node_data) + + # Get relation data from graph storage using relation pairs from full_relations + if doc_relations_data and "relation_pairs" in doc_relations_data: + relation_pairs = doc_relations_data["relation_pairs"] + edge_pairs_dicts = [ + {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs + ] + # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict] + edges_dict = await self.chunk_entity_relation_graph.get_edges_batch( + edge_pairs_dicts + ) + + for pair in relation_pairs: + src, tgt = pair[0], pair[1] + edge_key = (src, tgt) + edge_data = edges_dict.get(edge_key) + if edge_data: + # Ensure compatibility with existing logic that expects "source" and "target" fields + if "source" not in edge_data: + edge_data["source"] = src + if "target" not in edge_data: + edge_data["target"] = tgt + affected_edges.append(edge_data) + + except Exception as e: + logger.error(f"Failed to analyze affected graph elements: {e}") + raise Exception(f"Failed to analyze graph dependencies: {e}") from e + + try: + # Process entities + for node_data in affected_nodes: + node_label = node_data.get("entity_id") + if node_label and "source_id" in node_data: + sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) + remaining_sources = sources - chunk_ids + + if not remaining_sources: + entities_to_delete.add(node_label) + elif remaining_sources != sources: + entities_to_rebuild[node_label] = remaining_sources + + async with pipeline_status_lock: + log_message = f"Found {len(entities_to_rebuild)} affected entities" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + # Process relationships + for edge_data in affected_edges: + src = edge_data.get("source") + tgt = edge_data.get("target") + + if src and tgt and "source_id" in edge_data: + edge_tuple = tuple(sorted((src, tgt))) + if ( + edge_tuple in relationships_to_delete + or edge_tuple in relationships_to_rebuild + ): + continue + + sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) + remaining_sources = sources - chunk_ids + + if not remaining_sources: + relationships_to_delete.add(edge_tuple) + elif remaining_sources != sources: + relationships_to_rebuild[edge_tuple] = remaining_sources + + async with pipeline_status_lock: + log_message = ( + f"Found {len(relationships_to_rebuild)} affected relations" + ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + except Exception as e: + logger.error(f"Failed to process graph analysis results: {e}") + raise Exception(f"Failed to process graph dependencies: {e}") from e + + # Use graph database lock to prevent dirty read graph_db_lock = get_graph_db_lock(enable_logging=False) async with graph_db_lock: - try: - # Get affected entities and relations from full_entities and full_relations storage - doc_entities_data = await self.full_entities.get_by_id(doc_id) - doc_relations_data = await self.full_relations.get_by_id(doc_id) - - affected_nodes = [] - affected_edges = [] - - # Get entity data from graph storage using entity names from full_entities - if doc_entities_data and "entity_names" in doc_entities_data: - entity_names = 
doc_entities_data["entity_names"] - # get_nodes_batch returns dict[str, dict], need to convert to list[dict] - nodes_dict = ( - await self.chunk_entity_relation_graph.get_nodes_batch( - entity_names - ) - ) - for entity_name in entity_names: - node_data = nodes_dict.get(entity_name) - if node_data: - # Ensure compatibility with existing logic that expects "id" field - if "id" not in node_data: - node_data["id"] = entity_name - affected_nodes.append(node_data) - - # Get relation data from graph storage using relation pairs from full_relations - if doc_relations_data and "relation_pairs" in doc_relations_data: - relation_pairs = doc_relations_data["relation_pairs"] - edge_pairs_dicts = [ - {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs - ] - # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict] - edges_dict = ( - await self.chunk_entity_relation_graph.get_edges_batch( - edge_pairs_dicts - ) - ) - - for pair in relation_pairs: - src, tgt = pair[0], pair[1] - edge_key = (src, tgt) - edge_data = edges_dict.get(edge_key) - if edge_data: - # Ensure compatibility with existing logic that expects "source" and "target" fields - if "source" not in edge_data: - edge_data["source"] = src - if "target" not in edge_data: - edge_data["target"] = tgt - affected_edges.append(edge_data) - - except Exception as e: - logger.error(f"Failed to analyze affected graph elements: {e}") - raise Exception(f"Failed to analyze graph dependencies: {e}") from e - - try: - # Process entities - for node_data in affected_nodes: - node_label = node_data.get("entity_id") - if node_label and "source_id" in node_data: - sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) - remaining_sources = sources - chunk_ids - - if not remaining_sources: - entities_to_delete.add(node_label) - elif remaining_sources != sources: - entities_to_rebuild[node_label] = remaining_sources - - async with pipeline_status_lock: - log_message = ( - f"Found {len(entities_to_rebuild)} affected entities" - ) - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - # Process relationships - for edge_data in affected_edges: - src = edge_data.get("source") - tgt = edge_data.get("target") - - if src and tgt and "source_id" in edge_data: - edge_tuple = tuple(sorted((src, tgt))) - if ( - edge_tuple in relationships_to_delete - or edge_tuple in relationships_to_rebuild - ): - continue - - sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) - remaining_sources = sources - chunk_ids - - if not remaining_sources: - relationships_to_delete.add(edge_tuple) - elif remaining_sources != sources: - relationships_to_rebuild[edge_tuple] = remaining_sources - - async with pipeline_status_lock: - log_message = ( - f"Found {len(relationships_to_rebuild)} affected relations" - ) - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - except Exception as e: - logger.error(f"Failed to process graph analysis results: {e}") - raise Exception(f"Failed to process graph dependencies: {e}") from e - # 5. Delete chunks from storage if chunk_ids: try: @@ -2453,27 +2447,28 @@ class LightRAG: logger.error(f"Failed to delete relationships: {e}") raise Exception(f"Failed to delete relationships: {e}") from e - # 8. 
Rebuild entities and relationships from remaining chunks
-            if entities_to_rebuild or relationships_to_rebuild:
-                try:
-                    await _rebuild_knowledge_from_chunks(
-                        entities_to_rebuild=entities_to_rebuild,
-                        relationships_to_rebuild=relationships_to_rebuild,
-                        knowledge_graph_inst=self.chunk_entity_relation_graph,
-                        entities_vdb=self.entities_vdb,
-                        relationships_vdb=self.relationships_vdb,
-                        text_chunks_storage=self.text_chunks,
-                        llm_response_cache=self.llm_response_cache,
-                        global_config=asdict(self),
-                        pipeline_status=pipeline_status,
-                        pipeline_status_lock=pipeline_status_lock,
-                    )
+            # Persist changes to graph database before releasing graph database lock
+            await self._insert_done()
 
-                except Exception as e:
-                    logger.error(f"Failed to rebuild knowledge from chunks: {e}")
-                    raise Exception(
-                        f"Failed to rebuild knowledge graph: {e}"
-                    ) from e
+            # 8. Rebuild entities and relationships from remaining chunks
+            if entities_to_rebuild or relationships_to_rebuild:
+                try:
+                    await _rebuild_knowledge_from_chunks(
+                        entities_to_rebuild=entities_to_rebuild,
+                        relationships_to_rebuild=relationships_to_rebuild,
+                        knowledge_graph_inst=self.chunk_entity_relation_graph,
+                        entities_vdb=self.entities_vdb,
+                        relationships_vdb=self.relationships_vdb,
+                        text_chunks_storage=self.text_chunks,
+                        llm_response_cache=self.llm_response_cache,
+                        global_config=asdict(self),
+                        pipeline_status=pipeline_status,
+                        pipeline_status_lock=pipeline_status_lock,
+                    )
+
+                except Exception as e:
+                    logger.error(f"Failed to rebuild knowledge from chunks: {e}")
+                    raise Exception(f"Failed to rebuild knowledge graph: {e}") from e
 
             # 9. Delete from full_entities and full_relations storage
             try:

From 882d6857d88ad8e483d570b3af6fcc94ee8ef6f2 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Mon, 25 Aug 2025 21:03:16 +0800
Subject: [PATCH 04/16] feat: Implement map-reduce summarization to handle
 large numbers of descriptions during merging

---
 lightrag/operate.py | 268 +++++++++++++++++++++++++++++---------------
 1 file changed, 180 insertions(+), 88 deletions(-)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index 5744a81f..7ebb09df 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -115,47 +115,152 @@ def chunking_by_token_size(
 
 async def _handle_entity_relation_summary(
     entity_or_relation_name: str,
-    description: str,
+    description_list: list[str],
+    force_llm_summary_on_merge: int,
+    seperator: str,
     global_config: dict,
     llm_response_cache: BaseKVStorage | None = None,
 ) -> str:
-    """Handle entity relation summary
-    For each entity or relation, input is the combined description of already existing description and new description.
-    If too long, use LLM to summarize.
+    """Handle entity relation description summary using map-reduce approach.
+
+    This function summarizes a list of descriptions using a map-reduce strategy:
+    1. If total tokens <= summary_max_tokens, summarize directly
+    2. Otherwise, split descriptions into chunks that fit within token limits
+    3. Summarize each chunk, then recursively process the summaries
+    4. 
Continue until we get a final summary within token limits or num of descriptions is less than force_llm_summary_on_merge + + Args: + entity_or_relation_name: Name of the entity or relation being summarized + description_list: List of description strings to summarize + global_config: Global configuration containing tokenizer and limits + llm_response_cache: Optional cache for LLM responses + + Returns: + Final summarized description string + """ + # Handle empty input + if not description_list: + return "" + + # If only one description, return it directly (no need for LLM call) + if len(description_list) == 1: + return description_list[0] + + # Get configuration + tokenizer: Tokenizer = global_config["tokenizer"] + summary_max_tokens = global_config["summary_max_tokens"] + + current_list = description_list[:] # Copy the list to avoid modifying original + + # Iterative map-reduce process + while True: + # Calculate total tokens in current list + total_tokens = sum(len(tokenizer.encode(desc)) for desc in current_list) + + # If total length is within limits, perform final summarization + if ( + total_tokens <= summary_max_tokens + or len(current_list) < force_llm_summary_on_merge + ): + if len(current_list) < force_llm_summary_on_merge: + # Already the final result + final_description = seperator.join(current_list) + return final_description if final_description else "" + else: + # Final summarization of remaining descriptions + return await _summarize_descriptions( + entity_or_relation_name, + current_list, + global_config, + llm_response_cache, + ) + + # Need to split into chunks - Map phase + chunks = [] + current_chunk = [] + current_tokens = 0 + + for desc in current_list: + desc_tokens = len(tokenizer.encode(desc)) + + # If adding current description would exceed limit, finalize current chunk + if current_tokens + desc_tokens > summary_max_tokens and current_chunk: + chunks.append(current_chunk) + current_chunk = [desc] + current_tokens = desc_tokens + else: + current_chunk.append(desc) + current_tokens += desc_tokens + + # Add the last chunk if it exists + if current_chunk: + chunks.append(current_chunk) + + logger.info( + f"Summarizing {entity_or_relation_name}: split {len(current_list)} descriptions into {len(chunks)} groups" + ) + + # Reduce phase: summarize each chunk + new_summaries = [] + for chunk in chunks: + if len(chunk) == 1: + # Optimization: single description chunks don't need LLM summarization + new_summaries.append(chunk[0]) + else: + # Multiple descriptions need LLM summarization + summary = await _summarize_descriptions( + entity_or_relation_name, chunk, global_config, llm_response_cache + ) + new_summaries.append(summary) + + # Update current list with new summaries for next iteration + current_list = new_summaries + + +async def _summarize_descriptions( + entity_or_relation_name: str, + description_list: list[str], + global_config: dict, + llm_response_cache: BaseKVStorage | None = None, +) -> str: + """Helper function to summarize a list of descriptions using LLM. 
+ + Args: + entity_or_relation_name: Name of the entity or relation being summarized + descriptions: List of description strings to summarize + global_config: Global configuration containing LLM function and settings + llm_response_cache: Optional cache for LLM responses + + Returns: + Summarized description string """ use_llm_func: callable = global_config["llm_model_func"] # Apply higher priority (8) to entity/relation summary tasks use_llm_func = partial(use_llm_func, _priority=8) - tokenizer: Tokenizer = global_config["tokenizer"] - llm_max_tokens = global_config["summary_max_tokens"] - language = global_config["addon_params"].get( "language", PROMPTS["DEFAULT_LANGUAGE"] ) - tokens = tokenizer.encode(description) - - ### summarize is not determined here anymore (It's determined by num_fragment now) - # if len(tokens) < summary_max_tokens: # No need for summary - # return description - prompt_template = PROMPTS["summarize_entity_descriptions"] - use_description = tokenizer.decode(tokens[:llm_max_tokens]) + + # Prepare context for the prompt context_base = dict( entity_name=entity_or_relation_name, - description_list=use_description.split(GRAPH_FIELD_SEP), + description_list="\n".join(description_list), language=language, ) use_prompt = prompt_template.format(**context_base) - logger.debug(f"Trigger summary: {entity_or_relation_name}") + + logger.debug( + f"Summarizing {len(description_list)} descriptions for: {entity_or_relation_name}" + ) # Use LLM function with cache (higher priority for summary generation) summary = await use_llm_func_with_cache( use_prompt, use_llm_func, llm_response_cache=llm_response_cache, - # max_tokens=summary_max_tokens, cache_type="extract", ) return summary @@ -413,7 +518,7 @@ async def _rebuild_knowledge_from_chunks( ) rebuilt_entities_count += 1 status_message = ( - f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks" + f"Entity `{entity_name}` rebuilt from {len(chunk_ids)} chunks" ) logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: @@ -453,7 +558,7 @@ async def _rebuild_knowledge_from_chunks( global_config=global_config, ) rebuilt_relationships_count += 1 - status_message = f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks" + status_message = f"Relationship `{src}->{tgt}` rebuilt from {len(chunk_ids)} chunks" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -736,21 +841,20 @@ async def _rebuild_single_entity( edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP) file_paths.update(edge_file_paths) - # Generate description from relationships or fallback to current - if relationship_descriptions: - combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions) - force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1 + # deduplicate descriptions + description_list = list(dict.fromkeys(relationship_descriptions)) - if num_fragment >= force_llm_summary_on_merge: - final_description = await _handle_entity_relation_summary( - entity_name, - combined_description, - global_config, - llm_response_cache=llm_response_cache, - ) - else: - final_description = combined_description + # Generate description from relationships or fallback to current + if description_list: + force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] + final_description = await _handle_entity_relation_summary( + entity_name, + 
description_list, + force_llm_summary_on_merge, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache=llm_response_cache, + ) else: final_description = current_entity.get("description", "") @@ -772,16 +876,9 @@ async def _rebuild_single_entity( file_paths.add(entity_data["file_path"]) # Remove duplicates while preserving order - descriptions = list(dict.fromkeys(descriptions)) + description_list = list(dict.fromkeys(descriptions)) entity_types = list(dict.fromkeys(entity_types)) - # Combine all descriptions - combined_description = ( - GRAPH_FIELD_SEP.join(descriptions) - if descriptions - else current_entity.get("description", "") - ) - # Get most common entity type entity_type = ( max(set(entity_types), key=entity_types.count) @@ -791,17 +888,17 @@ async def _rebuild_single_entity( # Generate final description and update storage force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1 - - if num_fragment >= force_llm_summary_on_merge: + if description_list: final_description = await _handle_entity_relation_summary( entity_name, - combined_description, + description_list, + force_llm_summary_on_merge, + GRAPH_FIELD_SEP, global_config, llm_response_cache=llm_response_cache, ) else: - final_description = combined_description + final_description = current_entity.get("description", "") await _update_entity_storage(final_description, entity_type, file_paths) @@ -859,45 +956,38 @@ async def _rebuild_single_relationship( file_paths.add(rel_data["file_path"]) # Remove duplicates while preserving order - descriptions = list(dict.fromkeys(descriptions)) + description_list = list(dict.fromkeys(descriptions)) keywords = list(dict.fromkeys(keywords)) - # Combine descriptions and keywords (fallback to keep currunt unchanged) - combined_description = ( - GRAPH_FIELD_SEP.join(descriptions) - if descriptions - else current_relationship.get("description", "") - ) combined_keywords = ( ", ".join(set(keywords)) if keywords else current_relationship.get("keywords", "") ) - # weight = ( - # sum(weights) / len(weights) - # if weights - # else current_relationship.get("weight", 1.0) - # ) + weight = sum(weights) if weights else current_relationship.get("weight", 1.0) # Use summary if description has too many fragments force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1 - - if num_fragment >= force_llm_summary_on_merge: + if description_list: final_description = await _handle_entity_relation_summary( f"{src}-{tgt}", - combined_description, + description_list, + force_llm_summary_on_merge, + GRAPH_FIELD_SEP, global_config, llm_response_cache=llm_response_cache, ) else: - final_description = combined_description + # fallback to keep current(unchanged) + final_description = current_relationship.get("description", "") # Update relationship in graph storage updated_relationship_data = { **current_relationship, - "description": final_description, + "description": final_description + if final_description + else current_relationship.get("description", ""), "keywords": combined_keywords, "weight": weight, "source_id": GRAPH_FIELD_SEP.join(chunk_ids), @@ -971,21 +1061,16 @@ async def _merge_nodes_then_upsert( reverse=True, )[0][0] - description = GRAPH_FIELD_SEP.join( - already_description - + list( - dict.fromkeys( - [dp["description"] for dp in nodes_data if dp.get("description")] - ) - ) + description_list = already_description + list( + 
dict.fromkeys([dp["description"] for dp in nodes_data if dp.get("description")]) ) force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - num_fragment = description.count(GRAPH_FIELD_SEP) + 1 + num_fragment = len(description_list) already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1 - if num_fragment > 1: + if num_fragment > 0: if num_fragment >= force_llm_summary_on_merge: - status_message = f"LLM merge N: {entity_name} | {already_fragment}+{num_fragment-already_fragment}" + status_message = f"LLM merging `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -993,17 +1078,23 @@ async def _merge_nodes_then_upsert( pipeline_status["history_messages"].append(status_message) description = await _handle_entity_relation_summary( entity_name, - description, + description_list, + force_llm_summary_on_merge, + GRAPH_FIELD_SEP, global_config, llm_response_cache, ) else: - status_message = f"Merge N: {entity_name} | {already_fragment}+{num_fragment-already_fragment}" + status_message = f"Merging `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: pipeline_status["latest_message"] = status_message pipeline_status["history_messages"].append(status_message) + description = GRAPH_FIELD_SEP.join(description_list) + else: + logger.error(f"Entity {entity_name} has no description") + description = "(no description)" source_id = GRAPH_FIELD_SEP.join( set([dp["source_id"] for dp in nodes_data] + already_source_ids) @@ -1084,21 +1175,16 @@ async def _merge_edges_then_upsert( # Process edges_data with None checks weight = sum([dp["weight"] for dp in edges_data] + already_weights) - description = GRAPH_FIELD_SEP.join( - already_description - + list( - dict.fromkeys( - [dp["description"] for dp in edges_data if dp.get("description")] - ) - ) + description_list = already_description + list( + dict.fromkeys([dp["description"] for dp in edges_data if dp.get("description")]) ) force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - num_fragment = description.count(GRAPH_FIELD_SEP) + 1 + num_fragment = len(description_list) already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1 - if num_fragment > 1: + if num_fragment > 0: if num_fragment >= force_llm_summary_on_merge: - status_message = f"LLM merge E: {src_id} - {tgt_id} | {already_fragment}+{num_fragment-already_fragment}" + status_message = f"LLM merging `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -1106,17 +1192,23 @@ async def _merge_edges_then_upsert( pipeline_status["history_messages"].append(status_message) description = await _handle_entity_relation_summary( f"({src_id}, {tgt_id})", - description, + description_list, + force_llm_summary_on_merge, + GRAPH_FIELD_SEP, global_config, llm_response_cache, ) else: - status_message = f"Merge E: {src_id} - {tgt_id} | {already_fragment}+{num_fragment-already_fragment}" + status_message = f"Merging `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: 
pipeline_status["latest_message"] = status_message pipeline_status["history_messages"].append(status_message) + description = GRAPH_FIELD_SEP.join(description_list) + else: + logger.error(f"Edge {src_id} - {tgt_id} has no description") + description = "(no description)" # Split all existing and new keywords into individual terms, then combine and deduplicate all_keywords = set() From 15cdd0dd8f0f6730a4f579952e12bef6a41836ff Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 25 Aug 2025 21:41:33 +0800 Subject: [PATCH 05/16] fix: Sort cached extraction results by the create_time within each chunk This ensures the KG rebuilds maintain the original creation order of the first extraction result for each chunk. --- lightrag/operate.py | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index 7ebb09df..779938af 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -630,14 +630,20 @@ async def _get_cached_extraction_results( ) -> dict[str, list[str]]: """Get cached extraction results for specific chunk IDs + This function retrieves cached LLM extraction results for the given chunk IDs and returns + them sorted by creation time. The results are sorted at two levels: + 1. Individual extraction results within each chunk are sorted by create_time (earliest first) + 2. Chunks themselves are sorted by the create_time of their earliest extraction result + Args: llm_response_cache: LLM response cache storage chunk_ids: Set of chunk IDs to get cached results for - text_chunks_data: Pre-loaded chunk data (optional, for performance) - text_chunks_storage: Text chunks storage (fallback if text_chunks_data is None) + text_chunks_storage: Text chunks storage for retrieving chunk data and LLM cache references Returns: - Dict mapping chunk_id -> list of extraction_result_text + Dict mapping chunk_id -> list of extraction_result_text, where: + - Keys (chunk_ids) are ordered by the create_time of their first extraction result + - Values (extraction results) are ordered by create_time within each chunk """ cached_results = {} @@ -646,15 +652,13 @@ async def _get_cached_extraction_results( # Read from storage chunk_data_list = await text_chunks_storage.get_by_ids(list(chunk_ids)) - for chunk_id, chunk_data in zip(chunk_ids, chunk_data_list): + for chunk_data in chunk_data_list: if chunk_data and isinstance(chunk_data, dict): llm_cache_list = chunk_data.get("llm_cache_list", []) if llm_cache_list: all_cache_ids.update(llm_cache_list) else: - logger.warning( - f"Chunk {chunk_id} data is invalid or None: {type(chunk_data)}" - ) + logger.warning(f"Chunk data is invalid or None: {chunk_data}") if not all_cache_ids: logger.warning(f"No LLM cache IDs found for {len(chunk_ids)} chunk IDs") @@ -665,7 +669,7 @@ async def _get_cached_extraction_results( # Process cache entries and group by chunk_id valid_entries = 0 - for cache_id, cache_entry in zip(all_cache_ids, cache_data_list): + for cache_entry in cache_data_list: if ( cache_entry is not None and isinstance(cache_entry, dict) @@ -685,16 +689,30 @@ async def _get_cached_extraction_results( # Store tuple with extraction result and creation time for sorting cached_results[chunk_id].append((extraction_result, create_time)) - # Sort extraction results by create_time for each chunk + # Sort extraction results by create_time for each chunk and collect earliest times + chunk_earliest_times = {} for chunk_id in cached_results: # Sort by create_time (x[1]), then extract only 
extraction_result (x[0])
         cached_results[chunk_id].sort(key=lambda x: x[1])
+        # Store the earliest create_time for this chunk (first item after sorting)
+        chunk_earliest_times[chunk_id] = cached_results[chunk_id][0][1]
+        # Extract only extraction_result (x[0])
         cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]]
 
-    logger.info(
-        f"Found {valid_entries} valid cache entries, {len(cached_results)} chunks with results"
+    # Sort cached_results by the earliest create_time of each chunk
+    sorted_chunk_ids = sorted(
+        chunk_earliest_times.keys(), key=lambda chunk_id: chunk_earliest_times[chunk_id]
     )
-    return cached_results
+
+    # Rebuild cached_results in sorted order
+    sorted_cached_results = {}
+    for chunk_id in sorted_chunk_ids:
+        sorted_cached_results[chunk_id] = cached_results[chunk_id]
+
+    logger.info(
+        f"Found {valid_entries} valid cache entries, {len(sorted_cached_results)} chunks with results"
+    )
+    return sorted_cached_results
 
 
 async def _parse_extraction_result(

From 91767ffcee2194c512e5b7f5a987d4a8c23180a6 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Mon, 25 Aug 2025 21:55:29 +0800
Subject: [PATCH 06/16] Improve warning message formatting in
 entity/relationship rebuild

---
 lightrag/operate.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index 779938af..5ad573cc 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -835,13 +835,13 @@ async def _rebuild_single_entity(
 
     if not all_entity_data:
         logger.warning(
-            f"No cached entity data found for {entity_name}, trying to rebuild from relationships"
+            f"No entity data found for `{entity_name}`, trying to rebuild from relationships"
         )
 
         # Get all edges connected to this entity
         edges = await knowledge_graph_inst.get_node_edges(entity_name)
         if not edges:
-            logger.warning(f"No relationships found for entity {entity_name}")
+            logger.warning(f"No relations attached to entity `{entity_name}`")
             return
 
         # Collect relationship data to extract entity information
@@ -954,7 +954,7 @@ async def _rebuild_single_relationship(
     )
 
     if not all_relationship_data:
-        logger.warning(f"No cached relationship data found for {src}-{tgt}")
+        logger.warning(f"No relation data found for `{src}-{tgt}`")
         return
 
     # Merge descriptions and keywords

From de2daf65653ce0d06a128aecc8b3faf492c1f74b Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 26 Aug 2025 01:35:50 +0800
Subject: [PATCH 07/16] refactor: Rename summary_max_tokens to
 summary_context_size, add comprehensive parameter validation for summary
 configuration

- Update algorithm logic in operate.py for better token management
- Fix health endpoint to use correct parameter names
---
 README-zh.md                       |  9 +++++----
 README.md                          |  3 ++-
 env.example                        | 11 ++++++-----
 lightrag/api/config.py             | 13 +++++++++++--
 lightrag/api/lightrag_server.py    | 13 ++++++++-----
 lightrag/api/utils_api.py          |  4 ++--
 lightrag/constants.py              |  6 ++++--
 lightrag/lightrag.py               | 21 +++++++++++++++++++++
 lightrag/operate.py                | 24 +++++++++++++-----------
 lightrag_webui/src/api/lightrag.ts |  1 -
 10 files changed, 72 insertions(+), 33 deletions(-)

diff --git a/README-zh.md b/README-zh.md
index 8b7239ec..d3403b35 100644
--- a/README-zh.md
+++ b/README-zh.md
@@ -268,7 +268,8 @@ if __name__ == "__main__":
 | **embedding_func_max_async** | `int` | 最大并发异步嵌入进程数 | `16` |
 | **llm_model_func** | `callable` | LLM生成的函数 | `gpt_4o_mini_complete` |
 | **llm_model_name** | `str` | 用于生成的LLM模型名称 | `meta-llama/Llama-3.2-1B-Instruct` |
-| **summary_max_tokens** | `int` | 生成实体关系摘要时送给LLM的最大令牌数 | `30000`(由环境变量 
SUMMARY_MAX_TOKENS 设置) |
+| **summary_context_size** | `int` | 合并实体关系摘要时送给LLM的最大令牌数 | `10000`(由环境变量 SUMMARY_CONTEXT_SIZE 设置) |
+| **summary_max_tokens** | `int` | 合并实体关系描述的最大令牌数长度 | `500`(由环境变量 SUMMARY_MAX_TOKENS 设置) |
 | **llm_model_max_async** | `int` | 最大并发异步LLM进程数 | `4`(默认值由环境变量MAX_ASYNC更改) |
 | **llm_model_kwargs** | `dict` | LLM生成的附加参数 | |
 | **vector_db_storage_cls_kwargs** | `dict` | 向量数据库的附加参数,如设置节点和关系检索的阈值 | cosine_better_than_threshold: 0.2(默认值由环境变量COSINE_THRESHOLD更改) |
@@ -598,9 +599,9 @@ if __name__ == "__main__":
 
 为了提高检索质量,可以根据更有效的相关性评分模型对文档进行重排序。`rerank.py`文件提供了三个Reranker提供商的驱动函数:
 
-* **Cohere / vLLM**: `cohere_rerank` 
-* **Jina AI**: `jina_rerank` 
-* **Aliyun阿里云**: `ali_rerank` 
+* **Cohere / vLLM**: `cohere_rerank`
+* **Jina AI**: `jina_rerank`
+* **Aliyun阿里云**: `ali_rerank`
 
 您可以将这些函数之一注入到LightRAG对象的`rerank_model_func`属性中。这将使LightRAG的查询功能能够使用注入的函数对检索到的文本块进行重新排序。有关详细用法,请参阅`examples/rerank_example.py`文件。
 
diff --git a/README.md b/README.md
index eacb4982..5ad37f01 100644
--- a/README.md
+++ b/README.md
@@ -275,7 +275,8 @@ A full list of LightRAG init parameters:
 | **embedding_func_max_async** | `int` | Maximum number of concurrent asynchronous embedding processes | `16` |
 | **llm_model_func** | `callable` | Function for LLM generation | `gpt_4o_mini_complete` |
 | **llm_model_name** | `str` | LLM model name for generation | `meta-llama/Llama-3.2-1B-Instruct` |
-| **summary_max_tokens** | `int` | Maximum tokens send to LLM to generate entity relation summaries | `30000`(configured by env var SUMMARY_MAX_TOKENS) |
+| **summary_context_size** | `int` | Maximum tokens sent to LLM to generate summaries for entity relation merging | `10000`(configured by env var SUMMARY_CONTEXT_SIZE) |
+| **summary_max_tokens** | `int` | Maximum token size for entity/relation description | `500`(configured by env var SUMMARY_MAX_TOKENS) |
 | **llm_model_max_async** | `int` | Maximum number of concurrent asynchronous LLM processes | `4`(default value changed by env var MAX_ASYNC) |
 | **llm_model_kwargs** | `dict` | Additional parameters for LLM generation | |
 | **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for vector database, like setting the threshold for nodes and relations retrieval | cosine_better_than_threshold: 0.2(default value changed by env var COSINE_THRESHOLD) |
diff --git a/env.example b/env.example
index 41c77ede..a824a1f5 100644
--- a/env.example
+++ b/env.example
@@ -125,12 +125,13 @@ ENABLE_LLM_CACHE_FOR_EXTRACT=true
 ### Chunk size for document splitting, 500~1500 is recommended
 # CHUNK_SIZE=1200
 # CHUNK_OVERLAP_SIZE=100
-### Entity and relation summarization configuration
-### Number of duplicated entities/edges to trigger LLM re-summary on merge (at least 3 is recommented), and max tokens send to LLM
+
+### Number of summary segments or tokens to trigger LLM summary on entity/relation merge (at least 3 is recommended)
 # FORCE_LLM_SUMMARY_ON_MERGE=4
-# SUMMARY_MAX_TOKENS=30000
-### Maximum number of entity extraction attempts for ambiguous content
-# MAX_GLEANING=1
+### Number of tokens to trigger LLM summary on entity/relation merge
+# SUMMARY_MAX_TOKENS=500
+### Maximum context size sent to LLM for description summary
+# SUMMARY_CONTEXT_SIZE=10000
 
 ###############################
 ### Concurrency Configuration
diff --git a/lightrag/api/config.py b/lightrag/api/config.py
index a5e352dc..f4a281a7 100644
--- a/lightrag/api/config.py
+++ b/lightrag/api/config.py
@@ -30,6 +30,7 @@ from lightrag.constants import (
     DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
     DEFAULT_MAX_ASYNC,
 
DEFAULT_SUMMARY_MAX_TOKENS, + DEFAULT_SUMMARY_CONTEXT_SIZE, DEFAULT_SUMMARY_LANGUAGE, DEFAULT_EMBEDDING_FUNC_MAX_ASYNC, DEFAULT_EMBEDDING_BATCH_NUM, @@ -119,10 +120,18 @@ def parse_args() -> argparse.Namespace: help=f"Maximum async operations (default: from env or {DEFAULT_MAX_ASYNC})", ) parser.add_argument( - "--max-tokens", + "--summary-max-tokens", type=int, default=get_env_value("SUMMARY_MAX_TOKENS", DEFAULT_SUMMARY_MAX_TOKENS, int), - help=f"Maximum token size (default: from env or {DEFAULT_SUMMARY_MAX_TOKENS})", + help=f"Maximum token size for entity/relation summary(default: from env or {DEFAULT_SUMMARY_MAX_TOKENS})", + ) + parser.add_argument( + "--summary-context-size", + type=int, + default=get_env_value( + "SUMMARY_CONTEXT_SIZE", DEFAULT_SUMMARY_CONTEXT_SIZE, int + ), + help=f"LLM Summary Context size (default: from env or {DEFAULT_SUMMARY_CONTEXT_SIZE})", ) # Logging configuration diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index ec1d38d5..2cb53fcd 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -2,7 +2,7 @@ LightRAG FastAPI Server """ -from fastapi import FastAPI, Depends, HTTPException, status +from fastapi import FastAPI, Depends, HTTPException import asyncio import os import logging @@ -472,7 +472,8 @@ def create_app(args): ), llm_model_name=args.llm_model, llm_model_max_async=args.max_async, - summary_max_tokens=args.max_tokens, + summary_max_tokens=args.summary_max_tokens, + summary_context_size=args.summary_context_size, chunk_token_size=int(args.chunk_size), chunk_overlap_token_size=int(args.chunk_overlap_size), llm_model_kwargs=( @@ -510,7 +511,8 @@ def create_app(args): chunk_overlap_token_size=int(args.chunk_overlap_size), llm_model_name=args.llm_model, llm_model_max_async=args.max_async, - summary_max_tokens=args.max_tokens, + summary_max_tokens=args.summary_max_tokens, + summary_context_size=args.summary_context_size, embedding_func=embedding_func, kv_storage=args.kv_storage, graph_storage=args.graph_storage, @@ -598,7 +600,7 @@ def create_app(args): username = form_data.username if auth_handler.accounts.get(username) != form_data.password: raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect credentials" + status_code=401, detail="Incorrect credentials" ) # Regular user login @@ -642,7 +644,8 @@ def create_app(args): "embedding_binding": args.embedding_binding, "embedding_binding_host": args.embedding_binding_host, "embedding_model": args.embedding_model, - "max_tokens": args.max_tokens, + "summary_max_tokens": args.summary_max_tokens, + "summary_context_size": args.summary_context_size, "kv_storage": args.kv_storage, "doc_status_storage": args.doc_status_storage, "graph_storage": args.graph_storage, diff --git a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py index fc05716c..a53f8bee 100644 --- a/lightrag/api/utils_api.py +++ b/lightrag/api/utils_api.py @@ -242,8 +242,8 @@ def display_splash_screen(args: argparse.Namespace) -> None: ASCIIColors.yellow(f"{args.llm_model}") ASCIIColors.white(" ├─ Max Async for LLM: ", end="") ASCIIColors.yellow(f"{args.max_async}") - ASCIIColors.white(" ├─ Max Tokens: ", end="") - ASCIIColors.yellow(f"{args.max_tokens}") + ASCIIColors.white(" ├─ Summary Context Size: ", end="") + ASCIIColors.yellow(f"{args.summary_context_size}") ASCIIColors.white(" ├─ LLM Cache Enabled: ", end="") ASCIIColors.yellow(f"{args.enable_llm_cache}") ASCIIColors.white(" └─ LLM Cache for Extraction Enabled: ", end="") diff --git 
a/lightrag/constants.py b/lightrag/constants.py
index 9445872e..c180e2dd 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -12,9 +12,11 @@ DEFAULT_MAX_GRAPH_NODES = 1000
 
 # Default values for extraction settings
 DEFAULT_SUMMARY_LANGUAGE = "English"  # Default language for summaries
-DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 4
 DEFAULT_MAX_GLEANING = 1
-DEFAULT_SUMMARY_MAX_TOKENS = 30000  # Default maximum token size
+
+DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 4
+DEFAULT_SUMMARY_MAX_TOKENS = 500  # Max token size for entity/relation summary
+DEFAULT_SUMMARY_CONTEXT_SIZE = 10000  # Max context size sent to LLM for summary
 
 # Separator for graph fields
 GRAPH_FIELD_SEP = "<SEP>"
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index fa529784..1d8c08ed 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -34,6 +34,7 @@ from lightrag.constants import (
     DEFAULT_KG_CHUNK_PICK_METHOD,
     DEFAULT_MIN_RERANK_SCORE,
     DEFAULT_SUMMARY_MAX_TOKENS,
+    DEFAULT_SUMMARY_CONTEXT_SIZE,
     DEFAULT_MAX_ASYNC,
     DEFAULT_MAX_PARALLEL_INSERT,
     DEFAULT_MAX_GRAPH_NODES,
@@ -285,6 +286,11 @@ class LightRAG:
     summary_max_tokens: int = field(
         default=int(os.getenv("SUMMARY_MAX_TOKENS", DEFAULT_SUMMARY_MAX_TOKENS))
     )
+    """Maximum tokens allowed for entity/relation description."""
+
+    summary_context_size: int = field(
+        default=int(os.getenv("SUMMARY_CONTEXT_SIZE", DEFAULT_SUMMARY_CONTEXT_SIZE))
+    )
     """Maximum number of tokens allowed per LLM response."""
 
     llm_model_max_async: int = field(
@@ -416,6 +422,21 @@ class LightRAG:
 
         if self.ollama_server_infos is None:
             self.ollama_server_infos = OllamaServerInfos()
+
+        # Validate config
+        if self.force_llm_summary_on_merge < 3:
+            logger.warning(
+                f"force_llm_summary_on_merge should be at least 3, got {self.force_llm_summary_on_merge}"
+            )
+        if self.summary_max_tokens * self.force_llm_summary_on_merge > self.summary_context_size:
+            logger.warning(
+                f"summary_context_size must be at least summary_max_tokens * force_llm_summary_on_merge, got {self.summary_context_size}"
+            )
+        if self.summary_context_size > self.max_total_tokens:
+            logger.warning(
+                f"summary_context_size must be less than max_total_tokens, got {self.summary_context_size}"
+            )
+
         # Fix global_config now
         global_config = asdict(self)
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 5ad573cc..e59e944d 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -124,10 +124,11 @@ async def _handle_entity_relation_summary(
     """Handle entity relation description summary using map-reduce approach.
 
     This function summarizes a list of descriptions using a map-reduce strategy:
-    1. If total tokens <= summary_max_tokens, summarize directly
-    2. Otherwise, split descriptions into chunks that fit within token limits
-    3. Summarize each chunk, then recursively process the summaries
-    4. Continue until we get a final summary within token limits or num of descriptions is less than force_llm_summary_on_merge
+    1. If total tokens < summary_context_size and len(description_list) < force_llm_summary_on_merge, no need to summarize
+    2. If total tokens < summary_max_tokens, summarize with LLM directly
+    3. Otherwise, split descriptions into chunks that fit within token limits
+    4. Summarize each chunk, then recursively process the summaries
+    5. 
Continue until we get a final summary within token limits or num of descriptions is less than force_llm_summary_on_merge Args: entity_or_relation_name: Name of the entity or relation being summarized @@ -148,6 +149,7 @@ async def _handle_entity_relation_summary( # Get configuration tokenizer: Tokenizer = global_config["tokenizer"] + summary_context_size = global_config["summary_context_size"] summary_max_tokens = global_config["summary_max_tokens"] current_list = description_list[:] # Copy the list to avoid modifying original @@ -158,11 +160,11 @@ async def _handle_entity_relation_summary( total_tokens = sum(len(tokenizer.encode(desc)) for desc in current_list) # If total length is within limits, perform final summarization - if ( - total_tokens <= summary_max_tokens - or len(current_list) < force_llm_summary_on_merge - ): - if len(current_list) < force_llm_summary_on_merge: + if total_tokens <= summary_context_size: + if ( + len(current_list) < force_llm_summary_on_merge + and total_tokens < summary_max_tokens + ): # Already the final result final_description = seperator.join(current_list) return final_description if final_description else "" @@ -184,9 +186,9 @@ async def _handle_entity_relation_summary( desc_tokens = len(tokenizer.encode(desc)) # If adding current description would exceed limit, finalize current chunk - if current_tokens + desc_tokens > summary_max_tokens and current_chunk: + if current_tokens + desc_tokens > summary_context_size and current_chunk: chunks.append(current_chunk) - current_chunk = [desc] + current_chunk = [desc] # Intial chunk for next group current_tokens = desc_tokens else: current_chunk.append(desc) diff --git a/lightrag_webui/src/api/lightrag.ts b/lightrag_webui/src/api/lightrag.ts index d2f23f12..265126c7 100644 --- a/lightrag_webui/src/api/lightrag.ts +++ b/lightrag_webui/src/api/lightrag.ts @@ -35,7 +35,6 @@ export type LightragStatus = { embedding_binding: string embedding_binding_host: string embedding_model: string - max_tokens: number kv_storage: string doc_status_storage: string graph_storage: string From cb0fe38b9aff9a9593e3cae6575d72af3c67ebbd Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 26 Aug 2025 02:22:34 +0800 Subject: [PATCH 08/16] Fix linting --- lightrag/api/lightrag_server.py | 4 +--- lightrag/lightrag.py | 6 ++++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 2cb53fcd..28e13614 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -599,9 +599,7 @@ def create_app(args): } username = form_data.username if auth_handler.accounts.get(username) != form_data.password: - raise HTTPException( - status_code=401, detail="Incorrect credentials" - ) + raise HTTPException(status_code=401, detail="Incorrect credentials") # Regular user login user_token = auth_handler.create_token( diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 1d8c08ed..f5529bad 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -422,13 +422,15 @@ class LightRAG: if self.ollama_server_infos is None: self.ollama_server_infos = OllamaServerInfos() - # Validate config if self.force_llm_summary_on_merge < 3: logger.warning( f"force_llm_summary_on_merge should be at least 3, got {self.force_llm_summary_on_merge}" ) - if self.summary_max_tokens * self.force_llm_summary_on_merge > self.summary_context_size: + if ( + self.summary_max_tokens * self.force_llm_summary_on_merge + > self.summary_context_size + ): logger.warning( 
f"summary_context_size must be at least summary_max_tokens * force_llm_summary_on_merge, got {self.summary_context_size}" ) From 9eb2be79b8788ad9f3706e5c34bb8ebd9a3636cb Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 26 Aug 2025 03:56:18 +0800 Subject: [PATCH 09/16] feat: track actual LLM usage in entity/relation merging - Modified _handle_entity_relation_summary to return tuple[str, bool] - Updated merge functions to log "LLMmerg" vs "Merging" based on actual LLM usage - Replaced hardcoded fragment count prediction with real-time LLM usage tracking --- lightrag/operate.py | 171 +++++++++++++++++++++++--------------------- 1 file changed, 91 insertions(+), 80 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index e59e944d..e84abd48 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -120,7 +120,7 @@ async def _handle_entity_relation_summary( seperator: str, global_config: dict, llm_response_cache: BaseKVStorage | None = None, -) -> str: +) -> tuple[str, bool]: """Handle entity relation description summary using map-reduce approach. This function summarizes a list of descriptions using a map-reduce strategy: @@ -137,15 +137,15 @@ async def _handle_entity_relation_summary( llm_response_cache: Optional cache for LLM responses Returns: - Final summarized description string + Tuple of (final_summarized_description_string, llm_was_used_boolean) """ # Handle empty input if not description_list: - return "" + return "", False # If only one description, return it directly (no need for LLM call) if len(description_list) == 1: - return description_list[0] + return description_list[0], False # Get configuration tokenizer: Tokenizer = global_config["tokenizer"] @@ -153,6 +153,7 @@ async def _handle_entity_relation_summary( summary_max_tokens = global_config["summary_max_tokens"] current_list = description_list[:] # Copy the list to avoid modifying original + llm_was_used = False # Track whether LLM was used during the entire process # Iterative map-reduce process while True: @@ -165,17 +166,18 @@ async def _handle_entity_relation_summary( len(current_list) < force_llm_summary_on_merge and total_tokens < summary_max_tokens ): - # Already the final result + # Already the final result - no LLM needed final_description = seperator.join(current_list) - return final_description if final_description else "" + return final_description if final_description else "", llm_was_used else: - # Final summarization of remaining descriptions - return await _summarize_descriptions( + # Final summarization of remaining descriptions - LLM will be used + final_summary = await _summarize_descriptions( entity_or_relation_name, current_list, global_config, llm_response_cache, ) + return final_summary, True # LLM was used for final summarization # Need to split into chunks - Map phase chunks = [] @@ -188,7 +190,7 @@ async def _handle_entity_relation_summary( # If adding current description would exceed limit, finalize current chunk if current_tokens + desc_tokens > summary_context_size and current_chunk: chunks.append(current_chunk) - current_chunk = [desc] # Intial chunk for next group + current_chunk = [desc] # Initial chunk for next group current_tokens = desc_tokens else: current_chunk.append(desc) @@ -214,6 +216,7 @@ async def _handle_entity_relation_summary( entity_or_relation_name, chunk, global_config, llm_response_cache ) new_summaries.append(summary) + llm_was_used = True # Mark that LLM was used in reduce phase # Update current list with new summaries for next iteration current_list = 
new_summaries @@ -529,7 +532,7 @@ async def _rebuild_knowledge_from_chunks( pipeline_status["history_messages"].append(status_message) except Exception as e: failed_entities_count += 1 - status_message = f"Failed to rebuild entity {entity_name}: {e}" + status_message = f"Failed to rebuild entity `{entity_name}`: {e}" logger.info(status_message) # Per requirement, change to info if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -560,7 +563,7 @@ async def _rebuild_knowledge_from_chunks( global_config=global_config, ) rebuilt_relationships_count += 1 - status_message = f"Relationship `{src}->{tgt}` rebuilt from {len(chunk_ids)} chunks" + status_message = f"Relationship `{src} - {tgt}` rebuilt from {len(chunk_ids)} chunks" logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -568,7 +571,9 @@ async def _rebuild_knowledge_from_chunks( pipeline_status["history_messages"].append(status_message) except Exception as e: failed_relationships_count += 1 - status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}" + status_message = ( + f"Failed to rebuild relationship `{src} - {tgt}`: {e}" + ) logger.info(status_message) # Per requirement, change to info if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -867,7 +872,7 @@ async def _rebuild_single_entity( # Generate description from relationships or fallback to current if description_list: force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] - final_description = await _handle_entity_relation_summary( + final_description, _ = await _handle_entity_relation_summary( entity_name, description_list, force_llm_summary_on_merge, @@ -909,7 +914,7 @@ async def _rebuild_single_entity( # Generate final description and update storage force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] if description_list: - final_description = await _handle_entity_relation_summary( + final_description, _ = await _handle_entity_relation_summary( entity_name, description_list, force_llm_summary_on_merge, @@ -990,7 +995,7 @@ async def _rebuild_single_relationship( # Use summary if description has too many fragments force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] if description_list: - final_description = await _handle_entity_relation_summary( + final_description, _ = await _handle_entity_relation_summary( f"{src}-{tgt}", description_list, force_llm_summary_on_merge, @@ -1065,13 +1070,9 @@ async def _merge_nodes_then_upsert( already_node = await knowledge_graph_inst.get_node(entity_name) if already_node: already_entity_types.append(already_node["entity_type"]) - already_source_ids.extend( - split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP]) - ) - already_file_paths.extend( - split_string_by_multi_markers(already_node["file_path"], [GRAPH_FIELD_SEP]) - ) - already_description.append(already_node["description"]) + already_source_ids.extend(already_node["source_id"].split(GRAPH_FIELD_SEP)) + already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP)) + already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP)) entity_type = sorted( Counter( @@ -1079,39 +1080,45 @@ async def _merge_nodes_then_upsert( ).items(), key=lambda x: x[1], reverse=True, - )[0][0] + )[0][0] # Get the entity type with the highest count - description_list = already_description + list( - 
dict.fromkeys([dp["description"] for dp in nodes_data if dp.get("description")]) + description_list = list( + dict.fromkeys( + already_description + + [dp["description"] for dp in nodes_data if dp.get("description")] + ) ) force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] num_fragment = len(description_list) - already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1 + already_fragment = len(already_description) + deduplicated_num = already_fragment + len(nodes_data) - num_fragment + if deduplicated_num > 0: + dd_message = f"(dd:{deduplicated_num})" + else: + dd_message = "" if num_fragment > 0: - if num_fragment >= force_llm_summary_on_merge: - status_message = f"LLM merging `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - description = await _handle_entity_relation_summary( - entity_name, - description_list, - force_llm_summary_on_merge, - GRAPH_FIELD_SEP, - global_config, - llm_response_cache, - ) + # Get summary and LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + entity_name, + description_list, + force_llm_summary_on_merge, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, + ) + + # Log based on actual LLM usage + if llm_was_used: + status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}" else: - status_message = f"Merging `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - description = GRAPH_FIELD_SEP.join(description_list) + status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}" + + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) else: logger.error(f"Entity {entity_name} has no description") description = "(no description)" @@ -1167,22 +1174,20 @@ async def _merge_edges_then_upsert( # Get source_id with empty string default if missing or None if already_edge.get("source_id") is not None: already_source_ids.extend( - split_string_by_multi_markers( - already_edge["source_id"], [GRAPH_FIELD_SEP] - ) + already_edge["source_id"].split(GRAPH_FIELD_SEP) ) # Get file_path with empty string default if missing or None if already_edge.get("file_path") is not None: already_file_paths.extend( - split_string_by_multi_markers( - already_edge["file_path"], [GRAPH_FIELD_SEP] - ) + already_edge["file_path"].split(GRAPH_FIELD_SEP) ) # Get description with empty string default if missing or None if already_edge.get("description") is not None: - already_description.append(already_edge["description"]) + already_description.extend( + already_edge["description"].split(GRAPH_FIELD_SEP) + ) # Get keywords with empty string default if missing or None if already_edge.get("keywords") is not None: @@ -1195,37 +1200,43 @@ async def _merge_edges_then_upsert( # Process edges_data with None checks weight 
= sum([dp["weight"] for dp in edges_data] + already_weights) - description_list = already_description + list( - dict.fromkeys([dp["description"] for dp in edges_data if dp.get("description")]) + description_list = list( + dict.fromkeys( + already_description + + [dp["description"] for dp in edges_data if dp.get("description")] + ) ) force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] num_fragment = len(description_list) - already_fragment = already_description.count(GRAPH_FIELD_SEP) + 1 + already_fragment = len(already_description) + deduplicated_num = already_fragment + len(edges_data) - num_fragment + if deduplicated_num > 0: + dd_message = f"(dd:{deduplicated_num})" + else: + dd_message = "" if num_fragment > 0: - if num_fragment >= force_llm_summary_on_merge: - status_message = f"LLM merging `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - description = await _handle_entity_relation_summary( - f"({src_id}, {tgt_id})", - description_list, - force_llm_summary_on_merge, - GRAPH_FIELD_SEP, - global_config, - llm_response_cache, - ) + # Get summary and LLM usage status + description, llm_was_used = await _handle_entity_relation_summary( + f"({src_id}, {tgt_id})", + description_list, + force_llm_summary_on_merge, + GRAPH_FIELD_SEP, + global_config, + llm_response_cache, + ) + + # Log based on actual LLM usage + if llm_was_used: + status_message = f"LLMmrg: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}" else: - status_message = f"Merging `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}" - logger.info(status_message) - if pipeline_status is not None and pipeline_status_lock is not None: - async with pipeline_status_lock: - pipeline_status["latest_message"] = status_message - pipeline_status["history_messages"].append(status_message) - description = GRAPH_FIELD_SEP.join(description_list) + status_message = f"Merged: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}" + + logger.info(status_message) + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + pipeline_status["latest_message"] = status_message + pipeline_status["history_messages"].append(status_message) else: logger.error(f"Edge {src_id} - {tgt_id} has no description") description = "(no description)" From 84416d104d988931796512a3a00666e26586b5b1 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 26 Aug 2025 03:57:35 +0800 Subject: [PATCH 10/16] Increase default LLM summary merge threshold from 4 to 8 for reducing summary trigger frequency --- env.example | 2 +- lightrag/constants.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/env.example b/env.example index a824a1f5..a64626ef 100644 --- a/env.example +++ b/env.example @@ -127,7 +127,7 @@ ENABLE_LLM_CACHE_FOR_EXTRACT=true # CHUNK_OVERLAP_SIZE=100 ### Number of summary semgments or tokens to trigger LLM summary on entity/relation merge (at least 3 is recommented) -# FORCE_LLM_SUMMARY_ON_MERGE=4 +# FORCE_LLM_SUMMARY_ON_MERGE=8 ### Number of tokens to trigger LLM summary on entity/relation merge # SUMMARY_MAX_TOKENS = 500 ### Maximum context size sent to LLM for description summary diff --git a/lightrag/constants.py 
b/lightrag/constants.py
index c180e2dd..0e6d6dcd 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -14,7 +14,7 @@ DEFAULT_MAX_GRAPH_NODES = 1000
 DEFAULT_SUMMARY_LANGUAGE = "English"  # Default language for summaries
 DEFAULT_MAX_GLEANING = 1
-DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 4
+DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
 DEFAULT_SUMMARY_MAX_TOKENS = 500  # Max token size for entity/relation summary
 DEFAULT_SUMMARY_CONTEXT_SIZE = 10000  # Default maximum token size

From 025f70089a22d1bed324bbd0ceb72945857369cb Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 26 Aug 2025 04:26:15 +0800
Subject: [PATCH 11/16] Simplify status messages in knowledge rebuild operations

---
 lightrag/operate.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index e84abd48..6820401c 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -523,7 +523,7 @@ async def _rebuild_knowledge_from_chunks(
                 )
                 rebuilt_entities_count += 1
                 status_message = (
-                    f"Entity `{entity_name}` rebuilt from {len(chunk_ids)} chunks"
+                    f"Rebuilt `{entity_name}` from {len(chunk_ids)} chunks"
                 )
                 logger.info(status_message)
                 if pipeline_status is not None and pipeline_status_lock is not None:
@@ -532,7 +532,7 @@ async def _rebuild_knowledge_from_chunks(
                         pipeline_status["history_messages"].append(status_message)
             except Exception as e:
                 failed_entities_count += 1
-                status_message = f"Failed to rebuild entity `{entity_name}`: {e}"
+                status_message = f"Failed to rebuild `{entity_name}`: {e}"
                 logger.info(status_message)  # Per requirement, change to info
                 if pipeline_status is not None and pipeline_status_lock is not None:
                     async with pipeline_status_lock:
@@ -563,7 +563,7 @@ async def _rebuild_knowledge_from_chunks(
                     global_config=global_config,
                 )
                 rebuilt_relationships_count += 1
-                status_message = f"Relationship `{src} - {tgt}` rebuilt from {len(chunk_ids)} chunks"
+                status_message = f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks"
                 logger.info(status_message)
                 if pipeline_status is not None and pipeline_status_lock is not None:
                     async with pipeline_status_lock:
                         pipeline_status["latest_message"] = status_message
                         pipeline_status["history_messages"].append(status_message)
             except Exception as e:
                 failed_relationships_count += 1
                 status_message = (
-                    f"Failed to rebuild relationship `{src} - {tgt}`: {e}"
+                    f"Failed to rebuild `{src} - {tgt}`: {e}"
                 )
                 logger.info(status_message)  # Per requirement, change to info
                 if pipeline_status is not None and pipeline_status_lock is not None:

From 6bcfe696ee7c0caf55ffbdbe6723045f07a77030 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 26 Aug 2025 14:41:12 +0800
Subject: [PATCH 12/16] feat: add output length recommendation and description type to LLM summary

- Add SUMMARY_LENGTH_RECOMMENDED parameter (600 tokens)
- Optimize prompt template for LLM summary
---
 env.example            |  8 +++++---
 lightrag/api/config.py |  9 +++++++++
 lightrag/constants.py  |  9 +++++++--
 lightrag/lightrag.py   | 14 +++++++++++++-
 lightrag/operate.py    | 36 ++++++++++++++++++++++++++----------
 lightrag/prompt.py     | 30 ++++++++++++++++------------
 6 files changed, 78 insertions(+), 28 deletions(-)

diff --git a/env.example b/env.example
index a64626ef..3ce6d1d9 100644
--- a/env.example
+++ b/env.example
@@ -128,10 +128,12 @@ ENABLE_LLM_CACHE_FOR_EXTRACT=true
 ### Number of summary semgments or tokens to trigger LLM summary on entity/relation merge (at least 3 is recommented)
 # FORCE_LLM_SUMMARY_ON_MERGE=8
-### Number of tokens to trigger LLM summary on entity/relation merge
-# SUMMARY_MAX_TOKENS = 500
+### Max description token size to trigger LLM summary
+# SUMMARY_MAX_TOKENS = 1200
+### Recommended LLM summary output length in tokens
+# SUMMARY_LENGTH_RECOMMENDED=600
 ### Maximum context size sent to LLM for description summary
-# SUMMARY_CONTEXT_SIZE=10000
+# SUMMARY_CONTEXT_SIZE=12000

 ###############################
 ### Concurrency Configuration
diff --git a/lightrag/api/config.py b/lightrag/api/config.py
index f4a281a7..a6badec4 100644
--- a/lightrag/api/config.py
+++ b/lightrag/api/config.py
@@ -30,6 +30,7 @@ from lightrag.constants import (
     DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
     DEFAULT_MAX_ASYNC,
     DEFAULT_SUMMARY_MAX_TOKENS,
+    DEFAULT_SUMMARY_LENGTH_RECOMMENDED,
     DEFAULT_SUMMARY_CONTEXT_SIZE,
     DEFAULT_SUMMARY_LANGUAGE,
     DEFAULT_EMBEDDING_FUNC_MAX_ASYNC,
@@ -133,6 +134,14 @@ def parse_args() -> argparse.Namespace:
         ),
         help=f"LLM Summary Context size (default: from env or {DEFAULT_SUMMARY_CONTEXT_SIZE})",
     )
+    parser.add_argument(
+        "--summary-length-recommended",
+        type=int,
+        default=get_env_value(
+            "SUMMARY_LENGTH_RECOMMENDED", DEFAULT_SUMMARY_LENGTH_RECOMMENDED, int
+        ),
+        help=f"Recommended LLM summary output length in tokens (default: from env or {DEFAULT_SUMMARY_LENGTH_RECOMMENDED})",
+    )

     # Logging configuration
     parser.add_argument(
diff --git a/lightrag/constants.py b/lightrag/constants.py
index 0e6d6dcd..2f493277 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -14,9 +14,14 @@ DEFAULT_MAX_GRAPH_NODES = 1000
 DEFAULT_SUMMARY_LANGUAGE = "English"  # Default language for summaries
 DEFAULT_MAX_GLEANING = 1
+# Number of description fragments to trigger LLM summary
 DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
-DEFAULT_SUMMARY_MAX_TOKENS = 500  # Max token size for entity/relation summary
-DEFAULT_SUMMARY_CONTEXT_SIZE = 10000  # Default maximum token size
+# Max description token size to trigger LLM summary
+DEFAULT_SUMMARY_MAX_TOKENS = 1200
+# Recommended LLM summary output length in tokens
+DEFAULT_SUMMARY_LENGTH_RECOMMENDED = 600
+# Maximum token size sent to LLM for summary
+DEFAULT_SUMMARY_CONTEXT_SIZE = 12000

 # Separator for graph fields
 GRAPH_FIELD_SEP = "<SEP>"
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index f5529bad..f4dc9dd4 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -35,6 +35,7 @@ from lightrag.constants import (
     DEFAULT_MIN_RERANK_SCORE,
     DEFAULT_SUMMARY_MAX_TOKENS,
     DEFAULT_SUMMARY_CONTEXT_SIZE,
+    DEFAULT_SUMMARY_LENGTH_RECOMMENDED,
     DEFAULT_MAX_ASYNC,
     DEFAULT_MAX_PARALLEL_INSERT,
     DEFAULT_MAX_GRAPH_NODES,
@@ -293,6 +294,13 @@ class LightRAG:
     )
     """Maximum number of tokens allowed per LLM response."""

+    summary_length_recommended: int = field(
+        default=int(
+            os.getenv("SUMMARY_LENGTH_RECOMMENDED", DEFAULT_SUMMARY_LENGTH_RECOMMENDED)
+        )
+    )
+    """Recommended length of LLM summary output."""
+
     llm_model_max_async: int = field(
         default=int(os.getenv("MAX_ASYNC", DEFAULT_MAX_ASYNC))
     )
@@ -435,9 +443,13 @@ class LightRAG:
                 f"summary_context_size must be at least summary_max_tokens * force_llm_summary_on_merge, got {self.summary_context_size}"
             )
         if self.summary_context_size > self.max_total_tokens:
-            logger.warning(
+            logger.error(
                 f"summary_context_size must be less than max_total_tokens, got {self.summary_context_size}"
             )
+        if self.summary_length_recommended > self.summary_max_tokens:
+            logger.warning(
+                f"summary_length_recommended should less than max_total_tokens, got {self.summary_length_recommended}"
+            )

     # Fix global_config now
     global_config = asdict(self)
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 6820401c..17dfa58c 100644
--- a/lightrag/operate.py
+++
b/lightrag/operate.py @@ -114,6 +114,7 @@ def chunking_by_token_size( async def _handle_entity_relation_summary( + description_type: str, entity_or_relation_name: str, description_list: list[str], force_llm_summary_on_merge: int, @@ -172,6 +173,7 @@ async def _handle_entity_relation_summary( else: # Final summarization of remaining descriptions - LLM will be used final_summary = await _summarize_descriptions( + description_type, entity_or_relation_name, current_list, global_config, @@ -213,7 +215,11 @@ async def _handle_entity_relation_summary( else: # Multiple descriptions need LLM summarization summary = await _summarize_descriptions( - entity_or_relation_name, chunk, global_config, llm_response_cache + description_type, + entity_or_relation_name, + chunk, + global_config, + llm_response_cache, ) new_summaries.append(summary) llm_was_used = True # Mark that LLM was used in reduce phase @@ -223,7 +229,8 @@ async def _handle_entity_relation_summary( async def _summarize_descriptions( - entity_or_relation_name: str, + description_type: str, + description_name: str, description_list: list[str], global_config: dict, llm_response_cache: BaseKVStorage | None = None, @@ -247,18 +254,22 @@ async def _summarize_descriptions( "language", PROMPTS["DEFAULT_LANGUAGE"] ) + summary_length_recommended = global_config["summary_length_recommended"] + prompt_template = PROMPTS["summarize_entity_descriptions"] # Prepare context for the prompt context_base = dict( - entity_name=entity_or_relation_name, - description_list="\n".join(description_list), + description_type=description_type, + description_name=description_name, + description_list="\n\n".join(description_list), + summary_length=summary_length_recommended, language=language, ) use_prompt = prompt_template.format(**context_base) logger.debug( - f"Summarizing {len(description_list)} descriptions for: {entity_or_relation_name}" + f"Summarizing {len(description_list)} descriptions for: {description_name}" ) # Use LLM function with cache (higher priority for summary generation) @@ -563,7 +574,9 @@ async def _rebuild_knowledge_from_chunks( global_config=global_config, ) rebuilt_relationships_count += 1 - status_message = f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks" + status_message = ( + f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks" + ) logger.info(status_message) if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -571,9 +584,7 @@ async def _rebuild_knowledge_from_chunks( pipeline_status["history_messages"].append(status_message) except Exception as e: failed_relationships_count += 1 - status_message = ( - f"Failed to rebuild `{src} - {tgt}`: {e}" - ) + status_message = f"Failed to rebuild `{src} - {tgt}`: {e}" logger.info(status_message) # Per requirement, change to info if pipeline_status is not None and pipeline_status_lock is not None: async with pipeline_status_lock: @@ -873,6 +884,7 @@ async def _rebuild_single_entity( if description_list: force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] final_description, _ = await _handle_entity_relation_summary( + "Entity", entity_name, description_list, force_llm_summary_on_merge, @@ -915,6 +927,7 @@ async def _rebuild_single_entity( force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] if description_list: final_description, _ = await _handle_entity_relation_summary( + "Entity", entity_name, description_list, force_llm_summary_on_merge, @@ -996,6 +1009,7 @@ async def _rebuild_single_relationship( 
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] if description_list: final_description, _ = await _handle_entity_relation_summary( + "Relation", f"{src}-{tgt}", description_list, force_llm_summary_on_merge, @@ -1100,6 +1114,7 @@ async def _merge_nodes_then_upsert( if num_fragment > 0: # Get summary and LLM usage status description, llm_was_used = await _handle_entity_relation_summary( + "Entity", entity_name, description_list, force_llm_summary_on_merge, @@ -1218,6 +1233,7 @@ async def _merge_edges_then_upsert( if num_fragment > 0: # Get summary and LLM usage status description, llm_was_used = await _handle_entity_relation_summary( + "Relation", f"({src_id}, {tgt_id})", description_list, force_llm_summary_on_merge, @@ -1595,7 +1611,7 @@ async def merge_nodes_and_edges( ) # Don't raise exception to avoid affecting main flow - log_message = f"Completed merging: {len(processed_entities)} entities, {len(all_added_entities)} added entities, {len(processed_edges)} relations" + log_message = f"Completed merging: {len(processed_entities)} entities, {len(all_added_entities)} extra entities, {len(processed_edges)} relations" logger.info(log_message) async with pipeline_status_lock: pipeline_status["latest_message"] = log_message diff --git a/lightrag/prompt.py b/lightrag/prompt.py index 32666bb5..c8f8c3a3 100644 --- a/lightrag/prompt.py +++ b/lightrag/prompt.py @@ -133,20 +133,26 @@ Output: #############################""", ] -PROMPTS[ - "summarize_entity_descriptions" -] = """You are a helpful assistant responsible for generating a comprehensive summary of the data provided below. -Given one or two entities, and a list of descriptions, all related to the same entity or group of entities. -Please concatenate all of these into a single, comprehensive description. Make sure to include information collected from all the descriptions. -If the provided descriptions are contradictory, please resolve the contradictions and provide a single, coherent summary. -Make sure it is written in third person, and include the entity names so we the have full context. -Use {language} as output language. +PROMPTS["summarize_entity_descriptions"] = """---Role--- +You are a Knowledge Graph Specialist responsible for data curation and synthesis. + +---Task--- +Your task is to synthesize a list of descriptions of a given entity or relation into a single, comprehensive, and cohesive summary. + +---Instructions--- +1. **Comprehensiveness:** The summary must integrate key information from all provided descriptions. Do not omit important facts. +2. **Consistency:** If the descriptions contain contradictions, you must resolve them to produce a logically consistent summary. If a contradiction cannot be resolved, phrase the information neutrally. +3. **Context:** The summary must explicitly mention the name of the entity or relation for full context. +4. **Style:** The output must be written from an objective, third-person perspective. +5. **Conciseness:** Be concise and avoid redundancy. The summary's length must not exceed {summary_length} tokens. +6. **Language:** The entire output must be written in {language}. 
-####### ---Data--- -Entities: {entity_name} -Description List: {description_list} -####### +{description_type} Name: {description_name} +Description List: +{description_list} + +---Output--- Output: """ From 01a2c79f29ceab3cbf9cdcbff2e8ec33a2abd245 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 26 Aug 2025 14:42:52 +0800 Subject: [PATCH 13/16] Standardize prompt formatting and section headers across templates - Remove hash delimiters - Consistent section headers - Add "Output:" labels - Clean up example formatting --- lightrag/prompt.py | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/lightrag/prompt.py b/lightrag/prompt.py index c8f8c3a3..ce78f8a6 100644 --- a/lightrag/prompt.py +++ b/lightrag/prompt.py @@ -40,22 +40,19 @@ Format the content-level key words as ("content_keywords"{tuple_delimiter} Date: Tue, 26 Aug 2025 18:02:39 +0800 Subject: [PATCH 14/16] Refactor: move force_llm_summary_on_merge to global_config access - Remove parameter from function signature - Access from global_config instead - Improve code consistency --- lightrag/operate.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index 17dfa58c..ee17ab56 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -117,7 +117,6 @@ async def _handle_entity_relation_summary( description_type: str, entity_or_relation_name: str, description_list: list[str], - force_llm_summary_on_merge: int, seperator: str, global_config: dict, llm_response_cache: BaseKVStorage | None = None, @@ -152,6 +151,7 @@ async def _handle_entity_relation_summary( tokenizer: Tokenizer = global_config["tokenizer"] summary_context_size = global_config["summary_context_size"] summary_max_tokens = global_config["summary_max_tokens"] + force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] current_list = description_list[:] # Copy the list to avoid modifying original llm_was_used = False # Track whether LLM was used during the entire process @@ -880,14 +880,12 @@ async def _rebuild_single_entity( # deduplicate descriptions description_list = list(dict.fromkeys(relationship_descriptions)) - # Generate description from relationships or fallback to current + # Generate final description from relationships or fallback to current if description_list: - force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] final_description, _ = await _handle_entity_relation_summary( "Entity", entity_name, description_list, - force_llm_summary_on_merge, GRAPH_FIELD_SEP, global_config, llm_response_cache=llm_response_cache, @@ -923,14 +921,12 @@ async def _rebuild_single_entity( else current_entity.get("entity_type", "UNKNOWN") ) - # Generate final description and update storage - force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] + # Generate final description from entities or fallback to current if description_list: final_description, _ = await _handle_entity_relation_summary( "Entity", entity_name, description_list, - force_llm_summary_on_merge, GRAPH_FIELD_SEP, global_config, llm_response_cache=llm_response_cache, @@ -1005,14 +1001,12 @@ async def _rebuild_single_relationship( weight = sum(weights) if weights else current_relationship.get("weight", 1.0) - # Use summary if description has too many fragments - force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] + # Generate final description from relations or fallback to current if description_list: 
                final_description, _ = await _handle_entity_relation_summary(
                    "Relation",
                    f"{src}-{tgt}",
                    description_list,
-                    force_llm_summary_on_merge,
                    GRAPH_FIELD_SEP,
                    global_config,
                    llm_response_cache=llm_response_cache,
@@ -1096,6 +1090,7 @@ async def _merge_nodes_then_upsert(
         reverse=True,
     )[0][0]  # Get the entity type with the highest count

+    # merge and deduplicate description
     description_list = list(
         dict.fromkeys(
             already_description
             + [dp["description"] for dp in nodes_data if dp.get("description")]
         )
     )

-    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
     num_fragment = len(description_list)
     already_fragment = len(already_description)
     deduplicated_num = already_fragment + len(nodes_data) - num_fragment
@@ -1117,7 +1111,6 @@
                 "Entity",
                 entity_name,
                 description_list,
-                force_llm_summary_on_merge,
                 GRAPH_FIELD_SEP,
                 global_config,
                 llm_response_cache,
@@ -1222,7 +1215,6 @@
         )
     )

-    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
     num_fragment = len(description_list)
     already_fragment = len(already_description)
     deduplicated_num = already_fragment + len(edges_data) - num_fragment
@@ -1236,7 +1228,6 @@
             "Relation",
             f"({src_id}, {tgt_id})",
             description_list,
-            force_llm_summary_on_merge,
             GRAPH_FIELD_SEP,
             global_config,
             llm_response_cache,

From e0a755e42cef804a8c6be3dfea8e48a805a8a523 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 26 Aug 2025 18:28:57 +0800
Subject: [PATCH 15/16] Refactor prompt instructions to emphasize depth and completeness

---
 lightrag/prompt.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/lightrag/prompt.py b/lightrag/prompt.py
index ce78f8a6..586f7ae6 100644
--- a/lightrag/prompt.py
+++ b/lightrag/prompt.py
@@ -141,11 +141,10 @@ Your task is to synthesize a list of descriptions of a given entity or relation

 ---Instructions---
 1. **Comprehensiveness:** The summary must integrate key information from all provided descriptions. Do not omit important facts.
-2. **Consistency:** If the descriptions contain contradictions, you must resolve them to produce a logically consistent summary. If a contradiction cannot be resolved, phrase the information neutrally.
-3. **Context:** The summary must explicitly mention the name of the entity or relation for full context.
-4. **Style:** The output must be written from an objective, third-person perspective.
-5. **Conciseness:** Be concise and avoid redundancy. The summary's length must not exceed {summary_length} tokens.
-6. **Language:** The entire output must be written in {language}.
+2. **Context:** The summary must explicitly mention the name of the entity or relation for full context.
+3. **Style:** The output must be written from an objective, third-person perspective.
+4. **Length:** Maintain depth and completeness while ensuring the summary's length does not exceed {summary_length} tokens.
+5. **Language:** The entire output must be written in {language}.

 ---Data---
 {description_type} Name: {description_name}
 Description List:
 {description_list}

 ---Output---
 Output:
 """

From d3623cc9ae84de67cfb417800f7f512a86533442 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 26 Aug 2025 21:58:31 +0800
Subject: [PATCH 16/16] fix: resolve infinite loop risk in _handle_entity_relation_summary

- Ensure oversized descriptions are force-merged with subsequent ones
- Add len(current_list) <= 2 termination condition to guarantee convergence
- Implement token-based truncation in _summarize_descriptions to prevent overflow
---
 lightrag/lightrag.py | 13 +++----------
 lightrag/operate.py  | 46 +++++++++++++++++++++++++++++++++---------
 2 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index f4dc9dd4..8dbff74f 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -435,20 +435,13 @@ class LightRAG:
             logger.warning(
                 f"force_llm_summary_on_merge should be at least 3, got {self.force_llm_summary_on_merge}"
             )
-        if (
-            self.summary_max_tokens * self.force_llm_summary_on_merge
-            > self.summary_context_size
-        ):
-            logger.warning(
-                f"summary_context_size must be at least summary_max_tokens * force_llm_summary_on_merge, got {self.summary_context_size}"
-            )
         if self.summary_context_size > self.max_total_tokens:
-            logger.error(
-                f"summary_context_size must be less than max_total_tokens, got {self.summary_context_size}"
+            logger.warning(
+                f"summary_context_size({self.summary_context_size}) should be no greater than max_total_tokens({self.max_total_tokens})"
             )
         if self.summary_length_recommended > self.summary_max_tokens:
             logger.warning(
-                f"summary_length_recommended should less than max_total_tokens, got {self.summary_length_recommended}"
+                f"summary_max_tokens({self.summary_max_tokens}) should be greater than summary_length_recommended({self.summary_length_recommended})"
             )

     # Fix global_config now
diff --git a/lightrag/operate.py b/lightrag/operate.py
index ee17ab56..486f7e69 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -162,15 +162,19 @@ async def _handle_entity_relation_summary(
         total_tokens = sum(len(tokenizer.encode(desc)) for desc in current_list)

         # If total length is within limits, perform final summarization
-        if total_tokens <= summary_context_size:
+        if total_tokens <= summary_context_size or len(current_list) <= 2:
             if (
                 len(current_list) < force_llm_summary_on_merge
                 and total_tokens < summary_max_tokens
             ):
-                # Already the final result - no LLM needed
+                # no LLM needed, just join the descriptions
                 final_description = seperator.join(current_list)
                 return final_description if final_description else "", llm_was_used
             else:
+                if total_tokens > summary_context_size and len(current_list) <= 2:
+                    logger.warning(
+                        f"Summarizing {entity_or_relation_name}: Oversized description found"
+                    )
                 # Final summarization of remaining descriptions - LLM will be used
                 final_summary = await _summarize_descriptions(
                     description_type,
                     entity_or_relation_name,
                     current_list,
                     global_config,
                     llm_response_cache,
                 )
                 return final_summary, True  # LLM was used for final summarization

     # Need to split into chunks - Map phase
+    # Ensure each chunk has minimum 2 descriptions to guarantee progress
     chunks = []
     current_chunk = []
     current_tokens = 0

-    for desc in current_list:
+    # Currently at least 3 descriptions in current_list
+    for i, desc in enumerate(current_list):
         desc_tokens = len(tokenizer.encode(desc))

         # If adding current description would exceed limit, finalize current chunk
         if current_tokens + desc_tokens > summary_context_size and current_chunk:
-            chunks.append(current_chunk)
-            current_chunk = [desc]  # Initial chunk for next group
-            current_tokens = desc_tokens
+            # Ensure we have at least 2 descriptions in the chunk (when possible)
+            if len(current_chunk) == 1:
+                # Force add one more description to ensure minimum 2 per chunk
+                current_chunk.append(desc)
+                chunks.append(current_chunk)
+                logger.warning(
+                    f"Summarizing {entity_or_relation_name}: Oversized description found"
+                )
+                current_chunk = []  # next group is empty
+                current_tokens = 0
+            else:  # current_chunk is ready for summary in reduce phase
+                chunks.append(current_chunk)
+                current_chunk = [desc]  # leave it for next group
+                current_tokens = desc_tokens
         else:
             current_chunk.append(desc)
             current_tokens += desc_tokens
@@ -203,10 +220,10 @@ async def _handle_entity_relation_summary(
         chunks.append(current_chunk)

     logger.info(
-        f"Summarizing {entity_or_relation_name}: split {len(current_list)} descriptions into {len(chunks)} groups"
+        f"  Summarizing {entity_or_relation_name}: Map {len(current_list)} descriptions into {len(chunks)} groups"
     )

-    # Reduce phase: summarize each chunk
+    # Reduce phase: summarize each group from chunks
     new_summaries = []
     for chunk in chunks:
         if len(chunk) == 1:
@@ -258,11 +275,22 @@ async def _summarize_descriptions(

     prompt_template = PROMPTS["summarize_entity_descriptions"]

+    # Join descriptions and apply token-based truncation if necessary
+    joined_descriptions = "\n\n".join(description_list)
+    tokenizer = global_config["tokenizer"]
+    summary_context_size = global_config["summary_context_size"]
+
+    # Token-based truncation to ensure input fits within limits
+    tokens = tokenizer.encode(joined_descriptions)
+    if len(tokens) > summary_context_size:
+        truncated_tokens = tokens[:summary_context_size]
+        joined_descriptions = tokenizer.decode(truncated_tokens)
+
     # Prepare context for the prompt
     context_base = dict(
         description_type=description_type,
         description_name=description_name,
-        description_list="\n\n".join(description_list),
+        description_list=joined_descriptions,
         summary_length=summary_length_recommended,
         language=language,
     )
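
The fragment bookkeeping that patch 09 threads through the merge functions is easiest to check in isolation. The following is a minimal sketch, not the library's API: merge_descriptions is a hypothetical helper, and a plain "<SEP>" string stands in for lightrag.constants.GRAPH_FIELD_SEP.

# Minimal sketch of the dedup-and-count logic from patch 09.
GRAPH_FIELD_SEP = "<SEP>"  # stand-in for the real separator constant


def merge_descriptions(already: list[str], incoming: list[str]) -> tuple[str, str]:
    """Merge old and new fragments, deduplicating while preserving first-seen
    order, and build the `old+new(dd:n)` suffix used in the status messages."""
    merged = list(dict.fromkeys(already + [d for d in incoming if d]))
    num_fragment = len(merged)
    already_fragment = len(already)
    deduplicated_num = already_fragment + len(incoming) - num_fragment
    dd_message = f"(dd:{deduplicated_num})" if deduplicated_num > 0 else ""
    suffix = f"{already_fragment}+{num_fragment - already_fragment}{dd_message}"
    return GRAPH_FIELD_SEP.join(merged), suffix


# Two existing fragments, two incoming, one of which is a duplicate.
already = ["A city in Norway.", "Known for its fjords."]
incoming = ["Known for its fjords.", "Hosts a large research university."]
description, suffix = merge_descriptions(already, incoming)
print(f"Merged: `Bergen` | {suffix}")  # -> Merged: `Bergen` | 2+1(dd:1)

Because dict.fromkeys preserves insertion order, the merged description is stable across runs, which is the point of the ordering changes earlier in the series.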
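The termination argument behind patch 16 can also be checked with a toy model of the map-reduce loop. In this sketch a whitespace word count stands in for the configured Tokenizer, and a stubbed reduce step replaces the LLM call; the function and variable names are illustrative. With three or more descriptions, the force-merge rule means every pass produces strictly fewer items, so the loop always converges.

def token_count(text: str) -> int:
    return len(text.split())  # toy stand-in for tokenizer.encode()


def reduce_stub(descs: list[str]) -> str:
    return " ".join(descs)[:48]  # stand-in for the LLM summarization call


def map_reduce_summary(current: list[str], context_size: int) -> str:
    while True:
        total = sum(token_count(d) for d in current)
        # Termination guards: small enough, or too few items to split again.
        if total <= context_size or len(current) <= 2:
            return reduce_stub(current)
        # Map phase: group descriptions; a group closed by the size check is
        # padded to at least two items, so len(chunks) < len(current).
        chunks: list[list[str]] = []
        chunk: list[str] = []
        tokens = 0
        for desc in current:
            t = token_count(desc)
            if tokens + t > context_size and chunk:
                if len(chunk) == 1:  # oversized neighbour: force-merge the pair
                    chunk.append(desc)
                    chunks.append(chunk)
                    chunk, tokens = [], 0
                else:
                    chunks.append(chunk)
                    chunk, tokens = [desc], t
            else:
                chunk.append(desc)
                tokens += t
        if chunk:
            chunks.append(chunk)
        # Reduce phase: one summary per group, then iterate on the summaries.
        current = [reduce_stub(c) for c in chunks]


# Nine 80-word fragments against a 100-token window: converges in one pass.
print(map_reduce_summary([f"fragment {i} " * 40 for i in range(9)], context_size=100))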
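Finally, the overflow guard that patch 16 adds to _summarize_descriptions is an encode-slice-decode pattern. A hedged sketch follows, using the tiktoken library purely as an assumed stand-in; the real code uses the Tokenizer object read from global_config, not tiktoken.

import tiktoken  # assumed stand-in for the configured Tokenizer


def truncate_to_tokens(text: str, limit: int) -> str:
    """Hard-truncate text to at most `limit` tokens so the joined descriptions
    never overflow the summary context window."""
    enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)
    return text if len(tokens) <= limit else enc.decode(tokens[:limit])


joined = "\n\n".join(["first fragment", "second fragment", "third fragment"])
prompt_input = truncate_to_tokens(joined, limit=12000)  # SUMMARY_CONTEXT_SIZE default

Truncating by tokens rather than characters matches the budget the LLM actually sees, at the cost of possibly cutting the last fragment mid-sentence; the map phase makes that case rare, since inputs are already grouped to fit the window.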