Compare commits
1 commit
main
...
clean-prin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
64a543b645 |
12 changed files with 103 additions and 78 deletions
|
|
@ -3,7 +3,9 @@ from starlette.responses import JSONResponse
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from session_manager import User
|
from session_manager import User
|
||||||
from config.settings import is_no_auth_mode
|
from config.settings import is_no_auth_mode
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
def get_current_user(request: Request, session_manager) -> Optional[User]:
|
def get_current_user(request: Request, session_manager) -> Optional[User]:
|
||||||
"""Extract current user from request cookies"""
|
"""Extract current user from request cookies"""
|
||||||
|
|
@ -25,7 +27,7 @@ def require_auth(session_manager):
|
||||||
async def wrapper(request: Request):
|
async def wrapper(request: Request):
|
||||||
# In no-auth mode, bypass authentication entirely
|
# In no-auth mode, bypass authentication entirely
|
||||||
if is_no_auth_mode():
|
if is_no_auth_mode():
|
||||||
print(f"[DEBUG] No-auth mode: Creating anonymous user")
|
logger.debug("No-auth mode: Creating anonymous user")
|
||||||
# Create an anonymous user object so endpoints don't break
|
# Create an anonymous user object so endpoints don't break
|
||||||
from session_manager import User
|
from session_manager import User
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
@ -33,7 +35,7 @@ def require_auth(session_manager):
|
||||||
from session_manager import AnonymousUser
|
from session_manager import AnonymousUser
|
||||||
request.state.user = AnonymousUser()
|
request.state.user = AnonymousUser()
|
||||||
request.state.jwt_token = None # No JWT in no-auth mode
|
request.state.jwt_token = None # No JWT in no-auth mode
|
||||||
print(f"[DEBUG] Set user_id=anonymous, jwt_token=None")
|
logger.debug("Set user_id=anonymous, jwt_token=None")
|
||||||
return await handler(request)
|
return await handler(request)
|
||||||
|
|
||||||
user = get_current_user(request, session_manager)
|
user = get_current_user(request, session_manager)
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,9 @@ from .google_drive import GoogleDriveConnector
|
||||||
from .sharepoint import SharePointConnector
|
from .sharepoint import SharePointConnector
|
||||||
from .onedrive import OneDriveConnector
|
from .onedrive import OneDriveConnector
|
||||||
from .connection_manager import ConnectionManager
|
from .connection_manager import ConnectionManager
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
class ConnectorService:
|
class ConnectorService:
|
||||||
"""Service to manage document connectors and process files"""
|
"""Service to manage document connectors and process files"""
|
||||||
|
|
@ -63,7 +65,7 @@ class ConnectorService:
|
||||||
|
|
||||||
doc_service = DocumentService(session_manager=self.session_manager)
|
doc_service = DocumentService(session_manager=self.session_manager)
|
||||||
|
|
||||||
print(f"[DEBUG] Processing connector document with ID: {document.id}")
|
logger.debug(f"Processing connector document with ID: {document.id}")
|
||||||
|
|
||||||
# Process using the existing pipeline but with connector document metadata
|
# Process using the existing pipeline but with connector document metadata
|
||||||
result = await doc_service.process_file_common(
|
result = await doc_service.process_file_common(
|
||||||
|
|
@ -78,7 +80,7 @@ class ConnectorService:
|
||||||
connector_type=connector_type,
|
connector_type=connector_type,
|
||||||
)
|
)
|
||||||
|
|
||||||
print(f"[DEBUG] Document processing result: {result}")
|
logger.debug(f"Document processing result: {result}")
|
||||||
|
|
||||||
# If successfully indexed or already exists, update the indexed documents with connector metadata
|
# If successfully indexed or already exists, update the indexed documents with connector metadata
|
||||||
if result["status"] in ["indexed", "unchanged"]:
|
if result["status"] in ["indexed", "unchanged"]:
|
||||||
|
|
@ -105,7 +107,7 @@ class ConnectorService:
|
||||||
jwt_token: str = None,
|
jwt_token: str = None,
|
||||||
):
|
):
|
||||||
"""Update indexed chunks with connector-specific metadata"""
|
"""Update indexed chunks with connector-specific metadata"""
|
||||||
print(f"[DEBUG] Looking for chunks with document_id: {document.id}")
|
logger.debug(f"Looking for chunks with document_id: {document.id}")
|
||||||
|
|
||||||
# Find all chunks for this document
|
# Find all chunks for this document
|
||||||
query = {"query": {"term": {"document_id": document.id}}}
|
query = {"query": {"term": {"document_id": document.id}}}
|
||||||
|
|
@ -118,25 +120,23 @@ class ConnectorService:
|
||||||
try:
|
try:
|
||||||
response = await opensearch_client.search(index=self.index_name, body=query)
|
response = await opensearch_client.search(index=self.index_name, body=query)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(
|
logger.error(
|
||||||
f"[ERROR] OpenSearch search failed for connector metadata update: {e}"
|
f"[ERROR] OpenSearch search failed for connector metadata update: {e}"
|
||||||
)
|
)
|
||||||
print(f"[ERROR] Search query: {query}")
|
logger.error(f"Search query: {query}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
print(f"[DEBUG] Search query: {query}")
|
logger.debug(f"Search query: {query}")
|
||||||
print(
|
logger.debug(f"Found {len(response['hits']['hits'])} chunks matching document_id: {document.id}")
|
||||||
f"[DEBUG] Found {len(response['hits']['hits'])} chunks matching document_id: {document.id}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Update each chunk with connector metadata
|
# Update each chunk with connector metadata
|
||||||
print(
|
logger.debug(
|
||||||
f"[DEBUG] Updating {len(response['hits']['hits'])} chunks with connector_type: {connector_type}"
|
f"[DEBUG] Updating {len(response['hits']['hits'])} chunks with connector_type: {connector_type}"
|
||||||
)
|
)
|
||||||
for hit in response["hits"]["hits"]:
|
for hit in response["hits"]["hits"]:
|
||||||
chunk_id = hit["_id"]
|
chunk_id = hit["_id"]
|
||||||
current_connector_type = hit["_source"].get("connector_type", "unknown")
|
current_connector_type = hit["_source"].get("connector_type", "unknown")
|
||||||
print(
|
logger.debug(
|
||||||
f"[DEBUG] Chunk {chunk_id}: current connector_type = {current_connector_type}, updating to {connector_type}"
|
f"[DEBUG] Chunk {chunk_id}: current connector_type = {current_connector_type}, updating to {connector_type}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -165,10 +165,10 @@ class ConnectorService:
|
||||||
await opensearch_client.update(
|
await opensearch_client.update(
|
||||||
index=self.index_name, id=chunk_id, body=update_body
|
index=self.index_name, id=chunk_id, body=update_body
|
||||||
)
|
)
|
||||||
print(f"[DEBUG] Updated chunk {chunk_id} with connector metadata")
|
logger.debug(f"Updated chunk {chunk_id} with connector metadata")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] OpenSearch update failed for chunk {chunk_id}: {e}")
|
logger.error(f"OpenSearch update failed for chunk {chunk_id}: {e}")
|
||||||
print(f"[ERROR] Update body: {update_body}")
|
logger.error(f"Update body: {update_body}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _get_file_extension(self, mimetype: str) -> str:
|
def _get_file_extension(self, mimetype: str) -> str:
|
||||||
|
|
@ -201,7 +201,7 @@ class ConnectorService:
|
||||||
"TaskService not available - connector sync requires task service dependency"
|
"TaskService not available - connector sync requires task service dependency"
|
||||||
)
|
)
|
||||||
|
|
||||||
print(
|
logger.debug(
|
||||||
f"[DEBUG] Starting sync for connection {connection_id}, max_files={max_files}"
|
f"[DEBUG] Starting sync for connection {connection_id}, max_files={max_files}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -211,7 +211,7 @@ class ConnectorService:
|
||||||
f"Connection '{connection_id}' not found or not authenticated"
|
f"Connection '{connection_id}' not found or not authenticated"
|
||||||
)
|
)
|
||||||
|
|
||||||
print(f"[DEBUG] Got connector, authenticated: {connector.is_authenticated}")
|
logger.debug(f"Got connector, authenticated: {connector.is_authenticated}")
|
||||||
|
|
||||||
if not connector.is_authenticated:
|
if not connector.is_authenticated:
|
||||||
raise ValueError(f"Connection '{connection_id}' not authenticated")
|
raise ValueError(f"Connection '{connection_id}' not authenticated")
|
||||||
|
|
@ -225,11 +225,11 @@ class ConnectorService:
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# List files from connector with limit
|
# List files from connector with limit
|
||||||
print(
|
logger.debug(
|
||||||
f"[DEBUG] Calling list_files with page_size={page_size}, page_token={page_token}"
|
f"[DEBUG] Calling list_files with page_size={page_size}, page_token={page_token}"
|
||||||
)
|
)
|
||||||
file_list = await connector.list_files(page_token, limit=page_size)
|
file_list = await connector.list_files(page_token, limit=page_size)
|
||||||
print(f"[DEBUG] Got {len(file_list.get('files', []))} files")
|
logger.debug(f"Got {len(file_list.get('files', []))} files")
|
||||||
files = file_list["files"]
|
files = file_list["files"]
|
||||||
|
|
||||||
if not files:
|
if not files:
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,9 @@
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
from .tasks import UploadTask, FileTask
|
from .tasks import UploadTask, FileTask
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
class TaskProcessor(ABC):
|
class TaskProcessor(ABC):
|
||||||
"""Abstract base class for task processors"""
|
"""Abstract base class for task processors"""
|
||||||
|
|
@ -225,10 +227,10 @@ class S3FileProcessor(TaskProcessor):
|
||||||
index=INDEX_NAME, id=chunk_id, body=chunk_doc
|
index=INDEX_NAME, id=chunk_id, body=chunk_doc
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(
|
logger.error(
|
||||||
f"[ERROR] OpenSearch indexing failed for S3 chunk {chunk_id}: {e}"
|
f"[ERROR] OpenSearch indexing failed for S3 chunk {chunk_id}: {e}"
|
||||||
)
|
)
|
||||||
print(f"[ERROR] Chunk document: {chunk_doc}")
|
logger.error(f"Chunk document: {chunk_doc}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
result = {"status": "indexed", "id": slim_doc["id"]}
|
result = {"status": "indexed", "id": slim_doc["id"]}
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,9 @@ from typing import Dict
|
||||||
from models.tasks import TaskStatus, UploadTask, FileTask
|
from models.tasks import TaskStatus, UploadTask, FileTask
|
||||||
|
|
||||||
from src.utils.gpu_detection import get_worker_count
|
from src.utils.gpu_detection import get_worker_count
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
class TaskService:
|
class TaskService:
|
||||||
def __init__(self, document_service=None, process_pool=None):
|
def __init__(self, document_service=None, process_pool=None):
|
||||||
|
|
@ -104,7 +106,7 @@ class TaskService:
|
||||||
await asyncio.gather(*tasks, return_exceptions=True)
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] Background upload processor failed for task {task_id}: {e}")
|
logger.error(f"Background upload processor failed for task {task_id}: {e}")
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
@ -136,7 +138,7 @@ class TaskService:
|
||||||
try:
|
try:
|
||||||
await processor.process_item(upload_task, item, file_task)
|
await processor.process_item(upload_task, item, file_task)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] Failed to process item {item}: {e}")
|
logger.error(f"Failed to process item {item}: {e}")
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
@ -157,13 +159,13 @@ class TaskService:
|
||||||
upload_task.updated_at = time.time()
|
upload_task.updated_at = time.time()
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
print(f"[INFO] Background processor for task {task_id} was cancelled")
|
logger.info(f"Background processor for task {task_id} was cancelled")
|
||||||
if user_id in self.task_store and task_id in self.task_store[user_id]:
|
if user_id in self.task_store and task_id in self.task_store[user_id]:
|
||||||
# Task status and pending files already handled by cancel_task()
|
# Task status and pending files already handled by cancel_task()
|
||||||
pass
|
pass
|
||||||
raise # Re-raise to properly handle cancellation
|
raise # Re-raise to properly handle cancellation
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] Background custom processor failed for task {task_id}: {e}")
|
logger.error(f"Background custom processor failed for task {task_id}: {e}")
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,9 @@ from typing import Dict, Optional, Any
|
||||||
from dataclasses import dataclass, asdict
|
from dataclasses import dataclass, asdict
|
||||||
from cryptography.hazmat.primitives import serialization
|
from cryptography.hazmat.primitives import serialization
|
||||||
import os
|
import os
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class User:
|
class User:
|
||||||
|
|
@ -93,13 +95,13 @@ class SessionManager:
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
return response.json()
|
return response.json()
|
||||||
else:
|
else:
|
||||||
print(
|
logger.error(
|
||||||
f"Failed to get user info: {response.status_code} {response.text}"
|
f"Failed to get user info: {response.status_code} {response.text}"
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error getting user info: {e}")
|
logger.error(f"Error getting user info: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def create_user_session(
|
async def create_user_session(
|
||||||
|
|
@ -186,19 +188,19 @@ class SessionManager:
|
||||||
"""Get or create OpenSearch client for user with their JWT"""
|
"""Get or create OpenSearch client for user with their JWT"""
|
||||||
from config.settings import is_no_auth_mode
|
from config.settings import is_no_auth_mode
|
||||||
|
|
||||||
print(
|
logger.debug(
|
||||||
f"[DEBUG] get_user_opensearch_client: user_id={user_id}, jwt_token={'None' if jwt_token is None else 'present'}, no_auth_mode={is_no_auth_mode()}"
|
f"get_user_opensearch_client: user_id={user_id}, jwt_token={'None' if jwt_token is None else 'present'}, no_auth_mode={is_no_auth_mode()}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# In no-auth mode, create anonymous JWT for OpenSearch DLS
|
# In no-auth mode, create anonymous JWT for OpenSearch DLS
|
||||||
if is_no_auth_mode() and jwt_token is None:
|
if is_no_auth_mode() and jwt_token is None:
|
||||||
if not hasattr(self, "_anonymous_jwt"):
|
if not hasattr(self, "_anonymous_jwt"):
|
||||||
# Create anonymous JWT token for OpenSearch OIDC
|
# Create anonymous JWT token for OpenSearch OIDC
|
||||||
print(f"[DEBUG] Creating anonymous JWT...")
|
logger.debug("Creating anonymous JWT...")
|
||||||
self._anonymous_jwt = self._create_anonymous_jwt()
|
self._anonymous_jwt = self._create_anonymous_jwt()
|
||||||
print(f"[DEBUG] Anonymous JWT created: {self._anonymous_jwt[:50]}...")
|
logger.debug(f"Anonymous JWT created: {self._anonymous_jwt[:50]}...")
|
||||||
jwt_token = self._anonymous_jwt
|
jwt_token = self._anonymous_jwt
|
||||||
print(f"[DEBUG] Using anonymous JWT for OpenSearch")
|
logger.debug("Using anonymous JWT for OpenSearch")
|
||||||
|
|
||||||
# Check if we have a cached client for this user
|
# Check if we have a cached client for this user
|
||||||
if user_id not in self.user_opensearch_clients:
|
if user_id not in self.user_opensearch_clients:
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,9 @@ from .managers.env_manager import EnvManager
|
||||||
from .managers.container_manager import ContainerManager
|
from .managers.container_manager import ContainerManager
|
||||||
from .utils.platform import PlatformDetector
|
from .utils.platform import PlatformDetector
|
||||||
from .widgets.diagnostics_notification import notify_with_diagnostics
|
from .widgets.diagnostics_notification import notify_with_diagnostics
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
class OpenRAGTUI(App):
|
class OpenRAGTUI(App):
|
||||||
"""OpenRAG Terminal User Interface application."""
|
"""OpenRAG Terminal User Interface application."""
|
||||||
|
|
@ -221,10 +223,10 @@ def run_tui():
|
||||||
app = OpenRAGTUI()
|
app = OpenRAGTUI()
|
||||||
app.run()
|
app.run()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print("\nOpenRAG TUI interrupted by user")
|
logger.info("\nOpenRAG TUI interrupted by user")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error running OpenRAG TUI: {e}")
|
logger.error(f"Error running OpenRAG TUI: {e}")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,9 @@ from typing import Dict, List, Optional, AsyncIterator
|
||||||
|
|
||||||
from ..utils.platform import PlatformDetector, RuntimeInfo, RuntimeType
|
from ..utils.platform import PlatformDetector, RuntimeInfo, RuntimeType
|
||||||
from utils.gpu_detection import detect_gpu_devices
|
from utils.gpu_detection import detect_gpu_devices
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
class ServiceStatus(Enum):
|
class ServiceStatus(Enum):
|
||||||
"""Container service status."""
|
"""Container service status."""
|
||||||
|
|
@ -177,7 +179,7 @@ class ContainerManager:
|
||||||
def _process_service_json(self, service: Dict, services: Dict[str, ServiceInfo]) -> None:
|
def _process_service_json(self, service: Dict, services: Dict[str, ServiceInfo]) -> None:
|
||||||
"""Process a service JSON object and add it to the services dict."""
|
"""Process a service JSON object and add it to the services dict."""
|
||||||
# Debug print to see the actual service data
|
# Debug print to see the actual service data
|
||||||
print(f"DEBUG: Processing service data: {json.dumps(service, indent=2)}")
|
logger.debug(f"Processing service data: {json.dumps(service, indent=2)}")
|
||||||
|
|
||||||
container_name = service.get("Name", "")
|
container_name = service.get("Name", "")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,9 @@ from ..utils.validation import (
|
||||||
validate_documents_paths,
|
validate_documents_paths,
|
||||||
sanitize_env_value
|
sanitize_env_value
|
||||||
)
|
)
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class EnvConfig:
|
class EnvConfig:
|
||||||
|
|
@ -115,7 +117,7 @@ class EnvManager:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error loading .env file: {e}")
|
logger.error(f"Error loading .env file: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def setup_secure_defaults(self) -> None:
|
def setup_secure_defaults(self) -> None:
|
||||||
|
|
@ -245,7 +247,7 @@ class EnvManager:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error saving .env file: {e}")
|
logger.error(f"Error saving .env file: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]:
|
def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]:
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,9 @@ import hashlib
|
||||||
import os
|
import os
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from .gpu_detection import detect_gpu_devices
|
from .gpu_detection import detect_gpu_devices
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
# Global converter cache for worker processes
|
# Global converter cache for worker processes
|
||||||
_worker_converter = None
|
_worker_converter = None
|
||||||
|
|
@ -37,11 +40,11 @@ def get_worker_converter():
|
||||||
"1" # Still disable progress bars
|
"1" # Still disable progress bars
|
||||||
)
|
)
|
||||||
|
|
||||||
print(
|
logger.info(
|
||||||
f"[WORKER {os.getpid()}] Initializing DocumentConverter in worker process"
|
f"[WORKER {os.getpid()}] Initializing DocumentConverter in worker process"
|
||||||
)
|
)
|
||||||
_worker_converter = DocumentConverter()
|
_worker_converter = DocumentConverter()
|
||||||
print(f"[WORKER {os.getpid()}] DocumentConverter ready in worker process")
|
logger.info(f"[WORKER {os.getpid()}] DocumentConverter ready in worker process")
|
||||||
|
|
||||||
return _worker_converter
|
return _worker_converter
|
||||||
|
|
||||||
|
|
@ -118,33 +121,33 @@ def process_document_sync(file_path: str):
|
||||||
start_memory = process.memory_info().rss / 1024 / 1024 # MB
|
start_memory = process.memory_info().rss / 1024 / 1024 # MB
|
||||||
|
|
||||||
try:
|
try:
|
||||||
print(f"[WORKER {os.getpid()}] Starting document processing: {file_path}")
|
logger.info(f"[WORKER {os.getpid()}] Starting document processing: {file_path}")
|
||||||
print(f"[WORKER {os.getpid()}] Initial memory usage: {start_memory:.1f} MB")
|
logger.info(f"[WORKER {os.getpid()}] Initial memory usage: {start_memory:.1f} MB")
|
||||||
|
|
||||||
# Check file size
|
# Check file size
|
||||||
try:
|
try:
|
||||||
file_size = os.path.getsize(file_path) / 1024 / 1024 # MB
|
file_size = os.path.getsize(file_path) / 1024 / 1024 # MB
|
||||||
print(f"[WORKER {os.getpid()}] File size: {file_size:.1f} MB")
|
logger.info(f"[WORKER {os.getpid()}] File size: {file_size:.1f} MB")
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
print(f"[WORKER {os.getpid()}] WARNING: Cannot get file size: {e}")
|
logger.warning(f"[WORKER {os.getpid()}] WARNING: Cannot get file size: {e}")
|
||||||
file_size = 0
|
file_size = 0
|
||||||
|
|
||||||
# Get the cached converter for this worker
|
# Get the cached converter for this worker
|
||||||
try:
|
try:
|
||||||
print(f"[WORKER {os.getpid()}] Getting document converter...")
|
logger.info(f"[WORKER {os.getpid()}] Getting document converter...")
|
||||||
converter = get_worker_converter()
|
converter = get_worker_converter()
|
||||||
memory_after_converter = process.memory_info().rss / 1024 / 1024
|
memory_after_converter = process.memory_info().rss / 1024 / 1024
|
||||||
print(
|
logger.info(
|
||||||
f"[WORKER {os.getpid()}] Memory after converter init: {memory_after_converter:.1f} MB"
|
f"[WORKER {os.getpid()}] Memory after converter init: {memory_after_converter:.1f} MB"
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[WORKER {os.getpid()}] ERROR: Failed to initialize converter: {e}")
|
logger.error(f"[WORKER {os.getpid()}] ERROR: Failed to initialize converter: {e}")
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# Compute file hash
|
# Compute file hash
|
||||||
try:
|
try:
|
||||||
print(f"[WORKER {os.getpid()}] Computing file hash...")
|
logger.info(f"[WORKER {os.getpid()}] Computing file hash...")
|
||||||
sha256 = hashlib.sha256()
|
sha256 = hashlib.sha256()
|
||||||
with open(file_path, "rb") as f:
|
with open(file_path, "rb") as f:
|
||||||
while True:
|
while True:
|
||||||
|
|
@ -153,39 +156,39 @@ def process_document_sync(file_path: str):
|
||||||
break
|
break
|
||||||
sha256.update(chunk)
|
sha256.update(chunk)
|
||||||
file_hash = sha256.hexdigest()
|
file_hash = sha256.hexdigest()
|
||||||
print(f"[WORKER {os.getpid()}] File hash computed: {file_hash[:12]}...")
|
logger.info(f"[WORKER {os.getpid()}] File hash computed: {file_hash[:12]}...")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[WORKER {os.getpid()}] ERROR: Failed to compute file hash: {e}")
|
logger.error(f"[WORKER {os.getpid()}] ERROR: Failed to compute file hash: {e}")
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# Convert with docling
|
# Convert with docling
|
||||||
try:
|
try:
|
||||||
print(f"[WORKER {os.getpid()}] Starting docling conversion...")
|
logger.info(f"[WORKER {os.getpid()}] Starting docling conversion...")
|
||||||
memory_before_convert = process.memory_info().rss / 1024 / 1024
|
memory_before_convert = process.memory_info().rss / 1024 / 1024
|
||||||
print(
|
logger.info(
|
||||||
f"[WORKER {os.getpid()}] Memory before conversion: {memory_before_convert:.1f} MB"
|
f"[WORKER {os.getpid()}] Memory before conversion: {memory_before_convert:.1f} MB"
|
||||||
)
|
)
|
||||||
|
|
||||||
result = converter.convert(file_path)
|
result = converter.convert(file_path)
|
||||||
|
|
||||||
memory_after_convert = process.memory_info().rss / 1024 / 1024
|
memory_after_convert = process.memory_info().rss / 1024 / 1024
|
||||||
print(
|
logger.info(
|
||||||
f"[WORKER {os.getpid()}] Memory after conversion: {memory_after_convert:.1f} MB"
|
f"[WORKER {os.getpid()}] Memory after conversion: {memory_after_convert:.1f} MB"
|
||||||
)
|
)
|
||||||
print(f"[WORKER {os.getpid()}] Docling conversion completed")
|
logger.info(f"[WORKER {os.getpid()}] Docling conversion completed")
|
||||||
|
|
||||||
full_doc = result.document.export_to_dict()
|
full_doc = result.document.export_to_dict()
|
||||||
memory_after_export = process.memory_info().rss / 1024 / 1024
|
memory_after_export = process.memory_info().rss / 1024 / 1024
|
||||||
print(
|
logger.info(
|
||||||
f"[WORKER {os.getpid()}] Memory after export: {memory_after_export:.1f} MB"
|
f"[WORKER {os.getpid()}] Memory after export: {memory_after_export:.1f} MB"
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(
|
logger.error(
|
||||||
f"[WORKER {os.getpid()}] ERROR: Failed during docling conversion: {e}"
|
f"[WORKER {os.getpid()}] ERROR: Failed during docling conversion: {e}"
|
||||||
)
|
)
|
||||||
print(
|
logger.info(
|
||||||
f"[WORKER {os.getpid()}] Current memory usage: {process.memory_info().rss / 1024 / 1024:.1f} MB"
|
f"[WORKER {os.getpid()}] Current memory usage: {process.memory_info().rss / 1024 / 1024:.1f} MB"
|
||||||
)
|
)
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
@ -193,10 +196,10 @@ def process_document_sync(file_path: str):
|
||||||
|
|
||||||
# Extract relevant content (same logic as extract_relevant)
|
# Extract relevant content (same logic as extract_relevant)
|
||||||
try:
|
try:
|
||||||
print(f"[WORKER {os.getpid()}] Extracting relevant content...")
|
logger.info(f"[WORKER {os.getpid()}] Extracting relevant content...")
|
||||||
origin = full_doc.get("origin", {})
|
origin = full_doc.get("origin", {})
|
||||||
texts = full_doc.get("texts", [])
|
texts = full_doc.get("texts", [])
|
||||||
print(f"[WORKER {os.getpid()}] Found {len(texts)} text fragments")
|
logger.info(f"[WORKER {os.getpid()}] Found {len(texts)} text fragments")
|
||||||
|
|
||||||
page_texts = defaultdict(list)
|
page_texts = defaultdict(list)
|
||||||
for txt in texts:
|
for txt in texts:
|
||||||
|
|
@ -210,12 +213,12 @@ def process_document_sync(file_path: str):
|
||||||
joined = "\n".join(page_texts[page])
|
joined = "\n".join(page_texts[page])
|
||||||
chunks.append({"page": page, "text": joined})
|
chunks.append({"page": page, "text": joined})
|
||||||
|
|
||||||
print(
|
logger.info(
|
||||||
f"[WORKER {os.getpid()}] Created {len(chunks)} chunks from {len(page_texts)} pages"
|
f"[WORKER {os.getpid()}] Created {len(chunks)} chunks from {len(page_texts)} pages"
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(
|
logger.error(
|
||||||
f"[WORKER {os.getpid()}] ERROR: Failed during content extraction: {e}"
|
f"[WORKER {os.getpid()}] ERROR: Failed during content extraction: {e}"
|
||||||
)
|
)
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
@ -223,8 +226,8 @@ def process_document_sync(file_path: str):
|
||||||
|
|
||||||
final_memory = process.memory_info().rss / 1024 / 1024
|
final_memory = process.memory_info().rss / 1024 / 1024
|
||||||
memory_delta = final_memory - start_memory
|
memory_delta = final_memory - start_memory
|
||||||
print(f"[WORKER {os.getpid()}] Document processing completed successfully")
|
logger.info(f"[WORKER {os.getpid()}] Document processing completed successfully")
|
||||||
print(
|
logger.info(
|
||||||
f"[WORKER {os.getpid()}] Final memory: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)"
|
f"[WORKER {os.getpid()}] Final memory: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -239,24 +242,24 @@ def process_document_sync(file_path: str):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
final_memory = process.memory_info().rss / 1024 / 1024
|
final_memory = process.memory_info().rss / 1024 / 1024
|
||||||
memory_delta = final_memory - start_memory
|
memory_delta = final_memory - start_memory
|
||||||
print(f"[WORKER {os.getpid()}] FATAL ERROR in process_document_sync")
|
logger.error(f"[WORKER {os.getpid()}] FATAL ERROR in process_document_sync")
|
||||||
print(f"[WORKER {os.getpid()}] File: {file_path}")
|
logger.info(f"[WORKER {os.getpid()}] File: {file_path}")
|
||||||
print(f"[WORKER {os.getpid()}] Python version: {sys.version}")
|
logger.info(f"[WORKER {os.getpid()}] Python version: {sys.version}")
|
||||||
print(
|
logger.info(
|
||||||
f"[WORKER {os.getpid()}] Memory at crash: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)"
|
f"[WORKER {os.getpid()}] Memory at crash: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)"
|
||||||
)
|
)
|
||||||
print(f"[WORKER {os.getpid()}] Error: {type(e).__name__}: {e}")
|
logger.error(f"[WORKER {os.getpid()}] Error: {type(e).__name__}: {e}")
|
||||||
print(f"[WORKER {os.getpid()}] Full traceback:")
|
logger.error(f"[WORKER {os.getpid()}] Full traceback:")
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
||||||
# Try to get more system info before crashing
|
# Try to get more system info before crashing
|
||||||
try:
|
try:
|
||||||
import platform
|
import platform
|
||||||
|
|
||||||
print(
|
logger.info(
|
||||||
f"[WORKER {os.getpid()}] System: {platform.system()} {platform.release()}"
|
f"[WORKER {os.getpid()}] System: {platform.system()} {platform.release()}"
|
||||||
)
|
)
|
||||||
print(f"[WORKER {os.getpid()}] Architecture: {platform.machine()}")
|
logger.info(f"[WORKER {os.getpid()}] Architecture: {platform.machine()}")
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
import os
|
import os
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
def detect_gpu_devices():
|
def detect_gpu_devices():
|
||||||
"""Detect if GPU devices are actually available"""
|
"""Detect if GPU devices are actually available"""
|
||||||
|
|
@ -30,12 +32,12 @@ def get_worker_count():
|
||||||
|
|
||||||
if has_gpu_devices:
|
if has_gpu_devices:
|
||||||
default_workers = min(4, multiprocessing.cpu_count() // 2)
|
default_workers = min(4, multiprocessing.cpu_count() // 2)
|
||||||
print(
|
logger.info(
|
||||||
f"GPU mode enabled with {gpu_count} GPU(s) - using limited concurrency ({default_workers} workers)"
|
f"GPU mode enabled with {gpu_count} GPU(s) - using limited concurrency ({default_workers} workers)"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
default_workers = multiprocessing.cpu_count()
|
default_workers = multiprocessing.cpu_count()
|
||||||
print(
|
logger.info(
|
||||||
f"CPU-only mode enabled - using full concurrency ({default_workers} workers)"
|
f"CPU-only mode enabled - using full concurrency ({default_workers} workers)"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,13 @@
|
||||||
import os
|
import os
|
||||||
from concurrent.futures import ProcessPoolExecutor
|
from concurrent.futures import ProcessPoolExecutor
|
||||||
from utils.gpu_detection import get_worker_count
|
from utils.gpu_detection import get_worker_count
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
# Create shared process pool at import time (before CUDA initialization)
|
# Create shared process pool at import time (before CUDA initialization)
|
||||||
# This avoids the "Cannot re-initialize CUDA in forked subprocess" error
|
# This avoids the "Cannot re-initialize CUDA in forked subprocess" error
|
||||||
MAX_WORKERS = get_worker_count()
|
MAX_WORKERS = get_worker_count()
|
||||||
process_pool = ProcessPoolExecutor(max_workers=MAX_WORKERS)
|
process_pool = ProcessPoolExecutor(max_workers=MAX_WORKERS)
|
||||||
|
|
||||||
print(f"Shared process pool initialized with {MAX_WORKERS} workers")
|
logger.info(f"Shared process pool initialized with {MAX_WORKERS} workers")
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,16 @@
|
||||||
from docling.document_converter import DocumentConverter
|
from docling.document_converter import DocumentConverter
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
print("Warming up docling models...")
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
logger.info("Warming up docling models...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Use the sample document to warm up docling
|
# Use the sample document to warm up docling
|
||||||
test_file = "/app/warmup_ocr.pdf"
|
test_file = "/app/warmup_ocr.pdf"
|
||||||
print(f"Using {test_file} to warm up docling...")
|
logger.info(f"Using {test_file} to warm up docling...")
|
||||||
DocumentConverter().convert(test_file)
|
DocumentConverter().convert(test_file)
|
||||||
print("Docling models warmed up successfully")
|
logger.info("Docling models warmed up successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Docling warm-up completed with: {e}")
|
logger.error(f"Docling warm-up completed with: {e}")
|
||||||
# This is expected - we just want to trigger the model downloads
|
# This is expected - we just want to trigger the model downloads
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue