class StorageNotInitializedError(RuntimeError):
    """Raised when storage operations are attempted before initialization.

    Args:
        storage_type: Name of the storage that was used too early. It
            prefixes the error message and is kept on the instance as
            ``storage_type``.
    """

    def __init__(self, storage_type: str = "Storage"):
        # Kept on the instance so handlers can identify the storage
        # without parsing the message text.
        self.storage_type = storage_type
        super().__init__(
            f"{storage_type} not initialized. Please ensure proper initialization:\n"
            "\n"
            " rag = LightRAG(...)\n"
            " await rag.initialize_storages() # Required\n"
            " \n"
            " from lightrag.kg.shared_storage import initialize_pipeline_status\n"
            " await initialize_pipeline_status() # Required for pipeline operations\n"
            "\n"
            "See: https://github.com/HKUDS/LightRAG#important-initialization-requirements"
        )


class PipelineNotInitializedError(KeyError):
    """Raised when pipeline status is accessed before initialization.

    Remains a ``KeyError`` subclass; only the string rendering is
    customized so the multi-line guidance stays readable.

    Args:
        namespace: The pipeline namespace that could not be found. It is
            embedded in the message and kept on the instance as
            ``namespace``.
    """

    def __init__(self, namespace: str = ""):
        self.namespace = namespace
        msg = (
            f"Pipeline namespace '{namespace}' not found. "
            "This usually means pipeline status was not initialized.\n"
            "\n"
            "Please call 'await initialize_pipeline_status()' after initializing storages:\n"
            "\n"
            " from lightrag.kg.shared_storage import initialize_pipeline_status\n"
            " await initialize_pipeline_status()\n"
            "\n"
            "Full initialization sequence:\n"
            " rag = LightRAG(...)\n"
            " await rag.initialize_storages()\n"
            " await initialize_pipeline_status()"
        )
        super().__init__(msg)

    def __str__(self) -> str:
        """Return the help message as plain text."""
        # KeyError.__str__ returns repr() of a lone argument, which would
        # wrap the message in quotes and show each newline as an escape
        # sequence instead of a real line break.
        return str(self.args[0]) if self.args else ""
logger.debug(f"Inserting {len(data)} records to {self.final_namespace}") + if self._storage_lock is None: + raise StorageNotInitializedError("JsonDocStatusStorage") async with self._storage_lock: # Ensure chunks_list field exists for new documents for doc_id, doc_data in data.items(): diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index d6d80079..9bb8b3f2 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -10,6 +10,7 @@ from lightrag.utils import ( logger, write_json, ) +from lightrag.exceptions import StorageNotInitializedError from .shared_storage import ( get_namespace_data, get_storage_lock, @@ -152,6 +153,8 @@ class JsonKVStorage(BaseKVStorage): current_time = int(time.time()) # Get current Unix timestamp logger.debug(f"Inserting {len(data)} records to {self.final_namespace}") + if self._storage_lock is None: + raise StorageNotInitializedError("JsonKVStorage") async with self._storage_lock: # Add timestamps to data based on whether key exists for k, v in data.items():