Quick fix to limit source_id ballooning while inserting nodes

(cherry picked from commit 54f0a7d1ca)
Authored by DivinesLight on 2025-10-14 14:47:04 +05:00; committed by Raphaël MANSUY
parent 429cd6a66f
commit b9fc6f19dd
3 changed files with 390 additions and 1308 deletions

lightrag/constants.py

@@ -13,6 +13,7 @@ DEFAULT_MAX_GRAPH_NODES = 1000
 
 # Default values for extraction settings
 DEFAULT_SUMMARY_LANGUAGE = "English"  # Default language for document processing
 DEFAULT_MAX_GLEANING = 1
+DEFAULT_MAX_CHUNK_IDS_PER_ENTITY = 500  # Applies to Both Graph + Vector DBs
 DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3
 DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3
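
For context, the new constant is the hard per-entity ceiling that the rest of the commit enforces. A minimal sketch of the intent (illustrative only, not code from the commit; the IDs are made up):

    DEFAULT_MAX_CHUNK_IDS_PER_ENTITY = 500  # mirrors the constant added above

    # An entity mentioned in 1200 chunks gets clipped before hitting VDB hard limits.
    chunk_ids = {f"chunk-{i}" for i in range(1200)}
    capped = set(list(chunk_ids)[:DEFAULT_MAX_CHUNK_IDS_PER_ENTITY])
    assert len(capped) == 500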

File diff suppressed because it is too large.

lightrag/utils.py

@@ -15,17 +15,7 @@ from dataclasses import dataclass
 from datetime import datetime
 from functools import wraps
 from hashlib import md5
-from typing import (
-    Any,
-    Protocol,
-    Callable,
-    TYPE_CHECKING,
-    List,
-    Optional,
-    Iterable,
-    Sequence,
-    Collection,
-)
+from typing import Any, Protocol, Callable, TYPE_CHECKING, List, Optional
 
 import numpy as np
 from dotenv import load_dotenv
@@ -36,9 +26,7 @@ from lightrag.constants import (
     GRAPH_FIELD_SEP,
     DEFAULT_MAX_TOTAL_TOKENS,
     DEFAULT_MAX_FILE_PATH_LENGTH,
-    DEFAULT_SOURCE_IDS_LIMIT_METHOD,
-    VALID_SOURCE_IDS_LIMIT_METHODS,
-    SOURCE_IDS_LIMIT_METHOD_FIFO,
+    DEFAULT_MAX_CHUNK_IDS_PER_ENTITY,
 )
 
 # Initialize logger with basic configuration
@@ -2477,157 +2465,19 @@ async def process_chunks_unified(
     return final_chunks
 
 
-def normalize_source_ids_limit_method(method: str | None) -> str:
-    """Normalize the source ID limiting strategy and fall back to default when invalid."""
-    if not method:
-        return DEFAULT_SOURCE_IDS_LIMIT_METHOD
-    normalized = method.upper()
-    if normalized not in VALID_SOURCE_IDS_LIMIT_METHODS:
-        logger.warning(
-            "Unknown SOURCE_IDS_LIMIT_METHOD '%s', falling back to %s",
-            method,
-            DEFAULT_SOURCE_IDS_LIMIT_METHOD,
-        )
-        return DEFAULT_SOURCE_IDS_LIMIT_METHOD
-    return normalized
-
-
-def merge_source_ids(
-    existing_ids: Iterable[str] | None, new_ids: Iterable[str] | None
-) -> list[str]:
-    """Merge two iterables of source IDs while preserving order and removing duplicates."""
-    merged: list[str] = []
-    seen: set[str] = set()
-    for sequence in (existing_ids, new_ids):
-        if not sequence:
-            continue
-        for source_id in sequence:
-            if not source_id:
-                continue
-            if source_id not in seen:
-                seen.add(source_id)
-                merged.append(source_id)
-    return merged
-
-
-def apply_source_ids_limit(
-    source_ids: Sequence[str],
-    limit: int,
-    method: str,
-    *,
-    identifier: str | None = None,
-) -> list[str]:
-    """Apply a limit strategy to a sequence of source IDs."""
-    if limit <= 0:
-        return []
-    source_ids_list = list(source_ids)
-    if len(source_ids_list) <= limit:
-        return source_ids_list
-    normalized_method = normalize_source_ids_limit_method(method)
-    if normalized_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
-        truncated = source_ids_list[-limit:]
-    else:  # IGNORE_NEW
-        truncated = source_ids_list[:limit]
-    if identifier and len(truncated) < len(source_ids_list):
-        logger.debug(
-            "Source_id truncated: %s | %s keeping %s of %s entries",
-            identifier,
-            normalized_method,
-            len(truncated),
-            len(source_ids_list),
-        )
-    return truncated
-
-
-def compute_incremental_chunk_ids(
-    existing_full_chunk_ids: list[str],
-    old_chunk_ids: list[str],
-    new_chunk_ids: list[str],
-) -> list[str]:
-    """
-    Compute incrementally updated chunk IDs based on changes.
-
-    This function applies delta changes (additions and removals) to an existing
-    list of chunk IDs while maintaining order and ensuring deduplication.
-    Delta additions from new_chunk_ids are placed at the end.
-
-    Args:
-        existing_full_chunk_ids: Complete list of existing chunk IDs from storage
-        old_chunk_ids: Previous chunk IDs from source_id (chunks being replaced)
-        new_chunk_ids: New chunk IDs from updated source_id (chunks being added)
-
-    Returns:
-        Updated list of chunk IDs with deduplication
-
-    Example:
-        >>> existing = ['chunk-1', 'chunk-2', 'chunk-3']
-        >>> old = ['chunk-1', 'chunk-2']
-        >>> new = ['chunk-2', 'chunk-4']
-        >>> compute_incremental_chunk_ids(existing, old, new)
-        ['chunk-3', 'chunk-2', 'chunk-4']
-    """
-    # Calculate changes
-    chunks_to_remove = set(old_chunk_ids) - set(new_chunk_ids)
-    chunks_to_add = set(new_chunk_ids) - set(old_chunk_ids)
-
-    # Apply changes to full chunk_ids
-    # Step 1: Remove chunks that are no longer needed
-    updated_chunk_ids = [
-        cid for cid in existing_full_chunk_ids if cid not in chunks_to_remove
-    ]
-
-    # Step 2: Add new chunks (preserving order from new_chunk_ids)
-    # Note: 'cid not in updated_chunk_ids' check ensures deduplication
-    for cid in new_chunk_ids:
-        if cid in chunks_to_add and cid not in updated_chunk_ids:
-            updated_chunk_ids.append(cid)
-
-    return updated_chunk_ids
-
-
-def subtract_source_ids(
-    source_ids: Iterable[str],
-    ids_to_remove: Collection[str],
-) -> list[str]:
-    """Remove a collection of IDs from an ordered iterable while preserving order."""
-    removal_set = set(ids_to_remove)
-    if not removal_set:
-        return [source_id for source_id in source_ids if source_id]
-    return [
-        source_id
-        for source_id in source_ids
-        if source_id and source_id not in removal_set
-    ]
-
-
-def make_relation_chunk_key(src: str, tgt: str) -> str:
-    """Create a deterministic storage key for relation chunk tracking."""
-    return GRAPH_FIELD_SEP.join(sorted((src, tgt)))
-
-
-def parse_relation_chunk_key(key: str) -> tuple[str, str]:
-    """Parse a relation chunk storage key back into its entity pair."""
-    parts = key.split(GRAPH_FIELD_SEP)
-    if len(parts) != 2:
-        raise ValueError(f"Invalid relation chunk key: {key}")
-    return parts[0], parts[1]
+def truncate_entity_source_id(chunk_ids: set, entity_name: str) -> set:
+    """Limit chunk_ids, for entities that appear a HUGE no of times (To not break VDB hard upper limits)"""
+    already_len: int = len(chunk_ids)
+    if already_len >= DEFAULT_MAX_CHUNK_IDS_PER_ENTITY:
+        logger.warning(
+            f"Chunk Ids already exceeds {DEFAULT_MAX_CHUNK_IDS_PER_ENTITY} for {entity_name}, "
+            f"current size: {already_len} entries."
+        )
+    truncated_chunk_ids = set(list(chunk_ids)[0:DEFAULT_MAX_CHUNK_IDS_PER_ENTITY])
+    return truncated_chunk_ids
 
 
 def build_file_path(already_file_paths, data_list, target):
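
Taken together, this hunk replaces the configurable FIFO/IGNORE_NEW limiting machinery with a single blunt cap. A hypothetical call, assuming the helper exactly as added above (the entity name and IDs are made up):

    huge_entity_chunks = {f"chunk-{i}" for i in range(600)}
    capped = truncate_entity_source_id(huge_entity_chunks, "ACME Corp")
    assert len(capped) == 500  # a warning is logged first, since 600 >= 500

One design consequence worth noting: Python sets are unordered, so which 500 IDs survive is arbitrary, whereas the removed apply_source_ids_limit() deterministically kept either the newest entries (FIFO, slicing from the end) or the oldest (IGNORE_NEW, slicing from the front).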
@@ -2776,9 +2626,9 @@ def fix_tuple_delimiter_corruption(
         record,
     )
 
-    # Fix: <X|#|> -> <|#|>, <|#|Y> -> <|#|>, <X|#|Y> -> <|#|>, <||#||> -> <|#|> (one extra characters outside pipes)
+    # Fix: <X|#|> -> <|#|>, <|#|Y> -> <|#|>, <X|#|Y> -> <|#|>, <||#||> -> <|#|>, <||#> -> <|#|> (one extra characters outside pipes)
     record = re.sub(
-        rf"<.?\|{escaped_delimiter_core}\|.?>",
+        rf"<.?\|{escaped_delimiter_core}\|*?>",
        tuple_delimiter,
        record,
    )
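
To see what the widened pattern buys, here is a small check. This is a sketch, not commit code; it assumes the delimiter core is "#" and the canonical delimiter is "<|#|>", as the surrounding comments suggest:

    import re

    escaped_delimiter_core = re.escape("#")
    tuple_delimiter = "<|#|>"
    pattern = rf"<.?\|{escaped_delimiter_core}\|*?>"

    # "<||#>" used to need its own re.sub (removed in the last hunk below);
    # the relaxed pipe quantifier now catches it directly.
    print(re.sub(pattern, tuple_delimiter, "entity<||#>relation"))  # entity<|#|>relation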
@@ -2798,6 +2648,7 @@ def fix_tuple_delimiter_corruption(
     )
 
+    # Fix: <|#| -> <|#|>, <|#|| -> <|#|> (missing closing >)
     record = re.sub(
         rf"<\|{escaped_delimiter_core}\|+(?!>)",
         tuple_delimiter,
         record,
@@ -2811,13 +2662,6 @@ def fix_tuple_delimiter_corruption(
         record,
     )
 
-    # Fix: <||#> -> <|#|> (double pipe at start, missing pipe at end)
-    record = re.sub(
-        rf"<\|+{escaped_delimiter_core}>",
-        tuple_delimiter,
-        record,
-    )
-
     # Fix: <|| -> <|#|>
     record = re.sub(
         r"<\|\|(?!>)",