Add Qdrant legacy collection migration with workspace support
- Add QdrantMigrationError exception
- Implement automatic data migration
- Support workspace-based partitioning
- Add migration verification logic
- Update collection naming scheme
This commit is contained in:
parent
0498e80a42
commit
5f4a280458
2 changed files with 200 additions and 5 deletions
|
|
@ -104,3 +104,11 @@ class PipelineCancelledException(Exception):
|
|||
def __init__(self, message: str = "User cancelled"):
|
||||
super().__init__(message)
|
||||
self.message = message
|
||||
|
||||
|
||||
class QdrantMigrationError(Exception):
    """Signal that migrating data out of a legacy Qdrant collection failed.

    Attributes:
        message: Human-readable failure description. The same text is handed
            to ``Exception`` so that ``str(err)`` and ``err.args`` carry it.
    """

    def __init__(self, message: str):
        # Keep the text on the instance as well as in ``args``.
        self.message = message
        super().__init__(message)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import numpy as np
|
|||
import pipmaster as pm
|
||||
|
||||
from ..base import BaseVectorStorage
|
||||
from ..exceptions import QdrantMigrationError
|
||||
from ..kg.shared_storage import get_data_init_lock, get_storage_lock
|
||||
from ..utils import compute_mdhash_id, logger
|
||||
|
||||
|
|
@ -81,19 +82,192 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
|||
self.__post_init__()
|
||||
|
||||
@staticmethod
def setup_collection(
    client: QdrantClient,
    collection_name: str,
    legacy_namespace: str = None,
    workspace: str = None,
    **kwargs,
):
    """Ensure ``collection_name`` exists, migrating a legacy collection if needed.

    Four situations are handled, in this order:

    1. New and legacy collections both exist: log a warning and return.
    2. Only the new collection exists: make sure the workspace payload index
       is present (best effort; failures are logged, not raised).
    3. Neither exists: create the new collection and its workspace index.
    4. Only the legacy collection exists: create the new collection, copy
       every legacy point into it (tagged with the workspace id), verify
       the record counts match, then create the workspace index.

    Args:
        client: QdrantClient instance.
        collection_name: Name of the new collection.
        legacy_namespace: Name of the legacy collection, if one may exist.
        workspace: Workspace identifier written into each migrated payload;
            ``DEFAULT_WORKSPACE`` is used when this is empty.
        **kwargs: Forwarded unchanged to ``client.create_collection``
            (vectors_config, hnsw_config, etc.).

    Raises:
        QdrantMigrationError: If the legacy migration (case 4) fails or the
            post-migration count check does not match. A collection created
            by this call is removed (best effort) before raising so that the
            migration is attempted again on the next start.
    """

    def _create_workspace_index() -> None:
        # Single definition of the tenant-style keyword index used by every
        # code path below.
        client.create_payload_index(
            collection_name=collection_name,
            field_name=WORKSPACE_ID_FIELD,
            field_schema=models.KeywordIndexParams(
                type=models.KeywordIndexType.KEYWORD,
                is_tenant=True,
            ),
        )

    new_collection_exists = client.collection_exists(collection_name)
    # A legacy name identical to the target is not a legacy collection;
    # treating it as one would advise the user to delete live data.
    legacy_exists = (
        bool(legacy_namespace)
        and legacy_namespace != collection_name
        and client.collection_exists(legacy_namespace)
    )

    # Case 1: Both new and legacy collections exist - Warning only (no migration)
    # NOTE(review): legacy data is never migrated once the new collection
    # exists; confirm this is intended when several workspaces share it.
    if new_collection_exists and legacy_exists:
        logger.warning(
            f"Qdrant: Legacy collection '{legacy_namespace}' still exist. Remove it if migration is complete."
        )
        return

    # Case 2: Only new collection exists - Ensure index exists
    if new_collection_exists:
        try:
            collection_info = client.get_collection(collection_name)
            if WORKSPACE_ID_FIELD not in collection_info.payload_schema:
                logger.info(
                    f"Qdrant: Creating missing workspace index for '{collection_name}'"
                )
                _create_workspace_index()
        except Exception as e:
            # Deliberately best effort: a missing index must not block startup.
            logger.warning(
                f"Qdrant: Could not verify/create workspace index for '{collection_name}': {e}"
            )
        return

    # Case 3: Neither exists - Create new collection
    if not legacy_exists:
        logger.info(f"Qdrant: Creating new collection '{collection_name}'")
        client.create_collection(collection_name, **kwargs)
        _create_workspace_index()
        logger.info(f"Qdrant: Collection '{collection_name}' created successfully")
        return

    # Case 4: Only legacy exists - Migrate data
    logger.info(
        f"Qdrant: Migrating data from legacy collection '{legacy_namespace}'"
    )

    workspace_id = workspace or DEFAULT_WORKSPACE
    batch_size = 500
    # Tracks whether this call created the target, so a failure only ever
    # removes a collection that did not exist beforehand.
    created_new_collection = False

    try:
        legacy_count = client.count(
            collection_name=legacy_namespace, exact=True
        ).count
        logger.info(f"Qdrant: Found {legacy_count} records in legacy collection")

        if legacy_count == 0:
            logger.info("Qdrant: Legacy collection is empty, skipping migration")
            # Create new empty collection
            client.create_collection(collection_name, **kwargs)
            created_new_collection = True
            _create_workspace_index()
            return

        # Create new collection first
        logger.info(f"Qdrant: Creating new collection '{collection_name}'")
        client.create_collection(collection_name, **kwargs)
        created_new_collection = True

        # Batch migration (500 records per batch)
        migrated_count = 0
        offset = None

        while True:
            points, next_offset = client.scroll(
                collection_name=legacy_namespace,
                limit=batch_size,
                offset=offset,
                with_vectors=True,
                with_payload=True,
            )

            if not points:
                break

            new_points = []
            for point in points:
                # Tag the payload with the workspace it now belongs to.
                new_payload = dict(point.payload or {})
                new_payload[WORKSPACE_ID_FIELD] = workspace_id

                original_id = new_payload.get(ID_FIELD)
                if original_id:
                    # Workspace-prefixed id derived from the stored record id.
                    new_point_id = compute_mdhash_id_for_qdrant(
                        original_id, prefix=workspace_id
                    )
                else:
                    # Fallback: reuse the legacy point id unchanged. Qdrant
                    # ids are unsigned integers or UUIDs, so stringifying an
                    # integer id would produce an invalid id.
                    new_point_id = point.id

                new_points.append(
                    models.PointStruct(
                        id=new_point_id,
                        vector=point.vector,
                        payload=new_payload,
                    )
                )

            client.upsert(
                collection_name=collection_name, points=new_points, wait=True
            )

            migrated_count += len(points)
            logger.info(f"Qdrant: {migrated_count}/{legacy_count} records migrated")

            # A missing next offset means the scroll reached the end.
            if next_offset is None:
                break
            offset = next_offset

        # Verify migration by comparing counts
        logger.info("Verifying migration...")
        new_count = client.count(collection_name=collection_name, exact=True).count

        if new_count != legacy_count:
            error_msg = f"Qdrant: Migration verification failed, expected {legacy_count} records, got {new_count} in new collection"
            logger.error(error_msg)
            raise QdrantMigrationError(error_msg)

        logger.info(
            f"Qdrant: Migration completed successfully: {migrated_count} records migrated"
        )

        # Create payload index after successful migration
        logger.info("Qdrant: Creating workspace payload index...")
        _create_workspace_index()
        logger.info(
            f"Qdrant: Migration from '{legacy_namespace}' to '{collection_name}' completed successfully"
        )

    except Exception as e:
        # Remove the partially populated target. If it were left in place,
        # the next start would find both collections, only warn (case 1),
        # and the remaining legacy data would never be migrated.
        if created_new_collection:
            try:
                client.delete_collection(collection_name)
            except Exception as cleanup_error:
                logger.warning(
                    f"Qdrant: Could not remove partially migrated collection '{collection_name}': {cleanup_error}"
                )

        if isinstance(e, QdrantMigrationError):
            # Re-raise migration errors without wrapping
            raise

        error_msg = f"Qdrant: Migration failed with error: {e}"
        logger.error(error_msg)
        raise QdrantMigrationError(error_msg) from e
|
||||
|
||||
def __post_init__(self):
|
||||
# Check for QDRANT_WORKSPACE environment variable first (higher priority)
|
||||
|
|
@ -113,11 +287,17 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
|||
f"Using passed workspace parameter: '{effective_workspace}'"
|
||||
)
|
||||
|
||||
# Get legacy namespace for data migration from old version
|
||||
if effective_workspace:
|
||||
self.legacy_namespace = f"{effective_workspace}_{self.namespace}"
|
||||
else:
|
||||
self.legacy_namespace = self.namespace
|
||||
|
||||
self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE
|
||||
|
||||
# Use a shared collection with payload-based partitioning (Qdrant's recommended approach)
|
||||
# Ref: https://qdrant.tech/documentation/guides/multiple-partitions/
|
||||
self.final_namespace = self.namespace
|
||||
self.final_namespace = f"lightrag_vdb_{self.namespace}"
|
||||
logger.debug(
|
||||
f"Using shared collection '{self.final_namespace}' with workspace '{self.effective_workspace}' for payload-based partitioning"
|
||||
)
|
||||
|
|
@ -158,13 +338,20 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
|||
)
|
||||
|
||||
# Setup collection (create if not exists and configure indexes)
|
||||
# Pass legacy_namespace and workspace for migration support
|
||||
QdrantVectorDBStorage.setup_collection(
|
||||
self._client,
|
||||
self.final_namespace,
|
||||
legacy_namespace=self.legacy_namespace,
|
||||
workspace=self.effective_workspace,
|
||||
vectors_config=models.VectorParams(
|
||||
size=self.embedding_func.embedding_dim,
|
||||
distance=models.Distance.COSINE,
|
||||
),
|
||||
hnsw_config=models.HnswConfigDiff(
|
||||
payload_m=16,
|
||||
m=0,
|
||||
),
|
||||
)
|
||||
|
||||
self._initialized = True
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue