Merge branch 'main' of https://github.com/langflow-ai/openrag into delete-knowledge

This commit is contained in:
Deon Sanchez 2025-09-18 08:42:03 -06:00
commit 8890fa1946
7 changed files with 505 additions and 169 deletions

View file

@ -1,6 +1,6 @@
[project] [project]
name = "openrag" name = "openrag"
version = "0.1.3" version = "0.1.4"
description = "Add your description here" description = "Add your description here"
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"

View file

@ -144,14 +144,15 @@ class ContainerManager:
) )
# Simple approach: read line by line and yield each one # Simple approach: read line by line and yield each one
while True: if process.stdout:
line = await process.stdout.readline() while True:
if not line: line = await process.stdout.readline()
break if not line:
break
line_text = line.decode().rstrip() line_text = line.decode(errors="ignore").rstrip()
if line_text: if line_text:
yield line_text yield line_text
# Wait for process to complete # Wait for process to complete
await process.wait() await process.wait()
@ -159,6 +160,59 @@ class ContainerManager:
except Exception as e: except Exception as e:
yield f"Command execution failed: {e}" yield f"Command execution failed: {e}"
async def _stream_compose_command(
self,
args: List[str],
success_flag: Dict[str, bool],
cpu_mode: Optional[bool] = None,
) -> AsyncIterator[str]:
"""Run compose command with live output and record success/failure."""
if not self.is_available():
success_flag["value"] = False
yield "No container runtime available"
return
if cpu_mode is None:
cpu_mode = self.use_cpu_compose
compose_file = self.cpu_compose_file if cpu_mode else self.compose_file
cmd = self.runtime_info.compose_command + ["-f", str(compose_file)] + args
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=Path.cwd(),
)
except Exception as e:
success_flag["value"] = False
yield f"Command execution failed: {e}"
return
success_flag["value"] = True
if process.stdout:
while True:
line = await process.stdout.readline()
if not line:
break
line_text = line.decode(errors="ignore")
# Compose often uses carriage returns for progress bars; normalise them
for chunk in line_text.replace("\r", "\n").split("\n"):
chunk = chunk.strip()
if not chunk:
continue
yield chunk
lowered = chunk.lower()
if "error" in lowered or "failed" in lowered:
success_flag["value"] = False
returncode = await process.wait()
if returncode != 0:
success_flag["value"] = False
yield f"Command exited with status {returncode}"
async def _run_runtime_command(self, args: List[str]) -> tuple[bool, str, str]: async def _run_runtime_command(self, args: List[str]) -> tuple[bool, str, str]:
"""Run a runtime command (docker/podman) and return (success, stdout, stderr).""" """Run a runtime command (docker/podman) and return (success, stdout, stderr)."""
if not self.is_available(): if not self.is_available():
@ -408,19 +462,42 @@ class ContainerManager:
return results return results
async def start_services( async def start_services(
self, cpu_mode: bool = False self, cpu_mode: Optional[bool] = None
) -> AsyncIterator[tuple[bool, str]]: ) -> AsyncIterator[tuple[bool, str]]:
"""Start all services and yield progress updates.""" """Start all services and yield progress updates."""
if not self.is_available():
yield False, "No container runtime available"
return
yield False, "Starting OpenRAG services..." yield False, "Starting OpenRAG services..."
success, stdout, stderr = await self._run_compose_command( missing_images: List[str] = []
["up", "-d"], cpu_mode try:
) images_info = await self.get_project_images_info()
missing_images = [image for image, digest in images_info if digest == "-"]
except Exception:
missing_images = []
if success: if missing_images:
images_list = ", ".join(missing_images)
yield False, f"Pulling container images ({images_list})..."
pull_success = {"value": True}
async for line in self._stream_compose_command(
["pull"], pull_success, cpu_mode
):
yield False, line
if not pull_success["value"]:
yield False, "Some images failed to pull; attempting to start services anyway..."
yield False, "Creating and starting containers..."
up_success = {"value": True}
async for line in self._stream_compose_command(["up", "-d"], up_success, cpu_mode):
yield False, line
if up_success["value"]:
yield True, "Services started successfully" yield True, "Services started successfully"
else: else:
yield False, f"Failed to start services: {stderr}" yield False, "Failed to start services. See output above for details."
async def stop_services(self) -> AsyncIterator[tuple[bool, str]]: async def stop_services(self) -> AsyncIterator[tuple[bool, str]]:
"""Stop all services and yield progress updates.""" """Stop all services and yield progress updates."""

View file

@ -10,10 +10,11 @@ from typing import List, Optional
from textual.app import ComposeResult from textual.app import ComposeResult
from textual.containers import Container, Vertical, Horizontal, ScrollableContainer from textual.containers import Container, Vertical, Horizontal, ScrollableContainer
from textual.screen import Screen from textual.screen import Screen
from textual.widgets import Header, Footer, Static, Button, Label, Log from textual.widgets import Header, Footer, Static, Button, Log
from rich.text import Text from rich.text import Text
from ..managers.container_manager import ContainerManager from ..managers.container_manager import ContainerManager
from ..utils.clipboard import copy_text_to_clipboard
class DiagnosticsScreen(Screen): class DiagnosticsScreen(Screen):
@ -117,67 +118,13 @@ class DiagnosticsScreen(Screen):
content = "\n".join(str(line) for line in log.lines) content = "\n".join(str(line) for line in log.lines)
status = self.query_one("#copy-status", Static) status = self.query_one("#copy-status", Static)
# Try to use pyperclip if available success, message = copy_text_to_clipboard(content)
try: if success:
import pyperclip self.notify(message, severity="information")
status.update(f"{message}")
pyperclip.copy(content)
self.notify("Copied to clipboard", severity="information")
status.update("✓ Content copied to clipboard")
self._hide_status_after_delay(status)
return
except ImportError:
pass
# Fallback to platform-specific clipboard commands
import subprocess
import platform
system = platform.system()
if system == "Darwin": # macOS
process = subprocess.Popen(["pbcopy"], stdin=subprocess.PIPE, text=True)
process.communicate(input=content)
self.notify("Copied to clipboard", severity="information")
status.update("✓ Content copied to clipboard")
elif system == "Windows":
process = subprocess.Popen(["clip"], stdin=subprocess.PIPE, text=True)
process.communicate(input=content)
self.notify("Copied to clipboard", severity="information")
status.update("✓ Content copied to clipboard")
elif system == "Linux":
# Try xclip first, then xsel
try:
process = subprocess.Popen(
["xclip", "-selection", "clipboard"],
stdin=subprocess.PIPE,
text=True,
)
process.communicate(input=content)
self.notify("Copied to clipboard", severity="information")
status.update("✓ Content copied to clipboard")
except FileNotFoundError:
try:
process = subprocess.Popen(
["xsel", "--clipboard", "--input"],
stdin=subprocess.PIPE,
text=True,
)
process.communicate(input=content)
self.notify("Copied to clipboard", severity="information")
status.update("✓ Content copied to clipboard")
except FileNotFoundError:
self.notify(
"Clipboard utilities not found. Install xclip or xsel.",
severity="error",
)
status.update(
"❌ Clipboard utilities not found. Install xclip or xsel."
)
else: else:
self.notify( self.notify(message, severity="error")
"Clipboard not supported on this platform", severity="error" status.update(f"{message}")
)
status.update("❌ Clipboard not supported on this platform")
self._hide_status_after_delay(status) self._hide_status_after_delay(status)
except Exception as e: except Exception as e:

View file

@ -10,11 +10,32 @@ from rich.text import Text
from ..managers.container_manager import ContainerManager from ..managers.container_manager import ContainerManager
from ..managers.docling_manager import DoclingManager from ..managers.docling_manager import DoclingManager
from ..utils.clipboard import copy_text_to_clipboard
class LogsScreen(Screen): class LogsScreen(Screen):
"""Logs viewing and monitoring screen.""" """Logs viewing and monitoring screen."""
CSS = """
#main-container {
height: 1fr;
}
#logs-content {
height: 1fr;
padding: 1 1 0 1;
}
#logs-area {
height: 1fr;
min-height: 30;
}
#logs-button-row {
padding: 1 0 0 0;
}
"""
BINDINGS = [ BINDINGS = [
("escape", "back", "Back"), ("escape", "back", "Back"),
("f", "follow", "Follow Logs"), ("f", "follow", "Follow Logs"),
@ -27,6 +48,7 @@ class LogsScreen(Screen):
("k", "scroll_up", "Scroll Up"), ("k", "scroll_up", "Scroll Up"),
("ctrl+u", "scroll_page_up", "Page Up"), ("ctrl+u", "scroll_page_up", "Page Up"),
("ctrl+f", "scroll_page_down", "Page Down"), ("ctrl+f", "scroll_page_down", "Page Down"),
("ctrl+c", "copy_logs", "Copy Logs"),
] ]
def __init__(self, initial_service: str = "openrag-backend"): def __init__(self, initial_service: str = "openrag-backend"):
@ -51,17 +73,17 @@ class LogsScreen(Screen):
self.following = False self.following = False
self.follow_task = None self.follow_task = None
self.auto_scroll = True self.auto_scroll = True
self._status_task = None
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
"""Create the logs screen layout.""" """Create the logs screen layout."""
yield Container( with Container(id="main-container"):
Vertical( with Vertical(id="logs-content"):
Static(f"Service Logs: {self.current_service}", id="logs-title"), yield Static(f"Service Logs: {self.current_service}", id="logs-title")
self._create_logs_area(), yield self._create_logs_area()
id="logs-content", with Horizontal(id="logs-button-row"):
), yield Button("Copy to Clipboard", variant="default", id="copy-btn")
id="main-container", yield Static("", id="copy-status", classes="copy-indicator")
)
yield Footer() yield Footer()
def _create_logs_area(self) -> TextArea: def _create_logs_area(self) -> TextArea:
@ -108,6 +130,9 @@ class LogsScreen(Screen):
def on_unmount(self) -> None: def on_unmount(self) -> None:
"""Clean up when unmounting.""" """Clean up when unmounting."""
self._stop_following() self._stop_following()
if self._status_task:
self._status_task.cancel()
self._status_task = None
async def _load_logs(self, lines: int = 200) -> None: async def _load_logs(self, lines: int = 200) -> None:
"""Load recent logs for the current service.""" """Load recent logs for the current service."""
@ -235,6 +260,10 @@ class LogsScreen(Screen):
"""Clear the logs area.""" """Clear the logs area."""
self.logs_area.text = "" self.logs_area.text = ""
def action_copy_logs(self) -> None:
"""Copy log content to the clipboard."""
self._copy_logs_to_clipboard()
def action_toggle_auto_scroll(self) -> None: def action_toggle_auto_scroll(self) -> None:
"""Toggle auto scroll on/off.""" """Toggle auto scroll on/off."""
self.auto_scroll = not self.auto_scroll self.auto_scroll = not self.auto_scroll
@ -284,3 +313,44 @@ class LogsScreen(Screen):
"""Go back to previous screen.""" """Go back to previous screen."""
self._stop_following() self._stop_following()
self.app.pop_screen() self.app.pop_screen()
def _copy_logs_to_clipboard(self) -> None:
"""Copy the current log buffer to the clipboard."""
if not self.logs_area:
return
content = self.logs_area.text or ""
status_widget = self.query_one("#copy-status", Static)
if not content.strip():
message = "No logs to copy"
self.notify(message, severity="warning")
status_widget.update(Text("⚠ No logs to copy", style="bold yellow"))
self._schedule_status_clear(status_widget)
return
success, message = copy_text_to_clipboard(content)
self.notify(message, severity="information" if success else "error")
prefix = "" if success else ""
style = "bold green" if success else "bold red"
status_widget.update(Text(f"{prefix} {message}", style=style))
self._schedule_status_clear(status_widget)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "copy-btn":
self._copy_logs_to_clipboard()
def _schedule_status_clear(self, widget: Static, delay: float = 3.0) -> None:
"""Clear the status message after a short delay."""
if self._status_task:
self._status_task.cancel()
async def _clear() -> None:
try:
await asyncio.sleep(delay)
widget.update("")
except asyncio.CancelledError:
pass
self._status_task = asyncio.create_task(_clear())

View file

