This commit is contained in:
phact 2025-09-03 15:31:14 -04:00
parent d070229566
commit 25a45af581
16 changed files with 2599 additions and 0 deletions

View file

@ -22,6 +22,10 @@ dependencies = [
"uvicorn>=0.35.0",
"boto3>=1.35.0",
"psutil>=7.0.0",
"rich>=13.0.0",
"textual>=0.45.0",
"python-dotenv>=1.0.0",
"textual-fspicker>=0.6.0",
]
[tool.uv.sources]

View file

@ -1,3 +1,11 @@
import sys
# Check for TUI flag FIRST, before any heavy imports
if __name__ == "__main__" and len(sys.argv) > 1 and sys.argv[1] == "--tui":
from tui.main import run_tui
run_tui()
sys.exit(0)
import asyncio
import atexit
import multiprocessing
@ -699,6 +707,7 @@ async def cleanup_subscriptions_proper(services):
if __name__ == "__main__":
import uvicorn
# TUI check already handled at top of file
# Register cleanup function
atexit.register(cleanup)

1
src/tui/__init__.py Normal file
View file

@ -0,0 +1 @@
"""OpenRAG Terminal User Interface package."""

229
src/tui/main.py Normal file
View file

@ -0,0 +1,229 @@
"""Main TUI application for OpenRAG."""
import sys
from pathlib import Path
from textual.app import App, ComposeResult
from .screens.welcome import WelcomeScreen
from .screens.config import ConfigScreen
from .screens.monitor import MonitorScreen
from .screens.logs import LogsScreen
from .managers.env_manager import EnvManager
from .managers.container_manager import ContainerManager
from .utils.platform import PlatformDetector
class OpenRAGTUI(App):
"""OpenRAG Terminal User Interface application."""
TITLE = "OpenRAG TUI"
SUB_TITLE = "Container Management & Configuration"
CSS = """
Screen {
background: $background;
}
#main-container {
height: 100%;
padding: 1;
}
#welcome-container {
align: center middle;
width: 100%;
height: 100%;
}
#welcome-text {
text-align: center;
margin-bottom: 2;
}
.button-row {
align: center middle;
height: auto;
margin: 1 0;
}
.button-row Button {
margin: 0 1;
min-width: 20;
}
#config-header {
text-align: center;
margin-bottom: 2;
}
#config-scroll {
height: 1fr;
overflow-y: auto;
}
#config-form {
width: 80%;
max-width: 100;
margin: 0;
padding: 1;
height: auto;
}
#config-form Input {
margin-bottom: 1;
width: 100%;
}
/* Actions under Documents Paths input */
#docs-path-actions {
width: 100%;
padding-left: 0;
margin-top: -1;
height: auto;
}
#docs-path-actions Button {
width: auto;
min-width: 12;
}
#config-form Label {
margin-bottom: 0;
padding-left: 1;
}
.helper-text {
margin: 0 0 1 1;
}
/* Docs path actions row */
#services-content {
height: 100%;
}
#runtime-status {
background: $panel;
border: solid $primary;
padding: 1;
margin-bottom: 1;
}
#services-table {
height: 1fr;
margin-bottom: 1;
}
#images-table {
height: auto;
max-height: 8;
margin-bottom: 1;
}
#logs-scroll {
height: 1fr;
border: solid $primary;
background: $surface;
}
.controls-row {
align: left middle;
height: auto;
margin: 1 0;
}
.controls-row > * {
margin-right: 1;
}
.label {
width: auto;
margin-right: 1;
text-style: bold;
}
#system-info {
background: $panel;
border: solid $primary;
padding: 2;
height: 1fr;
}
TabbedContent {
height: 1fr;
}
TabPane {
padding: 1;
height: 1fr;
}
.tab-header {
text-style: bold;
color: $accent;
margin-bottom: 1;
}
TabPane ScrollableContainer {
height: 100%;
padding: 1;
}
"""
def __init__(self):
super().__init__()
self.platform_detector = PlatformDetector()
self.container_manager = ContainerManager()
self.env_manager = EnvManager()
def on_mount(self) -> None:
"""Initialize the application."""
# Check for runtime availability and show appropriate screen
if not self.container_manager.is_available():
self.notify(
"No container runtime found. Please install Docker or Podman.",
severity="warning",
timeout=10
)
# Load existing config if available
config_exists = self.env_manager.load_existing_env()
# Start with welcome screen
self.push_screen(WelcomeScreen())
def action_quit(self) -> None:
"""Quit the application."""
self.exit()
def check_runtime_requirements(self) -> tuple[bool, str]:
"""Check if runtime requirements are met."""
if not self.container_manager.is_available():
return False, self.platform_detector.get_installation_instructions()
# Check Podman macOS memory if applicable
runtime_info = self.container_manager.get_runtime_info()
if runtime_info.runtime_type.value == "podman":
is_sufficient, _, message = self.platform_detector.check_podman_macos_memory()
if not is_sufficient:
return False, f"Podman VM memory insufficient:\n{message}"
return True, "Runtime requirements satisfied"
def run_tui():
"""Run the OpenRAG TUI application."""
try:
app = OpenRAGTUI()
app.run()
except KeyboardInterrupt:
print("\nOpenRAG TUI interrupted by user")
sys.exit(0)
except Exception as e:
print(f"Error running OpenRAG TUI: {e}")
sys.exit(1)
if __name__ == "__main__":
run_tui()

View file

@ -0,0 +1 @@
"""TUI managers package."""

View file

