diff --git a/src/utils/telemetry/__init__.py b/src/utils/telemetry/__init__.py
new file mode 100644
index 00000000..34e11185
--- /dev/null
+++ b/src/utils/telemetry/__init__.py
@@ -0,0 +1,8 @@
+"""Telemetry module for OpenRAG backend."""
+
+from .client import TelemetryClient
+from .category import Category
+from .message_id import MessageId
+
+__all__ = ["TelemetryClient", "Category", "MessageId"]
+
diff --git a/src/utils/telemetry/category.py b/src/utils/telemetry/category.py
new file mode 100644
index 00000000..17f9216c
--- /dev/null
+++ b/src/utils/telemetry/category.py
@@ -0,0 +1,45 @@
+"""Telemetry categories for OpenRAG backend."""
+
+
+class Category:
+    """Telemetry event categories."""
+
+    # Application lifecycle
+    APPLICATION_STARTUP = "APPLICATION_STARTUP"
+    APPLICATION_SHUTDOWN = "APPLICATION_SHUTDOWN"
+
+    # Service initialization
+    SERVICE_INITIALIZATION = "SERVICE_INITIALIZATION"
+
+    # OpenSearch operations
+    OPENSEARCH_SETUP = "OPENSEARCH_SETUP"
+    OPENSEARCH_INDEX = "OPENSEARCH_INDEX"
+
+    # Document operations
+    DOCUMENT_INGESTION = "DOCUMENT_INGESTION"
+    DOCUMENT_PROCESSING = "DOCUMENT_PROCESSING"
+
+    # Authentication
+    AUTHENTICATION = "AUTHENTICATION"
+
+    # Connector operations
+    CONNECTOR_OPERATIONS = "CONNECTOR_OPERATIONS"
+
+    # Flow operations
+    FLOW_OPERATIONS = "FLOW_OPERATIONS"
+
+    # Task operations
+    TASK_OPERATIONS = "TASK_OPERATIONS"
+
+    # Chat operations
+    CHAT_OPERATIONS = "CHAT_OPERATIONS"
+
+    # Error conditions
+    ERROR_CONDITIONS = "ERROR_CONDITIONS"
+
+    # Settings operations
+    SETTINGS_OPERATIONS = "SETTINGS_OPERATIONS"
+
+    # Onboarding
+    ONBOARDING = "ONBOARDING"
+
diff --git a/src/utils/telemetry/client.py b/src/utils/telemetry/client.py
new file mode 100644
index 00000000..7c29b424
--- /dev/null
+++ b/src/utils/telemetry/client.py
@@ -0,0 +1,399 @@
+"""Telemetry client for OpenRAG backend using Scarf."""
+
+import asyncio
+import os
+import platform
+from datetime import datetime, timezone
+from typing import Optional
+from urllib.parse import urlencode
+
+import httpx
+from utils.logging_config import get_logger
+
+logger = get_logger(__name__)
+
+# Constants
+SCARF_BASE_URL_DEFAULT = "https://langflow.gateway.scarf.sh"
+SCARF_PATH = "openrag"
+CLIENT_TYPE = "backend"
+PLATFORM_TYPE = "backend"
+
+
+def _get_openrag_version() -> str:
+    """Return the OpenRAG version used to tag telemetry events.
+
+    Resolution order: installed ``openrag`` package metadata, then
+    ``[project].version`` in the repository's ``pyproject.toml`` (dev
+    checkouts), then ``"dev"``.
+
+    Returns:
+        The version string, ``"dev"`` when no version can be found, or
+        ``"unknown"`` if detection itself fails unexpectedly.
+    """
+    try:
+        from importlib.metadata import PackageNotFoundError, version
+
+        try:
+            return version("openrag")
+        except PackageNotFoundError:
+            # Package not installed (dev mode): fall back to pyproject.toml
+            try:
+                import tomllib
+                from pathlib import Path
+
+                # src/utils/telemetry/client.py -> four parents up is the repo root
+                project_root = Path(__file__).parent.parent.parent.parent
+                pyproject_path = project_root / "pyproject.toml"
+
+                if pyproject_path.exists():
+                    with open(pyproject_path, "rb") as f:
+                        data = tomllib.load(f)
+                        return data.get("project", {}).get("version", "dev")
+            except Exception as e:
+                # Best effort: log instead of silently swallowing the error
+                logger.debug(f"Could not read version from pyproject.toml: {e}")
+
+            return "dev"
+    except Exception as e:
+        logger.warning(f"Failed to get OpenRAG version: {e}")
+        return "unknown"
+
+
+# Get version dynamically
+OPENRAG_VERSION = _get_openrag_version()
+
+# HTTP timeouts
+HTTP_REQUEST_TIMEOUT = 10.0
+HTTP_CONNECT_TIMEOUT = 5.0
+
+# Retry configuration
+RETRY_BASE_MS = 250
+MAX_WAIT_INTERVAL_MS = 5000
+MAX_RETRIES = 3
+
+# Global HTTP client
+_http_client: Optional[httpx.AsyncClient] = None
+_base_url_override: Optional[str] = None
+
+# Strong references to fire-and-forget telemetry tasks. The event loop only
+# keeps weak references to tasks, so an otherwise unreferenced task could be
+# garbage collected before it finishes.
+_background_tasks: set = set()
+
+
+def _build_http_client() -> httpx.AsyncClient:
+    """Create a new HTTP client configured for telemetry requests."""
+    return httpx.AsyncClient(
+        # httpx.Timeout needs a default unless all four phases are given;
+        # passing only connect= and read= raises ValueError.
+        timeout=httpx.Timeout(HTTP_REQUEST_TIMEOUT, connect=HTTP_CONNECT_TIMEOUT),
+        headers={
+            "User-Agent": f"OpenRAG-Backend/{OPENRAG_VERSION}",
+        },
+    )
+
+
+def _get_http_client() -> Optional[httpx.AsyncClient]:
+    """Get or create the shared HTTP client for telemetry.
+
+    Returns:
+        The cached client, or ``None`` if it could not be created.
+    """
+    global _http_client
+    if _http_client is None:
+        try:
+            _http_client = _build_http_client()
+            logger.debug("Telemetry HTTP client initialized")
+        except Exception as e:
+            logger.error(f"Failed to initialize telemetry HTTP client: {e}")
+            return None
+    return _http_client
+
+
+def set_base_url(url: str) -> None:
+    """Override the default Scarf base URL (for testing)."""
+    global _base_url_override
+    _base_url_override = url
+    logger.info(f"Telemetry base URL overridden: {url}")
+
+
+def _get_effective_base_url() -> str:
+    """Get the effective base URL (override or default), without a trailing slash."""
+    # The path is appended with "/", so drop trailing slashes to avoid "//"
+    return (_base_url_override or SCARF_BASE_URL_DEFAULT).rstrip("/")
+
+
+def is_do_not_track() -> bool:
+    """Check if the DO_NOT_TRACK environment variable enables opt-out.
+
+    Returns:
+        True when the value (case-insensitive, surrounding whitespace
+        ignored) is one of "true", "1", "yes" or "on".
+    """
+    do_not_track = os.environ.get("DO_NOT_TRACK", "").strip().lower()
+    return do_not_track in ("true", "1", "yes", "on")
+
+
+def _get_os() -> str:
+    """Get the operating system identifier ("macos", "windows", "linux" or "unknown")."""
+    system = platform.system().lower()
+    if system == "darwin":
+        return "macos"
+    if system in ("windows", "linux"):
+        return system
+    return "unknown"
+
+
+def _get_current_utc() -> str:
+    """Get current UTC time as RFC 3339 formatted string."""
+    now = datetime.now(timezone.utc)
+    return now.isoformat().replace("+00:00", "Z")
+
+
+def _get_exponential_backoff_delay(attempt: int) -> float:
+    """Calculate exponential backoff delay with full jitter (in seconds).
+
+    Formula:
+        temp = min(MAX_WAIT_INTERVAL_MS, RETRY_BASE_MS * 2^attempt)
+        sleep = random_between(0, temp)
+
+    Args:
+        attempt: Retry number; negative values are treated as 0.
+
+    Returns:
+        Delay in seconds, between 0 and MAX_WAIT_INTERVAL_MS / 1000.
+    """
+    import random
+
+    exp = min(2 ** max(attempt, 0), MAX_WAIT_INTERVAL_MS // RETRY_BASE_MS)
+    temp_ms = min(RETRY_BASE_MS * exp, MAX_WAIT_INTERVAL_MS)
+
+    # Full jitter: random duration between 0 and temp_ms
+    sleep_ms = random.uniform(0, temp_ms) if temp_ms > 0 else 0
+    return sleep_ms / 1000.0  # Convert to seconds
+
+
+async def _send_scarf_event(
+    category: str,
+    message_id: str,
+    metadata: Optional[dict] = None,
+    http_client: Optional[httpx.AsyncClient] = None,
+) -> None:
+    """Send a telemetry event to Scarf, retrying transient failures.
+
+    Makes up to MAX_RETRIES attempts. Server errors (5xx), timeouts,
+    connection errors and unexpected exceptions are retried after a jittered
+    backoff; other HTTP statuses and other httpx request errors are not.
+    Failures are logged, never raised.
+
+    Args:
+        category: Event category
+        message_id: Event message ID
+        metadata: Optional dictionary of additional metadata to include in the
+            event. Entries whose value is None are skipped; other values are
+            stringified and sent as query parameters.
+        http_client: Client to send with. Defaults to the shared client.
+    """
+    if is_do_not_track():
+        logger.debug(
+            f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled"
+        )
+        return
+
+    if http_client is None:
+        http_client = _get_http_client()
+    if http_client is None:
+        logger.error(
+            f"Telemetry event aborted: {category}:{message_id}. HTTP client not initialized"
+        )
+        return
+
+    os_name = _get_os()
+    timestamp = _get_current_utc()
+    effective_base_url = _get_effective_base_url()
+    # Build URL with format: /openrag/{platform}.{version}
+    base_url = f"{effective_base_url}/{SCARF_PATH}/{PLATFORM_TYPE}.{OPENRAG_VERSION}"
+
+    # Build query parameters
+    params = {
+        "clientType": CLIENT_TYPE,
+        "openrag_version": OPENRAG_VERSION,
+        "platform": PLATFORM_TYPE,
+        "os": os_name,
+        "category": category,
+        "message_id": message_id,
+        "timestamp": timestamp,
+    }
+
+    # Add metadata if provided
+    if metadata:
+        for key, value in metadata.items():
+            if value is not None:
+                # Stringify only; urlencode() below does the escaping
+                params[key] = str(value)
+
+    url = f"{base_url}?{urlencode(params)}"
+    retry_count = 0
+
+    while retry_count < MAX_RETRIES:
+        if retry_count == 0:
+            logger.info(f"Sending telemetry event: {category}:{message_id}...")
+        else:
+            logger.info(
+                f"Sending telemetry event: {category}:{message_id}. Retry #{retry_count}..."
+            )
+
+        logger.debug(f"Telemetry URL: {url}")
+
+        try:
+            response = await http_client.get(url)
+            status = response.status_code
+
+            if 200 <= status < 300:
+                logger.info(
+                    f"Successfully sent telemetry event: {category}:{message_id}. Status: {status}"
+                )
+                return
+            elif 500 <= status < 600:
+                # Retry server errors
+                logger.error(
+                    f"Failed to send telemetry event: {category}:{message_id}. Status: {status}"
+                )
+            else:
+                # Non-retryable status codes (400, 401, 403, 404, 429, etc.)
+                logger.error(
+                    f"Failed to send telemetry event: {category}:{message_id}. "
+                    f"Status: {status} (non-retryable)"
+                )
+                return
+
+        except httpx.TimeoutException as e:
+            # Retry timeout errors
+            logger.error(
+                f"Failed to send telemetry event: {category}:{message_id}. "
+                f"Timeout error: {e}"
+            )
+        except httpx.ConnectError as e:
+            # Retry connection errors
+            logger.error(
+                f"Failed to send telemetry event: {category}:{message_id}. "
+                f"Connection error: {e}"
+            )
+        except httpx.RequestError as e:
+            # Non-retryable request errors
+            logger.error(
+                f"Failed to send telemetry event: {category}:{message_id}. "
+                f"Request error: {e}"
+            )
+            return
+        except Exception as e:
+            logger.error(
+                f"Failed to send telemetry event: {category}:{message_id}. "
+                f"Unknown error: {e}"
+            )
+
+        retry_count += 1
+
+        if retry_count < MAX_RETRIES:
+            delay = _get_exponential_backoff_delay(retry_count)
+            await asyncio.sleep(delay)
+
+    logger.error(
+        f"Failed to send telemetry event: {category}:{message_id}. "
+        f"Maximum retries exceeded: {MAX_RETRIES}"
+    )
+
+
+async def _send_scarf_event_standalone(
+    category: str,
+    message_id: str,
+    metadata: Optional[dict] = None,
+) -> None:
+    """Send one event using a private, short-lived HTTP client.
+
+    Used when the event is sent on a temporary event loop: the client is
+    closed before that loop goes away, so the shared client never ends up
+    bound to a closed loop.
+    """
+    async with _build_http_client() as client:
+        await _send_scarf_event(category, message_id, metadata, http_client=client)
+
+
+class TelemetryClient:
+    """Telemetry client for sending events to Scarf."""
+
+    @staticmethod
+    async def send_event(
+        category: str, message_id: str, metadata: Optional[dict] = None
+    ) -> None:
+        """Send a telemetry event asynchronously.
+
+        Args:
+            category: Event category
+            message_id: Event message ID
+            metadata: Optional dictionary of additional metadata (e.g., {"llm_model": "gpt-4o"})
+        """
+        if is_do_not_track():
+            logger.debug(
+                f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled"
+            )
+            return
+
+        try:
+            await _send_scarf_event(category, message_id, metadata)
+        except Exception as e:
+            logger.error(f"Error sending telemetry event: {e}")
+
+    @staticmethod
+    def send_event_sync(
+        category: str, message_id: str, metadata: Optional[dict] = None
+    ) -> None:
+        """Send a telemetry event from synchronous code.
+
+        If an event loop is running in the current thread, the send is
+        scheduled on it as a background task and this method returns
+        immediately. Otherwise the event is sent on a temporary loop and this
+        method blocks until the send (including retries) has finished.
+
+        Args:
+            category: Event category
+            message_id: Event message ID
+            metadata: Optional dictionary of additional metadata
+        """
+        if is_do_not_track():
+            logger.debug(
+                f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled"
+            )
+            return
+
+        try:
+            loop = asyncio.get_running_loop()
+        except RuntimeError:
+            # No event loop is running in this thread
+            loop = None
+
+        try:
+            if loop is not None:
+                task = loop.create_task(_send_scarf_event(category, message_id, metadata))
+                # Hold a reference until the task completes (see _background_tasks)
+                _background_tasks.add(task)
+                task.add_done_callback(_background_tasks.discard)
+            else:
+                asyncio.run(_send_scarf_event_standalone(category, message_id, metadata))
+        except Exception as e:
+            logger.error(f"Error sending telemetry event: {e}")
+
+
+async def cleanup_telemetry_client() -> None:
+    """Close and discard the shared telemetry HTTP client, if one exists."""
+    global _http_client
+    # Detach first so a failed close never leaves a half-closed client cached
+    client, _http_client = _http_client, None
+    if client is None:
+        return
+    try:
+        await client.aclose()
+        logger.debug("Telemetry HTTP client closed")
+    except Exception as e:
+        logger.error(f"Error closing telemetry HTTP client: {e}")
+
diff --git a/src/utils/telemetry/message_id.py b/src/utils/telemetry/message_id.py
new file mode 100644
index 00000000..dd8339c3
--- /dev/null
+++ b/src/utils/telemetry/message_id.py
@@ -0,0 +1,197 @@
+"""Telemetry message IDs for OpenRAG backend.
+
+All message IDs start with ORBTA (OpenRAG Backend Telemetry Analytics).
+"""
+
+
+class MessageId:
+    """Telemetry message IDs; each constant's value is identical to its own name."""
+    # NOTE(review): the trailing letter looks like a severity code (I = info, W = warning, E = error) - confirm.
+    # Category: APPLICATION_STARTUP ------------------------------------------->
+    # NOTE(review): ORBTA0003I is a shutdown event; presumably sent with Category.APPLICATION_SHUTDOWN - confirm.
+    # Message: Application started successfully
+    ORBTA0001I = "ORBTA0001I"
+    # Message: Application startup initiated
+    ORBTA0002I = "ORBTA0002I"
+    # Message: Application shutdown initiated
+    ORBTA0003I = "ORBTA0003I"
+
+    # Category: SERVICE_INITIALIZATION ----------------------------------------->
+
+    # Message: Services initialized successfully
+    ORBTA0010I = "ORBTA0010I"
+    # Message: Service initialization started
+    ORBTA0011I = "ORBTA0011I"
+    # Message: Failed to initialize services
+    ORBTA0012E = "ORBTA0012E"
+    # Message: Failed to initialize OpenSearch client
+    ORBTA0013E = "ORBTA0013E"
+    # Message: Failed to generate JWT keys
+    ORBTA0014E = "ORBTA0014E"
+
+    # Category: OPENSEARCH_SETUP ---------------------------------------------->
+
+    # Message: OpenSearch connection established
+    ORBTA0020I = "ORBTA0020I"
+    # Message: OpenSearch connection failed
+    ORBTA0021E = "ORBTA0021E"
+    # Message: Waiting for OpenSearch to be ready
+    ORBTA0022W = "ORBTA0022W"
+    # Message: OpenSearch ready check timeout
+    ORBTA0023E = "ORBTA0023E"
+
+    # Category: OPENSEARCH_INDEX ---------------------------------------------->
+
+    # Message: OpenSearch index created successfully
+    ORBTA0030I = "ORBTA0030I"
+    # Message: OpenSearch index already exists
+    ORBTA0031I = "ORBTA0031I"
+    # Message: Failed to create OpenSearch index
+    ORBTA0032E = "ORBTA0032E"
+    # Message: Failed to initialize index
+    ORBTA0033E = "ORBTA0033E"
+    # Message: Knowledge filters index created
+    ORBTA0034I = "ORBTA0034I"
+    # Message: Failed to create knowledge filters index
+    ORBTA0035E = "ORBTA0035E"
+
+    # Category: DOCUMENT_INGESTION -------------------------------------------->
+
+    # Message: Document ingestion started
+    ORBTA0040I = "ORBTA0040I"
+    # Message: Document ingestion completed successfully
+    ORBTA0041I = "ORBTA0041I"
+    # Message: Document ingestion failed
+    ORBTA0042E = "ORBTA0042E"
+    # Message: Default documents ingestion started
+    ORBTA0043I = "ORBTA0043I"
+    # Message: Default documents ingestion completed
+    ORBTA0044I = "ORBTA0044I"
+    # Message: Default documents ingestion failed
+    ORBTA0045E = "ORBTA0045E"
+
+    # Category: DOCUMENT_PROCESSING -------------------------------------------->
+
+    # Message: Document processing started
+    ORBTA0050I = "ORBTA0050I"
+    # Message: Document processing completed
+    ORBTA0051I = "ORBTA0051I"
+    # Message: Document processing failed
+    ORBTA0052E = "ORBTA0052E"
+    # Message: Process pool recreation attempted
+    ORBTA0053W = "ORBTA0053W"
+
+    # Category: AUTHENTICATION ------------------------------------------------>
+
+    # Message: Authentication successful
+    ORBTA0060I = "ORBTA0060I"
+    # Message: Authentication failed
+    ORBTA0061E = "ORBTA0061E"
+    # Message: User logged out
+    ORBTA0062I = "ORBTA0062I"
+    # Message: OAuth callback received
+    ORBTA0063I = "ORBTA0063I"
+    # Message: OAuth callback failed
+    ORBTA0064E = "ORBTA0064E"
+
+    # Category: CONNECTOR_OPERATIONS ------------------------------------------->
+
+    # Message: Connector connection established
+    ORBTA0070I = "ORBTA0070I"
+    # Message: Connector connection failed
+    ORBTA0071E = "ORBTA0071E"
+    # Message: Connector sync started
+    ORBTA0072I = "ORBTA0072I"
+    # Message: Connector sync completed
+    ORBTA0073I = "ORBTA0073I"
+    # Message: Connector sync failed
+    ORBTA0074E = "ORBTA0074E"
+    # Message: Connector webhook received
+    ORBTA0075I = "ORBTA0075I"
+    # Message: Connector webhook failed
+    ORBTA0076E = "ORBTA0076E"
+    # Message: Failed to load persisted connections
+    ORBTA0077W = "ORBTA0077W"
+
+    # Category: FLOW_OPERATIONS ------------------------------------------------>
+
+    # Message: Flow backup completed
+    ORBTA0080I = "ORBTA0080I"
+    # Message: Flow backup failed
+    ORBTA0081E = "ORBTA0081E"
+    # Message: Flow reset detected
+    ORBTA0082W = "ORBTA0082W"
+    # Message: Flow reset check failed
+    ORBTA0083E = "ORBTA0083E"
+    # Message: Settings reapplied after flow reset
+    ORBTA0084I = "ORBTA0084I"
+
+    # Category: TASK_OPERATIONS ------------------------------------------------>
+
+    # Message: Task created successfully
+    ORBTA0090I = "ORBTA0090I"
+    # Message: Task failed
+    ORBTA0091E = "ORBTA0091E"
+    # Message: Task cancelled
+    ORBTA0092I = "ORBTA0092I"
+
+    # Category: CHAT_OPERATIONS ------------------------------------------------>
+
+    # Message: Chat request received
+    ORBTA0100I = "ORBTA0100I"
+    # Message: Chat request completed
+    ORBTA0101I = "ORBTA0101I"
+    # Message: Chat request failed
+    ORBTA0102E = "ORBTA0102E"
+
+    # Category: ERROR_CONDITIONS ----------------------------------------------->
+
+    # Message: Critical error occurred
+    ORBTA0110E = "ORBTA0110E"
+    # Message: Warning condition
+    ORBTA0111W = "ORBTA0111W"
+
+    # Category: SETTINGS_OPERATIONS -------------------------------------------->
+
+    # Message: Settings updated successfully
+    ORBTA0120I = "ORBTA0120I"
+    # Message: Settings update failed
+    ORBTA0121E = "ORBTA0121E"
+    # Message: LLM provider changed
+    ORBTA0122I = "ORBTA0122I"
+    # Message: LLM model changed
+    ORBTA0123I = "ORBTA0123I"
+    # Message: Embedding provider changed
+    ORBTA0124I = "ORBTA0124I"
+    # Message: Embedding model changed
+    ORBTA0125I = "ORBTA0125I"
+    # Message: System prompt updated
+    ORBTA0126I = "ORBTA0126I"
+    # Message: Chunk settings updated
+    ORBTA0127I = "ORBTA0127I"
+    # Message: Docling settings updated
+    ORBTA0128I = "ORBTA0128I"
+    # Message: Provider credentials updated
+    ORBTA0129I = "ORBTA0129I"
+
+    # Category: ONBOARDING ----------------------------------------------------->
+
+    # Message: Onboarding started
+    ORBTA0130I = "ORBTA0130I"
+    # Message: Onboarding completed successfully
+    ORBTA0131I = "ORBTA0131I"
+    # Message: Onboarding failed
+    ORBTA0132E = "ORBTA0132E"
+    # Message: LLM provider selected during onboarding
+    ORBTA0133I = "ORBTA0133I"
+    # Message: LLM model selected during onboarding
+    ORBTA0134I = "ORBTA0134I"
+    # Message: Embedding provider selected during onboarding
+    ORBTA0135I = "ORBTA0135I"
+    # Message: Embedding model selected during onboarding
+    ORBTA0136I = "ORBTA0136I"
+    # Message: Sample data ingestion requested
+    ORBTA0137I = "ORBTA0137I"
+    # Message: Configuration marked as edited
+    ORBTA0138I = "ORBTA0138I"
+