Optimize JSON write with fast/slow path to reduce memory usage
- Fast path for clean data (no sanitization)
- Slow path sanitizes during encoding
- Reload shared memory after sanitization
- Custom encoder avoids deep copies
- Comprehensive test coverage
(cherry picked from commit 777c987371)
This commit is contained in:
parent
7632805cd0
commit
ed79218550
4 changed files with 618 additions and 57 deletions
|
|
@ -180,7 +180,20 @@ class JsonDocStatusStorage(DocStatusStorage):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"[{self.workspace}] Process {os.getpid()} doc status writting {len(data_dict)} records to {self.namespace}"
|
f"[{self.workspace}] Process {os.getpid()} doc status writting {len(data_dict)} records to {self.namespace}"
|
||||||
)
|
)
|
||||||
write_json(data_dict, self._file_name)
|
|
||||||
|
# Write JSON and check if sanitization was applied
|
||||||
|
needs_reload = write_json(data_dict, self._file_name)
|
||||||
|
|
||||||
|
# If data was sanitized, reload cleaned data to update shared memory
|
||||||
|
if needs_reload:
|
||||||
|
logger.info(
|
||||||
|
f"[{self.workspace}] Reloading sanitized data into shared memory for {self.namespace}"
|
||||||
|
)
|
||||||
|
cleaned_data = load_json(self._file_name)
|
||||||
|
if cleaned_data:
|
||||||
|
self._data.clear()
|
||||||
|
self._data.update(cleaned_data)
|
||||||
|
|
||||||
await clear_all_update_flags(self.final_namespace)
|
await clear_all_update_flags(self.final_namespace)
|
||||||
|
|
||||||
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
|
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
|
||||||
|
|
|
||||||
|
|
@ -87,7 +87,20 @@ class JsonKVStorage(BaseKVStorage):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"[{self.workspace}] Process {os.getpid()} KV writting {data_count} records to {self.namespace}"
|
f"[{self.workspace}] Process {os.getpid()} KV writting {data_count} records to {self.namespace}"
|
||||||
)
|
)
|
||||||
write_json(data_dict, self._file_name)
|
|
||||||
|
# Write JSON and check if sanitization was applied
|
||||||
|
needs_reload = write_json(data_dict, self._file_name)
|
||||||
|
|
||||||
|
# If data was sanitized, reload cleaned data to update shared memory
|
||||||
|
if needs_reload:
|
||||||
|
logger.info(
|
||||||
|
f"[{self.workspace}] Reloading sanitized data into shared memory for {self.namespace}"
|
||||||
|
)
|
||||||
|
cleaned_data = load_json(self._file_name)
|
||||||
|
if cleaned_data:
|
||||||
|
self._data.clear()
|
||||||
|
self._data.update(cleaned_data)
|
||||||
|
|
||||||
await clear_all_update_flags(self.final_namespace)
|
await clear_all_update_flags(self.final_namespace)
|
||||||
|
|
||||||
async def get_all(self) -> dict[str, Any]:
|
async def get_all(self) -> dict[str, Any]:
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,17 @@ from dataclasses import dataclass
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
from typing import Any, Protocol, Callable, TYPE_CHECKING, List, Optional
|
from typing import (
|
||||||
|
Any,
|
||||||
|
Protocol,
|
||||||
|
Callable,
|
||||||
|
TYPE_CHECKING,
|
||||||
|
List,
|
||||||
|
Optional,
|
||||||
|
Iterable,
|
||||||
|
Sequence,
|
||||||
|
Collection,
|
||||||
|
)
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
|
@ -25,7 +35,9 @@ from lightrag.constants import (
|
||||||
DEFAULT_LOG_FILENAME,
|
DEFAULT_LOG_FILENAME,
|
||||||
GRAPH_FIELD_SEP,
|
GRAPH_FIELD_SEP,
|
||||||
DEFAULT_MAX_TOTAL_TOKENS,
|
DEFAULT_MAX_TOTAL_TOKENS,
|
||||||
DEFAULT_MAX_FILE_PATH_LENGTH,
|
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
|
||||||
|
VALID_SOURCE_IDS_LIMIT_METHODS,
|
||||||
|
SOURCE_IDS_LIMIT_METHOD_FIFO,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Initialize logger with basic configuration
|
# Initialize logger with basic configuration
|
||||||
|
|
@ -341,8 +353,29 @@ class EmbeddingFunc:
|
||||||
embedding_dim: int
|
embedding_dim: int
|
||||||
func: callable
|
func: callable
|
||||||
max_token_size: int | None = None # deprecated keep it for compatible only
|
max_token_size: int | None = None # deprecated keep it for compatible only
|
||||||
|
send_dimensions: bool = (
|
||||||
|
False # Control whether to send embedding_dim to the function
|
||||||
|
)
|
||||||
|
|
||||||
async def __call__(self, *args, **kwargs) -> np.ndarray:
|
async def __call__(self, *args, **kwargs) -> np.ndarray:
|
||||||
|
# Only inject embedding_dim when send_dimensions is True
|
||||||
|
if self.send_dimensions:
|
||||||
|
# Check if user provided embedding_dim parameter
|
||||||
|
if "embedding_dim" in kwargs:
|
||||||
|
user_provided_dim = kwargs["embedding_dim"]
|
||||||
|
# If user's value differs from class attribute, output warning
|
||||||
|
if (
|
||||||
|
user_provided_dim is not None
|
||||||
|
and user_provided_dim != self.embedding_dim
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
f"Ignoring user-provided embedding_dim={user_provided_dim}, "
|
||||||
|
f"using declared embedding_dim={self.embedding_dim} from decorator"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Inject embedding_dim from decorator
|
||||||
|
kwargs["embedding_dim"] = self.embedding_dim
|
||||||
|
|
||||||
return await self.func(*args, **kwargs)
|
return await self.func(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -894,9 +927,169 @@ def load_json(file_name):
|
||||||
return json.load(f)
|
return json.load(f)
|
||||||
|
|
||||||
|
|
||||||
|
def _sanitize_string_for_json(text: str) -> str:
|
||||||
|
"""Remove characters that cannot be encoded in UTF-8 for JSON serialization.
|
||||||
|
|
||||||
|
This is a simpler sanitizer specifically for JSON that directly removes
|
||||||
|
problematic characters without attempting to encode first.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: String to sanitize
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Sanitized string safe for UTF-8 encoding in JSON
|
||||||
|
"""
|
||||||
|
if not text:
|
||||||
|
return text
|
||||||
|
|
||||||
|
# Directly filter out problematic characters without pre-validation
|
||||||
|
sanitized = ""
|
||||||
|
for char in text:
|
||||||
|
code_point = ord(char)
|
||||||
|
# Skip surrogate characters (U+D800 to U+DFFF) - main cause of encoding errors
|
||||||
|
if 0xD800 <= code_point <= 0xDFFF:
|
||||||
|
continue
|
||||||
|
# Skip other non-characters in Unicode
|
||||||
|
elif code_point == 0xFFFE or code_point == 0xFFFF:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
sanitized += char
|
||||||
|
|
||||||
|
return sanitized
|
||||||
|
|
||||||
|
|
||||||
|
def _sanitize_json_data(data: Any) -> Any:
|
||||||
|
"""Recursively sanitize all string values in data structure for safe UTF-8 encoding
|
||||||
|
|
||||||
|
DEPRECATED: This function creates a deep copy of the data which can be memory-intensive.
|
||||||
|
For new code, prefer using write_json with SanitizingJSONEncoder which sanitizes during
|
||||||
|
serialization without creating copies.
|
||||||
|
|
||||||
|
Handles all JSON-serializable types including:
|
||||||
|
- Dictionary keys and values
|
||||||
|
- Lists and tuples (preserves type)
|
||||||
|
- Nested structures
|
||||||
|
- Strings at any level
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: Data to sanitize (dict, list, tuple, str, or other types)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Sanitized data with all strings cleaned of problematic characters
|
||||||
|
"""
|
||||||
|
if isinstance(data, dict):
|
||||||
|
# Sanitize both keys and values
|
||||||
|
return {
|
||||||
|
_sanitize_string_for_json(k)
|
||||||
|
if isinstance(k, str)
|
||||||
|
else k: _sanitize_json_data(v)
|
||||||
|
for k, v in data.items()
|
||||||
|
}
|
||||||
|
elif isinstance(data, (list, tuple)):
|
||||||
|
# Handle both lists and tuples, preserve original type
|
||||||
|
sanitized = [_sanitize_json_data(item) for item in data]
|
||||||
|
return type(data)(sanitized)
|
||||||
|
elif isinstance(data, str):
|
||||||
|
return _sanitize_string_for_json(data)
|
||||||
|
else:
|
||||||
|
# Numbers, booleans, None, etc. - return as-is
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
class SanitizingJSONEncoder(json.JSONEncoder):
|
||||||
|
"""
|
||||||
|
Custom JSON encoder that sanitizes data during serialization.
|
||||||
|
|
||||||
|
This encoder cleans strings during the encoding process without creating
|
||||||
|
a full copy of the data structure, making it memory-efficient for large datasets.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def encode(self, o):
|
||||||
|
"""Override encode method to handle simple string cases"""
|
||||||
|
if isinstance(o, str):
|
||||||
|
return json.encoder.encode_basestring(_sanitize_string_for_json(o))
|
||||||
|
return super().encode(o)
|
||||||
|
|
||||||
|
def iterencode(self, o, _one_shot=False):
|
||||||
|
"""
|
||||||
|
Override iterencode to sanitize strings during serialization.
|
||||||
|
This is the core method that handles complex nested structures.
|
||||||
|
"""
|
||||||
|
# Preprocess: sanitize all strings in the object
|
||||||
|
sanitized = self._sanitize_for_encoding(o)
|
||||||
|
|
||||||
|
# Call parent's iterencode with sanitized data
|
||||||
|
for chunk in super().iterencode(sanitized, _one_shot):
|
||||||
|
yield chunk
|
||||||
|
|
||||||
|
def _sanitize_for_encoding(self, obj):
|
||||||
|
"""
|
||||||
|
Recursively sanitize strings in an object.
|
||||||
|
Creates new objects only when necessary to avoid deep copies.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj: Object to sanitize
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Sanitized object with cleaned strings
|
||||||
|
"""
|
||||||
|
if isinstance(obj, str):
|
||||||
|
return _sanitize_string_for_json(obj)
|
||||||
|
|
||||||
|
elif isinstance(obj, dict):
|
||||||
|
# Create new dict with sanitized keys and values
|
||||||
|
new_dict = {}
|
||||||
|
for k, v in obj.items():
|
||||||
|
clean_k = _sanitize_string_for_json(k) if isinstance(k, str) else k
|
||||||
|
clean_v = self._sanitize_for_encoding(v)
|
||||||
|
new_dict[clean_k] = clean_v
|
||||||
|
return new_dict
|
||||||
|
|
||||||
|
elif isinstance(obj, (list, tuple)):
|
||||||
|
# Sanitize list/tuple elements
|
||||||
|
cleaned = [self._sanitize_for_encoding(item) for item in obj]
|
||||||
|
return type(obj)(cleaned) if isinstance(obj, tuple) else cleaned
|
||||||
|
|
||||||
|
else:
|
||||||
|
# Numbers, booleans, None, etc. remain unchanged
|
||||||
|
return obj
|
||||||
|
|
||||||
|
|
||||||
def write_json(json_obj, file_name):
|
def write_json(json_obj, file_name):
|
||||||
|
"""
|
||||||
|
Write JSON data to file with optimized sanitization strategy.
|
||||||
|
|
||||||
|
This function uses a two-stage approach:
|
||||||
|
1. Fast path: Try direct serialization (works for clean data ~99% of time)
|
||||||
|
2. Slow path: Use custom encoder that sanitizes during serialization
|
||||||
|
|
||||||
|
The custom encoder approach avoids creating a deep copy of the data,
|
||||||
|
making it memory-efficient. When sanitization occurs, the caller should
|
||||||
|
reload the cleaned data from the file to update shared memory.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
json_obj: Object to serialize (may be a shallow copy from shared memory)
|
||||||
|
file_name: Output file path
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if sanitization was applied (caller should reload data),
|
||||||
|
False if direct write succeeded (no reload needed)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Strategy 1: Fast path - try direct serialization
|
||||||
|
with open(file_name, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(json_obj, f, indent=2, ensure_ascii=False)
|
||||||
|
return False # No sanitization needed, no reload required
|
||||||
|
|
||||||
|
except (UnicodeEncodeError, UnicodeDecodeError) as e:
|
||||||
|
logger.debug(f"Direct JSON write failed, using sanitizing encoder: {e}")
|
||||||
|
|
||||||
|
# Strategy 2: Use custom encoder (sanitizes during serialization, zero memory copy)
|
||||||
with open(file_name, "w", encoding="utf-8") as f:
|
with open(file_name, "w", encoding="utf-8") as f:
|
||||||
json.dump(json_obj, f, indent=2, ensure_ascii=False)
|
json.dump(json_obj, f, indent=2, ensure_ascii=False, cls=SanitizingJSONEncoder)
|
||||||
|
|
||||||
|
logger.info(f"JSON sanitization applied during write: {file_name}")
|
||||||
|
return True # Sanitization applied, reload recommended
|
||||||
|
|
||||||
|
|
||||||
class TokenizerInterface(Protocol):
|
class TokenizerInterface(Protocol):
|
||||||
|
|
@ -1472,8 +1665,7 @@ async def aexport_data(
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Unsupported file format: {file_format}. "
|
f"Unsupported file format: {file_format}. Choose from: csv, excel, md, txt"
|
||||||
f"Choose from: csv, excel, md, txt"
|
|
||||||
)
|
)
|
||||||
if file_format is not None:
|
if file_format is not None:
|
||||||
print(f"Data exported to: {output_path} with format: {file_format}")
|
print(f"Data exported to: {output_path} with format: {file_format}")
|
||||||
|
|
@ -1784,7 +1976,7 @@ def normalize_extracted_info(name: str, remove_inner_quotes=False) -> str:
|
||||||
- Filter out short numeric-only text (length < 3 and only digits/dots)
|
- Filter out short numeric-only text (length < 3 and only digits/dots)
|
||||||
- remove_inner_quotes = True
|
- remove_inner_quotes = True
|
||||||
remove Chinese quotes
|
remove Chinese quotes
|
||||||
remove English queotes in and around chinese
|
remove English quotes in and around chinese
|
||||||
Convert non-breaking spaces to regular spaces
|
Convert non-breaking spaces to regular spaces
|
||||||
Convert narrow non-breaking spaces after non-digits to regular spaces
|
Convert narrow non-breaking spaces after non-digits to regular spaces
|
||||||
|
|
||||||
|
|
@ -2466,63 +2658,156 @@ async def process_chunks_unified(
|
||||||
return final_chunks
|
return final_chunks
|
||||||
|
|
||||||
|
|
||||||
def build_file_path(already_file_paths, data_list, target):
|
def normalize_source_ids_limit_method(method: str | None) -> str:
|
||||||
"""Build file path string with UTF-8 byte length limit and deduplication
|
"""Normalize the source ID limiting strategy and fall back to default when invalid."""
|
||||||
|
|
||||||
|
if not method:
|
||||||
|
return DEFAULT_SOURCE_IDS_LIMIT_METHOD
|
||||||
|
|
||||||
|
normalized = method.upper()
|
||||||
|
if normalized not in VALID_SOURCE_IDS_LIMIT_METHODS:
|
||||||
|
logger.warning(
|
||||||
|
"Unknown SOURCE_IDS_LIMIT_METHOD '%s', falling back to %s",
|
||||||
|
method,
|
||||||
|
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
|
||||||
|
)
|
||||||
|
return DEFAULT_SOURCE_IDS_LIMIT_METHOD
|
||||||
|
|
||||||
|
return normalized
|
||||||
|
|
||||||
|
|
||||||
|
def merge_source_ids(
|
||||||
|
existing_ids: Iterable[str] | None, new_ids: Iterable[str] | None
|
||||||
|
) -> list[str]:
|
||||||
|
"""Merge two iterables of source IDs while preserving order and removing duplicates."""
|
||||||
|
|
||||||
|
merged: list[str] = []
|
||||||
|
seen: set[str] = set()
|
||||||
|
|
||||||
|
for sequence in (existing_ids, new_ids):
|
||||||
|
if not sequence:
|
||||||
|
continue
|
||||||
|
for source_id in sequence:
|
||||||
|
if not source_id:
|
||||||
|
continue
|
||||||
|
if source_id not in seen:
|
||||||
|
seen.add(source_id)
|
||||||
|
merged.append(source_id)
|
||||||
|
|
||||||
|
return merged
|
||||||
|
|
||||||
|
|
||||||
|
def apply_source_ids_limit(
|
||||||
|
source_ids: Sequence[str],
|
||||||
|
limit: int,
|
||||||
|
method: str,
|
||||||
|
*,
|
||||||
|
identifier: str | None = None,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Apply a limit strategy to a sequence of source IDs."""
|
||||||
|
|
||||||
|
if limit <= 0:
|
||||||
|
return []
|
||||||
|
|
||||||
|
source_ids_list = list(source_ids)
|
||||||
|
if len(source_ids_list) <= limit:
|
||||||
|
return source_ids_list
|
||||||
|
|
||||||
|
normalized_method = normalize_source_ids_limit_method(method)
|
||||||
|
|
||||||
|
if normalized_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
|
||||||
|
truncated = source_ids_list[-limit:]
|
||||||
|
else: # IGNORE_NEW
|
||||||
|
truncated = source_ids_list[:limit]
|
||||||
|
|
||||||
|
if identifier and len(truncated) < len(source_ids_list):
|
||||||
|
logger.debug(
|
||||||
|
"Source_id truncated: %s | %s keeping %s of %s entries",
|
||||||
|
identifier,
|
||||||
|
normalized_method,
|
||||||
|
len(truncated),
|
||||||
|
len(source_ids_list),
|
||||||
|
)
|
||||||
|
|
||||||
|
return truncated
|
||||||
|
|
||||||
|
|
||||||
|
def compute_incremental_chunk_ids(
|
||||||
|
existing_full_chunk_ids: list[str],
|
||||||
|
old_chunk_ids: list[str],
|
||||||
|
new_chunk_ids: list[str],
|
||||||
|
) -> list[str]:
|
||||||
|
"""
|
||||||
|
Compute incrementally updated chunk IDs based on changes.
|
||||||
|
|
||||||
|
This function applies delta changes (additions and removals) to an existing
|
||||||
|
list of chunk IDs while maintaining order and ensuring deduplication.
|
||||||
|
Delta additions from new_chunk_ids are placed at the end.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
already_file_paths: List of existing file paths
|
existing_full_chunk_ids: Complete list of existing chunk IDs from storage
|
||||||
data_list: List of data items containing file_path
|
old_chunk_ids: Previous chunk IDs from source_id (chunks being replaced)
|
||||||
target: Target name for logging warnings
|
new_chunk_ids: New chunk IDs from updated source_id (chunks being added)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
str: Combined file paths separated by GRAPH_FIELD_SEP
|
Updated list of chunk IDs with deduplication
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> existing = ['chunk-1', 'chunk-2', 'chunk-3']
|
||||||
|
>>> old = ['chunk-1', 'chunk-2']
|
||||||
|
>>> new = ['chunk-2', 'chunk-4']
|
||||||
|
>>> compute_incremental_chunk_ids(existing, old, new)
|
||||||
|
['chunk-3', 'chunk-2', 'chunk-4']
|
||||||
"""
|
"""
|
||||||
# set: deduplication
|
# Calculate changes
|
||||||
file_paths_set = {fp for fp in already_file_paths if fp}
|
chunks_to_remove = set(old_chunk_ids) - set(new_chunk_ids)
|
||||||
|
chunks_to_add = set(new_chunk_ids) - set(old_chunk_ids)
|
||||||
|
|
||||||
# string: filter empty value and keep file order in already_file_paths
|
# Apply changes to full chunk_ids
|
||||||
file_paths = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
|
# Step 1: Remove chunks that are no longer needed
|
||||||
|
updated_chunk_ids = [
|
||||||
|
cid for cid in existing_full_chunk_ids if cid not in chunks_to_remove
|
||||||
|
]
|
||||||
|
|
||||||
# Check if initial file_paths already exceeds byte length limit
|
# Step 2: Add new chunks (preserving order from new_chunk_ids)
|
||||||
if len(file_paths.encode("utf-8")) >= DEFAULT_MAX_FILE_PATH_LENGTH:
|
# Note: 'cid not in updated_chunk_ids' check ensures deduplication
|
||||||
logger.warning(
|
for cid in new_chunk_ids:
|
||||||
f"Initial file_paths already exceeds {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, "
|
if cid in chunks_to_add and cid not in updated_chunk_ids:
|
||||||
f"current size: {len(file_paths.encode('utf-8'))} bytes"
|
updated_chunk_ids.append(cid)
|
||||||
)
|
|
||||||
|
|
||||||
# ignored file_paths
|
return updated_chunk_ids
|
||||||
file_paths_ignore = ""
|
|
||||||
# add file_paths
|
|
||||||
for dp in data_list:
|
|
||||||
cur_file_path = dp.get("file_path")
|
|
||||||
# empty
|
|
||||||
if not cur_file_path:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# skip duplicate item
|
|
||||||
if cur_file_path in file_paths_set:
|
|
||||||
continue
|
|
||||||
# add
|
|
||||||
file_paths_set.add(cur_file_path)
|
|
||||||
|
|
||||||
# check the UTF-8 byte length
|
def subtract_source_ids(
|
||||||
new_addition = GRAPH_FIELD_SEP + cur_file_path if file_paths else cur_file_path
|
source_ids: Iterable[str],
|
||||||
if (
|
ids_to_remove: Collection[str],
|
||||||
len(file_paths.encode("utf-8")) + len(new_addition.encode("utf-8"))
|
) -> list[str]:
|
||||||
< DEFAULT_MAX_FILE_PATH_LENGTH - 5
|
"""Remove a collection of IDs from an ordered iterable while preserving order."""
|
||||||
):
|
|
||||||
# append
|
|
||||||
file_paths += new_addition
|
|
||||||
else:
|
|
||||||
# ignore
|
|
||||||
file_paths_ignore += GRAPH_FIELD_SEP + cur_file_path
|
|
||||||
|
|
||||||
if file_paths_ignore:
|
removal_set = set(ids_to_remove)
|
||||||
logger.warning(
|
if not removal_set:
|
||||||
f"File paths exceed {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, "
|
return [source_id for source_id in source_ids if source_id]
|
||||||
f"ignoring file path: {file_paths_ignore}"
|
|
||||||
)
|
return [
|
||||||
return file_paths
|
source_id
|
||||||
|
for source_id in source_ids
|
||||||
|
if source_id and source_id not in removal_set
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def make_relation_chunk_key(src: str, tgt: str) -> str:
|
||||||
|
"""Create a deterministic storage key for relation chunk tracking."""
|
||||||
|
|
||||||
|
return GRAPH_FIELD_SEP.join(sorted((src, tgt)))
|
||||||
|
|
||||||
|
|
||||||
|
def parse_relation_chunk_key(key: str) -> tuple[str, str]:
|
||||||
|
"""Parse a relation chunk storage key back into its entity pair."""
|
||||||
|
|
||||||
|
parts = key.split(GRAPH_FIELD_SEP)
|
||||||
|
if len(parts) != 2:
|
||||||
|
raise ValueError(f"Invalid relation chunk key: {key}")
|
||||||
|
return parts[0], parts[1]
|
||||||
|
|
||||||
|
|
||||||
def generate_track_id(prefix: str = "upload") -> str:
|
def generate_track_id(prefix: str = "upload") -> str:
|
||||||
|
|
@ -2612,9 +2897,9 @@ def fix_tuple_delimiter_corruption(
|
||||||
record,
|
record,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Fix: <X|#|> -> <|#|>, <|#|Y> -> <|#|>, <X|#|Y> -> <|#|>, <||#||> -> <|#|>, <||#> -> <|#|> (one extra characters outside pipes)
|
# Fix: <X|#|> -> <|#|>, <|#|Y> -> <|#|>, <X|#|Y> -> <|#|>, <||#||> -> <|#|> (one extra characters outside pipes)
|
||||||
record = re.sub(
|
record = re.sub(
|
||||||
rf"<.?\|{escaped_delimiter_core}\|*?>",
|
rf"<.?\|{escaped_delimiter_core}\|.?>",
|
||||||
tuple_delimiter,
|
tuple_delimiter,
|
||||||
record,
|
record,
|
||||||
)
|
)
|
||||||
|
|
@ -2634,7 +2919,6 @@ def fix_tuple_delimiter_corruption(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Fix: <|#| -> <|#|>, <|#|| -> <|#|> (missing closing >)
|
# Fix: <|#| -> <|#|>, <|#|| -> <|#|> (missing closing >)
|
||||||
|
|
||||||
record = re.sub(
|
record = re.sub(
|
||||||
rf"<\|{escaped_delimiter_core}\|+(?!>)",
|
rf"<\|{escaped_delimiter_core}\|+(?!>)",
|
||||||
tuple_delimiter,
|
tuple_delimiter,
|
||||||
|
|
@ -2648,6 +2932,13 @@ def fix_tuple_delimiter_corruption(
|
||||||
record,
|
record,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Fix: <||#> -> <|#|> (double pipe at start, missing pipe at end)
|
||||||
|
record = re.sub(
|
||||||
|
rf"<\|+{escaped_delimiter_core}>",
|
||||||
|
tuple_delimiter,
|
||||||
|
record,
|
||||||
|
)
|
||||||
|
|
||||||
# Fix: <|| -> <|#|>
|
# Fix: <|| -> <|#|>
|
||||||
record = re.sub(
|
record = re.sub(
|
||||||
r"<\|\|(?!>)",
|
r"<\|\|(?!>)",
|
||||||
|
|
|
||||||
244
tests/test_write_json_optimization.py
Normal file
244
tests/test_write_json_optimization.py
Normal file
|
|
@ -0,0 +1,244 @@
|
||||||
|
"""
|
||||||
|
Test suite for write_json optimization
|
||||||
|
|
||||||
|
This test verifies:
|
||||||
|
1. Fast path works for clean data (no sanitization)
|
||||||
|
2. Slow path applies sanitization for dirty data
|
||||||
|
3. Sanitization is done during encoding (memory-efficient)
|
||||||
|
4. Reloading updates shared memory with cleaned data
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import tempfile
|
||||||
|
from lightrag.utils import write_json, load_json, SanitizingJSONEncoder
|
||||||
|
|
||||||
|
|
||||||
|
class TestWriteJsonOptimization:
|
||||||
|
"""Test write_json optimization with two-stage approach"""
|
||||||
|
|
||||||
|
def test_fast_path_clean_data(self):
|
||||||
|
"""Test that clean data takes the fast path without sanitization"""
|
||||||
|
clean_data = {
|
||||||
|
"name": "John Doe",
|
||||||
|
"age": 30,
|
||||||
|
"items": ["apple", "banana", "cherry"],
|
||||||
|
"nested": {"key": "value", "number": 42},
|
||||||
|
}
|
||||||
|
|
||||||
|
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
|
||||||
|
temp_file = f.name
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Write clean data - should return False (no sanitization)
|
||||||
|
needs_reload = write_json(clean_data, temp_file)
|
||||||
|
assert not needs_reload, "Clean data should not require sanitization"
|
||||||
|
|
||||||
|
# Verify data was written correctly
|
||||||
|
loaded_data = load_json(temp_file)
|
||||||
|
assert loaded_data == clean_data, "Loaded data should match original"
|
||||||
|
finally:
|
||||||
|
os.unlink(temp_file)
|
||||||
|
|
||||||
|
def test_slow_path_dirty_data(self):
|
||||||
|
"""Test that dirty data triggers sanitization"""
|
||||||
|
# Create data with surrogate characters (U+D800 to U+DFFF)
|
||||||
|
dirty_string = "Hello\ud800World" # Contains surrogate character
|
||||||
|
dirty_data = {"text": dirty_string, "number": 123}
|
||||||
|
|
||||||
|
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
|
||||||
|
temp_file = f.name
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Write dirty data - should return True (sanitization applied)
|
||||||
|
needs_reload = write_json(dirty_data, temp_file)
|
||||||
|
assert needs_reload, "Dirty data should trigger sanitization"
|
||||||
|
|
||||||
|
# Verify data was written and sanitized
|
||||||
|
loaded_data = load_json(temp_file)
|
||||||
|
assert loaded_data is not None, "Data should be written"
|
||||||
|
assert loaded_data["number"] == 123, "Clean fields should remain unchanged"
|
||||||
|
# Surrogate character should be removed
|
||||||
|
assert (
|
||||||
|
"\ud800" not in loaded_data["text"]
|
||||||
|
), "Surrogate character should be removed"
|
||||||
|
finally:
|
||||||
|
os.unlink(temp_file)
|
||||||
|
|
||||||
|
def test_sanitizing_encoder_removes_surrogates(self):
|
||||||
|
"""Test that SanitizingJSONEncoder removes surrogate characters"""
|
||||||
|
data_with_surrogates = {
|
||||||
|
"text": "Hello\ud800\udc00World", # Contains surrogate pair
|
||||||
|
"clean": "Clean text",
|
||||||
|
"nested": {"dirty_key\ud801": "value", "clean_key": "clean\ud802value"},
|
||||||
|
}
|
||||||
|
|
||||||
|
# Encode using custom encoder
|
||||||
|
encoded = json.dumps(
|
||||||
|
data_with_surrogates, cls=SanitizingJSONEncoder, ensure_ascii=False
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verify no surrogate characters in output
|
||||||
|
assert "\ud800" not in encoded, "Surrogate U+D800 should be removed"
|
||||||
|
assert "\udc00" not in encoded, "Surrogate U+DC00 should be removed"
|
||||||
|
assert "\ud801" not in encoded, "Surrogate U+D801 should be removed"
|
||||||
|
assert "\ud802" not in encoded, "Surrogate U+D802 should be removed"
|
||||||
|
|
||||||
|
# Verify clean parts remain
|
||||||
|
assert "Clean text" in encoded, "Clean text should remain"
|
||||||
|
assert "clean_key" in encoded, "Clean keys should remain"
|
||||||
|
|
||||||
|
def test_nested_structure_sanitization(self):
|
||||||
|
"""Test sanitization of deeply nested structures"""
|
||||||
|
nested_data = {
|
||||||
|
"level1": {
|
||||||
|
"level2": {
|
||||||
|
"level3": {"dirty": "text\ud800here", "clean": "normal text"},
|
||||||
|
"list": ["item1", "item\ud801dirty", "item3"],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
|
||||||
|
temp_file = f.name
|
||||||
|
|
||||||
|
try:
|
||||||
|
needs_reload = write_json(nested_data, temp_file)
|
||||||
|
assert needs_reload, "Nested dirty data should trigger sanitization"
|
||||||
|
|
||||||
|
# Verify nested structure is preserved
|
||||||
|
loaded_data = load_json(temp_file)
|
||||||
|
assert "level1" in loaded_data
|
||||||
|
assert "level2" in loaded_data["level1"]
|
||||||
|
assert "level3" in loaded_data["level1"]["level2"]
|
||||||
|
|
||||||
|
# Verify surrogates are removed
|
||||||
|
dirty_text = loaded_data["level1"]["level2"]["level3"]["dirty"]
|
||||||
|
assert "\ud800" not in dirty_text, "Nested surrogate should be removed"
|
||||||
|
|
||||||
|
# Verify list items are sanitized
|
||||||
|
list_items = loaded_data["level1"]["level2"]["list"]
|
||||||
|
assert (
|
||||||
|
"\ud801" not in list_items[1]
|
||||||
|
), "List item surrogates should be removed"
|
||||||
|
finally:
|
||||||
|
os.unlink(temp_file)
|
||||||
|
|
||||||
|
def test_unicode_non_characters_removed(self):
|
||||||
|
"""Test that Unicode non-characters (U+FFFE, U+FFFF) don't cause encoding errors
|
||||||
|
|
||||||
|
Note: U+FFFE and U+FFFF are valid UTF-8 characters (though discouraged),
|
||||||
|
so they don't trigger sanitization. They only get removed when explicitly
|
||||||
|
using the SanitizingJSONEncoder.
|
||||||
|
"""
|
||||||
|
data_with_nonchars = {"text1": "Hello\ufffeWorld", "text2": "Test\uffffString"}
|
||||||
|
|
||||||
|
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
|
||||||
|
temp_file = f.name
|
||||||
|
|
||||||
|
try:
|
||||||
|
# These characters are valid UTF-8, so they take the fast path
|
||||||
|
needs_reload = write_json(data_with_nonchars, temp_file)
|
||||||
|
assert not needs_reload, "U+FFFE/U+FFFF are valid UTF-8 characters"
|
||||||
|
|
||||||
|
loaded_data = load_json(temp_file)
|
||||||
|
# They're written as-is in the fast path
|
||||||
|
assert loaded_data == data_with_nonchars
|
||||||
|
finally:
|
||||||
|
os.unlink(temp_file)
|
||||||
|
|
||||||
|
def test_mixed_clean_dirty_data(self):
|
||||||
|
"""Test data with both clean and dirty fields"""
|
||||||
|
mixed_data = {
|
||||||
|
"clean_field": "This is perfectly fine",
|
||||||
|
"dirty_field": "This has\ud800issues",
|
||||||
|
"number": 42,
|
||||||
|
"boolean": True,
|
||||||
|
"null_value": None,
|
||||||
|
"clean_list": [1, 2, 3],
|
||||||
|
"dirty_list": ["clean", "dirty\ud801item"],
|
||||||
|
}
|
||||||
|
|
||||||
|
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
|
||||||
|
temp_file = f.name
|
||||||
|
|
||||||
|
try:
|
||||||
|
needs_reload = write_json(mixed_data, temp_file)
|
||||||
|
assert (
|
||||||
|
needs_reload
|
||||||
|
), "Mixed data with dirty fields should trigger sanitization"
|
||||||
|
|
||||||
|
loaded_data = load_json(temp_file)
|
||||||
|
|
||||||
|
# Clean fields should remain unchanged
|
||||||
|
assert loaded_data["clean_field"] == "This is perfectly fine"
|
||||||
|
assert loaded_data["number"] == 42
|
||||||
|
assert loaded_data["boolean"]
|
||||||
|
assert loaded_data["null_value"] is None
|
||||||
|
assert loaded_data["clean_list"] == [1, 2, 3]
|
||||||
|
|
||||||
|
# Dirty fields should be sanitized
|
||||||
|
assert "\ud800" not in loaded_data["dirty_field"]
|
||||||
|
assert "\ud801" not in loaded_data["dirty_list"][1]
|
||||||
|
finally:
|
||||||
|
os.unlink(temp_file)
|
||||||
|
|
||||||
|
def test_empty_and_none_strings(self):
|
||||||
|
"""Test handling of empty and None values"""
|
||||||
|
data = {
|
||||||
|
"empty": "",
|
||||||
|
"none": None,
|
||||||
|
"zero": 0,
|
||||||
|
"false": False,
|
||||||
|
"empty_list": [],
|
||||||
|
"empty_dict": {},
|
||||||
|
}
|
||||||
|
|
||||||
|
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
|
||||||
|
temp_file = f.name
|
||||||
|
|
||||||
|
try:
|
||||||
|
needs_reload = write_json(data, temp_file)
|
||||||
|
assert (
|
||||||
|
not needs_reload
|
||||||
|
), "Clean empty values should not trigger sanitization"
|
||||||
|
|
||||||
|
loaded_data = load_json(temp_file)
|
||||||
|
assert loaded_data == data, "Empty/None values should be preserved"
|
||||||
|
finally:
|
||||||
|
os.unlink(temp_file)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Run tests
|
||||||
|
test = TestWriteJsonOptimization()
|
||||||
|
|
||||||
|
print("Running test_fast_path_clean_data...")
|
||||||
|
test.test_fast_path_clean_data()
|
||||||
|
print("✓ Passed")
|
||||||
|
|
||||||
|
print("Running test_slow_path_dirty_data...")
|
||||||
|
test.test_slow_path_dirty_data()
|
||||||
|
print("✓ Passed")
|
||||||
|
|
||||||
|
print("Running test_sanitizing_encoder_removes_surrogates...")
|
||||||
|
test.test_sanitizing_encoder_removes_surrogates()
|
||||||
|
print("✓ Passed")
|
||||||
|
|
||||||
|
print("Running test_nested_structure_sanitization...")
|
||||||
|
test.test_nested_structure_sanitization()
|
||||||
|
print("✓ Passed")
|
||||||
|
|
||||||
|
print("Running test_unicode_non_characters_removed...")
|
||||||
|
test.test_unicode_non_characters_removed()
|
||||||
|
print("✓ Passed")
|
||||||
|
|
||||||
|
print("Running test_mixed_clean_dirty_data...")
|
||||||
|
test.test_mixed_clean_dirty_data()
|
||||||
|
print("✓ Passed")
|
||||||
|
|
||||||
|
print("Running test_empty_and_none_strings...")
|
||||||
|
test.test_empty_and_none_strings()
|
||||||
|
print("✓ Passed")
|
||||||
|
|
||||||
|
print("\n✅ All tests passed!")
|
||||||
Loading…
Add table
Reference in a new issue