added telemetry to openrag

This commit is contained in:
Lucas Oliveira 2025-11-28 18:48:09 -03:00
parent 53a3df1608
commit 7977216174
8 changed files with 215 additions and 7 deletions

View file

@ -1,5 +1,6 @@
from starlette.requests import Request from starlette.requests import Request
from starlette.responses import JSONResponse from starlette.responses import JSONResponse
from utils.telemetry import TelemetryClient, Category, MessageId
async def auth_init(request: Request, auth_service, session_manager): async def auth_init(request: Request, auth_service, session_manager):
@ -40,8 +41,11 @@ async def auth_callback(request: Request, auth_service, session_manager):
connection_id, authorization_code, state, request connection_id, authorization_code, state, request
) )
await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORBTA0063I)
# If this is app auth, set JWT cookie # If this is app auth, set JWT cookie
if result.get("purpose") == "app_auth" and result.get("jwt_token"): if result.get("purpose") == "app_auth" and result.get("jwt_token"):
await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORBTA0060I)
response = JSONResponse( response = JSONResponse(
{k: v for k, v in result.items() if k != "jwt_token"} {k: v for k, v in result.items() if k != "jwt_token"}
) )
@ -61,6 +65,7 @@ async def auth_callback(request: Request, auth_service, session_manager):
import traceback import traceback
traceback.print_exc() traceback.print_exc()
await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORBTA0064E)
return JSONResponse({"error": f"Callback failed: {str(e)}"}, status_code=500) return JSONResponse({"error": f"Callback failed: {str(e)}"}, status_code=500)
@ -72,6 +77,7 @@ async def auth_me(request: Request, auth_service, session_manager):
async def auth_logout(request: Request, auth_service, session_manager): async def auth_logout(request: Request, auth_service, session_manager):
"""Logout user by clearing auth cookie""" """Logout user by clearing auth cookie"""
await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORBTA0062I)
response = JSONResponse( response = JSONResponse(
{"status": "logged_out", "message": "Successfully logged out"} {"status": "logged_out", "message": "Successfully logged out"}
) )

View file

@ -1,6 +1,7 @@
from starlette.requests import Request from starlette.requests import Request
from starlette.responses import JSONResponse, PlainTextResponse from starlette.responses import JSONResponse, PlainTextResponse
from utils.logging_config import get_logger from utils.logging_config import get_logger
from utils.telemetry import TelemetryClient, Category, MessageId
logger = get_logger(__name__) logger = get_logger(__name__)
@ -25,6 +26,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
selected_files = data.get("selected_files") selected_files = data.get("selected_files")
try: try:
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORBTA0072I)
logger.debug( logger.debug(
"Starting connector sync", "Starting connector sync",
connector_type=connector_type, connector_type=connector_type,
@ -102,6 +104,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
jwt_token=jwt_token, jwt_token=jwt_token,
) )
task_ids = [task_id] task_ids = [task_id]
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORBTA0073I)
return JSONResponse( return JSONResponse(
{ {
"task_ids": task_ids, "task_ids": task_ids,
@ -114,6 +117,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
except Exception as e: except Exception as e:
logger.error("Connector sync failed", error=str(e)) logger.error("Connector sync failed", error=str(e))
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORBTA0074E)
return JSONResponse({"error": f"Sync failed: {str(e)}"}, status_code=500) return JSONResponse({"error": f"Sync failed: {str(e)}"}, status_code=500)
@ -185,6 +189,7 @@ async def connector_webhook(request: Request, connector_service, session_manager
config=temp_config, config=temp_config,
) )
try: try:
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORBTA0075I)
temp_connector = connector_service.connection_manager._create_connector( temp_connector = connector_service.connection_manager._create_connector(
temp_connection temp_connection
) )
@ -336,6 +341,7 @@ async def connector_webhook(request: Request, connector_service, session_manager
except Exception as e: except Exception as e:
logger.error("Webhook processing failed", error=str(e)) logger.error("Webhook processing failed", error=str(e))
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORBTA0076E)
return JSONResponse( return JSONResponse(
{"error": f"Webhook processing failed: {str(e)}"}, status_code=500 {"error": f"Webhook processing failed: {str(e)}"}, status_code=500
) )

View file

@ -4,6 +4,7 @@ import time
from starlette.responses import JSONResponse from starlette.responses import JSONResponse
from utils.container_utils import transform_localhost_url from utils.container_utils import transform_localhost_url
from utils.logging_config import get_logger from utils.logging_config import get_logger
from utils.telemetry import TelemetryClient, Category, MessageId
from config.settings import ( from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW, DISABLE_INGEST_WITH_LANGFLOW,
LANGFLOW_URL, LANGFLOW_URL,
@ -409,16 +410,32 @@ async def update_settings(request, session_manager):
# Update agent settings # Update agent settings
if "llm_model" in body: if "llm_model" in body:
old_model = current_config.agent.llm_model
current_config.agent.llm_model = body["llm_model"] current_config.agent.llm_model = body["llm_model"]
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0123I
)
logger.info(f"LLM model changed from {old_model} to {body['llm_model']}")
if "llm_provider" in body: if "llm_provider" in body:
old_provider = current_config.agent.llm_provider
current_config.agent.llm_provider = body["llm_provider"] current_config.agent.llm_provider = body["llm_provider"]
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0122I
)
logger.info(f"LLM provider changed from {old_provider} to {body['llm_provider']}")
if "system_prompt" in body: if "system_prompt" in body:
current_config.agent.system_prompt = body["system_prompt"] current_config.agent.system_prompt = body["system_prompt"]
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0126I
)
# Also update the chat flow with the new system prompt # Also update the chat flow with the new system prompt
try: try:
@ -431,17 +448,33 @@ async def update_settings(request, session_manager):
# Update knowledge settings # Update knowledge settings
if "embedding_model" in body: if "embedding_model" in body:
old_model = current_config.knowledge.embedding_model
new_embedding_model = body["embedding_model"].strip() new_embedding_model = body["embedding_model"].strip()
current_config.knowledge.embedding_model = new_embedding_model current_config.knowledge.embedding_model = new_embedding_model
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0125I
)
logger.info(f"Embedding model changed from {old_model} to {new_embedding_model}")
if "embedding_provider" in body: if "embedding_provider" in body:
old_provider = current_config.knowledge.embedding_provider
current_config.knowledge.embedding_provider = body["embedding_provider"] current_config.knowledge.embedding_provider = body["embedding_provider"]
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0124I
)
logger.info(f"Embedding provider changed from {old_provider} to {body['embedding_provider']}")
if "table_structure" in body: if "table_structure" in body:
current_config.knowledge.table_structure = body["table_structure"] current_config.knowledge.table_structure = body["table_structure"]
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0128I
)
# Also update the flow with the new docling settings # Also update the flow with the new docling settings
try: try:
@ -453,6 +486,10 @@ async def update_settings(request, session_manager):
if "ocr" in body: if "ocr" in body:
current_config.knowledge.ocr = body["ocr"] current_config.knowledge.ocr = body["ocr"]
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0128I
)
# Also update the flow with the new docling settings # Also update the flow with the new docling settings
try: try:
@ -464,6 +501,10 @@ async def update_settings(request, session_manager):
if "picture_descriptions" in body: if "picture_descriptions" in body:
current_config.knowledge.picture_descriptions = body["picture_descriptions"] current_config.knowledge.picture_descriptions = body["picture_descriptions"]
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0128I
)
# Also update the flow with the new docling settings # Also update the flow with the new docling settings
try: try:
@ -475,6 +516,10 @@ async def update_settings(request, session_manager):
if "chunk_size" in body: if "chunk_size" in body:
current_config.knowledge.chunk_size = body["chunk_size"] current_config.knowledge.chunk_size = body["chunk_size"]
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0127I
)
# Also update the ingest flow with the new chunk size # Also update the ingest flow with the new chunk size
try: try:
@ -491,6 +536,10 @@ async def update_settings(request, session_manager):
if "chunk_overlap" in body: if "chunk_overlap" in body:
current_config.knowledge.chunk_overlap = body["chunk_overlap"] current_config.knowledge.chunk_overlap = body["chunk_overlap"]
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0127I
)
# Also update the ingest flow with the new chunk overlap # Also update the ingest flow with the new chunk overlap
try: try:
@ -507,35 +556,48 @@ async def update_settings(request, session_manager):
# The config will still be saved # The config will still be saved
# Update provider-specific settings # Update provider-specific settings
provider_updated = False
if "openai_api_key" in body and body["openai_api_key"].strip(): if "openai_api_key" in body and body["openai_api_key"].strip():
current_config.providers.openai.api_key = body["openai_api_key"] current_config.providers.openai.api_key = body["openai_api_key"]
current_config.providers.openai.configured = True current_config.providers.openai.configured = True
config_updated = True config_updated = True
provider_updated = True
if "anthropic_api_key" in body and body["anthropic_api_key"].strip(): if "anthropic_api_key" in body and body["anthropic_api_key"].strip():
current_config.providers.anthropic.api_key = body["anthropic_api_key"] current_config.providers.anthropic.api_key = body["anthropic_api_key"]
current_config.providers.anthropic.configured = True current_config.providers.anthropic.configured = True
config_updated = True config_updated = True
provider_updated = True
if "watsonx_api_key" in body and body["watsonx_api_key"].strip(): if "watsonx_api_key" in body and body["watsonx_api_key"].strip():
current_config.providers.watsonx.api_key = body["watsonx_api_key"] current_config.providers.watsonx.api_key = body["watsonx_api_key"]
current_config.providers.watsonx.configured = True current_config.providers.watsonx.configured = True
config_updated = True config_updated = True
provider_updated = True
if "watsonx_endpoint" in body: if "watsonx_endpoint" in body:
current_config.providers.watsonx.endpoint = body["watsonx_endpoint"].strip() current_config.providers.watsonx.endpoint = body["watsonx_endpoint"].strip()
current_config.providers.watsonx.configured = True current_config.providers.watsonx.configured = True
config_updated = True config_updated = True
provider_updated = True
if "watsonx_project_id" in body: if "watsonx_project_id" in body:
current_config.providers.watsonx.project_id = body["watsonx_project_id"].strip() current_config.providers.watsonx.project_id = body["watsonx_project_id"].strip()
current_config.providers.watsonx.configured = True current_config.providers.watsonx.configured = True
config_updated = True config_updated = True
provider_updated = True
if "ollama_endpoint" in body: if "ollama_endpoint" in body:
current_config.providers.ollama.endpoint = body["ollama_endpoint"].strip() current_config.providers.ollama.endpoint = body["ollama_endpoint"].strip()
current_config.providers.ollama.configured = True current_config.providers.ollama.configured = True
config_updated = True config_updated = True
provider_updated = True
if provider_updated:
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0129I
)
if not config_updated: if not config_updated:
return JSONResponse( return JSONResponse(
@ -577,10 +639,18 @@ async def update_settings(request, session_manager):
logger.info( logger.info(
"Configuration updated successfully", updated_fields=list(body.keys()) "Configuration updated successfully", updated_fields=list(body.keys())
) )
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0120I
)
return JSONResponse({"message": "Configuration updated successfully"}) return JSONResponse({"message": "Configuration updated successfully"})
except Exception as e: except Exception as e:
logger.error("Failed to update settings", error=str(e)) logger.error("Failed to update settings", error=str(e))
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORBTA0121E
)
return JSONResponse( return JSONResponse(
{"error": f"Failed to update settings: {str(e)}"}, status_code=500 {"error": f"Failed to update settings: {str(e)}"}, status_code=500
) )
@ -589,6 +659,8 @@ async def update_settings(request, session_manager):
async def onboarding(request, flows_service, session_manager=None): async def onboarding(request, flows_service, session_manager=None):
"""Handle onboarding configuration setup""" """Handle onboarding configuration setup"""
try: try:
await TelemetryClient.send_event(Category.ONBOARDING, MessageId.ORBTA0130I)
# Get current configuration # Get current configuration
current_config = get_openrag_config() current_config = get_openrag_config()
@ -631,13 +703,23 @@ async def onboarding(request, flows_service, session_manager=None):
config_updated = False config_updated = False
# Update agent settings (LLM) # Update agent settings (LLM)
llm_model_selected = None
llm_provider_selected = None
if "llm_model" in body: if "llm_model" in body:
if not isinstance(body["llm_model"], str) or not body["llm_model"].strip(): if not isinstance(body["llm_model"], str) or not body["llm_model"].strip():
return JSONResponse( return JSONResponse(
{"error": "llm_model must be a non-empty string"}, status_code=400 {"error": "llm_model must be a non-empty string"}, status_code=400
) )
current_config.agent.llm_model = body["llm_model"].strip() llm_model_selected = body["llm_model"].strip()
current_config.agent.llm_model = llm_model_selected
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORBTA0134I,
metadata={"llm_model": llm_model_selected}
)
logger.info(f"LLM model selected during onboarding: {llm_model_selected}")
if "llm_provider" in body: if "llm_provider" in body:
if ( if (
@ -653,10 +735,20 @@ async def onboarding(request, flows_service, session_manager=None):
{"error": "llm_provider must be one of: openai, anthropic, watsonx, ollama"}, {"error": "llm_provider must be one of: openai, anthropic, watsonx, ollama"},
status_code=400, status_code=400,
) )
current_config.agent.llm_provider = body["llm_provider"].strip() llm_provider_selected = body["llm_provider"].strip()
current_config.agent.llm_provider = llm_provider_selected
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORBTA0133I,
metadata={"llm_provider": llm_provider_selected}
)
logger.info(f"LLM provider selected during onboarding: {llm_provider_selected}")
# Update knowledge settings (embedding) # Update knowledge settings (embedding)
embedding_model_selected = None
embedding_provider_selected = None
if "embedding_model" in body and not DISABLE_INGEST_WITH_LANGFLOW: if "embedding_model" in body and not DISABLE_INGEST_WITH_LANGFLOW:
if ( if (
not isinstance(body["embedding_model"], str) not isinstance(body["embedding_model"], str)
@ -666,8 +758,15 @@ async def onboarding(request, flows_service, session_manager=None):
{"error": "embedding_model must be a non-empty string"}, {"error": "embedding_model must be a non-empty string"},
status_code=400, status_code=400,
) )
current_config.knowledge.embedding_model = body["embedding_model"].strip() embedding_model_selected = body["embedding_model"].strip()
current_config.knowledge.embedding_model = embedding_model_selected
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORBTA0136I,
metadata={"embedding_model": embedding_model_selected}
)
logger.info(f"Embedding model selected during onboarding: {embedding_model_selected}")
if "embedding_provider" in body: if "embedding_provider" in body:
if ( if (
@ -684,8 +783,15 @@ async def onboarding(request, flows_service, session_manager=None):
{"error": "embedding_provider must be one of: openai, watsonx, ollama"}, {"error": "embedding_provider must be one of: openai, watsonx, ollama"},
status_code=400, status_code=400,
) )
current_config.knowledge.embedding_provider = body["embedding_provider"].strip() embedding_provider_selected = body["embedding_provider"].strip()
current_config.knowledge.embedding_provider = embedding_provider_selected
config_updated = True config_updated = True
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORBTA0135I,
metadata={"embedding_provider": embedding_provider_selected}
)
logger.info(f"Embedding provider selected during onboarding: {embedding_provider_selected}")
# Update provider-specific credentials # Update provider-specific credentials
if "openai_api_key" in body and body["openai_api_key"].strip(): if "openai_api_key" in body and body["openai_api_key"].strip():
@ -771,6 +877,12 @@ async def onboarding(request, flows_service, session_manager=None):
{"error": "sample_data must be a boolean value"}, status_code=400 {"error": "sample_data must be a boolean value"}, status_code=400
) )
should_ingest_sample_data = body["sample_data"] should_ingest_sample_data = body["sample_data"]
if should_ingest_sample_data:
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORBTA0137I
)
logger.info("Sample data ingestion requested during onboarding")
if not config_updated: if not config_updated:
return JSONResponse( return JSONResponse(
@ -913,8 +1025,38 @@ async def onboarding(request, flows_service, session_manager=None):
"Onboarding configuration updated successfully", "Onboarding configuration updated successfully",
updated_fields=updated_fields, updated_fields=updated_fields,
) )
# Mark config as edited and send telemetry with model information
current_config.edited = True
# Build metadata with selected models
onboarding_metadata = {}
if llm_provider_selected:
onboarding_metadata["llm_provider"] = llm_provider_selected
if llm_model_selected:
onboarding_metadata["llm_model"] = llm_model_selected
if embedding_provider_selected:
onboarding_metadata["embedding_provider"] = embedding_provider_selected
if embedding_model_selected:
onboarding_metadata["embedding_model"] = embedding_model_selected
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORBTA0138I,
metadata=onboarding_metadata
)
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORBTA0131I,
metadata=onboarding_metadata
)
logger.info("Configuration marked as edited after onboarding")
else: else:
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORBTA0132E
)
return JSONResponse( return JSONResponse(
{"error": "Failed to save configuration"}, status_code=500 {"error": "Failed to save configuration"}, status_code=500
) )
@ -929,6 +1071,10 @@ async def onboarding(request, flows_service, session_manager=None):
except Exception as e: except Exception as e:
logger.error("Failed to update onboarding settings", error=str(e)) logger.error("Failed to update onboarding settings", error=str(e))
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORBTA0132E
)
return JSONResponse( return JSONResponse(
{"error": str(e)}, {"error": str(e)},
status_code=500, status_code=500,
@ -1214,11 +1360,11 @@ async def update_docling_preset(request, session_manager):
flows_service = _get_flows_service() flows_service = _get_flows_service()
await flows_service.update_flow_docling_preset("custom", preset_config) await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(f"Successfully updated docling settings in ingest flow") logger.info("Successfully updated docling settings in ingest flow")
return JSONResponse( return JSONResponse(
{ {
"message": f"Successfully updated docling settings", "message": "Successfully updated docling settings",
"settings": settings, "settings": settings,
"preset_config": preset_config, "preset_config": preset_config,
} }

View file

@ -1,5 +1,6 @@
from starlette.requests import Request from starlette.requests import Request
from starlette.responses import JSONResponse from starlette.responses import JSONResponse
from utils.telemetry import TelemetryClient, Category, MessageId
async def task_status(request: Request, task_service, session_manager): async def task_status(request: Request, task_service, session_manager):
@ -28,8 +29,10 @@ async def cancel_task(request: Request, task_service, session_manager):
success = await task_service.cancel_task(user.user_id, task_id) success = await task_service.cancel_task(user.user_id, task_id)
if not success: if not success:
await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0091E)
return JSONResponse( return JSONResponse(
{"error": "Task not found or cannot be cancelled"}, status_code=400 {"error": "Task not found or cannot be cancelled"}, status_code=400
) )
await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0092I)
return JSONResponse({"status": "cancelled", "task_id": task_id}) return JSONResponse({"status": "cancelled", "task_id": task_id})

View file

@ -5,6 +5,7 @@ from services.flows_service import FlowsService
from utils.container_utils import detect_container_environment from utils.container_utils import detect_container_environment
from utils.embeddings import create_dynamic_index_body from utils.embeddings import create_dynamic_index_body
from utils.logging_config import configure_from_env, get_logger from utils.logging_config import configure_from_env, get_logger
from utils.telemetry import TelemetryClient, Category, MessageId
configure_from_env() configure_from_env()
logger = get_logger(__name__) logger = get_logger(__name__)
@ -100,6 +101,7 @@ async def wait_for_opensearch():
try: try:
await clients.opensearch.info() await clients.opensearch.info()
logger.info("OpenSearch is ready") logger.info("OpenSearch is ready")
await TelemetryClient.send_event(Category.OPENSEARCH_SETUP, MessageId.ORBTA0020I)
return return
except Exception as e: except Exception as e:
logger.warning( logger.warning(
@ -111,6 +113,7 @@ async def wait_for_opensearch():
if attempt < max_retries - 1: if attempt < max_retries - 1:
await asyncio.sleep(retry_delay) await asyncio.sleep(retry_delay)
else: else:
await TelemetryClient.send_event(Category.OPENSEARCH_SETUP, MessageId.ORBTA0023E)
raise Exception("OpenSearch failed to become ready") raise Exception("OpenSearch failed to become ready")
@ -154,6 +157,7 @@ async def _ensure_opensearch_index():
"dimension" "dimension"
], ],
) )
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORBTA0030I)
except Exception as e: except Exception as e:
logger.error( logger.error(
@ -161,6 +165,7 @@ async def _ensure_opensearch_index():
error=str(e), error=str(e),
index_name=INDEX_NAME, index_name=INDEX_NAME,
) )
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORBTA0032E)
# Don't raise the exception to avoid breaking the initialization # Don't raise the exception to avoid breaking the initialization
# The service can still function, document operations might fail later # The service can still function, document operations might fail later
@ -193,12 +198,14 @@ async def init_index():
index_name=INDEX_NAME, index_name=INDEX_NAME,
embedding_model=embedding_model, embedding_model=embedding_model,
) )
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORBTA0030I)
else: else:
logger.info( logger.info(
"Index already exists, skipping creation", "Index already exists, skipping creation",
index_name=INDEX_NAME, index_name=INDEX_NAME,
embedding_model=embedding_model, embedding_model=embedding_model,
) )
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORBTA0031I)
# Create knowledge filters index # Create knowledge filters index
knowledge_filter_index_name = "knowledge_filters" knowledge_filter_index_name = "knowledge_filters"
@ -226,6 +233,7 @@ async def init_index():
logger.info( logger.info(
"Created knowledge filters index", index_name=knowledge_filter_index_name "Created knowledge filters index", index_name=knowledge_filter_index_name
) )
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORBTA0034I)
else: else:
logger.info( logger.info(
"Knowledge filters index already exists, skipping creation", "Knowledge filters index already exists, skipping creation",
@ -279,6 +287,7 @@ def generate_jwt_keys():
logger.info("Generated RSA keys for JWT signing") logger.info("Generated RSA keys for JWT signing")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
logger.error("Failed to generate RSA keys", error=str(e)) logger.error("Failed to generate RSA keys", error=str(e))
TelemetryClient.send_event_sync(Category.SERVICE_INITIALIZATION, MessageId.ORBTA0014E)
raise raise
else: else:
# Ensure correct permissions on existing keys # Ensure correct permissions on existing keys
@ -297,6 +306,7 @@ async def init_index_when_ready():
logger.info("OpenSearch index initialization completed successfully") logger.info("OpenSearch index initialization completed successfully")
except Exception as e: except Exception as e:
logger.error("OpenSearch index initialization failed", error=str(e)) logger.error("OpenSearch index initialization failed", error=str(e))
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORBTA0033E)
logger.warning( logger.warning(
"OIDC endpoints will still work, but document operations may fail until OpenSearch is ready" "OIDC endpoints will still work, but document operations may fail until OpenSearch is ready"
) )
@ -324,6 +334,7 @@ async def ingest_default_documents_when_ready(services):
"Ingesting default documents when ready", "Ingesting default documents when ready",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW, disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
) )
await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORBTA0043I)
base_dir = _get_documents_dir() base_dir = _get_documents_dir()
if not os.path.isdir(base_dir): if not os.path.isdir(base_dir):
logger.info( logger.info(
@ -350,9 +361,12 @@ async def ingest_default_documents_when_ready(services):
await _ingest_default_documents_openrag(services, file_paths) await _ingest_default_documents_openrag(services, file_paths)
else: else:
await _ingest_default_documents_langflow(services, file_paths) await _ingest_default_documents_langflow(services, file_paths)
await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORBTA0044I)
except Exception as e: except Exception as e:
logger.error("Default documents ingestion failed", error=str(e)) logger.error("Default documents ingestion failed", error=str(e))
await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORBTA0045E)
async def _ingest_default_documents_langflow(services, file_paths): async def _ingest_default_documents_langflow(services, file_paths):
@ -502,6 +516,7 @@ async def _update_mcp_servers_with_provider_credentials(services):
async def startup_tasks(services): async def startup_tasks(services):
"""Startup tasks""" """Startup tasks"""
logger.info("Starting startup tasks") logger.info("Starting startup tasks")
await TelemetryClient.send_event(Category.APPLICATION_STARTUP, MessageId.ORBTA0002I)
# Only initialize basic OpenSearch connection, not the index # Only initialize basic OpenSearch connection, not the index
# Index will be created after onboarding when we know the embedding model # Index will be created after onboarding when we know the embedding model
await wait_for_opensearch() await wait_for_opensearch()
@ -527,25 +542,34 @@ async def startup_tasks(services):
logger.info( logger.info(
f"Detected reset flows: {', '.join(reset_flows)}. Reapplying all settings." f"Detected reset flows: {', '.join(reset_flows)}. Reapplying all settings."
) )
await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORBTA0082W)
from api.settings import reapply_all_settings from api.settings import reapply_all_settings
await reapply_all_settings(session_manager=services["session_manager"]) await reapply_all_settings(session_manager=services["session_manager"])
logger.info("Successfully reapplied settings after detecting flow resets") logger.info("Successfully reapplied settings after detecting flow resets")
await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORBTA0084I)
else: else:
logger.info("No flows detected as reset, skipping settings reapplication") logger.info("No flows detected as reset, skipping settings reapplication")
else: else:
logger.debug("Configuration not yet edited, skipping flow reset check") logger.debug("Configuration not yet edited, skipping flow reset check")
except Exception as e: except Exception as e:
logger.error(f"Failed to check flows reset or reapply settings: {str(e)}") logger.error(f"Failed to check flows reset or reapply settings: {str(e)}")
await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORBTA0083E)
# Don't fail startup if this check fails # Don't fail startup if this check fails
async def initialize_services(): async def initialize_services():
"""Initialize all services and their dependencies""" """Initialize all services and their dependencies"""
await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORBTA0011I)
# Generate JWT keys if they don't exist # Generate JWT keys if they don't exist
generate_jwt_keys() generate_jwt_keys()
# Initialize clients (now async to generate Langflow API key) # Initialize clients (now async to generate Langflow API key)
await clients.initialize() try:
await clients.initialize()
except Exception as e:
logger.error("Failed to initialize clients", error=str(e))
await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORBTA0013E)
raise
# Initialize session manager # Initialize session manager
session_manager = SessionManager(SESSION_SECRET) session_manager = SessionManager(SESSION_SECRET)
@ -608,8 +632,11 @@ async def initialize_services():
logger.warning( logger.warning(
"Failed to load persisted connections on startup", error=str(e) "Failed to load persisted connections on startup", error=str(e)
) )
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORBTA0077W)
else: else:
logger.info("[CONNECTORS] Skipping connection loading in no-auth mode") logger.info("[CONNECTORS] Skipping connection loading in no-auth mode")
await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORBTA0010I)
langflow_file_service = LangflowFileService() langflow_file_service = LangflowFileService()
@ -1223,6 +1250,7 @@ async def create_app():
# Add startup event handler # Add startup event handler
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
await TelemetryClient.send_event(Category.APPLICATION_STARTUP, MessageId.ORBTA0001I)
# Start index initialization in background to avoid blocking OIDC endpoints # Start index initialization in background to avoid blocking OIDC endpoints
t1 = asyncio.create_task(startup_tasks(services)) t1 = asyncio.create_task(startup_tasks(services))
app.state.background_tasks.add(t1) app.state.background_tasks.add(t1)
@ -1270,9 +1298,13 @@ async def create_app():
# Add shutdown event handler # Add shutdown event handler
@app.on_event("shutdown") @app.on_event("shutdown")
async def shutdown_event(): async def shutdown_event():
await TelemetryClient.send_event(Category.APPLICATION_SHUTDOWN, MessageId.ORBTA0003I)
await cleanup_subscriptions_proper(services) await cleanup_subscriptions_proper(services)
# Cleanup async clients # Cleanup async clients
await clients.cleanup() await clients.cleanup()
# Cleanup telemetry client
from utils.telemetry.client import cleanup_telemetry_client
await cleanup_telemetry_client()
return app return app