@ -145,11 +145,7 @@ class MonitorScreen(Screen):
# Set up auto-refresh every 5 seconds # Set up auto-refresh every 5 seconds
self.refresh_timer = self.set_interval(5.0, self._auto_refresh) self.refresh_timer = self.set_interval(5.0, self._auto_refresh)
# Focus the services table self._focus_services_table()
try:
self.services_table.focus()
except Exception:
pass
def on_unmount(self) -> None: def on_unmount(self) -> None:
"""Clean up when unmounting.""" """Clean up when unmounting."""
@ -224,6 +220,9 @@ class MonitorScreen(Screen):
docling_pid, docling_pid,
"Start/Stop/Logs" "Start/Stop/Logs"
) )
# Restore docling selection when it was the last active table
if self._last_selected_table == "docling":
self._focus_docling_table(focus=False, set_last=False)
# Populate images table (unique images as reported by runtime) # Populate images table (unique images as reported by runtime)
if self.images_table: if self.images_table:
for image in sorted(images): for image in sorted(images):
@ -509,16 +508,52 @@ class MonitorScreen(Screen):
self.run_worker(self._refresh_services()) self.run_worker(self._refresh_services())
def action_cursor_down(self) -> None: def action_cursor_down(self) -> None:
"""Move cursor down in services table.""" """Move selection down, handling both tables."""
active_table = self._get_active_table_name()
try: try:
self.services_table.action_cursor_down() if active_table == "docling":
return # Nothing to move within docling table
if not self.services_table:
return
row_count = self._table_row_count(self.services_table)
current = self._get_cursor_row(self.services_table)
if current is None:
current = 0
if current < row_count - 1:
self.services_table.action_cursor_down()
self._last_selected_table = "services"
elif self._table_row_count(self.docling_table):
self._focus_docling_table()
except Exception: except Exception:
pass pass
def action_cursor_up(self) -> None: def action_cursor_up(self) -> None:
"""Move cursor up in services table.""" """Move selection up, handling both tables."""
active_table = self._get_active_table_name()
try: try:
self.services_table.action_cursor_up() if active_table == "docling":
self._focus_services_table(row="last")
return
if not self.services_table:
return
current = self._get_cursor_row(self.services_table)
if current is None:
current = 0
if current > 0:
self.services_table.action_cursor_up()
else:
# Already at the top; nothing else to do
self._set_cursor_row(self.services_table, 0)
self._last_selected_table = "services"
except Exception: except Exception:
pass pass
@ -664,59 +699,37 @@ class MonitorScreen(Screen):
self.notify(f"Error opening logs: {e}", severity="error") self.notify(f"Error opening logs: {e}", severity="error")
def _get_selected_service(self) -> str | None: def _get_selected_service(self) -> str | None:
"""Get the currently selected service from either table.""" """Resolve the currently selected service based on active table."""
try: try:
# Check both tables regardless of last_selected_table to handle cursor navigation active_table = self._get_active_table_name()
services_table = self.query_one("#services-table", DataTable)
services_cursor = services_table.cursor_row
docling_cursor = None if active_table == "docling" and self.docling_table:
if self.docling_table: cursor = self._get_cursor_row(self.docling_table)
docling_cursor = self.docling_table.cursor_row if cursor is not None and cursor >= 0:
# If we have a last selected table preference, use it if that table has a valid selection
if self._last_selected_table == "docling" and self.docling_table:
if docling_cursor is not None and docling_cursor >= 0:
row_data = self.docling_table.get_row_at(docling_cursor)
if row_data:
return "docling-serve"
elif self._last_selected_table == "services":
if services_cursor is not None and services_cursor >= 0:
row_data = services_table.get_row_at(services_cursor)
if row_data:
service_name = str(row_data[0])
service_mapping = {
"openrag-backend": "openrag-backend",
"openrag-frontend": "openrag-frontend",
"opensearch": "opensearch",
"langflow": "langflow",
"dashboards": "dashboards",
}
selected_service = service_mapping.get(service_name, service_name)
return selected_service
# Fallback: check both tables if no last_selected_table or it doesn't have a selection
if self.docling_table and docling_cursor is not None and docling_cursor >= 0:
row_data = self.docling_table.get_row_at(docling_cursor)
if row_data:
return "docling-serve" return "docling-serve"
if services_cursor is not None and services_cursor >= 0: services_table = self.query_one("#services-table", DataTable)
row_data = services_table.get_row_at(services_cursor) row_count = self._table_row_count(services_table)
if row_data: if row_count == 0:
service_name = str(row_data[0]) return None
service_mapping = {
"openrag-backend": "openrag-backend",
"openrag-frontend": "openrag-frontend",
"opensearch": "opensearch",
"langflow": "langflow",
"dashboards": "dashboards",
}
selected_service = service_mapping.get(service_name, service_name)
return selected_service
return None cursor = self._get_cursor_row(services_table)
if cursor is None or cursor < 0 or cursor >= row_count:
cursor = 0
row_data = services_table.get_row_at(cursor)
if not row_data:
return None
service_name = str(row_data[0])
service_mapping = {
"openrag-backend": "openrag-backend",
"openrag-frontend": "openrag-frontend",
"opensearch": "opensearch",
"langflow": "langflow",
"dashboards": "dashboards",
}
return service_mapping.get(service_name, service_name)
except Exception as e: except Exception as e:
self.notify(f"Error getting selected service: {e}", severity="error") self.notify(f"Error getting selected service: {e}", severity="error")
return None return None
@ -728,15 +741,118 @@ class MonitorScreen(Screen):
try: try:
# Track which table was selected # Track which table was selected
if selected_table.id == "services-table": if selected_table.id == "services-table":
self._last_selected_table = "services" self._focus_services_table(row="current")
# Clear docling table selection
if self.docling_table:
self.docling_table.cursor_row = -1
elif selected_table.id == "docling-table": elif selected_table.id == "docling-table":
self._last_selected_table = "docling" self._focus_docling_table()
# Clear services table selection
services_table = self.query_one("#services-table", DataTable)
services_table.cursor_row = -1
except Exception: except Exception:
# Ignore errors during table manipulation # Ignore errors during table manipulation
pass pass
def _get_active_table_name(self) -> str:
"""Determine which table is currently active."""
if self.docling_table and self.docling_table.has_focus:
return "docling"
if self.services_table and self.services_table.has_focus:
return "services"
return self._last_selected_table or "services"
def _table_row_count(self, table: DataTable | None) -> int:
"""Safely compute the number of rows in a DataTable."""
if not table:
return 0
count_attr = getattr(table, "row_count", None)
if callable(count_attr):
try:
return int(count_attr())
except Exception:
pass
if isinstance(count_attr, int):
return count_attr
try:
rows = getattr(table, "rows", None)
if rows is not None:
return len(rows)
except Exception:
pass
return 0
def _get_cursor_row(self, table: DataTable | None) -> int | None:
"""Return the current cursor row for the given table."""
if not table:
return None
coord = getattr(table, "cursor_coordinate", None)
if coord is None:
return None
row = getattr(coord, "row", None)
if row is not None:
return row
if isinstance(coord, tuple) and coord:
return coord[0]
return None
def _set_cursor_row(self, table: DataTable | None, row: int) -> None:
"""Set the cursor row for the given table, if possible."""
if not table:
return
try:
table.cursor_coordinate = (row, 0)
except Exception:
move_cursor = getattr(table, "move_cursor", None)
if callable(move_cursor):
try:
move_cursor(row, 0, expand=False)
except Exception:
pass
def _focus_services_table(self, row: str | None = None, set_last: bool = True) -> None:
"""Focus the services table and update selection."""
if not self.services_table:
return
try:
self.services_table.focus()
row_count = self._table_row_count(self.services_table)
if row_count:
if row == "last":
self._set_cursor_row(self.services_table, row_count - 1)
elif row == "current":
# Keep existing cursor position if valid
cursor = self._get_cursor_row(self.services_table)
if cursor is None or cursor < 0 or cursor >= row_count:
self._set_cursor_row(self.services_table, 0)
else:
cursor = self._get_cursor_row(self.services_table)
if cursor is None or cursor < 0:
self._set_cursor_row(self.services_table, 0)
if set_last:
self._last_selected_table = "services"
except Exception:
pass
def _focus_docling_table(self, focus: bool = True, set_last: bool = True) -> None:
"""Focus the docling table and select its row."""
if not self.docling_table:
return
try:
if focus:
self.docling_table.focus()
if self._table_row_count(self.docling_table):
self._set_cursor_row(self.docling_table, 0)
if set_last:
self._last_selected_table = "docling"
except Exception:
pass

View file

@ -0,0 +1,50 @@
"""Clipboard helper utilities for the TUI."""
from __future__ import annotations
import platform
import subprocess
from typing import Tuple
def copy_text_to_clipboard(text: str) -> Tuple[bool, str]:
"""Copy ``text`` to the system clipboard.
Returns a tuple of (success, message) so callers can surface feedback to users.
"""
# Try optional dependency first for cross-platform consistency
try:
import pyperclip # type: ignore
pyperclip.copy(text)
return True, "Copied to clipboard"
except ImportError:
# Fall back to platform-specific commands
pass
except Exception as exc: # pragma: no cover - defensive catch for pyperclip edge cases
return False, f"Clipboard error: {exc}"
system = platform.system()
try:
if system == "Darwin":
process = subprocess.Popen(["pbcopy"], stdin=subprocess.PIPE, text=True)
process.communicate(input=text)
return True, "Copied to clipboard"
if system == "Windows":
process = subprocess.Popen(["clip"], stdin=subprocess.PIPE, text=True)
process.communicate(input=text)
return True, "Copied to clipboard"
if system == "Linux":
for command in (["xclip", "-selection", "clipboard"], ["xsel", "--clipboard", "--input"]):
try:
process = subprocess.Popen(command, stdin=subprocess.PIPE, text=True)
process.communicate(input=text)
return True, "Copied to clipboard"
except FileNotFoundError:
continue
return False, "Clipboard utilities not found. Install xclip or xsel."
return False, "Clipboard not supported on this platform"
except Exception as exc: # pragma: no cover - subprocess errors
return False, f"Clipboard error: {exc}"

View file

@ -2,14 +2,15 @@
import asyncio import asyncio
import inspect import inspect
from typing import Callable, List, Optional, AsyncIterator, Any from typing import Callable, Optional, AsyncIterator
from rich.text import Text
from textual.app import ComposeResult from textual.app import ComposeResult
from textual.worker import Worker
from textual.containers import Container, ScrollableContainer from textual.containers import Container, ScrollableContainer
from textual.screen import ModalScreen from textual.screen import ModalScreen
from textual.widgets import Button, Static, Label, RichLog from textual.widgets import Button, Static, Label, TextArea
from rich.console import Console
from ..utils.clipboard import copy_text_to_clipboard
class CommandOutputModal(ModalScreen): class CommandOutputModal(ModalScreen):
@ -46,11 +47,14 @@ class CommandOutputModal(ModalScreen):
#command-output { #command-output {
height: 100%; height: 100%;
border: solid $accent; border: solid $accent;
padding: 1 2;
margin: 1 0; margin: 1 0;
background: $surface-darken-1; background: $surface-darken-1;
} }
#command-output > .text-area--content {
padding: 1 2;
}
#button-row { #button-row {
width: 100%; width: 100%;
height: auto; height: auto;
@ -63,6 +67,11 @@ class CommandOutputModal(ModalScreen):
margin: 0 1; margin: 0 1;
min-width: 16; min-width: 16;
} }
#copy-status {
text-align: center;
margin-bottom: 1;
}
""" """
def __init__( def __init__(
@ -82,44 +91,66 @@ class CommandOutputModal(ModalScreen):
self.title_text = title self.title_text = title
self.command_generator = command_generator self.command_generator = command_generator
self.on_complete = on_complete self.on_complete = on_complete
self._output_text: str = ""
self._status_task: Optional[asyncio.Task] = None
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
"""Create the modal dialog layout.""" """Create the modal dialog layout."""
with Container(id="dialog"): with Container(id="dialog"):
yield Label(self.title_text, id="title") yield Label(self.title_text, id="title")
with ScrollableContainer(id="output-container"): with ScrollableContainer(id="output-container"):
yield RichLog(id="command-output", highlight=True, markup=True) yield TextArea(
text="",
read_only=True,
show_line_numbers=False,
id="command-output",
)
with Container(id="button-row"): with Container(id="button-row"):
yield Button("Close", variant="primary", id="close-btn") yield Button("Copy Output", variant="default", id="copy-btn")
yield Button(
"Close", variant="primary", id="close-btn", disabled=True
)
yield Static("", id="copy-status")
def on_mount(self) -> None: def on_mount(self) -> None:
"""Start the command when the modal is mounted.""" """Start the command when the modal is mounted."""
# Start the command but don't store the worker # Start the command but don't store the worker
self.run_worker(self._run_command(), exclusive=False) self.run_worker(self._run_command(), exclusive=False)
# Focus the output so users can select text immediately
try:
self.query_one("#command-output", TextArea).focus()
except Exception:
pass
def on_unmount(self) -> None:
"""Cancel any pending timers when modal closes."""
if self._status_task:
self._status_task.cancel()
self._status_task = None
def on_button_pressed(self, event: Button.Pressed) -> None: def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses.""" """Handle button presses."""
if event.button.id == "close-btn": if event.button.id == "close-btn":
self.dismiss() self.dismiss()
elif event.button.id == "copy-btn":
self.copy_to_clipboard()
async def _run_command(self) -> None: async def _run_command(self) -> None:
"""Run the command and update the output in real-time.""" """Run the command and update the output in real-time."""
output = self.query_one("#command-output", RichLog) output = self.query_one("#command-output", TextArea)
container = self.query_one("#output-container", ScrollableContainer)
try: try:
async for is_complete, message in self.command_generator: async for is_complete, message in self.command_generator:
# Simple approach: just append each line as it comes self._append_output(message)
output.write(message + "\n") output.text = self._output_text
# Scroll to bottom
container = self.query_one("#output-container", ScrollableContainer)
container.scroll_end(animate=False) container.scroll_end(animate=False)
# If command is complete, update UI # If command is complete, update UI
if is_complete: if is_complete:
output.write( self._append_output("Command completed successfully")
"[bold green]Command completed successfully[/bold green]\n" output.text = self._output_text
) container.scroll_end(animate=False)
# Call the completion callback if provided # Call the completion callback if provided
if self.on_complete: if self.on_complete:
await asyncio.sleep(0.5) # Small delay for better UX await asyncio.sleep(0.5) # Small delay for better UX
@ -131,12 +162,57 @@ class CommandOutputModal(ModalScreen):
self.call_after_refresh(_invoke_callback) self.call_after_refresh(_invoke_callback)
except Exception as e: except Exception as e:
output.write(f"[bold red]Error: {e}[/bold red]\n") self._append_output(f"Error: {e}")
output.text = self._output_text
container.scroll_end(animate=False)
finally:
# Enable the close button and focus it
close_btn = self.query_one("#close-btn", Button)
close_btn.disabled = False
close_btn.focus()
# Enable the close button and focus it def _append_output(self, message: str) -> None:
close_btn = self.query_one("#close-btn", Button) """Append a message to the output buffer."""
close_btn.disabled = False if message is None:
close_btn.focus() return
message = message.rstrip("\n")
if not message:
return
if self._output_text:
self._output_text += "\n" + message
else:
self._output_text = message
def copy_to_clipboard(self) -> None:
"""Copy the modal output to the clipboard."""
if not self._output_text:
message = "No output to copy yet"
self.notify(message, severity="warning")
status = self.query_one("#copy-status", Static)
status.update(Text(message, style="bold yellow"))
self._schedule_status_clear(status)
return
success, message = copy_text_to_clipboard(self._output_text)
self.notify(message, severity="information" if success else "error")
status = self.query_one("#copy-status", Static)
style = "bold green" if success else "bold red"
status.update(Text(message, style=style))
self._schedule_status_clear(status)
def _schedule_status_clear(self, widget: Static, delay: float = 3.0) -> None:
"""Clear the status message after a delay."""
if self._status_task:
self._status_task.cancel()
async def _clear() -> None:
try:
await asyncio.sleep(delay)
widget.update("")
except asyncio.CancelledError:
pass
self._status_task = asyncio.create_task(_clear())
# Made with Bob # Made with Bob