Fix query window size limitation for Milvus data migration
parent 47b8caaf64
commit a9d6807432
1 changed file with 76 additions and 91 deletions
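Background on the limitation this commit fixes: the previous migration copied data by paging the source collection with query() and limit/offset, and Milvus bounds that pagination window (offset + limit must stay within the server's query window, commonly 16384 in default deployments), so collections with more rows than the window could never be fully migrated. The new code streams all rows through query_iterator instead, which is not subject to that window. Below is a minimal, self-contained sketch of the two copy strategies; it is not part of the commit, and the Milvus Lite URI, collection names, and the 16384 figure are illustrative assumptions.

# Hypothetical sketch: old offset-based copy vs. new iterator-based copy.
from pymilvus import MilvusClient

client = MilvusClient(uri="./milvus_lite.db")      # assumed local Milvus Lite database
SRC, DST = "my_collection", "my_collection_temp"   # illustrative collection names

def copy_with_offset_paging(page_size: int = 1000) -> int:
    """Old approach: fails once offset + limit exceeds the query window (assumed 16384)."""
    copied, offset = 0, 0
    while True:
        page = client.query(
            collection_name=SRC,
            filter="",               # empty filter = all rows
            output_fields=["*"],
            limit=page_size,
            offset=offset,           # grows past the window on large collections
        )
        if not page:
            break
        client.insert(collection_name=DST, data=page)
        copied += len(page)
        offset += page_size
    return copied

def copy_with_iterator(batch_size: int = 2000) -> int:
    """New approach: stream every row; no query window applies."""
    copied = 0
    iterator = client.query_iterator(
        collection_name=SRC,
        batch_size=batch_size,
        output_fields=["*"],
    )
    try:
        while True:
            batch = iterator.next()  # returns an empty list when exhausted
            if not batch:
                break
            client.insert(collection_name=DST, data=batch)
            copied += len(batch)
    finally:
        iterator.close()             # always release the server-side cursor
    return copied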
@@ -498,7 +498,7 @@ class MilvusVectorDBStorage(BaseVectorStorage):
             # Get max_length from field params
             max_length = file_path_field.get("params", {}).get("max_length")

-            if max_length and max_length != DEFAULT_MAX_FILE_PATH_LENGTH:
+            if max_length and max_length < DEFAULT_MAX_FILE_PATH_LENGTH:
                 logger.info(
                     f"[{self.workspace}] Collection {self.namespace} has file_path max_length={max_length}, "
                     f"needs migration to {DEFAULT_MAX_FILE_PATH_LENGTH}"
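The only functional change in the hunk above is the comparison operator. The old check requested a migration whenever the stored max_length differed from the default, including collections whose file_path field was already wider than DEFAULT_MAX_FILE_PATH_LENGTH; the new check migrates only when the existing field is too short. A small worked example, using an assumed value of 4096 for the constant (illustrative only, not taken from this diff):

# Assumed value for illustration; the real constant is defined in the module.
DEFAULT_MAX_FILE_PATH_LENGTH = 4096

for max_length in (1024, 4096, 8192):
    old_check = max_length != DEFAULT_MAX_FILE_PATH_LENGTH  # previous condition
    new_check = max_length < DEFAULT_MAX_FILE_PATH_LENGTH   # condition after this commit
    print(max_length, old_check, new_check)
# 1024 -> True,  True   (field too short: both versions migrate)
# 4096 -> False, False  (already at the default: neither migrates)
# 8192 -> True,  False  (wider than the default: only the old code forced a migration)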
@@ -576,59 +576,23 @@ class MilvusVectorDBStorage(BaseVectorStorage):
                 f"[{self.workspace}] Schema compatibility check passed for {self.namespace}"
             )

-    def _create_migration_client(self) -> MilvusClient:
-        """Create a new MilvusClient instance for migration operations"""
-        return MilvusClient(
-            uri=os.environ.get(
-                "MILVUS_URI",
-                config.get(
-                    "milvus",
-                    "uri",
-                    fallback=os.path.join(
-                        self.global_config["working_dir"], "milvus_lite.db"
-                    ),
-                ),
-            ),
-            user=os.environ.get(
-                "MILVUS_USER", config.get("milvus", "user", fallback=None)
-            ),
-            password=os.environ.get(
-                "MILVUS_PASSWORD",
-                config.get("milvus", "password", fallback=None),
-            ),
-            token=os.environ.get(
-                "MILVUS_TOKEN", config.get("milvus", "token", fallback=None)
-            ),
-            db_name=os.environ.get(
-                "MILVUS_DB_NAME",
-                config.get("milvus", "db_name", fallback=None),
-            ),
-        )
-
     def _migrate_collection_schema(self):
-        """Migrate collection schema using optimized dual client architecture"""
-        read_client = None
+        """Migrate collection schema using query_iterator - completely solves query window limitations"""
         original_collection_name = self.final_namespace
         temp_collection_name = f"{self.final_namespace}_temp"
+        iterator = None

         try:
             logger.info(
-                f"[{self.workspace}] Starting optimized dual-client schema migration for {self.namespace}"
+                f"[{self.workspace}] Starting iterator-based schema migration for {self.namespace}"
             )

-            if self._client is not None:
-                self._client.close()

-            # Step 1: use self._client to create a temporary collection
+            # Step 1: Create temporary collection with new schema
             logger.info(
                 f"[{self.workspace}] Step 1: Creating temporary collection: {temp_collection_name}"
             )
-            # Create indexes for the new collection using self._client
             # Temporarily update final_namespace for index creation
             self.final_namespace = temp_collection_name

-            # Create new client instance for self._client
-            self._client = self._create_migration_client()
             new_schema = self._create_schema_for_namespace()
             self._client.create_collection(
                 collection_name=temp_collection_name, schema=new_schema
@@ -644,75 +608,94 @@ class MilvusVectorDBStorage(BaseVectorStorage):
             # Load the new collection
             self._client.load_collection(temp_collection_name)

-            # Step 2: copy old data to new collection
+            # Step 2: Copy data using query_iterator (solves query window limitation)
             logger.info(
-                f"[{self.workspace}] Step 2: Copying old data to new collection: {original_collection_name}"
+                f"[{self.workspace}] Step 2: Copying data using query_iterator from: {original_collection_name}"
             )
-            read_client = self._create_migration_client()
-            read_client.load_collection(original_collection_name)

-            page_size = 1000
-            offset = 0
-            total_migrated = 0
-            while True:
-                # Read data from old collection using read_client
-                page_data = read_client.query(
+            # Create query iterator
+            try:
+                iterator = self._client.query_iterator(
                     collection_name=original_collection_name,
-                    filter="",  # Empty filter to get all data
+                    batch_size=2000,  # Adjustable batch size for optimal performance
                     output_fields=["*"],  # Get all fields
-                    limit=page_size,
-                    offset=offset,
                 )
+                logger.debug(f"[{self.workspace}] Query iterator created successfully")
+            except Exception as iterator_error:
+                logger.error(
+                    f"[{self.workspace}] Failed to create query iterator: {iterator_error}"
+                )
+                raise

-                if not page_data:
-                    # No more data to retrieve
-                    break
+            # Iterate through all data
+            total_migrated = 0
+            batch_number = 1

-                # Write data to new collection using self._client
+            while True:
                 try:
-                    self._client.insert(
-                        collection_name=temp_collection_name, data=page_data
-                    )
-                    total_migrated += len(page_data)
+                    batch_data = iterator.next()
+                    if not batch_data:
+                        # No more data available
+                        break

-                    logger.debug(
-                        f"[{self.workspace}] Migrated batch {offset//page_size + 1}, "
-                        f"processed {len(page_data)} records, total migrated: {total_migrated}"
-                    )
-                except Exception as batch_error:
+                    # Insert batch data to new collection
+                    try:
+                        self._client.insert(
+                            collection_name=temp_collection_name, data=batch_data
+                        )
+                        total_migrated += len(batch_data)
+
+                        logger.info(
+                            f"[{self.workspace}] Iterator batch {batch_number}: "
+                            f"processed {len(batch_data)} records, total migrated: {total_migrated}"
+                        )
+                        batch_number += 1
+
+                    except Exception as batch_error:
+                        logger.error(
+                            f"[{self.workspace}] Failed to insert iterator batch {batch_number}: {batch_error}"
+                        )
+                        raise
+
+                except Exception as next_error:
                     logger.error(
-                        f"[{self.workspace}] Failed to migrate batch {offset//page_size + 1}: {batch_error}"
+                        f"[{self.workspace}] Iterator next() failed at batch {batch_number}: {next_error}"
                     )
                     raise

-                offset += page_size
-
-                # If we got less than page_size records, we've reached the end
-                if len(page_data) < page_size:
-                    break
-
             if total_migrated > 0:
                 logger.info(
-                    f"[{self.workspace}] Successfully migrated {total_migrated} records"
+                    f"[{self.workspace}] Successfully migrated {total_migrated} records using iterator"
                 )
             else:
                 logger.info(
                     f"[{self.workspace}] No data found in original collection, migration completed"
                 )

-            # Step 3: drop old collection
-            logger.info(f"[{self.workspace}] Step 3: Dropping old collection")
-            read_client.drop_collection(original_collection_name)
-            read_client.close()
-            read_client = None
+            # Step 3: Rename origin collection (keep for safety)
+            logger.info(
+                f"[{self.workspace}] Step 3: Rename origin collection to {original_collection_name}_old"
+            )
+            try:
+                self._client.rename_collection(
+                    original_collection_name, f"{original_collection_name}_old"
+                )
+            except Exception as rename_error:
+                try:
+                    logger.warning(
+                        f"[{self.workspace}] Try to drop origin collection instead"
+                    )
+                    self._client.drop_collection(original_collection_name)
+                except Exception as e:
+                    logger.error(
+                        f"[{self.workspace}] Rename operation failed: {rename_error}"
+                    )
+                    raise e

             # Step 4: Rename temporary collection to original name
             logger.info(
                 f"[{self.workspace}] Step 4: Renaming collection {temp_collection_name} -> {original_collection_name}"
             )
-            # create new client instance for self._client
-            self._client = self._create_migration_client()

             try:
                 self._client.rename_collection(
                     temp_collection_name, original_collection_name
@@ -726,12 +709,12 @@ class MilvusVectorDBStorage(BaseVectorStorage):
                     f"Failed to rename collection: {rename_error}"
                 ) from rename_error

-            # restore final_namespace
+            # Restore final_namespace
             self.final_namespace = original_collection_name

         except Exception as e:
             logger.error(
-                f"[{self.workspace}] Data migration failed for {self.namespace}: {e}"
+                f"[{self.workspace}] Iterator-based migration failed for {self.namespace}: {e}"
             )

             # Attempt cleanup of temporary collection if it exists
@@ -748,18 +731,20 @@ class MilvusVectorDBStorage(BaseVectorStorage):

             # Re-raise the original error
             raise RuntimeError(
-                f"Optimized dual-client migration failed for collection {self.namespace}: {e}"
+                f"Iterator-based migration failed for collection {self.namespace}: {e}"
             ) from e

         finally:
-            # Ensure read_client is properly closed
-            if read_client:
+            # Ensure iterator is properly closed
+            if iterator:
                 try:
-                    read_client.close()
-                    logger.debug(f"[{self.workspace}] Read client closed successfully")
+                    iterator.close()
+                    logger.debug(
+                        f"[{self.workspace}] Query iterator closed successfully"
+                    )
                 except Exception as close_error:
                     logger.warning(
-                        f"[{self.workspace}] Failed to close read client: {close_error}"
+                        f"[{self.workspace}] Failed to close query iterator: {close_error}"
                     )

     def _validate_collection_compatibility(self):
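For reference, Steps 3 and 4 of the new method amount to a rename-based swap: keep the original data reachable under an "_old" suffix when the server supports rename_collection, fall back to dropping it otherwise, then promote the freshly populated temporary collection to the original name. A condensed sketch with illustrative names follows; the actual method additionally restores self.final_namespace and cleans up the temporary collection if anything fails.

# Hypothetical helper, not part of the commit: the swap performed in Steps 3-4.
from pymilvus import MilvusClient

def swap_collections(client: MilvusClient, original: str, temp: str) -> None:
    try:
        # Preferred path: keep the old data around under "<original>_old".
        client.rename_collection(original, f"{original}_old")
    except Exception:
        # Fallback: drop the old collection so the name becomes available.
        client.drop_collection(original)
    # Promote the temporary collection to the original name.
    client.rename_collection(temp, original)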