@ -0,0 +1,430 @@
"""Container lifecycle manager for OpenRAG TUI."""
import asyncio
import json
import subprocess
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, AsyncIterator
from ..utils.platform import PlatformDetector, RuntimeInfo, RuntimeType
from utils.gpu_detection import detect_gpu_devices
class ServiceStatus(Enum):
"""Container service status."""
UNKNOWN = "unknown"
RUNNING = "running"
STOPPED = "stopped"
STARTING = "starting"
STOPPING = "stopping"
ERROR = "error"
MISSING = "missing"
@dataclass
class ServiceInfo:
"""Container service information."""
name: str
status: ServiceStatus
health: Optional[str] = None
ports: List[str] = None
image: Optional[str] = None
image_digest: Optional[str] = None
created: Optional[str] = None
def __post_init__(self):
if self.ports is None:
self.ports = []
class ContainerManager:
"""Manages Docker/Podman container lifecycle for OpenRAG."""
def __init__(self, compose_file: Optional[Path] = None):
self.platform_detector = PlatformDetector()
self.runtime_info = self.platform_detector.detect_runtime()
self.compose_file = compose_file or Path("docker-compose.yml")
self.cpu_compose_file = Path("docker-compose-cpu.yml")
self.services_cache: Dict[str, ServiceInfo] = {}
self.last_status_update = 0
# Auto-select CPU compose if no GPU available
try:
has_gpu, _ = detect_gpu_devices()
self.use_cpu_compose = not has_gpu
except Exception:
self.use_cpu_compose = True
# Expected services based on compose files
self.expected_services = [
"openrag-backend",
"openrag-frontend",
"opensearch",
"dashboards",
"langflow"
]
# Map container names to service names
self.container_name_map = {
"openrag-backend": "openrag-backend",
"openrag-frontend": "openrag-frontend",
"os": "opensearch",
"osdash": "dashboards",
"langflow": "langflow"
}
def is_available(self) -> bool:
"""Check if container runtime is available."""
return self.runtime_info.runtime_type != RuntimeType.NONE
def get_runtime_info(self) -> RuntimeInfo:
"""Get container runtime information."""
return self.runtime_info
def get_installation_help(self) -> str:
"""Get installation instructions if runtime is not available."""
return self.platform_detector.get_installation_instructions()
async def _run_compose_command(self, args: List[str], cpu_mode: Optional[bool] = None) -> tuple[bool, str, str]:
"""Run a compose command and return (success, stdout, stderr)."""
if not self.is_available():
return False, "", "No container runtime available"
if cpu_mode is None:
cpu_mode = self.use_cpu_compose
compose_file = self.cpu_compose_file if cpu_mode else self.compose_file
cmd = self.runtime_info.compose_command + ["-f", str(compose_file)] + args
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=Path.cwd()
)
stdout, stderr = await process.communicate()
stdout_text = stdout.decode() if stdout else ""
stderr_text = stderr.decode() if stderr else ""
success = process.returncode == 0
return success, stdout_text, stderr_text
except Exception as e:
return False, "", f"Command execution failed: {e}"
async def _run_runtime_command(self, args: List[str]) -> tuple[bool, str, str]:
"""Run a runtime command (docker/podman) and return (success, stdout, stderr)."""
if not self.is_available():
return False, "", "No container runtime available"
cmd = self.runtime_info.runtime_command + args
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
stdout_text = stdout.decode() if stdout else ""
stderr_text = stderr.decode() if stderr else ""
success = process.returncode == 0
return success, stdout_text, stderr_text
except Exception as e:
return False, "", f"Command execution failed: {e}"
async def get_service_status(self, force_refresh: bool = False) -> Dict[str, ServiceInfo]:
"""Get current status of all services."""
current_time = time.time()
# Use cache if recent and not forcing refresh
if not force_refresh and current_time - self.last_status_update < 5:
return self.services_cache
services = {}
# Get compose service status
success, stdout, stderr = await self._run_compose_command(["ps", "--format", "json"])
if success and stdout.strip():
try:
# Parse JSON output - each line is a separate JSON object
for line in stdout.strip().split('\n'):
if line.strip() and line.startswith('{'):
service = json.loads(line)
container_name = service.get("Name", "")
# Map container name to service name
service_name = self.container_name_map.get(container_name)
if not service_name:
continue
state = service.get("State", "").lower()
# Map compose states to our status enum
if "running" in state:
status = ServiceStatus.RUNNING
elif "exited" in state or "stopped" in state:
status = ServiceStatus.STOPPED
elif "starting" in state:
status = ServiceStatus.STARTING
else:
status = ServiceStatus.UNKNOWN
# Extract health - use Status if Health is empty
health = service.get("Health", "") or service.get("Status", "N/A")
# Extract ports
ports_str = service.get("Ports", "")
ports = [p.strip() for p in ports_str.split(",") if p.strip()] if ports_str else []
# Extract image
image = service.get("Image", "N/A")
services[service_name] = ServiceInfo(
name=service_name,
status=status,
health=health,
ports=ports,
image=image,
)
except json.JSONDecodeError:
# Fallback to parsing text output
lines = stdout.strip().split('\n')
for line in lines[1:]: # Skip header
if line.strip():
parts = line.split()
if len(parts) >= 3:
name = parts[0]
# Only include our expected services
if name not in self.expected_services:
continue
state = parts[2].lower()
if "up" in state:
status = ServiceStatus.RUNNING
elif "exit" in state:
status = ServiceStatus.STOPPED
else:
status = ServiceStatus.UNKNOWN
services[name] = ServiceInfo(name=name, status=status)
# Add expected services that weren't found
for expected in self.expected_services:
if expected not in services:
services[expected] = ServiceInfo(name=expected, status=ServiceStatus.MISSING)
self.services_cache = services
self.last_status_update = current_time
return services
async def get_images_digests(self, images: List[str]) -> Dict[str, str]:
"""Return a map of image -> digest/ID (sha256:...)."""
digests: Dict[str, str] = {}
for image in images:
if not image or image in digests:
continue
success, stdout, _ = await self._run_runtime_command([
"image", "inspect", image, "--format", "{{.Id}}"
])
if success and stdout.strip():
digests[image] = stdout.strip().splitlines()[0]
return digests
def _parse_compose_images(self) -> list[str]:
"""Best-effort parse of image names from compose files without YAML dependency."""
images: set[str] = set()
for compose in [self.compose_file, self.cpu_compose_file]:
try:
if not compose.exists():
continue
for line in compose.read_text().splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
if line.startswith('image:'):
# image: repo/name:tag
val = line.split(':', 1)[1].strip()
# Remove quotes if present
if (val.startswith('"') and val.endswith('"')) or (val.startswith("'") and val.endswith("'")):
val = val[1:-1]
images.add(val)
except Exception:
continue
return list(images)
async def get_project_images_info(self) -> list[tuple[str, str]]:
"""
Return list of (image, digest_or_id) for images referenced by compose files.
If an image isn't present locally, returns '-' for its digest.
"""
expected = self._parse_compose_images()
results: list[tuple[str, str]] = []
for image in expected:
digest = '-'
success, stdout, _ = await self._run_runtime_command([
'image', 'inspect', image, '--format', '{{.Id}}'
])
if success and stdout.strip():
digest = stdout.strip().splitlines()[0]
results.append((image, digest))
results.sort(key=lambda x: x[0])
return results
async def start_services(self, cpu_mode: bool = False) -> AsyncIterator[tuple[bool, str]]:
"""Start all services and yield progress updates."""
yield False, "Starting OpenRAG services..."
success, stdout, stderr = await self._run_compose_command(["up", "-d"], cpu_mode)
if success:
yield True, "Services started successfully"
else:
yield False, f"Failed to start services: {stderr}"
async def stop_services(self) -> AsyncIterator[tuple[bool, str]]:
"""Stop all services and yield progress updates."""
yield False, "Stopping OpenRAG services..."
success, stdout, stderr = await self._run_compose_command(["down"])
if success:
yield True, "Services stopped successfully"
else:
yield False, f"Failed to stop services: {stderr}"
async def restart_services(self, cpu_mode: bool = False) -> AsyncIterator[tuple[bool, str]]:
"""Restart all services and yield progress updates."""
yield False, "Restarting OpenRAG services..."
success, stdout, stderr = await self._run_compose_command(["restart"], cpu_mode)
if success:
yield True, "Services restarted successfully"
else:
yield False, f"Failed to restart services: {stderr}"
async def upgrade_services(self, cpu_mode: bool = False) -> AsyncIterator[tuple[bool, str]]:
"""Upgrade services (pull latest images and restart) and yield progress updates."""
yield False, "Pulling latest images..."
# Pull latest images
success, stdout, stderr = await self._run_compose_command(["pull"], cpu_mode)
if not success:
yield False, f"Failed to pull images: {stderr}"
return
yield False, "Images updated, restarting services..."
# Restart with new images
success, stdout, stderr = await self._run_compose_command(["up", "-d", "--force-recreate"], cpu_mode)
if success:
yield True, "Services upgraded and restarted successfully"
else:
yield False, f"Failed to restart services after upgrade: {stderr}"
async def reset_services(self) -> AsyncIterator[tuple[bool, str]]:
"""Reset all services (stop, remove containers/volumes, clear data) and yield progress updates."""
yield False, "Stopping all services..."
# Stop and remove everything
success, stdout, stderr = await self._run_compose_command([
"down",
"--volumes",
"--remove-orphans",
"--rmi", "local"
])
if not success:
yield False, f"Failed to stop services: {stderr}"
return
yield False, "Cleaning up container data..."
# Additional cleanup - remove any remaining containers/volumes
# This is more thorough than just compose down
await self._run_runtime_command(["system", "prune", "-f"])
yield True, "System reset completed - all containers, volumes, and local images removed"
async def get_service_logs(self, service_name: str, lines: int = 100) -> tuple[bool, str]:
"""Get logs for a specific service."""
success, stdout, stderr = await self._run_compose_command(["logs", "--tail", str(lines), service_name])
if success:
return True, stdout
else:
return False, f"Failed to get logs: {stderr}"
async def follow_service_logs(self, service_name: str) -> AsyncIterator[str]:
"""Follow logs for a specific service."""
if not self.is_available():
yield "No container runtime available"
return
compose_file = self.cpu_compose_file if self.use_cpu_compose else self.compose_file
cmd = self.runtime_info.compose_command + ["-f", str(compose_file), "logs", "-f", service_name]
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=Path.cwd()
)
while True:
line = await process.stdout.readline()
if line:
yield line.decode().rstrip()
else:
break
except Exception as e:
yield f"Error following logs: {e}"
async def get_system_stats(self) -> Dict[str, Dict[str, str]]:
"""Get system resource usage statistics."""
stats = {}
# Get container stats
success, stdout, stderr = await self._run_runtime_command(["stats", "--no-stream", "--format", "json"])
if success and stdout.strip():
try:
for line in stdout.strip().split('\n'):
if line.strip():
data = json.loads(line)
name = data.get("Name", data.get("Container", ""))
if name:
stats[name] = {
"cpu": data.get("CPUPerc", "0%"),
"memory": data.get("MemUsage", "0B / 0B"),
"memory_percent": data.get("MemPerc", "0%"),
"network": data.get("NetIO", "0B / 0B"),
"disk": data.get("BlockIO", "0B / 0B"),
}
except json.JSONDecodeError:
pass
return stats
def check_podman_macos_memory(self) -> tuple[bool, str]:
"""Check if Podman VM has sufficient memory on macOS."""
if self.runtime_info.runtime_type != RuntimeType.PODMAN:
return True, "Not using Podman"
return self.platform_detector.check_podman_macos_memory()[:2] # Return is_sufficient, message

View file

