Add file path limit configuration for entities and relations

• Add MAX_FILE_PATHS env variable
• Implement file path count limiting
• Support KEEP/FIFO strategies
• Add truncation placeholder
• Remove old build_file_path function
yangdx 2025-10-20 20:12:53 +08:00
parent dc62c78f98
commit a9fec26798
5 changed files with 224 additions and 90 deletions
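For orientation before the per-file hunks: below is a minimal, self-contained sketch of the dedup-and-cap behavior this commit inlines into operate.py. The standalone helper name `limit_file_paths` is illustrative only; the commit keeps the logic inline, and the marker format follows the f-string visible in the hunks further down.

```python
# Illustrative sketch, not the commit's code. Defaults mirror the env example (MAX_FILE_PATHS=30).
def limit_file_paths(paths, max_file_paths=30, method="KEEP", placeholder="truncated"):
    """Deduplicate file paths (order preserved) and cap how many are kept."""
    seen, deduped = set(), []
    for p in paths:
        if p and p not in seen:
            deduped.append(p)
            seen.add(p)
    original = len(deduped)
    if original <= max_file_paths:
        return deduped
    if method == "FIFO":
        kept = deduped[-max_file_paths:]  # FIFO: keep newest entries, drop oldest
    else:
        kept = deduped[:max_file_paths]   # KEEP: keep earliest entries, ignore newer ones
    # Append a truncation marker so readers can see that paths were dropped.
    kept.append(f"...{placeholder}(showing {max_file_paths} of {original})...")
    return kept

# Example: four inputs, one duplicate, cap of 2.
print(limit_file_paths(["a.pdf", "b.pdf", "a.pdf", "c.pdf"], max_file_paths=2))
# ['a.pdf', 'b.pdf', '...truncated(showing 2 of 3)...']
```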

View file

@@ -73,11 +73,14 @@ ENABLE_LLM_CACHE=true
# MAX_RELATION_TOKENS=8000
### control the maximum tokens send to LLM (include entities, relations and chunks)
# MAX_TOTAL_TOKENS=30000
### control the maximum chunk_ids stored in vector and graph db
# MAX_SOURCE_IDS_PER_ENTITY=300
# MAX_SOURCE_IDS_PER_RELATION=300
### control chunk_ids limitation method: KEEP, FIFO (KEPP: Ingore New Chunks, FIFO: New chunks replace old chunks)
# SOURCE_IDS_LIMIT_METHOD=KEEP
+### Maximum number of file paths stored in entity/relation file_path field
+# MAX_FILE_PATHS=30
### maximum number of related chunks per source entity or relation
### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph)
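These are ordinary environment variables; a plain standard-library reading of them would look like the sketch below. LightRAG itself goes through `get_env_value`, as the lightrag.py hunk further down shows, so this is only an approximation of the parsing.

```python
import os

# Standard-library approximation; get_env_value in LightRAG presumably applies
# the same int conversion and default fallback.
max_file_paths = int(os.getenv("MAX_FILE_PATHS", "30"))
limit_method = os.getenv("SOURCE_IDS_LIMIT_METHOD", "KEEP")
print(max_file_paths, limit_method)  # -> 30 KEEP (when neither variable is set)
```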

View file

@@ -14,16 +14,6 @@ DEFAULT_MAX_GRAPH_NODES = 1000
DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for document processing
DEFAULT_MAX_GLEANING = 1
-DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3
-DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3
-SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"
-SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"
-DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_KEEP
-VALID_SOURCE_IDS_LIMIT_METHODS = {
-SOURCE_IDS_LIMIT_METHOD_KEEP,
-SOURCE_IDS_LIMIT_METHOD_FIFO,
-}
# Number of description fragments to trigger LLM summary
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
# Max description token size to trigger LLM summary
@@ -67,8 +57,24 @@ DEFAULT_HISTORY_TURNS = 0
DEFAULT_MIN_RERANK_SCORE = 0.0
DEFAULT_RERANK_BINDING = "null"
-# File path configuration for vector and graph database(Should not be changed, used in Milvus Schema)
+# Default source ids limit in meta data for entity and relation
+DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3
+DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3
+SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"
+SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"
+DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_KEEP
+VALID_SOURCE_IDS_LIMIT_METHODS = {
+SOURCE_IDS_LIMIT_METHOD_KEEP,
+SOURCE_IDS_LIMIT_METHOD_FIFO,
+}
+# Default file_path limit in meta data for entity and relation
+DEFAULT_MAX_FILE_PATHS = 2
+# Field length of file_path in Milvus Schema for entity and relation (Should not be changed)
+# file_path must store all file paths up to the DEFAULT_MAX_FILE_PATHS limit within the metadata.
DEFAULT_MAX_FILE_PATH_LENGTH = 32768
+# Placeholder for more file paths in meta data for entity and relation (Should not be changed)
+DEFAULT_FILE_PATH_MORE_PLACEHOLDER = "truncated"
# Default temperature for LLM
DEFAULT_TEMPERATURE = 1.0
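Putting the new constants together: the placeholder feeds the truncation marker appended in the operate.py hunks below, so a capped field would end with an entry of roughly this form (the counts here are example values only):

```python
DEFAULT_FILE_PATH_MORE_PLACEHOLDER = "truncated"

max_file_paths, original_count = 30, 45  # example counts only
marker = f"...{DEFAULT_FILE_PATH_MORE_PLACEHOLDER}(showing {max_file_paths} of {original_count})..."
print(marker)  # ...truncated(showing 30 of 45)...
```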

View file

@@ -47,6 +47,8 @@ from lightrag.constants import (
DEFAULT_LLM_TIMEOUT,
DEFAULT_EMBEDDING_TIMEOUT,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
+DEFAULT_MAX_FILE_PATHS,
+DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
)
from lightrag.utils import get_env_value
@@ -393,6 +395,14 @@ class LightRAG:
)
"""Strategy for enforcing source_id limits: IGNORE_NEW or FIFO."""
+max_file_paths: int = field(
+default=get_env_value("MAX_FILE_PATHS", DEFAULT_MAX_FILE_PATHS, int)
+)
+"""Maximum number of file paths to store in entity/relation file_path field."""
+file_path_more_placeholder: str = field(default=DEFAULT_FILE_PATH_MORE_PLACEHOLDER)
+"""Placeholder text when file paths exceed max_file_paths limit."""
addon_params: dict[str, Any] = field(
default_factory=lambda: {
"language": get_env_value(

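Note that `max_file_paths` takes its default from `get_env_value(...)` when the dataclass field is defined, i.e. at import time, so an environment override should be in place before `lightrag` is imported; passing `max_file_paths=` directly to the constructor is the other route. A minimal sketch (assumes the package is installed; all other constructor arguments are omitted):

```python
import os

# Set the override before importing lightrag so the field default picks it up.
os.environ["MAX_FILE_PATHS"] = "50"

import lightrag  # noqa: E402 -- deliberately imported after the env override

# Alternative: pass the value explicitly when building the instance, e.g.
# LightRAG(..., max_file_paths=50)  (remaining arguments omitted in this sketch)
```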
View file

@@ -26,7 +26,6 @@ from lightrag.utils import (
pick_by_weighted_polling,
pick_by_vector_similarity,
process_chunks_unified,
-build_file_path,
safe_vdb_operation_with_exception,
create_prefixed_exception,
fix_tuple_delimiter_corruption,
@@ -56,6 +55,8 @@ from lightrag.constants import (
DEFAULT_ENTITY_TYPES,
DEFAULT_SUMMARY_LANGUAGE,
SOURCE_IDS_LIMIT_METHOD_KEEP,
+SOURCE_IDS_LIMIT_METHOD_FIFO,
+DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
)
from lightrag.kg.shared_storage import get_storage_keyed_lock
import time
@@ -1156,7 +1157,8 @@ async def _rebuild_single_entity(
# Process cached entity data
descriptions = []
entity_types = []
-file_paths = set()
+file_paths_list = []
+seen_paths = set()
for entity_data in all_entity_data:
if entity_data.get("description"):
@@ -1164,7 +1166,35 @@
if entity_data.get("entity_type"):
entity_types.append(entity_data["entity_type"])
if entity_data.get("file_path"):
-file_paths.add(entity_data["file_path"])
+file_path = entity_data["file_path"]
+if file_path and file_path not in seen_paths:
+file_paths_list.append(file_path)
+seen_paths.add(file_path)
+# Apply MAX_FILE_PATHS limit
+max_file_paths = global_config.get("max_file_paths")
+file_path_placeholder = global_config.get(
+"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+)
+limit_method = global_config.get("source_ids_limit_method")
+original_count = len(file_paths_list)
+if original_count > max_file_paths:
+if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+# FIFO: keep tail (newest), discard head
+file_paths_list = file_paths_list[-max_file_paths:]
+else:
+# KEEP: keep head (earliest), discard tail
+file_paths_list = file_paths_list[:max_file_paths]
+file_paths_list.append(
+f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
+)
+logger.info(
+f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+)
+file_paths = set(file_paths_list)
# Remove duplicates while preserving order
description_list = list(dict.fromkeys(descriptions))
@@ -1284,7 +1314,8 @@ async def _rebuild_single_relationship(
descriptions = []
keywords = []
weights = []
-file_paths = set()
+file_paths_list = []
+seen_paths = set()
for rel_data in all_relationship_data:
if rel_data.get("description"):
@@ -1294,7 +1325,35 @@
if rel_data.get("weight"):
weights.append(rel_data["weight"])
if rel_data.get("file_path"):
-file_paths.add(rel_data["file_path"])
+file_path = rel_data["file_path"]
+if file_path and file_path not in seen_paths:
+file_paths_list.append(file_path)
+seen_paths.add(file_path)
+# Apply count limit
+max_file_paths = global_config.get("max_file_paths")
+file_path_placeholder = global_config.get(
+"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+)
+limit_method = global_config.get("source_ids_limit_method")
+original_count = len(file_paths_list)
+if original_count > max_file_paths:
+if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+# FIFO: keep tail (newest), discard head
+file_paths_list = file_paths_list[-max_file_paths:]
+else:
+# KEEP: keep head (earliest), discard tail
+file_paths_list = file_paths_list[:max_file_paths]
+file_paths_list.append(
+f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
+)
+logger.info(
+f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+)
+file_paths = set(file_paths_list)
# Remove duplicates while preserving order
description_list = list(dict.fromkeys(descriptions))
@@ -1467,23 +1526,22 @@ async def _merge_nodes_then_upsert(
}
)
-limit_method = (
-global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
-)
+limit_method = global_config.get("source_ids_limit_method")
+max_source_limit = global_config.get("max_source_ids_per_entity")
source_ids = apply_source_ids_limit(
full_source_ids,
-global_config["max_source_ids_per_entity"],
+max_source_limit,
limit_method,
identifier=f"`{entity_name}`",
)
-# Only apply filtering in IGNORE_NEW mode
+# Only apply filtering in KEEP(ignore new) mode
if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
allowed_source_ids = set(source_ids)
filtered_nodes = []
for dp in nodes_data:
source_id = dp.get("source_id")
-# Skip descriptions sourced from chunks dropped by the IGNORE_NEW cap
+# Skip descriptions sourced from chunks dropped by the limitation cap
if (
source_id
and source_id not in allowed_source_ids
@@ -1496,7 +1554,6 @@
# In FIFO mode, keep all node descriptions - truncation happens at source_ids level only
nodes_data = list(nodes_data)
-max_source_limit = global_config["max_source_ids_per_entity"]
skip_summary_due_to_limit = (
limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
and len(existing_full_source_ids) >= max_source_limit
@@ -1566,7 +1623,7 @@
truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}"
if dd_message or truncation_info:
-status_message += f"({','.join([truncation_info, dd_message])})"
+status_message += f" ({', '.join([truncation_info, dd_message])})"
if already_fragment > 0 or llm_was_used:
logger.info(status_message)
@@ -1583,7 +1640,65 @@
source_id = GRAPH_FIELD_SEP.join(source_ids)
-file_path = build_file_path(already_file_paths, nodes_data, entity_name)
+# Build file_path with count limit
+if skip_summary_due_to_limit:
+# Skip limit, keep original file_path
+file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
+else:
+# Collect and apply limit
+file_paths_list = []
+seen_paths = set()
+# Get placeholder to filter it out
+file_path_placeholder = global_config.get(
+"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+)
+# Collect from already_file_paths, excluding placeholder
+for fp in already_file_paths:
+# Skip placeholders (format: "...{placeholder}(showing X of Y)...")
+if (
+fp
+and not fp.startswith(f"...{file_path_placeholder}")
+and fp not in seen_paths
+):
+file_paths_list.append(fp)
+seen_paths.add(fp)
+# Collect from new data
+for dp in nodes_data:
+file_path_item = dp.get("file_path")
+if file_path_item and file_path_item not in seen_paths:
+file_paths_list.append(file_path_item)
+seen_paths.add(file_path_item)
+# Apply count limit
+max_file_paths = global_config.get("max_file_paths")
+if len(file_paths_list) > max_file_paths:
+limit_method = global_config.get(
+"source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+)
+file_path_placeholder = global_config.get(
+"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+)
+original_count = len(file_paths_list)
+if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+# FIFO: keep tail (newest), discard head
+file_paths_list = file_paths_list[-max_file_paths:]
+else:
+# KEEP: keep head (earliest), discard tail
+file_paths_list = file_paths_list[:max_file_paths]
+file_paths_list.append(
+f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
+)
+logger.info(
+f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+)
+file_path = GRAPH_FIELD_SEP.join(file_paths_list)
node_data = dict(
entity_id=entity_name,
@@ -1686,10 +1801,12 @@ async def _merge_edges_then_upsert(
}
)
+limit_method = global_config.get("source_ids_limit_method")
+max_source_limit = global_config.get("max_source_ids_per_relation")
source_ids = apply_source_ids_limit(
full_source_ids,
-global_config["max_source_ids_per_relation"],
-global_config.get("source_ids_limit_method"),
+max_source_limit,
+limit_method,
identifier=f"`{src_id}`~`{tgt_id}`",
)
limit_method = (
@@ -1715,7 +1832,6 @@
# In FIFO mode, keep all edge descriptions - truncation happens at source_ids level only
edges_data = list(edges_data)
-max_source_limit = global_config["max_source_ids_per_relation"]
skip_summary_due_to_limit = (
limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
and len(existing_full_source_ids) >= max_source_limit
@@ -1791,7 +1907,7 @@
truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}"
if dd_message or truncation_info:
-status_message += f"({','.join([truncation_info, dd_message])})"
+status_message += f" ({', '.join([truncation_info, dd_message])})"
if already_fragment > 0 or llm_was_used:
logger.info(status_message)
@@ -1822,7 +1938,66 @@
keywords = ",".join(sorted(all_keywords))
source_id = GRAPH_FIELD_SEP.join(source_ids)
-file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}")
+# Build file_path with count limit
+if skip_summary_due_to_limit:
+# Skip limit, keep original file_path
+file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
+else:
+# Collect and apply limit
+file_paths_list = []
+seen_paths = set()
+# Get placeholder to filter it out
+file_path_placeholder = global_config.get(
+"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+)
+# Collect from already_file_paths, excluding placeholder
+for fp in already_file_paths:
+# Skip placeholders (format: "...{placeholder}(showing X of Y)...")
+if (
+fp
+and not fp.startswith(f"...{file_path_placeholder}")
+and fp not in seen_paths
+):
+file_paths_list.append(fp)
+seen_paths.add(fp)
+# Collect from new data
+for dp in edges_data:
+file_path_item = dp.get("file_path")
+if file_path_item and file_path_item not in seen_paths:
+file_paths_list.append(file_path_item)
+seen_paths.add(file_path_item)
+# Apply count limit
+max_file_paths = global_config.get("max_file_paths")
+if len(file_paths_list) > max_file_paths:
+limit_method = global_config.get(
+"source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+)
+file_path_placeholder = global_config.get(
+"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
+)
+original_count = len(file_paths_list)
+if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
+# FIFO: keep tail (newest), discard head
+file_paths_list = file_paths_list[-max_file_paths:]
+else:
+# KEEP: keep head (earliest), discard tail
+file_paths_list = file_paths_list[:max_file_paths]
+file_paths_list.append(
+f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
+)
+logger.info(
+f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
+)
+file_path = GRAPH_FIELD_SEP.join(file_paths_list)
for need_insert_id in [src_id, tgt_id]:
if not (await knowledge_graph_inst.has_node(need_insert_id)):
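Both merge paths above skip stored entries that start with `...{placeholder}` so that an earlier truncation marker is not treated as a real path on the next merge. In isolation, that filtering step looks roughly like the sketch below; the `split_stored_file_paths` helper is hypothetical, while both imports exist in `lightrag.constants` after this commit.

```python
from lightrag.constants import GRAPH_FIELD_SEP, DEFAULT_FILE_PATH_MORE_PLACEHOLDER

def split_stored_file_paths(stored: str) -> list[str]:
    """Split a stored file_path field and drop any '...truncated(showing X of Y)...' markers."""
    prefix = f"...{DEFAULT_FILE_PATH_MORE_PLACEHOLDER}"
    return [fp for fp in stored.split(GRAPH_FIELD_SEP) if fp and not fp.startswith(prefix)]

# Example with a hypothetical stored value:
stored = GRAPH_FIELD_SEP.join(["a.pdf", "b.pdf", "...truncated(showing 2 of 5)..."])
print(split_stored_file_paths(stored))  # ['a.pdf', 'b.pdf']
```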

View file

@@ -35,7 +35,6 @@ from lightrag.constants import (
DEFAULT_LOG_FILENAME,
GRAPH_FIELD_SEP,
DEFAULT_MAX_TOTAL_TOKENS,
-DEFAULT_MAX_FILE_PATH_LENGTH,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
VALID_SOURCE_IDS_LIMIT_METHODS,
SOURCE_IDS_LIMIT_METHOD_FIFO,
@@ -2584,65 +2583,6 @@ def parse_relation_chunk_key(key: str) -> tuple[str, str]:
return parts[0], parts[1]
-def build_file_path(already_file_paths, data_list, target):
-"""Build file path string with UTF-8 byte length limit and deduplication
-Args:
-already_file_paths: List of existing file paths
-data_list: List of data items containing file_path
-target: Target name for logging warnings
-Returns:
-str: Combined file paths separated by GRAPH_FIELD_SEP
-"""
-# set: deduplication
-file_paths_set = {fp for fp in already_file_paths if fp}
-# string: filter empty value and keep file order in already_file_paths
-file_paths = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
-# Check if initial file_paths already exceeds byte length limit
-if len(file_paths.encode("utf-8")) >= DEFAULT_MAX_FILE_PATH_LENGTH:
-logger.warning(
-f"Initial file_paths already exceeds {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, "
-f"current size: {len(file_paths.encode('utf-8'))} bytes"
-)
-# ignored file_paths
-file_paths_ignore = ""
-# add file_paths
-for dp in data_list:
-cur_file_path = dp.get("file_path")
-# empty
-if not cur_file_path:
-continue
-# skip duplicate item
-if cur_file_path in file_paths_set:
-continue
-# add
-file_paths_set.add(cur_file_path)
-# check the UTF-8 byte length
-new_addition = GRAPH_FIELD_SEP + cur_file_path if file_paths else cur_file_path
-if (
-len(file_paths.encode("utf-8")) + len(new_addition.encode("utf-8"))
-< DEFAULT_MAX_FILE_PATH_LENGTH - 5
-):
-# append
-file_paths += new_addition
-else:
-# ignore
-file_paths_ignore += GRAPH_FIELD_SEP + cur_file_path
-if file_paths_ignore:
-logger.warning(
-f"File paths exceed {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, "
-f"ignoring file path: {file_paths_ignore}"
-)
-return file_paths
def generate_track_id(prefix: str = "upload") -> str:
"""Generate a unique tracking ID with timestamp and UUID