Improve storage config validation and add config.ini fallback support

• Add MongoDB and Milvus env requirements
• Support config.ini fallback
• Warn on missing env vars
• Check available storage count
• Show config source info
This commit is contained in:
yangdx 2025-11-08 22:48:49 +08:00
parent 57ee7d5ac8
commit 1a91bcdb5f
2 changed files with 189 additions and 21 deletions

View file

@ -45,13 +45,19 @@ STORAGE_IMPLEMENTATIONS = {
STORAGE_ENV_REQUIREMENTS: dict[str, list[str]] = { STORAGE_ENV_REQUIREMENTS: dict[str, list[str]] = {
# KV Storage Implementations # KV Storage Implementations
"JsonKVStorage": [], "JsonKVStorage": [],
"MongoKVStorage": [], "MongoKVStorage": [
"MONGO_URI",
"MONGO_DATABASE",
],
"RedisKVStorage": ["REDIS_URI"], "RedisKVStorage": ["REDIS_URI"],
"PGKVStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"], "PGKVStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"],
# Graph Storage Implementations # Graph Storage Implementations
"NetworkXStorage": [], "NetworkXStorage": [],
"Neo4JStorage": ["NEO4J_URI", "NEO4J_USERNAME", "NEO4J_PASSWORD"], "Neo4JStorage": ["NEO4J_URI", "NEO4J_USERNAME", "NEO4J_PASSWORD"],
"MongoGraphStorage": [], "MongoGraphStorage": [
"MONGO_URI",
"MONGO_DATABASE",
],
"MemgraphStorage": ["MEMGRAPH_URI"], "MemgraphStorage": ["MEMGRAPH_URI"],
"AGEStorage": [ "AGEStorage": [
"AGE_POSTGRES_DB", "AGE_POSTGRES_DB",
@ -65,17 +71,26 @@ STORAGE_ENV_REQUIREMENTS: dict[str, list[str]] = {
], ],
# Vector Storage Implementations # Vector Storage Implementations
"NanoVectorDBStorage": [], "NanoVectorDBStorage": [],
"MilvusVectorDBStorage": [], "MilvusVectorDBStorage": [
"ChromaVectorDBStorage": [], "MILVUS_URI",
"MILVUS_DB_NAME",
],
# "ChromaVectorDBStorage": [],
"PGVectorStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"], "PGVectorStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"],
"FaissVectorDBStorage": [], "FaissVectorDBStorage": [],
"QdrantVectorDBStorage": ["QDRANT_URL"], # QDRANT_API_KEY has default value None "QdrantVectorDBStorage": ["QDRANT_URL"], # QDRANT_API_KEY has default value None
"MongoVectorDBStorage": [], "MongoVectorDBStorage": [
"MONGO_URI",
"MONGO_DATABASE",
],
# Document Status Storage Implementations # Document Status Storage Implementations
"JsonDocStatusStorage": [], "JsonDocStatusStorage": [],
"RedisDocStatusStorage": ["REDIS_URI"], "RedisDocStatusStorage": ["REDIS_URI"],
"PGDocStatusStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"], "PGDocStatusStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"],
"MongoDocStatusStorage": [], "MongoDocStatusStorage": [
"MONGO_URI",
"MONGO_DATABASE",
],
} }
# Storage implementation module mapping # Storage implementation module mapping

View file

@ -128,27 +128,100 @@ class MigrationTool:
workspace = os.getenv("WORKSPACE", "") workspace = os.getenv("WORKSPACE", "")
return workspace return workspace
def check_config_ini_for_storage(self, storage_name: str) -> bool:
    """Check if config.ini has configuration for the storage type

    Reads ``config.ini`` from the current working directory and verifies
    that every option the given storage needs is present. Only the storage
    names listed in the lookup table below are recognised; any other name
    yields ``False``.

    Args:
        storage_name: Storage implementation name

    Returns:
        True if config.ini has the necessary configuration; False when the
        file is absent, cannot be parsed or decoded, lacks a required
        option, or the storage name is not recognised
    """
    import configparser

    # (section, option) pairs that must ALL be present for each storage
    required_options = {
        "RedisKVStorage": (("redis", "uri"),),
        "PGKVStorage": (
            ("postgres", "user"),
            ("postgres", "password"),
            ("postgres", "database"),
        ),
        "MongoKVStorage": (
            ("mongodb", "uri"),
            ("mongodb", "database"),
        ),
    }

    needed = required_options.get(storage_name)
    if needed is None:
        return False

    config = configparser.ConfigParser()
    try:
        # ConfigParser.read() skips files it cannot open, so a missing
        # config.ini simply leaves the parser empty instead of raising
        config.read("config.ini", encoding="utf-8")
    except (configparser.Error, OSError, UnicodeError):
        # Malformed or undecodable file: this check is best-effort, so
        # report "no usable configuration" rather than aborting the tool
        return False

    return all(config.has_option(section, option) for section, option in needed)