@ -0,0 +1,279 @@
"""Environment configuration manager for OpenRAG TUI."""
import os
import secrets
import string
from pathlib import Path
from typing import Dict, Optional, List
from dataclasses import dataclass, field
from ..utils.validation import (
validate_openai_api_key,
validate_google_oauth_client_id,
validate_non_empty,
validate_url,
validate_documents_paths,
sanitize_env_value
)
@dataclass
class EnvConfig:
"""Environment configuration data."""
# Core settings
openai_api_key: str = ""
opensearch_password: str = ""
langflow_secret_key: str = ""
langflow_superuser: str = "admin"
langflow_superuser_password: str = ""
flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0"
# OAuth settings
google_oauth_client_id: str = ""
google_oauth_client_secret: str = ""
microsoft_graph_oauth_client_id: str = ""
microsoft_graph_oauth_client_secret: str = ""
# Optional settings
webhook_base_url: str = ""
aws_access_key_id: str = ""
aws_secret_access_key: str = ""
langflow_public_url: str = ""
# Document paths (comma-separated)
openrag_documents_paths: str = "./documents"
# Validation errors
validation_errors: Dict[str, str] = field(default_factory=dict)
class EnvManager:
"""Manages environment configuration for OpenRAG."""
def __init__(self, env_file: Optional[Path] = None):
self.env_file = env_file or Path(".env")
self.config = EnvConfig()
def generate_secure_password(self) -> str:
"""Generate a secure password for OpenSearch."""
# Generate a 16-character password with letters, digits, and symbols
alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
return ''.join(secrets.choice(alphabet) for _ in range(16))
def generate_langflow_secret_key(self) -> str:
"""Generate a secure secret key for Langflow."""
return secrets.token_urlsafe(32)
def load_existing_env(self) -> bool:
"""Load existing .env file if it exists."""
if not self.env_file.exists():
return False
try:
with open(self.env_file, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = sanitize_env_value(value)
# Map env vars to config attributes
attr_map = {
'OPENAI_API_KEY': 'openai_api_key',
'OPENSEARCH_PASSWORD': 'opensearch_password',
'LANGFLOW_SECRET_KEY': 'langflow_secret_key',
'LANGFLOW_SUPERUSER': 'langflow_superuser',
'LANGFLOW_SUPERUSER_PASSWORD': 'langflow_superuser_password',
'FLOW_ID': 'flow_id',
'GOOGLE_OAUTH_CLIENT_ID': 'google_oauth_client_id',
'GOOGLE_OAUTH_CLIENT_SECRET': 'google_oauth_client_secret',
'MICROSOFT_GRAPH_OAUTH_CLIENT_ID': 'microsoft_graph_oauth_client_id',
'MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET': 'microsoft_graph_oauth_client_secret',
'WEBHOOK_BASE_URL': 'webhook_base_url',
'AWS_ACCESS_KEY_ID': 'aws_access_key_id',
'AWS_SECRET_ACCESS_KEY': 'aws_secret_access_key',
'LANGFLOW_PUBLIC_URL': 'langflow_public_url',
'OPENRAG_DOCUMENTS_PATHS': 'openrag_documents_paths',
}
if key in attr_map:
setattr(self.config, attr_map[key], value)
return True
except Exception as e:
print(f"Error loading .env file: {e}")
return False
def setup_secure_defaults(self) -> None:
"""Set up secure default values for passwords and keys."""
if not self.config.opensearch_password:
self.config.opensearch_password = self.generate_secure_password()
if not self.config.langflow_secret_key:
self.config.langflow_secret_key = self.generate_langflow_secret_key()
if not self.config.langflow_superuser_password:
self.config.langflow_superuser_password = self.generate_secure_password()
def validate_config(self, mode: str = "full") -> bool:
"""
Validate the current configuration.
Args:
mode: "no_auth" for minimal validation, "full" for complete validation
"""
self.config.validation_errors.clear()
# Always validate OpenAI API key
if not validate_openai_api_key(self.config.openai_api_key):
self.config.validation_errors['openai_api_key'] = "Invalid OpenAI API key format (should start with sk-)"
# Validate documents paths only if provided (optional)
if self.config.openrag_documents_paths:
is_valid, error_msg, _ = validate_documents_paths(self.config.openrag_documents_paths)
if not is_valid:
self.config.validation_errors['openrag_documents_paths'] = error_msg
# Validate required fields
if not validate_non_empty(self.config.opensearch_password):
self.config.validation_errors['opensearch_password'] = "OpenSearch password is required"
# Langflow secret key is auto-generated; no user input required
if not validate_non_empty(self.config.langflow_superuser_password):
self.config.validation_errors['langflow_superuser_password'] = "Langflow superuser password is required"
if mode == "full":
# Validate OAuth settings if provided
if self.config.google_oauth_client_id and not validate_google_oauth_client_id(self.config.google_oauth_client_id):
self.config.validation_errors['google_oauth_client_id'] = "Invalid Google OAuth client ID format"
if self.config.google_oauth_client_id and not validate_non_empty(self.config.google_oauth_client_secret):
self.config.validation_errors['google_oauth_client_secret'] = "Google OAuth client secret required when client ID is provided"
if self.config.microsoft_graph_oauth_client_id and not validate_non_empty(self.config.microsoft_graph_oauth_client_secret):
self.config.validation_errors['microsoft_graph_oauth_client_secret'] = "Microsoft Graph client secret required when client ID is provided"
# Validate optional URLs if provided
if self.config.webhook_base_url and not validate_url(self.config.webhook_base_url):
self.config.validation_errors['webhook_base_url'] = "Invalid webhook URL format"
if self.config.langflow_public_url and not validate_url(self.config.langflow_public_url):
self.config.validation_errors['langflow_public_url'] = "Invalid Langflow public URL format"
return len(self.config.validation_errors) == 0
def save_env_file(self) -> bool:
"""Save current configuration to .env file."""
try:
# Ensure secure defaults (including Langflow secret key) are set before saving
self.setup_secure_defaults()
# Create backup if file exists
if self.env_file.exists():
backup_file = self.env_file.with_suffix('.env.backup')
self.env_file.rename(backup_file)
with open(self.env_file, 'w') as f:
f.write("# OpenRAG Environment Configuration\n")
f.write("# Generated by OpenRAG TUI\n\n")
# Core settings
f.write("# Core settings\n")
f.write(f"LANGFLOW_SECRET_KEY={self.config.langflow_secret_key}\n")
f.write(f"LANGFLOW_SUPERUSER={self.config.langflow_superuser}\n")
f.write(f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n")
f.write(f"FLOW_ID={self.config.flow_id}\n")
f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n")
f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n")
f.write(f"OPENRAG_DOCUMENTS_PATHS={self.config.openrag_documents_paths}\n")
f.write("\n")
# OAuth settings
if self.config.google_oauth_client_id or self.config.google_oauth_client_secret:
f.write("# Google OAuth settings\n")
f.write(f"GOOGLE_OAUTH_CLIENT_ID={self.config.google_oauth_client_id}\n")
f.write(f"GOOGLE_OAUTH_CLIENT_SECRET={self.config.google_oauth_client_secret}\n")
f.write("\n")
if self.config.microsoft_graph_oauth_client_id or self.config.microsoft_graph_oauth_client_secret:
f.write("# Microsoft Graph OAuth settings\n")
f.write(f"MICROSOFT_GRAPH_OAUTH_CLIENT_ID={self.config.microsoft_graph_oauth_client_id}\n")
f.write(f"MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET={self.config.microsoft_graph_oauth_client_secret}\n")
f.write("\n")
# Optional settings
optional_vars = [
("WEBHOOK_BASE_URL", self.config.webhook_base_url),
("AWS_ACCESS_KEY_ID", self.config.aws_access_key_id),
("AWS_SECRET_ACCESS_KEY", self.config.aws_secret_access_key),
("LANGFLOW_PUBLIC_URL", self.config.langflow_public_url),
]
optional_written = False
for var_name, var_value in optional_vars:
if var_value:
if not optional_written:
f.write("# Optional settings\n")
optional_written = True
f.write(f"{var_name}={var_value}\n")
if optional_written:
f.write("\n")
return True
except Exception as e:
print(f"Error saving .env file: {e}")
return False
def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]:
"""Get fields required for no-auth setup mode. Returns (field_name, display_name, placeholder, can_generate)."""
return [
("openai_api_key", "OpenAI API Key", "sk-...", False),
("opensearch_password", "OpenSearch Password", "Will be auto-generated if empty", True),
("langflow_superuser_password", "Langflow Superuser Password", "Will be auto-generated if empty", True),
("openrag_documents_paths", "Documents Paths", "./documents,/path/to/more/docs", False),
]
def get_full_setup_fields(self) -> List[tuple[str, str, str, bool]]:
"""Get all fields for full setup mode."""
base_fields = self.get_no_auth_setup_fields()
oauth_fields = [
("google_oauth_client_id", "Google OAuth Client ID", "xxx.apps.googleusercontent.com", False),
("google_oauth_client_secret", "Google OAuth Client Secret", "", False),
("microsoft_graph_oauth_client_id", "Microsoft Graph Client ID", "", False),
("microsoft_graph_oauth_client_secret", "Microsoft Graph Client Secret", "", False),
]
optional_fields = [
("webhook_base_url", "Webhook Base URL (optional)", "https://your-domain.com", False),
("aws_access_key_id", "AWS Access Key ID (optional)", "", False),
("aws_secret_access_key", "AWS Secret Access Key (optional)", "", False),
("langflow_public_url", "Langflow Public URL (optional)", "http://localhost:7860", False),
]
return base_fields + oauth_fields + optional_fields
def generate_compose_volume_mounts(self) -> List[str]:
"""Generate Docker Compose volume mount strings from documents paths."""
is_valid, _, validated_paths = validate_documents_paths(self.config.openrag_documents_paths)
if not is_valid:
return ["./documents:/app/documents:Z"] # fallback
volume_mounts = []
for i, path in enumerate(validated_paths):
if i == 0:
# First path maps to the default /app/documents
volume_mounts.append(f"{path}:/app/documents:Z")
else:
# Additional paths map to numbered directories
volume_mounts.append(f"{path}:/app/documents{i+1}:Z")
return volume_mounts

View file

@ -0,0 +1 @@
"""TUI screens package."""

456
src/tui/screens/config.py Normal file
View file

@ -0,0 +1,456 @@
"""Configuration screen for OpenRAG TUI."""
from textual.app import ComposeResult
from textual.containers import Container, Vertical, Horizontal, ScrollableContainer
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Input, Label, TabbedContent, TabPane
from textual.validation import ValidationResult, Validator
from rich.text import Text
from pathlib import Path
from ..managers.env_manager import EnvManager
from ..utils.validation import validate_openai_api_key, validate_documents_paths
from pathlib import Path
class OpenAIKeyValidator(Validator):
"""Validator for OpenAI API keys."""
def validate(self, value: str) -> ValidationResult:
if not value:
return self.success()
if validate_openai_api_key(value):
return self.success()
else:
return self.failure("Invalid OpenAI API key format (should start with sk-)")
class DocumentsPathValidator(Validator):
"""Validator for documents paths."""
def validate(self, value: str) -> ValidationResult:
# Optional: allow empty value
if not value:
return self.success()
is_valid, error_msg, _ = validate_documents_paths(value)
if is_valid:
return self.success()
else:
return self.failure(error_msg)
class ConfigScreen(Screen):
"""Configuration screen for environment setup."""
BINDINGS = [
("escape", "back", "Back"),
("ctrl+s", "save", "Save"),
("ctrl+g", "generate", "Generate Passwords"),
]
def __init__(self, mode: str = "full"):
super().__init__()
self.mode = mode # "no_auth" or "full"
self.env_manager = EnvManager()
self.inputs = {}
# Load existing config if available
self.env_manager.load_existing_env()
def compose(self) -> ComposeResult:
"""Create the configuration screen layout."""
# Removed top header bar and header text
with Container(id="main-container"):
with ScrollableContainer(id="config-scroll"):
with Vertical(id="config-form"):
yield from self._create_all_fields()
yield Horizontal(
Button("Generate Passwords", variant="default", id="generate-btn"),
Button("Save Configuration", variant="success", id="save-btn"),
Button("Back", variant="default", id="back-btn"),
classes="button-row"
)
yield Footer()
def _create_header_text(self) -> Text:
"""Create the configuration header text."""
header_text = Text()
if self.mode == "no_auth":
header_text.append("Quick Setup - No Authentication\n", style="bold green")
header_text.append("Configure OpenRAG for local document processing only.\n\n", style="dim")
else:
header_text.append("Full Setup - OAuth Integration\n", style="bold cyan")
header_text.append("Configure OpenRAG with cloud service integrations.\n\n", style="dim")
header_text.append("Required fields are marked with *\n", style="yellow")
header_text.append("Use Ctrl+G to generate admin passwords\n", style="dim")
return header_text
def _create_all_fields(self) -> ComposeResult:
"""Create all configuration fields in a single scrollable layout."""
# Admin Credentials Section
yield Static("Admin Credentials", classes="tab-header")
yield Static(" ")
# OpenSearch Admin Password
yield Label("OpenSearch Admin Password *")
current_value = getattr(self.env_manager.config, "opensearch_password", "")
input_widget = Input(
placeholder="Auto-generated secure password",
value=current_value,
password=True,
id="input-opensearch_password"
)
yield input_widget
self.inputs["opensearch_password"] = input_widget
yield Static(" ")
# Langflow Admin Username
yield Label("Langflow Admin Username *")
current_value = getattr(self.env_manager.config, "langflow_superuser", "")
input_widget = Input(
placeholder="admin",
value=current_value,
id="input-langflow_superuser"
)
yield input_widget
self.inputs["langflow_superuser"] = input_widget
yield Static(" ")
# Langflow Admin Password
yield Label("Langflow Admin Password *")
current_value = getattr(self.env_manager.config, "langflow_superuser_password", "")
input_widget = Input(
placeholder="Auto-generated secure password",
value=current_value,
password=True,
id="input-langflow_superuser_password"
)
yield input_widget
self.inputs["langflow_superuser_password"] = input_widget
yield Static(" ")
yield Static(" ")
# API Keys Section
yield Static("API Keys", classes="tab-header")
yield Static(" ")
# OpenAI API Key
yield Label("OpenAI API Key *")
# Where to create OpenAI keys (helper above the box)
yield Static(Text("Get a key: https://platform.openai.com/api-keys", style="dim"), classes="helper-text")
current_value = getattr(self.env_manager.config, "openai_api_key", "")
input_widget = Input(
placeholder="sk-...",
value=current_value,
password=True,
validators=[OpenAIKeyValidator()],
id="input-openai_api_key"
)
yield input_widget
self.inputs["openai_api_key"] = input_widget
yield Static(" ")
# Add OAuth fields only in full mode
if self.mode == "full":
# Google OAuth Client ID
yield Label("Google OAuth Client ID")
# Where to create Google OAuth credentials (helper above the box)
yield Static(Text("Create credentials: https://console.cloud.google.com/apis/credentials", style="dim"), classes="helper-text")
current_value = getattr(self.env_manager.config, "google_oauth_client_id", "")
input_widget = Input(
placeholder="xxx.apps.googleusercontent.com",
value=current_value,
id="input-google_oauth_client_id"
)
yield input_widget
self.inputs["google_oauth_client_id"] = input_widget
yield Static(" ")
# Google OAuth Client Secret
yield Label("Google OAuth Client Secret")
current_value = getattr(self.env_manager.config, "google_oauth_client_secret", "")
input_widget = Input(
placeholder="",
value=current_value,
password=True,
id="input-google_oauth_client_secret"
)
yield input_widget
self.inputs["google_oauth_client_secret"] = input_widget
yield Static(" ")
# Microsoft Graph Client ID
yield Label("Microsoft Graph Client ID")
# Where to create Microsoft app registrations (helper above the box)
yield Static(Text("Create app: https://portal.azure.com/#view/Microsoft_AAD_RegisteredApps/ApplicationsListBlade", style="dim"), classes="helper-text")
current_value = getattr(self.env_manager.config, "microsoft_graph_oauth_client_id", "")
input_widget = Input(
placeholder="",
value=current_value,
id="input-microsoft_graph_oauth_client_id"
)
yield input_widget
self.inputs["microsoft_graph_oauth_client_id"] = input_widget
yield Static(" ")
# Microsoft Graph Client Secret
yield Label("Microsoft Graph Client Secret")
current_value = getattr(self.env_manager.config, "microsoft_graph_oauth_client_secret", "")
input_widget = Input(
placeholder="",
value=current_value,
password=True,
id="input-microsoft_graph_oauth_client_secret"
)
yield input_widget
self.inputs["microsoft_graph_oauth_client_secret"] = input_widget
yield Static(" ")
# AWS Access Key ID
yield Label("AWS Access Key ID")
# Where to create AWS keys (helper above the box)
yield Static(Text("Create keys: https://console.aws.amazon.com/iam/home#/security_credentials", style="dim"), classes="helper-text")
current_value = getattr(self.env_manager.config, "aws_access_key_id", "")
input_widget = Input(
placeholder="",
value=current_value,
id="input-aws_access_key_id"
)
yield input_widget
self.inputs["aws_access_key_id"] = input_widget
yield Static(" ")
# AWS Secret Access Key
yield Label("AWS Secret Access Key")
current_value = getattr(self.env_manager.config, "aws_secret_access_key", "")
input_widget = Input(
placeholder="",
value=current_value,
password=True,
id="input-aws_secret_access_key"
)
yield input_widget
self.inputs["aws_secret_access_key"] = input_widget
yield Static(" ")
yield Static(" ")
# Other Settings Section
yield Static("Others", classes="tab-header")
yield Static(" ")
# Documents Paths (optional) + picker action button on next line
yield Label("Documents Paths")
current_value = getattr(self.env_manager.config, "openrag_documents_paths", "")
input_widget = Input(
placeholder="./documents,/path/to/more/docs",
value=current_value,
validators=[DocumentsPathValidator()],
id="input-openrag_documents_paths"
)
yield input_widget
# Actions row with pick button
yield Horizontal(Button("Pick…", id="pick-docs-btn"), id="docs-path-actions", classes="controls-row")
self.inputs["openrag_documents_paths"] = input_widget
yield Static(" ")
# Langflow Secret Key removed from UI; generated automatically on save
# Add optional fields only in full mode
if self.mode == "full":
# Webhook Base URL
yield Label("Webhook Base URL")
current_value = getattr(self.env_manager.config, "webhook_base_url", "")
input_widget = Input(
placeholder="https://your-domain.com",
value=current_value,
id="input-webhook_base_url"
)
yield input_widget
self.inputs["webhook_base_url"] = input_widget
yield Static(" ")
# Langflow Public URL
yield Label("Langflow Public URL")
current_value = getattr(self.env_manager.config, "langflow_public_url", "")
input_widget = Input(
placeholder="http://localhost:7860",
value=current_value,
id="input-langflow_public_url"
)
yield input_widget
self.inputs["langflow_public_url"] = input_widget
yield Static(" ")
def _create_field(self, field_name: str, display_name: str, placeholder: str, can_generate: bool, required: bool = False) -> ComposeResult:
"""Create a single form field."""
# Create label
label_text = f"{display_name}"
if required:
label_text += " *"
yield Label(label_text)
# Get current value
current_value = getattr(self.env_manager.config, field_name, "")
# Create input with appropriate validator
if field_name == "openai_api_key":
input_widget = Input(
placeholder=placeholder,
value=current_value,
password=True,
validators=[OpenAIKeyValidator()],
id=f"input-{field_name}"
)
elif field_name == "openrag_documents_paths":
input_widget = Input(
placeholder=placeholder,
value=current_value,
validators=[DocumentsPathValidator()],
id=f"input-{field_name}"
)
elif "password" in field_name or "secret" in field_name:
input_widget = Input(
placeholder=placeholder,
value=current_value,
password=True,
id=f"input-{field_name}"
)
else:
input_widget = Input(
placeholder=placeholder,
value=current_value,
id=f"input-{field_name}"
)
yield input_widget
self.inputs[field_name] = input_widget
# Add spacing
yield Static(" ")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "generate-btn":
self.action_generate()
elif event.button.id == "save-btn":
self.action_save()
elif event.button.id == "back-btn":
self.action_back()
elif event.button.id == "pick-docs-btn":
self.action_pick_documents_path()
def action_generate(self) -> None:
"""Generate secure passwords for admin accounts."""
self.env_manager.setup_secure_defaults()
# Update input fields with generated values
for field_name, input_widget in self.inputs.items():
if field_name in ["opensearch_password", "langflow_superuser_password"]:
new_value = getattr(self.env_manager.config, field_name)
input_widget.value = new_value
self.notify("Generated secure passwords", severity="info")
def action_save(self) -> None:
"""Save the configuration."""
# Update config from input fields
for field_name, input_widget in self.inputs.items():
setattr(self.env_manager.config, field_name, input_widget.value)
# Validate the configuration
if not self.env_manager.validate_config(self.mode):
error_messages = []
for field, error in self.env_manager.config.validation_errors.items():
error_messages.append(f"{field}: {error}")
self.notify(f"Validation failed:\n" + "\n".join(error_messages[:3]), severity="error")
return
# Save to file
if self.env_manager.save_env_file():
self.notify("Configuration saved successfully!", severity="success")
# Switch to monitor screen
from .monitor import MonitorScreen
self.app.push_screen(MonitorScreen())
else:
self.notify("Failed to save configuration", severity="error")
def action_back(self) -> None:
"""Go back to welcome screen."""
self.app.pop_screen()
def action_pick_documents_path(self) -> None:
"""Open textual-fspicker to select a path and append it to the input."""
try:
import importlib
fsp = importlib.import_module("textual_fspicker")
except Exception:
self.notify("textual-fspicker not available", severity="warning")
return
# Determine starting path from current input if possible
input_widget = self.inputs.get("openrag_documents_paths")
start = Path.home()
if input_widget and input_widget.value:
first = input_widget.value.split(",")[0].strip()
if first:
start = Path(first).expanduser()
# Prefer SelectDirectory for directories; fallback to FileOpen
PickerClass = getattr(fsp, "SelectDirectory", None) or getattr(fsp, "FileOpen", None)
if PickerClass is None:
self.notify("No compatible picker found in textual-fspicker", severity="warning")
return
try:
picker = PickerClass(location=start)
except Exception:
try:
picker = PickerClass(start)
except Exception:
self.notify("Could not initialize textual-fspicker", severity="warning")
return
def _append_path(result) -> None:
if not result:
return
path_str = str(result)
if input_widget is None:
return
current = input_widget.value or ""
paths = [p.strip() for p in current.split(",") if p.strip()]
if path_str not in paths:
paths.append(path_str)
input_widget.value = ",".join(paths)
# Push with callback when supported; otherwise, use on_screen_dismissed fallback
try:
self.app.push_screen(picker, _append_path) # type: ignore[arg-type]
except TypeError:
self._docs_pick_callback = _append_path # type: ignore[attr-defined]
self.app.push_screen(picker)
def on_screen_dismissed(self, event) -> None: # type: ignore[override]
try:
# textual-fspicker screens should dismiss with a result; hand to callback if present
cb = getattr(self, "_docs_pick_callback", None)
if cb is not None:
cb(getattr(event, "result", None))
try:
delattr(self, "_docs_pick_callback")
except Exception:
pass
except Exception:
pass
def on_input_changed(self, event: Input.Changed) -> None:
"""Handle input changes for real-time validation feedback."""
# This will trigger validation display in real-time
pass

182
src/tui/screens/logs.py Normal file
View file

@ -0,0 +1,182 @@
"""Logs viewing screen for OpenRAG TUI."""
import asyncio
from textual.app import ComposeResult
from textual.containers import Container, Vertical, Horizontal
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Select, TextArea
from textual.timer import Timer
from rich.text import Text
from ..managers.container_manager import ContainerManager
class LogsScreen(Screen):
"""Logs viewing and monitoring screen."""
BINDINGS = [
("escape", "back", "Back"),
("f", "follow", "Follow Logs"),
("c", "clear", "Clear"),
("r", "refresh", "Refresh"),
]
def __init__(self, initial_service: str = "openrag-backend"):
super().__init__()
self.container_manager = ContainerManager()
self.current_service = initial_service
self.logs_area = None
self.following = False
self.follow_task = None
def compose(self) -> ComposeResult:
"""Create the logs screen layout."""
yield Header()
yield Container(
Vertical(
Static("Service Logs", id="logs-title"),
Horizontal(
Static("Service:", classes="label"),
Select([
("openrag-backend", "Backend"),
("openrag-frontend", "Frontend"),
("opensearch", "OpenSearch"),
("langflow", "Langflow"),
("dashboards", "Dashboards")
], value=self.current_service, id="service-select"),
Button("Refresh", variant="default", id="refresh-btn"),
Button("Follow", variant="primary", id="follow-btn"),
Button("Clear", variant="default", id="clear-btn"),
classes="controls-row"
),
self._create_logs_area(),
Horizontal(
Button("Back", variant="default", id="back-btn"),
classes="button-row"
),
id="logs-content"
),
id="main-container"
)
yield Footer()
def _create_logs_area(self) -> TextArea:
"""Create the logs text area."""
self.logs_area = TextArea(
text="Loading logs...",
read_only=True,
show_line_numbers=False,
id="logs-area"
)
return self.logs_area
async def on_mount(self) -> None:
"""Initialize the screen when mounted."""
await self._load_logs()
def on_unmount(self) -> None:
"""Clean up when unmounting."""
self._stop_following()
def on_select_changed(self, event: Select.Changed) -> None:
"""Handle service selection change."""
if event.select.id == "service-select":
self.current_service = event.value
self._stop_following()
self.run_worker(self._load_logs())
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "refresh-btn":
self.action_refresh()
elif event.button.id == "follow-btn":
self.action_follow()
elif event.button.id == "clear-btn":
self.action_clear()
elif event.button.id == "back-btn":
self.action_back()
async def _load_logs(self, lines: int = 200) -> None:
"""Load recent logs for the current service."""
if not self.container_manager.is_available():
self.logs_area.text = "No container runtime available"
return
success, logs = await self.container_manager.get_service_logs(self.current_service, lines)
if success:
self.logs_area.text = logs
# Scroll to bottom
self.logs_area.cursor_position = len(logs)
else:
self.logs_area.text = f"Failed to load logs: {logs}"
def _stop_following(self) -> None:
"""Stop following logs."""
self.following = False
if self.follow_task and not self.follow_task.done():
self.follow_task.cancel()
# Update button text
follow_btn = self.query_one("#follow-btn")
follow_btn.label = "Follow"
follow_btn.variant = "primary"
async def _follow_logs(self) -> None:
"""Follow logs in real-time."""
if not self.container_manager.is_available():
return
try:
async for log_line in self.container_manager.follow_service_logs(self.current_service):
if not self.following:
break
# Append new line to logs area
current_text = self.logs_area.text
new_text = current_text + "\n" + log_line
# Keep only last 1000 lines to prevent memory issues
lines = new_text.split('\n')
if len(lines) > 1000:
lines = lines[-1000:]
new_text = '\n'.join(lines)
self.logs_area.text = new_text
# Scroll to bottom
self.logs_area.cursor_position = len(new_text)
except asyncio.CancelledError:
pass
except Exception as e:
if self.following: # Only show error if we're still supposed to be following
self.notify(f"Error following logs: {e}", severity="error")
finally:
self.following = False
def action_refresh(self) -> None:
"""Refresh logs."""
self._stop_following()
self.run_worker(self._load_logs())
def action_follow(self) -> None:
"""Toggle log following."""
if self.following:
self._stop_following()
else:
self.following = True
follow_btn = self.query_one("#follow-btn")
follow_btn.label = "Stop Following"
follow_btn.variant = "error"
# Start following
self.follow_task = self.run_worker(self._follow_logs(), exclusive=False)
def action_clear(self) -> None:
"""Clear the logs area."""
self.logs_area.text = ""
def action_back(self) -> None:
"""Go back to previous screen."""
self._stop_following()
self.app.pop_screen()

449
src/tui/screens/monitor.py Normal file
View file

@ -0,0 +1,449 @@
"""Service monitoring screen for OpenRAG TUI."""
import asyncio
import re
from textual.app import ComposeResult
from textual.containers import Container, Vertical, Horizontal, ScrollableContainer
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, DataTable, TabbedContent, TabPane
from textual.timer import Timer
from rich.text import Text
from rich.table import Table
from ..managers.container_manager import ContainerManager, ServiceStatus, ServiceInfo
from ..utils.platform import RuntimeType
class MonitorScreen(Screen):
"""Service monitoring and control screen."""
BINDINGS = [
("escape", "back", "Back"),
("r", "refresh", "Refresh"),
("s", "start", "Start Services"),
("t", "stop", "Stop Services"),
("u", "upgrade", "Upgrade"),
("x", "reset", "Reset"),
]
def __init__(self):
super().__init__()
self.container_manager = ContainerManager()
self.services_table = None
self.images_table = None
self.status_text = None
self.refresh_timer = None
self.operation_in_progress = False
self._follow_task = None
self._follow_service = None
self._logs_buffer = []
def compose(self) -> ComposeResult:
"""Create the monitoring screen layout."""
yield Header()
with TabbedContent(id="monitor-tabs"):
with TabPane("Services", id="services-tab"):
yield from self._create_services_tab()
with TabPane("Logs", id="logs-tab"):
yield from self._create_logs_tab()
with TabPane("System", id="system-tab"):
yield from self._create_system_tab()
yield Footer()
def _create_services_tab(self) -> ComposeResult:
"""Create the services monitoring tab."""
# Current mode indicator + toggle
yield Horizontal(
Static("", id="mode-indicator"),
Button("Toggle Mode", id="toggle-mode-btn"),
classes="button-row",
id="mode-row",
)
# Images summary table (above services)
yield Static("Container Images", classes="tab-header")
self.images_table = DataTable(id="images-table")
self.images_table.add_columns("Image", "Digest")
yield self.images_table
yield Static(" ")
# Dynamic controls container; populated based on running state
yield Horizontal(id="services-controls", classes="button-row")
# Create services table with image + digest info
self.services_table = DataTable(id="services-table")
self.services_table.add_columns("Service", "Status", "Health", "Ports", "Image", "Digest")
yield self.services_table
yield Horizontal(
Button("Refresh", variant="default", id="refresh-btn"),
Button("Back", variant="default", id="back-btn"),
classes="button-row"
)
def _create_logs_tab(self) -> ComposeResult:
"""Create the logs viewing tab."""
logs_content = Static("Select a service to view logs", id="logs-content", markup=False)
yield Static("Service Logs", id="logs-header")
yield Horizontal(
Button("Backend", variant="default", id="logs-backend"),
Button("Frontend", variant="default", id="logs-frontend"),
Button("OpenSearch", variant="default", id="logs-opensearch"),
Button("Langflow", variant="default", id="logs-langflow"),
classes="button-row"
)
yield ScrollableContainer(logs_content, id="logs-scroll")
def _create_system_tab(self) -> ComposeResult:
"""Create the system information tab."""
system_info = Static(self._get_system_info(), id="system-info")
yield Static("System Information", id="system-header")
yield system_info
def _get_runtime_status(self) -> Text:
"""Get container runtime status text."""
status_text = Text()
if not self.container_manager.is_available():
status_text.append("WARNING: No container runtime available\n", style="bold red")
status_text.append("Please install Docker or Podman to continue.\n", style="dim")
return status_text
runtime_info = self.container_manager.get_runtime_info()
if runtime_info.runtime_type == RuntimeType.DOCKER:
status_text.append("Docker Runtime\n", style="bold blue")
elif runtime_info.runtime_type == RuntimeType.PODMAN:
status_text.append("Podman Runtime\n", style="bold purple")
else:
status_text.append("Container Runtime\n", style="bold green")
if runtime_info.version:
status_text.append(f"Version: {runtime_info.version}\n", style="dim")
# Check Podman macOS memory if applicable
if runtime_info.runtime_type == RuntimeType.PODMAN:
is_sufficient, message = self.container_manager.check_podman_macos_memory()
if not is_sufficient:
status_text.append(f"WARNING: {message}\n", style="bold yellow")
return status_text
def _get_system_info(self) -> Text:
"""Get system information text."""
info_text = Text()
runtime_info = self.container_manager.get_runtime_info()
info_text.append("Container Runtime Information\n", style="bold")
info_text.append("=" * 30 + "\n")
info_text.append(f"Type: {runtime_info.runtime_type.value}\n")
info_text.append(f"Compose Command: {' '.join(runtime_info.compose_command)}\n")
info_text.append(f"Runtime Command: {' '.join(runtime_info.runtime_command)}\n")
if runtime_info.version:
info_text.append(f"Version: {runtime_info.version}\n")
# Removed compose files section for cleaner display
return info_text
async def on_mount(self) -> None:
"""Initialize the screen when mounted."""
await self._refresh_services()
# Set up auto-refresh every 5 seconds
self.refresh_timer = self.set_interval(5.0, self._auto_refresh)
def on_unmount(self) -> None:
"""Clean up when unmounting."""
if self.refresh_timer:
self.refresh_timer.stop()
# Stop following logs if running
self._stop_follow()
async def _refresh_services(self) -> None:
"""Refresh the services table."""
if not self.container_manager.is_available():
return
services = await self.container_manager.get_service_status(force_refresh=True)
# Fetch image info independent of service state
project_images = await self.container_manager.get_project_images_info()
digest_map = {img: dig for img, dig in project_images}
images = [img for img, _ in project_images]
# Clear existing rows
self.services_table.clear()
if self.images_table:
self.images_table.clear()
# Add service rows
for service_name, service_info in services.items():
status_style = self._get_status_style(service_info.status)
self.services_table.add_row(
service_info.name,
Text(service_info.status.value, style=status_style),
service_info.health or "N/A",
", ".join(service_info.ports) if service_info.ports else "N/A",
service_info.image or "N/A",
digest_map.get(service_info.image or "", "-")
)
# Populate images table (unique images)
if self.images_table:
seen=set()
for image in images:
if not image or image in seen:
continue
seen.add(image)
self.images_table.add_row(image, digest_map.get(image, "-"))
# Update controls based on overall state
self._update_controls(list(services.values()))
# Update mode indicator
self._update_mode_row()
def _get_status_style(self, status: ServiceStatus) -> str:
"""Get the Rich style for a service status."""
status_styles = {
ServiceStatus.RUNNING: "bold green",
ServiceStatus.STOPPED: "bold red",
ServiceStatus.STARTING: "bold yellow",
ServiceStatus.STOPPING: "bold yellow",
ServiceStatus.ERROR: "bold red",
ServiceStatus.MISSING: "dim",
ServiceStatus.UNKNOWN: "dim"
}
return status_styles.get(status, "white")
async def _auto_refresh(self) -> None:
"""Auto-refresh services if not in operation."""
if not self.operation_in_progress:
await self._refresh_services()
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "start-btn":
self.run_worker(self._start_services())
elif event.button.id == "stop-btn":
self.run_worker(self._stop_services())
elif event.button.id == "restart-btn":
self.run_worker(self._restart_services())
elif event.button.id == "upgrade-btn":
self.run_worker(self._upgrade_services())
elif event.button.id == "reset-btn":
self.run_worker(self._reset_services())
elif event.button.id == "toggle-mode-btn":
self.action_toggle_mode()
elif event.button.id == "refresh-btn":
self.action_refresh()
elif event.button.id == "back-btn":
self.action_back()
elif event.button.id.startswith("logs-"):
# Map button IDs to actual service names
service_mapping = {
"logs-backend": "openrag-backend",
"logs-frontend": "openrag-frontend",
"logs-opensearch": "opensearch",
"logs-langflow": "langflow"
}
service_name = service_mapping.get(event.button.id)
if service_name:
# Load recent logs then start following
self.run_worker(self._show_logs(service_name))
self._start_follow(service_name)
async def _start_services(self, cpu_mode: bool = False) -> None:
"""Start services with progress updates."""
self.operation_in_progress = True
try:
async for is_complete, message in self.container_manager.start_services(cpu_mode):
self.notify(message, severity="success" if is_complete else "info")
await self._refresh_services()
finally:
self.operation_in_progress = False
async def _stop_services(self) -> None:
"""Stop services with progress updates."""
self.operation_in_progress = True
try:
async for is_complete, message in self.container_manager.stop_services():
self.notify(message, severity="success" if is_complete else "info")
await self._refresh_services()
finally:
self.operation_in_progress = False
async def _restart_services(self) -> None:
"""Restart services with progress updates."""
self.operation_in_progress = True
try:
async for is_complete, message in self.container_manager.restart_services():
self.notify(message, severity="success" if is_complete else "info")
await self._refresh_services()
finally:
self.operation_in_progress = False
async def _upgrade_services(self) -> None:
"""Upgrade services with progress updates."""
self.operation_in_progress = True
try:
async for is_complete, message in self.container_manager.upgrade_services():
self.notify(message, severity="success" if is_complete else "warning")
await self._refresh_services()
finally:
self.operation_in_progress = False
async def _reset_services(self) -> None:
"""Reset services with progress updates."""
self.operation_in_progress = True
try:
async for is_complete, message in self.container_manager.reset_services():
self.notify(message, severity="success" if is_complete else "warning")
await self._refresh_services()
finally:
self.operation_in_progress = False
def _strip_ansi_codes(self, text: str) -> str:
"""Strip ANSI escape sequences from text."""
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', text)
async def _show_logs(self, service_name: str) -> None:
"""Show logs for a service."""
success, logs = await self.container_manager.get_service_logs(service_name)
if success:
# Strip ANSI codes and limit length to prevent UI issues
cleaned_logs = self._strip_ansi_codes(logs)
# Limit to last 5000 characters to prevent performance issues
if len(cleaned_logs) > 5000:
cleaned_logs = "...\n" + cleaned_logs[-5000:]
logs_widget = self.query_one("#logs-content", Static)
logs_widget.update(cleaned_logs)
# Reset buffer to the current content split by lines (cap buffer)
self._logs_buffer = cleaned_logs.splitlines()[-1000:]
# Try to scroll to end of container
try:
scroller = self.query_one("#logs-scroll", ScrollableContainer)
if hasattr(scroller, "scroll_end"):
scroller.scroll_end(animate=False)
elif hasattr(scroller, "scroll_to_end"):
scroller.scroll_to_end()
except Exception:
pass
else:
self.notify(f"Failed to get logs for {service_name}: {logs}", severity="error")
def _stop_follow(self) -> None:
task = self._follow_task
if task and hasattr(task, "cancel"):
try:
task.cancel()
except Exception:
pass
self._follow_task = None
self._follow_service = None
def _start_follow(self, service_name: str) -> None:
# Stop any existing follower and start a new one
self._stop_follow()
self._follow_service = service_name
self._follow_task = self.run_worker(self._follow_logs(), exclusive=False)
async def _follow_logs(self) -> None:
"""Follow logs for the currently selected service and append to the view."""
service_name = self._follow_service
if not service_name:
return
if not self.container_manager.is_available():
return
try:
async for line in self.container_manager.follow_service_logs(service_name):
cleaned = self._strip_ansi_codes(line.rstrip("\n"))
if not cleaned:
continue
self._logs_buffer.append(cleaned)
# Keep only the last 1000 lines to avoid growth
if len(self._logs_buffer) > 1000:
self._logs_buffer = self._logs_buffer[-1000:]
try:
logs_widget = self.query_one("#logs-content", Static)
logs_widget.update("\n".join(self._logs_buffer))
scroller = self.query_one("#logs-scroll", ScrollableContainer)
if hasattr(scroller, "scroll_end"):
scroller.scroll_end(animate=False)
except Exception:
pass
except Exception as e:
self.notify(f"Error following logs: {e}", severity="error")
def action_refresh(self) -> None:
"""Refresh services manually."""
self.run_worker(self._refresh_services())
def _update_mode_row(self) -> None:
"""Update the mode indicator and toggle button label."""
try:
use_cpu = getattr(self.container_manager, "use_cpu_compose", True)
indicator = self.query_one("#mode-indicator", Static)
mode_text = "Mode: CPU (no GPU detected)" if use_cpu else "Mode: GPU"
indicator.update(mode_text)
toggle_btn = self.query_one("#toggle-mode-btn", Button)
toggle_btn.label = "Switch to GPU Mode" if use_cpu else "Switch to CPU Mode"
except Exception:
pass
def action_toggle_mode(self) -> None:
"""Toggle between CPU/GPU compose files and refresh view."""
try:
current = getattr(self.container_manager, "use_cpu_compose", True)
self.container_manager.use_cpu_compose = not current
self.notify("Switched to GPU compose" if not current else "Switched to CPU compose", severity="info")
self._update_mode_row()
self.action_refresh()
except Exception as e:
self.notify(f"Failed to toggle mode: {e}", severity="error")
def _update_controls(self, services: list[ServiceInfo]) -> None:
"""Render control buttons based on running state and set default focus."""
try:
controls = self.query_one("#services-controls", Horizontal)
controls.remove_children()
any_running = any(s.status == ServiceStatus.RUNNING for s in services)
if any_running:
controls.mount(Button("Stop Services", variant="error", id="stop-btn"))
controls.mount(Button("Restart", variant="primary", id="restart-btn"))
controls.mount(Button("Upgrade", variant="warning", id="upgrade-btn"))
controls.mount(Button("Reset", variant="error", id="reset-btn"))
# Focus Stop by default when running
try:
self.query_one("#stop-btn", Button).focus()
except Exception:
pass
else:
controls.mount(Button("Start Services", variant="success", id="start-btn"))
try:
self.query_one("#start-btn", Button).focus()
except Exception:
pass
except Exception:
pass
def action_back(self) -> None:
"""Go back to previous screen."""
self.app.pop_screen()
def action_start(self) -> None:
"""Start services."""
self.run_worker(self._start_services())
def action_stop(self) -> None:
"""Stop services."""
self.run_worker(self._stop_services())
def action_upgrade(self) -> None:
"""Upgrade services."""
self.run_worker(self._upgrade_services())
def action_reset(self) -> None:
"""Reset services."""
self.run_worker(self._reset_services())

184
src/tui/screens/welcome.py Normal file
View file

@ -0,0 +1,184 @@
"""Welcome screen for OpenRAG TUI."""
import os
from pathlib import Path
from textual.app import ComposeResult
from textual.containers import Container, Vertical, Horizontal
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button
from rich.text import Text
from rich.align import Align
from dotenv import load_dotenv
from ..managers.container_manager import ContainerManager, ServiceStatus
from ..managers.env_manager import EnvManager
class WelcomeScreen(Screen):
"""Initial welcome screen with setup options."""
BINDINGS = [
("q", "quit", "Quit"),
("enter", "default_action", "Continue"),
("1", "no_auth_setup", "Basic Setup"),
("2", "full_setup", "Advanced Setup"),
("3", "monitor", "Monitor Services"),
]
def __init__(self):
super().__init__()
self.container_manager = ContainerManager()
self.env_manager = EnvManager()
self.services_running = False
self.has_oauth_config = False
self.default_button_id = "basic-setup-btn"
self._state_checked = False
# Load .env file if it exists
load_dotenv()
def compose(self) -> ComposeResult:
"""Create the welcome screen layout."""
yield Container(
Vertical(
Static(self._create_welcome_text(), id="welcome-text"),
self._create_dynamic_buttons(),
id="welcome-container"
),
id="main-container"
)
yield Footer()
def _create_welcome_text(self) -> Text:
"""Create a minimal welcome message."""
welcome_text = Text()
ascii_art = """
"""
welcome_text.append(ascii_art, style="bold blue")
welcome_text.append("Terminal User Interface for OpenRAG\n\n", style="dim")
if self.services_running:
welcome_text.append("✓ Services are currently running\n\n", style="bold green")
elif self.has_oauth_config:
welcome_text.append("OAuth credentials detected — Advanced Setup recommended\n\n", style="bold green")
else:
welcome_text.append("Select a setup below to continue\n\n", style="white")
return welcome_text
def _create_dynamic_buttons(self) -> Horizontal:
"""Create buttons based on current state."""
# Check OAuth config early to determine which buttons to show
has_oauth = (
bool(os.getenv("GOOGLE_OAUTH_CLIENT_ID")) or
bool(os.getenv("MICROSOFT_GRAPH_OAUTH_CLIENT_ID"))
)
buttons = []
if self.services_running:
# Services running - only show monitor
buttons.append(Button("Monitor Services", variant="success", id="monitor-btn"))
else:
# Services not running - show setup options
if has_oauth:
# Only show advanced setup if OAuth is configured
buttons.append(Button("Advanced Setup", variant="success", id="advanced-setup-btn"))
else:
# Only show basic setup if no OAuth
buttons.append(Button("Basic Setup", variant="success", id="basic-setup-btn"))
# Always show monitor option
buttons.append(Button("Monitor Services", variant="default", id="monitor-btn"))
return Horizontal(*buttons, classes="button-row")
async def on_mount(self) -> None:
"""Initialize screen state when mounted."""
# Check if services are running
if self.container_manager.is_available():
services = await self.container_manager.get_service_status()
running_services = [s.name for s in services.values() if s.status == ServiceStatus.RUNNING]
self.services_running = len(running_services) > 0
# Check for OAuth configuration
self.has_oauth_config = (
bool(os.getenv("GOOGLE_OAUTH_CLIENT_ID")) or
bool(os.getenv("MICROSOFT_GRAPH_OAUTH_CLIENT_ID"))
)
# Set default button focus
if self.services_running:
self.default_button_id = "monitor-btn"
elif self.has_oauth_config:
self.default_button_id = "advanced-setup-btn"
else:
self.default_button_id = "basic-setup-btn"
# Update the welcome text and recompose with new state
try:
welcome_widget = self.query_one("#welcome-text")
welcome_widget.update(self._create_welcome_text())
# Focus the appropriate button
if self.services_running:
try:
self.query_one("#monitor-btn").focus()
except:
pass
elif self.has_oauth_config:
try:
self.query_one("#advanced-setup-btn").focus()
except:
pass
else:
try:
self.query_one("#basic-setup-btn").focus()
except:
pass
except:
pass # Widgets might not be mounted yet
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "basic-setup-btn":
self.action_no_auth_setup()
elif event.button.id == "advanced-setup-btn":
self.action_full_setup()
elif event.button.id == "monitor-btn":
self.action_monitor()
def action_default_action(self) -> None:
"""Handle Enter key - go to default action based on state."""
if self.services_running:
self.action_monitor()
elif self.has_oauth_config:
self.action_full_setup()
else:
self.action_no_auth_setup()
def action_no_auth_setup(self) -> None:
"""Switch to basic configuration screen."""
from .config import ConfigScreen
self.app.push_screen(ConfigScreen(mode="no_auth"))
def action_full_setup(self) -> None:
"""Switch to advanced configuration screen."""
from .config import ConfigScreen
self.app.push_screen(ConfigScreen(mode="full"))
def action_monitor(self) -> None:
"""Switch to monitoring screen."""
from .monitor import MonitorScreen
self.app.push_screen(MonitorScreen())
def action_quit(self) -> None:
"""Quit the application."""
self.app.exit()

View file

@ -0,0 +1 @@
"""TUI utilities package."""

154
src/tui/utils/platform.py Normal file
View file

@ -0,0 +1,154 @@
"""Platform detection and container runtime discovery utilities."""
import json
import platform
import subprocess
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class RuntimeType(Enum):
DOCKER_COMPOSE = "docker-compose"
DOCKER = "docker"
PODMAN = "podman"
NONE = "none"
@dataclass
class RuntimeInfo:
runtime_type: RuntimeType
compose_command: list[str]
runtime_command: list[str]
version: Optional[str] = None
class PlatformDetector:
"""Detect platform and container runtime capabilities."""
def __init__(self):
self.platform_system = platform.system()
self.platform_machine = platform.machine()
def detect_runtime(self) -> RuntimeInfo:
"""Detect available container runtime and compose capabilities."""
if self._check_command(["docker", "compose", "--help"]):
version = self._get_docker_version()
return RuntimeInfo(RuntimeType.DOCKER, ["docker", "compose"], ["docker"], version)
if self._check_command(["docker-compose", "--help"]):
version = self._get_docker_version()
return RuntimeInfo(RuntimeType.DOCKER_COMPOSE, ["docker-compose"], ["docker"], version)
if self._check_command(["podman", "compose", "--help"]):
version = self._get_podman_version()
return RuntimeInfo(RuntimeType.PODMAN, ["podman", "compose"], ["podman"], version)
return RuntimeInfo(RuntimeType.NONE, [], [])
def detect_gpu_available(self) -> bool:
"""Best-effort detection of NVIDIA GPU availability for containers."""
try:
res = subprocess.run(["nvidia-smi", "-L"], capture_output=True, text=True, timeout=5)
if res.returncode == 0 and any("GPU" in ln for ln in res.stdout.splitlines()):
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
for cmd in (["docker", "info", "--format", "{{json .Runtimes}}"], ["podman", "info", "--format", "json"]):
try:
res = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if res.returncode == 0 and "nvidia" in res.stdout.lower():
return True
except (subprocess.TimeoutExpired, FileNotFoundError):
continue
return False
def _check_command(self, cmd: list[str]) -> bool:
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False
def _get_docker_version(self) -> Optional[str]:
try:
res = subprocess.run(["docker", "--version"], capture_output=True, text=True, timeout=5)
if res.returncode == 0:
return res.stdout.strip()
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return None
def _get_podman_version(self) -> Optional[str]:
try:
res = subprocess.run(["podman", "--version"], capture_output=True, text=True, timeout=5)
if res.returncode == 0:
return res.stdout.strip()
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return None
def check_podman_macos_memory(self) -> tuple[bool, int, str]:
"""
Check Podman VM memory on macOS.
Returns (is_sufficient, current_memory_mb, status_message)
"""
if self.platform_system != "Darwin":
return True, 0, "Not running on macOS"
try:
result = subprocess.run(["podman", "machine", "inspect"], capture_output=True, text=True, timeout=10)
if result.returncode != 0:
return False, 0, "Could not inspect Podman machine"
machines = json.loads(result.stdout)
if not machines:
return False, 0, "No Podman machines found"
machine = machines[0]
memory_mb = machine.get("Resources", {}).get("Memory", 0)
min_memory_mb = 8192
is_sufficient = memory_mb >= min_memory_mb
status = f"Current: {memory_mb}MB, Recommended: ≥{min_memory_mb}MB"
if not is_sufficient:
status += "\nTo increase: podman machine stop && podman machine rm && podman machine init --memory 8192 && podman machine start"
return is_sufficient, memory_mb, status
except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError) as e:
return False, 0, f"Error checking Podman VM memory: {e}"
def get_installation_instructions(self) -> str:
if self.platform_system == "Darwin":
return """
No container runtime found. Please install one:
Docker Desktop for Mac:
https://docs.docker.com/desktop/install/mac-install/
Or Podman:
brew install podman
podman machine init --memory 8192
podman machine start
"""
elif self.platform_system == "Linux":
return """
No container runtime found. Please install one:
Docker:
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
Or Podman:
# Ubuntu/Debian: sudo apt install podman
# RHEL/Fedora: sudo dnf install podman
"""
elif self.platform_system == "Windows":
return """
No container runtime found. Please install one:
Docker Desktop for Windows:
https://docs.docker.com/desktop/install/windows-install/
Or Podman Desktop:
https://podman-desktop.io/downloads
"""
else:
return """
No container runtime found. Please install Docker or Podman for your platform:
- Docker: https://docs.docker.com/get-docker/
- Podman: https://podman.io/getting-started/installation
"""

