feat: increase file path length limit to 32768 and add schema migration for Milvus DB
- Bump path limit to 32768 chars
- Add migration detection logic
- Implement dual-client migration
- Auto-migrate old collections
parent 377f1a022e
commit dcec511f72

2 changed files with 223 additions and 4 deletions
@@ -36,7 +36,7 @@ DEFAULT_ENABLE_RERANK = True
 DEFAULT_MIN_RERANK_SCORE = 0.0
 
 # File path configuration for vector and graph database(Should not be changed, used in Milvus Schema)
-DEFAULT_MAX_FILE_PATH_LENGTH = 4090
+DEFAULT_MAX_FILE_PATH_LENGTH = 32768
 
 # Default temperature for LLM
 DEFAULT_TEMPERATURE = 1.0
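Milvus VARCHAR fields are capped at 65535 characters, so 32768 leaves headroom under the hard limit. As an aside, a minimal sketch (a hypothetical helper, not part of this commit) of guarding inserts against paths that still exceed the limit:

# Hypothetical guard, not part of this commit: clamp a path to the schema
# limit so an oversized value cannot be rejected by the VARCHAR field.
DEFAULT_MAX_FILE_PATH_LENGTH = 32768  # mirrors the constant changed above

def clamp_file_path(path: str, limit: int = DEFAULT_MAX_FILE_PATH_LENGTH) -> str:
    """Truncate paths longer than the file_path VARCHAR limit."""
    return path if len(path) <= limit else path[:limit]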
@@ -83,7 +83,7 @@ class MilvusVectorDBStorage(BaseVectorStorage):
             FieldSchema(
                 name="file_path",
                 dtype=DataType.VARCHAR,
-                max_length=1024,
+                max_length=DEFAULT_MAX_FILE_PATH_LENGTH,
                 nullable=True,
             ),
         ]
@@ -95,7 +95,7 @@ class MilvusVectorDBStorage(BaseVectorStorage):
             FieldSchema(
                 name="file_path",
                 dtype=DataType.VARCHAR,
-                max_length=1024,
+                max_length=DEFAULT_MAX_FILE_PATH_LENGTH,
                 nullable=True,
             ),
         ]
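Both hunks above change the same field definition in two collection schemas. For orientation, a minimal sketch of how such a schema is assembled with pymilvus; the id and vector field parameters here are illustrative assumptions, only the file_path field mirrors the diff:

from pymilvus import CollectionSchema, DataType, FieldSchema

DEFAULT_MAX_FILE_PATH_LENGTH = 32768

fields = [
    # id/vector parameters are assumptions, not taken from this diff
    FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=512, is_primary=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
    FieldSchema(
        name="file_path",
        dtype=DataType.VARCHAR,
        max_length=DEFAULT_MAX_FILE_PATH_LENGTH,  # was 1024 before this commit
        nullable=True,
    ),
]
schema = CollectionSchema(fields=fields)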
@@ -482,8 +482,33 @@ class MilvusVectorDBStorage(BaseVectorStorage):
             )
             return
 
+    def _check_file_path_length_restriction(self, collection_info: dict) -> bool:
+        """Check if collection has file_path length restrictions that need migration
+
+        Returns:
+            bool: True if migration is needed, False otherwise
+        """
+        existing_fields = {
+            field["name"]: field for field in collection_info.get("fields", [])
+        }
+
+        # Check if file_path field exists and has length restrictions
+        if "file_path" in existing_fields:
+            file_path_field = existing_fields["file_path"]
+            # Get max_length from field params
+            max_length = file_path_field.get("params", {}).get("max_length")
+
+            if max_length and max_length != DEFAULT_MAX_FILE_PATH_LENGTH:
+                logger.info(
+                    f"[{self.workspace}] Collection {self.namespace} has file_path max_length={max_length}, "
+                    f"needs migration to {DEFAULT_MAX_FILE_PATH_LENGTH}"
+                )
+                return True
+
+        return False
+
     def _check_schema_compatibility(self, collection_info: dict):
-        """Check schema field compatibility"""
+        """Check schema field compatibility and detect migration needs"""
         existing_fields = {
             field["name"]: field for field in collection_info.get("fields", [])
         }
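The new check reads field metadata from the dict that MilvusClient.describe_collection() returns; a sketch of the shape it relies on, with illustrative values:

from pymilvus import DataType

# Illustrative describe_collection() output; only the keys the check reads
# ("fields", each field's "name" and "params"/"max_length") are shown.
collection_info = {
    "collection_name": "chunks",  # hypothetical name
    "fields": [
        {"name": "id", "type": DataType.VARCHAR, "params": {"max_length": 512}},
        {"name": "file_path", "type": DataType.VARCHAR, "params": {"max_length": 1024}},
    ],
}
# max_length 1024 != DEFAULT_MAX_FILE_PATH_LENGTH (32768), so the method
# logs and returns True, and _check_schema_compatibility starts the migration.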
@@ -505,6 +530,14 @@ class MilvusVectorDBStorage(BaseVectorStorage):
             )
             return
 
+        # Check if migration is needed for file_path length restrictions
+        if self._check_file_path_length_restriction(collection_info):
+            logger.info(
+                f"[{self.workspace}] Starting automatic migration for collection {self.namespace}"
+            )
+            self._migrate_collection_schema()
+            return
+
         # For collections with vector field, check basic compatibility
         # Only check for critical incompatibilities, not missing optional fields
         critical_fields = {"id": {"type": "VarChar", "is_primary": True}}
@@ -543,6 +576,192 @@ class MilvusVectorDBStorage(BaseVectorStorage):
                 f"[{self.workspace}] Schema compatibility check passed for {self.namespace}"
             )
 
+    def _create_migration_client(self) -> MilvusClient:
+        """Create a new MilvusClient instance for migration operations"""
+        return MilvusClient(
+            uri=os.environ.get(
+                "MILVUS_URI",
+                config.get(
+                    "milvus",
+                    "uri",
+                    fallback=os.path.join(
+                        self.global_config["working_dir"], "milvus_lite.db"
+                    ),
+                ),
+            ),
+            user=os.environ.get(
+                "MILVUS_USER", config.get("milvus", "user", fallback=None)
+            ),
+            password=os.environ.get(
+                "MILVUS_PASSWORD",
+                config.get("milvus", "password", fallback=None),
+            ),
+            token=os.environ.get(
+                "MILVUS_TOKEN", config.get("milvus", "token", fallback=None)
+            ),
+            db_name=os.environ.get(
+                "MILVUS_DB_NAME",
+                config.get("milvus", "db_name", fallback=None),
+            ),
+        )
+
+    def _migrate_collection_schema(self):
+        """Migrate collection schema using optimized dual client architecture"""
+        read_client = None
+        original_collection_name = self.final_namespace
+        temp_collection_name = f"{self.final_namespace}_temp"
+
+        try:
+            logger.info(
+                f"[{self.workspace}] Starting optimized dual-client schema migration for {self.namespace}"
+            )
+
+            if self._client is not None:
+                self._client.close()
+
+            # Step 1: use self._client to create a temporary collection
+            logger.info(
+                f"[{self.workspace}] Step 1: Creating temporary collection: {temp_collection_name}"
+            )
+            # Create indexes for the new collection using self._client
+            # Temporarily update final_namespace for index creation
+            self.final_namespace = temp_collection_name
+
+            # Create new client instance for self._client
+            self._client = self._create_migration_client()
+            new_schema = self._create_schema_for_namespace()
+            self._client.create_collection(
+                collection_name=temp_collection_name, schema=new_schema
+            )
+            try:
+                self._create_indexes_after_collection()
+            except Exception as index_error:
+                logger.warning(
+                    f"[{self.workspace}] Failed to create indexes for new collection: {index_error}"
+                )
+                # Continue with migration even if index creation fails
+
+            # Load the new collection
+            self._client.load_collection(temp_collection_name)
+
+            # Step 2: copy old data to new collection
+            logger.info(
+                f"[{self.workspace}] Step 2: Copying old data to new collection: {original_collection_name}"
+            )
+            read_client = self._create_migration_client()
+            read_client.load_collection(original_collection_name)
+
+            page_size = 1000
+            offset = 0
+            total_migrated = 0
+            while True:
+                # Read data from old collection using read_client
+                page_data = read_client.query(
+                    collection_name=original_collection_name,
+                    filter="",  # Empty filter to get all data
+                    output_fields=["*"],  # Get all fields
+                    limit=page_size,
+                    offset=offset,
+                )
+
+                if not page_data:
+                    # No more data to retrieve
+                    break
+
+                # Write data to new collection using self._client
+                try:
+                    self._client.insert(
+                        collection_name=temp_collection_name, data=page_data
+                    )
+                    total_migrated += len(page_data)
+
+                    logger.debug(
+                        f"[{self.workspace}] Migrated batch {offset//page_size + 1}, "
+                        f"processed {len(page_data)} records, total migrated: {total_migrated}"
+                    )
+                except Exception as batch_error:
+                    logger.error(
+                        f"[{self.workspace}] Failed to migrate batch {offset//page_size + 1}: {batch_error}"
+                    )
+                    raise
+
+                offset += page_size
+
+                # If we got less than page_size records, we've reached the end
+                if len(page_data) < page_size:
+                    break
+
+            if total_migrated > 0:
+                logger.info(
+                    f"[{self.workspace}] Successfully migrated {total_migrated} records"
+                )
+            else:
+                logger.info(
+                    f"[{self.workspace}] No data found in original collection, migration completed"
+                )
+
+            # Step 3: drop old collection
+            logger.info(f"[{self.workspace}] Step 3: Dropping old collection")
+            read_client.drop_collection(original_collection_name)
+            read_client.close()
+            read_client = None
+
+            # Step 4: Rename temporary collection to original name
+            logger.info(
+                f"[{self.workspace}] Step 4: Renaming collection {temp_collection_name} -> {original_collection_name}"
+            )
+            # create new client instance for self._client
+            self._client = self._create_migration_client()
+
+            try:
+                self._client.rename_collection(
+                    temp_collection_name, original_collection_name
+                )
+                logger.info(f"[{self.workspace}] Rename operation completed")
+            except Exception as rename_error:
+                logger.error(
+                    f"[{self.workspace}] Rename operation failed: {rename_error}"
+                )
+                raise RuntimeError(
+                    f"Failed to rename collection: {rename_error}"
+                ) from rename_error
+
+            # restore final_namespace
+            self.final_namespace = original_collection_name
+
+        except Exception as e:
+            logger.error(
+                f"[{self.workspace}] Data migration failed for {self.namespace}: {e}"
+            )
+
+            # Attempt cleanup of temporary collection if it exists
+            try:
+                if self._client and self._client.has_collection(temp_collection_name):
+                    logger.info(
+                        f"[{self.workspace}] Cleaning up failed migration temporary collection"
+                    )
+                    self._client.drop_collection(temp_collection_name)
+            except Exception as cleanup_error:
+                logger.warning(
+                    f"[{self.workspace}] Failed to cleanup temporary collection: {cleanup_error}"
+                )
+
+            # Re-raise the original error
+            raise RuntimeError(
+                f"Optimized dual-client migration failed for collection {self.namespace}: {e}"
+            ) from e
+
+        finally:
+            # Ensure read_client is properly closed
+            if read_client:
+                try:
+                    read_client.close()
+                    logger.debug(f"[{self.workspace}] Read client closed successfully")
+                except Exception as close_error:
+                    logger.warning(
+                        f"[{self.workspace}] Failed to close read client: {close_error}"
+                    )
+
     def _validate_collection_compatibility(self):
         """Validate existing collection's dimension and schema compatibility"""
         try:
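The core of the migration is the paginated dual-client copy in Step 2. Below is a self-contained sketch of that pattern against a local Milvus Lite file; the collection names, page size, and the pre-existing target collection with the new schema are all assumptions:

from pymilvus import MilvusClient

SRC = "demo_collection"  # hypothetical source collection
DST = f"{SRC}_temp"      # temp target, assumed already created with the new schema
PAGE_SIZE = 1000

read_client = MilvusClient(uri="./milvus_lite.db")   # reads the old collection
write_client = MilvusClient(uri="./milvus_lite.db")  # writes the new one
read_client.load_collection(SRC)

offset, total = 0, 0
while True:
    page = read_client.query(
        collection_name=SRC,
        filter="",              # empty filter: scan everything
        output_fields=["*"],
        limit=PAGE_SIZE,
        offset=offset,
    )
    if not page:
        break
    write_client.insert(collection_name=DST, data=page)
    total += len(page)
    offset += PAGE_SIZE
    if len(page) < PAGE_SIZE:
        break

# Steps 3 and 4, as in the commit: swap the collections.
read_client.drop_collection(SRC)
write_client.rename_collection(DST, SRC)
print(f"migrated {total} rows")

One caveat on this design: Milvus bounds offset + limit for paginated query (16384 by default), so for very large collections an iterator-based scan such as query_iterator may be the safer primitive than offset pagination.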