diff --git a/src/auth_middleware.py b/src/auth_middleware.py index d9f19e8c..06f5b0d0 100644 --- a/src/auth_middleware.py +++ b/src/auth_middleware.py @@ -3,7 +3,9 @@ from starlette.responses import JSONResponse from typing import Optional from session_manager import User from config.settings import is_no_auth_mode +from utils.logging_config import get_logger +logger = get_logger(__name__) def get_current_user(request: Request, session_manager) -> Optional[User]: """Extract current user from request cookies""" @@ -25,7 +27,7 @@ def require_auth(session_manager): async def wrapper(request: Request): # In no-auth mode, bypass authentication entirely if is_no_auth_mode(): - print(f"[DEBUG] No-auth mode: Creating anonymous user") + logger.debug("No-auth mode: Creating anonymous user") # Create an anonymous user object so endpoints don't break from session_manager import User from datetime import datetime @@ -33,7 +35,7 @@ def require_auth(session_manager): from session_manager import AnonymousUser request.state.user = AnonymousUser() request.state.jwt_token = None # No JWT in no-auth mode - print(f"[DEBUG] Set user_id=anonymous, jwt_token=None") + logger.debug("Set user_id=anonymous, jwt_token=None") return await handler(request) user = get_current_user(request, session_manager) diff --git a/src/connectors/service.py b/src/connectors/service.py index c2225f5c..dd3e5151 100644 --- a/src/connectors/service.py +++ b/src/connectors/service.py @@ -8,7 +8,9 @@ from .google_drive import GoogleDriveConnector from .sharepoint import SharePointConnector from .onedrive import OneDriveConnector from .connection_manager import ConnectionManager +from utils.logging_config import get_logger +logger = get_logger(__name__) class ConnectorService: """Service to manage document connectors and process files""" @@ -63,7 +65,7 @@ class ConnectorService: doc_service = DocumentService(session_manager=self.session_manager) - print(f"[DEBUG] Processing connector document with ID: {document.id}") + logger.debug(f"Processing connector document with ID: {document.id}") # Process using the existing pipeline but with connector document metadata result = await doc_service.process_file_common( @@ -78,7 +80,7 @@ class ConnectorService: connector_type=connector_type, ) - print(f"[DEBUG] Document processing result: {result}") + logger.debug(f"Document processing result: {result}") # If successfully indexed or already exists, update the indexed documents with connector metadata if result["status"] in ["indexed", "unchanged"]: @@ -105,7 +107,7 @@ class ConnectorService: jwt_token: str = None, ): """Update indexed chunks with connector-specific metadata""" - print(f"[DEBUG] Looking for chunks with document_id: {document.id}") + logger.debug(f"Looking for chunks with document_id: {document.id}") # Find all chunks for this document query = {"query": {"term": {"document_id": document.id}}} @@ -118,25 +120,23 @@ class ConnectorService: try: response = await opensearch_client.search(index=self.index_name, body=query) except Exception as e: - print( + logger.error( f"[ERROR] OpenSearch search failed for connector metadata update: {e}" ) - print(f"[ERROR] Search query: {query}") + logger.error(f"Search query: {query}") raise - print(f"[DEBUG] Search query: {query}") - print( - f"[DEBUG] Found {len(response['hits']['hits'])} chunks matching document_id: {document.id}" - ) + logger.debug(f"Search query: {query}") + logger.debug(f"Found {len(response['hits']['hits'])} chunks matching document_id: {document.id}") # Update each chunk with connector metadata - print( + logger.debug( f"[DEBUG] Updating {len(response['hits']['hits'])} chunks with connector_type: {connector_type}" ) for hit in response["hits"]["hits"]: chunk_id = hit["_id"] current_connector_type = hit["_source"].get("connector_type", "unknown") - print( + logger.debug( f"[DEBUG] Chunk {chunk_id}: current connector_type = {current_connector_type}, updating to {connector_type}" ) @@ -165,10 +165,10 @@ class ConnectorService: await opensearch_client.update( index=self.index_name, id=chunk_id, body=update_body ) - print(f"[DEBUG] Updated chunk {chunk_id} with connector metadata") + logger.debug(f"Updated chunk {chunk_id} with connector metadata") except Exception as e: - print(f"[ERROR] OpenSearch update failed for chunk {chunk_id}: {e}") - print(f"[ERROR] Update body: {update_body}") + logger.error(f"OpenSearch update failed for chunk {chunk_id}: {e}") + logger.error(f"Update body: {update_body}") raise def _get_file_extension(self, mimetype: str) -> str: @@ -201,7 +201,7 @@ class ConnectorService: "TaskService not available - connector sync requires task service dependency" ) - print( + logger.debug( f"[DEBUG] Starting sync for connection {connection_id}, max_files={max_files}" ) @@ -211,7 +211,7 @@ class ConnectorService: f"Connection '{connection_id}' not found or not authenticated" ) - print(f"[DEBUG] Got connector, authenticated: {connector.is_authenticated}") + logger.debug(f"Got connector, authenticated: {connector.is_authenticated}") if not connector.is_authenticated: raise ValueError(f"Connection '{connection_id}' not authenticated") @@ -225,11 +225,11 @@ class ConnectorService: while True: # List files from connector with limit - print( + logger.debug( f"[DEBUG] Calling list_files with page_size={page_size}, page_token={page_token}" ) file_list = await connector.list_files(page_token, limit=page_size) - print(f"[DEBUG] Got {len(file_list.get('files', []))} files") + logger.debug(f"Got {len(file_list.get('files', []))} files") files = file_list["files"] if not files: diff --git a/src/models/processors.py b/src/models/processors.py index 254f0374..acc0f81b 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -1,7 +1,9 @@ from abc import ABC, abstractmethod from typing import Any, Dict from .tasks import UploadTask, FileTask +from utils.logging_config import get_logger +logger = get_logger(__name__) class TaskProcessor(ABC): """Abstract base class for task processors""" @@ -225,10 +227,10 @@ class S3FileProcessor(TaskProcessor): index=INDEX_NAME, id=chunk_id, body=chunk_doc ) except Exception as e: - print( + logger.error( f"[ERROR] OpenSearch indexing failed for S3 chunk {chunk_id}: {e}" ) - print(f"[ERROR] Chunk document: {chunk_doc}") + logger.error(f"Chunk document: {chunk_doc}") raise result = {"status": "indexed", "id": slim_doc["id"]} diff --git a/src/services/task_service.py b/src/services/task_service.py index bb09db39..a85e9b18 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -7,7 +7,9 @@ from typing import Dict from models.tasks import TaskStatus, UploadTask, FileTask from src.utils.gpu_detection import get_worker_count +from utils.logging_config import get_logger +logger = get_logger(__name__) class TaskService: def __init__(self, document_service=None, process_pool=None): @@ -104,7 +106,7 @@ class TaskService: await asyncio.gather(*tasks, return_exceptions=True) except Exception as e: - print(f"[ERROR] Background upload processor failed for task {task_id}: {e}") + logger.error(f"Background upload processor failed for task {task_id}: {e}") import traceback traceback.print_exc() @@ -136,7 +138,7 @@ class TaskService: try: await processor.process_item(upload_task, item, file_task) except Exception as e: - print(f"[ERROR] Failed to process item {item}: {e}") + logger.error(f"Failed to process item {item}: {e}") import traceback traceback.print_exc() @@ -157,13 +159,13 @@ class TaskService: upload_task.updated_at = time.time() except asyncio.CancelledError: - print(f"[INFO] Background processor for task {task_id} was cancelled") + logger.info(f"Background processor for task {task_id} was cancelled") if user_id in self.task_store and task_id in self.task_store[user_id]: # Task status and pending files already handled by cancel_task() pass raise # Re-raise to properly handle cancellation except Exception as e: - print(f"[ERROR] Background custom processor failed for task {task_id}: {e}") + logger.error(f"Background custom processor failed for task {task_id}: {e}") import traceback traceback.print_exc() diff --git a/src/session_manager.py b/src/session_manager.py index 0d55acd7..29298006 100644 --- a/src/session_manager.py +++ b/src/session_manager.py @@ -6,7 +6,9 @@ from typing import Dict, Optional, Any from dataclasses import dataclass, asdict from cryptography.hazmat.primitives import serialization import os +from utils.logging_config import get_logger +logger = get_logger(__name__) @dataclass class User: @@ -93,13 +95,13 @@ class SessionManager: if response.status_code == 200: return response.json() else: - print( + logger.error( f"Failed to get user info: {response.status_code} {response.text}" ) return None except Exception as e: - print(f"Error getting user info: {e}") + logger.error(f"Error getting user info: {e}") return None async def create_user_session( @@ -186,19 +188,19 @@ class SessionManager: """Get or create OpenSearch client for user with their JWT""" from config.settings import is_no_auth_mode - print( - f"[DEBUG] get_user_opensearch_client: user_id={user_id}, jwt_token={'None' if jwt_token is None else 'present'}, no_auth_mode={is_no_auth_mode()}" + logger.debug( + f"get_user_opensearch_client: user_id={user_id}, jwt_token={'None' if jwt_token is None else 'present'}, no_auth_mode={is_no_auth_mode()}" ) # In no-auth mode, create anonymous JWT for OpenSearch DLS if is_no_auth_mode() and jwt_token is None: if not hasattr(self, "_anonymous_jwt"): # Create anonymous JWT token for OpenSearch OIDC - print(f"[DEBUG] Creating anonymous JWT...") + logger.debug("Creating anonymous JWT...") self._anonymous_jwt = self._create_anonymous_jwt() - print(f"[DEBUG] Anonymous JWT created: {self._anonymous_jwt[:50]}...") + logger.debug(f"Anonymous JWT created: {self._anonymous_jwt[:50]}...") jwt_token = self._anonymous_jwt - print(f"[DEBUG] Using anonymous JWT for OpenSearch") + logger.debug("Using anonymous JWT for OpenSearch") # Check if we have a cached client for this user if user_id not in self.user_opensearch_clients: diff --git a/src/tui/main.py b/src/tui/main.py index aa29756f..8687219f 100644 --- a/src/tui/main.py +++ b/src/tui/main.py @@ -13,7 +13,9 @@ from .managers.env_manager import EnvManager from .managers.container_manager import ContainerManager from .utils.platform import PlatformDetector from .widgets.diagnostics_notification import notify_with_diagnostics +from utils.logging_config import get_logger +logger = get_logger(__name__) class OpenRAGTUI(App): """OpenRAG Terminal User Interface application.""" @@ -221,10 +223,10 @@ def run_tui(): app = OpenRAGTUI() app.run() except KeyboardInterrupt: - print("\nOpenRAG TUI interrupted by user") + logger.info("\nOpenRAG TUI interrupted by user") sys.exit(0) except Exception as e: - print(f"Error running OpenRAG TUI: {e}") + logger.error(f"Error running OpenRAG TUI: {e}") sys.exit(1) diff --git a/src/tui/managers/container_manager.py b/src/tui/managers/container_manager.py index 4297d6b1..39fbb4d4 100644 --- a/src/tui/managers/container_manager.py +++ b/src/tui/managers/container_manager.py @@ -11,7 +11,9 @@ from typing import Dict, List, Optional, AsyncIterator from ..utils.platform import PlatformDetector, RuntimeInfo, RuntimeType from utils.gpu_detection import detect_gpu_devices +from utils.logging_config import get_logger +logger = get_logger(__name__) class ServiceStatus(Enum): """Container service status.""" @@ -177,7 +179,7 @@ class ContainerManager: def _process_service_json(self, service: Dict, services: Dict[str, ServiceInfo]) -> None: """Process a service JSON object and add it to the services dict.""" # Debug print to see the actual service data - print(f"DEBUG: Processing service data: {json.dumps(service, indent=2)}") + logger.debug(f"Processing service data: {json.dumps(service, indent=2)}") container_name = service.get("Name", "") diff --git a/src/tui/managers/env_manager.py b/src/tui/managers/env_manager.py index 1a0766ba..f4df3df5 100644 --- a/src/tui/managers/env_manager.py +++ b/src/tui/managers/env_manager.py @@ -16,7 +16,9 @@ from ..utils.validation import ( validate_documents_paths, sanitize_env_value ) +from utils.logging_config import get_logger +logger = get_logger(__name__) @dataclass class EnvConfig: @@ -115,7 +117,7 @@ class EnvManager: return True except Exception as e: - print(f"Error loading .env file: {e}") + logger.error(f"Error loading .env file: {e}") return False def setup_secure_defaults(self) -> None: @@ -245,7 +247,7 @@ class EnvManager: return True except Exception as e: - print(f"Error saving .env file: {e}") + logger.error(f"Error saving .env file: {e}") return False def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]: diff --git a/src/utils/document_processing.py b/src/utils/document_processing.py index dd537a26..e9b65717 100644 --- a/src/utils/document_processing.py +++ b/src/utils/document_processing.py @@ -2,6 +2,9 @@ import hashlib import os from collections import defaultdict from .gpu_detection import detect_gpu_devices +from utils.logging_config import get_logger + +logger = get_logger(__name__) # Global converter cache for worker processes _worker_converter = None @@ -37,11 +40,11 @@ def get_worker_converter(): "1" # Still disable progress bars ) - print( + logger.info( f"[WORKER {os.getpid()}] Initializing DocumentConverter in worker process" ) _worker_converter = DocumentConverter() - print(f"[WORKER {os.getpid()}] DocumentConverter ready in worker process") + logger.info(f"[WORKER {os.getpid()}] DocumentConverter ready in worker process") return _worker_converter @@ -118,33 +121,33 @@ def process_document_sync(file_path: str): start_memory = process.memory_info().rss / 1024 / 1024 # MB try: - print(f"[WORKER {os.getpid()}] Starting document processing: {file_path}") - print(f"[WORKER {os.getpid()}] Initial memory usage: {start_memory:.1f} MB") + logger.info(f"[WORKER {os.getpid()}] Starting document processing: {file_path}") + logger.info(f"[WORKER {os.getpid()}] Initial memory usage: {start_memory:.1f} MB") # Check file size try: file_size = os.path.getsize(file_path) / 1024 / 1024 # MB - print(f"[WORKER {os.getpid()}] File size: {file_size:.1f} MB") + logger.info(f"[WORKER {os.getpid()}] File size: {file_size:.1f} MB") except OSError as e: - print(f"[WORKER {os.getpid()}] WARNING: Cannot get file size: {e}") + logger.warning(f"[WORKER {os.getpid()}] WARNING: Cannot get file size: {e}") file_size = 0 # Get the cached converter for this worker try: - print(f"[WORKER {os.getpid()}] Getting document converter...") + logger.info(f"[WORKER {os.getpid()}] Getting document converter...") converter = get_worker_converter() memory_after_converter = process.memory_info().rss / 1024 / 1024 - print( + logger.info( f"[WORKER {os.getpid()}] Memory after converter init: {memory_after_converter:.1f} MB" ) except Exception as e: - print(f"[WORKER {os.getpid()}] ERROR: Failed to initialize converter: {e}") + logger.error(f"[WORKER {os.getpid()}] ERROR: Failed to initialize converter: {e}") traceback.print_exc() raise # Compute file hash try: - print(f"[WORKER {os.getpid()}] Computing file hash...") + logger.info(f"[WORKER {os.getpid()}] Computing file hash...") sha256 = hashlib.sha256() with open(file_path, "rb") as f: while True: @@ -153,39 +156,39 @@ def process_document_sync(file_path: str): break sha256.update(chunk) file_hash = sha256.hexdigest() - print(f"[WORKER {os.getpid()}] File hash computed: {file_hash[:12]}...") + logger.info(f"[WORKER {os.getpid()}] File hash computed: {file_hash[:12]}...") except Exception as e: - print(f"[WORKER {os.getpid()}] ERROR: Failed to compute file hash: {e}") + logger.error(f"[WORKER {os.getpid()}] ERROR: Failed to compute file hash: {e}") traceback.print_exc() raise # Convert with docling try: - print(f"[WORKER {os.getpid()}] Starting docling conversion...") + logger.info(f"[WORKER {os.getpid()}] Starting docling conversion...") memory_before_convert = process.memory_info().rss / 1024 / 1024 - print( + logger.info( f"[WORKER {os.getpid()}] Memory before conversion: {memory_before_convert:.1f} MB" ) result = converter.convert(file_path) memory_after_convert = process.memory_info().rss / 1024 / 1024 - print( + logger.info( f"[WORKER {os.getpid()}] Memory after conversion: {memory_after_convert:.1f} MB" ) - print(f"[WORKER {os.getpid()}] Docling conversion completed") + logger.info(f"[WORKER {os.getpid()}] Docling conversion completed") full_doc = result.document.export_to_dict() memory_after_export = process.memory_info().rss / 1024 / 1024 - print( + logger.info( f"[WORKER {os.getpid()}] Memory after export: {memory_after_export:.1f} MB" ) except Exception as e: - print( + logger.error( f"[WORKER {os.getpid()}] ERROR: Failed during docling conversion: {e}" ) - print( + logger.info( f"[WORKER {os.getpid()}] Current memory usage: {process.memory_info().rss / 1024 / 1024:.1f} MB" ) traceback.print_exc() @@ -193,10 +196,10 @@ def process_document_sync(file_path: str): # Extract relevant content (same logic as extract_relevant) try: - print(f"[WORKER {os.getpid()}] Extracting relevant content...") + logger.info(f"[WORKER {os.getpid()}] Extracting relevant content...") origin = full_doc.get("origin", {}) texts = full_doc.get("texts", []) - print(f"[WORKER {os.getpid()}] Found {len(texts)} text fragments") + logger.info(f"[WORKER {os.getpid()}] Found {len(texts)} text fragments") page_texts = defaultdict(list) for txt in texts: @@ -210,12 +213,12 @@ def process_document_sync(file_path: str): joined = "\n".join(page_texts[page]) chunks.append({"page": page, "text": joined}) - print( + logger.info( f"[WORKER {os.getpid()}] Created {len(chunks)} chunks from {len(page_texts)} pages" ) except Exception as e: - print( + logger.error( f"[WORKER {os.getpid()}] ERROR: Failed during content extraction: {e}" ) traceback.print_exc() @@ -223,8 +226,8 @@ def process_document_sync(file_path: str): final_memory = process.memory_info().rss / 1024 / 1024 memory_delta = final_memory - start_memory - print(f"[WORKER {os.getpid()}] Document processing completed successfully") - print( + logger.info(f"[WORKER {os.getpid()}] Document processing completed successfully") + logger.info( f"[WORKER {os.getpid()}] Final memory: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)" ) @@ -239,24 +242,24 @@ def process_document_sync(file_path: str): except Exception as e: final_memory = process.memory_info().rss / 1024 / 1024 memory_delta = final_memory - start_memory - print(f"[WORKER {os.getpid()}] FATAL ERROR in process_document_sync") - print(f"[WORKER {os.getpid()}] File: {file_path}") - print(f"[WORKER {os.getpid()}] Python version: {sys.version}") - print( + logger.error(f"[WORKER {os.getpid()}] FATAL ERROR in process_document_sync") + logger.info(f"[WORKER {os.getpid()}] File: {file_path}") + logger.info(f"[WORKER {os.getpid()}] Python version: {sys.version}") + logger.info( f"[WORKER {os.getpid()}] Memory at crash: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)" ) - print(f"[WORKER {os.getpid()}] Error: {type(e).__name__}: {e}") - print(f"[WORKER {os.getpid()}] Full traceback:") + logger.error(f"[WORKER {os.getpid()}] Error: {type(e).__name__}: {e}") + logger.error(f"[WORKER {os.getpid()}] Full traceback:") traceback.print_exc() # Try to get more system info before crashing try: import platform - print( + logger.info( f"[WORKER {os.getpid()}] System: {platform.system()} {platform.release()}" ) - print(f"[WORKER {os.getpid()}] Architecture: {platform.machine()}") + logger.info(f"[WORKER {os.getpid()}] Architecture: {platform.machine()}") except: pass diff --git a/src/utils/gpu_detection.py b/src/utils/gpu_detection.py index 6c0d977b..b0d51ff1 100644 --- a/src/utils/gpu_detection.py +++ b/src/utils/gpu_detection.py @@ -1,6 +1,8 @@ import multiprocessing import os +from utils.logging_config import get_logger +logger = get_logger(__name__) def detect_gpu_devices(): """Detect if GPU devices are actually available""" @@ -30,12 +32,12 @@ def get_worker_count(): if has_gpu_devices: default_workers = min(4, multiprocessing.cpu_count() // 2) - print( + logger.info( f"GPU mode enabled with {gpu_count} GPU(s) - using limited concurrency ({default_workers} workers)" ) else: default_workers = multiprocessing.cpu_count() - print( + logger.info( f"CPU-only mode enabled - using full concurrency ({default_workers} workers)" ) diff --git a/src/utils/process_pool.py b/src/utils/process_pool.py index 7a953863..a9e06dc8 100644 --- a/src/utils/process_pool.py +++ b/src/utils/process_pool.py @@ -1,10 +1,13 @@ import os from concurrent.futures import ProcessPoolExecutor from utils.gpu_detection import get_worker_count +from utils.logging_config import get_logger + +logger = get_logger(__name__) # Create shared process pool at import time (before CUDA initialization) # This avoids the "Cannot re-initialize CUDA in forked subprocess" error MAX_WORKERS = get_worker_count() process_pool = ProcessPoolExecutor(max_workers=MAX_WORKERS) -print(f"Shared process pool initialized with {MAX_WORKERS} workers") +logger.info(f"Shared process pool initialized with {MAX_WORKERS} workers") diff --git a/warm_up_docling.py b/warm_up_docling.py index 536eab7a..e4c32bd0 100644 --- a/warm_up_docling.py +++ b/warm_up_docling.py @@ -1,13 +1,16 @@ from docling.document_converter import DocumentConverter +from utils.logging_config import get_logger -print("Warming up docling models...") +logger = get_logger(__name__) + +logger.info("Warming up docling models...") try: # Use the sample document to warm up docling test_file = "/app/warmup_ocr.pdf" - print(f"Using {test_file} to warm up docling...") + logger.info(f"Using {test_file} to warm up docling...") DocumentConverter().convert(test_file) - print("Docling models warmed up successfully") + logger.info("Docling models warmed up successfully") except Exception as e: - print(f"Docling warm-up completed with: {e}") + logger.error(f"Docling warm-up completed with: {e}") # This is expected - we just want to trigger the model downloads