def check_env_vars(self, storage_name: str) -> bool:
    """Report on the environment variables a storage expects, without failing

    Prints a status line for the given storage. When variables are missing,
    a warning is printed together with a note on whether config.ini appears
    to supply the configuration instead.

    Args:
        storage_name: Storage implementation name

    Returns:
        Always True (missing variables only produce warnings)
    """
    expected = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
    if not expected:
        print("✓ No environment variables required")
        return True

    absent = [name for name in expected if name not in os.environ]
    if not absent:
        print("✓ All required environment variables are set")
        return True

    print(f"⚠️ Warning: Missing environment variables: {', '.join(absent)}")

    # config.ini is consulted only to tell the user where settings may
    # come from; the outcome never changes the return value
    if self.check_config_ini_for_storage(storage_name):
        follow_up = " ✓ Found configuration in config.ini"
    else:
        follow_up = f" Will attempt to use defaults for {storage_name}"
    print(follow_up)
    return True
def count_available_storage_types(self) -> int:
    """Count the storage types that look usable in the current environment

    A storage type is counted when it declares no required environment
    variables, when all of its required variables are set, or when
    check_config_ini_for_storage() reports a config.ini fallback for it.

    Returns:
        Number of available storage types
    """

    def is_available(name: str) -> bool:
        needed = STORAGE_ENV_REQUIREMENTS.get(name, [])
        # Nothing declared as required (e.g. JsonKVStorage)
        if not needed:
            return True
        # Environment variables are checked first; config.ini is consulted
        # only when at least one of them is absent
        if all(var in os.environ for var in needed):
            return True
        return self.check_config_ini_for_storage(name)

    return sum(1 for name in STORAGE_TYPES.values() if is_available(name))
def get_storage_class(self, storage_name: str): def get_storage_class(self, storage_name: str):
"""Dynamically import and return storage class """Dynamically import and return storage class
@ -178,7 +251,7 @@ class MigrationTool:
raise ValueError(f"Unsupported storage type: {storage_name}") raise ValueError(f"Unsupported storage type: {storage_name}")
async def initialize_storage(self, storage_name: str, workspace: str): async def initialize_storage(self, storage_name: str, workspace: str):
"""Initialize storage instance """Initialize storage instance with fallback to config.ini and defaults
Args: Args:
storage_name: Storage implementation name storage_name: Storage implementation name
@ -186,6 +259,9 @@ class MigrationTool:
Returns: Returns:
Initialized storage instance Initialized storage instance
Raises:
Exception: If initialization fails
""" """
storage_class = self.get_storage_class(storage_name) storage_class = self.get_storage_class(storage_name)
@ -203,7 +279,7 @@ class MigrationTool:
embedding_func=None, embedding_func=None,
) )
# Initialize the storage # Initialize the storage (may raise exception if connection fails)
await storage.initialize() await storage.initialize()
return storage return storage
@ -851,7 +927,7 @@ class MigrationTool:
use_streaming: bool = False, use_streaming: bool = False,
exclude_storage_name: str = None, exclude_storage_name: str = None,
) -> tuple: ) -> tuple:
"""Setup and initialize storage """Setup and initialize storage with config.ini fallback support
Args: Args:
storage_type: Type label (source/target) storage_type: Type label (source/target)
@ -917,23 +993,76 @@ class MigrationTool:
storage_name = available_types[choice] storage_name = available_types[choice]
# Check environment variables # Check configuration (warnings only, doesn't block)
print("\nChecking environment variables...") print("\nChecking configuration...")
if not self.check_env_vars(storage_name): self.check_env_vars(storage_name)
return None, None, None, 0
# Get workspace # Get workspace
workspace = self.get_workspace_for_storage(storage_name) workspace = self.get_workspace_for_storage(storage_name)
# Initialize storage # Initialize storage (real validation point)
print(f"\nInitializing {storage_type} storage...") print(f"\nInitializing {storage_type} storage...")
try: try:
storage = await self.initialize_storage(storage_name, workspace) storage = await self.initialize_storage(storage_name, workspace)
print(f"- Storage Type: {storage_name}") print(f"- Storage Type: {storage_name}")
print(f"- Workspace: {workspace if workspace else '(default)'}") print(f"- Workspace: {workspace if workspace else '(default)'}")
print("- Connection Status: ✓ Success") print("- Connection Status: ✓ Success")
# Show configuration source for transparency
if storage_name == "RedisKVStorage":
config_source = (
"environment variable"
if "REDIS_URI" in os.environ
else "config.ini or default"
)
print(f"- Configuration Source: {config_source}")
elif storage_name == "PGKVStorage":
config_source = (
"environment variables"
if all(
var in os.environ
for var in STORAGE_ENV_REQUIREMENTS[storage_name]
)
else "config.ini or defaults"
)
print(f"- Configuration Source: {config_source}")
elif storage_name == "MongoKVStorage":
config_source = (
"environment variables"
if all(
var in os.environ
for var in STORAGE_ENV_REQUIREMENTS[storage_name]
)
else "config.ini or defaults"
)
print(f"- Configuration Source: {config_source}")
except Exception as e: except Exception as e:
print(f"✗ Initialization failed: {e}") print(f"✗ Initialization failed: {e}")
print(f"\nFor {storage_name}, you can configure using:")
print(" 1. Environment variables (highest priority)")
# Show specific environment variable requirements
if storage_name in STORAGE_ENV_REQUIREMENTS:
for var in STORAGE_ENV_REQUIREMENTS[storage_name]:
print(f" - {var}")
print(" 2. config.ini file (medium priority)")
if storage_name == "RedisKVStorage":
print(" [redis]")
print(" uri = redis://localhost:6379")
elif storage_name == "PGKVStorage":
print(" [postgres]")
print(" host = localhost")
print(" port = 5432")
print(" user = postgres")
print(" password = yourpassword")
print(" database = lightrag")
elif storage_name == "MongoKVStorage":
print(" [mongodb]")
print(" uri = mongodb://root:root@localhost:27017/")
print(" database = LightRAG")
return None, None, None, 0 return None, None, None, 0
# Count cache records efficiently # Count cache records efficiently
@ -1191,7 +1320,7 @@ class MigrationTool:
print("=" * 60) print("=" * 60)
async def run(self): async def run(self):
"""Run the migration tool with streaming approach""" """Run the migration tool with streaming approach and early validation"""
try: try:
# Initialize shared storage (REQUIRED for storage classes to work) # Initialize shared storage (REQUIRED for storage classes to work)
from lightrag.kg.shared_storage import initialize_share_data from lightrag.kg.shared_storage import initialize_share_data
@ -1213,8 +1342,32 @@ class MigrationTool:
if self.source_storage is None: if self.source_storage is None:
return return
# Check if there are at least 2 storage types available
available_count = self.count_available_storage_types()
if available_count <= 1:
print("\n" + "=" * 60)
print("⚠️ Warning: Migration Not Possible")
print("=" * 60)
print(f"Only {available_count} storage type(s) available.")
print("Migration requires at least 2 different storage types.")
print("\nTo enable migration, configure additional storage:")
print(" 1. Set environment variables, OR")
print(" 2. Update config.ini file")
print("\nSupported storage types:")
for name in STORAGE_TYPES.values():
if name != source_storage_name:
print(f" - {name}")
if name in STORAGE_ENV_REQUIREMENTS:
for var in STORAGE_ENV_REQUIREMENTS[name]:
print(f" Required: {var}")
print("=" * 60)
# Cleanup
await self.source_storage.finalize()
return
if source_count == 0: if source_count == 0:
print("\n⚠ Source storage has no cache records to migrate") print("\n Source storage has no cache records to migrate")
# Cleanup # Cleanup
await self.source_storage.finalize() await self.source_storage.finalize()
return return