"""Container lifecycle manager for OpenRAG TUI.""" import asyncio import json import os import re import time from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Dict, List, Optional, AsyncIterator from utils.logging_config import get_logger try: from importlib.resources import files except ImportError: from importlib_resources import files logger = get_logger(__name__) from ..utils.platform import PlatformDetector, RuntimeInfo, RuntimeType from utils.gpu_detection import detect_gpu_devices class ServiceStatus(Enum): """Container service status.""" UNKNOWN = "unknown" RUNNING = "running" STOPPED = "stopped" STARTING = "starting" STOPPING = "stopping" ERROR = "error" MISSING = "missing" @dataclass class ServiceInfo: """Container service information.""" name: str status: ServiceStatus health: Optional[str] = None ports: List[str] = field(default_factory=list) image: Optional[str] = None image_digest: Optional[str] = None created: Optional[str] = None error_message: Optional[str] = None def __post_init__(self): if self.ports is None: self.ports = [] class ContainerManager: """Manages Docker/Podman container lifecycle for OpenRAG.""" def __init__(self, compose_file: Optional[Path] = None): self.platform_detector = PlatformDetector() self.runtime_info = self.platform_detector.detect_runtime() self.compose_file = compose_file or self._find_compose_file("docker-compose.yml") self.gpu_compose_file = self._find_compose_file("docker-compose.gpu.yml") self.services_cache: Dict[str, ServiceInfo] = {} self.last_status_update = 0 # Auto-select GPU override if GPU is available try: has_gpu, _ = detect_gpu_devices() self.use_gpu_compose = has_gpu except Exception: self.use_gpu_compose = False # Expected services based on compose files self.expected_services = [ "openrag-backend", "openrag-frontend", "opensearch", "dashboards", "langflow", ] # Map container names to service names self.container_name_map = { "openrag-backend": "openrag-backend", "openrag-frontend": "openrag-frontend", "os": "opensearch", "osdash": "dashboards", "langflow": "langflow", } def _find_compose_file(self, filename: str) -> Path: """Find compose file in current directory or package resources.""" # First check current working directory cwd_path = Path(filename) self._compose_search_log = f"Searching for {filename}:\n" self._compose_search_log += f" 1. Current directory: {cwd_path.absolute()}" if cwd_path.exists(): self._compose_search_log += " ✓ FOUND" return cwd_path else: self._compose_search_log += " ✗ NOT FOUND" # Then check package resources self._compose_search_log += f"\n 2. Package resources: " try: pkg_files = files("tui._assets") self._compose_search_log += f"{pkg_files}" compose_resource = pkg_files / filename if compose_resource.is_file(): self._compose_search_log += f" ✓ FOUND, copying to current directory" # Copy to cwd for compose command to work content = compose_resource.read_text() cwd_path.write_text(content) return cwd_path else: self._compose_search_log += f" ✗ NOT FOUND" except Exception as e: self._compose_search_log += f" ✗ SKIPPED ({e})" # Don't log this as an error since it's expected when running from source # Fall back to original path (will fail later if not found) self._compose_search_log += f"\n 3. Falling back to: {cwd_path.absolute()}" return Path(filename) def _get_env_from_file(self) -> Dict[str, str]: """Read environment variables from .env file, prioritizing file values over os.environ. 
Uses python-dotenv's load_dotenv() for standard .env file parsing, which handles: - Quoted values (single and double quotes) - Variable expansion (${VAR}) - Multiline values - Escaped characters - Comments This ensures Docker Compose commands use the latest values from .env file, even if os.environ has stale values. """ from dotenv import load_dotenv env = dict(os.environ) # Start with current environment env_file = Path(".env") if env_file.exists(): try: # Load .env file with override=True to ensure file values take precedence # This loads into os.environ, then we copy to our dict load_dotenv(dotenv_path=env_file, override=True) # Update our dict with all environment variables (including those from .env) env.update(os.environ) except Exception as e: logger.debug(f"Error reading .env file for Docker Compose: {e}") return env def is_available(self) -> bool: """Check if container runtime with compose is available.""" return (self.runtime_info.runtime_type != RuntimeType.NONE and len(self.runtime_info.compose_command) > 0) def get_runtime_info(self) -> RuntimeInfo: """Get container runtime information.""" return self.runtime_info def get_installation_help(self) -> str: """Get installation instructions based on what's missing.""" if self.runtime_info.has_runtime_without_compose: return self.platform_detector.get_compose_installation_instructions() return self.platform_detector.get_installation_instructions() def _extract_ports_from_compose(self) -> Dict[str, List[int]]: """Extract port mappings from compose files. Returns: Dict mapping service name to list of host ports """ service_ports: Dict[str, List[int]] = {} compose_files = [self.compose_file] if hasattr(self, 'cpu_compose_file') and self.cpu_compose_file and self.cpu_compose_file.exists(): compose_files.append(self.cpu_compose_file) for compose_file in compose_files: if not compose_file.exists(): continue try: content = compose_file.read_text() current_service = None in_ports_section = False for line in content.splitlines(): # Detect service names service_match = re.match(r'^ (\w[\w-]*):$', line) if service_match: current_service = service_match.group(1) in_ports_section = False if current_service not in service_ports: service_ports[current_service] = [] continue # Detect ports section if current_service and re.match(r'^ ports:$', line): in_ports_section = True continue # Exit ports section on new top-level key if in_ports_section and re.match(r'^ \w+:', line): in_ports_section = False # Extract port mappings if in_ports_section and current_service: # Match patterns like: - "3000:3000", - "9200:9200", - 7860:7860 port_match = re.search(r'["\']?(\d+):\d+["\']?', line) if port_match: host_port = int(port_match.group(1)) if host_port not in service_ports[current_service]: service_ports[current_service].append(host_port) except Exception as e: logger.debug(f"Error parsing {compose_file} for ports: {e}") continue return service_ports async def check_ports_available(self) -> tuple[bool, List[tuple[str, int, str]]]: """Check if required ports are available. 
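Example (illustrative; assumes this object is bound to a name like "manager"
and that another process is already listening on port 9200):

    ok, conflicts = await manager.check_ports_available()
    # ok -> False
    # conflicts -> [("opensearch", 9200, "Port 9200 is already in use")]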
Returns: Tuple of (all_available, conflicts) where conflicts is a list of (service_name, port, error_message) tuples """ import socket service_ports = self._extract_ports_from_compose() conflicts: List[tuple[str, int, str]] = [] for service_name, ports in service_ports.items(): for port in ports: try: # Try to bind to the port to check if it's available sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(0.5) result = sock.connect_ex(('127.0.0.1', port)) sock.close() if result == 0: # Port is in use conflicts.append(( service_name, port, f"Port {port} is already in use" )) except Exception as e: logger.debug(f"Error checking port {port}: {e}") continue return (len(conflicts) == 0, conflicts) async def _run_compose_command( self, args: List[str], cpu_mode: Optional[bool] = None ) -> tuple[bool, str, str]: """Run a compose command and return (success, stdout, stderr).""" if not self.is_available(): return False, "", "No container runtime available" if cpu_mode is None: use_gpu = self.use_gpu_compose else: use_gpu = not cpu_mode # Build compose command with override pattern cmd = self.runtime_info.compose_command + ["-f", str(self.compose_file)] if use_gpu and self.gpu_compose_file.exists(): cmd.extend(["-f", str(self.gpu_compose_file)]) cmd.extend(args) try: # Get environment variables from .env file to ensure latest values env = self._get_env_from_file() process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=Path.cwd(), env=env, ) stdout, stderr = await process.communicate() stdout_text = stdout.decode() if stdout else "" stderr_text = stderr.decode() if stderr else "" success = process.returncode == 0 return success, stdout_text, stderr_text except Exception as e: return False, "", f"Command execution failed: {e}" async def _run_compose_command_streaming( self, args: List[str], cpu_mode: Optional[bool] = None ) -> AsyncIterator[tuple[str, bool]]: """Run a compose command and yield output with progress bar support. 
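For example (illustrative), a pull progress line terminated by a carriage
return, such as "Downloading [===>      ]  12MB/58MB", is yielded with
replace_last=True so the caller can redraw it in place, while
newline-terminated lines are yielded with replace_last=False.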
Yields: Tuples of (message, replace_last) where replace_last indicates if the message should replace the previous line (for progress updates) """ if not self.is_available(): yield ("No container runtime available", False) return if cpu_mode is None: use_gpu = self.use_gpu_compose else: use_gpu = not cpu_mode # Build compose command with override pattern cmd = self.runtime_info.compose_command + ["-f", str(self.compose_file)] if use_gpu and self.gpu_compose_file.exists(): cmd.extend(["-f", str(self.gpu_compose_file)]) cmd.extend(args) try: # Get environment variables from .env file to ensure latest values env = self._get_env_from_file() process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=Path.cwd(), env=env, ) if process.stdout: buffer = "" while True: chunk = await process.stdout.read(1024) if not chunk: if buffer.strip(): yield (buffer.strip(), False) break buffer += chunk.decode(errors="ignore") while "\n" in buffer or "\r" in buffer: cr_pos = buffer.find("\r") nl_pos = buffer.find("\n") if cr_pos != -1 and (nl_pos == -1 or cr_pos < nl_pos): line = buffer[:cr_pos] buffer = buffer[cr_pos + 1:] if line.strip(): yield (line.strip(), True) elif nl_pos != -1: line = buffer[:nl_pos] buffer = buffer[nl_pos + 1:] if line.strip(): yield (line.strip(), False) else: break await process.wait() except Exception as e: yield (f"Command execution failed: {e}", False) async def _stream_compose_command( self, args: List[str], success_flag: Dict[str, bool], cpu_mode: Optional[bool] = None, ) -> AsyncIterator[tuple[str, bool]]: """Run compose command with live output and record success/failure. Yields: Tuples of (message, replace_last) where replace_last indicates if the message should replace the previous line (for progress updates) """ if not self.is_available(): success_flag["value"] = False yield ("No container runtime available", False) return if cpu_mode is None: use_gpu = self.use_gpu_compose else: use_gpu = not cpu_mode # Build compose command with override pattern cmd = self.runtime_info.compose_command + ["-f", str(self.compose_file)] if use_gpu and self.gpu_compose_file.exists(): cmd.extend(["-f", str(self.gpu_compose_file)]) cmd.extend(args) try: # Get environment variables from .env file to ensure latest values env = self._get_env_from_file() process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=Path.cwd(), env=env, ) except Exception as e: success_flag["value"] = False yield (f"Command execution failed: {e}", False) return success_flag["value"] = True if process.stdout: # Buffer to accumulate data for progress bar handling buffer = "" while True: chunk = await process.stdout.read(1024) if not chunk: # Process any remaining buffer content if buffer.strip(): yield (buffer.strip(), False) break buffer += chunk.decode(errors="ignore") # Process complete lines or carriage return updates while "\n" in buffer or "\r" in buffer: # Check if we have a carriage return (progress update) before newline cr_pos = buffer.find("\r") nl_pos = buffer.find("\n") if cr_pos != -1 and (nl_pos == -1 or cr_pos < nl_pos): # Carriage return found - extract and yield as replaceable line line = buffer[:cr_pos] buffer = buffer[cr_pos + 1:] if line.strip(): yield (line.strip(), True) # replace_last=True for progress updates elif nl_pos != -1: # Newline found - extract and yield as new line line = buffer[:nl_pos] buffer = buffer[nl_pos + 1:] if line.strip(): lowered = line.lower() 
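# Heuristic: any output line mentioning "error" or "failed" marks the run as
# failed; the exit-code check after the loop still has the final say.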
yield (line.strip(), False) # replace_last=False for new lines if "error" in lowered or "failed" in lowered: success_flag["value"] = False else: break returncode = await process.wait() if returncode != 0: success_flag["value"] = False yield (f"Command exited with status {returncode}", False) async def _run_runtime_command(self, args: List[str]) -> tuple[bool, str, str]: """Run a runtime command (docker/podman) and return (success, stdout, stderr).""" if not self.is_available(): return False, "", "No container runtime available" cmd = self.runtime_info.runtime_command + args try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() stdout_text = stdout.decode() if stdout else "" stderr_text = stderr.decode() if stderr else "" success = process.returncode == 0 return success, stdout_text, stderr_text except Exception as e: return False, "", f"Command execution failed: {e}" def _process_service_json( self, service: Dict, services: Dict[str, ServiceInfo] ) -> None: """Process a service JSON object and add it to the services dict.""" # Debug print to see the actual service data logger.debug("Processing service data", service_data=service) container_name = service.get("Name", "") # Map container name to service name service_name = self.container_name_map.get(container_name) if not service_name: return state = service.get("State", "").lower() # Map compose states to our status enum if "running" in state: status = ServiceStatus.RUNNING elif "exited" in state or "stopped" in state: status = ServiceStatus.STOPPED elif "starting" in state: status = ServiceStatus.STARTING else: status = ServiceStatus.UNKNOWN # Extract health - use Status if Health is empty health = service.get("Health", "") or service.get("Status", "N/A") # Extract ports ports_str = service.get("Ports", "") ports = ( [p.strip() for p in ports_str.split(",") if p.strip()] if ports_str else [] ) # Extract image image = service.get("Image", "N/A") services[service_name] = ServiceInfo( name=service_name, status=status, health=health, ports=ports, image=image, ) async def get_container_version(self) -> Optional[str]: """ Get the version tag from existing containers. Checks the backend container image tag to determine version. 
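For example (illustrative), an image tag of langflowai/openrag-backend:0.1.47
resolves to "0.1.47", while a ":latest" tag falls back to the OPENRAG_VERSION
entry in the .env file.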
Returns: Version string if found, None if no containers exist or version can't be determined """ try: # Check for backend container first (most reliable) success, stdout, _ = await self._run_runtime_command( ["ps", "--all", "--filter", "name=openrag-backend", "--format", "{{.Image}}"] ) if success and stdout.strip(): image_tag = stdout.strip().splitlines()[0].strip() if not image_tag or image_tag == "N/A": return None # Extract version from image tag (e.g., langflowai/openrag-backend:0.1.47) if ":" in image_tag: version = image_tag.split(":")[-1] # If version is "latest", check .env file for OPENRAG_VERSION if version == "latest": # Try to get version from .env file try: from pathlib import Path env_file = Path(".env") if env_file.exists(): env_content = env_file.read_text() for line in env_content.splitlines(): line = line.strip() if line.startswith("OPENRAG_VERSION"): env_version = line.split("=", 1)[1].strip() # Remove quotes if present env_version = env_version.strip("'\"") if env_version and env_version != "latest": return env_version except Exception: pass # If still "latest", we can't determine version - return None return None # Return version if it looks like a version number (not "latest") if version and version != "latest": return version # Fallback: check all containers for version tags success, stdout, _ = await self._run_runtime_command( ["ps", "--all", "--format", "{{.Image}}"] ) if success and stdout.strip(): images = stdout.strip().splitlines() for image in images: image = image.strip() if "openrag" in image.lower() and ":" in image: version = image.split(":")[-1] if version and version != "latest": return version except Exception as e: logger.debug(f"Error getting container version: {e}") return None async def check_version_mismatch(self) -> tuple[bool, Optional[str], str]: """ Check if existing containers have a different version than the current TUI. 
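Example (illustrative values; "manager" is assumed to be a ContainerManager
instance):

    has_mismatch, container_version, tui_version = await manager.check_version_mismatch()
    # e.g. (True, "0.1.46", "0.1.47") when the containers lag behind the TUI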
Returns: Tuple of (has_mismatch, container_version, tui_version) """ try: from ..utils.version_check import get_current_version tui_version = get_current_version() if tui_version == "unknown": return False, None, tui_version container_version = await self.get_container_version() if container_version is None: # No containers exist, no mismatch return False, None, tui_version # Compare versions from ..utils.version_check import compare_versions comparison = compare_versions(container_version, tui_version) has_mismatch = comparison != 0 return has_mismatch, container_version, tui_version except Exception as e: logger.debug(f"Error checking version mismatch: {e}") return False, None, "unknown" async def get_service_status( self, force_refresh: bool = False ) -> Dict[str, ServiceInfo]: """Get current status of all services.""" current_time = time.time() # Use cache if recent and not forcing refresh if not force_refresh and current_time - self.last_status_update < 5: return self.services_cache services = {} # Different approach for Podman vs Docker if self.runtime_info.runtime_type == RuntimeType.PODMAN: # For Podman, use direct podman ps command instead of compose cmd = ["ps", "--all", "--format", "json"] success, stdout, stderr = await self._run_runtime_command(cmd) if success and stdout.strip(): try: containers = json.loads(stdout.strip()) for container in containers: # Get container name and map to service name names = container.get("Names", []) if not names: continue container_name = names[0] service_name = self.container_name_map.get(container_name) if not service_name: continue # Get container state state = container.get("State", "").lower() if "running" in state: status = ServiceStatus.RUNNING elif "exited" in state or "stopped" in state: status = ServiceStatus.STOPPED elif "created" in state: status = ServiceStatus.STARTING else: status = ServiceStatus.UNKNOWN # Get other container info image = container.get("Image", "N/A") ports = [] # Handle case where Ports might be None instead of an empty list container_ports = container.get("Ports") or [] if isinstance(container_ports, list): for port in container_ports: host_port = port.get("host_port") container_port = port.get("container_port") if host_port and container_port: ports.append(f"{host_port}:{container_port}") services[service_name] = ServiceInfo( name=service_name, status=status, health=state, ports=ports, image=image, ) except json.JSONDecodeError: pass else: # For Docker, use compose ps command success, stdout, stderr = await self._run_compose_command( ["ps", "--format", "json"] ) if success and stdout.strip(): try: # Handle both single JSON object (Podman) and multiple JSON objects (Docker) if stdout.strip().startswith("[") and stdout.strip().endswith("]"): # JSON array format service_list = json.loads(stdout.strip()) for service in service_list: self._process_service_json(service, services) else: # Line-by-line JSON format for line in stdout.strip().split("\n"): if line.strip() and line.startswith("{"): service = json.loads(line) self._process_service_json(service, services) except json.JSONDecodeError: # Fallback to parsing text output lines = stdout.strip().split("\n") if ( len(lines) > 1 ): # Make sure we have at least a header and one line for line in lines[1:]: # Skip header if line.strip(): parts = line.split() if len(parts) >= 3: name = parts[0] # Only include our expected services if name not in self.expected_services: continue state = parts[2].lower() if "up" in state: status = ServiceStatus.RUNNING elif "exit" in 
state: status = ServiceStatus.STOPPED else: status = ServiceStatus.UNKNOWN services[name] = ServiceInfo( name=name, status=status ) # Add expected services that weren't found for expected in self.expected_services: if expected not in services: services[expected] = ServiceInfo( name=expected, status=ServiceStatus.MISSING ) self.services_cache = services self.last_status_update = current_time return services async def get_images_digests(self, images: List[str]) -> Dict[str, str]: """Return a map of image -> digest/ID (sha256:...).""" digests: Dict[str, str] = {} for image in images: if not image or image in digests: continue success, stdout, _ = await self._run_runtime_command( ["image", "inspect", image, "--format", "{{.Id}}"] ) if success and stdout.strip(): digests[image] = stdout.strip().splitlines()[0] return digests def _extract_images_from_compose_config(self, text: str, tried_json: bool) -> set[str]: """ Try JSON first (if we asked for it or it looks like JSON), then YAML if available. Returns a set of image names. """ images: set[str] = set() # Try JSON parse if tried_json or (text.lstrip().startswith("{") and text.rstrip().endswith("}")): try: cfg = json.loads(text) services = cfg.get("services", {}) for _, svc in services.items(): image = svc.get("image") if image: images.add(str(image)) if images: return images except json.JSONDecodeError: pass # Try YAML (if available) - import here to avoid hard dependency try: import yaml cfg = yaml.safe_load(text) or {} services = cfg.get("services", {}) if isinstance(services, dict): for _, svc in services.items(): if isinstance(svc, dict): image = svc.get("image") if image: images.add(str(image)) if images: return images except Exception: pass return images async def _parse_compose_images(self) -> list[str]: """Get resolved image names from compose files using docker/podman compose, with robust fallbacks.""" images: set[str] = set() # Try both GPU and CPU modes to get all images for use_gpu in [True, False]: try: # Build compose command with override pattern cmd = self.runtime_info.compose_command + ["-f", str(self.compose_file)] if use_gpu and self.gpu_compose_file.exists(): cmd.extend(["-f", str(self.gpu_compose_file)]) cmd.extend(["config", "--format", "json"]) process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=Path.cwd(), ) stdout, stderr = await process.communicate() stdout_text = stdout.decode() if stdout else "" if process.returncode == 0 and stdout_text.strip(): from_cfg = self._extract_images_from_compose_config(stdout_text, tried_json=True) if from_cfg: images.update(from_cfg) continue # Fallback to YAML output (for older compose versions) cmd = self.runtime_info.compose_command + ["-f", str(self.compose_file)] if use_gpu and self.gpu_compose_file.exists(): cmd.extend(["-f", str(self.gpu_compose_file)]) cmd.append("config") process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=Path.cwd(), ) stdout, stderr = await process.communicate() stdout_text = stdout.decode() if stdout else "" if process.returncode == 0 and stdout_text.strip(): from_cfg = self._extract_images_from_compose_config(stdout_text, tried_json=False) if from_cfg: images.update(from_cfg) continue except Exception: # Keep behavior resilient—just continue to next mode continue # Fallback: manual parsing if compose config didn't work if not images: compose_files = [self.compose_file] if self.gpu_compose_file.exists(): 
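# Also scan the GPU override file so images referenced only there are included.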
compose_files.append(self.gpu_compose_file) for compose in compose_files: try: if not compose.exists(): continue for line in compose.read_text().splitlines(): line = line.strip() if not line or line.startswith("#"): continue if line.startswith("image:"): # image: repo/name:tag val = line.split(":", 1)[1].strip() # Remove quotes if present if (val.startswith('"') and val.endswith('"')) or ( val.startswith("'") and val.endswith("'") ): val = val[1:-1] if val: images.add(val) except Exception: continue return sorted(images) async def get_project_images_info(self) -> list[tuple[str, str]]: """ Return list of (image, digest_or_id) for images referenced by compose files. If an image isn't present locally, returns '-' for its digest. """ expected = await self._parse_compose_images() results: list[tuple[str, str]] = [] for image in expected: digest = "-" success, stdout, _ = await self._run_runtime_command( ["image", "inspect", image, "--format", "{{.Id}}"] ) if success and stdout.strip(): digest = stdout.strip().splitlines()[0] results.append((image, digest)) results.sort(key=lambda x: x[0]) return results async def start_services( self, cpu_mode: Optional[bool] = None ) -> AsyncIterator[tuple[bool, str]]: """Start all services and yield progress updates.""" if not self.is_available(): yield False, "No container runtime available" return # Ensure OPENRAG_VERSION is set in .env file try: from ..managers.env_manager import EnvManager env_manager = EnvManager() env_manager.ensure_openrag_version() except Exception: pass # Continue even if version setting fails # Determine GPU mode if cpu_mode is None: use_gpu = self.use_gpu_compose else: use_gpu = not cpu_mode # Show the search process for debugging if hasattr(self, '_compose_search_log'): for line in self._compose_search_log.split('\n'): if line.strip(): yield False, line, False # Show runtime detection info runtime_cmd_str = " ".join(self.runtime_info.compose_command) yield False, f"Using compose command: {runtime_cmd_str}", False compose_files_str = str(self.compose_file.absolute()) if use_gpu and self.gpu_compose_file.exists(): compose_files_str += f" + {self.gpu_compose_file.absolute()}" yield False, f"Compose files: {compose_files_str}", False if not self.compose_file.exists(): yield False, f"ERROR: Base compose file not found at {self.compose_file.absolute()}", False return # Check for port conflicts before starting yield False, "Checking port availability...", False ports_available, conflicts = await self.check_ports_available() if not ports_available: yield False, "ERROR: Port conflicts detected:", False for service_name, port, error_msg in conflicts: yield False, f" - {service_name}: {error_msg}", False yield False, "Please stop the conflicting services and try again.", False yield False, "Services not started due to port conflicts.", False return yield False, "Starting OpenRAG services...", False missing_images: List[str] = [] try: images_info = await self.get_project_images_info() missing_images = [image for image, digest in images_info if digest == "-"] except Exception: missing_images = [] if missing_images: images_list = ", ".join(missing_images) yield False, f"Pulling container images ({images_list})...", False pull_success = {"value": True} async for message, replace_last in self._stream_compose_command( ["pull"], pull_success, cpu_mode ): yield False, message, replace_last if not pull_success["value"]: yield False, "Some images failed to pull; attempting to start services anyway...", False yield False, "Creating and starting 
containers...", False up_success = {"value": True} error_messages = [] async for message, replace_last in self._stream_compose_command(["up", "-d"], up_success, cpu_mode): # Detect error patterns in the output lower_msg = message.lower() # Check for common error patterns if any(pattern in lower_msg for pattern in [ "port.*already.*allocated", "address already in use", "bind.*address already in use", "port is already allocated" ]): error_messages.append("Port conflict detected") up_success["value"] = False elif "error" in lower_msg or "failed" in lower_msg: # Generic error detection if message not in error_messages: error_messages.append(message) yield False, message, replace_last if up_success["value"]: yield True, "Services started successfully", False else: yield False, "Failed to start services. See output above for details.", False if error_messages: yield False, "\nDetected errors:", False for err in error_messages[:5]: # Limit to first 5 errors yield False, f" - {err}", False async def stop_services(self) -> AsyncIterator[tuple[bool, str]]: """Stop all services and yield progress updates.""" yield False, "Stopping OpenRAG services..." success, stdout, stderr = await self._run_compose_command(["stop"]) if success: yield True, "Services stopped successfully" else: yield False, f"Failed to stop services: {stderr}" async def restart_services( self, cpu_mode: bool = False ) -> AsyncIterator[tuple[bool, str]]: """Restart all services and yield progress updates.""" yield False, "Restarting OpenRAG services..." success, stdout, stderr = await self._run_compose_command(["restart"], cpu_mode) if success: yield True, "Services restarted successfully" else: yield False, f"Failed to restart services: {stderr}" async def upgrade_services( self, cpu_mode: bool = False ) -> AsyncIterator[tuple[bool, str]]: """Upgrade services (pull latest images and restart) and yield progress updates.""" yield False, "Pulling latest images...", False # Pull latest images with streaming output pull_success = True async for message, replace_last in self._run_compose_command_streaming(["pull"], cpu_mode): yield False, message, replace_last # Check for error patterns in the output if "error" in message.lower() or "failed" in message.lower(): pull_success = False if not pull_success: yield False, "Failed to pull some images, but continuing with restart...", False yield False, "Images updated, restarting services...", False # Restart with new images using streaming output restart_success = True async for message, replace_last in self._run_compose_command_streaming( ["up", "-d", "--force-recreate"], cpu_mode ): yield False, message, replace_last # Check for error patterns in the output if "error" in message.lower() or "failed" in message.lower(): restart_success = False if restart_success: yield True, "Services upgraded and restarted successfully", False else: yield False, "Some errors occurred during service restart", False async def clear_opensearch_data_volume(self) -> AsyncIterator[tuple[bool, str]]: """Clear opensearch data using a temporary container with proper permissions.""" if not self.is_available(): yield False, "No container runtime available" return yield False, "Clearing OpenSearch data volume..." 
# Get the absolute path to opensearch-data directory opensearch_data_path = Path("opensearch-data").absolute() if not opensearch_data_path.exists(): yield True, "OpenSearch data directory does not exist, skipping" return # Use the opensearch container with proper volume mount flags # :Z flag ensures proper SELinux labeling and UID mapping for rootless containers cmd = [ "run", "--rm", "-v", f"{opensearch_data_path}:/usr/share/opensearch/data:Z", "langflowai/openrag-opensearch:latest", "bash", "-c", "rm -rf /usr/share/opensearch/data/* /usr/share/opensearch/data/.[!.]* && echo 'Cleared successfully'" ] success, stdout, stderr = await self._run_runtime_command(cmd) if success and "Cleared successfully" in stdout: yield True, "OpenSearch data cleared successfully" else: # If it fails, try with the base opensearch image yield False, "Retrying with base OpenSearch image..." cmd = [ "run", "--rm", "-v", f"{opensearch_data_path}:/usr/share/opensearch/data:Z", "opensearchproject/opensearch:3.0.0", "bash", "-c", "rm -rf /usr/share/opensearch/data/* /usr/share/opensearch/data/.[!.]* && echo 'Cleared successfully'" ] success, stdout, stderr = await self._run_runtime_command(cmd) if success and "Cleared successfully" in stdout: yield True, "OpenSearch data cleared successfully" else: yield False, f"Failed to clear OpenSearch data: {stderr if stderr else 'Unknown error'}" async def reset_services(self) -> AsyncIterator[tuple[bool, str]]: """Reset all services (stop, remove containers/volumes, clear data) and yield progress updates.""" yield False, "Stopping all services..." # Stop and remove everything success, stdout, stderr = await self._run_compose_command( ["down", "--volumes", "--remove-orphans", "--rmi", "local"] ) if not success: yield False, f"Failed to stop services: {stderr}" return yield False, "Cleaning up container data..." 
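# Note: "system prune -f" is runtime-wide; it also removes stopped containers
# and dangling images that do not belong to OpenRAG.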
# Additional cleanup - remove any remaining containers/volumes # This is more thorough than just compose down await self._run_runtime_command(["system", "prune", "-f"]) yield ( True, "System reset completed - all containers, volumes, and local images removed", ) async def get_service_logs( self, service_name: str, lines: int = 100 ) -> tuple[bool, str]: """Get logs for a specific service.""" success, stdout, stderr = await self._run_compose_command( ["logs", "--tail", str(lines), service_name] ) if success: return True, stdout else: return False, f"Failed to get logs: {stderr}" async def follow_service_logs(self, service_name: str) -> AsyncIterator[str]: """Follow logs for a specific service.""" if not self.is_available(): yield "No container runtime available" return # Build compose command with override pattern cmd = self.runtime_info.compose_command + ["-f", str(self.compose_file)] if self.use_gpu_compose and self.gpu_compose_file.exists(): cmd.extend(["-f", str(self.gpu_compose_file)]) cmd.extend(["logs", "-f", service_name]) try: # Get environment variables from .env file to ensure latest values env = self._get_env_from_file() process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=Path.cwd(), env=env, ) if process.stdout: while True: line = await process.stdout.readline() if line: yield line.decode().rstrip() else: break else: yield "Error: Unable to read process output" except Exception as e: yield f"Error following logs: {e}" async def get_system_stats(self) -> Dict[str, Dict[str, str]]: """Get system resource usage statistics.""" stats = {} # Get container stats success, stdout, stderr = await self._run_runtime_command( ["stats", "--no-stream", "--format", "json"] ) if success and stdout.strip(): try: for line in stdout.strip().split("\n"): if line.strip(): data = json.loads(line) name = data.get("Name", data.get("Container", "")) if name: stats[name] = { "cpu": data.get("CPUPerc", "0%"), "memory": data.get("MemUsage", "0B / 0B"), "memory_percent": data.get("MemPerc", "0%"), "network": data.get("NetIO", "0B / 0B"), "disk": data.get("BlockIO", "0B / 0B"), } except json.JSONDecodeError: pass return stats async def debug_podman_services(self) -> str: """Run a direct Podman command to check services status for debugging.""" if self.runtime_info.runtime_type != RuntimeType.PODMAN: return "Not using Podman" # Try direct podman command cmd = ["podman", "ps", "--all", "--format", "json"] try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=Path.cwd(), ) stdout, stderr = await process.communicate() stdout_text = stdout.decode() if stdout else "" stderr_text = stderr.decode() if stderr else "" result = f"Command: {' '.join(cmd)}\n" result += f"Return code: {process.returncode}\n" result += f"Stdout: {stdout_text}\n" result += f"Stderr: {stderr_text}\n" # Try to parse the output if stdout_text.strip(): try: containers = json.loads(stdout_text) result += f"\nFound {len(containers)} containers:\n" for container in containers: name = container.get("Names", [""])[0] state = container.get("State", "") result += f" - {name}: {state}\n" except json.JSONDecodeError as e: result += f"\nFailed to parse JSON: {e}\n" return result except Exception as e: return f"Error executing command: {e}" def check_podman_macos_memory(self) -> tuple[bool, str]: """Check if Podman VM has sufficient memory on macOS.""" if self.runtime_info.runtime_type != RuntimeType.PODMAN: 
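# The VM memory check only applies to Podman; other runtimes pass trivially.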
return True, "Not using Podman" is_sufficient, memory_mb, message = ( self.platform_detector.check_podman_macos_memory() ) return is_sufficient, message