From 5f4a2804586d03ddf48d47cb372ffe166c0bf237 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Thu, 30 Oct 2025 19:16:33 +0800
Subject: [PATCH] Add Qdrant legacy collection migration with workspace support

- Add QdrantMigrationError exception
- Implement automatic data migration
- Support workspace-based partitioning
- Add migration verification logic
- Update collection naming scheme
---
Review notes (commentary below the three-dash line; not part of the commit):

* Layout: this copy of the patch had its line breaks collapsed. One record
  per line has been restored; added/removed lines are unchanged token for
  token. Indentation of unchanged context lines is reconstructed and should
  be confirmed against the base file before applying.
* What the patch does: setup_collection() gains legacy_namespace/workspace
  parameters and handles four cases: both collections exist (warn only),
  only the new one exists (ensure the workspace payload index), neither
  exists (create collection + index), only the legacy one exists (copy
  points in batches of 500, compare counts, then create the index). Any
  failure in the last case is raised as QdrantMigrationError.
* NOTE(review): in the copy loop, a legacy point whose payload has no
  ID_FIELD value is re-inserted with id=str(point.id). Qdrant documents point
  IDs as unsigned integers or UUIDs, so a stringified integer ID may be
  rejected - confirm whether legacy collections can contain integer IDs.
* NOTE(review): the new collection is created before the copy loop and is
  not removed when the copy or the count check raises. On the next start both
  collections exist, so the "both exist" branch runs: it logs a warning and
  returns without migrating the remaining data.
* NOTE(review): the "both exist" branch returns without checking that the
  WORKSPACE_ID_FIELD payload index exists; only the "only new exists" branch
  performs that check. After a failed migration the index is therefore never
  created by this function.

 lightrag/exceptions.py     |   8 ++
 lightrag/kg/qdrant_impl.py | 197 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 200 insertions(+), 5 deletions(-)

diff --git a/lightrag/exceptions.py b/lightrag/exceptions.py
index 09e1d0e7..303391c2 100644
--- a/lightrag/exceptions.py
+++ b/lightrag/exceptions.py
@@ -104,3 +104,11 @@ class PipelineCancelledException(Exception):
     def __init__(self, message: str = "User cancelled"):
         super().__init__(message)
         self.message = message
+
+
+class QdrantMigrationError(Exception):
+    """Raised when Qdrant data migration from legacy collections fails."""
+
+    def __init__(self, message: str):
+        super().__init__(message)
+        self.message = message
diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py
index 60e8e66d..d51d8898 100644
--- a/lightrag/kg/qdrant_impl.py
+++ b/lightrag/kg/qdrant_impl.py
@@ -10,6 +10,7 @@ import numpy as np
 import pipmaster as pm
 
 from ..base import BaseVectorStorage
+from ..exceptions import QdrantMigrationError
 from ..kg.shared_storage import get_data_init_lock, get_storage_lock
 from ..utils import compute_mdhash_id, logger
 
@@ -81,19 +82,192 @@ class QdrantVectorDBStorage(BaseVectorStorage):
         self.__post_init__()
 
     @staticmethod
-    def setup_collection(client: QdrantClient, collection_name: str, **kwargs):
-        exists = client.collection_exists(collection_name)
+    def setup_collection(
+        client: QdrantClient,
+        collection_name: str,
+        legacy_namespace: str = None,
+        workspace: str = None,
+        **kwargs,
+    ):
+        """
+        Setup Qdrant collection with migration support from legacy collections.
 
-        if not exists:
+        Args:
+            client: QdrantClient instance
+            collection_name: Name of the new collection
+            legacy_namespace: Name of the legacy collection (if exists)
+            workspace: Workspace identifier for data isolation
+            **kwargs: Additional arguments for collection creation (vectors_config, hnsw_config, etc.)
+        """
+        new_collection_exists = client.collection_exists(collection_name)
+        legacy_exists = legacy_namespace and client.collection_exists(legacy_namespace)
+
+        # Case 1: Both new and legacy collections exist - Warning only (no migration)
+        if new_collection_exists and legacy_exists:
+            logger.warning(
+                f"Qdrant: Legacy collection '{legacy_namespace}' still exist. Remove it if migration is complete."
+            )
+            return
+
+        # Case 2: Only new collection exists - Ensure index exists
+        if new_collection_exists:
+            # Check if workspace index exists, create if missing
+            try:
+                collection_info = client.get_collection(collection_name)
+                if WORKSPACE_ID_FIELD not in collection_info.payload_schema:
+                    logger.info(
+                        f"Qdrant: Creating missing workspace index for '{collection_name}'"
+                    )
+                    client.create_payload_index(
+                        collection_name=collection_name,
+                        field_name=WORKSPACE_ID_FIELD,
+                        field_schema=models.KeywordIndexParams(
+                            type=models.KeywordIndexType.KEYWORD,
+                            is_tenant=True,
+                        ),
+                    )
+            except Exception as e:
+                logger.warning(
+                    f"Qdrant: Could not verify/create workspace index for '{collection_name}': {e}"
+                )
+            return
+
+        # Case 3: Neither exists - Create new collection
+        if not legacy_exists:
+            logger.info(f"Qdrant: Creating new collection '{collection_name}'")
             client.create_collection(collection_name, **kwargs)
             client.create_payload_index(
                 collection_name=collection_name,
                 field_name=WORKSPACE_ID_FIELD,
                 field_schema=models.KeywordIndexParams(
                     type=models.KeywordIndexType.KEYWORD,
-                    is_tenant=True,  # Optimize storage structure for tenant co-location
+                    is_tenant=True,
                 ),
             )
