Merge pull request #516 from langflow-ai/fix-port-conflict

fix: More graceful handling of port conflicts
This commit is contained in:
Sebastián Estévez 2025-11-26 04:36:52 -05:00 committed by GitHub
commit eeebc674e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 273 additions and 6 deletions

View file

@ -31,15 +31,23 @@ jobs:
steps: steps:
- run: df -h - run: df -h
#- name: "node-cleanup"
#run: | - name: Cleanup Docker cache
# sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL run: |
# sudo docker image prune --all --force docker system prune -af || true
# sudo docker builder prune -a docker builder prune -af || true
- run: df -h - run: df -h
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Verify workspace
run: |
echo "Current directory: $(pwd)"
echo "Workspace: ${GITHUB_WORKSPACE}"
ls -la
- name: Set up UV - name: Set up UV
uses: astral-sh/setup-uv@v3 uses: astral-sh/setup-uv@v3
with: with:

View file

@ -43,6 +43,7 @@ class ServiceInfo:
image: Optional[str] = None image: Optional[str] = None
image_digest: Optional[str] = None image_digest: Optional[str] = None
created: Optional[str] = None created: Optional[str] = None
error_message: Optional[str] = None
def __post_init__(self): def __post_init__(self):
if self.ports is None: if self.ports is None:
@ -135,6 +136,96 @@ class ContainerManager:
return self.platform_detector.get_compose_installation_instructions() return self.platform_detector.get_compose_installation_instructions()
return self.platform_detector.get_installation_instructions() return self.platform_detector.get_installation_instructions()
def _extract_ports_from_compose(self) -> Dict[str, List[int]]:
"""Extract port mappings from compose files.
Returns:
Dict mapping service name to list of host ports
"""
service_ports: Dict[str, List[int]] = {}
compose_files = [self.compose_file]
if hasattr(self, 'cpu_compose_file') and self.cpu_compose_file and self.cpu_compose_file.exists():
compose_files.append(self.cpu_compose_file)
for compose_file in compose_files:
if not compose_file.exists():
continue
try:
import re
content = compose_file.read_text()
current_service = None
in_ports_section = False
for line in content.splitlines():
# Detect service names
service_match = re.match(r'^ (\w[\w-]*):$', line)
if service_match:
current_service = service_match.group(1)
in_ports_section = False
if current_service not in service_ports:
service_ports[current_service] = []
continue
# Detect ports section
if current_service and re.match(r'^ ports:$', line):
in_ports_section = True
continue
# Exit ports section on new top-level key
if in_ports_section and re.match(r'^ \w+:', line):
in_ports_section = False
# Extract port mappings
if in_ports_section and current_service:
# Match patterns like: - "3000:3000", - "9200:9200", - 7860:7860
port_match = re.search(r'["\']?(\d+):\d+["\']?', line)
if port_match:
host_port = int(port_match.group(1))
if host_port not in service_ports[current_service]:
service_ports[current_service].append(host_port)
except Exception as e:
logger.debug(f"Error parsing {compose_file} for ports: {e}")
continue
return service_ports
async def check_ports_available(self) -> tuple[bool, List[tuple[str, int, str]]]:
"""Check if required ports are available.
Returns:
Tuple of (all_available, conflicts) where conflicts is a list of
(service_name, port, error_message) tuples
"""
import socket
service_ports = self._extract_ports_from_compose()
conflicts: List[tuple[str, int, str]] = []
for service_name, ports in service_ports.items():
for port in ports:
try:
# Try to bind to the port to check if it's available
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.5)
result = sock.connect_ex(('127.0.0.1', port))
sock.close()
if result == 0:
# Port is in use
conflicts.append((
service_name,
port,
f"Port {port} is already in use"
))
except Exception as e:
logger.debug(f"Error checking port {port}: {e}")
continue
return (len(conflicts) == 0, conflicts)
async def _run_compose_command( async def _run_compose_command(
self, args: List[str], cpu_mode: Optional[bool] = None self, args: List[str], cpu_mode: Optional[bool] = None
) -> tuple[bool, str, str]: ) -> tuple[bool, str, str]:
@ -655,6 +746,17 @@ class ContainerManager:
yield False, f"ERROR: Compose file not found at {compose_file.absolute()}", False yield False, f"ERROR: Compose file not found at {compose_file.absolute()}", False
return return
# Check for port conflicts before starting
yield False, "Checking port availability...", False
ports_available, conflicts = await self.check_ports_available()
if not ports_available:
yield False, "ERROR: Port conflicts detected:", False
for service_name, port, error_msg in conflicts:
yield False, f" - {service_name}: {error_msg}", False
yield False, "Please stop the conflicting services and try again.", False
yield False, "Services not started due to port conflicts.", False
return
yield False, "Starting OpenRAG services...", False yield False, "Starting OpenRAG services...", False
missing_images: List[str] = [] missing_images: List[str] = []
@ -677,13 +779,37 @@ class ContainerManager:
yield False, "Creating and starting containers...", False yield False, "Creating and starting containers...", False
up_success = {"value": True} up_success = {"value": True}
error_messages = []
async for message, replace_last in self._stream_compose_command(["up", "-d"], up_success, cpu_mode): async for message, replace_last in self._stream_compose_command(["up", "-d"], up_success, cpu_mode):
# Detect error patterns in the output
import re
lower_msg = message.lower()
# Check for common error patterns
if any(pattern in lower_msg for pattern in [
"port.*already.*allocated",
"address already in use",
"bind.*address already in use",
"port is already allocated"
]):
error_messages.append("Port conflict detected")
up_success["value"] = False
elif "error" in lower_msg or "failed" in lower_msg:
# Generic error detection
if message not in error_messages:
error_messages.append(message)
yield False, message, replace_last yield False, message, replace_last
if up_success["value"]: if up_success["value"]:
yield True, "Services started successfully", False yield True, "Services started successfully", False
else: else:
yield False, "Failed to start services. See output above for details.", False yield False, "Failed to start services. See output above for details.", False
if error_messages:
yield False, "\nDetected errors:", False
for err in error_messages[:5]: # Limit to first 5 errors
yield False, f" - {err}", False
async def stop_services(self) -> AsyncIterator[tuple[bool, str]]: async def stop_services(self) -> AsyncIterator[tuple[bool, str]]:
"""Stop all services and yield progress updates.""" """Stop all services and yield progress updates."""

View file

@ -143,6 +143,29 @@ class DoclingManager:
self._external_process = False self._external_process = False
return False return False
def check_port_available(self) -> tuple[bool, Optional[str]]:
"""Check if the native service port is available.
Returns:
Tuple of (available, error_message)
"""
import socket
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.5)
result = sock.connect_ex(('127.0.0.1', self._port))
sock.close()
if result == 0:
# Port is in use
return False, f"Port {self._port} is already in use"
return True, None
except Exception as e:
logger.debug(f"Error checking port {self._port}: {e}")
# If we can't check, assume it's available
return True, None
def get_status(self) -> Dict[str, Any]: def get_status(self) -> Dict[str, Any]:
"""Get current status of docling serve.""" """Get current status of docling serve."""
# Check for starting state first # Check for starting state first

View file

@ -311,17 +311,46 @@ class MonitorScreen(Screen):
"""Start services with progress updates.""" """Start services with progress updates."""
self.operation_in_progress = True self.operation_in_progress = True
try: try:
# Check for port conflicts before attempting to start
ports_available, conflicts = await self.container_manager.check_ports_available()
if not ports_available:
# Show error notification instead of modal
conflict_msgs = []
for service_name, port, error_msg in conflicts[:3]: # Show first 3
conflict_msgs.append(f"{service_name} (port {port})")
conflict_str = ", ".join(conflict_msgs)
if len(conflicts) > 3:
conflict_str += f" and {len(conflicts) - 3} more"
self.notify(
f"Cannot start services: Port conflicts detected for {conflict_str}. "
f"Please stop the conflicting services first.",
severity="error",
timeout=10
)
# Refresh to show current state
await self._refresh_services()
return
# Show command output in modal dialog # Show command output in modal dialog
command_generator = self.container_manager.start_services(cpu_mode) command_generator = self.container_manager.start_services(cpu_mode)
modal = CommandOutputModal( modal = CommandOutputModal(
"Starting Services", "Starting Services",
command_generator, command_generator,
on_complete=None, # We'll refresh in on_screen_resume instead on_complete=self._on_start_complete, # Refresh after completion
) )
self.app.push_screen(modal) self.app.push_screen(modal)
except Exception as e:
self.notify(f"Error starting services: {str(e)}", severity="error")
await self._refresh_services()
finally: finally:
self.operation_in_progress = False self.operation_in_progress = False
async def _on_start_complete(self) -> None:
"""Callback after service start completes."""
await self._refresh_services()
async def _stop_services(self) -> None: async def _stop_services(self) -> None:
"""Stop services with progress updates.""" """Stop services with progress updates."""
self.operation_in_progress = True self.operation_in_progress = True
@ -386,6 +415,19 @@ class MonitorScreen(Screen):
"""Start docling serve.""" """Start docling serve."""
self.operation_in_progress = True self.operation_in_progress = True
try: try:
# Check for port conflicts before attempting to start
port_available, error_msg = self.docling_manager.check_port_available()
if not port_available:
self.notify(
f"Cannot start docling serve: {error_msg}. "
f"Please stop the conflicting service first.",
severity="error",
timeout=10
)
# Refresh to show current state
await self._refresh_services()
return
# Start the service (this sets _starting = True internally at the start) # Start the service (this sets _starting = True internally at the start)
# Create task and let it begin executing (which sets the flag) # Create task and let it begin executing (which sets the flag)
start_task = asyncio.create_task(self.docling_manager.start()) start_task = asyncio.create_task(self.docling_manager.start())

View file

@ -385,6 +385,34 @@ class WelcomeScreen(Screen):
async def _start_all_services(self) -> None: async def _start_all_services(self) -> None:
"""Start all services: containers first, then native services.""" """Start all services: containers first, then native services."""
# Check for port conflicts before attempting to start anything
conflicts = []
# Check container ports
if self.container_manager.is_available():
ports_available, port_conflicts = await self.container_manager.check_ports_available()
if not ports_available:
for service_name, port, error_msg in port_conflicts[:3]: # Show first 3
conflicts.append(f"{service_name} (port {port})")
if len(port_conflicts) > 3:
conflicts.append(f"and {len(port_conflicts) - 3} more")
# Check native service port
port_available, error_msg = self.docling_manager.check_port_available()
if not port_available:
conflicts.append(f"docling (port {self.docling_manager._port})")
# If there are any conflicts, show error and return
if conflicts:
conflict_str = ", ".join(conflicts)
self.notify(
f"Cannot start services: Port conflicts detected for {conflict_str}. "
f"Please stop the conflicting services first.",
severity="error",
timeout=10
)
return
# Step 1: Start container services first (to create the network) # Step 1: Start container services first (to create the network)
if self.container_manager.is_available(): if self.container_manager.is_available():
command_generator = self.container_manager.start_services() command_generator = self.container_manager.start_services()
@ -410,6 +438,20 @@ class WelcomeScreen(Screen):
async def _start_native_services_after_containers(self) -> None: async def _start_native_services_after_containers(self) -> None:
"""Start native services after containers have been started.""" """Start native services after containers have been started."""
if not self.docling_manager.is_running(): if not self.docling_manager.is_running():
# Check for port conflicts before attempting to start
port_available, error_msg = self.docling_manager.check_port_available()
if not port_available:
self.notify(
f"Cannot start native services: {error_msg}. "
f"Please stop the conflicting service first.",
severity="error",
timeout=10
)
# Update state and return
self.docling_running = False
await self._refresh_welcome_content()
return
self.notify("Starting native services...", severity="information") self.notify("Starting native services...", severity="information")
success, message = await self.docling_manager.start() success, message = await self.docling_manager.start()
if success: if success:

View file

@ -23,6 +23,7 @@ class CommandOutputModal(ModalScreen):
("p", "pause_waves", "Pause"), ("p", "pause_waves", "Pause"),
("f", "speed_up", "Faster"), ("f", "speed_up", "Faster"),
("s", "speed_down", "Slower"), ("s", "speed_down", "Slower"),
("escape", "close_modal", "Close"),
] ]
DEFAULT_CSS = """ DEFAULT_CSS = """
@ -188,6 +189,8 @@ class CommandOutputModal(ModalScreen):
self._output_lines: list[str] = [] self._output_lines: list[str] = []
self._layer_line_map: dict[str, int] = {} # Maps layer ID to line index self._layer_line_map: dict[str, int] = {} # Maps layer ID to line index
self._status_task: Optional[asyncio.Task] = None self._status_task: Optional[asyncio.Task] = None
self._error_detected = False
self._command_complete = False
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
"""Create the modal dialog layout.""" """Create the modal dialog layout."""
@ -254,6 +257,12 @@ class CommandOutputModal(ModalScreen):
for w in waves.wavelets: for w in waves.wavelets:
w.speed = max(0.1, w.speed * 0.8) w.speed = max(0.1, w.speed * 0.8)
def action_close_modal(self) -> None:
"""Close the modal (only if error detected or command complete)."""
close_btn = self.query_one("#close-btn", Button)
if not close_btn.disabled:
self.dismiss()
async def _run_command(self) -> None: async def _run_command(self) -> None:
"""Run the command and update the output in real-time.""" """Run the command and update the output in real-time."""
output = self.query_one("#command-output", TextArea) output = self.query_one("#command-output", TextArea)
@ -273,8 +282,25 @@ class CommandOutputModal(ModalScreen):
# Move cursor to end to trigger scroll # Move cursor to end to trigger scroll
output.move_cursor((len(self._output_lines), 0)) output.move_cursor((len(self._output_lines), 0))
# Detect error patterns in messages
import re
lower_msg = message.lower() if message else ""
if not self._error_detected and any(pattern in lower_msg for pattern in [
"error:",
"failed",
"port.*already.*allocated",
"address already in use",
"not found",
"permission denied"
]):
self._error_detected = True
# Enable close button when error detected
close_btn = self.query_one("#close-btn", Button)
close_btn.disabled = False
# If command is complete, update UI # If command is complete, update UI
if is_complete: if is_complete:
self._command_complete = True
self._update_output("Command completed successfully", False) self._update_output("Command completed successfully", False)
output.text = "\n".join(self._output_lines) output.text = "\n".join(self._output_lines)
output.move_cursor((len(self._output_lines), 0)) output.move_cursor((len(self._output_lines), 0))