diff --git a/src/main.py b/src/main.py index d1b98759..1c0dc09f 100644 --- a/src/main.py +++ b/src/main.py @@ -1,8 +1,7 @@ - # Configure structured logging early -from services.flows_service import FlowsService from connectors.langflow_connector_service import LangflowConnectorService from connectors.service import ConnectorService +from services.flows_service import FlowsService from utils.logging_config import configure_from_env, get_logger configure_from_env() @@ -23,24 +22,28 @@ from starlette.routing import Route multiprocessing.set_start_method("spawn", force=True) # Create process pool FIRST, before any torch/CUDA imports -from utils.process_pool import process_pool - +from utils.process_pool import process_pool # isort: skip import torch # API endpoints from api import ( - router, auth, chat, connectors, + flows, knowledge_filter, langflow_files, + nudges, oidc, + router, search, settings, tasks, upload, ) + +# Existing services +from api.connector_router import ConnectorRouter from auth_middleware import optional_auth, require_auth # Configuration and setup @@ -53,9 +56,6 @@ from config.settings import ( clients, is_no_auth_mode, ) - -# Existing services -from api.connector_router import ConnectorRouter from services.auth_service import AuthService from services.chat_service import ChatService @@ -70,24 +70,6 @@ from services.monitor_service import MonitorService from services.search_service import SearchService from services.task_service import TaskService from session_manager import SessionManager -from utils.process_pool import process_pool - -# API endpoints -from api import ( - flows, - router, - nudges, - upload, - search, - chat, - auth, - connectors, - tasks, - oidc, - knowledge_filter, - settings, -) - logger.info( "CUDA device information", @@ -246,7 +228,10 @@ async def init_index_when_ready(): async def ingest_default_documents_when_ready(services): """Scan the local documents folder and ingest files like a non-auth upload.""" try: - logger.info("Ingesting default documents when ready", disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW) + logger.info( + "Ingesting default documents when ready", + disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW, + ) base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents")) if not os.path.isdir(base_dir): logger.info( @@ -281,40 +266,41 @@ async def _ingest_default_documents_langflow(services, file_paths): """Ingest default documents using Langflow upload-ingest-delete pipeline.""" langflow_file_service = services["langflow_file_service"] session_manager = services["session_manager"] - + logger.info( "Using Langflow ingestion pipeline for default documents", file_count=len(file_paths), ) - + success_count = 0 error_count = 0 - + for file_path in file_paths: try: logger.debug("Processing file with Langflow pipeline", file_path=file_path) - + # Read file content - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: content = f.read() - + # Create file tuple for upload filename = os.path.basename(file_path) # Determine content type based on file extension content_type, _ = mimetypes.guess_type(filename) if not content_type: - content_type = 'application/octet-stream' - + content_type = "application/octet-stream" + file_tuple = (filename, content, content_type) - + # Use AnonymousUser details for default documents from session_manager import AnonymousUser + anonymous_user = AnonymousUser() - + # Get JWT token using same logic as DocumentFileProcessor # This will handle anonymous JWT creation if needed for anonymous user effective_jwt = None - + # Let session manager handle anonymous JWT creation if needed if session_manager: # This call will create anonymous JWT if needed (same as DocumentFileProcessor) @@ -322,9 +308,9 @@ async def _ingest_default_documents_langflow(services, file_paths): anonymous_user.user_id, effective_jwt ) # Get the JWT that was created by session manager - if hasattr(session_manager, '_anonymous_jwt'): + if hasattr(session_manager, "_anonymous_jwt"): effective_jwt = session_manager._anonymous_jwt - + # Prepare tweaks for default documents with anonymous user metadata default_tweaks = { "OpenSearchHybrid-Ve6bS": { @@ -332,11 +318,11 @@ async def _ingest_default_documents_langflow(services, file_paths): {"key": "owner", "value": None}, {"key": "owner_name", "value": anonymous_user.name}, {"key": "owner_email", "value": anonymous_user.email}, - {"key": "connector_type", "value": "system_default"} + {"key": "connector_type", "value": "system_default"}, ] } } - + # Use langflow upload_and_ingest_file method with JWT token result = await langflow_file_service.upload_and_ingest_file( file_tuple=file_tuple, @@ -346,14 +332,14 @@ async def _ingest_default_documents_langflow(services, file_paths): jwt_token=effective_jwt, # Use JWT token (anonymous if needed) delete_after_ingest=True, # Clean up after ingestion ) - + logger.info( "Successfully ingested file via Langflow", file_path=file_path, result_status=result.get("status"), ) success_count += 1 - + except Exception as e: logger.error( "Failed to ingest file via Langflow", @@ -361,7 +347,7 @@ async def _ingest_default_documents_langflow(services, file_paths): error=str(e), ) error_count += 1 - + logger.info( "Langflow ingestion completed", success_count=success_count, @@ -376,7 +362,7 @@ async def _ingest_default_documents_openrag(services, file_paths): "Using traditional OpenRAG ingestion for default documents", file_count=len(file_paths), ) - + # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None) from models.processors import DocumentFileProcessor @@ -443,11 +429,11 @@ async def initialize_services(): task_service=task_service, session_manager=session_manager, ) - + # Create connector router that chooses based on configuration connector_service = ConnectorRouter( langflow_connector_service=langflow_connector_service, - openrag_connector_service=openrag_connector_service + openrag_connector_service=openrag_connector_service, ) # Initialize auth service diff --git a/uv.lock b/uv.lock index bd7da744..6c858d0c 100644 --- a/uv.lock +++ b/uv.lock @@ -1405,7 +1405,7 @@ wheels = [ [[package]] name = "openrag" -version = "0.1.1" +version = "0.1.2" source = { editable = "." } dependencies = [ { name = "agentd" },