+            logger.info(f"Qdrant: Collection '{collection_name}' created successfully")
+            return
+
+        # Case 4: Only legacy exists - Migrate data
+        logger.info(
+            f"Qdrant: Migrating data from legacy collection '{legacy_namespace}'"
+        )
+
+        try:
+            # Get legacy collection count
+            legacy_count = client.count(
+                collection_name=legacy_namespace, exact=True
+            ).count
+            logger.info(f"Qdrant: Found {legacy_count} records in legacy collection")
+
+            if legacy_count == 0:
+                logger.info("Qdrant: Legacy collection is empty, skipping migration")
+                # Create new empty collection
+                client.create_collection(collection_name, **kwargs)
+                client.create_payload_index(
+                    collection_name=collection_name,
+                    field_name=WORKSPACE_ID_FIELD,
+                    field_schema=models.KeywordIndexParams(
+                        type=models.KeywordIndexType.KEYWORD,
+                        is_tenant=True,
+                    ),
+                )
+                return
+
+            # Create new collection first
+            logger.info(f"Qdrant: Creating new collection '{collection_name}'")
+            client.create_collection(collection_name, **kwargs)
+
+            # Batch migration (500 records per batch)
+            migrated_count = 0
+            offset = None
+            batch_size = 500
+
+            while True:
+                # Scroll through legacy data
+                result = client.scroll(
+                    collection_name=legacy_namespace,
+                    limit=batch_size,
+                    offset=offset,
+                    with_vectors=True,
+                    with_payload=True,
+                )
+                points, next_offset = result
+
+                if not points:
+                    break
+
+                # Transform points for new collection
+                new_points = []
+                for point in points:
+                    # Add workspace_id to payload
+                    new_payload = dict(point.payload or {})
+                    new_payload[WORKSPACE_ID_FIELD] = workspace or DEFAULT_WORKSPACE
+
+                    # Create new point with workspace-prefixed ID
+                    original_id = new_payload.get(ID_FIELD)
+                    if original_id:
+                        new_point_id = compute_mdhash_id_for_qdrant(
+                            original_id, prefix=workspace or DEFAULT_WORKSPACE
+                        )
+                    else:
+                        # Fallback: use original point ID
+                        new_point_id = str(point.id)
+
+                    new_points.append(
+                        models.PointStruct(
+                            id=new_point_id,
+                            vector=point.vector,
+                            payload=new_payload,
+                        )
+                    )
+
+                # Upsert to new collection
+                client.upsert(
+                    collection_name=collection_name, points=new_points, wait=True
+                )
+
+                migrated_count += len(points)
+                logger.info(f"Qdrant: {migrated_count}/{legacy_count} records migrated")
+
+                # Check if we've reached the end
+                if next_offset is None:
+                    break
+                offset = next_offset
+
+            # Verify migration by comparing counts
+            logger.info("Verifying migration...")
+            new_count = client.count(collection_name=collection_name, exact=True).count
+
+            if new_count != legacy_count:
+                error_msg = f"Qdrant: Migration verification failed, expected {legacy_count} records, got {new_count} in new collection"
+                logger.error(error_msg)
+                raise QdrantMigrationError(error_msg)
+
+            logger.info(
+                f"Qdrant: Migration completed successfully: {migrated_count} records migrated"
+            )
+
+            # Create payload index after successful migration
+            logger.info("Qdrant: Creating workspace payload index...")
+            client.create_payload_index(
+                collection_name=collection_name,
+                field_name=WORKSPACE_ID_FIELD,
+                field_schema=models.KeywordIndexParams(
+                    type=models.KeywordIndexType.KEYWORD,
+                    is_tenant=True,
+                ),
+            )
+            logger.info(
+                f"Qdrant: Migration from '{legacy_namespace}' to '{collection_name}' completed successfully"
+            )
+
+        except QdrantMigrationError:
+            # Re-raise migration errors without wrapping
+            raise
+        except Exception as e:
+            error_msg = f"Qdrant: Migration failed with error: {e}"
+            logger.error(error_msg)
+            raise QdrantMigrationError(error_msg) from e
 
     def __post_init__(self):
         # Check for QDRANT_WORKSPACE environment variable first (higher priority)
@@ -113,11 +287,17 @@ class QdrantVectorDBStorage(BaseVectorStorage):
                     f"Using passed workspace parameter: '{effective_workspace}'"
                 )
 
+        # Get legacy namespace for data migration from old version
+        if effective_workspace:
+            self.legacy_namespace = f"{effective_workspace}_{self.namespace}"
+        else:
+            self.legacy_namespace = self.namespace
+
         self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE
 
         # Use a shared collection with payload-based partitioning (Qdrant's recommended approach)
         # Ref: https://qdrant.tech/documentation/guides/multiple-partitions/
-        self.final_namespace = self.namespace
+        self.final_namespace = f"lightrag_vdb_{self.namespace}"
         logger.debug(
             f"Using shared collection '{self.final_namespace}' with workspace '{self.effective_workspace}' for payload-based partitioning"
         )
@@ -158,13 +338,20 @@ class QdrantVectorDBStorage(BaseVectorStorage):
                     )
 
                 # Setup collection (create if not exists and configure indexes)
+                # Pass legacy_namespace and workspace for migration support
                 QdrantVectorDBStorage.setup_collection(
                     self._client,
                     self.final_namespace,
+                    legacy_namespace=self.legacy_namespace,
+                    workspace=self.effective_workspace,
                     vectors_config=models.VectorParams(
                         size=self.embedding_func.embedding_dim,
                         distance=models.Distance.COSINE,
                     ),
+                    hnsw_config=models.HnswConfigDiff(
+                        payload_m=16,
+                        m=0,
+                    ),
                 )
 
                 self._initialized = True