diff --git a/lightrag/utils.py b/lightrag/utils.py
index 87ccbea1..e830d944 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -1400,11 +1400,13 @@ async def use_llm_func_with_cache(
     chunk_id: str | None = None,
     cache_keys_collector: list = None,
 ) -> str:
-    """Call LLM function with cache support
+    """Call LLM function with cache support and text sanitization
 
     If cache is available and enabled (determined by handle_cache based on mode),
     retrieve result from cache; otherwise call LLM function and save result to cache.
 
+    This function applies text sanitization to prevent UTF-8 encoding errors for all LLM providers.
+
     Args:
         input_text: Input text to send to LLM
         use_llm_func: LLM function with higher priority
@@ -1419,12 +1421,27 @@ async def use_llm_func_with_cache(
     Returns:
         LLM response text
     """
+    # Sanitize input text to prevent UTF-8 encoding errors for all LLM providers
+    safe_input_text = safe_encode_for_llm(input_text, f"llm_input_{cache_type}")
+
+    # Sanitize history messages if provided
+    safe_history_messages = None
+    if history_messages:
+        safe_history_messages = []
+        for i, msg in enumerate(history_messages):
+            safe_msg = msg.copy()
+            if "content" in safe_msg:
+                safe_msg["content"] = safe_encode_for_llm(
+                    safe_msg["content"], f"history_message_{i}"
+                )
+            safe_history_messages.append(safe_msg)
+
     if llm_response_cache:
-        if history_messages:
-            history = json.dumps(history_messages, ensure_ascii=False)
-            _prompt = history + "\n" + input_text
+        if safe_history_messages:
+            history = json.dumps(safe_history_messages, ensure_ascii=False)
+            _prompt = history + "\n" + safe_input_text
         else:
-            _prompt = input_text
+            _prompt = safe_input_text
 
         arg_hash = compute_args_hash(_prompt)
         # Generate cache key for this LLM call
@@ -1448,14 +1465,14 @@ async def use_llm_func_with_cache(
             return cached_return
 
         statistic_data["llm_call"] += 1
-        # Call LLM
+        # Call LLM with sanitized input
         kwargs = {}
-        if history_messages:
-            kwargs["history_messages"] = history_messages
+        if safe_history_messages:
+            kwargs["history_messages"] = safe_history_messages
         if max_tokens is not None:
             kwargs["max_tokens"] = max_tokens
 
-        res: str = await use_llm_func(input_text, **kwargs)
+        res: str = await use_llm_func(safe_input_text, **kwargs)
         res = remove_think_tags(res)
 
         if llm_response_cache.global_config.get("enable_llm_cache_for_entity_extract"):
@@ -1476,15 +1493,15 @@ async def use_llm_func_with_cache(
 
         return res
 
-    # When cache is disabled, directly call LLM
+    # When cache is disabled, directly call LLM with sanitized input
     kwargs = {}
-    if history_messages:
-        kwargs["history_messages"] = history_messages
+    if safe_history_messages:
+        kwargs["history_messages"] = safe_history_messages
     if max_tokens is not None:
         kwargs["max_tokens"] = max_tokens
 
-    logger.info(f"Call LLM function with query text length: {len(input_text)}")
-    res = await use_llm_func(input_text, **kwargs)
+    logger.info(f"Call LLM function with query text length: {len(safe_input_text)}")
+    res = await use_llm_func(safe_input_text, **kwargs)
 
     return remove_think_tags(res)
 
@@ -1570,6 +1587,134 @@ def clean_text(text: str) -> str:
     return text.strip().replace("\x00", "")
 
 
+def sanitize_text_for_encoding(text: str, replacement_char: str = "") -> str:
+    """Sanitize text to ensure safe UTF-8 encoding by removing or replacing problematic characters.
+
+    This function handles:
+    - Surrogate characters (the main cause of the encoding error)
+    - Other invalid Unicode sequences
+    - Control characters that might cause issues
+
+    Args:
+        text: Input text to sanitize
+        replacement_char: Character to use for replacing invalid sequences
+
+    Returns:
+        Sanitized text that can be safely encoded as UTF-8
+    """
+    if not isinstance(text, str):
+        return str(text)
+
+    if not text:
+        return text
+
+    try:
+        # First, try to encode/decode to catch any encoding issues early
+        text.encode("utf-8")
+
+        # Remove or replace surrogate characters (U+D800 to U+DFFF)
+        # These are the main cause of the encoding error
+        sanitized = ""
+        for char in text:
+            code_point = ord(char)
+            # Check for surrogate characters
+            if 0xD800 <= code_point <= 0xDFFF:
+                # Replace surrogate with replacement character
+                sanitized += replacement_char
+                continue
+            # Check for other problematic characters
+            elif code_point == 0xFFFE or code_point == 0xFFFF:
+                # These are non-characters in Unicode
+                sanitized += replacement_char
+                continue
+            else:
+                sanitized += char
+
+        # Additional cleanup: remove null bytes and other control characters
+        # that might cause issues (but preserve common whitespace)
+        sanitized = re.sub(
+            r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", replacement_char, sanitized
+        )
+
+        # Test final encoding to ensure it's safe
+        sanitized.encode("utf-8")
+
+        return sanitized
+
+    except UnicodeEncodeError as e:
+        logger.warning(
+            f"Text sanitization: UnicodeEncodeError encountered, applying aggressive cleaning: {str(e)[:100]}"
+        )
+
+        # Aggressive fallback: encode with error handling
+        try:
+            # Use 'replace' error handling to substitute problematic characters
+            safe_bytes = text.encode("utf-8", errors="replace")
+            sanitized = safe_bytes.decode("utf-8")
+
+            # Additional cleanup
+            sanitized = re.sub(
+                r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", replacement_char, sanitized
+            )
+
+            return sanitized
+
+        except Exception as fallback_error:
+            logger.error(
+                f"Text sanitization: Aggressive fallback failed: {str(fallback_error)}"
+            )
+            # Last resort: return a safe placeholder
+            return f"[TEXT_ENCODING_ERROR: {len(text)} characters]"
+
+    except Exception as e:
+        logger.error(f"Text sanitization: Unexpected error: {str(e)}")
+        # Return original text if no encoding issues detected
+        return text
+
+
+def safe_encode_for_llm(content: str, context: str = "unknown") -> str:
+    """Safely encode content for LLM API calls with comprehensive error handling.
+
+    This is the main function to use before sending text to LLM APIs to prevent
+    UTF-8 encoding errors.
+
+    Args:
+        content: Text content to encode safely
+        context: Context description for logging (e.g., "document_chunk", "prompt")
+
+    Returns:
+        Safely encoded text that won't cause UTF-8 encoding errors
+    """
+    if not content:
+        return content
+
+    original_length = len(content)
+
+    try:
+        # Apply text sanitization
+        sanitized = sanitize_text_for_encoding(content)
+
+        # Check if any changes were made
+        if len(sanitized) != original_length or sanitized != content:
+            # Count replaced characters (empty replacement chars)
+            replaced_count = original_length - len(sanitized)
+            logger.info(
+                f"Text encoding safety: Removed {replaced_count} problematic chars "
+                f"(original: {original_length} chars, sanitized: {len(sanitized)} chars)"
+            )
+
+        return sanitized
+
+    except Exception as e:
+        logger.error(
+            f"Text encoding safety: Failed to sanitize {context} content: {str(e)}"
+        )
+        # Return a safe fallback
+        return (
+            f"[CONTENT_SANITIZATION_ERROR: {original_length} characters from {context}]"
+        )
+
+
 def check_storage_env_vars(storage_name: str) -> None:
     """Check if all required environment variables for storage implementation exist