From 61469c0a56660f7a83615506f255e8a9dac96723 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 17 Aug 2025 12:45:48 +0800 Subject: [PATCH] Add Chinese pinyin sorting support across document operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Replace pyuca with centralized utils function • Add pinyin sort keys for file paths • Update MongoDB indexes with zh collation • Migrate existing indexes for compatibility • Support Chinese chars in Redis/JSON storage • Keep PostgreSQL sorting order controlled by database collation order --- lightrag/api/routers/document_routes.py | 10 +-- lightrag/kg/json_doc_status_impl.py | 5 ++ lightrag/kg/mongo_impl.py | 114 +++++++++++++----------- lightrag/kg/redis_impl.py | 6 +- lightrag/utils.py | 43 +++++++++ 5 files changed, 122 insertions(+), 56 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index e3477759..4bbfbf6c 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -3,8 +3,7 @@ This module contains all document-related routes for the LightRAG API. 
""" import asyncio -from pyuca import Collator -from lightrag.utils import logger +from lightrag.utils import logger, get_pinyin_sort_key import aiofiles import shutil import traceback @@ -1269,9 +1268,10 @@ async def pipeline_index_files( try: enqueued = False - # Create Collator for Unicode sorting - collator = Collator() - sorted_file_paths = sorted(file_paths, key=lambda p: collator.sort_key(str(p))) + # Use get_pinyin_sort_key for Chinese pinyin sorting + sorted_file_paths = sorted( + file_paths, key=lambda p: get_pinyin_sort_key(str(p)) + ) # Process files sequentially with track_id for file_path in sorted_file_paths: diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 9fb114f2..13054cde 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -11,6 +11,7 @@ from lightrag.utils import ( load_json, logger, write_json, + get_pinyin_sort_key, ) from .shared_storage import ( get_namespace_data, @@ -241,6 +242,10 @@ class JsonDocStatusStorage(DocStatusStorage): # Add sort key for sorting if sort_field == "id": doc_status._sort_key = doc_id + elif sort_field == "file_path": + # Use pinyin sorting for file_path field to support Chinese characters + file_path_value = getattr(doc_status, sort_field, "") + doc_status._sort_key = get_pinyin_sort_key(file_path_value) else: doc_status._sort_key = getattr(doc_status, sort_field, "") diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index 8fa53c60..b28ead15 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -329,11 +329,8 @@ class MongoDocStatusStorage(DocStatusStorage): self._data = await get_or_create_collection(self.db, self._collection_name) - # Create track_id index for better query performance - await self.create_track_id_index_if_not_exists() - - # Create pagination indexes for better query performance - await self.create_pagination_indexes_if_not_exists() + # Create and migrate all indexes including 
Chinese collation for file_path + await self.create_and_migrate_indexes_if_not_exists() logger.debug( f"[{self.workspace}] Use MongoDB as DocStatus {self._collection_name}" @@ -476,39 +473,32 @@ class MongoDocStatusStorage(DocStatusStorage): async def delete(self, ids: list[str]) -> None: await self._data.delete_many({"_id": {"$in": ids}}) - async def create_track_id_index_if_not_exists(self): - """Create track_id index for better query performance""" - try: - # Check if index already exists - indexes_cursor = await self._data.list_indexes() - existing_indexes = await indexes_cursor.to_list(length=None) - track_id_index_exists = any( - "track_id" in idx.get("key", {}) for idx in existing_indexes - ) - - if not track_id_index_exists: - await self._data.create_index("track_id") - logger.info( - f"[{self.workspace}] Created track_id index for collection {self._collection_name}" - ) - else: - logger.debug( - f"[{self.workspace}] track_id index already exists for collection {self._collection_name}" - ) - - except PyMongoError as e: - logger.error( - f"[{self.workspace}] Error creating track_id index for {self._collection_name}: {e}" - ) - - async def create_pagination_indexes_if_not_exists(self): - """Create indexes to optimize pagination queries""" + async def create_and_migrate_indexes_if_not_exists(self): + """Create indexes to optimize pagination queries and migrate file_path indexes for Chinese collation""" try: indexes_cursor = await self._data.list_indexes() existing_indexes = await indexes_cursor.to_list(length=None) + existing_index_names = {idx.get("name", "") for idx in existing_indexes} - # Define indexes needed for pagination - pagination_indexes = [ + # Define collation configuration for Chinese pinyin sorting + collation_config = {"locale": "zh", "numericOrdering": True} + + # 1. 
Handle file_path index migration: drop old indexes without collation + for idx in existing_indexes: + if ( + "file_path" in str(idx.get("key", {})) + and not idx.get("collation") + and idx.get("name") + not in ["file_path_zh_collation", "status_file_path_zh_collation"] + ): + await self._data.drop_index(idx["name"]) + logger.info( + f"[{self.workspace}] Migrated: dropped old file_path index {idx['name']}" + ) + + # 2. Define all indexes needed (including original pagination indexes and new collation indexes) + all_indexes = [ + # Original pagination indexes { "name": "status_updated_at", "keys": [("status", 1), ("updated_at", -1)], @@ -520,27 +510,40 @@ class MongoDocStatusStorage(DocStatusStorage): {"name": "updated_at", "keys": [("updated_at", -1)]}, {"name": "created_at", "keys": [("created_at", -1)]}, {"name": "id", "keys": [("_id", 1)]}, - {"name": "file_path", "keys": [("file_path", 1)]}, + {"name": "track_id", "keys": [("track_id", 1)]}, + # New file_path indexes with Chinese collation + { + "name": "file_path_zh_collation", + "keys": [("file_path", 1)], + "collation": collation_config, + }, + { + "name": "status_file_path_zh_collation", + "keys": [("status", 1), ("file_path", 1)], + "collation": collation_config, + }, ] - # Check which indexes already exist - existing_index_names = {idx.get("name", "") for idx in existing_indexes} - - for index_info in pagination_indexes: + # 3. 
Create all needed indexes + for index_info in all_indexes: index_name = index_info["name"] if index_name not in existing_index_names: - await self._data.create_index(index_info["keys"], name=index_name) + create_kwargs = {"name": index_name} + if "collation" in index_info: + create_kwargs["collation"] = index_info["collation"] + + await self._data.create_index(index_info["keys"], **create_kwargs) logger.info( - f"[{self.workspace}] Created pagination index '{index_name}' for collection {self._collection_name}" + f"[{self.workspace}] Created index '{index_name}' for collection {self._collection_name}" ) else: logger.debug( - f"[{self.workspace}] Pagination index '{index_name}' already exists for collection {self._collection_name}" + f"[{self.workspace}] Index '{index_name}' already exists for collection {self._collection_name}" ) except PyMongoError as e: logger.error( - f"[{self.workspace}] Error creating pagination indexes for {self._collection_name}: {e}" + f"[{self.workspace}] Error creating/migrating indexes for {self._collection_name}: {e}" ) async def get_docs_paginated( @@ -592,13 +595,24 @@ class MongoDocStatusStorage(DocStatusStorage): sort_direction_value = 1 if sort_direction.lower() == "asc" else -1 sort_criteria = [(sort_field, sort_direction_value)] - # Query for paginated data - cursor = ( - self._data.find(query_filter) - .sort(sort_criteria) - .skip(skip) - .limit(page_size) - ) + # Query for paginated data with Chinese collation for file_path sorting + if sort_field == "file_path": + # Use Chinese collation for pinyin sorting + cursor = ( + self._data.find(query_filter) + .sort(sort_criteria) + .collation({"locale": "zh", "numericOrdering": True}) + .skip(skip) + .limit(page_size) + ) + else: + # Use default sorting for other fields + cursor = ( + self._data.find(query_filter) + .sort(sort_criteria) + .skip(skip) + .limit(page_size) + ) result = await cursor.to_list(length=page_size) # Convert to (doc_id, DocProcessingStatus) tuples diff --git 
a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py index 8fc1ec4b..ac07c915 100644 --- a/lightrag/kg/redis_impl.py +++ b/lightrag/kg/redis_impl.py @@ -12,7 +12,7 @@ if not pm.is_installed("redis"): # aioredis is a depricated library, replaced with redis from redis.asyncio import Redis, ConnectionPool # type: ignore from redis.exceptions import RedisError, ConnectionError, TimeoutError # type: ignore -from lightrag.utils import logger +from lightrag.utils import logger, get_pinyin_sort_key from lightrag.base import ( BaseKVStorage, @@ -998,6 +998,10 @@ class RedisDocStatusStorage(DocStatusStorage): # Calculate sort key for sorting (but don't add to data) if sort_field == "id": sort_key = doc_id + elif sort_field == "file_path": + # Use pinyin sorting for file_path field to support Chinese characters + file_path_value = data.get(sort_field, "") + sort_key = get_pinyin_sort_key(file_path_value) else: sort_key = data.get(sort_field, "") diff --git a/lightrag/utils.py b/lightrag/utils.py index 340e4251..49b7136f 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -17,6 +17,20 @@ from hashlib import md5 from typing import Any, Protocol, Callable, TYPE_CHECKING, List import numpy as np from dotenv import load_dotenv + +# Import pyuca for Chinese pinyin sorting +try: + import pyuca + + _pinyin_collator = pyuca.Collator() + _pyuca_available = True +except ImportError: + _pinyin_collator = None + _pyuca_available = False +except Exception: + _pinyin_collator = None + _pyuca_available = False + from lightrag.constants import ( DEFAULT_LOG_MAX_BYTES, DEFAULT_LOG_BACKUP_COUNT, @@ -2059,3 +2073,32 @@ def generate_track_id(prefix: str = "upload") -> str: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") unique_id = str(uuid.uuid4())[:8] # Use first 8 characters of UUID return f"{prefix}_{timestamp}_{unique_id}" + + +def get_pinyin_sort_key(text: str) -> str: + """Generate sort key for Chinese pinyin sorting + + This function uses pyuca (Python Unicode Collation 
Algorithm) to generate + sort keys that handle Chinese characters by their pinyin pronunciation. + For non-Chinese text, it falls back to standard Unicode sorting. + + Args: + text: Text to generate sort key for + + Returns: + str: Sort key that can be used for comparison and sorting + """ + if not text: + return "" + + # Use the globally initialized collator + if _pyuca_available and _pinyin_collator is not None: + try: + return _pinyin_collator.sort_key(text) + except Exception as e: + logger.warning( + f"Failed to generate pinyin sort key for '{text}': {e}. Using fallback." + ) + + # Fallback to standard string sorting + return text.lower()