Compare commits

main ... feat/telem (6 commits)

| SHA1 |
|---|
| 633fad310e |
| 1627f40856 |
| 1563d4fd34 |
| d11528e1a9 |
| 7977216174 |
| 53a3df1608 |

12 changed files with 938 additions and 11 deletions
**Auth endpoints** (path not captured):

```diff
@@ -1,5 +1,6 @@
 from starlette.requests import Request
 from starlette.responses import JSONResponse
+from utils.telemetry import TelemetryClient, Category, MessageId


 async def auth_init(request: Request, auth_service, session_manager):
@@ -40,8 +41,11 @@ async def auth_callback(request: Request, auth_service, session_manager):
             connection_id, authorization_code, state, request
         )

+        await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_OAUTH_CALLBACK)
+
         # If this is app auth, set JWT cookie
         if result.get("purpose") == "app_auth" and result.get("jwt_token"):
+            await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_SUCCESS)
             response = JSONResponse(
                 {k: v for k, v in result.items() if k != "jwt_token"}
             )
@@ -61,6 +65,7 @@ async def auth_callback(request: Request, auth_service, session_manager):
         import traceback

         traceback.print_exc()
+        await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_OAUTH_FAILED)
         return JSONResponse({"error": f"Callback failed: {str(e)}"}, status_code=500)


@@ -72,6 +77,7 @@ async def auth_me(request: Request, auth_service, session_manager):

 async def auth_logout(request: Request, auth_service, session_manager):
     """Logout user by clearing auth cookie"""
+    await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_LOGOUT)
     response = JSONResponse(
         {"status": "logged_out", "message": "Successfully logged out"}
     )
```
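All of these call sites assume a single fire-and-forget entry point on `TelemetryClient`. A minimal sketch of the interface they imply (the real implementation is in `src/utils/telemetry/client.py`, shown near the end of this diff; only the method name and argument order here come from the call sites):

```python
# Interface implied by the call sites; illustrative, not the actual class.
class TelemetryClient:
    @staticmethod
    async def send_event(category: str, message_id: str, metadata: dict | None = None) -> None:
        """Record a telemetry event; errors are swallowed, never raised to callers."""
        ...
```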
**Connector endpoints** (path not captured):

```diff
@@ -1,6 +1,7 @@
 from starlette.requests import Request
 from starlette.responses import JSONResponse, PlainTextResponse
 from utils.logging_config import get_logger
+from utils.telemetry import TelemetryClient, Category, MessageId

 logger = get_logger(__name__)

@@ -25,6 +26,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
     selected_files = data.get("selected_files")

     try:
+        await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_SYNC_START)
         logger.debug(
             "Starting connector sync",
             connector_type=connector_type,
@@ -102,6 +104,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
                 jwt_token=jwt_token,
             )
             task_ids = [task_id]
+        await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_SYNC_COMPLETE)
         return JSONResponse(
             {
                 "task_ids": task_ids,
@@ -114,6 +117,7 @@ async def connector_sync(request: Request, connector_service, session_manager):

     except Exception as e:
         logger.error("Connector sync failed", error=str(e))
+        await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_SYNC_FAILED)
         return JSONResponse({"error": f"Sync failed: {str(e)}"}, status_code=500)


@@ -185,6 +189,7 @@ async def connector_webhook(request: Request, connector_service, session_manager
         config=temp_config,
     )
     try:
+        await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_WEBHOOK_RECV)
         temp_connector = connector_service.connection_manager._create_connector(
             temp_connection
         )
@@ -336,6 +341,7 @@ async def connector_webhook(request: Request, connector_service, session_manager

     except Exception as e:
         logger.error("Webhook processing failed", error=str(e))
+        await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_WEBHOOK_FAILED)
         return JSONResponse(
             {"error": f"Webhook processing failed: {str(e)}"}, status_code=500
         )
```
**Settings and onboarding endpoints** (path not captured):

```diff
@@ -4,6 +4,7 @@ import time
 from starlette.responses import JSONResponse
 from utils.container_utils import transform_localhost_url
 from utils.logging_config import get_logger
+from utils.telemetry import TelemetryClient, Category, MessageId
 from config.settings import (
     DISABLE_INGEST_WITH_LANGFLOW,
     LANGFLOW_URL,
@@ -409,16 +410,32 @@ async def update_settings(request, session_manager):

         # Update agent settings
         if "llm_model" in body:
+            old_model = current_config.agent.llm_model
             current_config.agent.llm_model = body["llm_model"]
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_LLM_MODEL
+            )
+            logger.info(f"LLM model changed from {old_model} to {body['llm_model']}")

         if "llm_provider" in body:
+            old_provider = current_config.agent.llm_provider
             current_config.agent.llm_provider = body["llm_provider"]
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_LLM_PROVIDER
+            )
+            logger.info(f"LLM provider changed from {old_provider} to {body['llm_provider']}")

         if "system_prompt" in body:
             current_config.agent.system_prompt = body["system_prompt"]
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_SYSTEM_PROMPT
+            )

             # Also update the chat flow with the new system prompt
             try:
@@ -431,17 +448,33 @@ async def update_settings(request, session_manager):

         # Update knowledge settings
         if "embedding_model" in body:
+            old_model = current_config.knowledge.embedding_model
             new_embedding_model = body["embedding_model"].strip()
             current_config.knowledge.embedding_model = new_embedding_model
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_EMBED_MODEL
+            )
+            logger.info(f"Embedding model changed from {old_model} to {new_embedding_model}")

         if "embedding_provider" in body:
+            old_provider = current_config.knowledge.embedding_provider
             current_config.knowledge.embedding_provider = body["embedding_provider"]
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_EMBED_PROVIDER
+            )
+            logger.info(f"Embedding provider changed from {old_provider} to {body['embedding_provider']}")

         if "table_structure" in body:
             current_config.knowledge.table_structure = body["table_structure"]
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_DOCLING_UPDATED
+            )

             # Also update the flow with the new docling settings
             try:
@@ -453,6 +486,10 @@ async def update_settings(request, session_manager):
         if "ocr" in body:
             current_config.knowledge.ocr = body["ocr"]
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_DOCLING_UPDATED
+            )

             # Also update the flow with the new docling settings
             try:
@@ -464,6 +501,10 @@ async def update_settings(request, session_manager):
         if "picture_descriptions" in body:
             current_config.knowledge.picture_descriptions = body["picture_descriptions"]
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_DOCLING_UPDATED
+            )

             # Also update the flow with the new docling settings
             try:
@@ -475,6 +516,10 @@ async def update_settings(request, session_manager):
         if "chunk_size" in body:
             current_config.knowledge.chunk_size = body["chunk_size"]
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_CHUNK_UPDATED
+            )

             # Also update the ingest flow with the new chunk size
             try:
@@ -491,6 +536,10 @@ async def update_settings(request, session_manager):
         if "chunk_overlap" in body:
             current_config.knowledge.chunk_overlap = body["chunk_overlap"]
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_CHUNK_UPDATED
+            )

             # Also update the ingest flow with the new chunk overlap
             try:
@@ -507,35 +556,48 @@ async def update_settings(request, session_manager):
             # The config will still be saved

         # Update provider-specific settings
+        provider_updated = False
         if "openai_api_key" in body and body["openai_api_key"].strip():
             current_config.providers.openai.api_key = body["openai_api_key"]
             current_config.providers.openai.configured = True
             config_updated = True
+            provider_updated = True

         if "anthropic_api_key" in body and body["anthropic_api_key"].strip():
             current_config.providers.anthropic.api_key = body["anthropic_api_key"]
             current_config.providers.anthropic.configured = True
             config_updated = True
+            provider_updated = True

         if "watsonx_api_key" in body and body["watsonx_api_key"].strip():
             current_config.providers.watsonx.api_key = body["watsonx_api_key"]
             current_config.providers.watsonx.configured = True
             config_updated = True
+            provider_updated = True

         if "watsonx_endpoint" in body:
             current_config.providers.watsonx.endpoint = body["watsonx_endpoint"].strip()
             current_config.providers.watsonx.configured = True
             config_updated = True
+            provider_updated = True

         if "watsonx_project_id" in body:
             current_config.providers.watsonx.project_id = body["watsonx_project_id"].strip()
             current_config.providers.watsonx.configured = True
             config_updated = True
+            provider_updated = True

         if "ollama_endpoint" in body:
             current_config.providers.ollama.endpoint = body["ollama_endpoint"].strip()
             current_config.providers.ollama.configured = True
             config_updated = True
+            provider_updated = True

+        if provider_updated:
+            await TelemetryClient.send_event(
+                Category.SETTINGS_OPERATIONS,
+                MessageId.ORB_SETTINGS_PROVIDER_CREDS
+            )
+
         if not config_updated:
             return JSONResponse(
@@ -577,10 +639,18 @@ async def update_settings(request, session_manager):
         logger.info(
             "Configuration updated successfully", updated_fields=list(body.keys())
         )
+        await TelemetryClient.send_event(
+            Category.SETTINGS_OPERATIONS,
+            MessageId.ORB_SETTINGS_UPDATED
+        )
         return JSONResponse({"message": "Configuration updated successfully"})

     except Exception as e:
         logger.error("Failed to update settings", error=str(e))
+        await TelemetryClient.send_event(
+            Category.SETTINGS_OPERATIONS,
+            MessageId.ORB_SETTINGS_UPDATE_FAILED
+        )
         return JSONResponse(
             {"error": f"Failed to update settings: {str(e)}"}, status_code=500
         )
@@ -589,6 +659,8 @@ async def update_settings(request, session_manager):
 async def onboarding(request, flows_service, session_manager=None):
     """Handle onboarding configuration setup"""
     try:
+        await TelemetryClient.send_event(Category.ONBOARDING, MessageId.ORB_ONBOARD_START)
+
         # Get current configuration
         current_config = get_openrag_config()

@@ -631,13 +703,23 @@ async def onboarding(request, flows_service, session_manager=None):
         config_updated = False

         # Update agent settings (LLM)
+        llm_model_selected = None
+        llm_provider_selected = None
+
         if "llm_model" in body:
             if not isinstance(body["llm_model"], str) or not body["llm_model"].strip():
                 return JSONResponse(
                     {"error": "llm_model must be a non-empty string"}, status_code=400
                 )
-            current_config.agent.llm_model = body["llm_model"].strip()
+            llm_model_selected = body["llm_model"].strip()
+            current_config.agent.llm_model = llm_model_selected
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.ONBOARDING,
+                MessageId.ORB_ONBOARD_LLM_MODEL,
+                metadata={"llm_model": llm_model_selected}
+            )
+            logger.info(f"LLM model selected during onboarding: {llm_model_selected}")

         if "llm_provider" in body:
             if (
@@ -653,10 +735,20 @@ async def onboarding(request, flows_service, session_manager=None):
                     {"error": "llm_provider must be one of: openai, anthropic, watsonx, ollama"},
                     status_code=400,
                 )
-            current_config.agent.llm_provider = body["llm_provider"].strip()
+            llm_provider_selected = body["llm_provider"].strip()
+            current_config.agent.llm_provider = llm_provider_selected
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.ONBOARDING,
+                MessageId.ORB_ONBOARD_LLM_PROVIDER,
+                metadata={"llm_provider": llm_provider_selected}
+            )
+            logger.info(f"LLM provider selected during onboarding: {llm_provider_selected}")

         # Update knowledge settings (embedding)
+        embedding_model_selected = None
+        embedding_provider_selected = None
+
         if "embedding_model" in body and not DISABLE_INGEST_WITH_LANGFLOW:
             if (
                 not isinstance(body["embedding_model"], str)
@@ -666,8 +758,15 @@ async def onboarding(request, flows_service, session_manager=None):
                     {"error": "embedding_model must be a non-empty string"},
                     status_code=400,
                 )
-            current_config.knowledge.embedding_model = body["embedding_model"].strip()
+            embedding_model_selected = body["embedding_model"].strip()
+            current_config.knowledge.embedding_model = embedding_model_selected
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.ONBOARDING,
+                MessageId.ORB_ONBOARD_EMBED_MODEL,
+                metadata={"embedding_model": embedding_model_selected}
+            )
+            logger.info(f"Embedding model selected during onboarding: {embedding_model_selected}")

         if "embedding_provider" in body:
             if (
@@ -684,8 +783,15 @@ async def onboarding(request, flows_service, session_manager=None):
                     {"error": "embedding_provider must be one of: openai, watsonx, ollama"},
                     status_code=400,
                 )
-            current_config.knowledge.embedding_provider = body["embedding_provider"].strip()
+            embedding_provider_selected = body["embedding_provider"].strip()
+            current_config.knowledge.embedding_provider = embedding_provider_selected
             config_updated = True
+            await TelemetryClient.send_event(
+                Category.ONBOARDING,
+                MessageId.ORB_ONBOARD_EMBED_PROVIDER,
+                metadata={"embedding_provider": embedding_provider_selected}
+            )
+            logger.info(f"Embedding provider selected during onboarding: {embedding_provider_selected}")

         # Update provider-specific credentials
         if "openai_api_key" in body and body["openai_api_key"].strip():
@@ -771,6 +877,12 @@ async def onboarding(request, flows_service, session_manager=None):
                     {"error": "sample_data must be a boolean value"}, status_code=400
                 )
             should_ingest_sample_data = body["sample_data"]
+            if should_ingest_sample_data:
+                await TelemetryClient.send_event(
+                    Category.ONBOARDING,
+                    MessageId.ORB_ONBOARD_SAMPLE_DATA
+                )
+                logger.info("Sample data ingestion requested during onboarding")

         if not config_updated:
             return JSONResponse(
@@ -913,8 +1025,38 @@ async def onboarding(request, flows_service, session_manager=None):
                 "Onboarding configuration updated successfully",
                 updated_fields=updated_fields,
             )

+            # Mark config as edited and send telemetry with model information
+            current_config.edited = True
+
+            # Build metadata with selected models
+            onboarding_metadata = {}
+            if llm_provider_selected:
+                onboarding_metadata["llm_provider"] = llm_provider_selected
+            if llm_model_selected:
+                onboarding_metadata["llm_model"] = llm_model_selected
+            if embedding_provider_selected:
+                onboarding_metadata["embedding_provider"] = embedding_provider_selected
+            if embedding_model_selected:
+                onboarding_metadata["embedding_model"] = embedding_model_selected
+
+            await TelemetryClient.send_event(
+                Category.ONBOARDING,
+                MessageId.ORB_ONBOARD_CONFIG_EDITED,
+                metadata=onboarding_metadata
+            )
+            await TelemetryClient.send_event(
+                Category.ONBOARDING,
+                MessageId.ORB_ONBOARD_COMPLETE,
+                metadata=onboarding_metadata
+            )
+            logger.info("Configuration marked as edited after onboarding")
+
         else:
+            await TelemetryClient.send_event(
+                Category.ONBOARDING,
+                MessageId.ORB_ONBOARD_FAILED
+            )
             return JSONResponse(
                 {"error": "Failed to save configuration"}, status_code=500
             )
@@ -929,6 +1071,10 @@ async def onboarding(request, flows_service, session_manager=None):

     except Exception as e:
         logger.error("Failed to update onboarding settings", error=str(e))
+        await TelemetryClient.send_event(
+            Category.ONBOARDING,
+            MessageId.ORB_ONBOARD_FAILED
+        )
         return JSONResponse(
             {"error": str(e)},
             status_code=500,
@@ -1214,11 +1360,11 @@ async def update_docling_preset(request, session_manager):
         flows_service = _get_flows_service()
         await flows_service.update_flow_docling_preset("custom", preset_config)

-        logger.info(f"Successfully updated docling settings in ingest flow")
+        logger.info("Successfully updated docling settings in ingest flow")

         return JSONResponse(
             {
-                "message": f"Successfully updated docling settings",
+                "message": "Successfully updated docling settings",
                 "settings": settings,
                 "preset_config": preset_config,
             }
```
**Task endpoints** (path not captured):

```diff
@@ -1,5 +1,6 @@
 from starlette.requests import Request
 from starlette.responses import JSONResponse
+from utils.telemetry import TelemetryClient, Category, MessageId


 async def task_status(request: Request, task_service, session_manager):
@@ -28,8 +29,10 @@ async def cancel_task(request: Request, task_service, session_manager):

     success = await task_service.cancel_task(user.user_id, task_id)
     if not success:
+        await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORB_TASK_CANCEL_FAILED)
         return JSONResponse(
             {"error": "Task not found or cannot be cancelled"}, status_code=400
         )

+    await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORB_TASK_CANCELLED)
     return JSONResponse({"status": "cancelled", "task_id": task_id})
```
**src/main.py** (34 changes):

```diff
@@ -5,6 +5,7 @@ from services.flows_service import FlowsService
 from utils.container_utils import detect_container_environment
 from utils.embeddings import create_dynamic_index_body
 from utils.logging_config import configure_from_env, get_logger
+from utils.telemetry import TelemetryClient, Category, MessageId

 configure_from_env()
 logger = get_logger(__name__)
@@ -100,6 +101,7 @@ async def wait_for_opensearch():
         try:
             await clients.opensearch.info()
             logger.info("OpenSearch is ready")
+            await TelemetryClient.send_event(Category.OPENSEARCH_SETUP, MessageId.ORB_OS_CONN_ESTABLISHED)
             return
         except Exception as e:
             logger.warning(
@@ -111,6 +113,7 @@ async def wait_for_opensearch():
         if attempt < max_retries - 1:
             await asyncio.sleep(retry_delay)
         else:
+            await TelemetryClient.send_event(Category.OPENSEARCH_SETUP, MessageId.ORB_OS_TIMEOUT)
             raise Exception("OpenSearch failed to become ready")


@@ -154,6 +157,7 @@ async def _ensure_opensearch_index():
                 "dimension"
             ],
         )
+        await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_CREATED)

     except Exception as e:
         logger.error(
@@ -161,6 +165,7 @@ async def _ensure_opensearch_index():
             error=str(e),
             index_name=INDEX_NAME,
         )
+        await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_CREATE_FAIL)
         # Don't raise the exception to avoid breaking the initialization
         # The service can still function, document operations might fail later

@@ -193,12 +198,14 @@ async def init_index():
             index_name=INDEX_NAME,
             embedding_model=embedding_model,
         )
+        await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_CREATED)
     else:
         logger.info(
             "Index already exists, skipping creation",
             index_name=INDEX_NAME,
             embedding_model=embedding_model,
         )
+        await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_EXISTS)

     # Create knowledge filters index
     knowledge_filter_index_name = "knowledge_filters"
@@ -226,6 +233,7 @@ async def init_index():
         logger.info(
             "Created knowledge filters index", index_name=knowledge_filter_index_name
         )
+        await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_KF_INDEX_CREATED)
     else:
         logger.info(
             "Knowledge filters index already exists, skipping creation",
@@ -279,6 +287,7 @@ def generate_jwt_keys():
             logger.info("Generated RSA keys for JWT signing")
         except subprocess.CalledProcessError as e:
             logger.error("Failed to generate RSA keys", error=str(e))
+            TelemetryClient.send_event_sync(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_JWT_KEY_FAIL)
             raise
     else:
         # Ensure correct permissions on existing keys
@@ -297,6 +306,7 @@ async def init_index_when_ready():
         logger.info("OpenSearch index initialization completed successfully")
     except Exception as e:
         logger.error("OpenSearch index initialization failed", error=str(e))
+        await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_INIT_FAIL)
         logger.warning(
             "OIDC endpoints will still work, but document operations may fail until OpenSearch is ready"
         )
@@ -324,6 +334,7 @@ async def ingest_default_documents_when_ready(services):
             "Ingesting default documents when ready",
             disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
         )
+        await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORB_DOC_DEFAULT_START)
         base_dir = _get_documents_dir()
         if not os.path.isdir(base_dir):
             logger.info(
@@ -350,9 +361,12 @@ async def ingest_default_documents_when_ready(services):
             await _ingest_default_documents_openrag(services, file_paths)
         else:
             await _ingest_default_documents_langflow(services, file_paths)

+        await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORB_DOC_DEFAULT_COMPLETE)
+
     except Exception as e:
         logger.error("Default documents ingestion failed", error=str(e))
+        await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORB_DOC_DEFAULT_FAILED)


 async def _ingest_default_documents_langflow(services, file_paths):
@@ -502,6 +516,7 @@ async def _update_mcp_servers_with_provider_credentials(services):
 async def startup_tasks(services):
     """Startup tasks"""
     logger.info("Starting startup tasks")
+    await TelemetryClient.send_event(Category.APPLICATION_STARTUP, MessageId.ORB_APP_START_INIT)
     # Only initialize basic OpenSearch connection, not the index
     # Index will be created after onboarding when we know the embedding model
     await wait_for_opensearch()
@@ -527,25 +542,34 @@ async def startup_tasks(services):
                 logger.info(
                     f"Detected reset flows: {', '.join(reset_flows)}. Reapplying all settings."
                 )
+                await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_RESET_DETECTED)
                 from api.settings import reapply_all_settings
                 await reapply_all_settings(session_manager=services["session_manager"])
                 logger.info("Successfully reapplied settings after detecting flow resets")
+                await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_SETTINGS_REAPPLIED)
             else:
                 logger.info("No flows detected as reset, skipping settings reapplication")
         else:
             logger.debug("Configuration not yet edited, skipping flow reset check")
     except Exception as e:
         logger.error(f"Failed to check flows reset or reapply settings: {str(e)}")
+        await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_RESET_CHECK_FAIL)
         # Don't fail startup if this check fails


 async def initialize_services():
     """Initialize all services and their dependencies"""
+    await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_INIT_START)
     # Generate JWT keys if they don't exist
     generate_jwt_keys()

     # Initialize clients (now async to generate Langflow API key)
-    await clients.initialize()
+    try:
+        await clients.initialize()
+    except Exception as e:
+        logger.error("Failed to initialize clients", error=str(e))
+        await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_OS_CLIENT_FAIL)
+        raise

     # Initialize session manager
     session_manager = SessionManager(SESSION_SECRET)
@@ -608,8 +632,11 @@ async def initialize_services():
             logger.warning(
                 "Failed to load persisted connections on startup", error=str(e)
             )
+            await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_LOAD_FAILED)
     else:
         logger.info("[CONNECTORS] Skipping connection loading in no-auth mode")

+    await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_INIT_SUCCESS)
+
     langflow_file_service = LangflowFileService()

@@ -1223,6 +1250,7 @@ async def create_app():
     # Add startup event handler
     @app.on_event("startup")
     async def startup_event():
+        await TelemetryClient.send_event(Category.APPLICATION_STARTUP, MessageId.ORB_APP_STARTED)
         # Start index initialization in background to avoid blocking OIDC endpoints
         t1 = asyncio.create_task(startup_tasks(services))
         app.state.background_tasks.add(t1)
@@ -1270,9 +1298,13 @@ async def create_app():
     # Add shutdown event handler
     @app.on_event("shutdown")
     async def shutdown_event():
+        await TelemetryClient.send_event(Category.APPLICATION_SHUTDOWN, MessageId.ORB_APP_SHUTDOWN)
         await cleanup_subscriptions_proper(services)
         # Cleanup async clients
         await clients.cleanup()
+        # Cleanup telemetry client
+        from utils.telemetry.client import cleanup_telemetry_client
+        await cleanup_telemetry_client()

     return app
```
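The shutdown hook above imports `cleanup_telemetry_client`, whose definition falls outside the captured portion of `src/utils/telemetry/client.py`. A plausible sketch, assuming it only has to dispose of the module-level `httpx.AsyncClient` shown later in this diff:

```python
# Hypothetical sketch of cleanup_telemetry_client (not shown in this capture).
# Assumes the module-level _http_client from src/utils/telemetry/client.py.
async def cleanup_telemetry_client() -> None:
    global _http_client
    if _http_client is not None:
        await _http_client.aclose()  # close the shared httpx.AsyncClient
        _http_client = None
```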
**Document service** (path not captured):

```diff
@@ -14,6 +14,7 @@ logger = get_logger(__name__)

 from config.settings import clients, INDEX_NAME, get_embedding_model
 from utils.document_processing import extract_relevant, process_document_sync
+from utils.telemetry import TelemetryClient, Category, MessageId


 def get_token_count(text: str, model: str = None) -> int:
@@ -98,6 +99,7 @@ class DocumentService:
         """Recreate the process pool if it's broken"""
         if self._process_pool_broken and self.process_pool:
             logger.warning("Attempting to recreate broken process pool")
+            TelemetryClient.send_event_sync(Category.DOCUMENT_PROCESSING, MessageId.ORB_DOC_POOL_RECREATE)
             try:
                 # Shutdown the old pool
                 self.process_pool.shutdown(wait=False)
```
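`send_event_sync` is used here and in `generate_jwt_keys` because both run outside an awaitable context; its definition is not in the captured part of `client.py`. A minimal sketch of how such a wrapper could behave, assuming the async `send_event` shown elsewhere:

```python
import asyncio

# Hypothetical sketch of a sync wrapper over the async send path (the real
# send_event_sync is not shown in this capture).
def send_event_sync(category: str, message_id: str, metadata: dict | None = None) -> None:
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop (plain sync code): run the coroutine to completion.
        asyncio.run(TelemetryClient.send_event(category, message_id, metadata))
    else:
        # Inside a running loop: schedule fire-and-forget instead of blocking.
        loop.create_task(TelemetryClient.send_event(category, message_id, metadata))
```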
**Flows service** (path not captured):

```diff
@@ -28,6 +28,7 @@ import copy
 from datetime import datetime
 from utils.logging_config import get_logger
 from utils.container_utils import transform_localhost_url
+from utils.telemetry import TelemetryClient, Category, MessageId

 logger = get_logger(__name__)

@@ -228,6 +229,12 @@ class FlowsService:
             failed_count=len(backup_results["failed"]),
         )

+        # Send telemetry event
+        if backup_results["failed"]:
+            await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_BACKUP_FAILED)
+        else:
+            await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_BACKUP_COMPLETE)
+
         return backup_results

     async def _backup_flow(self, flow_id: str, flow_type: str, flow_data: dict = None):
```
**Task service** (path not captured):

```diff
@@ -7,6 +7,7 @@ from models.tasks import FileTask, TaskStatus, UploadTask
 from session_manager import AnonymousUser
 from utils.gpu_detection import get_worker_count
 from utils.logging_config import get_logger
+from utils.telemetry import TelemetryClient, Category, MessageId


 logger = get_logger(__name__)
@@ -131,6 +132,18 @@ class TaskService:
         # Store reference to background task for cancellation
         upload_task.background_task = background_task

+        # Send telemetry event for task creation with metadata
+        asyncio.create_task(
+            TelemetryClient.send_event(
+                Category.TASK_OPERATIONS,
+                MessageId.ORB_TASK_CREATED,
+                metadata={
+                    "total_files": len(items),
+                    "processor_type": processor.__class__.__name__,
+                }
+            )
+        )
+
         return task_id

     async def background_upload_processor(self, user_id: str, task_id: str) -> None:
@@ -174,6 +187,19 @@ class TaskService:
             if upload_task.processed_files >= upload_task.total_files:
                 upload_task.status = TaskStatus.COMPLETED
                 upload_task.updated_at = time.time()

+                # Send telemetry for task completion
+                asyncio.create_task(
+                    TelemetryClient.send_event(
+                        Category.TASK_OPERATIONS,
+                        MessageId.ORB_TASK_COMPLETE,
+                        metadata={
+                            "total_files": upload_task.total_files,
+                            "successful_files": upload_task.successful_files,
+                            "failed_files": upload_task.failed_files,
+                        }
+                    )
+                )
+
         except Exception as e:
             logger.error(
@@ -183,8 +209,23 @@ class TaskService:

             traceback.print_exc()
             if user_id in self.task_store and task_id in self.task_store[user_id]:
-                self.task_store[user_id][task_id].status = TaskStatus.FAILED
-                self.task_store[user_id][task_id].updated_at = time.time()
+                failed_task = self.task_store[user_id][task_id]
+                failed_task.status = TaskStatus.FAILED
+                failed_task.updated_at = time.time()
+
+                # Send telemetry for task failure
+                asyncio.create_task(
+                    TelemetryClient.send_event(
+                        Category.TASK_OPERATIONS,
+                        MessageId.ORB_TASK_FAILED,
+                        metadata={
+                            "total_files": failed_task.total_files,
+                            "processed_files": failed_task.processed_files,
+                            "successful_files": failed_task.successful_files,
+                            "failed_files": failed_task.failed_files,
+                        }
+                    )
+                )

     async def background_custom_processor(
         self, user_id: str, task_id: str, items: list
@@ -231,6 +272,19 @@ class TaskService:
             # Mark task as completed
             upload_task.status = TaskStatus.COMPLETED
             upload_task.updated_at = time.time()

+            # Send telemetry for task completion
+            asyncio.create_task(
+                TelemetryClient.send_event(
+                    Category.TASK_OPERATIONS,
+                    MessageId.ORB_TASK_COMPLETE,
+                    metadata={
+                        "total_files": upload_task.total_files,
+                        "successful_files": upload_task.successful_files,
+                        "failed_files": upload_task.failed_files,
+                    }
+                )
+            )
+
         except asyncio.CancelledError:
             logger.info("Background processor cancelled", task_id=task_id)
@@ -246,8 +300,23 @@ class TaskService:

             traceback.print_exc()
             if user_id in self.task_store and task_id in self.task_store[user_id]:
-                self.task_store[user_id][task_id].status = TaskStatus.FAILED
-                self.task_store[user_id][task_id].updated_at = time.time()
+                failed_task = self.task_store[user_id][task_id]
+                failed_task.status = TaskStatus.FAILED
+                failed_task.updated_at = time.time()
+
+                # Send telemetry for task failure
+                asyncio.create_task(
+                    TelemetryClient.send_event(
+                        Category.TASK_OPERATIONS,
+                        MessageId.ORB_TASK_FAILED,
+                        metadata={
+                            "total_files": failed_task.total_files,
+                            "processed_files": failed_task.processed_files,
+                            "successful_files": failed_task.successful_files,
+                            "failed_files": failed_task.failed_files,
+                        }
+                    )
+                )

     def get_task_status(self, user_id: str, task_id: str) -> dict | None:
         """Get the status of a specific upload task
```
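Note the pattern in these hunks: telemetry is wrapped in `asyncio.create_task(...)` so a slow Scarf request never delays task bookkeeping. One general asyncio caveat, not specific to this PR: fire-and-forget tasks should be kept referenced, or they can be garbage-collected before finishing. A hedged sketch of the safer variant:

```python
# Illustrative only: retain a reference so the event loop cannot drop the
# in-flight telemetry task, and discard it once the send completes.
task = asyncio.create_task(
    TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORB_TASK_COMPLETE)
)
background_tasks.add(task)  # hypothetical set kept by the caller
task.add_done_callback(background_tasks.discard)
```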
**src/utils/telemetry/__init__.py** (new file, 8 lines):

```diff
@@ -0,0 +1,8 @@
+"""Telemetry module for OpenRAG backend."""
+
+from .client import TelemetryClient
+from .category import Category
+from .message_id import MessageId
+
+__all__ = ["TelemetryClient", "Category", "MessageId"]
```
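With the package in place, a typical call site looks like this (the `metadata` values are illustrative):

```python
from utils.telemetry import TelemetryClient, Category, MessageId

# Fire an event with optional metadata, as the handlers above do.
await TelemetryClient.send_event(
    Category.ONBOARDING,
    MessageId.ORB_ONBOARD_COMPLETE,
    metadata={"llm_provider": "openai"},  # illustrative value
)
```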
**src/utils/telemetry/category.py** (new file, 45 lines):

```diff
@@ -0,0 +1,45 @@
+"""Telemetry categories for OpenRAG backend."""
+
+
+class Category:
+    """Telemetry event categories."""
+
+    # Application lifecycle
+    APPLICATION_STARTUP = "APPLICATION_STARTUP"
+    APPLICATION_SHUTDOWN = "APPLICATION_SHUTDOWN"
+
+    # Service initialization
+    SERVICE_INITIALIZATION = "SERVICE_INITIALIZATION"
+
+    # OpenSearch operations
+    OPENSEARCH_SETUP = "OPENSEARCH_SETUP"
+    OPENSEARCH_INDEX = "OPENSEARCH_INDEX"
+
+    # Document operations
+    DOCUMENT_INGESTION = "DOCUMENT_INGESTION"
+    DOCUMENT_PROCESSING = "DOCUMENT_PROCESSING"
+
+    # Authentication
+    AUTHENTICATION = "AUTHENTICATION"
+
+    # Connector operations
+    CONNECTOR_OPERATIONS = "CONNECTOR_OPERATIONS"
+
+    # Flow operations
+    FLOW_OPERATIONS = "FLOW_OPERATIONS"
+
+    # Task operations
+    TASK_OPERATIONS = "TASK_OPERATIONS"
+
+    # Chat operations
+    CHAT_OPERATIONS = "CHAT_OPERATIONS"
+
+    # Error conditions
+    ERROR_CONDITIONS = "ERROR_CONDITIONS"
+
+    # Settings operations
+    SETTINGS_OPERATIONS = "SETTINGS_OPERATIONS"
+
+    # Onboarding
+    ONBOARDING = "ONBOARDING"
```
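`src/utils/telemetry/message_id.py` is also added by this PR but is not in the capture. Judging from `Category` above and the `ORB_*` names at the call sites, it is presumably a parallel class of string constants:

```python
# Hypothetical sketch of message_id.py (file not captured); only a subset of
# the ORB_* identifiers referenced in the diff is shown.
class MessageId:
    """Telemetry event message IDs."""

    ORB_APP_STARTED = "ORB_APP_STARTED"
    ORB_AUTH_SUCCESS = "ORB_AUTH_SUCCESS"
    ORB_TASK_COMPLETE = "ORB_TASK_COMPLETE"
    ORB_ONBOARD_COMPLETE = "ORB_ONBOARD_COMPLETE"
```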
**src/utils/telemetry/client.py** (new file, 402 lines; the capture below ends partway through):

```diff
@@ -0,0 +1,402 @@
+"""Telemetry client for OpenRAG backend using Scarf."""
+
+import asyncio
+import os
+import platform
+from datetime import datetime, timezone
+from typing import Optional
+from urllib.parse import urlencode
+
+import httpx
+from utils.logging_config import get_logger
+
+logger = get_logger(__name__)
+
+# Constants
+SCARF_BASE_URL_DEFAULT = "https://langflow.gateway.scarf.sh"
+SCARF_PATH = "openrag"
+CLIENT_TYPE = "backend"
+PLATFORM_TYPE = "backend"
+
+
+def _get_openrag_version() -> str:
+    """Get OpenRAG version from package metadata."""
+    try:
+        from importlib.metadata import version, PackageNotFoundError
+
+        try:
+            return version("openrag")
+        except PackageNotFoundError:
+            # Fallback: try to read from pyproject.toml if package not installed (dev mode)
+            try:
+                import tomllib
+                from pathlib import Path
+
+                # Try to find pyproject.toml relative to this file
+                current_file = Path(__file__)
+                project_root = current_file.parent.parent.parent.parent
+                pyproject_path = project_root / "pyproject.toml"
+
+                if pyproject_path.exists():
+                    with open(pyproject_path, "rb") as f:
+                        data = tomllib.load(f)
+                        return data.get("project", {}).get("version", "dev")
+            except Exception:
+                pass
+
+            return "dev"
+    except Exception as e:
+        logger.warning(f"Failed to get OpenRAG version: {e}")
+        return "unknown"
+
+
+# Get version dynamically
+OPENRAG_VERSION = _get_openrag_version()
+
+# HTTP timeouts
+HTTP_REQUEST_TIMEOUT = 10.0
+HTTP_CONNECT_TIMEOUT = 5.0
+
+# Retry configuration
+RETRY_BASE_MS = 250
+MAX_WAIT_INTERVAL_MS = 5000
+MAX_RETRIES = 3
+
+# Global HTTP client
+_http_client: Optional[httpx.AsyncClient] = None
+_base_url_override: Optional[str] = None
+
+
+def _get_http_client() -> Optional[httpx.AsyncClient]:
+    """Get or create the HTTP client for telemetry."""
+    global _http_client
+    if _http_client is None:
+        try:
+            _http_client = httpx.AsyncClient(
+                timeout=httpx.Timeout(
+                    connect=HTTP_CONNECT_TIMEOUT,
+                    read=HTTP_REQUEST_TIMEOUT,
+                    write=HTTP_REQUEST_TIMEOUT,
+                    pool=HTTP_CONNECT_TIMEOUT,
+                ),
+                headers={
+                    "User-Agent": f"OpenRAG-Backend/{OPENRAG_VERSION}",
+                },
+            )
+            logger.debug("Telemetry HTTP client initialized")
+        except Exception as e:
+            logger.error(f"Failed to initialize telemetry HTTP client: {e}")
+            return None
+    return _http_client
+
+
+def set_base_url(url: str) -> None:
+    """Override the default Scarf base URL (for testing)."""
+    global _base_url_override
+    _base_url_override = url
+    logger.info(f"Telemetry base URL overridden: {url}")
+
+
+def _get_effective_base_url() -> str:
+    """Get the effective base URL (override or default)."""
+    return _base_url_override or SCARF_BASE_URL_DEFAULT
```
```diff
+def is_do_not_track() -> bool:
+    """Check if DO_NOT_TRACK environment variable is set."""
+    do_not_track = os.environ.get("DO_NOT_TRACK", "").lower()
+    return do_not_track in ("true", "1", "yes", "on")
```
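The opt-out accepts several truthy spellings; anything else leaves telemetry enabled:

```python
import os

# Quick check of the accepted DO_NOT_TRACK values.
os.environ["DO_NOT_TRACK"] = "YES"   # case-insensitive
assert is_do_not_track() is True
os.environ["DO_NOT_TRACK"] = "0"     # not in ("true", "1", "yes", "on")
assert is_do_not_track() is False
```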
def _get_os() -> str:
|
||||||
|
"""Get the operating system identifier."""
|
||||||
|
system = platform.system().lower()
|
||||||
|
if system == "darwin":
|
||||||
|
return "macos"
|
||||||
|
elif system == "windows":
|
||||||
|
return "windows"
|
||||||
|
elif system == "linux":
|
||||||
|
return "linux"
|
||||||
|
else:
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
|
||||||
|
def _get_os_version() -> str:
|
||||||
|
"""Get the operating system version."""
|
||||||
|
try:
|
||||||
|
system = platform.system().lower()
|
||||||
|
if system == "darwin":
|
||||||
|
# macOS version
|
||||||
|
return platform.mac_ver()[0] if platform.mac_ver()[0] else "unknown"
|
||||||
|
elif system == "windows":
|
||||||
|
# Windows version
|
||||||
|
return platform.win32_ver()[0] if platform.win32_ver()[0] else "unknown"
|
||||||
|
elif system == "linux":
|
||||||
|
# Linux - try to get distribution info
|
||||||
|
try:
|
||||||
|
import distro
|
||||||
|
return f"{distro.name()} {distro.version()}".strip() or platform.release()
|
||||||
|
except ImportError:
|
||||||
|
# Fallback to platform.release() if distro not available
|
||||||
|
return platform.release()
|
||||||
|
else:
|
||||||
|
return platform.release()
|
||||||
|
except Exception:
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
|
||||||
|
def _get_gpu_info() -> dict:
|
||||||
|
"""Get GPU information for telemetry."""
|
||||||
|
gpu_info = {
|
||||||
|
"gpu_available": False,
|
||||||
|
"gpu_count": 0,
|
||||||
|
"cuda_available": False,
|
||||||
|
"cuda_version": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Try to use the existing GPU detection utility
|
||||||
|
from utils.gpu_detection import detect_gpu_devices
|
||||||
|
|
||||||
|
has_gpu, gpu_count = detect_gpu_devices()
|
||||||
|
gpu_info["gpu_available"] = has_gpu
|
||||||
|
gpu_info["gpu_count"] = gpu_count if isinstance(gpu_count, int) else 0
|
||||||
|
|
||||||
|
# Also check CUDA availability via torch
|
||||||
|
try:
|
||||||
|
import torch
|
||||||
|
gpu_info["cuda_available"] = torch.cuda.is_available()
|
||||||
|
if torch.cuda.is_available():
|
||||||
|
gpu_info["cuda_version"] = torch.version.cuda or "unknown"
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Failed to detect GPU info: {e}")
|
||||||
|
|
||||||
|
return gpu_info
|
||||||
|
|
||||||
|
|
||||||
|
def _get_current_utc() -> str:
|
||||||
|
"""Get current UTC time as RFC 3339 formatted string."""
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
return now.isoformat().replace("+00:00", "Z")
|
||||||
|
|
||||||
|
|
||||||
|
def _get_exponential_backoff_delay(attempt: int) -> float:
|
||||||
|
"""Calculate exponential backoff delay with full jitter (in seconds).
|
||||||
|
|
||||||
|
Formula:
|
||||||
|
temp = min(MAX_BACKOFF, base * 2^attempt)
|
||||||
|
sleep = random_between(0, temp)
|
||||||
|
"""
|
||||||
|
import random
|
||||||
|
|
||||||
|
exp = min(2 ** attempt, MAX_WAIT_INTERVAL_MS // RETRY_BASE_MS)
|
||||||
|
temp_ms = RETRY_BASE_MS * exp
|
||||||
|
temp_ms = min(temp_ms, MAX_WAIT_INTERVAL_MS)
|
||||||
|
|
||||||
|
# Full jitter: random duration between 0 and temp_ms
|
||||||
|
sleep_ms = random.uniform(0, temp_ms) if temp_ms > 0 else 0
|
||||||
|
return sleep_ms / 1000.0 # Convert to seconds
|
||||||
|
|
||||||
|
|
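# Worked example of the schedule above (RETRY_BASE_MS=250, MAX_WAIT_INTERVAL_MS=5000):
#   attempt 1 -> sleep ~ uniform(0, 0.5 s); attempt 2 -> uniform(0, 1.0 s);
#   the 5000 ms cap is first reached at attempt 5. Since MAX_RETRIES=3, the
#   send loop below only ever sleeps for attempts 1 and 2.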


async def _send_scarf_event(
    category: str,
    message_id: str,
    metadata: Optional[dict] = None,
) -> None:
    """Send a telemetry event to Scarf.

    Args:
        category: Event category
        message_id: Event message ID
        metadata: Optional dictionary of additional metadata to include in the event
    """
    if is_do_not_track():
        logger.debug(
            f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled"
        )
        return

    http_client = _get_http_client()
    if http_client is None:
        logger.error(
            f"Telemetry event aborted: {category}:{message_id}. HTTP client not initialized"
        )
        return

    os_name = _get_os()
    os_version = _get_os_version()
    gpu_info = _get_gpu_info()
    timestamp = _get_current_utc()
    effective_base_url = _get_effective_base_url()
    # Build URL with format: /openrag/{platform}.{version}
    base_url = f"{effective_base_url}/{SCARF_PATH}/{PLATFORM_TYPE}.{OPENRAG_VERSION}"

    # Build query parameters
    params = {
        "clientType": CLIENT_TYPE,
        "openrag_version": OPENRAG_VERSION,
        "platform": PLATFORM_TYPE,
        "os": os_name,
        "os_version": os_version,
        "gpu_available": str(gpu_info["gpu_available"]).lower(),
        "gpu_count": str(gpu_info["gpu_count"]),
        "cuda_available": str(gpu_info["cuda_available"]).lower(),
        "category": category,
        "message_id": message_id,
        "timestamp": timestamp,
    }

    # Add CUDA version if available
    if gpu_info["cuda_version"]:
        params["cuda_version"] = str(gpu_info["cuda_version"])

    # Add metadata if provided
    if metadata:
        for key, value in metadata.items():
            if value is not None:
                # Stringify here; urlencode below handles escaping
                params[key] = str(value)

    url = f"{base_url}?{urlencode(params)}"
    retry_count = 0

    while retry_count < MAX_RETRIES:
        if retry_count == 0:
            logger.info(f"Sending telemetry event: {category}:{message_id}...")
        else:
            logger.info(
                f"Sending telemetry event: {category}:{message_id}. Retry #{retry_count}..."
            )

        logger.debug(f"Telemetry URL: {url}")

        try:
            response = await http_client.get(url)
            status = response.status_code

            if 200 <= status < 300:
                logger.info(
                    f"Successfully sent telemetry event: {category}:{message_id}. Status: {status}"
                )
                return
            elif 500 <= status < 600:
                # Retry server errors
                logger.error(
                    f"Failed to send telemetry event: {category}:{message_id}. Status: {status}"
                )
            else:
                # Non-retryable status codes (400, 401, 403, 404, 429, etc.)
                logger.error(
                    f"Failed to send telemetry event: {category}:{message_id}. "
                    f"Status: {status} (non-retryable)"
                )
                return

        except httpx.TimeoutException as e:
            # Retry timeout errors
            logger.error(
                f"Failed to send telemetry event: {category}:{message_id}. "
                f"Timeout error: {e}"
            )
        except httpx.ConnectError as e:
            # Retry connection errors
            logger.error(
                f"Failed to send telemetry event: {category}:{message_id}. "
                f"Connection error: {e}"
            )
        except httpx.RequestError as e:
            # Non-retryable request errors
            logger.error(
                f"Failed to send telemetry event: {category}:{message_id}. "
                f"Request error: {e}"
            )
            return
        except Exception as e:
            logger.error(
                f"Failed to send telemetry event: {category}:{message_id}. "
                f"Unknown error: {e}"
            )

        retry_count += 1

        if retry_count < MAX_RETRIES:
            delay = _get_exponential_backoff_delay(retry_count)
            await asyncio.sleep(delay)

    logger.error(
        f"Failed to send telemetry event: {category}:{message_id}. "
        f"Maximum retries exceeded: {MAX_RETRIES}"
    )


class TelemetryClient:
    """Telemetry client for sending events to Scarf."""

    @staticmethod
    async def send_event(category: str, message_id: str, metadata: Optional[dict] = None) -> None:
        """Send a telemetry event asynchronously.

        Args:
            category: Event category
            message_id: Event message ID
            metadata: Optional dictionary of additional metadata (e.g., {"llm_model": "gpt-4o"})
        """
        if is_do_not_track():
            logger.debug(
                f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled"
            )
            return

        try:
            await _send_scarf_event(category, message_id, metadata)
        except Exception as e:
            logger.error(f"Error sending telemetry event: {e}")

    @staticmethod
    def send_event_sync(category: str, message_id: str, metadata: Optional[dict] = None) -> None:
        """Send a telemetry event synchronously (creates a task).

        This is a convenience method for use in synchronous contexts.
        It schedules the send but doesn't wait for it.

        Args:
            category: Event category
            message_id: Event message ID
            metadata: Optional dictionary of additional metadata
        """
        if is_do_not_track():
            logger.debug(
                f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled"
            )
            return

        try:
            try:
                # A loop is running in this thread: schedule a fire-and-forget task
                loop = asyncio.get_running_loop()
                loop.create_task(_send_scarf_event(category, message_id, metadata))
            except RuntimeError:
                # No running event loop: run the coroutine to completion
                asyncio.run(_send_scarf_event(category, message_id, metadata))
        except Exception as e:
            logger.error(f"Error sending telemetry event: {e}")


async def cleanup_telemetry_client() -> None:
    """Clean up the telemetry HTTP client."""
    global _http_client
    if _http_client is not None:
        try:
            await _http_client.aclose()
            _http_client = None
            logger.debug("Telemetry HTTP client closed")
        except Exception as e:
            logger.error(f"Error closing telemetry HTTP client: {e}")
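
For reviewers who want to exercise this client without reaching Scarf, here is a minimal sketch. It assumes a stub HTTP server at http://127.0.0.1:9999 (hypothetical) and that the `utils.telemetry` package re-exports `set_base_url` and `cleanup_telemetry_client` alongside `TelemetryClient`; `"TEST_CATEGORY"` is a placeholder, not a real category constant.

import asyncio
import os

from utils.telemetry import TelemetryClient, set_base_url, cleanup_telemetry_client


async def main() -> None:
    # Point the client at a local stub instead of the default Scarf endpoint
    set_base_url("http://127.0.0.1:9999")

    # Opt-out path: no HTTP request is made while DO_NOT_TRACK is enabled
    os.environ["DO_NOT_TRACK"] = "1"
    await TelemetryClient.send_event("TEST_CATEGORY", "ORB_APP_STARTED")

    # Normal path: one GET per event, retried on 5xx responses and timeouts
    os.environ.pop("DO_NOT_TRACK", None)
    await TelemetryClient.send_event(
        "TEST_CATEGORY", "ORB_APP_STARTED", metadata={"llm_model": "gpt-4o"}
    )

    await cleanup_telemetry_client()


asyncio.run(main())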
201	src/utils/telemetry/message_id.py	Normal file
@@ -0,0 +1,201 @@
"""Telemetry message IDs for OpenRAG backend.

All message IDs start with ORB_ (OpenRAG Backend) followed by descriptive text.
Format: ORB_<CATEGORY>_<ACTION>[_<STATUS>]
"""


class MessageId:
    """Telemetry message IDs."""

    # Category: APPLICATION_STARTUP ------------------------------------------->

    # Message: Application started successfully
    ORB_APP_STARTED = "ORB_APP_STARTED"
    # Message: Application startup initiated
    ORB_APP_START_INIT = "ORB_APP_START_INIT"
    # Message: Application shutdown initiated
    ORB_APP_SHUTDOWN = "ORB_APP_SHUTDOWN"

    # Category: SERVICE_INITIALIZATION ----------------------------------------->

    # Message: Services initialized successfully
    ORB_SVC_INIT_SUCCESS = "ORB_SVC_INIT_SUCCESS"
    # Message: Service initialization started
    ORB_SVC_INIT_START = "ORB_SVC_INIT_START"
    # Message: Failed to initialize services
    ORB_SVC_INIT_FAILED = "ORB_SVC_INIT_FAILED"
    # Message: Failed to initialize OpenSearch client
    ORB_SVC_OS_CLIENT_FAIL = "ORB_SVC_OS_CLIENT_FAIL"
    # Message: Failed to generate JWT keys
    ORB_SVC_JWT_KEY_FAIL = "ORB_SVC_JWT_KEY_FAIL"

    # Category: OPENSEARCH_SETUP ---------------------------------------------->

    # Message: OpenSearch connection established
    ORB_OS_CONN_ESTABLISHED = "ORB_OS_CONN_ESTABLISHED"
    # Message: OpenSearch connection failed
    ORB_OS_CONN_FAILED = "ORB_OS_CONN_FAILED"
    # Message: Waiting for OpenSearch to be ready
    ORB_OS_WAITING = "ORB_OS_WAITING"
    # Message: OpenSearch ready check timeout
    ORB_OS_TIMEOUT = "ORB_OS_TIMEOUT"

    # Category: OPENSEARCH_INDEX ---------------------------------------------->

    # Message: OpenSearch index created successfully
    ORB_OS_INDEX_CREATED = "ORB_OS_INDEX_CREATED"
    # Message: OpenSearch index already exists
    ORB_OS_INDEX_EXISTS = "ORB_OS_INDEX_EXISTS"
    # Message: Failed to create OpenSearch index
    ORB_OS_INDEX_CREATE_FAIL = "ORB_OS_INDEX_CREATE_FAIL"
    # Message: Failed to initialize index
    ORB_OS_INDEX_INIT_FAIL = "ORB_OS_INDEX_INIT_FAIL"
    # Message: Knowledge filters index created
    ORB_OS_KF_INDEX_CREATED = "ORB_OS_KF_INDEX_CREATED"
    # Message: Failed to create knowledge filters index
    ORB_OS_KF_INDEX_FAIL = "ORB_OS_KF_INDEX_FAIL"

    # Category: DOCUMENT_INGESTION -------------------------------------------->

    # Message: Document ingestion started
    ORB_DOC_INGEST_START = "ORB_DOC_INGEST_START"
    # Message: Document ingestion completed successfully
    ORB_DOC_INGEST_COMPLETE = "ORB_DOC_INGEST_COMPLETE"
    # Message: Document ingestion failed
    ORB_DOC_INGEST_FAILED = "ORB_DOC_INGEST_FAILED"
    # Message: Default documents ingestion started
    ORB_DOC_DEFAULT_START = "ORB_DOC_DEFAULT_START"
    # Message: Default documents ingestion completed
    ORB_DOC_DEFAULT_COMPLETE = "ORB_DOC_DEFAULT_COMPLETE"
    # Message: Default documents ingestion failed
    ORB_DOC_DEFAULT_FAILED = "ORB_DOC_DEFAULT_FAILED"

    # Category: DOCUMENT_PROCESSING -------------------------------------------->

    # Message: Document processing started
    ORB_DOC_PROCESS_START = "ORB_DOC_PROCESS_START"
    # Message: Document processing completed
    ORB_DOC_PROCESS_COMPLETE = "ORB_DOC_PROCESS_COMPLETE"
    # Message: Document processing failed
    ORB_DOC_PROCESS_FAILED = "ORB_DOC_PROCESS_FAILED"
    # Message: Process pool recreation attempted
    ORB_DOC_POOL_RECREATE = "ORB_DOC_POOL_RECREATE"

    # Category: AUTHENTICATION ------------------------------------------------>

    # Message: Authentication successful
    ORB_AUTH_SUCCESS = "ORB_AUTH_SUCCESS"
    # Message: Authentication failed
    ORB_AUTH_FAILED = "ORB_AUTH_FAILED"
    # Message: User logged out
    ORB_AUTH_LOGOUT = "ORB_AUTH_LOGOUT"
    # Message: OAuth callback received
    ORB_AUTH_OAUTH_CALLBACK = "ORB_AUTH_OAUTH_CALLBACK"
    # Message: OAuth callback failed
    ORB_AUTH_OAUTH_FAILED = "ORB_AUTH_OAUTH_FAILED"

    # Category: CONNECTOR_OPERATIONS ------------------------------------------->

    # Message: Connector connection established
    ORB_CONN_CONNECTED = "ORB_CONN_CONNECTED"
    # Message: Connector connection failed
    ORB_CONN_CONNECT_FAILED = "ORB_CONN_CONNECT_FAILED"
    # Message: Connector sync started
    ORB_CONN_SYNC_START = "ORB_CONN_SYNC_START"
    # Message: Connector sync completed
    ORB_CONN_SYNC_COMPLETE = "ORB_CONN_SYNC_COMPLETE"
    # Message: Connector sync failed
    ORB_CONN_SYNC_FAILED = "ORB_CONN_SYNC_FAILED"
    # Message: Connector webhook received
    ORB_CONN_WEBHOOK_RECV = "ORB_CONN_WEBHOOK_RECV"
    # Message: Connector webhook failed
    ORB_CONN_WEBHOOK_FAILED = "ORB_CONN_WEBHOOK_FAILED"
    # Message: Failed to load persisted connections
    ORB_CONN_LOAD_FAILED = "ORB_CONN_LOAD_FAILED"

    # Category: FLOW_OPERATIONS ------------------------------------------------>

    # Message: Flow backup completed
    ORB_FLOW_BACKUP_COMPLETE = "ORB_FLOW_BACKUP_COMPLETE"
    # Message: Flow backup failed
    ORB_FLOW_BACKUP_FAILED = "ORB_FLOW_BACKUP_FAILED"
    # Message: Flow reset detected
    ORB_FLOW_RESET_DETECTED = "ORB_FLOW_RESET_DETECTED"
    # Message: Flow reset check failed
    ORB_FLOW_RESET_CHECK_FAIL = "ORB_FLOW_RESET_CHECK_FAIL"
    # Message: Settings reapplied after flow reset
    ORB_FLOW_SETTINGS_REAPPLIED = "ORB_FLOW_SETTINGS_REAPPLIED"

    # Category: TASK_OPERATIONS ------------------------------------------------>

    # Message: Task created successfully
    ORB_TASK_CREATED = "ORB_TASK_CREATED"
    # Message: Task completed successfully
    ORB_TASK_COMPLETE = "ORB_TASK_COMPLETE"
    # Message: Task failed
    ORB_TASK_FAILED = "ORB_TASK_FAILED"
    # Message: Task cancelled
    ORB_TASK_CANCELLED = "ORB_TASK_CANCELLED"
    # Message: Task cancellation failed
    ORB_TASK_CANCEL_FAILED = "ORB_TASK_CANCEL_FAILED"

    # Category: CHAT_OPERATIONS ------------------------------------------------>

    # Message: Chat request received
    ORB_CHAT_REQUEST_RECV = "ORB_CHAT_REQUEST_RECV"
    # Message: Chat request completed
    ORB_CHAT_REQUEST_COMPLETE = "ORB_CHAT_REQUEST_COMPLETE"
    # Message: Chat request failed
    ORB_CHAT_REQUEST_FAILED = "ORB_CHAT_REQUEST_FAILED"

    # Category: ERROR_CONDITIONS ----------------------------------------------->

    # Message: Critical error occurred
    ORB_ERROR_CRITICAL = "ORB_ERROR_CRITICAL"
    # Message: Warning condition
    ORB_ERROR_WARNING = "ORB_ERROR_WARNING"

    # Category: SETTINGS_OPERATIONS -------------------------------------------->

    # Message: Settings updated successfully
    ORB_SETTINGS_UPDATED = "ORB_SETTINGS_UPDATED"
    # Message: Settings update failed
    ORB_SETTINGS_UPDATE_FAILED = "ORB_SETTINGS_UPDATE_FAILED"
    # Message: LLM provider changed
    ORB_SETTINGS_LLM_PROVIDER = "ORB_SETTINGS_LLM_PROVIDER"
    # Message: LLM model changed
    ORB_SETTINGS_LLM_MODEL = "ORB_SETTINGS_LLM_MODEL"
    # Message: Embedding provider changed
    ORB_SETTINGS_EMBED_PROVIDER = "ORB_SETTINGS_EMBED_PROVIDER"
    # Message: Embedding model changed
    ORB_SETTINGS_EMBED_MODEL = "ORB_SETTINGS_EMBED_MODEL"
    # Message: System prompt updated
    ORB_SETTINGS_SYSTEM_PROMPT = "ORB_SETTINGS_SYSTEM_PROMPT"
    # Message: Chunk settings updated
    ORB_SETTINGS_CHUNK_UPDATED = "ORB_SETTINGS_CHUNK_UPDATED"
    # Message: Docling settings updated
    ORB_SETTINGS_DOCLING_UPDATED = "ORB_SETTINGS_DOCLING_UPDATED"
    # Message: Provider credentials updated
    ORB_SETTINGS_PROVIDER_CREDS = "ORB_SETTINGS_PROVIDER_CREDS"

    # Category: ONBOARDING ----------------------------------------------------->

    # Message: Onboarding started
    ORB_ONBOARD_START = "ORB_ONBOARD_START"
    # Message: Onboarding completed successfully
    ORB_ONBOARD_COMPLETE = "ORB_ONBOARD_COMPLETE"
    # Message: Onboarding failed
    ORB_ONBOARD_FAILED = "ORB_ONBOARD_FAILED"
    # Message: LLM provider selected during onboarding
    ORB_ONBOARD_LLM_PROVIDER = "ORB_ONBOARD_LLM_PROVIDER"
    # Message: LLM model selected during onboarding
    ORB_ONBOARD_LLM_MODEL = "ORB_ONBOARD_LLM_MODEL"
    # Message: Embedding provider selected during onboarding
    ORB_ONBOARD_EMBED_PROVIDER = "ORB_ONBOARD_EMBED_PROVIDER"
    # Message: Embedding model selected during onboarding
    ORB_ONBOARD_EMBED_MODEL = "ORB_ONBOARD_EMBED_MODEL"
    # Message: Sample data ingestion requested
    ORB_ONBOARD_SAMPLE_DATA = "ORB_ONBOARD_SAMPLE_DATA"
    # Message: Configuration marked as edited
    ORB_ONBOARD_CONFIG_EDITED = "ORB_ONBOARD_CONFIG_EDITED"
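
The `Category` constants imported alongside `MessageId` are not part of this hunk; inferred from the section headers above, a plausible shape is the following sketch (an assumption, not the file's actual contents):

class Category:
    """Telemetry event categories (sketch inferred from the MessageId headers)."""

    APPLICATION_STARTUP = "APPLICATION_STARTUP"
    SERVICE_INITIALIZATION = "SERVICE_INITIALIZATION"
    OPENSEARCH_SETUP = "OPENSEARCH_SETUP"
    OPENSEARCH_INDEX = "OPENSEARCH_INDEX"
    DOCUMENT_INGESTION = "DOCUMENT_INGESTION"
    DOCUMENT_PROCESSING = "DOCUMENT_PROCESSING"
    AUTHENTICATION = "AUTHENTICATION"
    CONNECTOR_OPERATIONS = "CONNECTOR_OPERATIONS"
    FLOW_OPERATIONS = "FLOW_OPERATIONS"
    TASK_OPERATIONS = "TASK_OPERATIONS"
    CHAT_OPERATIONS = "CHAT_OPERATIONS"
    ERROR_CONDITIONS = "ERROR_CONDITIONS"
    SETTINGS_OPERATIONS = "SETTINGS_OPERATIONS"
    ONBOARDING = "ONBOARDING"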