133
src/tui/utils/validation.py Normal file
View file

@ -0,0 +1,133 @@
"""Input validation utilities for TUI."""
import os
import re
from pathlib import Path
from typing import Optional
class ValidationError(Exception):
"""Validation error exception."""
pass
def validate_env_var_name(name: str) -> bool:
"""Validate environment variable name format."""
return bool(re.match(r'^[A-Z][A-Z0-9_]*$', name))
def validate_path(path: str, must_exist: bool = False, must_be_dir: bool = False) -> bool:
"""Validate file/directory path."""
if not path:
return False
try:
path_obj = Path(path).expanduser().resolve()
if must_exist and not path_obj.exists():
return False
if must_be_dir and path_obj.exists() and not path_obj.is_dir():
return False
return True
except (OSError, ValueError):
return False
def validate_url(url: str) -> bool:
"""Validate URL format."""
if not url:
return False
url_pattern = re.compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain
r'localhost|' # localhost
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # IP
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
return bool(url_pattern.match(url))
def validate_openai_api_key(key: str) -> bool:
"""Validate OpenAI API key format."""
if not key:
return False
return key.startswith('sk-') and len(key) > 20
def validate_google_oauth_client_id(client_id: str) -> bool:
"""Validate Google OAuth client ID format."""
if not client_id:
return False
return client_id.endswith('.apps.googleusercontent.com')
def validate_non_empty(value: str) -> bool:
"""Validate that value is not empty."""
return bool(value and value.strip())
def sanitize_env_value(value: str) -> str:
"""Sanitize environment variable value."""
# Remove leading/trailing whitespace
value = value.strip()
# Remove quotes if they wrap the entire value
if len(value) >= 2:
if (value.startswith('"') and value.endswith('"')) or \
(value.startswith("'") and value.endswith("'")):
value = value[1:-1]
return value
def validate_documents_paths(paths_str: str) -> tuple[bool, str, list[str]]:
"""
Validate comma-separated documents paths for volume mounting.
Returns:
(is_valid, error_message, validated_paths)
"""
if not paths_str:
return False, "Documents paths cannot be empty", []
paths = [path.strip() for path in paths_str.split(',') if path.strip()]
if not paths:
return False, "No valid paths provided", []
validated_paths = []
for path in paths:
try:
path_obj = Path(path).expanduser().resolve()
# Check if path exists
if not path_obj.exists():
# Try to create it
try:
path_obj.mkdir(parents=True, exist_ok=True)
except (OSError, PermissionError) as e:
return False, f"Cannot create directory '{path}': {e}", []
# Check if it's a directory
if not path_obj.is_dir():
return False, f"Path '{path}' must be a directory", []
# Check if we can write to it
try:
test_file = path_obj / ".openrag_test"
test_file.touch()
test_file.unlink()
except (OSError, PermissionError):
return False, f"Directory '{path}' is not writable", []
validated_paths.append(str(path_obj))
except (OSError, ValueError) as e:
return False, f"Invalid path '{path}': {e}", []
return True, "All paths valid", validated_paths

86
uv.lock generated
View file

@ -864,6 +864,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/83/60/d497a310bde3f01cb805196ac61b7ad6dc5dcf8dce66634dc34364b20b4f/lazy_loader-0.4-py3-none-any.whl", hash = "sha256:342aa8e14d543a154047afb4ba8ef17f5563baad3fc610d7b15b213b0f119efc", size = 12097, upload-time = "2024-04-05T13:03:10.514Z" },
]
[[package]]
name = "linkify-it-py"
version = "2.0.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "uc-micro-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/2a/ae/bb56c6828e4797ba5a4821eec7c43b8bf40f69cda4d4f5f8c8a2810ec96a/linkify-it-py-2.0.3.tar.gz", hash = "sha256:68cda27e162e9215c17d786649d1da0021a451bdc436ef9e0fa0ba5234b9b048", size = 27946, upload-time = "2024-02-04T14:48:04.179Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" },
]
[[package]]
name = "litellm"
version = "1.74.1"
@ -923,6 +935,14 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/42/d7/1ec15b46af6af88f19b8e5ffea08fa375d433c998b8a7639e76935c14f1f/markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1", size = 87528, upload-time = "2023-06-03T06:41:11.019Z" },
]
[package.optional-dependencies]
linkify = [
{ name = "linkify-it-py" },
]
plugins = [
{ name = "mdit-py-plugins" },
]
[[package]]
name = "marko"
version = "2.1.4"
@ -994,6 +1014,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/64/56/f98938bded6b2ac779c55e36bf5277d1fe4154da2246aa0621c1358efa2b/mcp_subscribe-0.1.1-py3-none-any.whl", hash = "sha256:617b8dc30253a992bddcb6023de6cce7eb95d3b976dc9a828892242c7a2c6eaa", size = 5092, upload-time = "2025-04-28T05:46:42.024Z" },
]
[[package]]
name = "mdit-py-plugins"
version = "0.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
@ -1389,8 +1421,12 @@ dependencies = [
{ name = "opensearch-py", extra = ["async"] },
{ name = "psutil" },
{ name = "pyjwt" },
{ name = "python-dotenv" },
{ name = "python-multipart" },
{ name = "rich" },
{ name = "starlette" },
{ name = "textual" },
{ name = "textual-fspicker" },
{ name = "torch", version = "2.7.1+cu128", source = { registry = "https://download.pytorch.org/whl/cu128" }, marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "torch", version = "2.8.0", source = { registry = "https://pypi.org/simple" }, marker = "platform_machine != 'x86_64' or sys_platform != 'linux'" },
{ name = "uvicorn" },
@ -1411,8 +1447,12 @@ requires-dist = [
{ name = "opensearch-py", extras = ["async"], specifier = ">=3.0.0" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyjwt", specifier = ">=2.8.0" },
{ name = "python-dotenv", specifier = ">=1.0.0" },
{ name = "python-multipart", specifier = ">=0.0.20" },
{ name = "rich", specifier = ">=13.0.0" },
{ name = "starlette", specifier = ">=0.47.1" },
{ name = "textual", specifier = ">=0.45.0" },
{ name = "textual-fspicker", specifier = ">=0.6.0" },
{ name = "torch", marker = "platform_machine != 'x86_64' or sys_platform != 'linux'", specifier = ">=2.7.1" },
{ name = "torch", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'", specifier = ">=2.7.1", index = "https://download.pytorch.org/whl/cu128" },
{ name = "uvicorn", specifier = ">=0.35.0" },
@ -1530,6 +1570,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/89/c7/5572fa4a3f45740eaab6ae86fcdf7195b55beac1371ac8c619d880cfe948/pillow-11.3.0-cp314-cp314t-win_arm64.whl", hash = "sha256:79ea0d14d3ebad43ec77ad5272e6ff9bba5b679ef73375ea760261207fa8e0aa", size = 2512835, upload-time = "2025-07-01T09:15:50.399Z" },
]
[[package]]
name = "platformdirs"
version = "4.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/23/e8/21db9c9987b0e728855bd57bff6984f67952bea55d6f75e055c46b5383e8/platformdirs-4.4.0.tar.gz", hash = "sha256:ca753cf4d81dc309bc67b0ea38fd15dc97bc30ce419a7f58d13eb3bf14c4febf", size = 21634, upload-time = "2025-08-26T14:32:04.268Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/40/4b/2028861e724d3bd36227adfa20d3fd24c3fc6d52032f4a93c133be5d17ce/platformdirs-4.4.0-py3-none-any.whl", hash = "sha256:abd01743f24e5287cd7a5db3752faf1a2d65353f38ec26d98e25a6db65958c85", size = 18654, upload-time = "2025-08-26T14:32:02.735Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
@ -2288,6 +2337,34 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/40/44/4a5f08c96eb108af5cb50b41f76142f0afa346dfa99d5296fe7202a11854/tabulate-0.9.0-py3-none-any.whl", hash = "sha256:024ca478df22e9340661486f85298cff5f6dcdba14f3813e8830015b9ed1948f", size = 35252, upload-time = "2022-10-06T17:21:44.262Z" },
]
[[package]]
name = "textual"
version = "6.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py", extra = ["linkify", "plugins"] },
{ name = "platformdirs" },
{ name = "pygments" },
{ name = "rich" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/da/44/4b524b2f06e0fa6c4ede56a4e9af5edd5f3f83cf2eea5cb4fd0ce5bbe063/textual-6.1.0.tar.gz", hash = "sha256:cc89826ca2146c645563259320ca4ddc75d183c77afb7d58acdd46849df9144d", size = 1564786, upload-time = "2025-09-02T11:42:34.655Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/43/f91e041f239b54399310a99041faf33beae9a6e628671471d0fcd6276af4/textual-6.1.0-py3-none-any.whl", hash = "sha256:a3f5e6710404fcdc6385385db894699282dccf2ad50103cebc677403c1baadd5", size = 707840, upload-time = "2025-09-02T11:42:32.746Z" },
]
[[package]]
name = "textual-fspicker"
version = "0.6.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "textual" },
]
sdist = { url = "https://files.pythonhosted.org/packages/15/2e/8c1ae6f0c26af2fe0c49d61d42c91d0077cbfd984df049d7e3d82a40d93d/textual_fspicker-0.6.0.tar.gz", hash = "sha256:0da0e3f35025f72c5b90557d12777c9f67c674470b3263cbe2c2de38f5b70c3c", size = 16157, upload-time = "2025-08-26T15:38:19.805Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/06/62/463994497050869e517dde4da7f628599f6aea05dcd6bbb14d2a945c2499/textual_fspicker-0.6.0-py3-none-any.whl", hash = "sha256:4d0ddbebdc5d7c93ad0d1f48627003a60690bc6d382267ee033cfeb2e6b4949c", size = 24715, upload-time = "2025-08-26T15:38:14.344Z" },
]
[[package]]
name = "tifffile"
version = "2025.6.11"
@ -2552,6 +2629,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839, upload-time = "2025-03-23T13:54:41.845Z" },
]
[[package]]
name = "uc-micro-py"
version = "1.0.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/91/7a/146a99696aee0609e3712f2b44c6274566bc368dfe8375191278045186b8/uc-micro-py-1.0.3.tar.gz", hash = "sha256:d321b92cff673ec58027c04015fcaa8bb1e005478643ff4a500882eaab88c48a", size = 6043, upload-time = "2024-02-09T16:52:01.654Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/37/87/1f677586e8ac487e29672e4b17455758fce261de06a0d086167bb760361a/uc_micro_py-1.0.3-py3-none-any.whl", hash = "sha256:db1dffff340817673d7b466ec86114a9dc0e9d4d9b5ba229d9d60e5c12600cd5", size = 6229, upload-time = "2024-02-09T16:52:00.371Z" },
]
[[package]]
name = "uritemplate"
version = "4.2.0"