View file

@ -14,6 +14,7 @@ logger = get_logger(__name__)
from config.settings import clients, INDEX_NAME, get_embedding_model from config.settings import clients, INDEX_NAME, get_embedding_model
from utils.document_processing import extract_relevant, process_document_sync from utils.document_processing import extract_relevant, process_document_sync
from utils.telemetry import TelemetryClient, Category, MessageId
def get_token_count(text: str, model: str = None) -> int: def get_token_count(text: str, model: str = None) -> int:
@ -98,6 +99,7 @@ class DocumentService:
"""Recreate the process pool if it's broken""" """Recreate the process pool if it's broken"""
if self._process_pool_broken and self.process_pool: if self._process_pool_broken and self.process_pool:
logger.warning("Attempting to recreate broken process pool") logger.warning("Attempting to recreate broken process pool")
TelemetryClient.send_event_sync(Category.DOCUMENT_PROCESSING, MessageId.ORBTA0053W)
try: try:
# Shutdown the old pool # Shutdown the old pool
self.process_pool.shutdown(wait=False) self.process_pool.shutdown(wait=False)

View file

@ -28,6 +28,7 @@ import copy
from datetime import datetime from datetime import datetime
from utils.logging_config import get_logger from utils.logging_config import get_logger
from utils.container_utils import transform_localhost_url from utils.container_utils import transform_localhost_url
from utils.telemetry import TelemetryClient, Category, MessageId
logger = get_logger(__name__) logger = get_logger(__name__)
@ -228,6 +229,12 @@ class FlowsService:
failed_count=len(backup_results["failed"]), failed_count=len(backup_results["failed"]),
) )
# Send telemetry event
if backup_results["failed"]:
await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORBTA0081E)
else:
await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORBTA0080I)
return backup_results return backup_results
async def _backup_flow(self, flow_id: str, flow_type: str, flow_data: dict = None): async def _backup_flow(self, flow_id: str, flow_type: str, flow_data: dict = None):

View file

@ -7,6 +7,7 @@ from models.tasks import FileTask, TaskStatus, UploadTask
from session_manager import AnonymousUser from session_manager import AnonymousUser
from utils.gpu_detection import get_worker_count from utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger from utils.logging_config import get_logger
from utils.telemetry import TelemetryClient, Category, MessageId
logger = get_logger(__name__) logger = get_logger(__name__)
@ -131,6 +132,11 @@ class TaskService:
# Store reference to background task for cancellation # Store reference to background task for cancellation
upload_task.background_task = background_task upload_task.background_task = background_task
# Send telemetry event for task creation
asyncio.create_task(
TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0090I)
)
return task_id return task_id
async def background_upload_processor(self, user_id: str, task_id: str) -> None: async def background_upload_processor(self, user_id: str, task_id: str) -> None: