Add text sanitization to prevent UTF-8 encoding errors in LLM calls

• Remove surrogate characters
• Clean control characters
• Sanitize input and history messages
• Add comprehensive error handling
• Log sanitization activities
This commit is contained in:
yangdx 2025-08-19 18:50:52 +08:00
parent 64015548df
commit f9cf544805

View file

@ -1400,11 +1400,13 @@ async def use_llm_func_with_cache(
chunk_id: str | None = None,
cache_keys_collector: list = None,
) -> str:
"""Call LLM function with cache support
"""Call LLM function with cache support and text sanitization
If cache is available and enabled (determined by handle_cache based on mode),
retrieve result from cache; otherwise call LLM function and save result to cache.
This function applies text sanitization to prevent UTF-8 encoding errors for all LLM providers.
Args:
input_text: Input text to send to LLM
use_llm_func: LLM function with higher priority
@ -1419,12 +1421,27 @@ async def use_llm_func_with_cache(
Returns:
LLM response text
"""
# Sanitize input text to prevent UTF-8 encoding errors for all LLM providers
safe_input_text = safe_encode_for_llm(input_text, f"llm_input_{cache_type}")
# Sanitize history messages if provided
safe_history_messages = None
if history_messages:
safe_history_messages = []
for i, msg in enumerate(history_messages):
safe_msg = msg.copy()
if "content" in safe_msg:
safe_msg["content"] = safe_encode_for_llm(
safe_msg["content"], f"history_message_{i}"
)
safe_history_messages.append(safe_msg)
if llm_response_cache:
if history_messages:
history = json.dumps(history_messages, ensure_ascii=False)
_prompt = history + "\n" + input_text
if safe_history_messages:
history = json.dumps(safe_history_messages, ensure_ascii=False)
_prompt = history + "\n" + safe_input_text
else:
_prompt = input_text
_prompt = safe_input_text
arg_hash = compute_args_hash(_prompt)
# Generate cache key for this LLM call
@ -1448,14 +1465,14 @@ async def use_llm_func_with_cache(
return cached_return
statistic_data["llm_call"] += 1
# Call LLM
# Call LLM with sanitized input
kwargs = {}
if history_messages:
kwargs["history_messages"] = history_messages
if safe_history_messages:
kwargs["history_messages"] = safe_history_messages
if max_tokens is not None:
kwargs["max_tokens"] = max_tokens
res: str = await use_llm_func(input_text, **kwargs)
res: str = await use_llm_func(safe_input_text, **kwargs)
res = remove_think_tags(res)
if llm_response_cache.global_config.get("enable_llm_cache_for_entity_extract"):
@ -1476,15 +1493,15 @@ async def use_llm_func_with_cache(
return res
# When cache is disabled, directly call LLM
# When cache is disabled, directly call LLM with sanitized input
kwargs = {}
if history_messages:
kwargs["history_messages"] = history_messages
if safe_history_messages:
kwargs["history_messages"] = safe_history_messages
if max_tokens is not None:
kwargs["max_tokens"] = max_tokens
logger.info(f"Call LLM function with query text length: {len(input_text)}")
res = await use_llm_func(input_text, **kwargs)
logger.info(f"Call LLM function with query text length: {len(safe_input_text)}")
res = await use_llm_func(safe_input_text, **kwargs)
return remove_think_tags(res)
@ -1570,6 +1587,134 @@ def clean_text(text: str) -> str:
return text.strip().replace("\x00", "")
def sanitize_text_for_encoding(text: str, replacement_char: str = "") -> str:
"""Sanitize text to ensure safe UTF-8 encoding by removing or replacing problematic characters.
This function handles:
- Surrogate characters (the main cause of the encoding error)
- Other invalid Unicode sequences
- Control characters that might cause issues
Args:
text: Input text to sanitize
replacement_char: Character to use for replacing invalid sequences
Returns:
Sanitized text that can be safely encoded as UTF-8
"""
if not isinstance(text, str):
return str(text)
if not text:
return text
try:
# First, try to encode/decode to catch any encoding issues early
text.encode("utf-8")
# Remove or replace surrogate characters (U+D800 to U+DFFF)
# These are the main cause of the encoding error
sanitized = ""
for char in text:
code_point = ord(char)
# Check for surrogate characters
if 0xD800 <= code_point <= 0xDFFF:
# Replace surrogate with replacement character
sanitized += replacement_char
continue
# Check for other problematic characters
elif code_point == 0xFFFE or code_point == 0xFFFF:
# These are non-characters in Unicode
sanitized += replacement_char
continue
else:
sanitized += char
# Additional cleanup: remove null bytes and other control characters
# that might cause issues (but preserve common whitespace)
sanitized = re.sub(
r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", replacement_char, sanitized
)
# Test final encoding to ensure it's safe
sanitized.encode("utf-8")
return sanitized
except UnicodeEncodeError as e:
logger.warning(
f"Text sanitization: UnicodeEncodeError encountered, applying aggressive cleaning: {str(e)[:100]}"
)
# Aggressive fallback: encode with error handling
try:
# Use 'replace' error handling to substitute problematic characters
safe_bytes = text.encode("utf-8", errors="replace")
sanitized = safe_bytes.decode("utf-8")
# Additional cleanup
sanitized = re.sub(
r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", replacement_char, sanitized
)
return sanitized
except Exception as fallback_error:
logger.error(
f"Text sanitization: Aggressive fallback failed: {str(fallback_error)}"
)
# Last resort: return a safe placeholder
return f"[TEXT_ENCODING_ERROR: {len(text)} characters]"
except Exception as e:
logger.error(f"Text sanitization: Unexpected error: {str(e)}")
# Return original text if no encoding issues detected
return text
def safe_encode_for_llm(content: str, context: str = "unknown") -> str:
"""Safely encode content for LLM API calls with comprehensive error handling.
This is the main function to use before sending text to LLM APIs to prevent
UTF-8 encoding errors.
Args:
content: Text content to encode safely
context: Context description for logging (e.g., "document_chunk", "prompt")
Returns:
Safely encoded text that won't cause UTF-8 encoding errors
"""
if not content:
return content
original_length = len(content)
try:
# Apply text sanitization
sanitized = sanitize_text_for_encoding(content)
# Check if any changes were made
if len(sanitized) != original_length or sanitized != content:
# Count replaced characters (empty replacement chars)
replaced_count = original_length - len(sanitized)
logger.info(
f"Text encoding safety: Removed {replaced_count} problematic chars "
f"(original: {original_length} chars, sanitized: {len(sanitized)} chars)"
)
return sanitized
except Exception as e:
logger.error(
f"Text encoding safety: Failed to sanitize {context} content: {str(e)}"
)
# Return a safe fallback
return (
f"[CONTENT_SANITIZATION_ERROR: {original_length} characters from {context}]"
)
def check_storage_env_vars(storage_name: str) -> None:
"""Check if all required environment variables for storage implementation exist