diff --git a/env.example b/env.example
index 6d53c390..3529cf58 100644
--- a/env.example
+++ b/env.example
@@ -73,11 +73,14 @@ ENABLE_LLM_CACHE=true
 # MAX_RELATION_TOKENS=8000
 ### control the maximum tokens send to LLM (include entities, relations and chunks)
 # MAX_TOTAL_TOKENS=30000
+
 ### control the maximum chunk_ids stored in vector and graph db
 # MAX_SOURCE_IDS_PER_ENTITY=300
 # MAX_SOURCE_IDS_PER_RELATION=300
 ### control chunk_ids limitation method: KEEP, FIFO (KEPP: Ingore New Chunks, FIFO: New chunks replace old chunks)
 # SOURCE_IDS_LIMIT_METHOD=KEEP
+### Maximum number of file paths stored in entity/relation file_path field
+# MAX_FILE_PATHS=30

 ### maximum number of related chunks per source entity or relation
 ### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph)
diff --git a/lightrag/constants.py b/lightrag/constants.py
index e374a991..62ca1888 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -14,16 +14,6 @@ DEFAULT_MAX_GRAPH_NODES = 1000
 DEFAULT_SUMMARY_LANGUAGE = "English"
 # Default language for document processing
 DEFAULT_MAX_GLEANING = 1
-DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3
-DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3
-SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"
-SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"
-DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_KEEP
-VALID_SOURCE_IDS_LIMIT_METHODS = {
-    SOURCE_IDS_LIMIT_METHOD_KEEP,
-    SOURCE_IDS_LIMIT_METHOD_FIFO,
-}
-
 # Number of description fragments to trigger LLM summary
 DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
 # Max description token size to trigger LLM summary
@@ -67,8 +57,24 @@ DEFAULT_HISTORY_TURNS = 0
 DEFAULT_MIN_RERANK_SCORE = 0.0
 DEFAULT_RERANK_BINDING = "null"

-# File path configuration for vector and graph database(Should not be changed, used in Milvus Schema)
+# Default source_id limits in metadata for entities and relations
+DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3
+DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3
+SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"
+SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"
+DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_KEEP
+VALID_SOURCE_IDS_LIMIT_METHODS = {
+    SOURCE_IDS_LIMIT_METHOD_KEEP,
+    SOURCE_IDS_LIMIT_METHOD_FIFO,
+}
+# Default file_path limit in metadata for entities and relations
+DEFAULT_MAX_FILE_PATHS = 2
+
+# Field length of file_path in Milvus Schema for entity and relation (Should not be changed)
+# file_path must be able to hold all file paths up to the DEFAULT_MAX_FILE_PATHS limit plus the truncation placeholder
 DEFAULT_MAX_FILE_PATH_LENGTH = 32768
+# Placeholder marking truncated file paths in metadata for entities and relations (Should not be changed)
+DEFAULT_FILE_PATH_MORE_PLACEHOLDER = "truncated"

 # Default temperature for LLM
 DEFAULT_TEMPERATURE = 1.0
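To make the new knobs concrete, the capping rule these constants describe can be sketched as a small standalone helper. This is an illustrative sketch only (`cap_file_paths` and its signature are not part of LightRAG); it mirrors the KEEP/FIFO behaviour implemented in lightrag/operate.py below, where KEEP keeps the earliest paths once the cap is reached and FIFO keeps the newest.

```python
# Illustrative sketch only, not LightRAG code. It mirrors the capping logic added
# in lightrag/operate.py: KEEP keeps the earliest paths, FIFO keeps the newest,
# and a placeholder entry records how many paths were retained.
def cap_file_paths(paths, max_file_paths=2, method="KEEP", placeholder="truncated"):
    deduped = list(dict.fromkeys(p for p in paths if p))  # dedupe, keep order
    if len(deduped) <= max_file_paths:
        return deduped
    original_count = len(deduped)
    if method == "FIFO":
        kept = deduped[-max_file_paths:]  # keep tail (newest), discard head
    else:
        kept = deduped[:max_file_paths]   # KEEP: keep head (earliest), discard tail
    kept.append(f"...{placeholder}(showing {max_file_paths} of {original_count})...")
    return kept


print(cap_file_paths(["a.md", "b.md", "c.md"], max_file_paths=2, method="KEEP"))
# ['a.md', 'b.md', '...truncated(showing 2 of 3)...']
print(cap_file_paths(["a.md", "b.md", "c.md"], max_file_paths=2, method="FIFO"))
# ['b.md', 'c.md', '...truncated(showing 2 of 3)...']
```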
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 1f32da50..4380a276 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -47,6 +47,8 @@ from lightrag.constants import (
     DEFAULT_LLM_TIMEOUT,
     DEFAULT_EMBEDDING_TIMEOUT,
     DEFAULT_SOURCE_IDS_LIMIT_METHOD,
+    DEFAULT_MAX_FILE_PATHS,
+    DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
 )
 from lightrag.utils import get_env_value

@@ -393,6 +395,14 @@ class LightRAG:
     )
     """Strategy for enforcing source_id limits: IGNORE_NEW or FIFO."""

+    max_file_paths: int = field(
+        default=get_env_value("MAX_FILE_PATHS", DEFAULT_MAX_FILE_PATHS, int)
+    )
+    """Maximum number of file paths to store in the entity/relation file_path field."""
+
+    file_path_more_placeholder: str = field(default=DEFAULT_FILE_PATH_MORE_PLACEHOLDER)
+    """Placeholder appended when the number of file paths exceeds the max_file_paths limit."""
+
     addon_params: dict[str, Any] = field(
         default_factory=lambda: {
             "language": get_env_value(
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 2f7f6340..6b409f21 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -26,7 +26,6 @@ from lightrag.utils import (
     pick_by_weighted_polling,
     pick_by_vector_similarity,
     process_chunks_unified,
-    build_file_path,
     safe_vdb_operation_with_exception,
     create_prefixed_exception,
     fix_tuple_delimiter_corruption,
@@ -56,6 +55,8 @@ from lightrag.constants import (
     DEFAULT_ENTITY_TYPES,
     DEFAULT_SUMMARY_LANGUAGE,
     SOURCE_IDS_LIMIT_METHOD_KEEP,
+    SOURCE_IDS_LIMIT_METHOD_FIFO,
+    DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
 )
 from lightrag.kg.shared_storage import get_storage_keyed_lock
 import time
@@ -1156,7 +1157,8 @@ async def _rebuild_single_entity(
     # Process cached entity data
     descriptions = []
     entity_types = []
-    file_paths = set()
+    file_paths_list = []
+    seen_paths = set()

     for entity_data in all_entity_data:
         if entity_data.get("description"):
@@ -1164,7 +1166,35 @@
         if entity_data.get("entity_type"):
             entity_types.append(entity_data["entity_type"])
         if entity_data.get("file_path"):
-            file_paths.add(entity_data["file_path"])
+            file_path = entity_data["file_path"]
+            if file_path and file_path not in seen_paths:
+                file_paths_list.append(file_path)
+                seen_paths.add(file_path)
+
+    # Apply MAX_FILE_PATHS limit
+    max_file_paths = global_config.get("max_file_paths")
+    file_path_placeholder = global_config.get(
+        "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+    )
+    limit_method = global_config.get("source_ids_limit_method")
+
+    original_count = len(file_paths_list)
+    if original_count > max_file_paths:
+        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+            # FIFO: keep tail (newest), discard head
+            file_paths_list = file_paths_list[-max_file_paths:]
+        else:
+            # KEEP: keep head (earliest), discard tail
+            file_paths_list = file_paths_list[:max_file_paths]
+
+        file_paths_list.append(
+            f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
+        )
+        logger.info(
+            f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+        )
+
+    file_paths = set(file_paths_list)

     # Remove duplicates while preserving order
     description_list = list(dict.fromkeys(descriptions))
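After capping, the list ends up joined with GRAPH_FIELD_SEP into the single file_path string persisted for the entity or relation. A rough illustration of the stored value, with invented paths and assuming the default "<SEP>" separator:

```python
# Invented example values; "<SEP>" is assumed to be the GRAPH_FIELD_SEP default.
capped = ["docs/a.md", "docs/b.md", "...truncated(showing 2 of 5)..."]
file_path = "<SEP>".join(capped)
print(file_path)
# docs/a.md<SEP>docs/b.md<SEP>...truncated(showing 2 of 5)...
```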
@@ -1284,7 +1314,8 @@ async def _rebuild_single_relationship(
     descriptions = []
     keywords = []
     weights = []
-    file_paths = set()
+    file_paths_list = []
+    seen_paths = set()

     for rel_data in all_relationship_data:
         if rel_data.get("description"):
@@ -1294,7 +1325,35 @@
         if rel_data.get("keywords"):
             keywords.append(rel_data["keywords"])
         if rel_data.get("weight"):
             weights.append(rel_data["weight"])
         if rel_data.get("file_path"):
-            file_paths.add(rel_data["file_path"])
+            file_path = rel_data["file_path"]
+            if file_path and file_path not in seen_paths:
+                file_paths_list.append(file_path)
+                seen_paths.add(file_path)
+
+    # Apply count limit
+    max_file_paths = global_config.get("max_file_paths")
+    file_path_placeholder = global_config.get(
+        "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+    )
+    limit_method = global_config.get("source_ids_limit_method")
+
+    original_count = len(file_paths_list)
+    if original_count > max_file_paths:
+        if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+            # FIFO: keep tail (newest), discard head
+            file_paths_list = file_paths_list[-max_file_paths:]
+        else:
+            # KEEP: keep head (earliest), discard tail
+            file_paths_list = file_paths_list[:max_file_paths]
+
+        file_paths_list.append(
+            f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
+        )
+        logger.info(
+            f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+        )
+
+    file_paths = set(file_paths_list)

     # Remove duplicates while preserving order
     description_list = list(dict.fromkeys(descriptions))
@@ -1467,23 +1526,22 @@ async def _merge_nodes_then_upsert(
         }
     )

-    limit_method = (
-        global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
-    )
+    limit_method = global_config.get("source_ids_limit_method")
+    max_source_limit = global_config.get("max_source_ids_per_entity")
     source_ids = apply_source_ids_limit(
         full_source_ids,
-        global_config["max_source_ids_per_entity"],
+        max_source_limit,
         limit_method,
         identifier=f"`{entity_name}`",
     )

-    # Only apply filtering in IGNORE_NEW mode
+    # Only apply filtering in KEEP (ignore-new) mode
     if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
         allowed_source_ids = set(source_ids)
         filtered_nodes = []
         for dp in nodes_data:
             source_id = dp.get("source_id")
-            # Skip descriptions sourced from chunks dropped by the IGNORE_NEW cap
+            # Skip descriptions sourced from chunks dropped by the limitation cap
             if (
                 source_id
                 and source_id not in allowed_source_ids
@@ -1496,7 +1554,6 @@
     # In FIFO mode, keep all node descriptions - truncation happens at source_ids level only
     nodes_data = list(nodes_data)

-    max_source_limit = global_config["max_source_ids_per_entity"]
     skip_summary_due_to_limit = (
         limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
         and len(existing_full_source_ids) >= max_source_limit
@@ -1566,7 +1623,7 @@
     truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}"

     if dd_message or truncation_info:
-        status_message += f"({','.join([truncation_info, dd_message])})"
+        status_message += f" ({', '.join([truncation_info, dd_message])})"

     if already_fragment > 0 or llm_was_used:
         logger.info(status_message)
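The same KEEP/FIFO setting also drives the chunk-level source_ids cap above. apply_source_ids_limit is an existing helper that is not shown in this diff; the stand-in below only illustrates the behaviour the surrounding code and comments rely on (KEEP ignores chunks beyond the cap, FIFO lets the newest chunks replace the oldest) and may differ from the real implementation in detail. In KEEP mode, an entity that is already at its cap also sets skip_summary_due_to_limit, so the merge keeps its existing description and file_path untouched.

```python
# Hypothetical stand-in for the existing apply_source_ids_limit helper,
# shown only to illustrate the expected KEEP/FIFO semantics.
def apply_source_ids_limit_sketch(source_ids, max_ids, method="KEEP", identifier=""):
    ids = list(source_ids)
    if len(ids) <= max_ids:
        return ids
    if method == "FIFO":
        return ids[-max_ids:]  # newest chunks replace the oldest
    return ids[:max_ids]       # KEEP: chunks beyond the cap are ignored


existing_plus_new = ["chunk-1", "chunk-2", "chunk-3", "chunk-4"]
print(apply_source_ids_limit_sketch(existing_plus_new, 3, "KEEP"))  # ['chunk-1', 'chunk-2', 'chunk-3']
print(apply_source_ids_limit_sketch(existing_plus_new, 3, "FIFO"))  # ['chunk-2', 'chunk-3', 'chunk-4']
```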
@@ -1583,7 +1640,65 @@

     source_id = GRAPH_FIELD_SEP.join(source_ids)

-    file_path = build_file_path(already_file_paths, nodes_data, entity_name)
+    # Build file_path with count limit
+    if skip_summary_due_to_limit:
+        # Summary skipped due to the source_ids limit: keep the existing file_path unchanged
+        file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
+    else:
+        # Collect file paths and apply the count limit
+        file_paths_list = []
+        seen_paths = set()
+
+        # Get placeholder to filter it out
+        file_path_placeholder = global_config.get(
+            "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+        )
+
+        # Collect from already_file_paths, excluding placeholder
+        for fp in already_file_paths:
+            # Skip placeholders (format: "...{placeholder}(showing X of Y)...")
+            if (
+                fp
+                and not fp.startswith(f"...{file_path_placeholder}")
+                and fp not in seen_paths
+            ):
+                file_paths_list.append(fp)
+                seen_paths.add(fp)
+
+        # Collect from new data
+        for dp in nodes_data:
+            file_path_item = dp.get("file_path")
+            if file_path_item and file_path_item not in seen_paths:
+                file_paths_list.append(file_path_item)
+                seen_paths.add(file_path_item)
+
+        # Apply count limit
+        max_file_paths = global_config.get("max_file_paths")
+
+        if len(file_paths_list) > max_file_paths:
+            limit_method = global_config.get(
+                "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+            )
+            file_path_placeholder = global_config.get(
+                "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+            )
+            original_count = len(file_paths_list)
+
+            if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+                # FIFO: keep tail (newest), discard head
+                file_paths_list = file_paths_list[-max_file_paths:]
+            else:
+                # KEEP: keep head (earliest), discard tail
+                file_paths_list = file_paths_list[:max_file_paths]
+
+            file_paths_list.append(
+                f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
+            )
+            logger.info(
+                f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+            )
+
+        file_path = GRAPH_FIELD_SEP.join(file_paths_list)

     node_data = dict(
         entity_id=entity_name,
@@ -1686,10 +1801,12 @@ async def _merge_edges_then_upsert(
         }
     )

+    limit_method = global_config.get("source_ids_limit_method")
+    max_source_limit = global_config.get("max_source_ids_per_relation")
     source_ids = apply_source_ids_limit(
         full_source_ids,
-        global_config["max_source_ids_per_relation"],
-        global_config.get("source_ids_limit_method"),
+        max_source_limit,
+        limit_method,
         identifier=f"`{src_id}`~`{tgt_id}`",
     )
     limit_method = (
@@ -1715,7 +1832,6 @@
     # In FIFO mode, keep all edge descriptions - truncation happens at source_ids level only
     edges_data = list(edges_data)

-    max_source_limit = global_config["max_source_ids_per_relation"]
     skip_summary_due_to_limit = (
         limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
         and len(existing_full_source_ids) >= max_source_limit
@@ -1791,7 +1907,7 @@
     truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}"

     if dd_message or truncation_info:
-        status_message += f"({','.join([truncation_info, dd_message])})"
+        status_message += f" ({', '.join([truncation_info, dd_message])})"

     if already_fragment > 0 or llm_was_used:
         logger.info(status_message)
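Because the placeholder entry is stored alongside real paths, both merge paths (entities above, relations below) strip any previously appended placeholder before recounting, so the marker never counts against the limit and never gets duplicated. A minimal illustration with invented values:

```python
# Invented values; mirrors the startswith() filter used in the merge functions.
placeholder = "truncated"
already_file_paths = ["docs/a.md", "docs/b.md", "...truncated(showing 2 of 5)..."]

real_paths = [
    fp for fp in already_file_paths
    if fp and not fp.startswith(f"...{placeholder}")
]
print(real_paths)  # ['docs/a.md', 'docs/b.md'] - the old marker is dropped before re-capping
```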
@@ -1822,7 +1938,66 @@
     keywords = ",".join(sorted(all_keywords))

     source_id = GRAPH_FIELD_SEP.join(source_ids)
-    file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}")
+
+    # Build file_path with count limit
+    if skip_summary_due_to_limit:
+        # Summary skipped due to the source_ids limit: keep the existing file_path unchanged
+        file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
+    else:
+        # Collect file paths and apply the count limit
+        file_paths_list = []
+        seen_paths = set()
+
+        # Get placeholder to filter it out
+        file_path_placeholder = global_config.get(
+            "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+        )
+
+        # Collect from already_file_paths, excluding placeholder
+        for fp in already_file_paths:
+            # Skip placeholders (format: "...{placeholder}(showing X of Y)...")
+            if (
+                fp
+                and not fp.startswith(f"...{file_path_placeholder}")
+                and fp not in seen_paths
+            ):
+                file_paths_list.append(fp)
+                seen_paths.add(fp)
+
+        # Collect from new data
+        for dp in edges_data:
+            file_path_item = dp.get("file_path")
+            if file_path_item and file_path_item not in seen_paths:
+                file_paths_list.append(file_path_item)
+                seen_paths.add(file_path_item)
+
+        # Apply count limit
+        max_file_paths = global_config.get("max_file_paths")
+
+        if len(file_paths_list) > max_file_paths:
+            limit_method = global_config.get(
+                "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+            )
+            file_path_placeholder = global_config.get(
+                "file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+            )
+            original_count = len(file_paths_list)
+
+            if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+                # FIFO: keep tail (newest), discard head
+                file_paths_list = file_paths_list[-max_file_paths:]
+            else:
+                # KEEP: keep head (earliest), discard tail
+                file_paths_list = file_paths_list[:max_file_paths]
+
+            file_paths_list.append(
+                f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
+            )
+            logger.info(
+                f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+            )
+
+        file_path = GRAPH_FIELD_SEP.join(file_paths_list)

     for need_insert_id in [src_id, tgt_id]:
         if not (await knowledge_graph_inst.has_node(need_insert_id)):
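With the count-based cap in place, the byte-length guard in utils.build_file_path becomes redundant and is removed below. The 32768-byte Milvus field still bounds the stored string, so MAX_FILE_PATHS should be chosen so the worst case fits. A rough sizing check with assumed numbers (path length and placeholder allowance are guesses, not limits taken from the code):

```python
# Rough sizing check; the byte figures below are assumptions, adjust for your data.
MAX_FILE_PATHS = 30            # value suggested in env.example
ASSUMED_MAX_PATH_BYTES = 512   # assumed longest file path
SEP_BYTES = len("<SEP>")       # assumed GRAPH_FIELD_SEP
PLACEHOLDER_BYTES = 64         # generous allowance for the "...truncated(showing X of Y)..." entry

worst_case = MAX_FILE_PATHS * (ASSUMED_MAX_PATH_BYTES + SEP_BYTES) + PLACEHOLDER_BYTES
print(worst_case, worst_case < 32768)  # 15574 True, fits the Milvus field
```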
diff --git a/lightrag/utils.py b/lightrag/utils.py
index 6805227e..bfa3cac4 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -35,7 +35,6 @@ from lightrag.constants import (
     DEFAULT_LOG_FILENAME,
     GRAPH_FIELD_SEP,
     DEFAULT_MAX_TOTAL_TOKENS,
-    DEFAULT_MAX_FILE_PATH_LENGTH,
     DEFAULT_SOURCE_IDS_LIMIT_METHOD,
     VALID_SOURCE_IDS_LIMIT_METHODS,
     SOURCE_IDS_LIMIT_METHOD_FIFO,
@@ -2584,65 +2583,6 @@ def parse_relation_chunk_key(key: str) -> tuple[str, str]:
     return parts[0], parts[1]


-def build_file_path(already_file_paths, data_list, target):
-    """Build file path string with UTF-8 byte length limit and deduplication
-
-    Args:
-        already_file_paths: List of existing file paths
-        data_list: List of data items containing file_path
-        target: Target name for logging warnings
-
-    Returns:
-        str: Combined file paths separated by GRAPH_FIELD_SEP
-    """
-    # set: deduplication
-    file_paths_set = {fp for fp in already_file_paths if fp}
-
-    # string: filter empty value and keep file order in already_file_paths
-    file_paths = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
-
-    # Check if initial file_paths already exceeds byte length limit
-    if len(file_paths.encode("utf-8")) >= DEFAULT_MAX_FILE_PATH_LENGTH:
-        logger.warning(
-            f"Initial file_paths already exceeds {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, "
-            f"current size: {len(file_paths.encode('utf-8'))} bytes"
-        )
-
-    # ignored file_paths
-    file_paths_ignore = ""
-    # add file_paths
-    for dp in data_list:
-        cur_file_path = dp.get("file_path")
-        # empty
-        if not cur_file_path:
-            continue
-
-        # skip duplicate item
-        if cur_file_path in file_paths_set:
-            continue
-        # add
-        file_paths_set.add(cur_file_path)
-
-        # check the UTF-8 byte length
-        new_addition = GRAPH_FIELD_SEP + cur_file_path if file_paths else cur_file_path
-        if (
-            len(file_paths.encode("utf-8")) + len(new_addition.encode("utf-8"))
-            < DEFAULT_MAX_FILE_PATH_LENGTH - 5
-        ):
-            # append
-            file_paths += new_addition
-        else:
-            # ignore
-            file_paths_ignore += GRAPH_FIELD_SEP + cur_file_path
-
-    if file_paths_ignore:
-        logger.warning(
-            f"File paths exceed {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, "
-            f"ignoring file path: {file_paths_ignore}"
-        )
-    return file_paths
-
-
 def generate_track_id(prefix: str = "upload") -> str:
     """Generate a unique tracking ID with timestamp and UUID
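The new cap is configured the same way as the existing source_ids settings: through the environment (MAX_FILE_PATHS together with SOURCE_IDS_LIMIT_METHOD, as documented in env.example) or by setting the LightRAG dataclass fields added above (max_file_paths, file_path_more_placeholder). A minimal sketch of the environment route; the printed values assume the variables are read at call time:

```python
import os

from lightrag.utils import get_env_value
from lightrag.constants import DEFAULT_MAX_FILE_PATHS, DEFAULT_SOURCE_IDS_LIMIT_METHOD

# Values documented in env.example; set here only for illustration.
os.environ["MAX_FILE_PATHS"] = "30"
os.environ["SOURCE_IDS_LIMIT_METHOD"] = "FIFO"  # or "KEEP" (the default)

max_file_paths = get_env_value("MAX_FILE_PATHS", DEFAULT_MAX_FILE_PATHS, int)
limit_method = get_env_value("SOURCE_IDS_LIMIT_METHOD", DEFAULT_SOURCE_IDS_LIMIT_METHOD, str)
print(max_file_paths, limit_method)  # expected: 30 FIFO
```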