From 5d1bc8b49dd89882e7b3d8d5f243ef1096dbf0cc Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 12 Aug 2025 18:20:56 +0800 Subject: [PATCH] Relocate client creation to the initialize method to prevent race conditions in multi-process mode. --- lightrag/kg/milvus_impl.py | 60 ++++++++++++++++++++++---------------- lightrag/kg/qdrant_impl.py | 25 +++++++++++----- 2 files changed, 52 insertions(+), 33 deletions(-) diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index db5ab6a7..7061289a 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -716,30 +716,8 @@ class MilvusVectorDBStorage(BaseVectorStorage): if "created_at" not in self.meta_fields: self.meta_fields.add("created_at") - self._client = MilvusClient( - uri=os.environ.get( - "MILVUS_URI", - config.get( - "milvus", - "uri", - fallback=os.path.join( - self.global_config["working_dir"], "milvus_lite.db" - ), - ), - ), - user=os.environ.get( - "MILVUS_USER", config.get("milvus", "user", fallback=None) - ), - password=os.environ.get( - "MILVUS_PASSWORD", config.get("milvus", "password", fallback=None) - ), - token=os.environ.get( - "MILVUS_TOKEN", config.get("milvus", "token", fallback=None) - ), - db_name=os.environ.get( - "MILVUS_DB_NAME", config.get("milvus", "db_name", fallback=None) - ), - ) + # Initialize client as None - will be created in initialize() method + self._client = None self._max_batch_size = self.global_config["embedding_batch_num"] self._initialized = False @@ -750,6 +728,38 @@ class MilvusVectorDBStorage(BaseVectorStorage): return try: + # Create MilvusClient if not already created + if self._client is None: + self._client = MilvusClient( + uri=os.environ.get( + "MILVUS_URI", + config.get( + "milvus", + "uri", + fallback=os.path.join( + self.global_config["working_dir"], "milvus_lite.db" + ), + ), + ), + user=os.environ.get( + "MILVUS_USER", config.get("milvus", "user", fallback=None) + ), + password=os.environ.get( + "MILVUS_PASSWORD", + config.get("milvus", "password", fallback=None), + ), + token=os.environ.get( + "MILVUS_TOKEN", config.get("milvus", "token", fallback=None) + ), + db_name=os.environ.get( + "MILVUS_DB_NAME", + config.get("milvus", "db_name", fallback=None), + ), + ) + logger.debug( + f"[{self.workspace}] MilvusClient created successfully" + ) + # Create collection and check compatibility self._create_collection_if_not_exist() self._initialized = True @@ -763,7 +773,7 @@ class MilvusVectorDBStorage(BaseVectorStorage): raise async def upsert(self, data: dict[str, dict[str, Any]]) -> None: - logger.debug(f"[{self.workspace}] Inserting {len(data)} to {self.namespace}") + # logger.debug(f"[{self.workspace}] Inserting {len(data)} to {self.namespace}") if not data: return diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py index 2bc39d8b..ab0ee02b 100644 --- a/lightrag/kg/qdrant_impl.py +++ b/lightrag/kg/qdrant_impl.py @@ -110,14 +110,8 @@ class QdrantVectorDBStorage(BaseVectorStorage): ) self.cosine_better_than_threshold = cosine_threshold - self._client = QdrantClient( - url=os.environ.get( - "QDRANT_URL", config.get("qdrant", "uri", fallback=None) - ), - api_key=os.environ.get( - "QDRANT_API_KEY", config.get("qdrant", "apikey", fallback=None) - ), - ) + # Initialize client as None - will be created in initialize() method + self._client = None self._max_batch_size = self.global_config["embedding_batch_num"] self._initialized = False @@ -128,6 +122,21 @@ class QdrantVectorDBStorage(BaseVectorStorage): return try: + # Create QdrantClient if not already created + if self._client is None: + self._client = QdrantClient( + url=os.environ.get( + "QDRANT_URL", config.get("qdrant", "uri", fallback=None) + ), + api_key=os.environ.get( + "QDRANT_API_KEY", + config.get("qdrant", "apikey", fallback=None), + ), + ) + logger.debug( + f"[{self.workspace}] QdrantClient created successfully" + ) + # Create collection if not exists QdrantVectorDBStorage.create_collection_if_not_exist( self._client,