feat: Centralized storage location for OpenRAG

Eric Hare 2025-12-09 09:34:54 -08:00
parent ccd8f48984
commit e02cfd5fe9
No known key found for this signature in database
GPG key ID: A73DF73724270AB7
8 changed files with 624 additions and 47 deletions

View file

@@ -1,11 +1,13 @@
 """Configuration management for OpenRAG."""

 import os
+import shutil
 import yaml

 from pathlib import Path
 from typing import Dict, Any, Optional
 from dataclasses import dataclass, asdict

 from utils.logging_config import get_logger
+from utils.paths import get_config_file, get_legacy_paths

 logger = get_logger(__name__)
@@ -130,10 +132,40 @@ class ConfigManager:
        """Initialize configuration manager.

        Args:
-            config_file: Path to configuration file. Defaults to 'config.yaml' in project root.
+            config_file: Path to configuration file. Defaults to centralized location.
        """
-        self.config_file = Path(config_file) if config_file else Path("config/config.yaml")
+        if config_file:
+            self.config_file = Path(config_file)
+        else:
+            # Use centralized location
+            self.config_file = get_config_file()

+            # Check for legacy location and migrate if needed
+            legacy_config = get_legacy_paths()["config"]
+            if not self.config_file.exists() and legacy_config.exists():
+                self._migrate_from_legacy(legacy_config)
+
        self._config: Optional[OpenRAGConfig] = None

+    def _migrate_from_legacy(self, legacy_config_path: Path) -> None:
+        """Migrate configuration from legacy location to centralized location.
+
+        Args:
+            legacy_config_path: Path to legacy config file
+        """
+        try:
+            logger.info(
+                f"Migrating configuration from {legacy_config_path} to {self.config_file}"
+            )
+            # Ensure parent directory exists
+            self.config_file.parent.mkdir(parents=True, exist_ok=True)
+            # Copy the config file
+            shutil.copy2(legacy_config_path, self.config_file)
+            logger.info("Configuration migration completed successfully")
+        except Exception as e:
+            logger.warning(
+                f"Failed to migrate configuration from {legacy_config_path}: {e}"
+            )
+
    def load_config(self) -> OpenRAGConfig:
        """Load configuration from environment variables and config file.

View file

@@ -246,25 +246,40 @@ async def init_index():

def generate_jwt_keys():
    """Generate RSA keys for JWT signing if they don't exist"""
-    keys_dir = "keys"
-    private_key_path = os.path.join(keys_dir, "private_key.pem")
-    public_key_path = os.path.join(keys_dir, "public_key.pem")
-
-    # Create keys directory if it doesn't exist
-    os.makedirs(keys_dir, exist_ok=True)
+    from utils.paths import get_keys_dir, get_private_key_path, get_public_key_path, get_legacy_paths
+
+    # Use centralized keys directory
+    keys_dir = get_keys_dir()
+    private_key_path = get_private_key_path()
+    public_key_path = get_public_key_path()
+
+    # Check for legacy keys and migrate if needed
+    legacy_paths = get_legacy_paths()
+    if not private_key_path.exists() and legacy_paths["private_key"].exists():
+        logger.info(f"Migrating JWT keys from {legacy_paths['keys_dir']} to {keys_dir}")
+        try:
+            shutil.copy2(legacy_paths["private_key"], private_key_path)
+            if legacy_paths["public_key"].exists():
+                shutil.copy2(legacy_paths["public_key"], public_key_path)
+            logger.info("JWT keys migration completed successfully")
+        except Exception as e:
+            logger.warning(f"Failed to migrate JWT keys: {e}")
+
+    # Ensure keys directory exists (already done by get_keys_dir, but keeping for clarity)
+    keys_dir.mkdir(parents=True, exist_ok=True)

    # Generate keys if they don't exist
-    if not os.path.exists(private_key_path):
+    if not private_key_path.exists():
        try:
            # Generate private key
            subprocess.run(
-                ["openssl", "genrsa", "-out", private_key_path, "2048"],
+                ["openssl", "genrsa", "-out", str(private_key_path), "2048"],
                check=True,
                capture_output=True,
            )
            # Set restrictive permissions on private key (readable by owner only)
-            os.chmod(private_key_path, 0o600)
+            os.chmod(str(private_key_path), 0o600)

            # Generate public key
            subprocess.run(
@@ -272,17 +287,17 @@ def generate_jwt_keys():
                    "openssl",
                    "rsa",
                    "-in",
-                    private_key_path,
+                    str(private_key_path),
                    "-pubout",
                    "-out",
-                    public_key_path,
+                    str(public_key_path),
                ],
                check=True,
                capture_output=True,
            )
            # Set permissions on public key (readable by all)
-            os.chmod(public_key_path, 0o644)
+            os.chmod(str(public_key_path), 0o644)

            logger.info("Generated RSA keys for JWT signing")
        except subprocess.CalledProcessError as e:
@@ -292,8 +307,8 @@ def generate_jwt_keys():
    else:
        # Ensure correct permissions on existing keys
        try:
-            os.chmod(private_key_path, 0o600)
-            os.chmod(public_key_path, 0o644)
+            os.chmod(str(private_key_path), 0o600)
+            os.chmod(str(public_key_path), 0o644)
            logger.info("RSA keys already exist, ensured correct permissions")
        except OSError as e:
            logger.warning("Failed to set permissions on existing keys", error=str(e))
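
For reference, the same key pair can be produced in-process with the cryptography package instead of shelling out to openssl. This is a hedged alternative sketch, not what the commit does; the function name is illustrative:

    # Sketch: in-process equivalent of the two openssl calls above.
    from pathlib import Path
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    def generate_jwt_keys_inprocess(private_key_path: Path, public_key_path: Path) -> None:
        key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        private_key_path.write_bytes(
            key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,  # like openssl genrsa
                encryption_algorithm=serialization.NoEncryption(),
            )
        )
        private_key_path.chmod(0o600)  # owner-only, matching the hunk above
        public_key_path.write_bytes(
            key.public_key().public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,  # like openssl rsa -pubout
            )
        )
        public_key_path.chmod(0o644)  # world-readable, matching the hunk above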
@@ -314,17 +329,18 @@ async def init_index_when_ready():

def _get_documents_dir():
    """Get the documents directory path, handling both Docker and local environments."""
-    # In Docker, the volume is mounted at /app/openrag-documents
-    # Locally, we use openrag-documents
+    from utils.paths import get_documents_dir
+
+    # Use centralized path utility which handles both container and local environments
+    path = get_documents_dir()

    container_env = detect_container_environment()
    if container_env:
-        path = os.path.abspath("/app/openrag-documents")
        logger.debug(f"Running in {container_env}, using container path: {path}")
        return path
    else:
-        path = os.path.abspath(os.path.join(os.getcwd(), "openrag-documents"))
-        logger.debug(f"Running locally, using local path: {path}")
-        return path
+        logger.debug(f"Running locally, using centralized path: {path}")
+        return str(path)
async def ingest_default_documents_when_ready(services):
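
detect_container_environment() comes from utils.container_utils and is not part of this diff. A sketch of the heuristic such a helper typically applies, offered as an assumption rather than the project's actual implementation:

    # Assumed heuristic only; the real helper lives in utils.container_utils.
    from pathlib import Path
    from typing import Optional

    def detect_container_environment_sketch() -> Optional[str]:
        if Path("/.dockerenv").exists():
            return "docker"
        try:
            cgroup = Path("/proc/1/cgroup").read_text()
            if "kubepods" in cgroup:
                return "kubernetes"
            if "docker" in cgroup or "containerd" in cgroup:
                return "docker"
        except OSError:
            pass
        return None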
@@ -560,6 +576,16 @@ async def startup_tasks(services):

async def initialize_services():
    """Initialize all services and their dependencies"""
    await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_INIT_START)

+    # Perform migration if needed (move files from old locations to ~/.openrag)
+    from utils.migration import perform_migration
+    try:
+        migration_results = perform_migration()
+        if migration_results:
+            logger.info("File migration completed", results=migration_results)
+    except Exception as e:
+        logger.warning(f"Migration failed, continuing with startup: {e}")
+
    # Generate JWT keys if they don't exist
    generate_jwt_keys()
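
Since initialize_services() runs on every boot, this only works because perform_migration() is idempotent, which the marker file in src/utils/migration.py (added below) provides. A sketch of the expected behavior on a machine with legacy files:

    from utils.migration import perform_migration

    first = perform_migration()   # e.g. {"config": True, "flows": True}; writes ~/.openrag/.migrated
    second = perform_migration()  # {} -- the marker file short-circuits the second run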

View file

@@ -40,17 +40,13 @@ class FlowsService:

    def _get_flows_directory(self):
        """Get the flows directory path"""
-        current_file_dir = os.path.dirname(os.path.abspath(__file__))  # src/services/
-        src_dir = os.path.dirname(current_file_dir)  # src/
-        project_root = os.path.dirname(src_dir)  # project root
-        return os.path.join(project_root, "flows")
+        from utils.paths import get_flows_dir
+        return str(get_flows_dir())

    def _get_backup_directory(self):
        """Get the backup directory path"""
-        flows_dir = self._get_flows_directory()
-        backup_dir = os.path.join(flows_dir, "backup")
-        os.makedirs(backup_dir, exist_ok=True)
-        return backup_dir
+        from utils.paths import get_flows_backup_dir
+        return str(get_flows_backup_dir())

    def _get_latest_backup_path(self, flow_id: str, flow_type: str):
        """

View file

@@ -51,18 +51,29 @@ class SessionManager:
    def __init__(
        self,
        secret_key: str = None,
-        private_key_path: str = "keys/private_key.pem",
-        public_key_path: str = "keys/public_key.pem",
+        private_key_path: str = None,
+        public_key_path: str = None,
    ):
+        from utils.paths import get_private_key_path, get_public_key_path
+
        self.secret_key = secret_key  # Keep for backward compatibility
        self.users: Dict[str, User] = {}  # user_id -> User
        self.user_opensearch_clients: Dict[
            str, Any
        ] = {}  # user_id -> OpenSearch client

+        # Use centralized key paths if not explicitly provided
+        if private_key_path is None:
+            self.private_key_path = str(get_private_key_path())
+        else:
+            self.private_key_path = private_key_path
+
+        if public_key_path is None:
+            self.public_key_path = str(get_public_key_path())
+        else:
+            self.public_key_path = public_key_path
+
        # Load RSA keys
-        self.private_key_path = private_key_path
-        self.public_key_path = public_key_path
        self._load_rsa_keys()

    def _load_rsa_keys(self):
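
Both call styles stay valid after this change. A small sketch, assuming SessionManager is importable from its session module:

    from utils.paths import get_private_key_path

    sm = SessionManager()  # defaults now resolve to ~/.openrag/keys/*.pem locally
    assert sm.private_key_path == str(get_private_key_path())

    # Callers that passed explicit legacy paths are unaffected:
    sm = SessionManager(
        private_key_path="keys/private_key.pem",
        public_key_path="keys/public_key.pem",
    )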

View file

@@ -454,8 +454,9 @@ def _copy_assets(resource_tree, destination: Path, allowed_suffixes: Optional[It

def copy_sample_documents(*, force: bool = False) -> None:
-    """Copy sample documents from package to current directory if they don't exist."""
-    documents_dir = Path("openrag-documents")
+    """Copy sample documents from package to centralized directory if they don't exist."""
+    from utils.paths import get_documents_dir
+    documents_dir = get_documents_dir()

    try:
        assets_files = files("tui._assets.openrag-documents")
@@ -466,8 +467,9 @@ def copy_sample_documents(*, force: bool = False) -> None:

def copy_sample_flows(*, force: bool = False) -> None:
-    """Copy sample flows from package to current directory if they don't exist."""
-    flows_dir = Path("flows")
+    """Copy sample flows from package to centralized directory if they don't exist."""
+    from utils.paths import get_flows_dir
+    flows_dir = get_flows_dir()

    try:
        assets_files = files("tui._assets.flows")
@@ -478,7 +480,9 @@ def copy_sample_flows(*, force: bool = False) -> None:

def copy_compose_files(*, force: bool = False) -> None:
-    """Copy docker-compose templates into the workspace if they are missing."""
+    """Copy docker-compose templates into the TUI workspace if they are missing."""
+    from utils.paths import get_tui_compose_file
+
    try:
        assets_root = files("tui._assets")
    except Exception as e:
@@ -486,7 +490,9 @@ def copy_compose_files(*, force: bool = False) -> None:
        return

    for filename in ("docker-compose.yml", "docker-compose.gpu.yml"):
-        destination = Path(filename)
+        is_gpu = "gpu" in filename
+        destination = get_tui_compose_file(gpu=is_gpu)

        if destination.exists() and not force:
            continue
@@ -505,7 +511,7 @@ def copy_compose_files(*, force: bool = False) -> None:
                logger.debug(f"Failed to read existing compose file {destination}: {read_error}")

            destination.write_bytes(resource_bytes)
-            logger.info(f"Copied docker-compose template: {filename}")
+            logger.info(f"Copied docker-compose template to {destination}")
        except Exception as error:
            logger.debug(f"Could not copy compose file {filename}: {error}")

View file

@@ -63,11 +63,11 @@ class EnvConfig:
    disable_ingest_with_langflow: str = "False"
    nudges_flow_id: str = "ebc01d31-1976-46ce-a385-b0240327226c"

-    # Document paths (comma-separated)
-    openrag_documents_paths: str = "./openrag-documents"
+    # Document paths (comma-separated) - use centralized location by default
+    openrag_documents_paths: str = "~/.openrag/documents/openrag-documents"

-    # OpenSearch data path
-    opensearch_data_path: str = "./opensearch-data"
+    # OpenSearch data path - use centralized location by default
+    opensearch_data_path: str = "~/.openrag/data/opensearch-data"

    # Container version (linked to TUI version)
    openrag_version: str = ""
@@ -80,7 +80,28 @@ class EnvManager:
    """Manages environment configuration for OpenRAG."""

    def __init__(self, env_file: Optional[Path] = None):
-        self.env_file = env_file or Path(".env")
+        if env_file:
+            self.env_file = env_file
+        else:
+            # Use centralized location for TUI .env file
+            from utils.paths import get_tui_env_file, get_legacy_paths
+
+            self.env_file = get_tui_env_file()
+
+            # Check for legacy .env in current directory and migrate if needed
+            legacy_env = get_legacy_paths()["tui_env"]
+            if not self.env_file.exists() and legacy_env.exists():
+                try:
+                    import shutil
+
+                    self.env_file.parent.mkdir(parents=True, exist_ok=True)
+                    shutil.copy2(legacy_env, self.env_file)
+
+                    from utils.logging_config import get_logger
+                    logger = get_logger(__name__)
+                    logger.info(f"Migrated .env from {legacy_env} to {self.env_file}")
+                except Exception as e:
+                    from utils.logging_config import get_logger
+                    logger = get_logger(__name__)
+                    logger.warning(f"Failed to migrate .env file: {e}")
+
        self.config = EnvConfig()

    def generate_secure_password(self) -> str:
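
Note that the new EnvConfig defaults are literal strings containing a tilde, so whichever component consumes them must expand the home directory before the paths reach Docker. This diff does not show that expansion site, so the following is an assumption about the consumer:

    # Assumes EnvConfig is importable from the TUI env module.
    import os

    raw = EnvConfig().opensearch_data_path  # "~/.openrag/data/opensearch-data"
    resolved = os.path.expanduser(raw)      # e.g. "/home/user/.openrag/data/opensearch-data"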

src/utils/migration.py Normal file (285 lines added)
View file

@@ -0,0 +1,285 @@
"""Migration utilities for moving OpenRAG files to centralized location.

This module handles migration of files from legacy locations (current working directory)
to the new centralized location (~/.openrag/).
"""

import shutil
from pathlib import Path
from typing import Optional, List, Dict

from utils.logging_config import get_logger
from utils.container_utils import detect_container_environment

logger = get_logger(__name__)


def get_migration_marker_file() -> Path:
    """Get the path to the migration marker file.

    This file is created after a successful migration to prevent repeated migrations.

    Returns:
        Path to migration marker file
    """
    from utils.paths import get_openrag_home

    return get_openrag_home() / ".migrated"


def is_migration_needed() -> bool:
    """Check if migration is needed.

    Migration is not needed if:
    - We're in a container environment
    - Migration has already been completed (marker file exists)

    Returns:
        True if migration should be performed, False otherwise
    """
    # Don't migrate in container environments
    if detect_container_environment():
        return False

    # Check if migration has already been completed
    marker_file = get_migration_marker_file()
    if marker_file.exists():
        return False

    # Check if any legacy files exist
    from utils.paths import get_legacy_paths

    legacy_paths = get_legacy_paths()
    for name, path in legacy_paths.items():
        if path.exists():
            logger.info(f"Found legacy file/directory: {path}")
            return True

    return False


def migrate_directory(src: Path, dst: Path, description: str) -> bool:
    """Migrate a directory from source to destination.

    Args:
        src: Source directory path
        dst: Destination directory path
        description: Human-readable description for logging

    Returns:
        True if migration was successful or not needed, False otherwise
    """
    if not src.exists():
        logger.debug(f"Source directory does not exist, skipping: {src}")
        return True

    if not src.is_dir():
        logger.warning(f"Source is not a directory: {src}")
        return False

    try:
        # Ensure parent directory exists
        dst.parent.mkdir(parents=True, exist_ok=True)

        # If destination already exists, merge contents
        if dst.exists():
            logger.info(f"Destination already exists, merging: {dst}")
            # Copy contents recursively
            for item in src.iterdir():
                src_item = src / item.name
                dst_item = dst / item.name
                if src_item.is_dir():
                    if not dst_item.exists():
                        shutil.copytree(src_item, dst_item)
                        logger.debug(f"Copied directory: {src_item} -> {dst_item}")
                else:
                    if not dst_item.exists():
                        shutil.copy2(src_item, dst_item)
                        logger.debug(f"Copied file: {src_item} -> {dst_item}")
        else:
            # Move entire directory
            shutil.move(str(src), str(dst))

        logger.info(f"Migrated {description}: {src} -> {dst}")
        return True
    except Exception as e:
        logger.error(f"Failed to migrate {description} from {src} to {dst}: {e}")
        return False


def migrate_file(src: Path, dst: Path, description: str) -> bool:
    """Migrate a file from source to destination.

    Args:
        src: Source file path
        dst: Destination file path
        description: Human-readable description for logging

    Returns:
        True if migration was successful or not needed, False otherwise
    """
    if not src.exists():
        logger.debug(f"Source file does not exist, skipping: {src}")
        return True

    if not src.is_file():
        logger.warning(f"Source is not a file: {src}")
        return False

    try:
        # Ensure parent directory exists
        dst.parent.mkdir(parents=True, exist_ok=True)

        # Only copy if destination doesn't exist
        if dst.exists():
            logger.debug(f"Destination already exists, skipping: {dst}")
            return True

        # Copy the file
        shutil.copy2(src, dst)
        logger.info(f"Migrated {description}: {src} -> {dst}")
        return True
    except Exception as e:
        logger.error(f"Failed to migrate {description} from {src} to {dst}: {e}")
        return False


def perform_migration() -> Dict[str, bool]:
    """Perform migration of all OpenRAG files to centralized location.

    Returns:
        Dictionary mapping resource names to migration success status
    """
    if not is_migration_needed():
        logger.debug("Migration not needed or already completed")
        return {}

    logger.info("Starting migration of OpenRAG files to centralized location")

    from utils.paths import (
        get_config_file,
        get_keys_dir,
        get_documents_dir,
        get_flows_dir,
        get_tui_env_file,
        get_tui_compose_file,
        get_opensearch_data_dir,
        get_legacy_paths,
    )

    legacy_paths = get_legacy_paths()
    results = {}

    # Migrate configuration file
    if legacy_paths["config"].exists():
        results["config"] = migrate_file(
            legacy_paths["config"],
            get_config_file(),
            "configuration file"
        )

    # Migrate JWT keys directory
    if legacy_paths["keys_dir"].exists():
        results["keys"] = migrate_directory(
            legacy_paths["keys_dir"],
            get_keys_dir(),
            "JWT keys directory"
        )

    # Migrate documents directory
    if legacy_paths["documents"].exists():
        results["documents"] = migrate_directory(
            legacy_paths["documents"],
            get_documents_dir(),
            "documents directory"
        )

    # Migrate flows directory
    if legacy_paths["flows"].exists():
        results["flows"] = migrate_directory(
            legacy_paths["flows"],
            get_flows_dir(),
            "flows directory"
        )

    # Migrate TUI .env file
    if legacy_paths["tui_env"].exists():
        results["tui_env"] = migrate_file(
            legacy_paths["tui_env"],
            get_tui_env_file(),
            "TUI .env file"
        )

    # Migrate docker-compose files
    if legacy_paths["tui_compose"].exists():
        results["tui_compose"] = migrate_file(
            legacy_paths["tui_compose"],
            get_tui_compose_file(gpu=False),
            "docker-compose.yml"
        )

    if legacy_paths["tui_compose_gpu"].exists():
        results["tui_compose_gpu"] = migrate_file(
            legacy_paths["tui_compose_gpu"],
            get_tui_compose_file(gpu=True),
            "docker-compose.gpu.yml"
        )

    # Note: We don't migrate opensearch-data as it's typically large and managed by Docker.
    # Users can manually move it if needed, or specify a custom path via env var.

    # Create migration marker file
    marker_file = get_migration_marker_file()
    try:
        marker_file.parent.mkdir(parents=True, exist_ok=True)
        marker_file.write_text("Migration completed successfully\n")
        logger.info("Migration marker file created")
    except Exception as e:
        logger.warning(f"Failed to create migration marker file: {e}")

    # Log summary
    successful = sum(1 for success in results.values() if success)
    total = len(results)
    logger.info(f"Migration completed: {successful}/{total} items migrated successfully")

    if successful < total:
        logger.warning("Some migrations failed. Check logs for details.")

    return results


def cleanup_legacy_files(dry_run: bool = True) -> List[str]:
    """Clean up legacy files after successful migration.

    This function removes the old files from the current working directory after
    confirming they have been successfully migrated.

    Args:
        dry_run: If True, only list files that would be removed without actually removing them

    Returns:
        List of file paths that were (or would be) removed
    """
    from utils.paths import get_legacy_paths

    legacy_paths = get_legacy_paths()
    removed_files = []

    for name, path in legacy_paths.items():
        if not path.exists():
            continue

        if dry_run:
            logger.info(f"Would remove: {path}")
            removed_files.append(str(path))
        else:
            try:
                if path.is_dir():
                    shutil.rmtree(path)
                else:
                    path.unlink()
                logger.info(f"Removed legacy file/directory: {path}")
                removed_files.append(str(path))
            except Exception as e:
                logger.error(f"Failed to remove {path}: {e}")

    return removed_files
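
Typical use of the cleanup helper is a review-then-delete flow, run only after perform_migration() has succeeded; a short sketch:

    from utils.migration import cleanup_legacy_files

    would_remove = cleanup_legacy_files(dry_run=True)  # logs and returns paths, deletes nothing
    print("\n".join(would_remove))

    # Only after reviewing the list above:
    cleanup_legacy_files(dry_run=False)  # actually removes the legacy copies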

src/utils/paths.py Normal file (200 lines added)
View file

@@ -0,0 +1,200 @@
"""Centralized path management for OpenRAG.

This module provides functions to get standardized paths for OpenRAG files and directories.
All paths are centralized under ~/.openrag/ to avoid cluttering the user's current working directory.
"""

import os
from pathlib import Path

from utils.logging_config import get_logger
from utils.container_utils import detect_container_environment

logger = get_logger(__name__)


def get_openrag_home() -> Path:
    """Get the OpenRAG home directory.

    In containers: Uses current working directory (for backward compatibility)
    In local environments: Uses ~/.openrag/

    Returns:
        Path to OpenRAG home directory
    """
    # In container environments, use the container's working directory.
    # This maintains backward compatibility with existing Docker setups.
    container_env = detect_container_environment()
    if container_env:
        # In containers, paths are managed by Docker volumes
        return Path.cwd()

    # In local environments, use centralized location
    home_dir = Path.home() / ".openrag"
    home_dir.mkdir(parents=True, exist_ok=True)
    return home_dir


def get_config_dir() -> Path:
    """Get the configuration directory.

    Returns:
        Path to config directory (~/.openrag/config/ or ./config/ in containers)
    """
    config_dir = get_openrag_home() / "config"
    config_dir.mkdir(parents=True, exist_ok=True)
    return config_dir


def get_config_file() -> Path:
    """Get the configuration file path.

    Returns:
        Path to config.yaml file
    """
    return get_config_dir() / "config.yaml"


def get_keys_dir() -> Path:
    """Get the JWT keys directory.

    Returns:
        Path to keys directory (~/.openrag/keys/ or ./keys/ in containers)
    """
    keys_dir = get_openrag_home() / "keys"
    keys_dir.mkdir(parents=True, exist_ok=True)
    return keys_dir


def get_private_key_path() -> Path:
    """Get the JWT private key path.

    Returns:
        Path to private_key.pem
    """
    return get_keys_dir() / "private_key.pem"


def get_public_key_path() -> Path:
    """Get the JWT public key path.

    Returns:
        Path to public_key.pem
    """
    return get_keys_dir() / "public_key.pem"


def get_documents_dir() -> Path:
    """Get the documents directory for default document ingestion.

    In containers: Uses /app/openrag-documents (Docker volume mount)
    In local environments: Uses ~/.openrag/documents/openrag-documents

    Returns:
        Path to documents directory
    """
    container_env = detect_container_environment()
    if container_env:
        # In containers, use the Docker volume mount path
        return Path("/app/openrag-documents")

    # In local environments, use centralized location
    documents_dir = get_openrag_home() / "documents" / "openrag-documents"
    documents_dir.mkdir(parents=True, exist_ok=True)
    return documents_dir


def get_flows_dir() -> Path:
    """Get the flows directory.

    Returns:
        Path to flows directory (~/.openrag/flows/ or ./flows/ in containers)
    """
    flows_dir = get_openrag_home() / "flows"
    flows_dir.mkdir(parents=True, exist_ok=True)
    return flows_dir


def get_flows_backup_dir() -> Path:
    """Get the flows backup directory.

    Returns:
        Path to flows/backup directory
    """
    backup_dir = get_flows_dir() / "backup"
    backup_dir.mkdir(parents=True, exist_ok=True)
    return backup_dir


def get_data_dir() -> Path:
    """Get the data directory.

    Returns:
        Path to data directory (~/.openrag/data/ or ./data/ in containers)
    """
    data_dir = get_openrag_home() / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    return data_dir


def get_opensearch_data_dir() -> Path:
    """Get the OpenSearch data directory.

    Returns:
        Path to OpenSearch data directory
    """
    return get_data_dir() / "opensearch-data"


def get_tui_dir() -> Path:
    """Get the TUI directory for TUI-specific files.

    Returns:
        Path to tui directory (~/.openrag/tui/ or ./tui/ in containers)
    """
    tui_dir = get_openrag_home() / "tui"
    tui_dir.mkdir(parents=True, exist_ok=True)
    return tui_dir


def get_tui_env_file() -> Path:
    """Get the TUI .env file path.

    Returns:
        Path to .env file
    """
    return get_tui_dir() / ".env"


def get_tui_compose_file(gpu: bool = False) -> Path:
    """Get the TUI docker-compose file path.

    Args:
        gpu: If True, returns path to docker-compose.gpu.yml

    Returns:
        Path to docker-compose file
    """
    filename = "docker-compose.gpu.yml" if gpu else "docker-compose.yml"
    return get_tui_dir() / filename


# Backward compatibility functions for migration
def get_legacy_paths() -> dict:
    """Get legacy (old) paths for migration purposes.

    Returns:
        Dictionary mapping resource names to their old paths
    """
    cwd = Path.cwd()
    return {
        "config": cwd / "config" / "config.yaml",
        "keys_dir": cwd / "keys",
        "private_key": cwd / "keys" / "private_key.pem",
        "public_key": cwd / "keys" / "public_key.pem",
        "documents": cwd / "openrag-documents",
        "flows": cwd / "flows",
        "tui_env": cwd / ".env",
        "tui_compose": cwd / "docker-compose.yml",
        "tui_compose_gpu": cwd / "docker-compose.gpu.yml",
        "opensearch_data": cwd / "opensearch-data",
    }
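
Taken together, on a local (non-container) install these helpers produce the layout below; a sketch that prints the resolved locations (assumes src/ is on sys.path, as the in-tree imports above imply):

    from utils import paths

    for p in (
        paths.get_config_file(),          # ~/.openrag/config/config.yaml
        paths.get_private_key_path(),     # ~/.openrag/keys/private_key.pem
        paths.get_documents_dir(),        # ~/.openrag/documents/openrag-documents/
        paths.get_flows_backup_dir(),     # ~/.openrag/flows/backup/
        paths.get_opensearch_data_dir(),  # ~/.openrag/data/opensearch-data/
        paths.get_tui_env_file(),         # ~/.openrag/tui/.env
    ):
        print(p)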