Merge branch 'qdrant-multi-tenancy'

This commit is contained in:
yangdx 2025-10-30 19:32:25 +08:00
commit 042cbad047
5 changed files with 1171 additions and 867 deletions

View file

@ -941,7 +941,8 @@ maxclients 500
The `workspace` parameter ensures data isolation between different LightRAG instances. Once initialized, the `workspace` is immutable and cannot be changed. Here is how workspaces are implemented for different types of storage: The `workspace` parameter ensures data isolation between different LightRAG instances. Once initialized, the `workspace` is immutable and cannot be changed. Here is how workspaces are implemented for different types of storage:
- **For local file-based databases, data isolation is achieved through workspace subdirectories:** `JsonKVStorage`, `JsonDocStatusStorage`, `NetworkXStorage`, `NanoVectorDBStorage`, `FaissVectorDBStorage`. - **For local file-based databases, data isolation is achieved through workspace subdirectories:** `JsonKVStorage`, `JsonDocStatusStorage`, `NetworkXStorage`, `NanoVectorDBStorage`, `FaissVectorDBStorage`.
- **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `QdrantVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`. - **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`.
- **For Qdrant vector database, data isolation is achieved through payload-based partitioning (Qdrant's recommended multitenancy approach):** `QdrantVectorDBStorage` uses shared collections with payload filtering for unlimited workspace scalability.
- **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`. - **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`.
- **For the Neo4j graph database, logical data isolation is achieved through labels:** `Neo4JStorage` - **For the Neo4j graph database, logical data isolation is achieved through labels:** `Neo4JStorage`

View file

@ -165,7 +165,8 @@ Configuring an independent working directory and a dedicated `.env` configuratio
The command-line `workspace` argument and the `WORKSPACE` environment variable in the `.env` file can both be used to specify the workspace name for the current instance, with the command-line argument having higher priority. Here is how workspaces are implemented for different types of storage: The command-line `workspace` argument and the `WORKSPACE` environment variable in the `.env` file can both be used to specify the workspace name for the current instance, with the command-line argument having higher priority. Here is how workspaces are implemented for different types of storage:
- **For local file-based databases, data isolation is achieved through workspace subdirectories:** `JsonKVStorage`, `JsonDocStatusStorage`, `NetworkXStorage`, `NanoVectorDBStorage`, `FaissVectorDBStorage`. - **For local file-based databases, data isolation is achieved through workspace subdirectories:** `JsonKVStorage`, `JsonDocStatusStorage`, `NetworkXStorage`, `NanoVectorDBStorage`, `FaissVectorDBStorage`.
- **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `QdrantVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`. - **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`.
- **For Qdrant vector database, data isolation is achieved through payload-based partitioning (Qdrant's recommended multitenancy approach):** `QdrantVectorDBStorage` uses shared collections with payload filtering for unlimited workspace scalability.
- **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`. - **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`.
- **For graph databases, logical data isolation is achieved through labels:** `Neo4JStorage`, `MemgraphStorage` - **For graph databases, logical data isolation is achieved through labels:** `Neo4JStorage`, `MemgraphStorage`

View file

@ -104,3 +104,11 @@ class PipelineCancelledException(Exception):
def __init__(self, message: str = "User cancelled"): def __init__(self, message: str = "User cancelled"):
super().__init__(message) super().__init__(message)
self.message = message self.message = message
class QdrantMigrationError(Exception):
"""Raised when Qdrant data migration from legacy collections fails."""
def __init__(self, message: str):
super().__init__(message)
self.message = message

View file

@ -1,21 +1,30 @@
import asyncio import asyncio
import os
from typing import Any, final, List
from dataclasses import dataclass
import numpy as np
import hashlib
import uuid
from ..utils import logger
from ..base import BaseVectorStorage
from ..kg.shared_storage import get_data_init_lock, get_storage_lock
import configparser import configparser
import hashlib
import os
import uuid
from dataclasses import dataclass
from typing import Any, List, final
import numpy as np
import pipmaster as pm import pipmaster as pm
from ..base import BaseVectorStorage
from ..exceptions import QdrantMigrationError
from ..kg.shared_storage import get_data_init_lock, get_storage_lock
from ..utils import compute_mdhash_id, logger
if not pm.is_installed("qdrant-client"): if not pm.is_installed("qdrant-client"):
pm.install("qdrant-client") pm.install("qdrant-client")
from qdrant_client import QdrantClient, models # type: ignore from qdrant_client import QdrantClient, models # type: ignore
DEFAULT_WORKSPACE = "_"
WORKSPACE_ID_FIELD = "workspace_id"
ENTITY_PREFIX = "ent-"
CREATED_AT_FIELD = "created_at"
ID_FIELD = "id"
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read("config.ini", "utf-8") config.read("config.ini", "utf-8")
@ -48,6 +57,15 @@ def compute_mdhash_id_for_qdrant(
raise ValueError("Invalid style. Choose from 'simple', 'hyphenated', or 'urn'.") raise ValueError("Invalid style. Choose from 'simple', 'hyphenated', or 'urn'.")
def workspace_filter_condition(workspace: str) -> models.FieldCondition:
"""
Create a workspace filter condition for Qdrant queries.
"""
return models.FieldCondition(
key=WORKSPACE_ID_FIELD, match=models.MatchValue(value=workspace)
)
@final @final
@dataclass @dataclass
class QdrantVectorDBStorage(BaseVectorStorage): class QdrantVectorDBStorage(BaseVectorStorage):
@ -64,24 +82,192 @@ class QdrantVectorDBStorage(BaseVectorStorage):
self.__post_init__() self.__post_init__()
@staticmethod @staticmethod
def create_collection_if_not_exist( def setup_collection(
client: QdrantClient, collection_name: str, **kwargs client: QdrantClient,
collection_name: str,
legacy_namespace: str = None,
workspace: str = None,
**kwargs,
): ):
exists = False """
if hasattr(client, "collection_exists"): Setup Qdrant collection with migration support from legacy collections.
try:
exists = client.collection_exists(collection_name)
except Exception:
exists = False
else:
try:
client.get_collection(collection_name)
exists = True
except Exception:
exists = False
if not exists: Args:
client: QdrantClient instance
collection_name: Name of the new collection
legacy_namespace: Name of the legacy collection (if exists)
workspace: Workspace identifier for data isolation
**kwargs: Additional arguments for collection creation (vectors_config, hnsw_config, etc.)
"""
new_collection_exists = client.collection_exists(collection_name)
legacy_exists = legacy_namespace and client.collection_exists(legacy_namespace)
# Case 1: Both new and legacy collections exist - Warning only (no migration)
if new_collection_exists and legacy_exists:
logger.warning(
f"Qdrant: Legacy collection '{legacy_namespace}' still exist. Remove it if migration is complete."
)
return
# Case 2: Only new collection exists - Ensure index exists
if new_collection_exists:
# Check if workspace index exists, create if missing
try:
collection_info = client.get_collection(collection_name)
if WORKSPACE_ID_FIELD not in collection_info.payload_schema:
logger.info(
f"Qdrant: Creating missing workspace index for '{collection_name}'"
)
client.create_payload_index(
collection_name=collection_name,
field_name=WORKSPACE_ID_FIELD,
field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD,
is_tenant=True,
),
)
except Exception as e:
logger.warning(
f"Qdrant: Could not verify/create workspace index for '{collection_name}': {e}"
)
return
# Case 3: Neither exists - Create new collection
if not legacy_exists:
logger.info(f"Qdrant: Creating new collection '{collection_name}'")
client.create_collection(collection_name, **kwargs) client.create_collection(collection_name, **kwargs)
client.create_payload_index(
collection_name=collection_name,
field_name=WORKSPACE_ID_FIELD,
field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD,
is_tenant=True,
),
)
logger.info(f"Qdrant: Collection '{collection_name}' created successfully")
return
# Case 4: Only legacy exists - Migrate data
logger.info(
f"Qdrant: Migrating data from legacy collection '{legacy_namespace}'"
)
try:
# Get legacy collection count
legacy_count = client.count(
collection_name=legacy_namespace, exact=True
).count
logger.info(f"Qdrant: Found {legacy_count} records in legacy collection")
if legacy_count == 0:
logger.info("Qdrant: Legacy collection is empty, skipping migration")
# Create new empty collection
client.create_collection(collection_name, **kwargs)
client.create_payload_index(
collection_name=collection_name,
field_name=WORKSPACE_ID_FIELD,
field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD,
is_tenant=True,
),
)
return
# Create new collection first
logger.info(f"Qdrant: Creating new collection '{collection_name}'")
client.create_collection(collection_name, **kwargs)
# Batch migration (500 records per batch)
migrated_count = 0
offset = None
batch_size = 500
while True:
# Scroll through legacy data
result = client.scroll(
collection_name=legacy_namespace,
limit=batch_size,
offset=offset,
with_vectors=True,
with_payload=True,
)
points, next_offset = result
if not points:
break
# Transform points for new collection
new_points = []
for point in points:
# Add workspace_id to payload
new_payload = dict(point.payload or {})
new_payload[WORKSPACE_ID_FIELD] = workspace or DEFAULT_WORKSPACE
# Create new point with workspace-prefixed ID
original_id = new_payload.get(ID_FIELD)
if original_id:
new_point_id = compute_mdhash_id_for_qdrant(
original_id, prefix=workspace or DEFAULT_WORKSPACE
)
else:
# Fallback: use original point ID
new_point_id = str(point.id)
new_points.append(
models.PointStruct(
id=new_point_id,
vector=point.vector,
payload=new_payload,
)
)
# Upsert to new collection
client.upsert(
collection_name=collection_name, points=new_points, wait=True
)
migrated_count += len(points)
logger.info(f"Qdrant: {migrated_count}/{legacy_count} records migrated")
# Check if we've reached the end
if next_offset is None:
break
offset = next_offset
# Verify migration by comparing counts
logger.info("Verifying migration...")
new_count = client.count(collection_name=collection_name, exact=True).count
if new_count != legacy_count:
error_msg = f"Qdrant: Migration verification failed, expected {legacy_count} records, got {new_count} in new collection"
logger.error(error_msg)
raise QdrantMigrationError(error_msg)
logger.info(
f"Qdrant: Migration completed successfully: {migrated_count} records migrated"
)
# Create payload index after successful migration
logger.info("Qdrant: Creating workspace payload index...")
client.create_payload_index(
collection_name=collection_name,
field_name=WORKSPACE_ID_FIELD,
field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD,
is_tenant=True,
),
)
logger.info(
f"Qdrant: Migration from '{legacy_namespace}' to '{collection_name}' completed successfully"
)
except QdrantMigrationError:
# Re-raise migration errors without wrapping
raise
except Exception as e:
error_msg = f"Qdrant: Migration failed with error: {e}"
logger.error(error_msg)
raise QdrantMigrationError(error_msg) from e
def __post_init__(self): def __post_init__(self):
# Check for QDRANT_WORKSPACE environment variable first (higher priority) # Check for QDRANT_WORKSPACE environment variable first (higher priority)
@ -101,18 +287,20 @@ class QdrantVectorDBStorage(BaseVectorStorage):
f"Using passed workspace parameter: '{effective_workspace}'" f"Using passed workspace parameter: '{effective_workspace}'"
) )
# Build final_namespace with workspace prefix for data isolation # Get legacy namespace for data migration from old version
# Keep original namespace unchanged for type detection logic
if effective_workspace: if effective_workspace:
self.final_namespace = f"{effective_workspace}_{self.namespace}" self.legacy_namespace = f"{effective_workspace}_{self.namespace}"
logger.debug(
f"Final namespace with workspace prefix: '{self.final_namespace}'"
)
else: else:
# When workspace is empty, final_namespace equals original namespace self.legacy_namespace = self.namespace
self.final_namespace = self.namespace
self.workspace = "_" self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE
logger.debug(f"Final namespace (no workspace): '{self.final_namespace}'")
# Use a shared collection with payload-based partitioning (Qdrant's recommended approach)
# Ref: https://qdrant.tech/documentation/guides/multiple-partitions/
self.final_namespace = f"lightrag_vdb_{self.namespace}"
logger.debug(
f"Using shared collection '{self.final_namespace}' with workspace '{self.effective_workspace}' for payload-based partitioning"
)
kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {}) kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
cosine_threshold = kwargs.get("cosine_better_than_threshold") cosine_threshold = kwargs.get("cosine_better_than_threshold")
@ -149,15 +337,23 @@ class QdrantVectorDBStorage(BaseVectorStorage):
f"[{self.workspace}] QdrantClient created successfully" f"[{self.workspace}] QdrantClient created successfully"
) )
# Create collection if not exists # Setup collection (create if not exists and configure indexes)
QdrantVectorDBStorage.create_collection_if_not_exist( # Pass legacy_namespace and workspace for migration support
QdrantVectorDBStorage.setup_collection(
self._client, self._client,
self.final_namespace, self.final_namespace,
legacy_namespace=self.legacy_namespace,
workspace=self.effective_workspace,
vectors_config=models.VectorParams( vectors_config=models.VectorParams(
size=self.embedding_func.embedding_dim, size=self.embedding_func.embedding_dim,
distance=models.Distance.COSINE, distance=models.Distance.COSINE,
), ),
hnsw_config=models.HnswConfigDiff(
payload_m=16,
m=0,
),
) )
self._initialized = True self._initialized = True
logger.info( logger.info(
f"[{self.workspace}] Qdrant collection '{self.namespace}' initialized successfully" f"[{self.workspace}] Qdrant collection '{self.namespace}' initialized successfully"
@ -179,8 +375,9 @@ class QdrantVectorDBStorage(BaseVectorStorage):
list_data = [ list_data = [
{ {
"id": k, ID_FIELD: k,
"created_at": current_time, WORKSPACE_ID_FIELD: self.effective_workspace,
CREATED_AT_FIELD: current_time,
**{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields}, **{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields},
} }
for k, v in data.items() for k, v in data.items()
@ -200,7 +397,9 @@ class QdrantVectorDBStorage(BaseVectorStorage):
for i, d in enumerate(list_data): for i, d in enumerate(list_data):
list_points.append( list_points.append(
models.PointStruct( models.PointStruct(
id=compute_mdhash_id_for_qdrant(d["id"]), id=compute_mdhash_id_for_qdrant(
d[ID_FIELD], prefix=self.effective_workspace
),
vector=embeddings[i], vector=embeddings[i],
payload=d, payload=d,
) )
@ -222,21 +421,22 @@ class QdrantVectorDBStorage(BaseVectorStorage):
) # higher priority for query ) # higher priority for query
embedding = embedding_result[0] embedding = embedding_result[0]
results = self._client.search( results = self._client.query_points(
collection_name=self.final_namespace, collection_name=self.final_namespace,
query_vector=embedding, query=embedding,
limit=top_k, limit=top_k,
with_payload=True, with_payload=True,
score_threshold=self.cosine_better_than_threshold, score_threshold=self.cosine_better_than_threshold,
) query_filter=models.Filter(
must=[workspace_filter_condition(self.effective_workspace)]
# logger.debug(f"[{self.workspace}] query result: {results}") ),
).points
return [ return [
{ {
**dp.payload, **dp.payload,
"distance": dp.score, "distance": dp.score,
"created_at": dp.payload.get("created_at"), CREATED_AT_FIELD: dp.payload.get(CREATED_AT_FIELD),
} }
for dp in results for dp in results
] ]
@ -252,14 +452,18 @@ class QdrantVectorDBStorage(BaseVectorStorage):
ids: List of vector IDs to be deleted ids: List of vector IDs to be deleted
""" """
try: try:
if not ids:
return
# Convert regular ids to Qdrant compatible ids # Convert regular ids to Qdrant compatible ids
qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids] qdrant_ids = [
# Delete points from the collection compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace)
for id in ids
]
# Delete points from the collection with workspace filtering
self._client.delete( self._client.delete(
collection_name=self.final_namespace, collection_name=self.final_namespace,
points_selector=models.PointIdsList( points_selector=models.PointIdsList(points=qdrant_ids),
points=qdrant_ids,
),
wait=True, wait=True,
) )
logger.debug( logger.debug(
@ -277,18 +481,16 @@ class QdrantVectorDBStorage(BaseVectorStorage):
entity_name: Name of the entity to delete entity_name: Name of the entity to delete
""" """
try: try:
# Generate the entity ID # Generate the entity ID using the same function as used for storage
entity_id = compute_mdhash_id_for_qdrant(entity_name, prefix="ent-") entity_id = compute_mdhash_id(entity_name, prefix=ENTITY_PREFIX)
# logger.debug( qdrant_entity_id = compute_mdhash_id_for_qdrant(
# f"[{self.workspace}] Attempting to delete entity {entity_name} with ID {entity_id}" entity_id, prefix=self.effective_workspace
# ) )
# Delete the entity point from the collection # Delete the entity point by its Qdrant ID directly
self._client.delete( self._client.delete(
collection_name=self.final_namespace, collection_name=self.final_namespace,
points_selector=models.PointIdsList( points_selector=models.PointIdsList(points=[qdrant_entity_id]),
points=[entity_id],
),
wait=True, wait=True,
) )
logger.debug( logger.debug(
@ -304,10 +506,11 @@ class QdrantVectorDBStorage(BaseVectorStorage):
entity_name: Name of the entity whose relations should be deleted entity_name: Name of the entity whose relations should be deleted
""" """
try: try:
# Find relations where the entity is either source or target # Find relations where the entity is either source or target, with workspace filtering
results = self._client.scroll( results = self._client.scroll(
collection_name=self.final_namespace, collection_name=self.final_namespace,
scroll_filter=models.Filter( scroll_filter=models.Filter(
must=[workspace_filter_condition(self.effective_workspace)],
should=[ should=[
models.FieldCondition( models.FieldCondition(
key="src_id", match=models.MatchValue(value=entity_name) key="src_id", match=models.MatchValue(value=entity_name)
@ -315,7 +518,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
models.FieldCondition( models.FieldCondition(
key="tgt_id", match=models.MatchValue(value=entity_name) key="tgt_id", match=models.MatchValue(value=entity_name)
), ),
] ],
), ),
with_payload=True, with_payload=True,
limit=1000, # Adjust as needed for your use case limit=1000, # Adjust as needed for your use case
@ -326,12 +529,11 @@ class QdrantVectorDBStorage(BaseVectorStorage):
ids_to_delete = [point.id for point in relation_points] ids_to_delete = [point.id for point in relation_points]
if ids_to_delete: if ids_to_delete:
# Delete the relations # Delete the relations with workspace filtering
assert isinstance(self._client, QdrantClient)
self._client.delete( self._client.delete(
collection_name=self.final_namespace, collection_name=self.final_namespace,
points_selector=models.PointIdsList( points_selector=models.PointIdsList(points=ids_to_delete),
points=ids_to_delete,
),
wait=True, wait=True,
) )
logger.debug( logger.debug(
@ -357,9 +559,11 @@ class QdrantVectorDBStorage(BaseVectorStorage):
""" """
try: try:
# Convert to Qdrant compatible ID # Convert to Qdrant compatible ID
qdrant_id = compute_mdhash_id_for_qdrant(id) qdrant_id = compute_mdhash_id_for_qdrant(
id, prefix=self.effective_workspace
)
# Retrieve the point by ID # Retrieve the point by ID with workspace filtering
result = self._client.retrieve( result = self._client.retrieve(
collection_name=self.final_namespace, collection_name=self.final_namespace,
ids=[qdrant_id], ids=[qdrant_id],
@ -369,10 +573,9 @@ class QdrantVectorDBStorage(BaseVectorStorage):
if not result: if not result:
return None return None
# Ensure the result contains created_at field
payload = result[0].payload payload = result[0].payload
if "created_at" not in payload: if CREATED_AT_FIELD not in payload:
payload["created_at"] = None payload[CREATED_AT_FIELD] = None
return payload return payload
except Exception as e: except Exception as e:
@ -395,7 +598,10 @@ class QdrantVectorDBStorage(BaseVectorStorage):
try: try:
# Convert to Qdrant compatible IDs # Convert to Qdrant compatible IDs
qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids] qdrant_ids = [
compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace)
for id in ids
]
# Retrieve the points by IDs # Retrieve the points by IDs
results = self._client.retrieve( results = self._client.retrieve(
@ -410,14 +616,14 @@ class QdrantVectorDBStorage(BaseVectorStorage):
for point in results: for point in results:
payload = dict(point.payload or {}) payload = dict(point.payload or {})
if "created_at" not in payload: if CREATED_AT_FIELD not in payload:
payload["created_at"] = None payload[CREATED_AT_FIELD] = None
qdrant_point_id = str(point.id) if point.id is not None else "" qdrant_point_id = str(point.id) if point.id is not None else ""
if qdrant_point_id: if qdrant_point_id:
payload_by_qdrant_id[qdrant_point_id] = payload payload_by_qdrant_id[qdrant_point_id] = payload
original_id = payload.get("id") original_id = payload.get(ID_FIELD)
if original_id is not None: if original_id is not None:
payload_by_original_id[str(original_id)] = payload payload_by_original_id[str(original_id)] = payload
@ -450,7 +656,10 @@ class QdrantVectorDBStorage(BaseVectorStorage):
try: try:
# Convert to Qdrant compatible IDs # Convert to Qdrant compatible IDs
qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids] qdrant_ids = [
compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace)
for id in ids
]
# Retrieve the points by IDs with vectors # Retrieve the points by IDs with vectors
results = self._client.retrieve( results = self._client.retrieve(
@ -464,7 +673,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
for point in results: for point in results:
if point and point.vector is not None and point.payload: if point and point.vector is not None and point.payload:
# Get original ID from payload # Get original ID from payload
original_id = point.payload.get("id") original_id = point.payload.get(ID_FIELD)
if original_id: if original_id:
# Convert numpy array to list if needed # Convert numpy array to list if needed
vector_data = point.vector vector_data = point.vector
@ -482,7 +691,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
async def drop(self) -> dict[str, str]: async def drop(self) -> dict[str, str]:
"""Drop all vector data from storage and clean up resources """Drop all vector data from storage and clean up resources
This method will delete all data from the Qdrant collection. This method will delete all data for the current workspace from the Qdrant collection.
Returns: Returns:
dict[str, str]: Operation status and message dict[str, str]: Operation status and message
@ -491,39 +700,23 @@ class QdrantVectorDBStorage(BaseVectorStorage):
""" """
async with get_storage_lock(): async with get_storage_lock():
try: try:
# Delete the collection and recreate it # Delete all points for the current workspace
exists = False self._client.delete(
if hasattr(self._client, "collection_exists"): collection_name=self.final_namespace,
try: points_selector=models.FilterSelector(
exists = self._client.collection_exists(self.final_namespace) filter=models.Filter(
except Exception: must=[workspace_filter_condition(self.effective_workspace)]
exists = False )
else:
try:
self._client.get_collection(self.final_namespace)
exists = True
except Exception:
exists = False
if exists:
self._client.delete_collection(self.final_namespace)
# Recreate the collection
QdrantVectorDBStorage.create_collection_if_not_exist(
self._client,
self.final_namespace,
vectors_config=models.VectorParams(
size=self.embedding_func.embedding_dim,
distance=models.Distance.COSINE,
), ),
wait=True,
) )
logger.info( logger.info(
f"[{self.workspace}] Process {os.getpid()} drop Qdrant collection {self.namespace}" f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}"
) )
return {"status": "success", "message": "data dropped"} return {"status": "success", "message": "data dropped"}
except Exception as e: except Exception as e:
logger.error( logger.error(
f"[{self.workspace}] Error dropping Qdrant collection {self.namespace}: {e}" f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}"
) )
return {"status": "error", "message": str(e)} return {"status": "error", "message": str(e)}

1625
uv.lock generated

File diff suppressed because it is too large Load diff