diff --git a/src/api/auth.py b/src/api/auth.py index b3b3e214..5432335a 100644 --- a/src/api/auth.py +++ b/src/api/auth.py @@ -1,5 +1,6 @@ from starlette.requests import Request from starlette.responses import JSONResponse +from utils.telemetry import TelemetryClient, Category, MessageId async def auth_init(request: Request, auth_service, session_manager): @@ -40,8 +41,11 @@ async def auth_callback(request: Request, auth_service, session_manager): connection_id, authorization_code, state, request ) + await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_OAUTH_CALLBACK) + # If this is app auth, set JWT cookie if result.get("purpose") == "app_auth" and result.get("jwt_token"): + await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_SUCCESS) response = JSONResponse( {k: v for k, v in result.items() if k != "jwt_token"} ) @@ -61,6 +65,7 @@ async def auth_callback(request: Request, auth_service, session_manager): import traceback traceback.print_exc() + await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_OAUTH_FAILED) return JSONResponse({"error": f"Callback failed: {str(e)}"}, status_code=500) @@ -72,6 +77,7 @@ async def auth_me(request: Request, auth_service, session_manager): async def auth_logout(request: Request, auth_service, session_manager): """Logout user by clearing auth cookie""" + await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_LOGOUT) response = JSONResponse( {"status": "logged_out", "message": "Successfully logged out"} ) diff --git a/src/api/connectors.py b/src/api/connectors.py index 5fbcec86..541ebd51 100644 --- a/src/api/connectors.py +++ b/src/api/connectors.py @@ -1,6 +1,7 @@ from starlette.requests import Request from starlette.responses import JSONResponse, PlainTextResponse from utils.logging_config import get_logger +from utils.telemetry import TelemetryClient, Category, MessageId logger = get_logger(__name__) @@ -25,6 +26,7 @@ async def connector_sync(request: Request, connector_service, session_manager): selected_files = data.get("selected_files") try: + await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_SYNC_START) logger.debug( "Starting connector sync", connector_type=connector_type, @@ -102,6 +104,7 @@ async def connector_sync(request: Request, connector_service, session_manager): jwt_token=jwt_token, ) task_ids = [task_id] + await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_SYNC_COMPLETE) return JSONResponse( { "task_ids": task_ids, @@ -114,6 +117,7 @@ async def connector_sync(request: Request, connector_service, session_manager): except Exception as e: logger.error("Connector sync failed", error=str(e)) + await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_SYNC_FAILED) return JSONResponse({"error": f"Sync failed: {str(e)}"}, status_code=500) @@ -185,6 +189,7 @@ async def connector_webhook(request: Request, connector_service, session_manager config=temp_config, ) try: + await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_WEBHOOK_RECV) temp_connector = connector_service.connection_manager._create_connector( temp_connection ) @@ -336,6 +341,7 @@ async def connector_webhook(request: Request, connector_service, session_manager except Exception as e: logger.error("Webhook processing failed", error=str(e)) + await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_WEBHOOK_FAILED) return JSONResponse( {"error": f"Webhook processing failed: {str(e)}"}, status_code=500 ) diff --git a/src/api/settings.py b/src/api/settings.py index c8e443cf..bfa493d9 100644 --- a/src/api/settings.py +++ b/src/api/settings.py @@ -4,6 +4,7 @@ import time from starlette.responses import JSONResponse from utils.container_utils import transform_localhost_url from utils.logging_config import get_logger +from utils.telemetry import TelemetryClient, Category, MessageId from config.settings import ( DISABLE_INGEST_WITH_LANGFLOW, LANGFLOW_URL, @@ -409,16 +410,32 @@ async def update_settings(request, session_manager): # Update agent settings if "llm_model" in body: + old_model = current_config.agent.llm_model current_config.agent.llm_model = body["llm_model"] config_updated = True + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_LLM_MODEL + ) + logger.info(f"LLM model changed from {old_model} to {body['llm_model']}") if "llm_provider" in body: + old_provider = current_config.agent.llm_provider current_config.agent.llm_provider = body["llm_provider"] config_updated = True + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_LLM_PROVIDER + ) + logger.info(f"LLM provider changed from {old_provider} to {body['llm_provider']}") if "system_prompt" in body: current_config.agent.system_prompt = body["system_prompt"] config_updated = True + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_SYSTEM_PROMPT + ) # Also update the chat flow with the new system prompt try: @@ -431,17 +448,33 @@ async def update_settings(request, session_manager): # Update knowledge settings if "embedding_model" in body: + old_model = current_config.knowledge.embedding_model new_embedding_model = body["embedding_model"].strip() current_config.knowledge.embedding_model = new_embedding_model config_updated = True + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_EMBED_MODEL + ) + logger.info(f"Embedding model changed from {old_model} to {new_embedding_model}") if "embedding_provider" in body: + old_provider = current_config.knowledge.embedding_provider current_config.knowledge.embedding_provider = body["embedding_provider"] config_updated = True + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_EMBED_PROVIDER + ) + logger.info(f"Embedding provider changed from {old_provider} to {body['embedding_provider']}") if "table_structure" in body: current_config.knowledge.table_structure = body["table_structure"] config_updated = True + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_DOCLING_UPDATED + ) # Also update the flow with the new docling settings try: @@ -453,6 +486,10 @@ async def update_settings(request, session_manager): if "ocr" in body: current_config.knowledge.ocr = body["ocr"] config_updated = True + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_DOCLING_UPDATED + ) # Also update the flow with the new docling settings try: @@ -464,6 +501,10 @@ async def update_settings(request, session_manager): if "picture_descriptions" in body: current_config.knowledge.picture_descriptions = body["picture_descriptions"] config_updated = True + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_DOCLING_UPDATED + ) # Also update the flow with the new docling settings try: @@ -475,6 +516,10 @@ async def update_settings(request, session_manager): if "chunk_size" in body: current_config.knowledge.chunk_size = body["chunk_size"] config_updated = True + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_CHUNK_UPDATED + ) # Also update the ingest flow with the new chunk size try: @@ -491,6 +536,10 @@ async def update_settings(request, session_manager): if "chunk_overlap" in body: current_config.knowledge.chunk_overlap = body["chunk_overlap"] config_updated = True + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_CHUNK_UPDATED + ) # Also update the ingest flow with the new chunk overlap try: @@ -507,35 +556,48 @@ async def update_settings(request, session_manager): # The config will still be saved # Update provider-specific settings + provider_updated = False if "openai_api_key" in body and body["openai_api_key"].strip(): current_config.providers.openai.api_key = body["openai_api_key"] current_config.providers.openai.configured = True config_updated = True + provider_updated = True if "anthropic_api_key" in body and body["anthropic_api_key"].strip(): current_config.providers.anthropic.api_key = body["anthropic_api_key"] current_config.providers.anthropic.configured = True config_updated = True + provider_updated = True if "watsonx_api_key" in body and body["watsonx_api_key"].strip(): current_config.providers.watsonx.api_key = body["watsonx_api_key"] current_config.providers.watsonx.configured = True config_updated = True + provider_updated = True if "watsonx_endpoint" in body: current_config.providers.watsonx.endpoint = body["watsonx_endpoint"].strip() current_config.providers.watsonx.configured = True config_updated = True + provider_updated = True if "watsonx_project_id" in body: current_config.providers.watsonx.project_id = body["watsonx_project_id"].strip() current_config.providers.watsonx.configured = True config_updated = True + provider_updated = True if "ollama_endpoint" in body: current_config.providers.ollama.endpoint = body["ollama_endpoint"].strip() current_config.providers.ollama.configured = True config_updated = True + provider_updated = True + + if provider_updated: + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_PROVIDER_CREDS + ) if not config_updated: return JSONResponse( @@ -577,10 +639,18 @@ async def update_settings(request, session_manager): logger.info( "Configuration updated successfully", updated_fields=list(body.keys()) ) + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_UPDATED + ) return JSONResponse({"message": "Configuration updated successfully"}) except Exception as e: logger.error("Failed to update settings", error=str(e)) + await TelemetryClient.send_event( + Category.SETTINGS_OPERATIONS, + MessageId.ORB_SETTINGS_UPDATE_FAILED + ) return JSONResponse( {"error": f"Failed to update settings: {str(e)}"}, status_code=500 ) @@ -589,6 +659,8 @@ async def update_settings(request, session_manager): async def onboarding(request, flows_service, session_manager=None): """Handle onboarding configuration setup""" try: + await TelemetryClient.send_event(Category.ONBOARDING, MessageId.ORB_ONBOARD_START) + # Get current configuration current_config = get_openrag_config() @@ -631,13 +703,23 @@ async def onboarding(request, flows_service, session_manager=None): config_updated = False # Update agent settings (LLM) + llm_model_selected = None + llm_provider_selected = None + if "llm_model" in body: if not isinstance(body["llm_model"], str) or not body["llm_model"].strip(): return JSONResponse( {"error": "llm_model must be a non-empty string"}, status_code=400 ) - current_config.agent.llm_model = body["llm_model"].strip() + llm_model_selected = body["llm_model"].strip() + current_config.agent.llm_model = llm_model_selected config_updated = True + await TelemetryClient.send_event( + Category.ONBOARDING, + MessageId.ORB_ONBOARD_LLM_MODEL, + metadata={"llm_model": llm_model_selected} + ) + logger.info(f"LLM model selected during onboarding: {llm_model_selected}") if "llm_provider" in body: if ( @@ -653,10 +735,20 @@ async def onboarding(request, flows_service, session_manager=None): {"error": "llm_provider must be one of: openai, anthropic, watsonx, ollama"}, status_code=400, ) - current_config.agent.llm_provider = body["llm_provider"].strip() + llm_provider_selected = body["llm_provider"].strip() + current_config.agent.llm_provider = llm_provider_selected config_updated = True + await TelemetryClient.send_event( + Category.ONBOARDING, + MessageId.ORB_ONBOARD_LLM_PROVIDER, + metadata={"llm_provider": llm_provider_selected} + ) + logger.info(f"LLM provider selected during onboarding: {llm_provider_selected}") # Update knowledge settings (embedding) + embedding_model_selected = None + embedding_provider_selected = None + if "embedding_model" in body and not DISABLE_INGEST_WITH_LANGFLOW: if ( not isinstance(body["embedding_model"], str) @@ -666,8 +758,15 @@ async def onboarding(request, flows_service, session_manager=None): {"error": "embedding_model must be a non-empty string"}, status_code=400, ) - current_config.knowledge.embedding_model = body["embedding_model"].strip() + embedding_model_selected = body["embedding_model"].strip() + current_config.knowledge.embedding_model = embedding_model_selected config_updated = True + await TelemetryClient.send_event( + Category.ONBOARDING, + MessageId.ORB_ONBOARD_EMBED_MODEL, + metadata={"embedding_model": embedding_model_selected} + ) + logger.info(f"Embedding model selected during onboarding: {embedding_model_selected}") if "embedding_provider" in body: if ( @@ -684,8 +783,15 @@ async def onboarding(request, flows_service, session_manager=None): {"error": "embedding_provider must be one of: openai, watsonx, ollama"}, status_code=400, ) - current_config.knowledge.embedding_provider = body["embedding_provider"].strip() + embedding_provider_selected = body["embedding_provider"].strip() + current_config.knowledge.embedding_provider = embedding_provider_selected config_updated = True + await TelemetryClient.send_event( + Category.ONBOARDING, + MessageId.ORB_ONBOARD_EMBED_PROVIDER, + metadata={"embedding_provider": embedding_provider_selected} + ) + logger.info(f"Embedding provider selected during onboarding: {embedding_provider_selected}") # Update provider-specific credentials if "openai_api_key" in body and body["openai_api_key"].strip(): @@ -771,6 +877,12 @@ async def onboarding(request, flows_service, session_manager=None): {"error": "sample_data must be a boolean value"}, status_code=400 ) should_ingest_sample_data = body["sample_data"] + if should_ingest_sample_data: + await TelemetryClient.send_event( + Category.ONBOARDING, + MessageId.ORB_ONBOARD_SAMPLE_DATA + ) + logger.info("Sample data ingestion requested during onboarding") if not config_updated: return JSONResponse( @@ -913,8 +1025,38 @@ async def onboarding(request, flows_service, session_manager=None): "Onboarding configuration updated successfully", updated_fields=updated_fields, ) + + # Mark config as edited and send telemetry with model information + current_config.edited = True + + # Build metadata with selected models + onboarding_metadata = {} + if llm_provider_selected: + onboarding_metadata["llm_provider"] = llm_provider_selected + if llm_model_selected: + onboarding_metadata["llm_model"] = llm_model_selected + if embedding_provider_selected: + onboarding_metadata["embedding_provider"] = embedding_provider_selected + if embedding_model_selected: + onboarding_metadata["embedding_model"] = embedding_model_selected + + await TelemetryClient.send_event( + Category.ONBOARDING, + MessageId.ORB_ONBOARD_CONFIG_EDITED, + metadata=onboarding_metadata + ) + await TelemetryClient.send_event( + Category.ONBOARDING, + MessageId.ORB_ONBOARD_COMPLETE, + metadata=onboarding_metadata + ) + logger.info("Configuration marked as edited after onboarding") else: + await TelemetryClient.send_event( + Category.ONBOARDING, + MessageId.ORB_ONBOARD_FAILED + ) return JSONResponse( {"error": "Failed to save configuration"}, status_code=500 ) @@ -929,6 +1071,10 @@ async def onboarding(request, flows_service, session_manager=None): except Exception as e: logger.error("Failed to update onboarding settings", error=str(e)) + await TelemetryClient.send_event( + Category.ONBOARDING, + MessageId.ORB_ONBOARD_FAILED + ) return JSONResponse( {"error": str(e)}, status_code=500, @@ -1214,11 +1360,11 @@ async def update_docling_preset(request, session_manager): flows_service = _get_flows_service() await flows_service.update_flow_docling_preset("custom", preset_config) - logger.info(f"Successfully updated docling settings in ingest flow") + logger.info("Successfully updated docling settings in ingest flow") return JSONResponse( { - "message": f"Successfully updated docling settings", + "message": "Successfully updated docling settings", "settings": settings, "preset_config": preset_config, } diff --git a/src/api/tasks.py b/src/api/tasks.py index 92779d09..9ee30125 100644 --- a/src/api/tasks.py +++ b/src/api/tasks.py @@ -1,5 +1,6 @@ from starlette.requests import Request from starlette.responses import JSONResponse +from utils.telemetry import TelemetryClient, Category, MessageId async def task_status(request: Request, task_service, session_manager): @@ -28,8 +29,10 @@ async def cancel_task(request: Request, task_service, session_manager): success = await task_service.cancel_task(user.user_id, task_id) if not success: + await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORB_TASK_CANCEL_FAILED) return JSONResponse( {"error": "Task not found or cannot be cancelled"}, status_code=400 ) + await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORB_TASK_CANCELLED) return JSONResponse({"status": "cancelled", "task_id": task_id}) diff --git a/src/main.py b/src/main.py index 606f975d..e32bc081 100644 --- a/src/main.py +++ b/src/main.py @@ -5,6 +5,7 @@ from services.flows_service import FlowsService from utils.container_utils import detect_container_environment from utils.embeddings import create_dynamic_index_body from utils.logging_config import configure_from_env, get_logger +from utils.telemetry import TelemetryClient, Category, MessageId configure_from_env() logger = get_logger(__name__) @@ -100,6 +101,7 @@ async def wait_for_opensearch(): try: await clients.opensearch.info() logger.info("OpenSearch is ready") + await TelemetryClient.send_event(Category.OPENSEARCH_SETUP, MessageId.ORB_OS_CONN_ESTABLISHED) return except Exception as e: logger.warning( @@ -111,6 +113,7 @@ async def wait_for_opensearch(): if attempt < max_retries - 1: await asyncio.sleep(retry_delay) else: + await TelemetryClient.send_event(Category.OPENSEARCH_SETUP, MessageId.ORB_OS_TIMEOUT) raise Exception("OpenSearch failed to become ready") @@ -154,6 +157,7 @@ async def _ensure_opensearch_index(): "dimension" ], ) + await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_CREATED) except Exception as e: logger.error( @@ -161,6 +165,7 @@ async def _ensure_opensearch_index(): error=str(e), index_name=INDEX_NAME, ) + await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_CREATE_FAIL) # Don't raise the exception to avoid breaking the initialization # The service can still function, document operations might fail later @@ -193,12 +198,14 @@ async def init_index(): index_name=INDEX_NAME, embedding_model=embedding_model, ) + await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_CREATED) else: logger.info( "Index already exists, skipping creation", index_name=INDEX_NAME, embedding_model=embedding_model, ) + await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_EXISTS) # Create knowledge filters index knowledge_filter_index_name = "knowledge_filters" @@ -226,6 +233,7 @@ async def init_index(): logger.info( "Created knowledge filters index", index_name=knowledge_filter_index_name ) + await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_KF_INDEX_CREATED) else: logger.info( "Knowledge filters index already exists, skipping creation", @@ -279,6 +287,7 @@ def generate_jwt_keys(): logger.info("Generated RSA keys for JWT signing") except subprocess.CalledProcessError as e: logger.error("Failed to generate RSA keys", error=str(e)) + TelemetryClient.send_event_sync(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_JWT_KEY_FAIL) raise else: # Ensure correct permissions on existing keys @@ -297,6 +306,7 @@ async def init_index_when_ready(): logger.info("OpenSearch index initialization completed successfully") except Exception as e: logger.error("OpenSearch index initialization failed", error=str(e)) + await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_INIT_FAIL) logger.warning( "OIDC endpoints will still work, but document operations may fail until OpenSearch is ready" ) @@ -324,6 +334,7 @@ async def ingest_default_documents_when_ready(services): "Ingesting default documents when ready", disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW, ) + await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORB_DOC_DEFAULT_START) base_dir = _get_documents_dir() if not os.path.isdir(base_dir): logger.info( @@ -350,9 +361,12 @@ async def ingest_default_documents_when_ready(services): await _ingest_default_documents_openrag(services, file_paths) else: await _ingest_default_documents_langflow(services, file_paths) + + await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORB_DOC_DEFAULT_COMPLETE) except Exception as e: logger.error("Default documents ingestion failed", error=str(e)) + await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORB_DOC_DEFAULT_FAILED) async def _ingest_default_documents_langflow(services, file_paths): @@ -502,6 +516,7 @@ async def _update_mcp_servers_with_provider_credentials(services): async def startup_tasks(services): """Startup tasks""" logger.info("Starting startup tasks") + await TelemetryClient.send_event(Category.APPLICATION_STARTUP, MessageId.ORB_APP_START_INIT) # Only initialize basic OpenSearch connection, not the index # Index will be created after onboarding when we know the embedding model await wait_for_opensearch() @@ -527,25 +542,34 @@ async def startup_tasks(services): logger.info( f"Detected reset flows: {', '.join(reset_flows)}. Reapplying all settings." ) + await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_RESET_DETECTED) from api.settings import reapply_all_settings await reapply_all_settings(session_manager=services["session_manager"]) logger.info("Successfully reapplied settings after detecting flow resets") + await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_SETTINGS_REAPPLIED) else: logger.info("No flows detected as reset, skipping settings reapplication") else: logger.debug("Configuration not yet edited, skipping flow reset check") except Exception as e: logger.error(f"Failed to check flows reset or reapply settings: {str(e)}") + await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_RESET_CHECK_FAIL) # Don't fail startup if this check fails async def initialize_services(): """Initialize all services and their dependencies""" + await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_INIT_START) # Generate JWT keys if they don't exist generate_jwt_keys() # Initialize clients (now async to generate Langflow API key) - await clients.initialize() + try: + await clients.initialize() + except Exception as e: + logger.error("Failed to initialize clients", error=str(e)) + await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_OS_CLIENT_FAIL) + raise # Initialize session manager session_manager = SessionManager(SESSION_SECRET) @@ -608,8 +632,11 @@ async def initialize_services(): logger.warning( "Failed to load persisted connections on startup", error=str(e) ) + await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_LOAD_FAILED) else: logger.info("[CONNECTORS] Skipping connection loading in no-auth mode") + + await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_INIT_SUCCESS) langflow_file_service = LangflowFileService() @@ -1223,6 +1250,7 @@ async def create_app(): # Add startup event handler @app.on_event("startup") async def startup_event(): + await TelemetryClient.send_event(Category.APPLICATION_STARTUP, MessageId.ORB_APP_STARTED) # Start index initialization in background to avoid blocking OIDC endpoints t1 = asyncio.create_task(startup_tasks(services)) app.state.background_tasks.add(t1) @@ -1270,9 +1298,13 @@ async def create_app(): # Add shutdown event handler @app.on_event("shutdown") async def shutdown_event(): + await TelemetryClient.send_event(Category.APPLICATION_SHUTDOWN, MessageId.ORB_APP_SHUTDOWN) await cleanup_subscriptions_proper(services) # Cleanup async clients await clients.cleanup() + # Cleanup telemetry client + from utils.telemetry.client import cleanup_telemetry_client + await cleanup_telemetry_client() return app diff --git a/src/services/document_service.py b/src/services/document_service.py index 882b5eaf..de1b3cf6 100644 --- a/src/services/document_service.py +++ b/src/services/document_service.py @@ -14,6 +14,7 @@ logger = get_logger(__name__) from config.settings import clients, INDEX_NAME, get_embedding_model from utils.document_processing import extract_relevant, process_document_sync +from utils.telemetry import TelemetryClient, Category, MessageId def get_token_count(text: str, model: str = None) -> int: @@ -98,6 +99,7 @@ class DocumentService: """Recreate the process pool if it's broken""" if self._process_pool_broken and self.process_pool: logger.warning("Attempting to recreate broken process pool") + TelemetryClient.send_event_sync(Category.DOCUMENT_PROCESSING, MessageId.ORB_DOC_POOL_RECREATE) try: # Shutdown the old pool self.process_pool.shutdown(wait=False) diff --git a/src/services/flows_service.py b/src/services/flows_service.py index 8d3aae73..e97ac2d3 100644 --- a/src/services/flows_service.py +++ b/src/services/flows_service.py @@ -28,6 +28,7 @@ import copy from datetime import datetime from utils.logging_config import get_logger from utils.container_utils import transform_localhost_url +from utils.telemetry import TelemetryClient, Category, MessageId logger = get_logger(__name__) @@ -228,6 +229,12 @@ class FlowsService: failed_count=len(backup_results["failed"]), ) + # Send telemetry event + if backup_results["failed"]: + await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_BACKUP_FAILED) + else: + await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_BACKUP_COMPLETE) + return backup_results async def _backup_flow(self, flow_id: str, flow_type: str, flow_data: dict = None): diff --git a/src/services/task_service.py b/src/services/task_service.py index 735ad483..c86e96b1 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -7,6 +7,7 @@ from models.tasks import FileTask, TaskStatus, UploadTask from session_manager import AnonymousUser from utils.gpu_detection import get_worker_count from utils.logging_config import get_logger +from utils.telemetry import TelemetryClient, Category, MessageId logger = get_logger(__name__) @@ -131,6 +132,18 @@ class TaskService: # Store reference to background task for cancellation upload_task.background_task = background_task + # Send telemetry event for task creation with metadata + asyncio.create_task( + TelemetryClient.send_event( + Category.TASK_OPERATIONS, + MessageId.ORB_TASK_CREATED, + metadata={ + "total_files": len(items), + "processor_type": processor.__class__.__name__, + } + ) + ) + return task_id async def background_upload_processor(self, user_id: str, task_id: str) -> None: @@ -174,6 +187,19 @@ class TaskService: if upload_task.processed_files >= upload_task.total_files: upload_task.status = TaskStatus.COMPLETED upload_task.updated_at = time.time() + + # Send telemetry for task completion + asyncio.create_task( + TelemetryClient.send_event( + Category.TASK_OPERATIONS, + MessageId.ORB_TASK_COMPLETE, + metadata={ + "total_files": upload_task.total_files, + "successful_files": upload_task.successful_files, + "failed_files": upload_task.failed_files, + } + ) + ) except Exception as e: logger.error( @@ -183,8 +209,23 @@ class TaskService: traceback.print_exc() if user_id in self.task_store and task_id in self.task_store[user_id]: - self.task_store[user_id][task_id].status = TaskStatus.FAILED - self.task_store[user_id][task_id].updated_at = time.time() + failed_task = self.task_store[user_id][task_id] + failed_task.status = TaskStatus.FAILED + failed_task.updated_at = time.time() + + # Send telemetry for task failure + asyncio.create_task( + TelemetryClient.send_event( + Category.TASK_OPERATIONS, + MessageId.ORB_TASK_FAILED, + metadata={ + "total_files": failed_task.total_files, + "processed_files": failed_task.processed_files, + "successful_files": failed_task.successful_files, + "failed_files": failed_task.failed_files, + } + ) + ) async def background_custom_processor( self, user_id: str, task_id: str, items: list @@ -231,6 +272,19 @@ class TaskService: # Mark task as completed upload_task.status = TaskStatus.COMPLETED upload_task.updated_at = time.time() + + # Send telemetry for task completion + asyncio.create_task( + TelemetryClient.send_event( + Category.TASK_OPERATIONS, + MessageId.ORB_TASK_COMPLETE, + metadata={ + "total_files": upload_task.total_files, + "successful_files": upload_task.successful_files, + "failed_files": upload_task.failed_files, + } + ) + ) except asyncio.CancelledError: logger.info("Background processor cancelled", task_id=task_id) @@ -246,8 +300,23 @@ class TaskService: traceback.print_exc() if user_id in self.task_store and task_id in self.task_store[user_id]: - self.task_store[user_id][task_id].status = TaskStatus.FAILED - self.task_store[user_id][task_id].updated_at = time.time() + failed_task = self.task_store[user_id][task_id] + failed_task.status = TaskStatus.FAILED + failed_task.updated_at = time.time() + + # Send telemetry for task failure + asyncio.create_task( + TelemetryClient.send_event( + Category.TASK_OPERATIONS, + MessageId.ORB_TASK_FAILED, + metadata={ + "total_files": failed_task.total_files, + "processed_files": failed_task.processed_files, + "successful_files": failed_task.successful_files, + "failed_files": failed_task.failed_files, + } + ) + ) def get_task_status(self, user_id: str, task_id: str) -> dict | None: """Get the status of a specific upload task diff --git a/src/utils/telemetry/__init__.py b/src/utils/telemetry/__init__.py new file mode 100644 index 00000000..34e11185 --- /dev/null +++ b/src/utils/telemetry/__init__.py @@ -0,0 +1,8 @@ +"""Telemetry module for OpenRAG backend.""" + +from .client import TelemetryClient +from .category import Category +from .message_id import MessageId + +__all__ = ["TelemetryClient", "Category", "MessageId"] + diff --git a/src/utils/telemetry/category.py b/src/utils/telemetry/category.py new file mode 100644 index 00000000..17f9216c --- /dev/null +++ b/src/utils/telemetry/category.py @@ -0,0 +1,45 @@ +"""Telemetry categories for OpenRAG backend.""" + + +class Category: + """Telemetry event categories.""" + + # Application lifecycle + APPLICATION_STARTUP = "APPLICATION_STARTUP" + APPLICATION_SHUTDOWN = "APPLICATION_SHUTDOWN" + + # Service initialization + SERVICE_INITIALIZATION = "SERVICE_INITIALIZATION" + + # OpenSearch operations + OPENSEARCH_SETUP = "OPENSEARCH_SETUP" + OPENSEARCH_INDEX = "OPENSEARCH_INDEX" + + # Document operations + DOCUMENT_INGESTION = "DOCUMENT_INGESTION" + DOCUMENT_PROCESSING = "DOCUMENT_PROCESSING" + + # Authentication + AUTHENTICATION = "AUTHENTICATION" + + # Connector operations + CONNECTOR_OPERATIONS = "CONNECTOR_OPERATIONS" + + # Flow operations + FLOW_OPERATIONS = "FLOW_OPERATIONS" + + # Task operations + TASK_OPERATIONS = "TASK_OPERATIONS" + + # Chat operations + CHAT_OPERATIONS = "CHAT_OPERATIONS" + + # Error conditions + ERROR_CONDITIONS = "ERROR_CONDITIONS" + + # Settings operations + SETTINGS_OPERATIONS = "SETTINGS_OPERATIONS" + + # Onboarding + ONBOARDING = "ONBOARDING" + diff --git a/src/utils/telemetry/client.py b/src/utils/telemetry/client.py new file mode 100644 index 00000000..7a99b649 --- /dev/null +++ b/src/utils/telemetry/client.py @@ -0,0 +1,402 @@ +"""Telemetry client for OpenRAG backend using Scarf.""" + +import asyncio +import os +import platform +from datetime import datetime, timezone +from typing import Optional +from urllib.parse import urlencode + +import httpx +from utils.logging_config import get_logger + +logger = get_logger(__name__) + +# Constants +SCARF_BASE_URL_DEFAULT = "https://langflow.gateway.scarf.sh" +SCARF_PATH = "openrag" +CLIENT_TYPE = "backend" +PLATFORM_TYPE = "backend" + + +def _get_openrag_version() -> str: + """Get OpenRAG version from package metadata.""" + try: + from importlib.metadata import version, PackageNotFoundError + + try: + return version("openrag") + except PackageNotFoundError: + # Fallback: try to read from pyproject.toml if package not installed (dev mode) + try: + import tomllib + from pathlib import Path + + # Try to find pyproject.toml relative to this file + current_file = Path(__file__) + project_root = current_file.parent.parent.parent.parent + pyproject_path = project_root / "pyproject.toml" + + if pyproject_path.exists(): + with open(pyproject_path, "rb") as f: + data = tomllib.load(f) + return data.get("project", {}).get("version", "dev") + except Exception: + pass + + return "dev" + except Exception as e: + logger.warning(f"Failed to get OpenRAG version: {e}") + return "unknown" + + +# Get version dynamically +OPENRAG_VERSION = _get_openrag_version() + +# HTTP timeouts +HTTP_REQUEST_TIMEOUT = 10.0 +HTTP_CONNECT_TIMEOUT = 5.0 + +# Retry configuration +RETRY_BASE_MS = 250 +MAX_WAIT_INTERVAL_MS = 5000 +MAX_RETRIES = 3 + +# Global HTTP client +_http_client: Optional[httpx.AsyncClient] = None +_base_url_override: Optional[str] = None + + +def _get_http_client() -> Optional[httpx.AsyncClient]: + """Get or create the HTTP client for telemetry.""" + global _http_client + if _http_client is None: + try: + _http_client = httpx.AsyncClient( + timeout=httpx.Timeout( + connect=HTTP_CONNECT_TIMEOUT, + read=HTTP_REQUEST_TIMEOUT, + write=HTTP_REQUEST_TIMEOUT, + pool=HTTP_CONNECT_TIMEOUT, + ), + headers={ + "User-Agent": f"OpenRAG-Backend/{OPENRAG_VERSION}", + }, + ) + logger.debug("Telemetry HTTP client initialized") + except Exception as e: + logger.error(f"Failed to initialize telemetry HTTP client: {e}") + return None + return _http_client + + +def set_base_url(url: str) -> None: + """Override the default Scarf base URL (for testing).""" + global _base_url_override + _base_url_override = url + logger.info(f"Telemetry base URL overridden: {url}") + + +def _get_effective_base_url() -> str: + """Get the effective base URL (override or default).""" + return _base_url_override or SCARF_BASE_URL_DEFAULT + + +def is_do_not_track() -> bool: + """Check if DO_NOT_TRACK environment variable is set.""" + do_not_track = os.environ.get("DO_NOT_TRACK", "").lower() + return do_not_track in ("true", "1", "yes", "on") + + +def _get_os() -> str: + """Get the operating system identifier.""" + system = platform.system().lower() + if system == "darwin": + return "macos" + elif system == "windows": + return "windows" + elif system == "linux": + return "linux" + else: + return "unknown" + + +def _get_os_version() -> str: + """Get the operating system version.""" + try: + system = platform.system().lower() + if system == "darwin": + # macOS version + return platform.mac_ver()[0] if platform.mac_ver()[0] else "unknown" + elif system == "windows": + # Windows version + return platform.win32_ver()[0] if platform.win32_ver()[0] else "unknown" + elif system == "linux": + # Linux - try to get distribution info + try: + import distro + return f"{distro.name()} {distro.version()}".strip() or platform.release() + except ImportError: + # Fallback to platform.release() if distro not available + return platform.release() + else: + return platform.release() + except Exception: + return "unknown" + + +def _get_gpu_info() -> dict: + """Get GPU information for telemetry.""" + gpu_info = { + "gpu_available": False, + "gpu_count": 0, + "cuda_available": False, + "cuda_version": None, + } + + try: + # Try to use the existing GPU detection utility + from utils.gpu_detection import detect_gpu_devices + + has_gpu, gpu_count = detect_gpu_devices() + gpu_info["gpu_available"] = has_gpu + gpu_info["gpu_count"] = gpu_count if isinstance(gpu_count, int) else 0 + + # Also check CUDA availability via torch + try: + import torch + gpu_info["cuda_available"] = torch.cuda.is_available() + if torch.cuda.is_available(): + gpu_info["cuda_version"] = torch.version.cuda or "unknown" + except ImportError: + pass + except Exception as e: + logger.debug(f"Failed to detect GPU info: {e}") + + return gpu_info + + +def _get_current_utc() -> str: + """Get current UTC time as RFC 3339 formatted string.""" + now = datetime.now(timezone.utc) + return now.isoformat().replace("+00:00", "Z") + + +def _get_exponential_backoff_delay(attempt: int) -> float: + """Calculate exponential backoff delay with full jitter (in seconds). + + Formula: + temp = min(MAX_BACKOFF, base * 2^attempt) + sleep = random_between(0, temp) + """ + import random + + exp = min(2 ** attempt, MAX_WAIT_INTERVAL_MS // RETRY_BASE_MS) + temp_ms = RETRY_BASE_MS * exp + temp_ms = min(temp_ms, MAX_WAIT_INTERVAL_MS) + + # Full jitter: random duration between 0 and temp_ms + sleep_ms = random.uniform(0, temp_ms) if temp_ms > 0 else 0 + return sleep_ms / 1000.0 # Convert to seconds + + +async def _send_scarf_event( + category: str, + message_id: str, + metadata: dict = None, +) -> None: + """Send a telemetry event to Scarf. + + Args: + category: Event category + message_id: Event message ID + metadata: Optional dictionary of additional metadata to include in the event + """ + if is_do_not_track(): + logger.debug( + f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled" + ) + return + + http_client = _get_http_client() + if http_client is None: + logger.error( + f"Telemetry event aborted: {category}:{message_id}. HTTP client not initialized" + ) + return + + os_name = _get_os() + os_version = _get_os_version() + gpu_info = _get_gpu_info() + timestamp = _get_current_utc() + effective_base_url = _get_effective_base_url() + # Build URL with format: /openrag/{platform}.{version} + base_url = f"{effective_base_url}/{SCARF_PATH}/{PLATFORM_TYPE}.{OPENRAG_VERSION}" + + # Build query parameters + params = { + "clientType": CLIENT_TYPE, + "openrag_version": OPENRAG_VERSION, + "platform": PLATFORM_TYPE, + "os": os_name, + "os_version": os_version, + "gpu_available": str(gpu_info["gpu_available"]).lower(), + "gpu_count": str(gpu_info["gpu_count"]), + "cuda_available": str(gpu_info["cuda_available"]).lower(), + "category": category, + "message_id": message_id, + "timestamp": timestamp, + } + + # Add CUDA version if available + if gpu_info["cuda_version"]: + params["cuda_version"] = str(gpu_info["cuda_version"]) + + # Add metadata if provided + if metadata: + for key, value in metadata.items(): + if value is not None: + # URL encode the value + params[key] = str(value) + + url = f"{base_url}?{urlencode(params)}" + retry_count = 0 + + while retry_count < MAX_RETRIES: + if retry_count == 0: + logger.info(f"Sending telemetry event: {category}:{message_id}...") + else: + logger.info( + f"Sending telemetry event: {category}:{message_id}. Retry #{retry_count}..." + ) + + logger.debug(f"Telemetry URL: {url}") + + try: + response = await http_client.get(url) + status = response.status_code + + if 200 <= status < 300: + logger.info( + f"Successfully sent telemetry event: {category}:{message_id}. Status: {status}" + ) + return + elif 500 <= status < 600: + # Retry server errors + logger.error( + f"Failed to send telemetry event: {category}:{message_id}. Status: {status}" + ) + else: + # Non-retryable status codes (400, 401, 403, 404, 429, etc.) + logger.error( + f"Failed to send telemetry event: {category}:{message_id}. " + f"Status: {status} (non-retryable)" + ) + return + + except httpx.TimeoutException as e: + # Retry timeout errors + logger.error( + f"Failed to send telemetry event: {category}:{message_id}. " + f"Timeout error: {e}" + ) + except httpx.ConnectError as e: + # Retry connection errors + logger.error( + f"Failed to send telemetry event: {category}:{message_id}. " + f"Connection error: {e}" + ) + except httpx.RequestError as e: + # Non-retryable request errors + logger.error( + f"Failed to send telemetry event: {category}:{message_id}. " + f"Request error: {e}" + ) + return + except Exception as e: + logger.error( + f"Failed to send telemetry event: {category}:{message_id}. " + f"Unknown error: {e}" + ) + + retry_count += 1 + + if retry_count < MAX_RETRIES: + delay = _get_exponential_backoff_delay(retry_count) + await asyncio.sleep(delay) + + logger.error( + f"Failed to send telemetry event: {category}:{message_id}. " + f"Maximum retries exceeded: {MAX_RETRIES}" + ) + + +class TelemetryClient: + """Telemetry client for sending events to Scarf.""" + + @staticmethod + async def send_event(category: str, message_id: str, metadata: dict = None) -> None: + """Send a telemetry event asynchronously. + + Args: + category: Event category + message_id: Event message ID + metadata: Optional dictionary of additional metadata (e.g., {"llm_model": "gpt-4o"}) + """ + if is_do_not_track(): + logger.debug( + f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled" + ) + return + + try: + await _send_scarf_event(category, message_id, metadata) + except Exception as e: + logger.error(f"Error sending telemetry event: {e}") + + @staticmethod + def send_event_sync(category: str, message_id: str, metadata: dict = None) -> None: + """Send a telemetry event synchronously (creates a task). + + This is a convenience method for use in synchronous contexts. + It creates an async task but doesn't wait for it. + + Args: + category: Event category + message_id: Event message ID + metadata: Optional dictionary of additional metadata + """ + if is_do_not_track(): + logger.debug( + f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled" + ) + return + + try: + # Try to get the current event loop + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + # If loop is running, create a task + asyncio.create_task(_send_scarf_event(category, message_id, metadata)) + else: + # If loop exists but not running, run it + loop.run_until_complete(_send_scarf_event(category, message_id, metadata)) + except RuntimeError: + # No event loop, create a new one + asyncio.run(_send_scarf_event(category, message_id, metadata)) + except Exception as e: + logger.error(f"Error sending telemetry event: {e}") + + +async def cleanup_telemetry_client() -> None: + """Cleanup the telemetry HTTP client.""" + global _http_client + if _http_client is not None: + try: + await _http_client.aclose() + _http_client = None + logger.debug("Telemetry HTTP client closed") + except Exception as e: + logger.error(f"Error closing telemetry HTTP client: {e}") + diff --git a/src/utils/telemetry/message_id.py b/src/utils/telemetry/message_id.py new file mode 100644 index 00000000..af242257 --- /dev/null +++ b/src/utils/telemetry/message_id.py @@ -0,0 +1,201 @@ +"""Telemetry message IDs for OpenRAG backend. + +All message IDs start with ORB_ (OpenRAG Backend) followed by descriptive text. +Format: ORB__[_] +""" + + +class MessageId: + """Telemetry message IDs.""" + + # Category: APPLICATION_STARTUP -------------------------------------------> + + # Message: Application started successfully + ORB_APP_STARTED = "ORB_APP_STARTED" + # Message: Application startup initiated + ORB_APP_START_INIT = "ORB_APP_START_INIT" + # Message: Application shutdown initiated + ORB_APP_SHUTDOWN = "ORB_APP_SHUTDOWN" + + # Category: SERVICE_INITIALIZATION -----------------------------------------> + + # Message: Services initialized successfully + ORB_SVC_INIT_SUCCESS = "ORB_SVC_INIT_SUCCESS" + # Message: Service initialization started + ORB_SVC_INIT_START = "ORB_SVC_INIT_START" + # Message: Failed to initialize services + ORB_SVC_INIT_FAILED = "ORB_SVC_INIT_FAILED" + # Message: Failed to initialize OpenSearch client + ORB_SVC_OS_CLIENT_FAIL = "ORB_SVC_OS_CLIENT_FAIL" + # Message: Failed to generate JWT keys + ORB_SVC_JWT_KEY_FAIL = "ORB_SVC_JWT_KEY_FAIL" + + # Category: OPENSEARCH_SETUP ----------------------------------------------> + + # Message: OpenSearch connection established + ORB_OS_CONN_ESTABLISHED = "ORB_OS_CONN_ESTABLISHED" + # Message: OpenSearch connection failed + ORB_OS_CONN_FAILED = "ORB_OS_CONN_FAILED" + # Message: Waiting for OpenSearch to be ready + ORB_OS_WAITING = "ORB_OS_WAITING" + # Message: OpenSearch ready check timeout + ORB_OS_TIMEOUT = "ORB_OS_TIMEOUT" + + # Category: OPENSEARCH_INDEX ----------------------------------------------> + + # Message: OpenSearch index created successfully + ORB_OS_INDEX_CREATED = "ORB_OS_INDEX_CREATED" + # Message: OpenSearch index already exists + ORB_OS_INDEX_EXISTS = "ORB_OS_INDEX_EXISTS" + # Message: Failed to create OpenSearch index + ORB_OS_INDEX_CREATE_FAIL = "ORB_OS_INDEX_CREATE_FAIL" + # Message: Failed to initialize index + ORB_OS_INDEX_INIT_FAIL = "ORB_OS_INDEX_INIT_FAIL" + # Message: Knowledge filters index created + ORB_OS_KF_INDEX_CREATED = "ORB_OS_KF_INDEX_CREATED" + # Message: Failed to create knowledge filters index + ORB_OS_KF_INDEX_FAIL = "ORB_OS_KF_INDEX_FAIL" + + # Category: DOCUMENT_INGESTION --------------------------------------------> + + # Message: Document ingestion started + ORB_DOC_INGEST_START = "ORB_DOC_INGEST_START" + # Message: Document ingestion completed successfully + ORB_DOC_INGEST_COMPLETE = "ORB_DOC_INGEST_COMPLETE" + # Message: Document ingestion failed + ORB_DOC_INGEST_FAILED = "ORB_DOC_INGEST_FAILED" + # Message: Default documents ingestion started + ORB_DOC_DEFAULT_START = "ORB_DOC_DEFAULT_START" + # Message: Default documents ingestion completed + ORB_DOC_DEFAULT_COMPLETE = "ORB_DOC_DEFAULT_COMPLETE" + # Message: Default documents ingestion failed + ORB_DOC_DEFAULT_FAILED = "ORB_DOC_DEFAULT_FAILED" + + # Category: DOCUMENT_PROCESSING --------------------------------------------> + + # Message: Document processing started + ORB_DOC_PROCESS_START = "ORB_DOC_PROCESS_START" + # Message: Document processing completed + ORB_DOC_PROCESS_COMPLETE = "ORB_DOC_PROCESS_COMPLETE" + # Message: Document processing failed + ORB_DOC_PROCESS_FAILED = "ORB_DOC_PROCESS_FAILED" + # Message: Process pool recreation attempted + ORB_DOC_POOL_RECREATE = "ORB_DOC_POOL_RECREATE" + + # Category: AUTHENTICATION ------------------------------------------------> + + # Message: Authentication successful + ORB_AUTH_SUCCESS = "ORB_AUTH_SUCCESS" + # Message: Authentication failed + ORB_AUTH_FAILED = "ORB_AUTH_FAILED" + # Message: User logged out + ORB_AUTH_LOGOUT = "ORB_AUTH_LOGOUT" + # Message: OAuth callback received + ORB_AUTH_OAUTH_CALLBACK = "ORB_AUTH_OAUTH_CALLBACK" + # Message: OAuth callback failed + ORB_AUTH_OAUTH_FAILED = "ORB_AUTH_OAUTH_FAILED" + + # Category: CONNECTOR_OPERATIONS -------------------------------------------> + + # Message: Connector connection established + ORB_CONN_CONNECTED = "ORB_CONN_CONNECTED" + # Message: Connector connection failed + ORB_CONN_CONNECT_FAILED = "ORB_CONN_CONNECT_FAILED" + # Message: Connector sync started + ORB_CONN_SYNC_START = "ORB_CONN_SYNC_START" + # Message: Connector sync completed + ORB_CONN_SYNC_COMPLETE = "ORB_CONN_SYNC_COMPLETE" + # Message: Connector sync failed + ORB_CONN_SYNC_FAILED = "ORB_CONN_SYNC_FAILED" + # Message: Connector webhook received + ORB_CONN_WEBHOOK_RECV = "ORB_CONN_WEBHOOK_RECV" + # Message: Connector webhook failed + ORB_CONN_WEBHOOK_FAILED = "ORB_CONN_WEBHOOK_FAILED" + # Message: Failed to load persisted connections + ORB_CONN_LOAD_FAILED = "ORB_CONN_LOAD_FAILED" + + # Category: FLOW_OPERATIONS ------------------------------------------------> + + # Message: Flow backup completed + ORB_FLOW_BACKUP_COMPLETE = "ORB_FLOW_BACKUP_COMPLETE" + # Message: Flow backup failed + ORB_FLOW_BACKUP_FAILED = "ORB_FLOW_BACKUP_FAILED" + # Message: Flow reset detected + ORB_FLOW_RESET_DETECTED = "ORB_FLOW_RESET_DETECTED" + # Message: Flow reset check failed + ORB_FLOW_RESET_CHECK_FAIL = "ORB_FLOW_RESET_CHECK_FAIL" + # Message: Settings reapplied after flow reset + ORB_FLOW_SETTINGS_REAPPLIED = "ORB_FLOW_SETTINGS_REAPPLIED" + + # Category: TASK_OPERATIONS ------------------------------------------------> + + # Message: Task created successfully + ORB_TASK_CREATED = "ORB_TASK_CREATED" + # Message: Task completed successfully + ORB_TASK_COMPLETE = "ORB_TASK_COMPLETE" + # Message: Task failed + ORB_TASK_FAILED = "ORB_TASK_FAILED" + # Message: Task cancelled + ORB_TASK_CANCELLED = "ORB_TASK_CANCELLED" + # Message: Task cancellation failed + ORB_TASK_CANCEL_FAILED = "ORB_TASK_CANCEL_FAILED" + + # Category: CHAT_OPERATIONS ------------------------------------------------> + + # Message: Chat request received + ORB_CHAT_REQUEST_RECV = "ORB_CHAT_REQUEST_RECV" + # Message: Chat request completed + ORB_CHAT_REQUEST_COMPLETE = "ORB_CHAT_REQUEST_COMPLETE" + # Message: Chat request failed + ORB_CHAT_REQUEST_FAILED = "ORB_CHAT_REQUEST_FAILED" + + # Category: ERROR_CONDITIONS -----------------------------------------------> + + # Message: Critical error occurred + ORB_ERROR_CRITICAL = "ORB_ERROR_CRITICAL" + # Message: Warning condition + ORB_ERROR_WARNING = "ORB_ERROR_WARNING" + + # Category: SETTINGS_OPERATIONS --------------------------------------------> + + # Message: Settings updated successfully + ORB_SETTINGS_UPDATED = "ORB_SETTINGS_UPDATED" + # Message: Settings update failed + ORB_SETTINGS_UPDATE_FAILED = "ORB_SETTINGS_UPDATE_FAILED" + # Message: LLM provider changed + ORB_SETTINGS_LLM_PROVIDER = "ORB_SETTINGS_LLM_PROVIDER" + # Message: LLM model changed + ORB_SETTINGS_LLM_MODEL = "ORB_SETTINGS_LLM_MODEL" + # Message: Embedding provider changed + ORB_SETTINGS_EMBED_PROVIDER = "ORB_SETTINGS_EMBED_PROVIDER" + # Message: Embedding model changed + ORB_SETTINGS_EMBED_MODEL = "ORB_SETTINGS_EMBED_MODEL" + # Message: System prompt updated + ORB_SETTINGS_SYSTEM_PROMPT = "ORB_SETTINGS_SYSTEM_PROMPT" + # Message: Chunk settings updated + ORB_SETTINGS_CHUNK_UPDATED = "ORB_SETTINGS_CHUNK_UPDATED" + # Message: Docling settings updated + ORB_SETTINGS_DOCLING_UPDATED = "ORB_SETTINGS_DOCLING_UPDATED" + # Message: Provider credentials updated + ORB_SETTINGS_PROVIDER_CREDS = "ORB_SETTINGS_PROVIDER_CREDS" + + # Category: ONBOARDING -----------------------------------------------------> + + # Message: Onboarding started + ORB_ONBOARD_START = "ORB_ONBOARD_START" + # Message: Onboarding completed successfully + ORB_ONBOARD_COMPLETE = "ORB_ONBOARD_COMPLETE" + # Message: Onboarding failed + ORB_ONBOARD_FAILED = "ORB_ONBOARD_FAILED" + # Message: LLM provider selected during onboarding + ORB_ONBOARD_LLM_PROVIDER = "ORB_ONBOARD_LLM_PROVIDER" + # Message: LLM model selected during onboarding + ORB_ONBOARD_LLM_MODEL = "ORB_ONBOARD_LLM_MODEL" + # Message: Embedding provider selected during onboarding + ORB_ONBOARD_EMBED_PROVIDER = "ORB_ONBOARD_EMBED_PROVIDER" + # Message: Embedding model selected during onboarding + ORB_ONBOARD_EMBED_MODEL = "ORB_ONBOARD_EMBED_MODEL" + # Message: Sample data ingestion requested + ORB_ONBOARD_SAMPLE_DATA = "ORB_ONBOARD_SAMPLE_DATA" + # Message: Configuration marked as edited + ORB_ONBOARD_CONFIG_EDITED = "ORB_ONBOARD_CONFIG_EDITED"