Add Qdrant legacy collection migration with workspace support
- Add QdrantMigrationError exception
- Implement automatic data migration
- Support workspace-based partitioning
- Add migration verification logic
- Update collection naming scheme
(cherry picked from commit 5f4a280458)
This commit is contained in:
parent
46c13e23f0
commit
537db072e0
2 changed files with 200 additions and 5 deletions
|
|
@ -104,3 +104,11 @@ class PipelineCancelledException(Exception):
|
||||||
def __init__(self, message: str = "User cancelled"):
|
def __init__(self, message: str = "User cancelled"):
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
self.message = message
|
self.message = message
|
||||||
|
|
||||||
|
|
||||||
|
class QdrantMigrationError(Exception):
|
||||||
|
"""Raised when Qdrant data migration from legacy collections fails."""
|
||||||
|
|
||||||
|
def __init__(self, message: str):
|
||||||
|
super().__init__(message)
|
||||||
|
self.message = message
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import numpy as np
|
||||||
import pipmaster as pm
|
import pipmaster as pm
|
||||||
|
|
||||||
from ..base import BaseVectorStorage
|
from ..base import BaseVectorStorage
|
||||||
|
from ..exceptions import QdrantMigrationError
|
||||||
from ..kg.shared_storage import get_data_init_lock, get_storage_lock
|
from ..kg.shared_storage import get_data_init_lock, get_storage_lock
|
||||||
from ..utils import compute_mdhash_id, logger
|
from ..utils import compute_mdhash_id, logger
|
||||||
|
|
||||||
|
|
@ -81,19 +82,192 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||||
self.__post_init__()
|
self.__post_init__()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def setup_collection(client: QdrantClient, collection_name: str, **kwargs):
|
def setup_collection(
|
||||||
exists = client.collection_exists(collection_name)
|
client: QdrantClient,
|
||||||
|
collection_name: str,
|
||||||
|
legacy_namespace: str = None,
|
||||||
|
workspace: str = None,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Setup Qdrant collection with migration support from legacy collections.
|
||||||
|
|
||||||
if not exists:
|
Args:
|
||||||
|
client: QdrantClient instance
|
||||||
|
collection_name: Name of the new collection
|
||||||
|
legacy_namespace: Name of the legacy collection (if exists)
|
||||||
|
workspace: Workspace identifier for data isolation
|
||||||
|
**kwargs: Additional arguments for collection creation (vectors_config, hnsw_config, etc.)
|
||||||
|
"""
|
||||||
|
new_collection_exists = client.collection_exists(collection_name)
|
||||||
|
legacy_exists = legacy_namespace and client.collection_exists(legacy_namespace)
|
||||||
|
|
||||||
|
# Case 1: Both new and legacy collections exist - Warning only (no migration)
|
||||||
|
if new_collection_exists and legacy_exists:
|
||||||
|
logger.warning(
|
||||||
|
f"Qdrant: Legacy collection '{legacy_namespace}' still exist. Remove it if migration is complete."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Case 2: Only new collection exists - Ensure index exists
|
||||||
|
if new_collection_exists:
|
||||||
|
# Check if workspace index exists, create if missing
|
||||||
|
try:
|
||||||
|
collection_info = client.get_collection(collection_name)
|
||||||
|
if WORKSPACE_ID_FIELD not in collection_info.payload_schema:
|
||||||
|
logger.info(
|
||||||
|
f"Qdrant: Creating missing workspace index for '{collection_name}'"
|
||||||
|
)
|
||||||
|
client.create_payload_index(
|
||||||
|
collection_name=collection_name,
|
||||||
|
field_name=WORKSPACE_ID_FIELD,
|
||||||
|
field_schema=models.KeywordIndexParams(
|
||||||
|
type=models.KeywordIndexType.KEYWORD,
|
||||||
|
is_tenant=True,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
f"Qdrant: Could not verify/create workspace index for '{collection_name}': {e}"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Case 3: Neither exists - Create new collection
|
||||||
|
if not legacy_exists:
|
||||||
|
logger.info(f"Qdrant: Creating new collection '{collection_name}'")
|
||||||
client.create_collection(collection_name, **kwargs)
|
client.create_collection(collection_name, **kwargs)
|
||||||
client.create_payload_index(
|
client.create_payload_index(
|
||||||
collection_name=collection_name,
|
collection_name=collection_name,
|
||||||
field_name=WORKSPACE_ID_FIELD,
|
field_name=WORKSPACE_ID_FIELD,
|
||||||
field_schema=models.KeywordIndexParams(
|
field_schema=models.KeywordIndexParams(
|
||||||
type=models.KeywordIndexType.KEYWORD,
|
type=models.KeywordIndexType.KEYWORD,
|
||||||
is_tenant=True, # Optimize storage structure for tenant co-location
|
is_tenant=True,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
logger.info(f"Qdrant: Collection '{collection_name}' created successfully")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Case 4: Only legacy exists - Migrate data
|
||||||
|
logger.info(
|
||||||
|
f"Qdrant: Migrating data from legacy collection '{legacy_namespace}'"
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Get legacy collection count
|
||||||
|
legacy_count = client.count(
|
||||||
|
collection_name=legacy_namespace, exact=True
|
||||||
|
).count
|
||||||
|
logger.info(f"Qdrant: Found {legacy_count} records in legacy collection")
|
||||||
|
|
||||||
|
if legacy_count == 0:
|
||||||
|
logger.info("Qdrant: Legacy collection is empty, skipping migration")
|
||||||
|
# Create new empty collection
|
||||||
|
client.create_collection(collection_name, **kwargs)
|
||||||
|
client.create_payload_index(
|
||||||
|
collection_name=collection_name,
|
||||||
|
field_name=WORKSPACE_ID_FIELD,
|
||||||
|
field_schema=models.KeywordIndexParams(
|
||||||
|
type=models.KeywordIndexType.KEYWORD,
|
||||||
|
is_tenant=True,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Create new collection first
|
||||||
|
logger.info(f"Qdrant: Creating new collection '{collection_name}'")
|
||||||
|
client.create_collection(collection_name, **kwargs)
|
||||||
|
|
||||||
|
# Batch migration (500 records per batch)
|
||||||
|
migrated_count = 0
|
||||||
|
offset = None
|
||||||
|
batch_size = 500
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# Scroll through legacy data
|
||||||
|
result = client.scroll(
|
||||||
|
collection_name=legacy_namespace,
|
||||||
|
limit=batch_size,
|
||||||
|
offset=offset,
|
||||||
|
with_vectors=True,
|
||||||
|
with_payload=True,
|
||||||
|
)
|
||||||
|
points, next_offset = result
|
||||||
|
|
||||||
|
if not points:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Transform points for new collection
|
||||||
|
new_points = []
|
||||||
|
for point in points:
|
||||||
|
# Add workspace_id to payload
|
||||||
|
new_payload = dict(point.payload or {})
|
||||||
|
new_payload[WORKSPACE_ID_FIELD] = workspace or DEFAULT_WORKSPACE
|
||||||
|
|
||||||
|
# Create new point with workspace-prefixed ID
|
||||||
|
original_id = new_payload.get(ID_FIELD)
|
||||||
|
if original_id:
|
||||||
|
new_point_id = compute_mdhash_id_for_qdrant(
|
||||||
|
original_id, prefix=workspace or DEFAULT_WORKSPACE
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Fallback: use original point ID
|
||||||
|
new_point_id = str(point.id)
|
||||||
|
|
||||||
|
new_points.append(
|
||||||
|
models.PointStruct(
|
||||||
|
id=new_point_id,
|
||||||
|
vector=point.vector,
|
||||||
|
payload=new_payload,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Upsert to new collection
|
||||||
|
client.upsert(
|
||||||
|
collection_name=collection_name, points=new_points, wait=True
|
||||||
|
)
|
||||||
|
|
||||||
|
migrated_count += len(points)
|
||||||
|
logger.info(f"Qdrant: {migrated_count}/{legacy_count} records migrated")
|
||||||
|
|
||||||
|
# Check if we've reached the end
|
||||||
|
if next_offset is None:
|
||||||
|
break
|
||||||
|
offset = next_offset
|
||||||
|
|
||||||
|
# Verify migration by comparing counts
|
||||||
|
logger.info("Verifying migration...")
|
||||||
|
new_count = client.count(collection_name=collection_name, exact=True).count
|
||||||
|
|
||||||
|
if new_count != legacy_count:
|
||||||
|
error_msg = f"Qdrant: Migration verification failed, expected {legacy_count} records, got {new_count} in new collection"
|
||||||
|
logger.error(error_msg)
|
||||||
|
raise QdrantMigrationError(error_msg)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"Qdrant: Migration completed successfully: {migrated_count} records migrated"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create payload index after successful migration
|
||||||
|
logger.info("Qdrant: Creating workspace payload index...")
|
||||||
|
client.create_payload_index(
|
||||||
|
collection_name=collection_name,
|
||||||
|
field_name=WORKSPACE_ID_FIELD,
|
||||||
|
field_schema=models.KeywordIndexParams(
|
||||||
|
type=models.KeywordIndexType.KEYWORD,
|
||||||
|
is_tenant=True,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
f"Qdrant: Migration from '{legacy_namespace}' to '{collection_name}' completed successfully"
|
||||||
|
)
|
||||||
|
|
||||||
|
except QdrantMigrationError:
|
||||||
|
# Re-raise migration errors without wrapping
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
error_msg = f"Qdrant: Migration failed with error: {e}"
|
||||||
|
logger.error(error_msg)
|
||||||
|
raise QdrantMigrationError(error_msg) from e
|
||||||
|
|
||||||
def __post_init__(self):
|
def __post_init__(self):
|
||||||
# Check for QDRANT_WORKSPACE environment variable first (higher priority)
|
# Check for QDRANT_WORKSPACE environment variable first (higher priority)
|
||||||
|
|
@ -113,11 +287,17 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||||
f"Using passed workspace parameter: '{effective_workspace}'"
|
f"Using passed workspace parameter: '{effective_workspace}'"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Get legacy namespace for data migration from old version
|
||||||
|
if effective_workspace:
|
||||||
|
self.legacy_namespace = f"{effective_workspace}_{self.namespace}"
|
||||||
|
else:
|
||||||
|
self.legacy_namespace = self.namespace
|
||||||
|
|
||||||
self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE
|
self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE
|
||||||
|
|
||||||
# Use a shared collection with payload-based partitioning (Qdrant's recommended approach)
|
# Use a shared collection with payload-based partitioning (Qdrant's recommended approach)
|
||||||
# Ref: https://qdrant.tech/documentation/guides/multiple-partitions/
|
# Ref: https://qdrant.tech/documentation/guides/multiple-partitions/
|
||||||
self.final_namespace = self.namespace
|
self.final_namespace = f"lightrag_vdb_{self.namespace}"
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Using shared collection '{self.final_namespace}' with workspace '{self.effective_workspace}' for payload-based partitioning"
|
f"Using shared collection '{self.final_namespace}' with workspace '{self.effective_workspace}' for payload-based partitioning"
|
||||||
)
|
)
|
||||||
|
|
@ -158,13 +338,20 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Setup collection (create if not exists and configure indexes)
|
# Setup collection (create if not exists and configure indexes)
|
||||||
|
# Pass legacy_namespace and workspace for migration support
|
||||||
QdrantVectorDBStorage.setup_collection(
|
QdrantVectorDBStorage.setup_collection(
|
||||||
self._client,
|
self._client,
|
||||||
self.final_namespace,
|
self.final_namespace,
|
||||||
|
legacy_namespace=self.legacy_namespace,
|
||||||
|
workspace=self.effective_workspace,
|
||||||
vectors_config=models.VectorParams(
|
vectors_config=models.VectorParams(
|
||||||
size=self.embedding_func.embedding_dim,
|
size=self.embedding_func.embedding_dim,
|
||||||
distance=models.Distance.COSINE,
|
distance=models.Distance.COSINE,
|
||||||
),
|
),
|
||||||
|
hnsw_config=models.HnswConfigDiff(
|
||||||
|
payload_m=16,
|
||||||
|
m=0,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
self._initialized = True
|
self._initialized = True
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue