Compare commits
1 commit
main
...
clean-prin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
64a543b645 |
12 changed files with 103 additions and 78 deletions
|
|
@ -3,7 +3,9 @@ from starlette.responses import JSONResponse
|
|||
from typing import Optional
|
||||
from session_manager import User
|
||||
from config.settings import is_no_auth_mode
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
def get_current_user(request: Request, session_manager) -> Optional[User]:
|
||||
"""Extract current user from request cookies"""
|
||||
|
|
@ -25,7 +27,7 @@ def require_auth(session_manager):
|
|||
async def wrapper(request: Request):
|
||||
# In no-auth mode, bypass authentication entirely
|
||||
if is_no_auth_mode():
|
||||
print(f"[DEBUG] No-auth mode: Creating anonymous user")
|
||||
logger.debug("No-auth mode: Creating anonymous user")
|
||||
# Create an anonymous user object so endpoints don't break
|
||||
from session_manager import User
|
||||
from datetime import datetime
|
||||
|
|
@ -33,7 +35,7 @@ def require_auth(session_manager):
|
|||
from session_manager import AnonymousUser
|
||||
request.state.user = AnonymousUser()
|
||||
request.state.jwt_token = None # No JWT in no-auth mode
|
||||
print(f"[DEBUG] Set user_id=anonymous, jwt_token=None")
|
||||
logger.debug("Set user_id=anonymous, jwt_token=None")
|
||||
return await handler(request)
|
||||
|
||||
user = get_current_user(request, session_manager)
|
||||
|
|
|
|||
|
|
@ -8,7 +8,9 @@ from .google_drive import GoogleDriveConnector
|
|||
from .sharepoint import SharePointConnector
|
||||
from .onedrive import OneDriveConnector
|
||||
from .connection_manager import ConnectionManager
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
class ConnectorService:
|
||||
"""Service to manage document connectors and process files"""
|
||||
|
|
@ -63,7 +65,7 @@ class ConnectorService:
|
|||
|
||||
doc_service = DocumentService(session_manager=self.session_manager)
|
||||
|
||||
print(f"[DEBUG] Processing connector document with ID: {document.id}")
|
||||
logger.debug(f"Processing connector document with ID: {document.id}")
|
||||
|
||||
# Process using the existing pipeline but with connector document metadata
|
||||
result = await doc_service.process_file_common(
|
||||
|
|
@ -78,7 +80,7 @@ class ConnectorService:
|
|||
connector_type=connector_type,
|
||||
)
|
||||
|
||||
print(f"[DEBUG] Document processing result: {result}")
|
||||
logger.debug(f"Document processing result: {result}")
|
||||
|
||||
# If successfully indexed or already exists, update the indexed documents with connector metadata
|
||||
if result["status"] in ["indexed", "unchanged"]:
|
||||
|
|
@ -105,7 +107,7 @@ class ConnectorService:
|
|||
jwt_token: str = None,
|
||||
):
|
||||
"""Update indexed chunks with connector-specific metadata"""
|
||||
print(f"[DEBUG] Looking for chunks with document_id: {document.id}")
|
||||
logger.debug(f"Looking for chunks with document_id: {document.id}")
|
||||
|
||||
# Find all chunks for this document
|
||||
query = {"query": {"term": {"document_id": document.id}}}
|
||||
|
|
@ -118,25 +120,23 @@ class ConnectorService:
|
|||
try:
|
||||
response = await opensearch_client.search(index=self.index_name, body=query)
|
||||
except Exception as e:
|
||||
print(
|
||||
logger.error(
|
||||
f"[ERROR] OpenSearch search failed for connector metadata update: {e}"
|
||||
)
|
||||
print(f"[ERROR] Search query: {query}")
|
||||
logger.error(f"Search query: {query}")
|
||||
raise
|
||||
|
||||
print(f"[DEBUG] Search query: {query}")
|
||||
print(
|
||||
f"[DEBUG] Found {len(response['hits']['hits'])} chunks matching document_id: {document.id}"
|
||||
)
|
||||
logger.debug(f"Search query: {query}")
|
||||
logger.debug(f"Found {len(response['hits']['hits'])} chunks matching document_id: {document.id}")
|
||||
|
||||
# Update each chunk with connector metadata
|
||||
print(
|
||||
logger.debug(
|
||||
f"[DEBUG] Updating {len(response['hits']['hits'])} chunks with connector_type: {connector_type}"
|
||||
)
|
||||
for hit in response["hits"]["hits"]:
|
||||
chunk_id = hit["_id"]
|
||||
current_connector_type = hit["_source"].get("connector_type", "unknown")
|
||||
print(
|
||||
logger.debug(
|
||||
f"[DEBUG] Chunk {chunk_id}: current connector_type = {current_connector_type}, updating to {connector_type}"
|
||||
)
|
||||
|
||||
|
|
@ -165,10 +165,10 @@ class ConnectorService:
|
|||
await opensearch_client.update(
|
||||
index=self.index_name, id=chunk_id, body=update_body
|
||||
)
|
||||
print(f"[DEBUG] Updated chunk {chunk_id} with connector metadata")
|
||||
logger.debug(f"Updated chunk {chunk_id} with connector metadata")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] OpenSearch update failed for chunk {chunk_id}: {e}")
|
||||
print(f"[ERROR] Update body: {update_body}")
|
||||
logger.error(f"OpenSearch update failed for chunk {chunk_id}: {e}")
|
||||
logger.error(f"Update body: {update_body}")
|
||||
raise
|
||||
|
||||
def _get_file_extension(self, mimetype: str) -> str:
|
||||
|
|
@ -201,7 +201,7 @@ class ConnectorService:
|
|||
"TaskService not available - connector sync requires task service dependency"
|
||||
)
|
||||
|
||||
print(
|
||||
logger.debug(
|
||||
f"[DEBUG] Starting sync for connection {connection_id}, max_files={max_files}"
|
||||
)
|
||||
|
||||
|
|
@ -211,7 +211,7 @@ class ConnectorService:
|
|||
f"Connection '{connection_id}' not found or not authenticated"
|
||||
)
|
||||
|
||||
print(f"[DEBUG] Got connector, authenticated: {connector.is_authenticated}")
|
||||
logger.debug(f"Got connector, authenticated: {connector.is_authenticated}")
|
||||
|
||||
if not connector.is_authenticated:
|
||||
raise ValueError(f"Connection '{connection_id}' not authenticated")
|
||||
|
|
@ -225,11 +225,11 @@ class ConnectorService:
|
|||
|
||||
while True:
|
||||
# List files from connector with limit
|
||||
print(
|
||||
logger.debug(
|
||||
f"[DEBUG] Calling list_files with page_size={page_size}, page_token={page_token}"
|
||||
)
|
||||
file_list = await connector.list_files(page_token, limit=page_size)
|
||||
print(f"[DEBUG] Got {len(file_list.get('files', []))} files")
|
||||
logger.debug(f"Got {len(file_list.get('files', []))} files")
|
||||
files = file_list["files"]
|
||||
|
||||
if not files:
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Dict
|
||||
from .tasks import UploadTask, FileTask
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
class TaskProcessor(ABC):
|
||||
"""Abstract base class for task processors"""
|
||||
|
|
@ -225,10 +227,10 @@ class S3FileProcessor(TaskProcessor):
|
|||
index=INDEX_NAME, id=chunk_id, body=chunk_doc
|
||||
)
|
||||
except Exception as e:
|
||||
print(
|
||||
logger.error(
|
||||
f"[ERROR] OpenSearch indexing failed for S3 chunk {chunk_id}: {e}"
|
||||
)
|
||||
print(f"[ERROR] Chunk document: {chunk_doc}")
|
||||
logger.error(f"Chunk document: {chunk_doc}")
|
||||
raise
|
||||
|
||||
result = {"status": "indexed", "id": slim_doc["id"]}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,9 @@ from typing import Dict
|
|||
from models.tasks import TaskStatus, UploadTask, FileTask
|
||||
|
||||
from src.utils.gpu_detection import get_worker_count
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
class TaskService:
|
||||
def __init__(self, document_service=None, process_pool=None):
|
||||
|
|
@ -104,7 +106,7 @@ class TaskService:
|
|||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Background upload processor failed for task {task_id}: {e}")
|
||||
logger.error(f"Background upload processor failed for task {task_id}: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
|
|
@ -136,7 +138,7 @@ class TaskService:
|
|||
try:
|
||||
await processor.process_item(upload_task, item, file_task)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Failed to process item {item}: {e}")
|
||||
logger.error(f"Failed to process item {item}: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
|
|
@ -157,13 +159,13 @@ class TaskService:
|
|||
upload_task.updated_at = time.time()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
print(f"[INFO] Background processor for task {task_id} was cancelled")
|
||||
logger.info(f"Background processor for task {task_id} was cancelled")
|
||||
if user_id in self.task_store and task_id in self.task_store[user_id]:
|
||||
# Task status and pending files already handled by cancel_task()
|
||||
pass
|
||||
raise # Re-raise to properly handle cancellation
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Background custom processor failed for task {task_id}: {e}")
|
||||
logger.error(f"Background custom processor failed for task {task_id}: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
|
|
|
|||
|
|
@ -6,7 +6,9 @@ from typing import Dict, Optional, Any
|
|||
from dataclasses import dataclass, asdict
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import os
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@dataclass
|
||||
class User:
|
||||
|
|
@ -93,13 +95,13 @@ class SessionManager:
|
|||
if response.status_code == 200:
|
||||
return response.json()
|
||||
else:
|
||||
print(
|
||||
logger.error(
|
||||
f"Failed to get user info: {response.status_code} {response.text}"
|
||||
)
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error getting user info: {e}")
|
||||
logger.error(f"Error getting user info: {e}")
|
||||
return None
|
||||
|
||||
async def create_user_session(
|
||||
|
|
@ -186,19 +188,19 @@ class SessionManager:
|
|||
"""Get or create OpenSearch client for user with their JWT"""
|
||||
from config.settings import is_no_auth_mode
|
||||
|
||||
print(
|
||||
f"[DEBUG] get_user_opensearch_client: user_id={user_id}, jwt_token={'None' if jwt_token is None else 'present'}, no_auth_mode={is_no_auth_mode()}"
|
||||
logger.debug(
|
||||
f"get_user_opensearch_client: user_id={user_id}, jwt_token={'None' if jwt_token is None else 'present'}, no_auth_mode={is_no_auth_mode()}"
|
||||
)
|
||||
|
||||
# In no-auth mode, create anonymous JWT for OpenSearch DLS
|
||||
if is_no_auth_mode() and jwt_token is None:
|
||||
if not hasattr(self, "_anonymous_jwt"):
|
||||
# Create anonymous JWT token for OpenSearch OIDC
|
||||
print(f"[DEBUG] Creating anonymous JWT...")
|
||||
logger.debug("Creating anonymous JWT...")
|
||||
self._anonymous_jwt = self._create_anonymous_jwt()
|
||||
print(f"[DEBUG] Anonymous JWT created: {self._anonymous_jwt[:50]}...")
|
||||
logger.debug(f"Anonymous JWT created: {self._anonymous_jwt[:50]}...")
|
||||
jwt_token = self._anonymous_jwt
|
||||
print(f"[DEBUG] Using anonymous JWT for OpenSearch")
|
||||
logger.debug("Using anonymous JWT for OpenSearch")
|
||||
|
||||
# Check if we have a cached client for this user
|
||||
if user_id not in self.user_opensearch_clients:
|
||||
|
|
|
|||
|
|
@ -13,7 +13,9 @@ from .managers.env_manager import EnvManager
|
|||
from .managers.container_manager import ContainerManager
|
||||
from .utils.platform import PlatformDetector
|
||||
from .widgets.diagnostics_notification import notify_with_diagnostics
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
class OpenRAGTUI(App):
|
||||
"""OpenRAG Terminal User Interface application."""
|
||||
|
|
@ -221,10 +223,10 @@ def run_tui():
|
|||
app = OpenRAGTUI()
|
||||
app.run()
|
||||
except KeyboardInterrupt:
|
||||
print("\nOpenRAG TUI interrupted by user")
|
||||
logger.info("\nOpenRAG TUI interrupted by user")
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
print(f"Error running OpenRAG TUI: {e}")
|
||||
logger.error(f"Error running OpenRAG TUI: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -11,7 +11,9 @@ from typing import Dict, List, Optional, AsyncIterator
|
|||
|
||||
from ..utils.platform import PlatformDetector, RuntimeInfo, RuntimeType
|
||||
from utils.gpu_detection import detect_gpu_devices
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
class ServiceStatus(Enum):
|
||||
"""Container service status."""
|
||||
|
|
@ -177,7 +179,7 @@ class ContainerManager:
|
|||
def _process_service_json(self, service: Dict, services: Dict[str, ServiceInfo]) -> None:
|
||||
"""Process a service JSON object and add it to the services dict."""
|
||||
# Debug print to see the actual service data
|
||||
print(f"DEBUG: Processing service data: {json.dumps(service, indent=2)}")
|
||||
logger.debug(f"Processing service data: {json.dumps(service, indent=2)}")
|
||||
|
||||
container_name = service.get("Name", "")
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,9 @@ from ..utils.validation import (
|
|||
validate_documents_paths,
|
||||
sanitize_env_value
|
||||
)
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@dataclass
|
||||
class EnvConfig:
|
||||
|
|
@ -115,7 +117,7 @@ class EnvManager:
|
|||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error loading .env file: {e}")
|
||||
logger.error(f"Error loading .env file: {e}")
|
||||
return False
|
||||
|
||||
def setup_secure_defaults(self) -> None:
|
||||
|
|
@ -245,7 +247,7 @@ class EnvManager:
|
|||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error saving .env file: {e}")
|
||||
logger.error(f"Error saving .env file: {e}")
|
||||
return False
|
||||
|
||||
def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]:
|
||||
|
|
|
|||
|
|
@ -2,6 +2,9 @@ import hashlib
|
|||
import os
|
||||
from collections import defaultdict
|
||||
from .gpu_detection import detect_gpu_devices
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Global converter cache for worker processes
|
||||
_worker_converter = None
|
||||
|
|
@ -37,11 +40,11 @@ def get_worker_converter():
|
|||
"1" # Still disable progress bars
|
||||
)
|
||||
|
||||
print(
|
||||
logger.info(
|
||||
f"[WORKER {os.getpid()}] Initializing DocumentConverter in worker process"
|
||||
)
|
||||
_worker_converter = DocumentConverter()
|
||||
print(f"[WORKER {os.getpid()}] DocumentConverter ready in worker process")
|
||||
logger.info(f"[WORKER {os.getpid()}] DocumentConverter ready in worker process")
|
||||
|
||||
return _worker_converter
|
||||
|
||||
|
|
@ -118,33 +121,33 @@ def process_document_sync(file_path: str):
|
|||
start_memory = process.memory_info().rss / 1024 / 1024 # MB
|
||||
|
||||
try:
|
||||
print(f"[WORKER {os.getpid()}] Starting document processing: {file_path}")
|
||||
print(f"[WORKER {os.getpid()}] Initial memory usage: {start_memory:.1f} MB")
|
||||
logger.info(f"[WORKER {os.getpid()}] Starting document processing: {file_path}")
|
||||
logger.info(f"[WORKER {os.getpid()}] Initial memory usage: {start_memory:.1f} MB")
|
||||
|
||||
# Check file size
|
||||
try:
|
||||
file_size = os.path.getsize(file_path) / 1024 / 1024 # MB
|
||||
print(f"[WORKER {os.getpid()}] File size: {file_size:.1f} MB")
|
||||
logger.info(f"[WORKER {os.getpid()}] File size: {file_size:.1f} MB")
|
||||
except OSError as e:
|
||||
print(f"[WORKER {os.getpid()}] WARNING: Cannot get file size: {e}")
|
||||
logger.warning(f"[WORKER {os.getpid()}] WARNING: Cannot get file size: {e}")
|
||||
file_size = 0
|
||||
|
||||
# Get the cached converter for this worker
|
||||
try:
|
||||
print(f"[WORKER {os.getpid()}] Getting document converter...")
|
||||
logger.info(f"[WORKER {os.getpid()}] Getting document converter...")
|
||||
converter = get_worker_converter()
|
||||
memory_after_converter = process.memory_info().rss / 1024 / 1024
|
||||
print(
|
||||
logger.info(
|
||||
f"[WORKER {os.getpid()}] Memory after converter init: {memory_after_converter:.1f} MB"
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"[WORKER {os.getpid()}] ERROR: Failed to initialize converter: {e}")
|
||||
logger.error(f"[WORKER {os.getpid()}] ERROR: Failed to initialize converter: {e}")
|
||||
traceback.print_exc()
|
||||
raise
|
||||
|
||||
# Compute file hash
|
||||
try:
|
||||
print(f"[WORKER {os.getpid()}] Computing file hash...")
|
||||
logger.info(f"[WORKER {os.getpid()}] Computing file hash...")
|
||||
sha256 = hashlib.sha256()
|
||||
with open(file_path, "rb") as f:
|
||||
while True:
|
||||
|
|
@ -153,39 +156,39 @@ def process_document_sync(file_path: str):
|
|||
break
|
||||
sha256.update(chunk)
|
||||
file_hash = sha256.hexdigest()
|
||||
print(f"[WORKER {os.getpid()}] File hash computed: {file_hash[:12]}...")
|
||||
logger.info(f"[WORKER {os.getpid()}] File hash computed: {file_hash[:12]}...")
|
||||
except Exception as e:
|
||||
print(f"[WORKER {os.getpid()}] ERROR: Failed to compute file hash: {e}")
|
||||
logger.error(f"[WORKER {os.getpid()}] ERROR: Failed to compute file hash: {e}")
|
||||
traceback.print_exc()
|
||||
raise
|
||||
|
||||
# Convert with docling
|
||||
try:
|
||||
print(f"[WORKER {os.getpid()}] Starting docling conversion...")
|
||||
logger.info(f"[WORKER {os.getpid()}] Starting docling conversion...")
|
||||
memory_before_convert = process.memory_info().rss / 1024 / 1024
|
||||
print(
|
||||
logger.info(
|
||||
f"[WORKER {os.getpid()}] Memory before conversion: {memory_before_convert:.1f} MB"
|
||||
)
|
||||
|
||||
result = converter.convert(file_path)
|
||||
|
||||
memory_after_convert = process.memory_info().rss / 1024 / 1024
|
||||
print(
|
||||
logger.info(
|
||||
f"[WORKER {os.getpid()}] Memory after conversion: {memory_after_convert:.1f} MB"
|
||||
)
|
||||
print(f"[WORKER {os.getpid()}] Docling conversion completed")
|
||||
logger.info(f"[WORKER {os.getpid()}] Docling conversion completed")
|
||||
|
||||
full_doc = result.document.export_to_dict()
|
||||
memory_after_export = process.memory_info().rss / 1024 / 1024
|
||||
print(
|
||||
logger.info(
|
||||
f"[WORKER {os.getpid()}] Memory after export: {memory_after_export:.1f} MB"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print(
|
||||
logger.error(
|
||||
f"[WORKER {os.getpid()}] ERROR: Failed during docling conversion: {e}"
|
||||
)
|
||||
print(
|
||||
logger.info(
|
||||
f"[WORKER {os.getpid()}] Current memory usage: {process.memory_info().rss / 1024 / 1024:.1f} MB"
|
||||
)
|
||||
traceback.print_exc()
|
||||
|
|
@ -193,10 +196,10 @@ def process_document_sync(file_path: str):
|
|||
|
||||
# Extract relevant content (same logic as extract_relevant)
|
||||
try:
|
||||
print(f"[WORKER {os.getpid()}] Extracting relevant content...")
|
||||
logger.info(f"[WORKER {os.getpid()}] Extracting relevant content...")
|
||||
origin = full_doc.get("origin", {})
|
||||
texts = full_doc.get("texts", [])
|
||||
print(f"[WORKER {os.getpid()}] Found {len(texts)} text fragments")
|
||||
logger.info(f"[WORKER {os.getpid()}] Found {len(texts)} text fragments")
|
||||
|
||||
page_texts = defaultdict(list)
|
||||
for txt in texts:
|
||||
|
|
@ -210,12 +213,12 @@ def process_document_sync(file_path: str):
|
|||
joined = "\n".join(page_texts[page])
|
||||
chunks.append({"page": page, "text": joined})
|
||||
|
||||
print(
|
||||
logger.info(
|
||||
f"[WORKER {os.getpid()}] Created {len(chunks)} chunks from {len(page_texts)} pages"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print(
|
||||
logger.error(
|
||||
f"[WORKER {os.getpid()}] ERROR: Failed during content extraction: {e}"
|
||||
)
|
||||
traceback.print_exc()
|
||||
|
|
@ -223,8 +226,8 @@ def process_document_sync(file_path: str):
|
|||
|
||||
final_memory = process.memory_info().rss / 1024 / 1024
|
||||
memory_delta = final_memory - start_memory
|
||||
print(f"[WORKER {os.getpid()}] Document processing completed successfully")
|
||||
print(
|
||||
logger.info(f"[WORKER {os.getpid()}] Document processing completed successfully")
|
||||
logger.info(
|
||||
f"[WORKER {os.getpid()}] Final memory: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)"
|
||||
)
|
||||
|
||||
|
|
@ -239,24 +242,24 @@ def process_document_sync(file_path: str):
|
|||
except Exception as e:
|
||||
final_memory = process.memory_info().rss / 1024 / 1024
|
||||
memory_delta = final_memory - start_memory
|
||||
print(f"[WORKER {os.getpid()}] FATAL ERROR in process_document_sync")
|
||||
print(f"[WORKER {os.getpid()}] File: {file_path}")
|
||||
print(f"[WORKER {os.getpid()}] Python version: {sys.version}")
|
||||
print(
|
||||
logger.error(f"[WORKER {os.getpid()}] FATAL ERROR in process_document_sync")
|
||||
logger.info(f"[WORKER {os.getpid()}] File: {file_path}")
|
||||
logger.info(f"[WORKER {os.getpid()}] Python version: {sys.version}")
|
||||
logger.info(
|
||||
f"[WORKER {os.getpid()}] Memory at crash: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)"
|
||||
)
|
||||
print(f"[WORKER {os.getpid()}] Error: {type(e).__name__}: {e}")
|
||||
print(f"[WORKER {os.getpid()}] Full traceback:")
|
||||
logger.error(f"[WORKER {os.getpid()}] Error: {type(e).__name__}: {e}")
|
||||
logger.error(f"[WORKER {os.getpid()}] Full traceback:")
|
||||
traceback.print_exc()
|
||||
|
||||
# Try to get more system info before crashing
|
||||
try:
|
||||
import platform
|
||||
|
||||
print(
|
||||
logger.info(
|
||||
f"[WORKER {os.getpid()}] System: {platform.system()} {platform.release()}"
|
||||
)
|
||||
print(f"[WORKER {os.getpid()}] Architecture: {platform.machine()}")
|
||||
logger.info(f"[WORKER {os.getpid()}] Architecture: {platform.machine()}")
|
||||
except:
|
||||
pass
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
import multiprocessing
|
||||
import os
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
def detect_gpu_devices():
|
||||
"""Detect if GPU devices are actually available"""
|
||||
|
|
@ -30,12 +32,12 @@ def get_worker_count():
|
|||
|
||||
if has_gpu_devices:
|
||||
default_workers = min(4, multiprocessing.cpu_count() // 2)
|
||||
print(
|
||||
logger.info(
|
||||
f"GPU mode enabled with {gpu_count} GPU(s) - using limited concurrency ({default_workers} workers)"
|
||||
)
|
||||
else:
|
||||
default_workers = multiprocessing.cpu_count()
|
||||
print(
|
||||
logger.info(
|
||||
f"CPU-only mode enabled - using full concurrency ({default_workers} workers)"
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,13 @@
|
|||
import os
|
||||
from concurrent.futures import ProcessPoolExecutor
|
||||
from utils.gpu_detection import get_worker_count
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Create shared process pool at import time (before CUDA initialization)
|
||||
# This avoids the "Cannot re-initialize CUDA in forked subprocess" error
|
||||
MAX_WORKERS = get_worker_count()
|
||||
process_pool = ProcessPoolExecutor(max_workers=MAX_WORKERS)
|
||||
|
||||
print(f"Shared process pool initialized with {MAX_WORKERS} workers")
|
||||
logger.info(f"Shared process pool initialized with {MAX_WORKERS} workers")
|
||||
|
|
|
|||
|
|
@ -1,13 +1,16 @@
|
|||
from docling.document_converter import DocumentConverter
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
print("Warming up docling models...")
|
||||
logger = get_logger(__name__)
|
||||
|
||||
logger.info("Warming up docling models...")
|
||||
|
||||
try:
|
||||
# Use the sample document to warm up docling
|
||||
test_file = "/app/warmup_ocr.pdf"
|
||||
print(f"Using {test_file} to warm up docling...")
|
||||
logger.info(f"Using {test_file} to warm up docling...")
|
||||
DocumentConverter().convert(test_file)
|
||||
print("Docling models warmed up successfully")
|
||||
logger.info("Docling models warmed up successfully")
|
||||
except Exception as e:
|
||||
print(f"Docling warm-up completed with: {e}")
|
||||
logger.error(f"Docling warm-up completed with: {e}")
|
||||
# This is expected - we just want to trigger the model downloads
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue