Relocate client creation to the initialize method to prevent race conditions in multi-process mode.

This commit is contained in:
yangdx 2025-08-12 18:20:56 +08:00
parent 74783d7781
commit 5d1bc8b49d
2 changed files with 52 additions and 33 deletions

View file

@ -716,30 +716,8 @@ class MilvusVectorDBStorage(BaseVectorStorage):
if "created_at" not in self.meta_fields:
self.meta_fields.add("created_at")
self._client = MilvusClient(
uri=os.environ.get(
"MILVUS_URI",
config.get(
"milvus",
"uri",
fallback=os.path.join(
self.global_config["working_dir"], "milvus_lite.db"
),
),
),
user=os.environ.get(
"MILVUS_USER", config.get("milvus", "user", fallback=None)
),
password=os.environ.get(
"MILVUS_PASSWORD", config.get("milvus", "password", fallback=None)
),
token=os.environ.get(
"MILVUS_TOKEN", config.get("milvus", "token", fallback=None)
),
db_name=os.environ.get(
"MILVUS_DB_NAME", config.get("milvus", "db_name", fallback=None)
),
)
# Initialize client as None - will be created in initialize() method
self._client = None
self._max_batch_size = self.global_config["embedding_batch_num"]
self._initialized = False
@ -750,6 +728,38 @@ class MilvusVectorDBStorage(BaseVectorStorage):
return
try:
# Create MilvusClient if not already created
if self._client is None:
self._client = MilvusClient(
uri=os.environ.get(
"MILVUS_URI",
config.get(
"milvus",
"uri",
fallback=os.path.join(
self.global_config["working_dir"], "milvus_lite.db"
),
),
),
user=os.environ.get(
"MILVUS_USER", config.get("milvus", "user", fallback=None)
),
password=os.environ.get(
"MILVUS_PASSWORD",
config.get("milvus", "password", fallback=None),
),
token=os.environ.get(
"MILVUS_TOKEN", config.get("milvus", "token", fallback=None)
),
db_name=os.environ.get(
"MILVUS_DB_NAME",
config.get("milvus", "db_name", fallback=None),
),
)
logger.debug(
f"[{self.workspace}] MilvusClient created successfully"
)
# Create collection and check compatibility
self._create_collection_if_not_exist()
self._initialized = True
@ -763,7 +773,7 @@ class MilvusVectorDBStorage(BaseVectorStorage):
raise
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
logger.debug(f"[{self.workspace}] Inserting {len(data)} to {self.namespace}")
# logger.debug(f"[{self.workspace}] Inserting {len(data)} to {self.namespace}")
if not data:
return

View file

@ -110,14 +110,8 @@ class QdrantVectorDBStorage(BaseVectorStorage):
)
self.cosine_better_than_threshold = cosine_threshold
self._client = QdrantClient(
url=os.environ.get(
"QDRANT_URL", config.get("qdrant", "uri", fallback=None)
),
api_key=os.environ.get(
"QDRANT_API_KEY", config.get("qdrant", "apikey", fallback=None)
),
)
# Initialize client as None - will be created in initialize() method
self._client = None
self._max_batch_size = self.global_config["embedding_batch_num"]
self._initialized = False
@ -128,6 +122,21 @@ class QdrantVectorDBStorage(BaseVectorStorage):
return
try:
# Create QdrantClient if not already created
if self._client is None:
self._client = QdrantClient(
url=os.environ.get(
"QDRANT_URL", config.get("qdrant", "uri", fallback=None)
),
api_key=os.environ.get(
"QDRANT_API_KEY",
config.get("qdrant", "apikey", fallback=None),
),
)
logger.debug(
f"[{self.workspace}] QdrantClient created successfully"
)
# Create collection if not exists
QdrantVectorDBStorage.create_collection_if_not_exist(
self._client,