feat: Add clear error messages for uninitialized storage
- Add StorageNotInitializedError and PipelineNotInitializedError exceptions - Update JsonDocStatusStorage to raise clear errors when not initialized - Update JsonKVStorage to raise clear errors when not initialized - Error messages now include complete initialization instructions - Helps users understand and fix initialization issues quickly Addresses feedback from issue #1933 about improving error clarity
This commit is contained in:
parent
f1c6a4ed94
commit
f35963c020
3 changed files with 50 additions and 0 deletions
|
|
@ -58,3 +58,41 @@ class RateLimitError(APIStatusError):
|
|||
class APITimeoutError(APIConnectionError):
|
||||
def __init__(self, request: httpx.Request) -> None:
|
||||
super().__init__(message="Request timed out.", request=request)
|
||||
|
||||
|
||||
class StorageNotInitializedError(RuntimeError):
|
||||
"""Raised when storage operations are attempted before initialization."""
|
||||
|
||||
def __init__(self, storage_type: str = "Storage"):
|
||||
super().__init__(
|
||||
f"{storage_type} not initialized. Please ensure proper initialization:\n"
|
||||
f"\n"
|
||||
f" rag = LightRAG(...)\n"
|
||||
f" await rag.initialize_storages() # Required\n"
|
||||
f" \n"
|
||||
f" from lightrag.kg.shared_storage import initialize_pipeline_status\n"
|
||||
f" await initialize_pipeline_status() # Required for pipeline operations\n"
|
||||
f"\n"
|
||||
f"See: https://github.com/HKUDS/LightRAG#important-initialization-requirements"
|
||||
)
|
||||
|
||||
|
||||
class PipelineNotInitializedError(KeyError):
|
||||
"""Raised when pipeline status is accessed before initialization."""
|
||||
|
||||
def __init__(self, namespace: str = ""):
|
||||
msg = (
|
||||
f"Pipeline namespace '{namespace}' not found. "
|
||||
f"This usually means pipeline status was not initialized.\n"
|
||||
f"\n"
|
||||
f"Please call 'await initialize_pipeline_status()' after initializing storages:\n"
|
||||
f"\n"
|
||||
f" from lightrag.kg.shared_storage import initialize_pipeline_status\n"
|
||||
f" await initialize_pipeline_status()\n"
|
||||
f"\n"
|
||||
f"Full initialization sequence:\n"
|
||||
f" rag = LightRAG(...)\n"
|
||||
f" await rag.initialize_storages()\n"
|
||||
f" await initialize_pipeline_status()"
|
||||
)
|
||||
super().__init__(msg)
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ from lightrag.utils import (
|
|||
logger,
|
||||
write_json,
|
||||
)
|
||||
from lightrag.exceptions import StorageNotInitializedError
|
||||
from .shared_storage import (
|
||||
get_namespace_data,
|
||||
get_storage_lock,
|
||||
|
|
@ -65,11 +66,15 @@ class JsonDocStatusStorage(DocStatusStorage):
|
|||
|
||||
async def filter_keys(self, keys: set[str]) -> set[str]:
|
||||
"""Return keys that should be processed (not in storage or not successfully processed)"""
|
||||
if self._storage_lock is None:
|
||||
raise StorageNotInitializedError("JsonDocStatusStorage")
|
||||
async with self._storage_lock:
|
||||
return set(keys) - set(self._data.keys())
|
||||
|
||||
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
|
||||
result: list[dict[str, Any]] = []
|
||||
if self._storage_lock is None:
|
||||
raise StorageNotInitializedError("JsonDocStatusStorage")
|
||||
async with self._storage_lock:
|
||||
for id in ids:
|
||||
data = self._data.get(id, None)
|
||||
|
|
@ -80,6 +85,8 @@ class JsonDocStatusStorage(DocStatusStorage):
|
|||
async def get_status_counts(self) -> dict[str, int]:
|
||||
"""Get counts of documents in each status"""
|
||||
counts = {status.value: 0 for status in DocStatus}
|
||||
if self._storage_lock is None:
|
||||
raise StorageNotInitializedError("JsonDocStatusStorage")
|
||||
async with self._storage_lock:
|
||||
for doc in self._data.values():
|
||||
counts[doc["status"]] += 1
|
||||
|
|
@ -160,6 +167,8 @@ class JsonDocStatusStorage(DocStatusStorage):
|
|||
if not data:
|
||||
return
|
||||
logger.debug(f"Inserting {len(data)} records to {self.final_namespace}")
|
||||
if self._storage_lock is None:
|
||||
raise StorageNotInitializedError("JsonDocStatusStorage")
|
||||
async with self._storage_lock:
|
||||
# Ensure chunks_list field exists for new documents
|
||||
for doc_id, doc_data in data.items():
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ from lightrag.utils import (
|
|||
logger,
|
||||
write_json,
|
||||
)
|
||||
from lightrag.exceptions import StorageNotInitializedError
|
||||
from .shared_storage import (
|
||||
get_namespace_data,
|
||||
get_storage_lock,
|
||||
|
|
@ -152,6 +153,8 @@ class JsonKVStorage(BaseKVStorage):
|
|||
current_time = int(time.time()) # Get current Unix timestamp
|
||||
|
||||
logger.debug(f"Inserting {len(data)} records to {self.final_namespace}")
|
||||
if self._storage_lock is None:
|
||||
raise StorageNotInitializedError("JsonKVStorage")
|
||||
async with self._storage_lock:
|
||||
# Add timestamps to data based on whether key exists
|
||||
for k, v in data.items():
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue