tui podman fixes, etc.
This commit is contained in:
parent
cad271350b
commit
5f84acb996
10 changed files with 905 additions and 113 deletions
|
|
@ -8,9 +8,11 @@ from .screens.welcome import WelcomeScreen
|
|||
from .screens.config import ConfigScreen
|
||||
from .screens.monitor import MonitorScreen
|
||||
from .screens.logs import LogsScreen
|
||||
from .screens.diagnostics import DiagnosticsScreen
|
||||
from .managers.env_manager import EnvManager
|
||||
from .managers.container_manager import ContainerManager
|
||||
from .utils.platform import PlatformDetector
|
||||
from .widgets.diagnostics_notification import notify_with_diagnostics
|
||||
|
||||
|
||||
class OpenRAGTUI(App):
|
||||
|
|
@ -181,7 +183,8 @@ class OpenRAGTUI(App):
|
|||
"""Initialize the application."""
|
||||
# Check for runtime availability and show appropriate screen
|
||||
if not self.container_manager.is_available():
|
||||
self.notify(
|
||||
notify_with_diagnostics(
|
||||
self,
|
||||
"No container runtime found. Please install Docker or Podman.",
|
||||
severity="warning",
|
||||
timeout=10
|
||||
|
|
@ -193,7 +196,7 @@ class OpenRAGTUI(App):
|
|||
# Start with welcome screen
|
||||
self.push_screen(WelcomeScreen())
|
||||
|
||||
def action_quit(self) -> None:
|
||||
async def action_quit(self) -> None:
|
||||
"""Quit the application."""
|
||||
self.exit()
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ import asyncio
|
|||
import json
|
||||
import subprocess
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, AsyncIterator
|
||||
|
|
@ -30,7 +30,7 @@ class ServiceInfo:
|
|||
name: str
|
||||
status: ServiceStatus
|
||||
health: Optional[str] = None
|
||||
ports: List[str] = None
|
||||
ports: List[str] = field(default_factory=list)
|
||||
image: Optional[str] = None
|
||||
image_digest: Optional[str] = None
|
||||
created: Optional[str] = None
|
||||
|
|
@ -139,6 +139,48 @@ class ContainerManager:
|
|||
except Exception as e:
|
||||
return False, "", f"Command execution failed: {e}"
|
||||
|
||||
def _process_service_json(self, service: Dict, services: Dict[str, ServiceInfo]) -> None:
|
||||
"""Process a service JSON object and add it to the services dict."""
|
||||
# Debug print to see the actual service data
|
||||
print(f"DEBUG: Processing service data: {json.dumps(service, indent=2)}")
|
||||
|
||||
container_name = service.get("Name", "")
|
||||
|
||||
# Map container name to service name
|
||||
service_name = self.container_name_map.get(container_name)
|
||||
if not service_name:
|
||||
return
|
||||
|
||||
state = service.get("State", "").lower()
|
||||
|
||||
# Map compose states to our status enum
|
||||
if "running" in state:
|
||||
status = ServiceStatus.RUNNING
|
||||
elif "exited" in state or "stopped" in state:
|
||||
status = ServiceStatus.STOPPED
|
||||
elif "starting" in state:
|
||||
status = ServiceStatus.STARTING
|
||||
else:
|
||||
status = ServiceStatus.UNKNOWN
|
||||
|
||||
# Extract health - use Status if Health is empty
|
||||
health = service.get("Health", "") or service.get("Status", "N/A")
|
||||
|
||||
# Extract ports
|
||||
ports_str = service.get("Ports", "")
|
||||
ports = [p.strip() for p in ports_str.split(",") if p.strip()] if ports_str else []
|
||||
|
||||
# Extract image
|
||||
image = service.get("Image", "N/A")
|
||||
|
||||
services[service_name] = ServiceInfo(
|
||||
name=service_name,
|
||||
status=status,
|
||||
health=health,
|
||||
ports=ports,
|
||||
image=image,
|
||||
)
|
||||
|
||||
async def get_service_status(self, force_refresh: bool = False) -> Dict[str, ServiceInfo]:
|
||||
"""Get current status of all services."""
|
||||
current_time = time.time()
|
||||
|
|
@ -149,75 +191,100 @@ class ContainerManager:
|
|||
|
||||
services = {}
|
||||
|
||||
# Get compose service status
|
||||
success, stdout, stderr = await self._run_compose_command(["ps", "--format", "json"])
|
||||
|
||||
if success and stdout.strip():
|
||||
try:
|
||||
# Parse JSON output - each line is a separate JSON object
|
||||
for line in stdout.strip().split('\n'):
|
||||
if line.strip() and line.startswith('{'):
|
||||
service = json.loads(line)
|
||||
container_name = service.get("Name", "")
|
||||
# Different approach for Podman vs Docker
|
||||
if self.runtime_info.runtime_type == RuntimeType.PODMAN:
|
||||
# For Podman, use direct podman ps command instead of compose
|
||||
cmd = ["ps", "--all", "--format", "json"]
|
||||
success, stdout, stderr = await self._run_runtime_command(cmd)
|
||||
|
||||
if success and stdout.strip():
|
||||
try:
|
||||
containers = json.loads(stdout.strip())
|
||||
for container in containers:
|
||||
# Get container name and map to service name
|
||||
names = container.get("Names", [])
|
||||
if not names:
|
||||
continue
|
||||
|
||||
# Map container name to service name
|
||||
container_name = names[0]
|
||||
service_name = self.container_name_map.get(container_name)
|
||||
if not service_name:
|
||||
continue
|
||||
|
||||
state = service.get("State", "").lower()
|
||||
|
||||
# Map compose states to our status enum
|
||||
# Get container state
|
||||
state = container.get("State", "").lower()
|
||||
if "running" in state:
|
||||
status = ServiceStatus.RUNNING
|
||||
elif "exited" in state or "stopped" in state:
|
||||
status = ServiceStatus.STOPPED
|
||||
elif "starting" in state:
|
||||
elif "created" in state:
|
||||
status = ServiceStatus.STARTING
|
||||
else:
|
||||
status = ServiceStatus.UNKNOWN
|
||||
|
||||
# Extract health - use Status if Health is empty
|
||||
health = service.get("Health", "") or service.get("Status", "N/A")
|
||||
|
||||
# Extract ports
|
||||
ports_str = service.get("Ports", "")
|
||||
ports = [p.strip() for p in ports_str.split(",") if p.strip()] if ports_str else []
|
||||
|
||||
# Extract image
|
||||
image = service.get("Image", "N/A")
|
||||
# Get other container info
|
||||
image = container.get("Image", "N/A")
|
||||
ports = []
|
||||
# Handle case where Ports might be None instead of an empty list
|
||||
container_ports = container.get("Ports") or []
|
||||
if isinstance(container_ports, list):
|
||||
for port in container_ports:
|
||||
host_port = port.get("host_port")
|
||||
container_port = port.get("container_port")
|
||||
if host_port and container_port:
|
||||
ports.append(f"{host_port}:{container_port}")
|
||||
|
||||
services[service_name] = ServiceInfo(
|
||||
name=service_name,
|
||||
status=status,
|
||||
health=health,
|
||||
health=state,
|
||||
ports=ports,
|
||||
image=image,
|
||||
)
|
||||
|
||||
except json.JSONDecodeError:
|
||||
# Fallback to parsing text output
|
||||
lines = stdout.strip().split('\n')
|
||||
for line in lines[1:]: # Skip header
|
||||
if line.strip():
|
||||
parts = line.split()
|
||||
if len(parts) >= 3:
|
||||
name = parts[0]
|
||||
|
||||
# Only include our expected services
|
||||
if name not in self.expected_services:
|
||||
continue
|
||||
|
||||
state = parts[2].lower()
|
||||
|
||||
if "up" in state:
|
||||
status = ServiceStatus.RUNNING
|
||||
elif "exit" in state:
|
||||
status = ServiceStatus.STOPPED
|
||||
else:
|
||||
status = ServiceStatus.UNKNOWN
|
||||
|
||||
services[name] = ServiceInfo(name=name, status=status)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
else:
|
||||
# For Docker, use compose ps command
|
||||
success, stdout, stderr = await self._run_compose_command(["ps", "--format", "json"])
|
||||
|
||||
if success and stdout.strip():
|
||||
try:
|
||||
# Handle both single JSON object (Podman) and multiple JSON objects (Docker)
|
||||
if stdout.strip().startswith('[') and stdout.strip().endswith(']'):
|
||||
# JSON array format
|
||||
service_list = json.loads(stdout.strip())
|
||||
for service in service_list:
|
||||
self._process_service_json(service, services)
|
||||
else:
|
||||
# Line-by-line JSON format
|
||||
for line in stdout.strip().split('\n'):
|
||||
if line.strip() and line.startswith('{'):
|
||||
service = json.loads(line)
|
||||
self._process_service_json(service, services)
|
||||
except json.JSONDecodeError:
|
||||
# Fallback to parsing text output
|
||||
lines = stdout.strip().split('\n')
|
||||
if len(lines) > 1: # Make sure we have at least a header and one line
|
||||
for line in lines[1:]: # Skip header
|
||||
if line.strip():
|
||||
parts = line.split()
|
||||
if len(parts) >= 3:
|
||||
name = parts[0]
|
||||
|
||||
# Only include our expected services
|
||||
if name not in self.expected_services:
|
||||
continue
|
||||
|
||||
state = parts[2].lower()
|
||||
|
||||
if "up" in state:
|
||||
status = ServiceStatus.RUNNING
|
||||
elif "exit" in state:
|
||||
status = ServiceStatus.STOPPED
|
||||
else:
|
||||
status = ServiceStatus.UNKNOWN
|
||||
|
||||
services[name] = ServiceInfo(name=name, status=status)
|
||||
|
||||
# Add expected services that weren't found
|
||||
for expected in self.expected_services:
|
||||
|
|
@ -386,12 +453,15 @@ class ContainerManager:
|
|||
cwd=Path.cwd()
|
||||
)
|
||||
|
||||
while True:
|
||||
line = await process.stdout.readline()
|
||||
if line:
|
||||
yield line.decode().rstrip()
|
||||
else:
|
||||
break
|
||||
if process.stdout:
|
||||
while True:
|
||||
line = await process.stdout.readline()
|
||||
if line:
|
||||
yield line.decode().rstrip()
|
||||
else:
|
||||
break
|
||||
else:
|
||||
yield "Error: Unable to read process output"
|
||||
|
||||
except Exception as e:
|
||||
yield f"Error following logs: {e}"
|
||||
|
|
@ -422,9 +492,51 @@ class ContainerManager:
|
|||
|
||||
return stats
|
||||
|
||||
async def debug_podman_services(self) -> str:
|
||||
"""Run a direct Podman command to check services status for debugging."""
|
||||
if self.runtime_info.runtime_type != RuntimeType.PODMAN:
|
||||
return "Not using Podman"
|
||||
|
||||
# Try direct podman command
|
||||
cmd = ["podman", "ps", "--all", "--format", "json"]
|
||||
try:
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=Path.cwd()
|
||||
)
|
||||
|
||||
stdout, stderr = await process.communicate()
|
||||
stdout_text = stdout.decode() if stdout else ""
|
||||
stderr_text = stderr.decode() if stderr else ""
|
||||
|
||||
result = f"Command: {' '.join(cmd)}\n"
|
||||
result += f"Return code: {process.returncode}\n"
|
||||
result += f"Stdout: {stdout_text}\n"
|
||||
result += f"Stderr: {stderr_text}\n"
|
||||
|
||||
# Try to parse the output
|
||||
if stdout_text.strip():
|
||||
try:
|
||||
containers = json.loads(stdout_text)
|
||||
result += f"\nFound {len(containers)} containers:\n"
|
||||
for container in containers:
|
||||
name = container.get("Names", [""])[0]
|
||||
state = container.get("State", "")
|
||||
result += f" - {name}: {state}\n"
|
||||
except json.JSONDecodeError as e:
|
||||
result += f"\nFailed to parse JSON: {e}\n"
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
return f"Error executing command: {e}"
|
||||
|
||||
def check_podman_macos_memory(self) -> tuple[bool, str]:
|
||||
"""Check if Podman VM has sufficient memory on macOS."""
|
||||
if self.runtime_info.runtime_type != RuntimeType.PODMAN:
|
||||
return True, "Not using Podman"
|
||||
|
||||
return self.platform_detector.check_podman_macos_memory()[:2] # Return is_sufficient, message
|
||||
is_sufficient, memory_mb, message = self.platform_detector.check_podman_macos_memory()
|
||||
return is_sufficient, message
|
||||
|
|
|
|||
381
src/tui/screens/diagnostics.py
Normal file
381
src/tui/screens/diagnostics.py
Normal file
|
|
@ -0,0 +1,381 @@
|
|||
"""Diagnostics screen for OpenRAG TUI."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Container, Vertical, Horizontal, ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Header, Footer, Static, Button, Label, Log
|
||||
|
||||
from ..managers.container_manager import ContainerManager
|
||||
|
||||
|
||||
class DiagnosticsScreen(Screen):
|
||||
"""Diagnostics screen for debugging OpenRAG."""
|
||||
|
||||
CSS = """
|
||||
#diagnostics-log {
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
margin: 1;
|
||||
background: $surface;
|
||||
min-height: 20;
|
||||
}
|
||||
|
||||
.button-row Button {
|
||||
margin: 0 1;
|
||||
}
|
||||
|
||||
.copy-indicator {
|
||||
background: $success;
|
||||
color: $text;
|
||||
padding: 1;
|
||||
margin: 1;
|
||||
text-align: center;
|
||||
}
|
||||
"""
|
||||
|
||||
BINDINGS = [
|
||||
("escape", "back", "Back"),
|
||||
("r", "refresh", "Refresh"),
|
||||
("ctrl+c", "copy", "Copy to Clipboard"),
|
||||
("ctrl+s", "save", "Save to File"),
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.container_manager = ContainerManager()
|
||||
self._logger = logging.getLogger("openrag.diagnostics")
|
||||
self._status_timer = None
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Create the diagnostics screen layout."""
|
||||
yield Header()
|
||||
with Container(id="main-container"):
|
||||
yield Static("OpenRAG Diagnostics", classes="tab-header")
|
||||
with Horizontal(classes="button-row"):
|
||||
yield Button("Refresh", variant="primary", id="refresh-btn")
|
||||
yield Button("Check Podman", variant="default", id="check-podman-btn")
|
||||
yield Button("Check Docker", variant="default", id="check-docker-btn")
|
||||
yield Button("Copy to Clipboard", variant="default", id="copy-btn")
|
||||
yield Button("Save to File", variant="default", id="save-btn")
|
||||
yield Button("Back", variant="default", id="back-btn")
|
||||
|
||||
# Status indicator for copy/save operations
|
||||
yield Static("", id="copy-status", classes="copy-indicator")
|
||||
|
||||
with ScrollableContainer(id="diagnostics-scroll"):
|
||||
yield Log(id="diagnostics-log", highlight=True)
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Initialize the screen."""
|
||||
self.run_diagnostics()
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle button presses."""
|
||||
if event.button.id == "refresh-btn":
|
||||
self.action_refresh()
|
||||
elif event.button.id == "check-podman-btn":
|
||||
asyncio.create_task(self.check_podman())
|
||||
elif event.button.id == "check-docker-btn":
|
||||
asyncio.create_task(self.check_docker())
|
||||
elif event.button.id == "copy-btn":
|
||||
self.copy_to_clipboard()
|
||||
elif event.button.id == "save-btn":
|
||||
self.save_to_file()
|
||||
elif event.button.id == "back-btn":
|
||||
self.action_back()
|
||||
|
||||
def action_refresh(self) -> None:
|
||||
"""Refresh diagnostics."""
|
||||
self.run_diagnostics()
|
||||
|
||||
def action_copy(self) -> None:
|
||||
"""Copy log content to clipboard (keyboard shortcut)."""
|
||||
self.copy_to_clipboard()
|
||||
|
||||
def copy_to_clipboard(self) -> None:
|
||||
"""Copy log content to clipboard."""
|
||||
try:
|
||||
log = self.query_one("#diagnostics-log", Log)
|
||||
content = "\n".join(str(line) for line in log.lines)
|
||||
status = self.query_one("#copy-status", Static)
|
||||
|
||||
# Try to use pyperclip if available
|
||||
try:
|
||||
import pyperclip
|
||||
pyperclip.copy(content)
|
||||
self.notify("Copied to clipboard", severity="information")
|
||||
status.update("✓ Content copied to clipboard")
|
||||
self._hide_status_after_delay(status)
|
||||
return
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Fallback to platform-specific clipboard commands
|
||||
import subprocess
|
||||
import platform
|
||||
|
||||
system = platform.system()
|
||||
if system == "Darwin": # macOS
|
||||
process = subprocess.Popen(
|
||||
["pbcopy"], stdin=subprocess.PIPE, text=True
|
||||
)
|
||||
process.communicate(input=content)
|
||||
self.notify("Copied to clipboard", severity="information")
|
||||
status.update("✓ Content copied to clipboard")
|
||||
elif system == "Windows":
|
||||
process = subprocess.Popen(
|
||||
["clip"], stdin=subprocess.PIPE, text=True
|
||||
)
|
||||
process.communicate(input=content)
|
||||
self.notify("Copied to clipboard", severity="information")
|
||||
status.update("✓ Content copied to clipboard")
|
||||
elif system == "Linux":
|
||||
# Try xclip first, then xsel
|
||||
try:
|
||||
process = subprocess.Popen(
|
||||
["xclip", "-selection", "clipboard"],
|
||||
stdin=subprocess.PIPE,
|
||||
text=True
|
||||
)
|
||||
process.communicate(input=content)
|
||||
self.notify("Copied to clipboard", severity="information")
|
||||
status.update("✓ Content copied to clipboard")
|
||||
except FileNotFoundError:
|
||||
try:
|
||||
process = subprocess.Popen(
|
||||
["xsel", "--clipboard", "--input"],
|
||||
stdin=subprocess.PIPE,
|
||||
text=True
|
||||
)
|
||||
process.communicate(input=content)
|
||||
self.notify("Copied to clipboard", severity="information")
|
||||
status.update("✓ Content copied to clipboard")
|
||||
except FileNotFoundError:
|
||||
self.notify("Clipboard utilities not found. Install xclip or xsel.", severity="error")
|
||||
status.update("❌ Clipboard utilities not found. Install xclip or xsel.")
|
||||
else:
|
||||
self.notify("Clipboard not supported on this platform", severity="error")
|
||||
status.update("❌ Clipboard not supported on this platform")
|
||||
|
||||
self._hide_status_after_delay(status)
|
||||
except Exception as e:
|
||||
self.notify(f"Failed to copy to clipboard: {e}", severity="error")
|
||||
status = self.query_one("#copy-status", Static)
|
||||
status.update(f"❌ Failed to copy: {e}")
|
||||
self._hide_status_after_delay(status)
|
||||
|
||||
def _hide_status_after_delay(self, status_widget: Static, delay: float = 3.0) -> None:
|
||||
"""Hide the status message after a delay."""
|
||||
# Cancel any existing timer
|
||||
if self._status_timer:
|
||||
self._status_timer.cancel()
|
||||
|
||||
# Create and run the timer task
|
||||
self._status_timer = asyncio.create_task(self._clear_status_after_delay(status_widget, delay))
|
||||
|
||||
async def _clear_status_after_delay(self, status_widget: Static, delay: float) -> None:
|
||||
"""Clear the status message after a delay."""
|
||||
await asyncio.sleep(delay)
|
||||
status_widget.update("")
|
||||
|
||||
def action_save(self) -> None:
|
||||
"""Save log content to file (keyboard shortcut)."""
|
||||
self.save_to_file()
|
||||
|
||||
def save_to_file(self) -> None:
|
||||
"""Save log content to a file."""
|
||||
try:
|
||||
log = self.query_one("#diagnostics-log", Log)
|
||||
content = "\n".join(str(line) for line in log.lines)
|
||||
status = self.query_one("#copy-status", Static)
|
||||
|
||||
# Create logs directory if it doesn't exist
|
||||
logs_dir = Path("logs")
|
||||
logs_dir.mkdir(exist_ok=True)
|
||||
|
||||
# Create a timestamped filename
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = logs_dir / f"openrag_diagnostics_{timestamp}.txt"
|
||||
|
||||
# Save to file
|
||||
with open(filename, "w") as f:
|
||||
f.write(content)
|
||||
|
||||
self.notify(f"Saved to {filename}", severity="information")
|
||||
status.update(f"✓ Saved to {filename}")
|
||||
|
||||
# Log the save operation
|
||||
self._logger.info(f"Diagnostics saved to {filename}")
|
||||
self._hide_status_after_delay(status)
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to save file: {e}"
|
||||
self.notify(error_msg, severity="error")
|
||||
self._logger.error(error_msg)
|
||||
|
||||
status = self.query_one("#copy-status", Static)
|
||||
status.update(f"❌ {error_msg}")
|
||||
self._hide_status_after_delay(status)
|
||||
|
||||
def action_back(self) -> None:
|
||||
"""Go back to previous screen."""
|
||||
self.app.pop_screen()
|
||||
|
||||
def run_diagnostics(self) -> None:
|
||||
"""Run all diagnostics."""
|
||||
log = self.query_one("#diagnostics-log", Log)
|
||||
log.clear()
|
||||
|
||||
# System information
|
||||
log.write("[bold green]System Information[/bold green]")
|
||||
log.write(f"Runtime: {self.container_manager.runtime_info.runtime_type.value}")
|
||||
log.write(f"Version: {self.container_manager.runtime_info.version or 'Unknown'}")
|
||||
log.write(f"Compose file: {self.container_manager.compose_file}")
|
||||
log.write(f"CPU mode: {self.container_manager.use_cpu_compose}")
|
||||
log.write("")
|
||||
|
||||
# Run async diagnostics
|
||||
asyncio.create_task(self._run_async_diagnostics())
|
||||
|
||||
async def _run_async_diagnostics(self) -> None:
|
||||
"""Run asynchronous diagnostics."""
|
||||
log = self.query_one("#diagnostics-log", Log)
|
||||
|
||||
# Check services
|
||||
log.write("[bold green]Service Status[/bold green]")
|
||||
services = await self.container_manager.get_service_status(force_refresh=True)
|
||||
for name, info in services.items():
|
||||
status_color = "green" if info.status == "running" else "red"
|
||||
log.write(f"[bold]{name}[/bold]: [{status_color}]{info.status.value}[/{status_color}]")
|
||||
if info.health:
|
||||
log.write(f" Health: {info.health}")
|
||||
if info.ports:
|
||||
log.write(f" Ports: {', '.join(info.ports)}")
|
||||
if info.image:
|
||||
log.write(f" Image: {info.image}")
|
||||
log.write("")
|
||||
|
||||
# Check for Podman-specific issues
|
||||
if self.container_manager.runtime_info.runtime_type.name == "PODMAN":
|
||||
await self.check_podman()
|
||||
|
||||
async def check_podman(self) -> None:
|
||||
"""Run Podman-specific diagnostics."""
|
||||
log = self.query_one("#diagnostics-log", Log)
|
||||
log.write("[bold green]Podman Diagnostics[/bold green]")
|
||||
|
||||
# Check if using Podman
|
||||
if self.container_manager.runtime_info.runtime_type.name != "PODMAN":
|
||||
log.write("[yellow]Not using Podman[/yellow]")
|
||||
return
|
||||
|
||||
# Check Podman version
|
||||
cmd = ["podman", "--version"]
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
if process.returncode == 0:
|
||||
log.write(f"Podman version: {stdout.decode().strip()}")
|
||||
else:
|
||||
log.write(f"[red]Failed to get Podman version: {stderr.decode().strip()}[/red]")
|
||||
|
||||
# Check Podman containers
|
||||
cmd = ["podman", "ps", "--all"]
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
if process.returncode == 0:
|
||||
log.write("Podman containers:")
|
||||
for line in stdout.decode().strip().split("\n"):
|
||||
log.write(f" {line}")
|
||||
else:
|
||||
log.write(f"[red]Failed to list Podman containers: {stderr.decode().strip()}[/red]")
|
||||
|
||||
# Check Podman compose
|
||||
cmd = ["podman", "compose", "ps"]
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=self.container_manager.compose_file.parent
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
if process.returncode == 0:
|
||||
log.write("Podman compose services:")
|
||||
for line in stdout.decode().strip().split("\n"):
|
||||
log.write(f" {line}")
|
||||
else:
|
||||
log.write(f"[red]Failed to list Podman compose services: {stderr.decode().strip()}[/red]")
|
||||
|
||||
log.write("")
|
||||
|
||||
async def check_docker(self) -> None:
|
||||
"""Run Docker-specific diagnostics."""
|
||||
log = self.query_one("#diagnostics-log", Log)
|
||||
log.write("[bold green]Docker Diagnostics[/bold green]")
|
||||
|
||||
# Check if using Docker
|
||||
if "DOCKER" not in self.container_manager.runtime_info.runtime_type.name:
|
||||
log.write("[yellow]Not using Docker[/yellow]")
|
||||
return
|
||||
|
||||
# Check Docker version
|
||||
cmd = ["docker", "--version"]
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
if process.returncode == 0:
|
||||
log.write(f"Docker version: {stdout.decode().strip()}")
|
||||
else:
|
||||
log.write(f"[red]Failed to get Docker version: {stderr.decode().strip()}[/red]")
|
||||
|
||||
# Check Docker containers
|
||||
cmd = ["docker", "ps", "--all"]
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
if process.returncode == 0:
|
||||
log.write("Docker containers:")
|
||||
for line in stdout.decode().strip().split("\n"):
|
||||
log.write(f" {line}")
|
||||
else:
|
||||
log.write(f"[red]Failed to list Docker containers: {stderr.decode().strip()}[/red]")
|
||||
|
||||
# Check Docker compose
|
||||
cmd = ["docker", "compose", "ps"]
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=self.container_manager.compose_file.parent
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
if process.returncode == 0:
|
||||
log.write("Docker compose services:")
|
||||
for line in stdout.decode().strip().split("\n"):
|
||||
log.write(f" {line}")
|
||||
else:
|
||||
log.write(f"[red]Failed to list Docker compose services: {stderr.decode().strip()}[/red]")
|
||||
|
||||
log.write("")
|
||||
|
||||
# Made with Bob
|
||||
|
|
@ -2,6 +2,11 @@
|
|||
|
||||
import asyncio
|
||||
import re
|
||||
from typing import Literal, Any
|
||||
|
||||
# Define button variant type
|
||||
ButtonVariant = Literal["default", "primary", "success", "warning", "error"]
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Container, Vertical, Horizontal, ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
|
|
@ -12,6 +17,8 @@ from rich.table import Table
|
|||
|
||||
from ..managers.container_manager import ContainerManager, ServiceStatus, ServiceInfo
|
||||
from ..utils.platform import RuntimeType
|
||||
from ..widgets.command_modal import CommandOutputModal
|
||||
from ..widgets.diagnostics_notification import notify_with_diagnostics
|
||||
|
||||
|
||||
class MonitorScreen(Screen):
|
||||
|
|
@ -160,6 +167,11 @@ class MonitorScreen(Screen):
|
|||
# Stop following logs if running
|
||||
self._stop_follow()
|
||||
|
||||
async def on_screen_resume(self) -> None:
|
||||
"""Called when the screen is resumed (e.g., after a modal is closed)."""
|
||||
# Refresh services when returning from a modal
|
||||
await self._refresh_services()
|
||||
|
||||
async def _refresh_services(self) -> None:
|
||||
"""Refresh the services table."""
|
||||
if not self.container_manager.is_available():
|
||||
|
|
@ -229,31 +241,37 @@ class MonitorScreen(Screen):
|
|||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle button presses."""
|
||||
if event.button.id == "start-btn":
|
||||
button_id = event.button.id or ""
|
||||
|
||||
if button_id.startswith("start-btn"):
|
||||
self.run_worker(self._start_services())
|
||||
elif event.button.id == "stop-btn":
|
||||
elif button_id.startswith("stop-btn"):
|
||||
self.run_worker(self._stop_services())
|
||||
elif event.button.id == "restart-btn":
|
||||
elif button_id.startswith("restart-btn"):
|
||||
self.run_worker(self._restart_services())
|
||||
elif event.button.id == "upgrade-btn":
|
||||
elif button_id.startswith("upgrade-btn"):
|
||||
self.run_worker(self._upgrade_services())
|
||||
elif event.button.id == "reset-btn":
|
||||
elif button_id.startswith("reset-btn"):
|
||||
self.run_worker(self._reset_services())
|
||||
elif event.button.id == "toggle-mode-btn":
|
||||
elif button_id == "toggle-mode-btn":
|
||||
self.action_toggle_mode()
|
||||
elif event.button.id == "refresh-btn":
|
||||
elif button_id == "refresh-btn":
|
||||
self.action_refresh()
|
||||
elif event.button.id == "back-btn":
|
||||
elif button_id == "back-btn":
|
||||
self.action_back()
|
||||
elif event.button.id.startswith("logs-"):
|
||||
elif button_id.startswith("logs-"):
|
||||
# Map button IDs to actual service names
|
||||
service_mapping = {
|
||||
"logs-backend": "openrag-backend",
|
||||
"logs-frontend": "openrag-frontend",
|
||||
"logs-frontend": "openrag-frontend",
|
||||
"logs-opensearch": "opensearch",
|
||||
"logs-langflow": "langflow"
|
||||
}
|
||||
service_name = service_mapping.get(event.button.id)
|
||||
|
||||
# Extract the base button ID (without any suffix)
|
||||
button_base_id = button_id.split("-")[0] + "-" + button_id.split("-")[1]
|
||||
|
||||
service_name = service_mapping.get(button_base_id)
|
||||
if service_name:
|
||||
# Load recent logs then start following
|
||||
self.run_worker(self._show_logs(service_name))
|
||||
|
|
@ -263,9 +281,14 @@ class MonitorScreen(Screen):
|
|||
"""Start services with progress updates."""
|
||||
self.operation_in_progress = True
|
||||
try:
|
||||
async for is_complete, message in self.container_manager.start_services(cpu_mode):
|
||||
self.notify(message, severity="success" if is_complete else "info")
|
||||
await self._refresh_services()
|
||||
# Show command output in modal dialog
|
||||
command_generator = self.container_manager.start_services(cpu_mode)
|
||||
modal = CommandOutputModal(
|
||||
"Starting Services",
|
||||
command_generator,
|
||||
on_complete=None # We'll refresh in on_screen_resume instead
|
||||
)
|
||||
self.app.push_screen(modal)
|
||||
finally:
|
||||
self.operation_in_progress = False
|
||||
|
||||
|
|
@ -273,9 +296,14 @@ class MonitorScreen(Screen):
|
|||
"""Stop services with progress updates."""
|
||||
self.operation_in_progress = True
|
||||
try:
|
||||
async for is_complete, message in self.container_manager.stop_services():
|
||||
self.notify(message, severity="success" if is_complete else "info")
|
||||
await self._refresh_services()
|
||||
# Show command output in modal dialog
|
||||
command_generator = self.container_manager.stop_services()
|
||||
modal = CommandOutputModal(
|
||||
"Stopping Services",
|
||||
command_generator,
|
||||
on_complete=None # We'll refresh in on_screen_resume instead
|
||||
)
|
||||
self.app.push_screen(modal)
|
||||
finally:
|
||||
self.operation_in_progress = False
|
||||
|
||||
|
|
@ -283,9 +311,14 @@ class MonitorScreen(Screen):
|
|||
"""Restart services with progress updates."""
|
||||
self.operation_in_progress = True
|
||||
try:
|
||||
async for is_complete, message in self.container_manager.restart_services():
|
||||
self.notify(message, severity="success" if is_complete else "info")
|
||||
await self._refresh_services()
|
||||
# Show command output in modal dialog
|
||||
command_generator = self.container_manager.restart_services()
|
||||
modal = CommandOutputModal(
|
||||
"Restarting Services",
|
||||
command_generator,
|
||||
on_complete=None # We'll refresh in on_screen_resume instead
|
||||
)
|
||||
self.app.push_screen(modal)
|
||||
finally:
|
||||
self.operation_in_progress = False
|
||||
|
||||
|
|
@ -293,9 +326,14 @@ class MonitorScreen(Screen):
|
|||
"""Upgrade services with progress updates."""
|
||||
self.operation_in_progress = True
|
||||
try:
|
||||
async for is_complete, message in self.container_manager.upgrade_services():
|
||||
self.notify(message, severity="success" if is_complete else "warning")
|
||||
await self._refresh_services()
|
||||
# Show command output in modal dialog
|
||||
command_generator = self.container_manager.upgrade_services()
|
||||
modal = CommandOutputModal(
|
||||
"Upgrading Services",
|
||||
command_generator,
|
||||
on_complete=None # We'll refresh in on_screen_resume instead
|
||||
)
|
||||
self.app.push_screen(modal)
|
||||
finally:
|
||||
self.operation_in_progress = False
|
||||
|
||||
|
|
@ -303,9 +341,14 @@ class MonitorScreen(Screen):
|
|||
"""Reset services with progress updates."""
|
||||
self.operation_in_progress = True
|
||||
try:
|
||||
async for is_complete, message in self.container_manager.reset_services():
|
||||
self.notify(message, severity="success" if is_complete else "warning")
|
||||
await self._refresh_services()
|
||||
# Show command output in modal dialog
|
||||
command_generator = self.container_manager.reset_services()
|
||||
modal = CommandOutputModal(
|
||||
"Resetting Services",
|
||||
command_generator,
|
||||
on_complete=None # We'll refresh in on_screen_resume instead
|
||||
)
|
||||
self.app.push_screen(modal)
|
||||
finally:
|
||||
self.operation_in_progress = False
|
||||
|
||||
|
|
@ -332,14 +375,16 @@ class MonitorScreen(Screen):
|
|||
# Try to scroll to end of container
|
||||
try:
|
||||
scroller = self.query_one("#logs-scroll", ScrollableContainer)
|
||||
if hasattr(scroller, "scroll_end"):
|
||||
scroller.scroll_end(animate=False)
|
||||
elif hasattr(scroller, "scroll_to_end"):
|
||||
scroller.scroll_to_end()
|
||||
# Only use scroll_end which is the correct method
|
||||
scroller.scroll_end(animate=False)
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
self.notify(f"Failed to get logs for {service_name}: {logs}", severity="error")
|
||||
notify_with_diagnostics(
|
||||
self.app,
|
||||
f"Failed to get logs for {service_name}: {logs}",
|
||||
severity="error"
|
||||
)
|
||||
|
||||
def _stop_follow(self) -> None:
|
||||
task = self._follow_task
|
||||
|
|
@ -377,12 +422,16 @@ class MonitorScreen(Screen):
|
|||
logs_widget = self.query_one("#logs-content", Static)
|
||||
logs_widget.update("\n".join(self._logs_buffer))
|
||||
scroller = self.query_one("#logs-scroll", ScrollableContainer)
|
||||
if hasattr(scroller, "scroll_end"):
|
||||
scroller.scroll_end(animate=False)
|
||||
# Only use scroll_end which is the correct method
|
||||
scroller.scroll_end(animate=False)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
self.notify(f"Error following logs: {e}", severity="error")
|
||||
notify_with_diagnostics(
|
||||
self.app,
|
||||
f"Error following logs: {e}",
|
||||
severity="error"
|
||||
)
|
||||
|
||||
def action_refresh(self) -> None:
|
||||
"""Refresh services manually."""
|
||||
|
|
@ -405,36 +454,47 @@ class MonitorScreen(Screen):
|
|||
try:
|
||||
current = getattr(self.container_manager, "use_cpu_compose", True)
|
||||
self.container_manager.use_cpu_compose = not current
|
||||
self.notify("Switched to GPU compose" if not current else "Switched to CPU compose", severity="info")
|
||||
self.notify("Switched to GPU compose" if not current else "Switched to CPU compose", severity="information")
|
||||
self._update_mode_row()
|
||||
self.action_refresh()
|
||||
except Exception as e:
|
||||
self.notify(f"Failed to toggle mode: {e}", severity="error")
|
||||
notify_with_diagnostics(
|
||||
self.app,
|
||||
f"Failed to toggle mode: {e}",
|
||||
severity="error"
|
||||
)
|
||||
|
||||
def _update_controls(self, services: list[ServiceInfo]) -> None:
|
||||
"""Render control buttons based on running state and set default focus."""
|
||||
try:
|
||||
# Get the controls container
|
||||
controls = self.query_one("#services-controls", Horizontal)
|
||||
|
||||
# Remove all existing children
|
||||
controls.remove_children()
|
||||
|
||||
# Check if any services are running
|
||||
any_running = any(s.status == ServiceStatus.RUNNING for s in services)
|
||||
|
||||
# Add appropriate buttons based on service state
|
||||
if any_running:
|
||||
# When services are running, show stop and restart
|
||||
controls.mount(Button("Stop Services", variant="error", id="stop-btn"))
|
||||
controls.mount(Button("Restart", variant="primary", id="restart-btn"))
|
||||
controls.mount(Button("Upgrade", variant="warning", id="upgrade-btn"))
|
||||
controls.mount(Button("Reset", variant="error", id="reset-btn"))
|
||||
# Focus Stop by default when running
|
||||
try:
|
||||
self.query_one("#stop-btn", Button).focus()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
# When services are not running, show start
|
||||
controls.mount(Button("Start Services", variant="success", id="start-btn"))
|
||||
try:
|
||||
self.query_one("#start-btn", Button).focus()
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Always show upgrade and reset buttons
|
||||
controls.mount(Button("Upgrade", variant="warning", id="upgrade-btn"))
|
||||
controls.mount(Button("Reset", variant="error", id="reset-btn"))
|
||||
|
||||
except Exception as e:
|
||||
notify_with_diagnostics(
|
||||
self.app,
|
||||
f"Error updating controls: {e}",
|
||||
severity="error"
|
||||
)
|
||||
|
||||
def action_back(self) -> None:
|
||||
"""Go back to previous screen."""
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ class WelcomeScreen(Screen):
|
|||
("1", "no_auth_setup", "Basic Setup"),
|
||||
("2", "full_setup", "Advanced Setup"),
|
||||
("3", "monitor", "Monitor Services"),
|
||||
("4", "diagnostics", "Diagnostics"),
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
|
|
@ -96,6 +97,9 @@ class WelcomeScreen(Screen):
|
|||
# Always show monitor option
|
||||
buttons.append(Button("Monitor Services", variant="default", id="monitor-btn"))
|
||||
|
||||
# Always show diagnostics button
|
||||
buttons.append(Button("Diagnostics", variant="default", id="diagnostics-btn"))
|
||||
|
||||
return Horizontal(*buttons, classes="button-row")
|
||||
|
||||
async def on_mount(self) -> None:
|
||||
|
|
@ -124,7 +128,7 @@ class WelcomeScreen(Screen):
|
|||
# Update the welcome text and recompose with new state
|
||||
try:
|
||||
welcome_widget = self.query_one("#welcome-text")
|
||||
welcome_widget.update(self._create_welcome_text())
|
||||
welcome_widget.update(self._create_welcome_text()) # This is fine for Static widgets
|
||||
|
||||
# Focus the appropriate button
|
||||
if self.services_running:
|
||||
|
|
@ -154,6 +158,8 @@ class WelcomeScreen(Screen):
|
|||
self.action_full_setup()
|
||||
elif event.button.id == "monitor-btn":
|
||||
self.action_monitor()
|
||||
elif event.button.id == "diagnostics-btn":
|
||||
self.action_diagnostics()
|
||||
|
||||
def action_default_action(self) -> None:
|
||||
"""Handle Enter key - go to default action based on state."""
|
||||
|
|
@ -179,6 +185,11 @@ class WelcomeScreen(Screen):
|
|||
from .monitor import MonitorScreen
|
||||
self.app.push_screen(MonitorScreen())
|
||||
|
||||
def action_diagnostics(self) -> None:
|
||||
"""Switch to diagnostics screen."""
|
||||
from .diagnostics import DiagnosticsScreen
|
||||
self.app.push_screen(DiagnosticsScreen())
|
||||
|
||||
def action_quit(self) -> None:
|
||||
"""Quit the application."""
|
||||
self.app.exit()
|
||||
|
|
@ -32,15 +32,31 @@ class PlatformDetector:
|
|||
|
||||
def detect_runtime(self) -> RuntimeInfo:
|
||||
"""Detect available container runtime and compose capabilities."""
|
||||
# First check if we have podman installed
|
||||
podman_version = self._get_podman_version()
|
||||
|
||||
# If we have podman, check if docker is actually podman in disguise
|
||||
if podman_version:
|
||||
docker_version = self._get_docker_version()
|
||||
if docker_version and podman_version in docker_version:
|
||||
# This is podman masquerading as docker
|
||||
if self._check_command(["docker", "compose", "--help"]):
|
||||
return RuntimeInfo(RuntimeType.PODMAN, ["docker", "compose"], ["docker"], podman_version)
|
||||
if self._check_command(["docker-compose", "--help"]):
|
||||
return RuntimeInfo(RuntimeType.PODMAN, ["docker-compose"], ["docker"], podman_version)
|
||||
|
||||
# Check for native podman compose
|
||||
if self._check_command(["podman", "compose", "--help"]):
|
||||
return RuntimeInfo(RuntimeType.PODMAN, ["podman", "compose"], ["podman"], podman_version)
|
||||
|
||||
# Check for actual docker
|
||||
if self._check_command(["docker", "compose", "--help"]):
|
||||
version = self._get_docker_version()
|
||||
return RuntimeInfo(RuntimeType.DOCKER, ["docker", "compose"], ["docker"], version)
|
||||
if self._check_command(["docker-compose", "--help"]):
|
||||
version = self._get_docker_version()
|
||||
return RuntimeInfo(RuntimeType.DOCKER_COMPOSE, ["docker-compose"], ["docker"], version)
|
||||
if self._check_command(["podman", "compose", "--help"]):
|
||||
version = self._get_podman_version()
|
||||
return RuntimeInfo(RuntimeType.PODMAN, ["podman", "compose"], ["podman"], version)
|
||||
|
||||
return RuntimeInfo(RuntimeType.NONE, [], [])
|
||||
|
||||
def detect_gpu_available(self) -> bool:
|
||||
|
|
|
|||
3
src/tui/widgets/__init__.py
Normal file
3
src/tui/widgets/__init__.py
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
"""Widgets for OpenRAG TUI."""
|
||||
|
||||
# Made with Bob
|
||||
130
src/tui/widgets/command_modal.py
Normal file
130
src/tui/widgets/command_modal.py
Normal file
|
|
@ -0,0 +1,130 @@
|
|||
"""Command output modal dialog for OpenRAG TUI."""
|
||||
|
||||
import asyncio
|
||||
from typing import Callable, List, Optional, AsyncIterator, Any
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.worker import Worker
|
||||
from textual.containers import Container, ScrollableContainer
|
||||
from textual.screen import ModalScreen
|
||||
from textual.widgets import Button, Static, Label, Log
|
||||
|
||||
|
||||
class CommandOutputModal(ModalScreen):
|
||||
"""Modal dialog for displaying command output in real-time."""
|
||||
|
||||
DEFAULT_CSS = """
|
||||
CommandOutputModal {
|
||||
align: center middle;
|
||||
}
|
||||
|
||||
#dialog {
|
||||
width: 90%;
|
||||
height: 90%;
|
||||
border: thick $primary;
|
||||
background: $surface;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
#title {
|
||||
background: $primary;
|
||||
color: $text;
|
||||
padding: 1 2;
|
||||
text-align: center;
|
||||
width: 100%;
|
||||
text-style: bold;
|
||||
}
|
||||
|
||||
#output-container {
|
||||
height: 1fr;
|
||||
padding: 0;
|
||||
margin: 0 1;
|
||||
}
|
||||
|
||||
#command-output {
|
||||
height: 100%;
|
||||
border: solid $accent;
|
||||
padding: 1 2;
|
||||
margin: 1 0;
|
||||
background: $surface-darken-1;
|
||||
}
|
||||
|
||||
#button-row {
|
||||
width: 100%;
|
||||
height: auto;
|
||||
align: center middle;
|
||||
padding: 1;
|
||||
margin-top: 1;
|
||||
}
|
||||
|
||||
#button-row Button {
|
||||
margin: 0 1;
|
||||
min-width: 16;
|
||||
}
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
title: str,
|
||||
command_generator: AsyncIterator[tuple[bool, str]],
|
||||
on_complete: Optional[Callable] = None
|
||||
):
|
||||
"""Initialize the modal dialog.
|
||||
|
||||
Args:
|
||||
title: Title of the modal dialog
|
||||
command_generator: Async generator that yields (is_complete, message) tuples
|
||||
on_complete: Optional callback to run when command completes
|
||||
"""
|
||||
super().__init__()
|
||||
self.title_text = title
|
||||
self.command_generator = command_generator
|
||||
self.on_complete = on_complete
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Create the modal dialog layout."""
|
||||
with Container(id="dialog"):
|
||||
yield Label(self.title_text, id="title")
|
||||
with ScrollableContainer(id="output-container"):
|
||||
yield Log(id="command-output", highlight=True)
|
||||
with Container(id="button-row"):
|
||||
yield Button("Close", variant="primary", id="close-btn")
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Start the command when the modal is mounted."""
|
||||
# Start the command but don't store the worker
|
||||
self.run_worker(self._run_command(), exclusive=False)
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle button presses."""
|
||||
if event.button.id == "close-btn":
|
||||
self.dismiss()
|
||||
|
||||
async def _run_command(self) -> None:
|
||||
"""Run the command and update the output in real-time."""
|
||||
output = self.query_one("#command-output", Log)
|
||||
|
||||
try:
|
||||
async for is_complete, message in self.command_generator:
|
||||
# Add message to output
|
||||
output.write(message)
|
||||
|
||||
# Scroll to bottom
|
||||
container = self.query_one("#output-container", ScrollableContainer)
|
||||
container.scroll_end(animate=False)
|
||||
|
||||
# If command is complete, update UI
|
||||
if is_complete:
|
||||
output.write("[bold green]Command completed successfully[/bold green]")
|
||||
# Call the completion callback if provided
|
||||
if self.on_complete:
|
||||
await asyncio.sleep(0.5) # Small delay for better UX
|
||||
self.on_complete()
|
||||
except Exception as e:
|
||||
output.write(f"[bold red]Error: {e}[/bold red]")
|
||||
|
||||
# Enable the close button
|
||||
close_btn = self.query_one("#close-btn", Button)
|
||||
close_btn.disabled = False
|
||||
|
||||
# Made with Bob
|
||||
38
src/tui/widgets/diagnostics_notification.py
Normal file
38
src/tui/widgets/diagnostics_notification.py
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
"""Utility functions for showing error notifications with diagnostics button."""
|
||||
|
||||
from typing import Literal
|
||||
|
||||
from textual.app import App
|
||||
|
||||
|
||||
def notify_with_diagnostics(
|
||||
app: App,
|
||||
message: str,
|
||||
severity: Literal["information", "warning", "error"] = "error",
|
||||
timeout: float = 10.0
|
||||
) -> None:
|
||||
"""Show a notification with a button to open the diagnostics screen.
|
||||
|
||||
Args:
|
||||
app: The Textual app
|
||||
message: The notification message
|
||||
severity: The notification severity
|
||||
timeout: The notification timeout in seconds
|
||||
"""
|
||||
# First show the notification
|
||||
app.notify(message, severity=severity, timeout=timeout)
|
||||
|
||||
# Then add a button to open diagnostics screen
|
||||
def open_diagnostics() -> None:
|
||||
from ..screens.diagnostics import DiagnosticsScreen
|
||||
app.push_screen(DiagnosticsScreen())
|
||||
|
||||
# Add a separate notification with just the button
|
||||
app.notify(
|
||||
"Click to view diagnostics",
|
||||
severity="information",
|
||||
timeout=timeout,
|
||||
title="Diagnostics"
|
||||
)
|
||||
|
||||
# Made with Bob
|
||||
38
src/tui/widgets/error_notification.py
Normal file
38
src/tui/widgets/error_notification.py
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
"""Utility functions for showing error notifications with diagnostics button."""
|
||||
|
||||
from typing import Literal, Callable
|
||||
|
||||
from textual.app import App
|
||||
|
||||
|
||||
def notify_with_diagnostics(
|
||||
app: App,
|
||||
message: str,
|
||||
severity: Literal["information", "warning", "error"] = "error",
|
||||
timeout: float = 10.0
|
||||
) -> None:
|
||||
"""Show a notification with a button to open the diagnostics screen.
|
||||
|
||||
Args:
|
||||
app: The Textual app
|
||||
message: The notification message
|
||||
severity: The notification severity
|
||||
timeout: The notification timeout in seconds
|
||||
"""
|
||||
# First show the notification
|
||||
app.notify(message, severity=severity, timeout=timeout)
|
||||
|
||||
# Then add a button to open diagnostics screen
|
||||
def open_diagnostics() -> None:
|
||||
from ..screens.diagnostics import DiagnosticsScreen
|
||||
app.push_screen(DiagnosticsScreen())
|
||||
|
||||
# Add a separate notification with just the button
|
||||
app.notify(
|
||||
"Click to view diagnostics",
|
||||
severity="information",
|
||||
timeout=timeout,
|
||||
title="Diagnostics"
|
||||
)
|
||||
|
||||
# Made with Bob
|
||||
Loading…
Add table
Reference in a new issue