From a9d680743274b556faf070ac9587928b5e8f82f8 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 18 Aug 2025 16:29:03 +0800 Subject: [PATCH] Fix query windows size limitation for Milvus data migration --- lightrag/kg/milvus_impl.py | 167 +++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 91 deletions(-) diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index a90b9e59..82dce30c 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -498,7 +498,7 @@ class MilvusVectorDBStorage(BaseVectorStorage): # Get max_length from field params max_length = file_path_field.get("params", {}).get("max_length") - if max_length and max_length != DEFAULT_MAX_FILE_PATH_LENGTH: + if max_length and max_length < DEFAULT_MAX_FILE_PATH_LENGTH: logger.info( f"[{self.workspace}] Collection {self.namespace} has file_path max_length={max_length}, " f"needs migration to {DEFAULT_MAX_FILE_PATH_LENGTH}" @@ -576,59 +576,23 @@ class MilvusVectorDBStorage(BaseVectorStorage): f"[{self.workspace}] Schema compatibility check passed for {self.namespace}" ) - def _create_migration_client(self) -> MilvusClient: - """Create a new MilvusClient instance for migration operations""" - return MilvusClient( - uri=os.environ.get( - "MILVUS_URI", - config.get( - "milvus", - "uri", - fallback=os.path.join( - self.global_config["working_dir"], "milvus_lite.db" - ), - ), - ), - user=os.environ.get( - "MILVUS_USER", config.get("milvus", "user", fallback=None) - ), - password=os.environ.get( - "MILVUS_PASSWORD", - config.get("milvus", "password", fallback=None), - ), - token=os.environ.get( - "MILVUS_TOKEN", config.get("milvus", "token", fallback=None) - ), - db_name=os.environ.get( - "MILVUS_DB_NAME", - config.get("milvus", "db_name", fallback=None), - ), - ) - def _migrate_collection_schema(self): - """Migrate collection schema using optimized dual client architecture""" - read_client = None + """Migrate collection schema using query_iterator - completely solves query window limitations""" original_collection_name = self.final_namespace temp_collection_name = f"{self.final_namespace}_temp" + iterator = None try: logger.info( - f"[{self.workspace}] Starting optimized dual-client schema migration for {self.namespace}" + f"[{self.workspace}] Starting iterator-based schema migration for {self.namespace}" ) - if self._client is not None: - self._client.close() - - # Step 1: use self._client to create a temporary collection + # Step 1: Create temporary collection with new schema logger.info( f"[{self.workspace}] Step 1: Creating temporary collection: {temp_collection_name}" ) - # Create indexes for the new collection using self._client # Temporarily update final_namespace for index creation self.final_namespace = temp_collection_name - - # Create new client instance for self._client - self._client = self._create_migration_client() new_schema = self._create_schema_for_namespace() self._client.create_collection( collection_name=temp_collection_name, schema=new_schema @@ -644,75 +608,94 @@ class MilvusVectorDBStorage(BaseVectorStorage): # Load the new collection self._client.load_collection(temp_collection_name) - # Step 2: copy old data to new collection + # Step 2: Copy data using query_iterator (solves query window limitation) logger.info( - f"[{self.workspace}] Step 2: Copying old data to new collection: {original_collection_name}" + f"[{self.workspace}] Step 2: Copying data using query_iterator from: {original_collection_name}" ) - read_client = self._create_migration_client() - read_client.load_collection(original_collection_name) - page_size = 1000 - offset = 0 - total_migrated = 0 - while True: - # Read data from old collection using read_client - page_data = read_client.query( + # Create query iterator + try: + iterator = self._client.query_iterator( collection_name=original_collection_name, - filter="", # Empty filter to get all data + batch_size=2000, # Adjustable batch size for optimal performance output_fields=["*"], # Get all fields - limit=page_size, - offset=offset, ) + logger.debug(f"[{self.workspace}] Query iterator created successfully") + except Exception as iterator_error: + logger.error( + f"[{self.workspace}] Failed to create query iterator: {iterator_error}" + ) + raise - if not page_data: - # No more data to retrieve - break + # Iterate through all data + total_migrated = 0 + batch_number = 1 - # Write data to new collection using self._client + while True: try: - self._client.insert( - collection_name=temp_collection_name, data=page_data - ) - total_migrated += len(page_data) + batch_data = iterator.next() + if not batch_data: + # No more data available + break - logger.debug( - f"[{self.workspace}] Migrated batch {offset//page_size + 1}, " - f"processed {len(page_data)} records, total migrated: {total_migrated}" - ) - except Exception as batch_error: + # Insert batch data to new collection + try: + self._client.insert( + collection_name=temp_collection_name, data=batch_data + ) + total_migrated += len(batch_data) + + logger.info( + f"[{self.workspace}] Iterator batch {batch_number}: " + f"processed {len(batch_data)} records, total migrated: {total_migrated}" + ) + batch_number += 1 + + except Exception as batch_error: + logger.error( + f"[{self.workspace}] Failed to insert iterator batch {batch_number}: {batch_error}" + ) + raise + + except Exception as next_error: logger.error( - f"[{self.workspace}] Failed to migrate batch {offset//page_size + 1}: {batch_error}" + f"[{self.workspace}] Iterator next() failed at batch {batch_number}: {next_error}" ) raise - offset += page_size - - # If we got less than page_size records, we've reached the end - if len(page_data) < page_size: - break - if total_migrated > 0: logger.info( - f"[{self.workspace}] Successfully migrated {total_migrated} records" + f"[{self.workspace}] Successfully migrated {total_migrated} records using iterator" ) else: logger.info( f"[{self.workspace}] No data found in original collection, migration completed" ) - # Step 3: drop old collection - logger.info(f"[{self.workspace}] Step 3: Dropping old collection") - read_client.drop_collection(original_collection_name) - read_client.close() - read_client = None + # Step 3: Rename origin collection (keep for safety) + logger.info( + f"[{self.workspace}] Step 3: Rename origin collection to {original_collection_name}_old" + ) + try: + self._client.rename_collection( + original_collection_name, f"{original_collection_name}_old" + ) + except Exception as rename_error: + try: + logger.warning( + f"[{self.workspace}] Try to drop origin collection instead" + ) + self._client.drop_collection(original_collection_name) + except Exception as e: + logger.error( + f"[{self.workspace}] Rename operation failed: {rename_error}" + ) + raise e # Step 4: Rename temporary collection to original name logger.info( f"[{self.workspace}] Step 4: Renaming collection {temp_collection_name} -> {original_collection_name}" ) - # create new client instance for self._client - self._client = self._create_migration_client() - try: self._client.rename_collection( temp_collection_name, original_collection_name @@ -726,12 +709,12 @@ class MilvusVectorDBStorage(BaseVectorStorage): f"Failed to rename collection: {rename_error}" ) from rename_error - # restore final_namespace + # Restore final_namespace self.final_namespace = original_collection_name except Exception as e: logger.error( - f"[{self.workspace}] Data migration failed for {self.namespace}: {e}" + f"[{self.workspace}] Iterator-based migration failed for {self.namespace}: {e}" ) # Attempt cleanup of temporary collection if it exists @@ -748,18 +731,20 @@ class MilvusVectorDBStorage(BaseVectorStorage): # Re-raise the original error raise RuntimeError( - f"Optimized dual-client migration failed for collection {self.namespace}: {e}" + f"Iterator-based migration failed for collection {self.namespace}: {e}" ) from e finally: - # Ensure read_client is properly closed - if read_client: + # Ensure iterator is properly closed + if iterator: try: - read_client.close() - logger.debug(f"[{self.workspace}] Read client closed successfully") + iterator.close() + logger.debug( + f"[{self.workspace}] Query iterator closed successfully" + ) except Exception as close_error: logger.warning( - f"[{self.workspace}] Failed to close read client: {close_error}" + f"[{self.workspace}] Failed to close query iterator: {close_error}" ) def _validate_collection_compatibility(self):