Merge pull request #14 from langflow-ai/tui

Tui improvements
This commit is contained in:
Sebastián Estévez 2025-09-04 13:40:54 -04:00 committed by GitHub
commit d5dda4ce2e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1152 additions and 230 deletions

View file

@ -8,9 +8,11 @@ from .screens.welcome import WelcomeScreen
from .screens.config import ConfigScreen
from .screens.monitor import MonitorScreen
from .screens.logs import LogsScreen
from .screens.diagnostics import DiagnosticsScreen
from .managers.env_manager import EnvManager
from .managers.container_manager import ContainerManager
from .utils.platform import PlatformDetector
from .widgets.diagnostics_notification import notify_with_diagnostics
class OpenRAGTUI(App):
@ -181,7 +183,8 @@ class OpenRAGTUI(App):
"""Initialize the application."""
# Check for runtime availability and show appropriate screen
if not self.container_manager.is_available():
self.notify(
notify_with_diagnostics(
self,
"No container runtime found. Please install Docker or Podman.",
severity="warning",
timeout=10
@ -193,7 +196,7 @@ class OpenRAGTUI(App):
# Start with welcome screen
self.push_screen(WelcomeScreen())
def action_quit(self) -> None:
async def action_quit(self) -> None:
"""Quit the application."""
self.exit()

View file

@ -4,7 +4,7 @@ import asyncio
import json
import subprocess
import time
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, AsyncIterator
@ -30,7 +30,7 @@ class ServiceInfo:
name: str
status: ServiceStatus
health: Optional[str] = None
ports: List[str] = None
ports: List[str] = field(default_factory=list)
image: Optional[str] = None
image_digest: Optional[str] = None
created: Optional[str] = None
@ -115,6 +115,41 @@ class ContainerManager:
except Exception as e:
return False, "", f"Command execution failed: {e}"
async def _run_compose_command_streaming(self, args: List[str], cpu_mode: Optional[bool] = None) -> AsyncIterator[str]:
"""Run a compose command and yield output lines in real-time."""
if not self.is_available():
yield "No container runtime available"
return
if cpu_mode is None:
cpu_mode = self.use_cpu_compose
compose_file = self.cpu_compose_file if cpu_mode else self.compose_file
cmd = self.runtime_info.compose_command + ["-f", str(compose_file)] + args
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT, # Combine stderr with stdout for unified output
cwd=Path.cwd()
)
# Simple approach: read line by line and yield each one
while True:
line = await process.stdout.readline()
if not line:
break
line_text = line.decode().rstrip()
if line_text:
yield line_text
# Wait for process to complete
await process.wait()
except Exception as e:
yield f"Command execution failed: {e}"
async def _run_runtime_command(self, args: List[str]) -> tuple[bool, str, str]:
"""Run a runtime command (docker/podman) and return (success, stdout, stderr)."""
if not self.is_available():
@ -139,6 +174,48 @@ class ContainerManager:
except Exception as e:
return False, "", f"Command execution failed: {e}"
def _process_service_json(self, service: Dict, services: Dict[str, ServiceInfo]) -> None:
"""Process a service JSON object and add it to the services dict."""
# Debug print to see the actual service data
print(f"DEBUG: Processing service data: {json.dumps(service, indent=2)}")
container_name = service.get("Name", "")
# Map container name to service name
service_name = self.container_name_map.get(container_name)
if not service_name:
return
state = service.get("State", "").lower()
# Map compose states to our status enum
if "running" in state:
status = ServiceStatus.RUNNING
elif "exited" in state or "stopped" in state:
status = ServiceStatus.STOPPED
elif "starting" in state:
status = ServiceStatus.STARTING
else:
status = ServiceStatus.UNKNOWN
# Extract health - use Status if Health is empty
health = service.get("Health", "") or service.get("Status", "N/A")
# Extract ports
ports_str = service.get("Ports", "")
ports = [p.strip() for p in ports_str.split(",") if p.strip()] if ports_str else []
# Extract image
image = service.get("Image", "N/A")
services[service_name] = ServiceInfo(
name=service_name,
status=status,
health=health,
ports=ports,
image=image,
)
async def get_service_status(self, force_refresh: bool = False) -> Dict[str, ServiceInfo]:
"""Get current status of all services."""
current_time = time.time()
@ -149,75 +226,100 @@ class ContainerManager:
services = {}
# Get compose service status
success, stdout, stderr = await self._run_compose_command(["ps", "--format", "json"])
if success and stdout.strip():
try:
# Parse JSON output - each line is a separate JSON object
for line in stdout.strip().split('\n'):
if line.strip() and line.startswith('{'):
service = json.loads(line)
container_name = service.get("Name", "")
# Different approach for Podman vs Docker
if self.runtime_info.runtime_type == RuntimeType.PODMAN:
# For Podman, use direct podman ps command instead of compose
cmd = ["ps", "--all", "--format", "json"]
success, stdout, stderr = await self._run_runtime_command(cmd)
if success and stdout.strip():
try:
containers = json.loads(stdout.strip())
for container in containers:
# Get container name and map to service name
names = container.get("Names", [])
if not names:
continue
# Map container name to service name
container_name = names[0]
service_name = self.container_name_map.get(container_name)
if not service_name:
continue
state = service.get("State", "").lower()
# Map compose states to our status enum
# Get container state
state = container.get("State", "").lower()
if "running" in state:
status = ServiceStatus.RUNNING
elif "exited" in state or "stopped" in state:
status = ServiceStatus.STOPPED
elif "starting" in state:
elif "created" in state:
status = ServiceStatus.STARTING
else:
status = ServiceStatus.UNKNOWN
# Extract health - use Status if Health is empty
health = service.get("Health", "") or service.get("Status", "N/A")
# Extract ports
ports_str = service.get("Ports", "")
ports = [p.strip() for p in ports_str.split(",") if p.strip()] if ports_str else []
# Extract image
image = service.get("Image", "N/A")
# Get other container info
image = container.get("Image", "N/A")
ports = []
# Handle case where Ports might be None instead of an empty list
container_ports = container.get("Ports") or []
if isinstance(container_ports, list):
for port in container_ports:
host_port = port.get("host_port")
container_port = port.get("container_port")
if host_port and container_port:
ports.append(f"{host_port}:{container_port}")
services[service_name] = ServiceInfo(
name=service_name,
status=status,
health=health,
health=state,
ports=ports,
image=image,
)
except json.JSONDecodeError:
# Fallback to parsing text output
lines = stdout.strip().split('\n')
for line in lines[1:]: # Skip header
if line.strip():
parts = line.split()
if len(parts) >= 3:
name = parts[0]
# Only include our expected services
if name not in self.expected_services:
continue
state = parts[2].lower()
if "up" in state:
status = ServiceStatus.RUNNING
elif "exit" in state:
status = ServiceStatus.STOPPED
else:
status = ServiceStatus.UNKNOWN
services[name] = ServiceInfo(name=name, status=status)
except json.JSONDecodeError:
pass
else:
# For Docker, use compose ps command
success, stdout, stderr = await self._run_compose_command(["ps", "--format", "json"])
if success and stdout.strip():
try:
# Handle both single JSON object (Podman) and multiple JSON objects (Docker)
if stdout.strip().startswith('[') and stdout.strip().endswith(']'):
# JSON array format
service_list = json.loads(stdout.strip())
for service in service_list:
self._process_service_json(service, services)
else:
# Line-by-line JSON format
for line in stdout.strip().split('\n'):
if line.strip() and line.startswith('{'):
service = json.loads(line)
self._process_service_json(service, services)
except json.JSONDecodeError:
# Fallback to parsing text output
lines = stdout.strip().split('\n')
if len(lines) > 1: # Make sure we have at least a header and one line
for line in lines[1:]: # Skip header
if line.strip():
parts = line.split()
if len(parts) >= 3:
name = parts[0]
# Only include our expected services
if name not in self.expected_services:
continue
state = parts[2].lower()
if "up" in state:
status = ServiceStatus.RUNNING
elif "exit" in state:
status = ServiceStatus.STOPPED
else:
status = ServiceStatus.UNKNOWN
services[name] = ServiceInfo(name=name, status=status)
# Add expected services that weren't found
for expected in self.expected_services:
@ -319,22 +421,31 @@ class ContainerManager:
"""Upgrade services (pull latest images and restart) and yield progress updates."""
yield False, "Pulling latest images..."
# Pull latest images
success, stdout, stderr = await self._run_compose_command(["pull"], cpu_mode)
# Pull latest images with streaming output
pull_success = True
async for line in self._run_compose_command_streaming(["pull"], cpu_mode):
yield False, line
# Check for error patterns in the output
if "error" in line.lower() or "failed" in line.lower():
pull_success = False
if not success:
yield False, f"Failed to pull images: {stderr}"
return
if not pull_success:
yield False, "Failed to pull some images, but continuing with restart..."
yield False, "Images updated, restarting services..."
# Restart with new images
success, stdout, stderr = await self._run_compose_command(["up", "-d", "--force-recreate"], cpu_mode)
# Restart with new images using streaming output
restart_success = True
async for line in self._run_compose_command_streaming(["up", "-d", "--force-recreate"], cpu_mode):
yield False, line
# Check for error patterns in the output
if "error" in line.lower() or "failed" in line.lower():
restart_success = False
if success:
if restart_success:
yield True, "Services upgraded and restarted successfully"
else:
yield False, f"Failed to restart services after upgrade: {stderr}"
yield False, "Some errors occurred during service restart"
async def reset_services(self) -> AsyncIterator[tuple[bool, str]]:
"""Reset all services (stop, remove containers/volumes, clear data) and yield progress updates."""
@ -386,12 +497,15 @@ class ContainerManager:
cwd=Path.cwd()
)
while True:
line = await process.stdout.readline()
if line:
yield line.decode().rstrip()
else:
break
if process.stdout:
while True:
line = await process.stdout.readline()
if line:
yield line.decode().rstrip()
else:
break
else:
yield "Error: Unable to read process output"
except Exception as e:
yield f"Error following logs: {e}"
@ -422,9 +536,51 @@ class ContainerManager:
return stats
async def debug_podman_services(self) -> str:
"""Run a direct Podman command to check services status for debugging."""
if self.runtime_info.runtime_type != RuntimeType.PODMAN:
return "Not using Podman"
# Try direct podman command
cmd = ["podman", "ps", "--all", "--format", "json"]
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=Path.cwd()
)
stdout, stderr = await process.communicate()
stdout_text = stdout.decode() if stdout else ""
stderr_text = stderr.decode() if stderr else ""
result = f"Command: {' '.join(cmd)}\n"
result += f"Return code: {process.returncode}\n"
result += f"Stdout: {stdout_text}\n"
result += f"Stderr: {stderr_text}\n"
# Try to parse the output
if stdout_text.strip():
try:
containers = json.loads(stdout_text)
result += f"\nFound {len(containers)} containers:\n"
for container in containers:
name = container.get("Names", [""])[0]
state = container.get("State", "")
result += f" - {name}: {state}\n"
except json.JSONDecodeError as e:
result += f"\nFailed to parse JSON: {e}\n"
return result
except Exception as e:
return f"Error executing command: {e}"
def check_podman_macos_memory(self) -> tuple[bool, str]:
"""Check if Podman VM has sufficient memory on macOS."""
if self.runtime_info.runtime_type != RuntimeType.PODMAN:
return True, "Not using Podman"
return self.platform_detector.check_podman_macos_memory()[:2] # Return is_sufficient, message
is_sufficient, memory_mb, message = self.platform_detector.check_podman_macos_memory()
return is_sufficient, message

View file

@ -3,6 +3,7 @@
import os
import secrets
import string
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, List
from dataclasses import dataclass, field
@ -181,9 +182,10 @@ class EnvManager:
try:
# Ensure secure defaults (including Langflow secret key) are set before saving
self.setup_secure_defaults()
# Create backup if file exists
# Create timestamped backup if file exists
if self.env_file.exists():
backup_file = self.env_file.with_suffix('.env.backup')
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = self.env_file.with_suffix(f'.env.backup.{timestamp}')
self.env_file.rename(backup_file)
with open(self.env_file, 'w') as f:

View file

@ -399,6 +399,17 @@ class ConfigScreen(Screen):
# Add spacing
yield Static(" ")
def on_mount(self) -> None:
"""Initialize the screen when mounted."""
# Focus the first input field
try:
# Find the first input field and focus it
inputs = self.query(Input)
if inputs:
inputs[0].focus()
except Exception:
pass
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "generate-btn":

View file

@ -0,0 +1,402 @@
"""Diagnostics screen for OpenRAG TUI."""
import asyncio
import logging
import os
import datetime
from pathlib import Path
from typing import List, Optional
from textual.app import ComposeResult
from textual.containers import Container, Vertical, Horizontal, ScrollableContainer
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Label, Log
from rich.text import Text
from ..managers.container_manager import ContainerManager
class DiagnosticsScreen(Screen):
"""Diagnostics screen for debugging OpenRAG."""
CSS = """
#diagnostics-log {
border: solid $accent;
padding: 1;
margin: 1;
background: $surface;
min-height: 20;
}
.button-row Button {
margin: 0 1;
}
.copy-indicator {
background: $success;
color: $text;
padding: 1;
margin: 1;
text-align: center;
}
"""
BINDINGS = [
("escape", "back", "Back"),
("r", "refresh", "Refresh"),
("ctrl+c", "copy", "Copy to Clipboard"),
("ctrl+s", "save", "Save to File"),
]
def __init__(self):
super().__init__()
self.container_manager = ContainerManager()
self._logger = logging.getLogger("openrag.diagnostics")
self._status_timer = None
def compose(self) -> ComposeResult:
"""Create the diagnostics screen layout."""
yield Header()
with Container(id="main-container"):
yield Static("OpenRAG Diagnostics", classes="tab-header")
with Horizontal(classes="button-row"):
yield Button("Refresh", variant="primary", id="refresh-btn")
yield Button("Check Podman", variant="default", id="check-podman-btn")
yield Button("Check Docker", variant="default", id="check-docker-btn")
yield Button("Copy to Clipboard", variant="default", id="copy-btn")
yield Button("Save to File", variant="default", id="save-btn")
yield Button("Back", variant="default", id="back-btn")
# Status indicator for copy/save operations
yield Static("", id="copy-status", classes="copy-indicator")
with ScrollableContainer(id="diagnostics-scroll"):
yield Log(id="diagnostics-log", highlight=True)
yield Footer()
def on_mount(self) -> None:
"""Initialize the screen."""
self.run_diagnostics()
# Focus the first button (refresh-btn)
try:
self.query_one("#refresh-btn").focus()
except Exception:
pass
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "refresh-btn":
self.action_refresh()
elif event.button.id == "check-podman-btn":
asyncio.create_task(self.check_podman())
elif event.button.id == "check-docker-btn":
asyncio.create_task(self.check_docker())
elif event.button.id == "copy-btn":
self.copy_to_clipboard()
elif event.button.id == "save-btn":
self.save_to_file()
elif event.button.id == "back-btn":
self.action_back()
def action_refresh(self) -> None:
"""Refresh diagnostics."""
self.run_diagnostics()
def action_copy(self) -> None:
"""Copy log content to clipboard (keyboard shortcut)."""
self.copy_to_clipboard()
def copy_to_clipboard(self) -> None:
"""Copy log content to clipboard."""
try:
log = self.query_one("#diagnostics-log", Log)
content = "\n".join(str(line) for line in log.lines)
status = self.query_one("#copy-status", Static)
# Try to use pyperclip if available
try:
import pyperclip
pyperclip.copy(content)
self.notify("Copied to clipboard", severity="information")
status.update("✓ Content copied to clipboard")
self._hide_status_after_delay(status)
return
except ImportError:
pass
# Fallback to platform-specific clipboard commands
import subprocess
import platform
system = platform.system()
if system == "Darwin": # macOS
process = subprocess.Popen(
["pbcopy"], stdin=subprocess.PIPE, text=True
)
process.communicate(input=content)
self.notify("Copied to clipboard", severity="information")
status.update("✓ Content copied to clipboard")
elif system == "Windows":
process = subprocess.Popen(
["clip"], stdin=subprocess.PIPE, text=True
)
process.communicate(input=content)
self.notify("Copied to clipboard", severity="information")
status.update("✓ Content copied to clipboard")
elif system == "Linux":
# Try xclip first, then xsel
try:
process = subprocess.Popen(
["xclip", "-selection", "clipboard"],
stdin=subprocess.PIPE,
text=True
)
process.communicate(input=content)
self.notify("Copied to clipboard", severity="information")
status.update("✓ Content copied to clipboard")
except FileNotFoundError:
try:
process = subprocess.Popen(
["xsel", "--clipboard", "--input"],
stdin=subprocess.PIPE,
text=True
)
process.communicate(input=content)
self.notify("Copied to clipboard", severity="information")
status.update("✓ Content copied to clipboard")
except FileNotFoundError:
self.notify("Clipboard utilities not found. Install xclip or xsel.", severity="error")
status.update("❌ Clipboard utilities not found. Install xclip or xsel.")
else:
self.notify("Clipboard not supported on this platform", severity="error")
status.update("❌ Clipboard not supported on this platform")
self._hide_status_after_delay(status)
except Exception as e:
self.notify(f"Failed to copy to clipboard: {e}", severity="error")
status = self.query_one("#copy-status", Static)
status.update(f"❌ Failed to copy: {e}")
self._hide_status_after_delay(status)
def _hide_status_after_delay(self, status_widget: Static, delay: float = 3.0) -> None:
"""Hide the status message after a delay."""
# Cancel any existing timer
if self._status_timer:
self._status_timer.cancel()
# Create and run the timer task
self._status_timer = asyncio.create_task(self._clear_status_after_delay(status_widget, delay))
async def _clear_status_after_delay(self, status_widget: Static, delay: float) -> None:
"""Clear the status message after a delay."""
await asyncio.sleep(delay)
status_widget.update("")
def action_save(self) -> None:
"""Save log content to file (keyboard shortcut)."""
self.save_to_file()
def save_to_file(self) -> None:
"""Save log content to a file."""
try:
log = self.query_one("#diagnostics-log", Log)
content = "\n".join(str(line) for line in log.lines)
status = self.query_one("#copy-status", Static)
# Create logs directory if it doesn't exist
logs_dir = Path("logs")
logs_dir.mkdir(exist_ok=True)
# Create a timestamped filename
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = logs_dir / f"openrag_diagnostics_{timestamp}.txt"
# Save to file
with open(filename, "w") as f:
f.write(content)
self.notify(f"Saved to {filename}", severity="information")
status.update(f"✓ Saved to {filename}")
# Log the save operation
self._logger.info(f"Diagnostics saved to {filename}")
self._hide_status_after_delay(status)
except Exception as e:
error_msg = f"Failed to save file: {e}"
self.notify(error_msg, severity="error")
self._logger.error(error_msg)
status = self.query_one("#copy-status", Static)
status.update(f"{error_msg}")
self._hide_status_after_delay(status)
def action_back(self) -> None:
"""Go back to previous screen."""
self.app.pop_screen()
def _get_system_info(self) -> Text:
"""Get system information text."""
info_text = Text()
runtime_info = self.container_manager.get_runtime_info()
info_text.append("Container Runtime Information\n", style="bold")
info_text.append("=" * 30 + "\n")
info_text.append(f"Type: {runtime_info.runtime_type.value}\n")
info_text.append(f"Compose Command: {' '.join(runtime_info.compose_command)}\n")
info_text.append(f"Runtime Command: {' '.join(runtime_info.runtime_command)}\n")
if runtime_info.version:
info_text.append(f"Version: {runtime_info.version}\n")
return info_text
def run_diagnostics(self) -> None:
"""Run all diagnostics."""
log = self.query_one("#diagnostics-log", Log)
log.clear()
# System information
system_info = self._get_system_info()
log.write(str(system_info))
log.write("")
# Run async diagnostics
asyncio.create_task(self._run_async_diagnostics())
async def _run_async_diagnostics(self) -> None:
"""Run asynchronous diagnostics."""
log = self.query_one("#diagnostics-log", Log)
# Check services
log.write("[bold green]Service Status[/bold green]")
services = await self.container_manager.get_service_status(force_refresh=True)
for name, info in services.items():
status_color = "green" if info.status == "running" else "red"
log.write(f"[bold]{name}[/bold]: [{status_color}]{info.status.value}[/{status_color}]")
if info.health:
log.write(f" Health: {info.health}")
if info.ports:
log.write(f" Ports: {', '.join(info.ports)}")
if info.image:
log.write(f" Image: {info.image}")
log.write("")
# Check for Podman-specific issues
if self.container_manager.runtime_info.runtime_type.name == "PODMAN":
await self.check_podman()
async def check_podman(self) -> None:
"""Run Podman-specific diagnostics."""
log = self.query_one("#diagnostics-log", Log)
log.write("[bold green]Podman Diagnostics[/bold green]")
# Check if using Podman
if self.container_manager.runtime_info.runtime_type.name != "PODMAN":
log.write("[yellow]Not using Podman[/yellow]")
return
# Check Podman version
cmd = ["podman", "--version"]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
log.write(f"Podman version: {stdout.decode().strip()}")
else:
log.write(f"[red]Failed to get Podman version: {stderr.decode().strip()}[/red]")
# Check Podman containers
cmd = ["podman", "ps", "--all"]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
log.write("Podman containers:")
for line in stdout.decode().strip().split("\n"):
log.write(f" {line}")
else:
log.write(f"[red]Failed to list Podman containers: {stderr.decode().strip()}[/red]")
# Check Podman compose
cmd = ["podman", "compose", "ps"]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=self.container_manager.compose_file.parent
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
log.write("Podman compose services:")
for line in stdout.decode().strip().split("\n"):
log.write(f" {line}")
else:
log.write(f"[red]Failed to list Podman compose services: {stderr.decode().strip()}[/red]")
log.write("")
async def check_docker(self) -> None:
"""Run Docker-specific diagnostics."""
log = self.query_one("#diagnostics-log", Log)
log.write("[bold green]Docker Diagnostics[/bold green]")
# Check if using Docker
if "DOCKER" not in self.container_manager.runtime_info.runtime_type.name:
log.write("[yellow]Not using Docker[/yellow]")
return
# Check Docker version
cmd = ["docker", "--version"]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
log.write(f"Docker version: {stdout.decode().strip()}")
else:
log.write(f"[red]Failed to get Docker version: {stderr.decode().strip()}[/red]")
# Check Docker containers
cmd = ["docker", "ps", "--all"]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
log.write("Docker containers:")
for line in stdout.decode().strip().split("\n"):
log.write(f" {line}")
else:
log.write(f"[red]Failed to list Docker containers: {stderr.decode().strip()}[/red]")
# Check Docker compose
cmd = ["docker", "compose", "ps"]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=self.container_manager.compose_file.parent
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
log.write("Docker compose services:")
for line in stdout.decode().strip().split("\n"):
log.write(f" {line}")
else:
log.write(f"[red]Failed to list Docker compose services: {stderr.decode().strip()}[/red]")
log.write("")
# Made with Bob

View file

@ -19,41 +19,36 @@ class LogsScreen(Screen):
("f", "follow", "Follow Logs"),
("c", "clear", "Clear"),
("r", "refresh", "Refresh"),
("a", "toggle_auto_scroll", "Toggle Auto Scroll"),
("g", "scroll_top", "Go to Top"),
("G", "scroll_bottom", "Go to Bottom"),
("j", "scroll_down", "Scroll Down"),
("k", "scroll_up", "Scroll Up"),
("ctrl+u", "scroll_page_up", "Page Up"),
("ctrl+f", "scroll_page_down", "Page Down"),
]
def __init__(self, initial_service: str = "openrag-backend"):
super().__init__()
self.container_manager = ContainerManager()
# Validate the initial service against available options
valid_services = ["openrag-backend", "openrag-frontend", "opensearch", "langflow", "dashboards"]
if initial_service not in valid_services:
initial_service = "openrag-backend" # fallback
self.current_service = initial_service
self.logs_area = None
self.following = False
self.follow_task = None
self.auto_scroll = True
def compose(self) -> ComposeResult:
"""Create the logs screen layout."""
yield Header()
yield Container(
Vertical(
Static("Service Logs", id="logs-title"),
Horizontal(
Static("Service:", classes="label"),
Select([
("openrag-backend", "Backend"),
("openrag-frontend", "Frontend"),
("opensearch", "OpenSearch"),
("langflow", "Langflow"),
("dashboards", "Dashboards")
], value=self.current_service, id="service-select"),
Button("Refresh", variant="default", id="refresh-btn"),
Button("Follow", variant="primary", id="follow-btn"),
Button("Clear", variant="default", id="clear-btn"),
classes="controls-row"
),
Static(f"Service Logs: {self.current_service}", id="logs-title"),
self._create_logs_area(),
Horizontal(
Button("Back", variant="default", id="back-btn"),
classes="button-row"
),
id="logs-content"
),
id="main-container"
@ -72,29 +67,30 @@ class LogsScreen(Screen):
async def on_mount(self) -> None:
"""Initialize the screen when mounted."""
# Set the correct service in the select widget after a brief delay
try:
select = self.query_one("#service-select")
# Set a default first, then set the desired value
select.value = "openrag-backend"
if self.current_service in ["openrag-backend", "openrag-frontend", "opensearch", "langflow", "dashboards"]:
select.value = self.current_service
except Exception as e:
# If setting the service fails, just use the default
pass
await self._load_logs()
# Focus the logs area since there are no buttons
try:
self.logs_area.focus()
except Exception:
pass
def on_unmount(self) -> None:
"""Clean up when unmounting."""
self._stop_following()
def on_select_changed(self, event: Select.Changed) -> None:
"""Handle service selection change."""
if event.select.id == "service-select":
self.current_service = event.value
self._stop_following()
self.run_worker(self._load_logs())
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "refresh-btn":
self.action_refresh()
elif event.button.id == "follow-btn":
self.action_follow()
elif event.button.id == "clear-btn":
self.action_clear()
elif event.button.id == "back-btn":
self.action_back()
async def _load_logs(self, lines: int = 200) -> None:
"""Load recent logs for the current service."""
@ -106,21 +102,19 @@ class LogsScreen(Screen):
if success:
self.logs_area.text = logs
# Scroll to bottom
self.logs_area.cursor_position = len(logs)
# Scroll to bottom if auto scroll is enabled
if self.auto_scroll:
self.logs_area.scroll_end()
else:
self.logs_area.text = f"Failed to load logs: {logs}"
def _stop_following(self) -> None:
"""Stop following logs."""
self.following = False
if self.follow_task and not self.follow_task.done():
if self.follow_task and not self.follow_task.is_finished:
self.follow_task.cancel()
# Update button text
follow_btn = self.query_one("#follow-btn")
follow_btn.label = "Follow"
follow_btn.variant = "primary"
# No button to update since we removed it
async def _follow_logs(self) -> None:
"""Follow logs in real-time."""
@ -143,8 +137,9 @@ class LogsScreen(Screen):
new_text = '\n'.join(lines)
self.logs_area.text = new_text
# Scroll to bottom
self.logs_area.cursor_position = len(new_text)
# Scroll to bottom if auto scroll is enabled
if self.auto_scroll:
self.logs_area.scroll_end()
except asyncio.CancelledError:
pass
@ -165,9 +160,6 @@ class LogsScreen(Screen):
self._stop_following()
else:
self.following = True
follow_btn = self.query_one("#follow-btn")
follow_btn.label = "Stop Following"
follow_btn.variant = "error"
# Start following
self.follow_task = self.run_worker(self._follow_logs(), exclusive=False)
@ -176,6 +168,51 @@ class LogsScreen(Screen):
"""Clear the logs area."""
self.logs_area.text = ""
def action_toggle_auto_scroll(self) -> None:
"""Toggle auto scroll on/off."""
self.auto_scroll = not self.auto_scroll
status = "enabled" if self.auto_scroll else "disabled"
self.notify(f"Auto scroll {status}", severity="information")
def action_scroll_top(self) -> None:
"""Scroll to the top of logs."""
self.logs_area.scroll_home()
def action_scroll_bottom(self) -> None:
"""Scroll to the bottom of logs."""
self.logs_area.scroll_end()
def action_scroll_down(self) -> None:
"""Scroll down one line."""
self.logs_area.scroll_down()
def action_scroll_up(self) -> None:
"""Scroll up one line."""
self.logs_area.scroll_up()
def action_scroll_page_up(self) -> None:
"""Scroll up one page."""
self.logs_area.scroll_page_up()
def action_scroll_page_down(self) -> None:
"""Scroll down one page."""
self.logs_area.scroll_page_down()
def on_key(self, event) -> None:
"""Handle key presses that might be intercepted by TextArea."""
key = event.key
# Handle keys that TextArea might intercept
if key == "ctrl+u":
self.action_scroll_page_up()
event.prevent_default()
elif key == "ctrl+f":
self.action_scroll_page_down()
event.prevent_default()
elif key.upper() == "G":
self.action_scroll_bottom()
event.prevent_default()
def action_back(self) -> None:
"""Go back to previous screen."""
self._stop_following()

View file

@ -2,16 +2,23 @@
import asyncio
import re
from typing import Literal, Any
# Define button variant type
ButtonVariant = Literal["default", "primary", "success", "warning", "error"]
from textual.app import ComposeResult
from textual.containers import Container, Vertical, Horizontal, ScrollableContainer
from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, DataTable, TabbedContent, TabPane
from textual.widgets import Header, Footer, Static, Button, DataTable
from textual.timer import Timer
from rich.text import Text
from rich.table import Table
from ..managers.container_manager import ContainerManager, ServiceStatus, ServiceInfo
from ..utils.platform import RuntimeType
from ..widgets.command_modal import CommandOutputModal
from ..widgets.diagnostics_notification import notify_with_diagnostics
class MonitorScreen(Screen):
@ -24,6 +31,9 @@ class MonitorScreen(Screen):
("t", "stop", "Stop Services"),
("u", "upgrade", "Upgrade"),
("x", "reset", "Reset"),
("l", "logs", "View Logs"),
("j", "cursor_down", "Move Down"),
("k", "cursor_up", "Move Up"),
]
def __init__(self):
@ -40,15 +50,8 @@ class MonitorScreen(Screen):
def compose(self) -> ComposeResult:
"""Create the monitoring screen layout."""
yield Header()
with TabbedContent(id="monitor-tabs"):
with TabPane("Services", id="services-tab"):
yield from self._create_services_tab()
with TabPane("Logs", id="logs-tab"):
yield from self._create_logs_tab()
with TabPane("System", id="system-tab"):
yield from self._create_system_tab()
# Just show the services content directly (no header, no tabs)
yield from self._create_services_tab()
yield Footer()
@ -63,7 +66,8 @@ class MonitorScreen(Screen):
)
# Images summary table (above services)
yield Static("Container Images", classes="tab-header")
self.images_table = DataTable(id="images-table")
self.images_table = DataTable(id="images-table", show_cursor=False)
self.images_table.can_focus = False
self.images_table.add_columns("Image", "Digest")
yield self.images_table
yield Static(" ")
@ -73,32 +77,7 @@ class MonitorScreen(Screen):
self.services_table = DataTable(id="services-table")
self.services_table.add_columns("Service", "Status", "Health", "Ports", "Image", "Digest")
yield self.services_table
yield Horizontal(
Button("Refresh", variant="default", id="refresh-btn"),
Button("Back", variant="default", id="back-btn"),
classes="button-row"
)
def _create_logs_tab(self) -> ComposeResult:
"""Create the logs viewing tab."""
logs_content = Static("Select a service to view logs", id="logs-content", markup=False)
yield Static("Service Logs", id="logs-header")
yield Horizontal(
Button("Backend", variant="default", id="logs-backend"),
Button("Frontend", variant="default", id="logs-frontend"),
Button("OpenSearch", variant="default", id="logs-opensearch"),
Button("Langflow", variant="default", id="logs-langflow"),
classes="button-row"
)
yield ScrollableContainer(logs_content, id="logs-scroll")
def _create_system_tab(self) -> ComposeResult:
"""Create the system information tab."""
system_info = Static(self._get_system_info(), id="system-info")
yield Static("System Information", id="system-header")
yield system_info
def _get_runtime_status(self) -> Text:
"""Get container runtime status text."""
@ -129,29 +108,18 @@ class MonitorScreen(Screen):
return status_text
def _get_system_info(self) -> Text:
"""Get system information text."""
info_text = Text()
runtime_info = self.container_manager.get_runtime_info()
info_text.append("Container Runtime Information\n", style="bold")
info_text.append("=" * 30 + "\n")
info_text.append(f"Type: {runtime_info.runtime_type.value}\n")
info_text.append(f"Compose Command: {' '.join(runtime_info.compose_command)}\n")
info_text.append(f"Runtime Command: {' '.join(runtime_info.runtime_command)}\n")
if runtime_info.version:
info_text.append(f"Version: {runtime_info.version}\n")
# Removed compose files section for cleaner display
return info_text
async def on_mount(self) -> None:
"""Initialize the screen when mounted."""
await self._refresh_services()
# Set up auto-refresh every 5 seconds
self.refresh_timer = self.set_interval(5.0, self._auto_refresh)
# Focus the services table
try:
self.services_table.focus()
except Exception:
pass
def on_unmount(self) -> None:
"""Clean up when unmounting."""
@ -160,6 +128,11 @@ class MonitorScreen(Screen):
# Stop following logs if running
self._stop_follow()
async def on_screen_resume(self) -> None:
"""Called when the screen is resumed (e.g., after a modal is closed)."""
# Refresh services when returning from a modal
await self._refresh_services()
async def _refresh_services(self) -> None:
"""Refresh the services table."""
if not self.container_manager.is_available():
@ -229,31 +202,39 @@ class MonitorScreen(Screen):
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "start-btn":
button_id = event.button.id or ""
button_label = event.button.label or ""
# Use button ID prefixes to determine action, ignoring any random suffix
if button_id.startswith("start-btn"):
self.run_worker(self._start_services())
elif event.button.id == "stop-btn":
elif button_id.startswith("stop-btn"):
self.run_worker(self._stop_services())
elif event.button.id == "restart-btn":
elif button_id.startswith("restart-btn"):
self.run_worker(self._restart_services())
elif event.button.id == "upgrade-btn":
elif button_id.startswith("upgrade-btn"):
self.run_worker(self._upgrade_services())
elif event.button.id == "reset-btn":
elif button_id.startswith("reset-btn"):
self.run_worker(self._reset_services())
elif event.button.id == "toggle-mode-btn":
elif button_id == "toggle-mode-btn":
self.action_toggle_mode()
elif event.button.id == "refresh-btn":
elif button_id.startswith("refresh-btn"):
self.action_refresh()
elif event.button.id == "back-btn":
elif button_id.startswith("back-btn"):
self.action_back()
elif event.button.id.startswith("logs-"):
elif button_id.startswith("logs-"):
# Map button IDs to actual service names
service_mapping = {
"logs-backend": "openrag-backend",
"logs-frontend": "openrag-frontend",
"logs-frontend": "openrag-frontend",
"logs-opensearch": "opensearch",
"logs-langflow": "langflow"
}
service_name = service_mapping.get(event.button.id)
# Extract the base button ID (without any suffix)
button_base_id = button_id.split("-")[0] + "-" + button_id.split("-")[1]
service_name = service_mapping.get(button_base_id)
if service_name:
# Load recent logs then start following
self.run_worker(self._show_logs(service_name))
@ -263,9 +244,14 @@ class MonitorScreen(Screen):
"""Start services with progress updates."""
self.operation_in_progress = True
try:
async for is_complete, message in self.container_manager.start_services(cpu_mode):
self.notify(message, severity="success" if is_complete else "info")
await self._refresh_services()
# Show command output in modal dialog
command_generator = self.container_manager.start_services(cpu_mode)
modal = CommandOutputModal(
"Starting Services",
command_generator,
on_complete=None # We'll refresh in on_screen_resume instead
)
self.app.push_screen(modal)
finally:
self.operation_in_progress = False
@ -273,9 +259,14 @@ class MonitorScreen(Screen):
"""Stop services with progress updates."""
self.operation_in_progress = True
try:
async for is_complete, message in self.container_manager.stop_services():
self.notify(message, severity="success" if is_complete else "info")
await self._refresh_services()
# Show command output in modal dialog
command_generator = self.container_manager.stop_services()
modal = CommandOutputModal(
"Stopping Services",
command_generator,
on_complete=None # We'll refresh in on_screen_resume instead
)
self.app.push_screen(modal)
finally:
self.operation_in_progress = False
@ -283,9 +274,14 @@ class MonitorScreen(Screen):
"""Restart services with progress updates."""
self.operation_in_progress = True
try:
async for is_complete, message in self.container_manager.restart_services():
self.notify(message, severity="success" if is_complete else "info")
await self._refresh_services()
# Show command output in modal dialog
command_generator = self.container_manager.restart_services()
modal = CommandOutputModal(
"Restarting Services",
command_generator,
on_complete=None # We'll refresh in on_screen_resume instead
)
self.app.push_screen(modal)
finally:
self.operation_in_progress = False
@ -293,9 +289,14 @@ class MonitorScreen(Screen):
"""Upgrade services with progress updates."""
self.operation_in_progress = True
try:
async for is_complete, message in self.container_manager.upgrade_services():
self.notify(message, severity="success" if is_complete else "warning")
await self._refresh_services()
# Show command output in modal dialog
command_generator = self.container_manager.upgrade_services()
modal = CommandOutputModal(
"Upgrading Services",
command_generator,
on_complete=None # We'll refresh in on_screen_resume instead
)
self.app.push_screen(modal)
finally:
self.operation_in_progress = False
@ -303,9 +304,14 @@ class MonitorScreen(Screen):
"""Reset services with progress updates."""
self.operation_in_progress = True
try:
async for is_complete, message in self.container_manager.reset_services():
self.notify(message, severity="success" if is_complete else "warning")
await self._refresh_services()
# Show command output in modal dialog
command_generator = self.container_manager.reset_services()
modal = CommandOutputModal(
"Resetting Services",
command_generator,
on_complete=None # We'll refresh in on_screen_resume instead
)
self.app.push_screen(modal)
finally:
self.operation_in_progress = False
@ -332,14 +338,16 @@ class MonitorScreen(Screen):
# Try to scroll to end of container
try:
scroller = self.query_one("#logs-scroll", ScrollableContainer)
if hasattr(scroller, "scroll_end"):
scroller.scroll_end(animate=False)
elif hasattr(scroller, "scroll_to_end"):
scroller.scroll_to_end()
# Only use scroll_end which is the correct method
scroller.scroll_end(animate=False)
except Exception:
pass
else:
self.notify(f"Failed to get logs for {service_name}: {logs}", severity="error")
notify_with_diagnostics(
self.app,
f"Failed to get logs for {service_name}: {logs}",
severity="error"
)
def _stop_follow(self) -> None:
task = self._follow_task
@ -377,17 +385,35 @@ class MonitorScreen(Screen):
logs_widget = self.query_one("#logs-content", Static)
logs_widget.update("\n".join(self._logs_buffer))
scroller = self.query_one("#logs-scroll", ScrollableContainer)
if hasattr(scroller, "scroll_end"):
scroller.scroll_end(animate=False)
# Only use scroll_end which is the correct method
scroller.scroll_end(animate=False)
except Exception:
pass
except Exception as e:
self.notify(f"Error following logs: {e}", severity="error")
notify_with_diagnostics(
self.app,
f"Error following logs: {e}",
severity="error"
)
def action_refresh(self) -> None:
"""Refresh services manually."""
self.run_worker(self._refresh_services())
def action_cursor_down(self) -> None:
"""Move cursor down in services table."""
try:
self.services_table.action_cursor_down()
except Exception:
pass
def action_cursor_up(self) -> None:
"""Move cursor up in services table."""
try:
self.services_table.action_cursor_up()
except Exception:
pass
def _update_mode_row(self) -> None:
"""Update the mode indicator and toggle button label."""
try:
@ -405,36 +431,52 @@ class MonitorScreen(Screen):
try:
current = getattr(self.container_manager, "use_cpu_compose", True)
self.container_manager.use_cpu_compose = not current
self.notify("Switched to GPU compose" if not current else "Switched to CPU compose", severity="info")
self.notify("Switched to GPU compose" if not current else "Switched to CPU compose", severity="information")
self._update_mode_row()
self.action_refresh()
except Exception as e:
self.notify(f"Failed to toggle mode: {e}", severity="error")
notify_with_diagnostics(
self.app,
f"Failed to toggle mode: {e}",
severity="error"
)
def _update_controls(self, services: list[ServiceInfo]) -> None:
"""Render control buttons based on running state and set default focus."""
"""Update control buttons based on running state."""
try:
# Get the controls container
controls = self.query_one("#services-controls", Horizontal)
controls.remove_children()
# Check if any services are running
any_running = any(s.status == ServiceStatus.RUNNING for s in services)
# Clear existing buttons by removing all children
controls.remove_children()
# Use a single ID for each button type, but make them unique with a suffix
# This ensures we don't create duplicate IDs across refreshes
import random
suffix = f"-{random.randint(10000, 99999)}"
# Add appropriate buttons based on service state
if any_running:
controls.mount(Button("Stop Services", variant="error", id="stop-btn"))
controls.mount(Button("Restart", variant="primary", id="restart-btn"))
controls.mount(Button("Upgrade", variant="warning", id="upgrade-btn"))
controls.mount(Button("Reset", variant="error", id="reset-btn"))
# Focus Stop by default when running
try:
self.query_one("#stop-btn", Button).focus()
except Exception:
pass
# When services are running, show stop and restart
controls.mount(Button("Stop Services", variant="error", id=f"stop-btn{suffix}"))
controls.mount(Button("Restart", variant="primary", id=f"restart-btn{suffix}"))
else:
controls.mount(Button("Start Services", variant="success", id="start-btn"))
try:
self.query_one("#start-btn", Button).focus()
except Exception:
pass
except Exception:
pass
# When services are not running, show start
controls.mount(Button("Start Services", variant="success", id=f"start-btn{suffix}"))
# Always show upgrade and reset buttons
controls.mount(Button("Upgrade", variant="warning", id=f"upgrade-btn{suffix}"))
controls.mount(Button("Reset", variant="error", id=f"reset-btn{suffix}"))
except Exception as e:
notify_with_diagnostics(
self.app,
f"Error updating controls: {e}",
severity="error"
)
def action_back(self) -> None:
"""Go back to previous screen."""
@ -455,3 +497,37 @@ class MonitorScreen(Screen):
def action_reset(self) -> None:
"""Reset services."""
self.run_worker(self._reset_services())
def action_logs(self) -> None:
"""View logs for the selected service."""
try:
# Get the currently focused row in the services table
table = self.query_one("#services-table", DataTable)
if table.cursor_row is not None and table.cursor_row >= 0:
# Get the service name from the first column of the selected row
row_data = table.get_row_at(table.cursor_row)
if row_data:
service_name = str(row_data[0]) # First column is service name
# Map display names to actual service names
service_mapping = {
"openrag-backend": "openrag-backend",
"openrag-frontend": "openrag-frontend",
"opensearch": "opensearch",
"langflow": "langflow",
"dashboards": "dashboards"
}
actual_service_name = service_mapping.get(service_name, service_name)
# Push the logs screen with the selected service
from .logs import LogsScreen
logs_screen = LogsScreen(initial_service=actual_service_name)
self.app.push_screen(logs_screen)
else:
self.notify("No service selected", severity="warning")
else:
self.notify("No service selected", severity="warning")
except Exception as e:
self.notify(f"Error opening logs: {e}", severity="error")

View file

@ -23,6 +23,7 @@ class WelcomeScreen(Screen):
("1", "no_auth_setup", "Basic Setup"),
("2", "full_setup", "Advanced Setup"),
("3", "monitor", "Monitor Services"),
("4", "diagnostics", "Diagnostics"),
]
def __init__(self):
@ -124,7 +125,7 @@ class WelcomeScreen(Screen):
# Update the welcome text and recompose with new state
try:
welcome_widget = self.query_one("#welcome-text")
welcome_widget.update(self._create_welcome_text())
welcome_widget.update(self._create_welcome_text()) # This is fine for Static widgets
# Focus the appropriate button
if self.services_running:
@ -154,6 +155,8 @@ class WelcomeScreen(Screen):
self.action_full_setup()
elif event.button.id == "monitor-btn":
self.action_monitor()
elif event.button.id == "diagnostics-btn":
self.action_diagnostics()
def action_default_action(self) -> None:
"""Handle Enter key - go to default action based on state."""
@ -179,6 +182,11 @@ class WelcomeScreen(Screen):
from .monitor import MonitorScreen
self.app.push_screen(MonitorScreen())
def action_diagnostics(self) -> None:
"""Switch to diagnostics screen."""
from .diagnostics import DiagnosticsScreen
self.app.push_screen(DiagnosticsScreen())
def action_quit(self) -> None:
"""Quit the application."""
self.app.exit()

View file

@ -32,15 +32,31 @@ class PlatformDetector:
def detect_runtime(self) -> RuntimeInfo:
"""Detect available container runtime and compose capabilities."""
# First check if we have podman installed
podman_version = self._get_podman_version()
# If we have podman, check if docker is actually podman in disguise
if podman_version:
docker_version = self._get_docker_version()
if docker_version and podman_version in docker_version:
# This is podman masquerading as docker
if self._check_command(["docker", "compose", "--help"]):
return RuntimeInfo(RuntimeType.PODMAN, ["docker", "compose"], ["docker"], podman_version)
if self._check_command(["docker-compose", "--help"]):
return RuntimeInfo(RuntimeType.PODMAN, ["docker-compose"], ["docker"], podman_version)
# Check for native podman compose
if self._check_command(["podman", "compose", "--help"]):
return RuntimeInfo(RuntimeType.PODMAN, ["podman", "compose"], ["podman"], podman_version)
# Check for actual docker
if self._check_command(["docker", "compose", "--help"]):
version = self._get_docker_version()
return RuntimeInfo(RuntimeType.DOCKER, ["docker", "compose"], ["docker"], version)
if self._check_command(["docker-compose", "--help"]):
version = self._get_docker_version()
return RuntimeInfo(RuntimeType.DOCKER_COMPOSE, ["docker-compose"], ["docker"], version)
if self._check_command(["podman", "compose", "--help"]):
version = self._get_podman_version()
return RuntimeInfo(RuntimeType.PODMAN, ["podman", "compose"], ["podman"], version)
return RuntimeInfo(RuntimeType.NONE, [], [])
def detect_gpu_available(self) -> bool:

View file

@ -0,0 +1,3 @@
"""Widgets for OpenRAG TUI."""
# Made with Bob

View file

@ -0,0 +1,132 @@
"""Command output modal dialog for OpenRAG TUI."""
import asyncio
from typing import Callable, List, Optional, AsyncIterator, Any
from textual.app import ComposeResult
from textual.worker import Worker
from textual.containers import Container, ScrollableContainer
from textual.screen import ModalScreen
from textual.widgets import Button, Static, Label, RichLog
from rich.console import Console
class CommandOutputModal(ModalScreen):
"""Modal dialog for displaying command output in real-time."""
DEFAULT_CSS = """
CommandOutputModal {
align: center middle;
}
#dialog {
width: 90%;
height: 90%;
border: thick $primary;
background: $surface;
padding: 0;
}
#title {
background: $primary;
color: $text;
padding: 1 2;
text-align: center;
width: 100%;
text-style: bold;
}
#output-container {
height: 1fr;
padding: 0;
margin: 0 1;
}
#command-output {
height: 100%;
border: solid $accent;
padding: 1 2;
margin: 1 0;
background: $surface-darken-1;
}
#button-row {
width: 100%;
height: auto;
align: center middle;
padding: 1;
margin-top: 1;
}
#button-row Button {
margin: 0 1;
min-width: 16;
}
"""
def __init__(
self,
title: str,
command_generator: AsyncIterator[tuple[bool, str]],
on_complete: Optional[Callable] = None
):
"""Initialize the modal dialog.
Args:
title: Title of the modal dialog
command_generator: Async generator that yields (is_complete, message) tuples
on_complete: Optional callback to run when command completes
"""
super().__init__()
self.title_text = title
self.command_generator = command_generator
self.on_complete = on_complete
def compose(self) -> ComposeResult:
"""Create the modal dialog layout."""
with Container(id="dialog"):
yield Label(self.title_text, id="title")
with ScrollableContainer(id="output-container"):
yield RichLog(id="command-output", highlight=True, markup=True)
with Container(id="button-row"):
yield Button("Close", variant="primary", id="close-btn")
def on_mount(self) -> None:
"""Start the command when the modal is mounted."""
# Start the command but don't store the worker
self.run_worker(self._run_command(), exclusive=False)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "close-btn":
self.dismiss()
async def _run_command(self) -> None:
"""Run the command and update the output in real-time."""
output = self.query_one("#command-output", RichLog)
try:
async for is_complete, message in self.command_generator:
# Simple approach: just append each line as it comes
output.write(message + "\n")
# Scroll to bottom
container = self.query_one("#output-container", ScrollableContainer)
container.scroll_end(animate=False)
# If command is complete, update UI
if is_complete:
output.write("[bold green]Command completed successfully[/bold green]\n")
# Call the completion callback if provided
if self.on_complete:
await asyncio.sleep(0.5) # Small delay for better UX
self.on_complete()
except Exception as e:
output.write(f"[bold red]Error: {e}[/bold red]\n")
# Enable the close button and focus it
close_btn = self.query_one("#close-btn", Button)
close_btn.disabled = False
close_btn.focus()
# Made with Bob

View file

@ -0,0 +1,38 @@
"""Utility functions for showing error notifications with diagnostics button."""
from typing import Literal
from textual.app import App
def notify_with_diagnostics(
app: App,
message: str,
severity: Literal["information", "warning", "error"] = "error",
timeout: float = 10.0
) -> None:
"""Show a notification with a button to open the diagnostics screen.
Args:
app: The Textual app
message: The notification message
severity: The notification severity
timeout: The notification timeout in seconds
"""
# First show the notification
app.notify(message, severity=severity, timeout=timeout)
# Then add a button to open diagnostics screen
def open_diagnostics() -> None:
from ..screens.diagnostics import DiagnosticsScreen
app.push_screen(DiagnosticsScreen())
# Add a separate notification with just the button
app.notify(
"Click to view diagnostics",
severity="information",
timeout=timeout,
title="Diagnostics"
)
# Made with Bob

View file

@ -0,0 +1,38 @@
"""Utility functions for showing error notifications with diagnostics button."""
from typing import Literal, Callable
from textual.app import App
def notify_with_diagnostics(
app: App,
message: str,
severity: Literal["information", "warning", "error"] = "error",
timeout: float = 10.0
) -> None:
"""Show a notification with a button to open the diagnostics screen.
Args:
app: The Textual app
message: The notification message
severity: The notification severity
timeout: The notification timeout in seconds
"""
# First show the notification
app.notify(message, severity=severity, timeout=timeout)
# Then add a button to open diagnostics screen
def open_diagnostics() -> None:
from ..screens.diagnostics import DiagnosticsScreen
app.push_screen(DiagnosticsScreen())
# Add a separate notification with just the button
app.notify(
"Click to view diagnostics",
severity="information",
timeout=timeout,
title="Diagnostics"
)
# Made with Bob