""" Test suite for write_json optimization This test verifies: 1. Fast path works for clean data (no sanitization) 2. Slow path applies sanitization for dirty data 3. Sanitization is done during encoding (memory-efficient) 4. Reloading updates shared memory with cleaned data """ import json import os import tempfile import pytest from lightrag.utils import SanitizingJSONEncoder, load_json, write_json @pytest.mark.offline class TestWriteJsonOptimization: """Test write_json optimization with two-stage approach""" def test_fast_path_clean_data(self): """Test that clean data takes the fast path without sanitization""" clean_data = { 'name': 'John Doe', 'age': 30, 'items': ['apple', 'banana', 'cherry'], 'nested': {'key': 'value', 'number': 42}, } with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: temp_file = f.name try: # Write clean data - should return False (no sanitization) needs_reload = write_json(clean_data, temp_file) assert not needs_reload, 'Clean data should not require sanitization' # Verify data was written correctly loaded_data = load_json(temp_file) assert loaded_data == clean_data, 'Loaded data should match original' finally: os.unlink(temp_file) def test_slow_path_dirty_data(self): """Test that dirty data triggers sanitization""" # Create data with surrogate characters (U+D800 to U+DFFF) dirty_string = 'Hello\ud800World' # Contains surrogate character dirty_data = {'text': dirty_string, 'number': 123} with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: temp_file = f.name try: # Write dirty data - should return True (sanitization applied) needs_reload = write_json(dirty_data, temp_file) assert needs_reload, 'Dirty data should trigger sanitization' # Verify data was written and sanitized loaded_data = load_json(temp_file) assert loaded_data is not None, 'Data should be written' assert loaded_data['number'] == 123, 'Clean fields should remain unchanged' # Surrogate character should be removed assert '\ud800' not in loaded_data['text'], 'Surrogate character should be removed' finally: os.unlink(temp_file) def test_sanitizing_encoder_removes_surrogates(self): """Test that SanitizingJSONEncoder removes surrogate characters""" data_with_surrogates = { 'text': 'Hello\ud800\udc00World', # Contains surrogate pair 'clean': 'Clean text', 'nested': {'dirty_key\ud801': 'value', 'clean_key': 'clean\ud802value'}, } # Encode using custom encoder encoded = json.dumps(data_with_surrogates, cls=SanitizingJSONEncoder, ensure_ascii=False) # Verify no surrogate characters in output assert '\ud800' not in encoded, 'Surrogate U+D800 should be removed' assert '\udc00' not in encoded, 'Surrogate U+DC00 should be removed' assert '\ud801' not in encoded, 'Surrogate U+D801 should be removed' assert '\ud802' not in encoded, 'Surrogate U+D802 should be removed' # Verify clean parts remain assert 'Clean text' in encoded, 'Clean text should remain' assert 'clean_key' in encoded, 'Clean keys should remain' def test_nested_structure_sanitization(self): """Test sanitization of deeply nested structures""" nested_data = { 'level1': { 'level2': { 'level3': {'dirty': 'text\ud800here', 'clean': 'normal text'}, 'list': ['item1', 'item\ud801dirty', 'item3'], } } } with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: temp_file = f.name try: needs_reload = write_json(nested_data, temp_file) assert needs_reload, 'Nested dirty data should trigger sanitization' # Verify nested structure is preserved loaded_data = load_json(temp_file) assert 'level1' in loaded_data assert 'level2' in loaded_data['level1'] assert 'level3' in loaded_data['level1']['level2'] # Verify surrogates are removed dirty_text = loaded_data['level1']['level2']['level3']['dirty'] assert '\ud800' not in dirty_text, 'Nested surrogate should be removed' # Verify list items are sanitized list_items = loaded_data['level1']['level2']['list'] assert '\ud801' not in list_items[1], 'List item surrogates should be removed' finally: os.unlink(temp_file) def test_unicode_non_characters_removed(self): """Test that Unicode non-characters (U+FFFE, U+FFFF) don't cause encoding errors Note: U+FFFE and U+FFFF are valid UTF-8 characters (though discouraged), so they don't trigger sanitization. They only get removed when explicitly using the SanitizingJSONEncoder. """ data_with_nonchars = {'text1': 'Hello\ufffeWorld', 'text2': 'Test\uffffString'} with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: temp_file = f.name try: # These characters are valid UTF-8, so they take the fast path needs_reload = write_json(data_with_nonchars, temp_file) assert not needs_reload, 'U+FFFE/U+FFFF are valid UTF-8 characters' loaded_data = load_json(temp_file) # They're written as-is in the fast path assert loaded_data == data_with_nonchars finally: os.unlink(temp_file) def test_mixed_clean_dirty_data(self): """Test data with both clean and dirty fields""" mixed_data = { 'clean_field': 'This is perfectly fine', 'dirty_field': 'This has\ud800issues', 'number': 42, 'boolean': True, 'null_value': None, 'clean_list': [1, 2, 3], 'dirty_list': ['clean', 'dirty\ud801item'], } with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: temp_file = f.name try: needs_reload = write_json(mixed_data, temp_file) assert needs_reload, 'Mixed data with dirty fields should trigger sanitization' loaded_data = load_json(temp_file) # Clean fields should remain unchanged assert loaded_data['clean_field'] == 'This is perfectly fine' assert loaded_data['number'] == 42 assert loaded_data['boolean'] assert loaded_data['null_value'] is None assert loaded_data['clean_list'] == [1, 2, 3] # Dirty fields should be sanitized assert '\ud800' not in loaded_data['dirty_field'] assert '\ud801' not in loaded_data['dirty_list'][1] finally: os.unlink(temp_file) def test_empty_and_none_strings(self): """Test handling of empty and None values""" data = { 'empty': '', 'none': None, 'zero': 0, 'false': False, 'empty_list': [], 'empty_dict': {}, } with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: temp_file = f.name try: needs_reload = write_json(data, temp_file) assert not needs_reload, 'Clean empty values should not trigger sanitization' loaded_data = load_json(temp_file) assert loaded_data == data, 'Empty/None values should be preserved' finally: os.unlink(temp_file) def test_specific_surrogate_udc9a(self): """Test specific surrogate character \\udc9a mentioned in the issue""" # Test the exact surrogate character from the error message: # UnicodeEncodeError: 'utf-8' codec can't encode character '\\udc9a' data_with_udc9a = { 'text': 'Some text with surrogate\udc9acharacter', 'position': 201, # As mentioned in the error 'clean_field': 'Normal text', } with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: temp_file = f.name try: # Write data - should trigger sanitization needs_reload = write_json(data_with_udc9a, temp_file) assert needs_reload, 'Data with \\udc9a should trigger sanitization' # Verify surrogate was removed loaded_data = load_json(temp_file) assert loaded_data is not None assert '\udc9a' not in loaded_data['text'], '\\udc9a should be removed' assert loaded_data['clean_field'] == 'Normal text', 'Clean fields should remain' finally: os.unlink(temp_file) def test_migration_with_surrogate_sanitization(self): """Test that migration process handles surrogate characters correctly This test simulates the scenario where legacy cache contains surrogate characters and ensures they are cleaned during migration. """ # Simulate legacy cache data with surrogate characters legacy_data_with_surrogates = { 'cache_entry_1': { 'return': 'Result with\ud800surrogate', 'cache_type': 'extract', 'original_prompt': 'Some\udc9aprompt', }, 'cache_entry_2': { 'return': 'Clean result', 'cache_type': 'query', 'original_prompt': 'Clean prompt', }, } with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: temp_file = f.name try: # First write the dirty data directly (simulating legacy cache file) # Use custom encoder to force write even with surrogates with open(temp_file, 'w', encoding='utf-8') as f: json.dump( legacy_data_with_surrogates, f, cls=SanitizingJSONEncoder, ensure_ascii=False, ) # Load and verify surrogates were cleaned during initial write loaded_data = load_json(temp_file) assert loaded_data is not None # The data should be sanitized assert '\ud800' not in loaded_data['cache_entry_1']['return'], 'Surrogate in return should be removed' assert '\udc9a' not in loaded_data['cache_entry_1']['original_prompt'], ( 'Surrogate in prompt should be removed' ) # Clean data should remain unchanged assert loaded_data['cache_entry_2']['return'] == 'Clean result', 'Clean data should remain' finally: os.unlink(temp_file) def test_empty_values_after_sanitization(self): """Test that data with empty values after sanitization is properly handled Critical edge case: When sanitization results in data with empty string values, we must use 'if cleaned_data is not None' instead of 'if cleaned_data' to ensure proper reload, since truthy check on dict depends on content, not just existence. """ # Create data where ALL values are only surrogate characters all_dirty_data = { 'key1': '\ud800\udc00\ud801', 'key2': '\ud802\ud803', } with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: temp_file = f.name try: # Write dirty data - should trigger sanitization needs_reload = write_json(all_dirty_data, temp_file) assert needs_reload, 'All-dirty data should trigger sanitization' # Load the sanitized data cleaned_data = load_json(temp_file) # Critical assertions for the edge case assert cleaned_data is not None, 'Cleaned data should not be None' # Sanitization removes surrogates but preserves keys with empty values assert cleaned_data == { 'key1': '', 'key2': '', }, 'Surrogates should be removed, keys preserved' # This dict is truthy because it has keys (even with empty values) assert cleaned_data, 'Dict with keys is truthy' # Test the actual edge case: empty dict empty_data = {} needs_reload2 = write_json(empty_data, temp_file) assert not needs_reload2, 'Empty dict is clean' reloaded_empty = load_json(temp_file) assert reloaded_empty is not None, 'Empty dict should not be None' assert reloaded_empty == {}, 'Empty dict should remain empty' assert not reloaded_empty, 'Empty dict evaluates to False (the critical check)' finally: os.unlink(temp_file) if __name__ == '__main__': # Run tests test = TestWriteJsonOptimization() print('Running test_fast_path_clean_data...') test.test_fast_path_clean_data() print('✓ Passed') print('Running test_slow_path_dirty_data...') test.test_slow_path_dirty_data() print('✓ Passed') print('Running test_sanitizing_encoder_removes_surrogates...') test.test_sanitizing_encoder_removes_surrogates() print('✓ Passed') print('Running test_nested_structure_sanitization...') test.test_nested_structure_sanitization() print('✓ Passed') print('Running test_unicode_non_characters_removed...') test.test_unicode_non_characters_removed() print('✓ Passed') print('Running test_mixed_clean_dirty_data...') test.test_mixed_clean_dirty_data() print('✓ Passed') print('Running test_empty_and_none_strings...') test.test_empty_and_none_strings() print('✓ Passed') print('Running test_specific_surrogate_udc9a...') test.test_specific_surrogate_udc9a() print('✓ Passed') print('Running test_migration_with_surrogate_sanitization...') test.test_migration_with_surrogate_sanitization() print('✓ Passed') print('Running test_empty_values_after_sanitization...') test.test_empty_values_after_sanitization() print('✓ Passed') print('\n✅ All tests passed!')