From a52a842df9325cd227ff0a64fcaf7e4a94e2ce3a Mon Sep 17 00:00:00 2001
From: phact
Date: Wed, 3 Sep 2025 15:57:35 -0400
Subject: [PATCH] centralized logging

---
 pyproject.toml                            |   1 +
 src/agent.py                              | 102 ++++++++---------
 src/api/chat.py                           |   5 +-
 src/api/connectors.py                     |  45 ++++------
 src/api/knowledge_filter.py               |  33 ++++----
 src/api/search.py                         |  14 +++-
 src/config/settings.py                    |  39 ++++-----
 src/connectors/connection_manager.py      |  37 +++-----
 src/connectors/google_drive/connector.py  |  47 +++++------
 src/main.py                               |  69 +++++++--------
 src/services/chat_service.py              |  11 ++-
 src/services/document_service.py          |  41 ++++-----
 src/services/monitor_service.py           |   7 +-
 src/services/search_service.py            |  14 ++--
 src/tui/screens/config.py                 |  22 +++++
 src/utils/logging_config.py               |  81 ++++++++++++++++++
 uv.lock                                   |  11 +++
 17 files changed, 313 insertions(+), 266 deletions(-)
 create mode 100644 src/utils/logging_config.py

diff --git a/pyproject.toml b/pyproject.toml
index eb8b820a..20d8f5c4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,6 +26,7 @@ dependencies = [
     "textual>=0.45.0",
     "python-dotenv>=1.0.0",
     "textual-fspicker>=0.6.0",
+    "structlog>=25.4.0",
 ]

 [tool.uv.sources]
diff --git a/src/agent.py b/src/agent.py
index ccd12579..ca5db7ac 100644
--- a/src/agent.py
+++ b/src/agent.py
@@ -1,3 +1,7 @@
+from utils.logging_config import get_logger
+
+logger = get_logger(__name__)
+
 # User-scoped conversation state - keyed by user_id -> response_id -> conversation
 user_conversations = {}  # user_id -> {response_id: {"messages": [...], "previous_response_id": parent_id, "created_at": timestamp, "last_activity": timestamp}}

@@ -65,7 +69,7 @@ async def async_response_stream(
     previous_response_id: str = None,
     log_prefix: str = "response",
 ):
-    print(f"user ==> {prompt}")
+    logger.info("User prompt received", prompt=prompt)

     try:
         # Build request parameters
@@ -91,7 +95,7 @@ async def async_response_stream(
         chunk_count = 0
         async for chunk in response:
             chunk_count += 1
-            print(f"[DEBUG] Chunk {chunk_count}: {chunk}")
+            logger.debug("Stream chunk received", chunk_count=chunk_count, chunk=str(chunk))

             # Yield the raw event as JSON for the UI to process
             import json
@@ -125,7 +129,7 @@ async def async_response_stream(
                 yield (json.dumps(chunk_data, default=str) + "\n").encode("utf-8")
             except Exception as e:
                 # Fallback to string representation
-                print(f"[DEBUG] JSON serialization failed: {e}")
+                logger.warning("JSON serialization failed", error=str(e))
                 yield (
                     json.dumps(
                         {"error": f"Serialization failed: {e}", "raw": str(chunk)}
                     + "\n"
                 ).encode("utf-8")

-        print(f"[DEBUG] Stream complete. 
Total chunks: {chunk_count}") - print(f"{log_prefix} ==> {full_response}") + logger.debug("Stream complete", total_chunks=chunk_count) + logger.info("Response generated", log_prefix=log_prefix, response=full_response) except Exception as e: - print(f"[ERROR] Exception in streaming: {e}") + logger.error("Exception in streaming", error=str(e)) import traceback traceback.print_exc() @@ -153,7 +157,7 @@ async def async_response( previous_response_id: str = None, log_prefix: str = "response", ): - print(f"user ==> {prompt}") + logger.info("User prompt received", prompt=prompt) # Build request parameters request_params = { @@ -170,7 +174,7 @@ async def async_response( response = await client.responses.create(**request_params) response_text = response.output_text - print(f"{log_prefix} ==> {response_text}") + logger.info("Response generated", log_prefix=log_prefix, response=response_text) # Extract and store response_id if available response_id = getattr(response, "id", None) or getattr( @@ -227,7 +231,7 @@ async def async_langflow_stream( extra_headers: dict = None, previous_response_id: str = None, ): - print(f"[DEBUG] Starting langflow stream for prompt: {prompt}") + logger.debug("Starting langflow stream", prompt=prompt) try: async for chunk in async_stream( langflow_client, @@ -237,11 +241,11 @@ async def async_langflow_stream( previous_response_id=previous_response_id, log_prefix="langflow", ): - print(f"[DEBUG] Yielding chunk from langflow_stream: {chunk[:100]}...") + logger.debug("Yielding chunk from langflow stream", chunk_preview=chunk[:100].decode('utf-8', errors='replace')) yield chunk - print(f"[DEBUG] Langflow stream completed") + logger.debug("Langflow stream completed") except Exception as e: - print(f"[ERROR] Exception in langflow_stream: {e}") + logger.error("Exception in langflow stream", error=str(e)) import traceback traceback.print_exc() @@ -256,24 +260,18 @@ async def async_chat( model: str = "gpt-4.1-mini", previous_response_id: str = None, ): - print( - f"[DEBUG] async_chat called with user_id: {user_id}, previous_response_id: {previous_response_id}" - ) + logger.debug("async_chat called", user_id=user_id, previous_response_id=previous_response_id) # Get the specific conversation thread (or create new one) conversation_state = get_conversation_thread(user_id, previous_response_id) - print( - f"[DEBUG] Got conversation_state with {len(conversation_state['messages'])} messages" - ) + logger.debug("Got conversation state", message_count=len(conversation_state['messages'])) # Add user message to conversation with timestamp from datetime import datetime user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()} conversation_state["messages"].append(user_message) - print( - f"[DEBUG] Added user message, now {len(conversation_state['messages'])} messages" - ) + logger.debug("Added user message", message_count=len(conversation_state['messages'])) response_text, response_id = await async_response( async_client, @@ -282,9 +280,7 @@ async def async_chat( previous_response_id=previous_response_id, log_prefix="agent", ) - print( - f"[DEBUG] Got response_text: {response_text[:50]}..., response_id: {response_id}" - ) + logger.debug("Got response", response_preview=response_text[:50], response_id=response_id) # Add assistant response to conversation with response_id and timestamp assistant_message = { @@ -294,25 +290,19 @@ async def async_chat( "timestamp": datetime.now(), } conversation_state["messages"].append(assistant_message) - print( - f"[DEBUG] Added assistant 
message, now {len(conversation_state['messages'])} messages" - ) + logger.debug("Added assistant message", message_count=len(conversation_state['messages'])) # Store the conversation thread with its response_id if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - print( - f"[DEBUG] Stored conversation thread for user {user_id} with response_id: {response_id}" - ) + logger.debug("Stored conversation thread", user_id=user_id, response_id=response_id) # Debug: Check what's in user_conversations now conversations = get_user_conversations(user_id) - print( - f"[DEBUG] user_conversations now has {len(conversations)} conversations: {list(conversations.keys())}" - ) + logger.debug("User conversations updated", user_id=user_id, conversation_count=len(conversations), conversation_ids=list(conversations.keys())) else: - print(f"[DEBUG] WARNING: No response_id received, conversation not stored!") + logger.warning("No response_id received, conversation not stored") return response_text, response_id @@ -373,9 +363,7 @@ async def async_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - print( - f"Stored conversation thread for user {user_id} with response_id: {response_id}" - ) + logger.debug("Stored conversation thread", user_id=user_id, response_id=response_id) # Async langflow function with conversation storage (non-streaming) @@ -387,24 +375,18 @@ async def async_langflow_chat( extra_headers: dict = None, previous_response_id: str = None, ): - print( - f"[DEBUG] async_langflow_chat called with user_id: {user_id}, previous_response_id: {previous_response_id}" - ) + logger.debug("async_langflow_chat called", user_id=user_id, previous_response_id=previous_response_id) # Get the specific conversation thread (or create new one) conversation_state = get_conversation_thread(user_id, previous_response_id) - print( - f"[DEBUG] Got langflow conversation_state with {len(conversation_state['messages'])} messages" - ) + logger.debug("Got langflow conversation state", message_count=len(conversation_state['messages'])) # Add user message to conversation with timestamp from datetime import datetime user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()} conversation_state["messages"].append(user_message) - print( - f"[DEBUG] Added user message to langflow, now {len(conversation_state['messages'])} messages" - ) + logger.debug("Added user message to langflow", message_count=len(conversation_state['messages'])) response_text, response_id = await async_response( langflow_client, @@ -414,9 +396,7 @@ async def async_langflow_chat( previous_response_id=previous_response_id, log_prefix="langflow", ) - print( - f"[DEBUG] Got langflow response_text: {response_text[:50]}..., response_id: {response_id}" - ) + logger.debug("Got langflow response", response_preview=response_text[:50], response_id=response_id) # Add assistant response to conversation with response_id and timestamp assistant_message = { @@ -426,27 +406,19 @@ async def async_langflow_chat( "timestamp": datetime.now(), } conversation_state["messages"].append(assistant_message) - print( - f"[DEBUG] Added assistant message to langflow, now {len(conversation_state['messages'])} messages" - ) + logger.debug("Added assistant message to langflow", message_count=len(conversation_state['messages'])) # Store the conversation thread with its response_id if response_id: 
conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - print( - f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" - ) + logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id) # Debug: Check what's in user_conversations now conversations = get_user_conversations(user_id) - print( - f"[DEBUG] user_conversations now has {len(conversations)} conversations: {list(conversations.keys())}" - ) + logger.debug("User conversations updated", user_id=user_id, conversation_count=len(conversations), conversation_ids=list(conversations.keys())) else: - print( - f"[DEBUG] WARNING: No response_id received from langflow, conversation not stored!" - ) + logger.warning("No response_id received from langflow, conversation not stored") return response_text, response_id @@ -460,9 +432,7 @@ async def async_langflow_chat_stream( extra_headers: dict = None, previous_response_id: str = None, ): - print( - f"[DEBUG] async_langflow_chat_stream called with user_id: {user_id}, previous_response_id: {previous_response_id}" - ) + logger.debug("async_langflow_chat_stream called", user_id=user_id, previous_response_id=previous_response_id) # Get the specific conversation thread (or create new one) conversation_state = get_conversation_thread(user_id, previous_response_id) @@ -513,6 +483,4 @@ async def async_langflow_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - print( - f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" - ) + logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id) diff --git a/src/api/chat.py b/src/api/chat.py index 2afd4f49..4d5cde33 100644 --- a/src/api/chat.py +++ b/src/api/chat.py @@ -1,5 +1,8 @@ from starlette.requests import Request from starlette.responses import JSONResponse, StreamingResponse +from utils.logging_config import get_logger + +logger = get_logger(__name__) async def chat_endpoint(request: Request, chat_service, session_manager): @@ -122,7 +125,7 @@ async def langflow_endpoint(request: Request, chat_service, session_manager): import traceback traceback.print_exc() - print(f"[ERROR] Langflow request failed: {str(e)}") + logger.error("Langflow request failed", error=str(e)) return JSONResponse( {"error": f"Langflow request failed: {str(e)}"}, status_code=500 ) diff --git a/src/api/connectors.py b/src/api/connectors.py index 87f21b4b..2426ef46 100644 --- a/src/api/connectors.py +++ b/src/api/connectors.py @@ -1,5 +1,8 @@ from starlette.requests import Request from starlette.responses import JSONResponse, PlainTextResponse +from utils.logging_config import get_logger + +logger = get_logger(__name__) async def list_connectors(request: Request, connector_service, session_manager): @@ -10,7 +13,7 @@ async def list_connectors(request: Request, connector_service, session_manager): ) return JSONResponse({"connectors": connector_types}) except Exception as e: - print(f"Error listing connectors: {e}") + logger.error("Error listing connectors", error=str(e)) return JSONResponse({"error": str(e)}, status_code=500) @@ -21,13 +24,11 @@ async def connector_sync(request: Request, connector_service, session_manager): max_files = data.get("max_files") try: - print( - f"[DEBUG] Starting connector sync for connector_type={connector_type}, max_files={max_files}" - ) + 
logger.debug("Starting connector sync", connector_type=connector_type, max_files=max_files) user = request.state.user jwt_token = request.state.jwt_token - print(f"[DEBUG] User: {user.user_id}") + logger.debug("User authenticated", user_id=user.user_id) # Get all active connections for this connector type and user connections = await connector_service.connection_manager.list_connections( @@ -44,14 +45,12 @@ async def connector_sync(request: Request, connector_service, session_manager): # Start sync tasks for all active connections task_ids = [] for connection in active_connections: - print( - f"[DEBUG] About to call sync_connector_files for connection {connection.connection_id}" - ) + logger.debug("About to call sync_connector_files for connection", connection_id=connection.connection_id) task_id = await connector_service.sync_connector_files( connection.connection_id, user.user_id, max_files, jwt_token=jwt_token ) task_ids.append(task_id) - print(f"[DEBUG] Got task_id: {task_id}") + logger.debug("Got task ID", task_id=task_id) return JSONResponse( { @@ -68,7 +67,7 @@ async def connector_sync(request: Request, connector_service, session_manager): import traceback error_msg = f"[ERROR] Connector sync failed: {str(e)}" - print(error_msg, file=sys.stderr, flush=True) + logger.error(error_msg) traceback.print_exc(file=sys.stderr) sys.stderr.flush() @@ -156,7 +155,7 @@ async def connector_webhook(request: Request, connector_service, session_manager payload["_headers"] = headers payload["_method"] = request.method - print(f"[WEBHOOK] {connector_type} notification received") + logger.info("Webhook notification received", connector_type=connector_type) # Extract channel/subscription ID using connector-specific method try: @@ -168,7 +167,7 @@ async def connector_webhook(request: Request, connector_service, session_manager channel_id = None if not channel_id: - print(f"[WEBHOOK] No channel ID found in {connector_type} webhook") + logger.warning("No channel ID found in webhook", connector_type=connector_type) return JSONResponse({"status": "ignored", "reason": "no_channel_id"}) # Find the specific connection for this webhook @@ -178,9 +177,7 @@ async def connector_webhook(request: Request, connector_service, session_manager ) ) if not connection or not connection.is_active: - print( - f"[WEBHOOK] Unknown channel {channel_id} - no cleanup attempted (will auto-expire)" - ) + logger.info("Unknown webhook channel, will auto-expire", channel_id=channel_id) return JSONResponse( {"status": "ignored_unknown_channel", "channel_id": channel_id} ) @@ -191,9 +188,7 @@ async def connector_webhook(request: Request, connector_service, session_manager # Get the connector instance connector = await connector_service._get_connector(connection.connection_id) if not connector: - print( - f"[WEBHOOK] Could not get connector for connection {connection.connection_id}" - ) + logger.error("Could not get connector for connection", connection_id=connection.connection_id) return JSONResponse( {"status": "error", "reason": "connector_not_found"} ) @@ -202,9 +197,7 @@ async def connector_webhook(request: Request, connector_service, session_manager affected_files = await connector.handle_webhook(payload) if affected_files: - print( - f"[WEBHOOK] Connection {connection.connection_id}: {len(affected_files)} files affected" - ) + logger.info("Webhook connection files affected", connection_id=connection.connection_id, affected_count=len(affected_files)) # Generate JWT token for the user (needed for OpenSearch authentication) user = 
session_manager.get_user(connection.user_id) @@ -228,9 +221,7 @@ async def connector_webhook(request: Request, connector_service, session_manager } else: # No specific files identified - just log the webhook - print( - f"[WEBHOOK] Connection {connection.connection_id}: general change detected, no specific files to sync" - ) + logger.info("Webhook general change detected, no specific files", connection_id=connection.connection_id) result = { "connection_id": connection.connection_id, @@ -248,9 +239,7 @@ async def connector_webhook(request: Request, connector_service, session_manager ) except Exception as e: - print( - f"[ERROR] Failed to process webhook for connection {connection.connection_id}: {e}" - ) + logger.error("Failed to process webhook for connection", connection_id=connection.connection_id, error=str(e)) import traceback traceback.print_exc() @@ -267,7 +256,7 @@ async def connector_webhook(request: Request, connector_service, session_manager except Exception as e: import traceback - print(f"[ERROR] Webhook processing failed: {str(e)}") + logger.error("Webhook processing failed", error=str(e)) traceback.print_exc() return JSONResponse( {"error": f"Webhook processing failed: {str(e)}"}, status_code=500 diff --git a/src/api/knowledge_filter.py b/src/api/knowledge_filter.py index 48a0b065..ffcce7b3 100644 --- a/src/api/knowledge_filter.py +++ b/src/api/knowledge_filter.py @@ -3,6 +3,9 @@ from starlette.responses import JSONResponse import uuid import json from datetime import datetime +from utils.logging_config import get_logger + +logger = get_logger(__name__) async def create_knowledge_filter( @@ -392,17 +395,15 @@ async def knowledge_filter_webhook( # Get the webhook payload payload = await request.json() - print( - f"[WEBHOOK] Knowledge filter webhook received for filter {filter_id}, subscription {subscription_id}" - ) - print(f"[WEBHOOK] Payload: {json.dumps(payload, indent=2)}") + logger.info("Knowledge filter webhook received", + filter_id=filter_id, + subscription_id=subscription_id, + payload_size=len(str(payload))) # Extract findings from the payload findings = payload.get("findings", []) if not findings: - print( - f"[WEBHOOK] No findings in webhook payload for subscription {subscription_id}" - ) + logger.info("No findings in webhook payload", subscription_id=subscription_id) return JSONResponse({"status": "no_findings"}) # Process the findings - these are the documents that matched the knowledge filter @@ -419,13 +420,14 @@ async def knowledge_filter_webhook( ) # Log the matched documents - print( - f"[WEBHOOK] Knowledge filter {filter_id} matched {len(matched_documents)} documents" - ) + logger.info("Knowledge filter matched documents", + filter_id=filter_id, + matched_count=len(matched_documents)) for doc in matched_documents: - print( - f"[WEBHOOK] Matched document: {doc['document_id']} from index {doc['index']}" - ) + logger.debug("Matched document", + document_id=doc['document_id'], + index=doc['index'], + score=doc.get('score')) # Here you could add additional processing: # - Send notifications to external webhooks @@ -444,7 +446,10 @@ async def knowledge_filter_webhook( ) except Exception as e: - print(f"[ERROR] Failed to process knowledge filter webhook: {str(e)}") + logger.error("Failed to process knowledge filter webhook", + filter_id=filter_id, + subscription_id=subscription_id, + error=str(e)) import traceback traceback.print_exc() diff --git a/src/api/search.py b/src/api/search.py index fc794430..20c7bdef 100644 --- a/src/api/search.py +++ b/src/api/search.py 
@@ -1,5 +1,8 @@ from starlette.requests import Request from starlette.responses import JSONResponse +from utils.logging_config import get_logger + +logger = get_logger(__name__) async def search(request: Request, search_service, session_manager): @@ -20,9 +23,14 @@ async def search(request: Request, search_service, session_manager): # Extract JWT token from auth middleware jwt_token = request.state.jwt_token - print( - f"[DEBUG] search API: user={user}, user_id={user.user_id if user else None}, jwt_token={'None' if jwt_token is None else 'present'}" - ) + logger.debug("Search API request", + user=str(user), + user_id=user.user_id if user else None, + has_jwt_token=jwt_token is not None, + query=query, + filters=filters, + limit=limit, + score_threshold=score_threshold) result = await search_service.search( query, diff --git a/src/config/settings.py b/src/config/settings.py index 546c15aa..c9a02e89 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -3,6 +3,9 @@ import requests import asyncio import time from dotenv import load_dotenv +from utils.logging_config import get_logger + +logger = get_logger(__name__) from opensearchpy import AsyncOpenSearch from opensearchpy._async.http_aiohttp import AIOHttpConnection from docling.document_converter import DocumentConverter @@ -34,9 +37,7 @@ GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET") def is_no_auth_mode(): """Check if we're running in no-auth mode (OAuth credentials missing)""" result = not (GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET) - print( - f"[DEBUG] is_no_auth_mode() = {result}, CLIENT_ID={GOOGLE_OAUTH_CLIENT_ID is not None}, CLIENT_SECRET={GOOGLE_OAUTH_CLIENT_SECRET is not None}" - ) + logger.debug("Checking auth mode", no_auth_mode=result, has_client_id=GOOGLE_OAUTH_CLIENT_ID is not None, has_client_secret=GOOGLE_OAUTH_CLIENT_SECRET is not None) return result @@ -95,17 +96,15 @@ async def generate_langflow_api_key(): # If key already provided via env, do not attempt generation if LANGFLOW_KEY: - print("[INFO] Using LANGFLOW_KEY from environment; skipping generation") + logger.info("Using LANGFLOW_KEY from environment, skipping generation") return LANGFLOW_KEY if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD: - print( - "[WARNING] LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation" - ) + logger.warning("LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation") return None try: - print("[INFO] Generating Langflow API key using superuser credentials...") + logger.info("Generating Langflow API key using superuser credentials") max_attempts = int(os.getenv("LANGFLOW_KEY_RETRIES", "15")) delay_seconds = float(os.getenv("LANGFLOW_KEY_RETRY_DELAY", "2.0")) @@ -143,28 +142,24 @@ async def generate_langflow_api_key(): raise KeyError("api_key") LANGFLOW_KEY = api_key - print( - f"[INFO] Successfully generated Langflow API key: {api_key[:8]}..." 
- ) + logger.info("Successfully generated Langflow API key", api_key_preview=api_key[:8]) return api_key except (requests.exceptions.RequestException, KeyError) as e: last_error = e - print( - f"[WARN] Attempt {attempt}/{max_attempts} to generate Langflow API key failed: {e}" - ) + logger.warning("Attempt to generate Langflow API key failed", attempt=attempt, max_attempts=max_attempts, error=str(e)) if attempt < max_attempts: time.sleep(delay_seconds) else: raise except requests.exceptions.RequestException as e: - print(f"[ERROR] Failed to generate Langflow API key: {e}") + logger.error("Failed to generate Langflow API key", error=str(e)) return None except KeyError as e: - print(f"[ERROR] Unexpected response format from Langflow: missing {e}") + logger.error("Unexpected response format from Langflow", missing_field=str(e)) return None except Exception as e: - print(f"[ERROR] Unexpected error generating Langflow API key: {e}") + logger.error("Unexpected error generating Langflow API key", error=str(e)) return None @@ -198,12 +193,10 @@ class AppClients: base_url=f"{LANGFLOW_URL}/api/v1", api_key=LANGFLOW_KEY ) except Exception as e: - print(f"[WARNING] Failed to initialize Langflow client: {e}") + logger.warning("Failed to initialize Langflow client", error=str(e)) self.langflow_client = None if self.langflow_client is None: - print( - "[WARNING] No Langflow client initialized yet; will attempt later on first use" - ) + logger.warning("No Langflow client initialized yet, will attempt later on first use") # Initialize patched OpenAI client self.patched_async_client = patch_openai_with_mcp(AsyncOpenAI()) @@ -224,9 +217,9 @@ class AppClients: self.langflow_client = AsyncOpenAI( base_url=f"{LANGFLOW_URL}/api/v1", api_key=LANGFLOW_KEY ) - print("[INFO] Langflow client initialized on-demand") + logger.info("Langflow client initialized on-demand") except Exception as e: - print(f"[ERROR] Failed to initialize Langflow client on-demand: {e}") + logger.error("Failed to initialize Langflow client on-demand", error=str(e)) self.langflow_client = None return self.langflow_client diff --git a/src/connectors/connection_manager.py b/src/connectors/connection_manager.py index 259381dd..7f3b9799 100644 --- a/src/connectors/connection_manager.py +++ b/src/connectors/connection_manager.py @@ -6,6 +6,9 @@ from typing import Dict, List, Any, Optional from datetime import datetime from dataclasses import dataclass, asdict from pathlib import Path +from utils.logging_config import get_logger + +logger = get_logger(__name__) from .base import BaseConnector from .google_drive import GoogleDriveConnector @@ -318,21 +321,17 @@ class ConnectionManager: if connection_config.config.get( "webhook_channel_id" ) or connection_config.config.get("subscription_id"): - print( - f"[WEBHOOK] Subscription already exists for connection {connection_id}" - ) + logger.info("Webhook subscription already exists", connection_id=connection_id) return # Check if webhook URL is configured webhook_url = connection_config.config.get("webhook_url") if not webhook_url: - print( - f"[WEBHOOK] No webhook URL configured for connection {connection_id}, skipping subscription setup" - ) + logger.info("No webhook URL configured, skipping subscription setup", connection_id=connection_id) return try: - print(f"[WEBHOOK] Setting up subscription for connection {connection_id}") + logger.info("Setting up webhook subscription", connection_id=connection_id) subscription_id = await connector.setup_subscription() # Store the subscription and resource IDs in 
connection config @@ -346,14 +345,10 @@ class ConnectionManager: # Save updated connection config await self.save_connections() - print( - f"[WEBHOOK] Successfully set up subscription {subscription_id} for connection {connection_id}" - ) + logger.info("Successfully set up webhook subscription", connection_id=connection_id, subscription_id=subscription_id) except Exception as e: - print( - f"[ERROR] Failed to setup webhook subscription for connection {connection_id}: {e}" - ) + logger.error("Failed to setup webhook subscription", connection_id=connection_id, error=str(e)) # Don't fail the entire connection setup if webhook fails async def _setup_webhook_for_new_connection( @@ -361,16 +356,12 @@ class ConnectionManager: ): """Setup webhook subscription for a newly authenticated connection""" try: - print( - f"[WEBHOOK] Setting up subscription for newly authenticated connection {connection_id}" - ) + logger.info("Setting up subscription for newly authenticated connection", connection_id=connection_id) # Create and authenticate connector connector = self._create_connector(connection_config) if not await connector.authenticate(): - print( - f"[ERROR] Failed to authenticate connector for webhook setup: {connection_id}" - ) + logger.error("Failed to authenticate connector for webhook setup", connection_id=connection_id) return # Setup subscription @@ -385,12 +376,8 @@ class ConnectionManager: # Save updated connection config await self.save_connections() - print( - f"[WEBHOOK] Successfully set up subscription {subscription_id} for connection {connection_id}" - ) + logger.info("Successfully set up webhook subscription", connection_id=connection_id, subscription_id=subscription_id) except Exception as e: - print( - f"[ERROR] Failed to setup webhook subscription for new connection {connection_id}: {e}" - ) + logger.error("Failed to setup webhook subscription for new connection", connection_id=connection_id, error=str(e)) # Don't fail the connection setup if webhook fails diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py index cf370109..5af35ef1 100644 --- a/src/connectors/google_drive/connector.py +++ b/src/connectors/google_drive/connector.py @@ -7,6 +7,9 @@ from typing import Dict, List, Any, Optional from googleapiclient.discovery import build from googleapiclient.errors import HttpError from googleapiclient.http import MediaIoBaseDownload +from utils.logging_config import get_logger + +logger = get_logger(__name__) from ..base import BaseConnector, ConnectorDocument, DocumentACL from .oauth import GoogleDriveOAuth @@ -20,9 +23,7 @@ def get_worker_drive_service(client_id: str, client_secret: str, token_file: str """Get or create a Google Drive service instance for this worker process""" global _worker_drive_service if _worker_drive_service is None: - print( - f"🔧 Initializing Google Drive service in worker process (PID: {os.getpid()})" - ) + logger.info("Initializing Google Drive service in worker process", pid=os.getpid()) # Create OAuth instance and load credentials in worker from .oauth import GoogleDriveOAuth @@ -39,9 +40,7 @@ def get_worker_drive_service(client_id: str, client_secret: str, token_file: str try: loop.run_until_complete(oauth.load_credentials()) _worker_drive_service = oauth.get_service() - print( - f"✅ Google Drive service ready in worker process (PID: {os.getpid()})" - ) + logger.info("Google Drive service ready in worker process", pid=os.getpid()) finally: loop.close() @@ -215,7 +214,7 @@ class 
GoogleDriveConnector(BaseConnector): return True return False except Exception as e: - print(f"Authentication failed: {e}") + logger.error("Authentication failed", error=str(e)) return False async def setup_subscription(self) -> str: @@ -258,7 +257,7 @@ class GoogleDriveConnector(BaseConnector): return channel_id except HttpError as e: - print(f"Failed to set up subscription: {e}") + logger.error("Failed to set up subscription", error=str(e)) raise def _get_start_page_token(self) -> str: @@ -340,7 +339,7 @@ class GoogleDriveConnector(BaseConnector): return {"files": files, "nextPageToken": results.get("nextPageToken")} except HttpError as e: - print(f"Failed to list files: {e}") + logger.error("Failed to list files", error=str(e)) raise async def get_file_content(self, file_id: str) -> ConnectorDocument: @@ -397,7 +396,7 @@ class GoogleDriveConnector(BaseConnector): ) except HttpError as e: - print(f"Failed to get file content: {e}") + logger.error("Failed to get file content", error=str(e)) raise async def _download_file_content( @@ -477,19 +476,17 @@ class GoogleDriveConnector(BaseConnector): resource_state = headers.get("x-goog-resource-state") if not channel_id: - print("[WEBHOOK] No channel ID found in Google Drive webhook") + logger.warning("No channel ID found in Google Drive webhook") return [] # Check if this webhook belongs to this connection if self.webhook_channel_id != channel_id: - print( - f"[WEBHOOK] Channel ID mismatch: expected {self.webhook_channel_id}, got {channel_id}" - ) + logger.warning("Channel ID mismatch", expected=self.webhook_channel_id, received=channel_id) return [] # Only process certain states (ignore 'sync' which is just a ping) if resource_state not in ["exists", "not_exists", "change"]: - print(f"[WEBHOOK] Ignoring resource state: {resource_state}") + logger.debug("Ignoring resource state", state=resource_state) return [] try: @@ -508,10 +505,10 @@ class GoogleDriveConnector(BaseConnector): page_token = query_params.get("pageToken", [None])[0] if not page_token: - print("[WEBHOOK] No page token found, cannot identify specific changes") + logger.warning("No page token found, cannot identify specific changes") return [] - print(f"[WEBHOOK] Getting changes since page token: {page_token}") + logger.info("Getting changes since page token", page_token=page_token) # Get list of changes since the page token changes = ( @@ -536,23 +533,19 @@ class GoogleDriveConnector(BaseConnector): is_trashed = file_info.get("trashed", False) if not is_trashed and mime_type in self.SUPPORTED_MIMETYPES: - print( - f"[WEBHOOK] File changed: {file_info.get('name', 'Unknown')} ({file_id})" - ) + logger.info("File changed", filename=file_info.get('name', 'Unknown'), file_id=file_id) affected_files.append(file_id) elif is_trashed: - print( - f"[WEBHOOK] File deleted/trashed: {file_info.get('name', 'Unknown')} ({file_id})" - ) + logger.info("File deleted/trashed", filename=file_info.get('name', 'Unknown'), file_id=file_id) # TODO: Handle file deletion (remove from index) else: - print(f"[WEBHOOK] Ignoring unsupported file type: {mime_type}") + logger.debug("Ignoring unsupported file type", mime_type=mime_type) - print(f"[WEBHOOK] Found {len(affected_files)} affected supported files") + logger.info("Found affected supported files", count=len(affected_files)) return affected_files except HttpError as e: - print(f"Failed to handle webhook: {e}") + logger.error("Failed to handle webhook", error=str(e)) return [] async def cleanup_subscription(self, subscription_id: str) -> bool: @@ -574,5 
+567,5 @@ class GoogleDriveConnector(BaseConnector): self.service.channels().stop(body=body).execute() return True except HttpError as e: - print(f"Failed to cleanup subscription: {e}") + logger.error("Failed to cleanup subscription", error=str(e)) return False diff --git a/src/main.py b/src/main.py index 40f76884..7230e201 100644 --- a/src/main.py +++ b/src/main.py @@ -6,6 +6,11 @@ if __name__ == "__main__" and len(sys.argv) > 1 and sys.argv[1] == "--tui": run_tui() sys.exit(0) +# Configure structured logging early +from utils.logging_config import configure_from_env, get_logger +configure_from_env() +logger = get_logger(__name__) + import asyncio import atexit import multiprocessing @@ -54,8 +59,7 @@ from api import ( settings, ) -print("CUDA available:", torch.cuda.is_available()) -print("CUDA version PyTorch was built with:", torch.version.cuda) +logger.info("CUDA device information", cuda_available=torch.cuda.is_available(), cuda_version=torch.version.cuda) async def wait_for_opensearch(): @@ -66,12 +70,10 @@ async def wait_for_opensearch(): for attempt in range(max_retries): try: await clients.opensearch.info() - print("OpenSearch is ready!") + logger.info("OpenSearch is ready") return except Exception as e: - print( - f"Attempt {attempt + 1}/{max_retries}: OpenSearch not ready yet ({e})" - ) + logger.warning("OpenSearch not ready yet", attempt=attempt + 1, max_retries=max_retries, error=str(e)) if attempt < max_retries - 1: await asyncio.sleep(retry_delay) else: @@ -93,10 +95,9 @@ async def configure_alerting_security(): # Use admin client (clients.opensearch uses admin credentials) response = await clients.opensearch.cluster.put_settings(body=alerting_settings) - print("Alerting security settings configured successfully") - print(f"Response: {response}") + logger.info("Alerting security settings configured successfully", response=response) except Exception as e: - print(f"Warning: Failed to configure alerting security settings: {e}") + logger.warning("Failed to configure alerting security settings", error=str(e)) # Don't fail startup if alerting config fails @@ -107,9 +108,9 @@ async def init_index(): # Create documents index if not await clients.opensearch.indices.exists(index=INDEX_NAME): await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY) - print(f"Created index '{INDEX_NAME}'") + logger.info("Created OpenSearch index", index_name=INDEX_NAME) else: - print(f"Index '{INDEX_NAME}' already exists, skipping creation.") + logger.info("Index already exists, skipping creation", index_name=INDEX_NAME) # Create knowledge filters index knowledge_filter_index_name = "knowledge_filters" @@ -134,11 +135,9 @@ async def init_index(): await clients.opensearch.indices.create( index=knowledge_filter_index_name, body=knowledge_filter_index_body ) - print(f"Created index '{knowledge_filter_index_name}'") + logger.info("Created knowledge filters index", index_name=knowledge_filter_index_name) else: - print( - f"Index '{knowledge_filter_index_name}' already exists, skipping creation." 
- ) + logger.info("Knowledge filters index already exists, skipping creation", index_name=knowledge_filter_index_name) # Configure alerting plugin security settings await configure_alerting_security() @@ -178,24 +177,22 @@ def generate_jwt_keys(): capture_output=True, ) - print("Generated RSA keys for JWT signing") + logger.info("Generated RSA keys for JWT signing") except subprocess.CalledProcessError as e: - print(f"Failed to generate RSA keys: {e}") + logger.error("Failed to generate RSA keys", error=str(e)) raise else: - print("RSA keys already exist, skipping generation") + logger.info("RSA keys already exist, skipping generation") async def init_index_when_ready(): """Initialize OpenSearch index when it becomes available""" try: await init_index() - print("OpenSearch index initialization completed successfully") + logger.info("OpenSearch index initialization completed successfully") except Exception as e: - print(f"OpenSearch index initialization failed: {e}") - print( - "OIDC endpoints will still work, but document operations may fail until OpenSearch is ready" - ) + logger.error("OpenSearch index initialization failed", error=str(e)) + logger.warning("OIDC endpoints will still work, but document operations may fail until OpenSearch is ready") async def initialize_services(): @@ -242,13 +239,11 @@ async def initialize_services(): try: await connector_service.initialize() loaded_count = len(connector_service.connection_manager.connections) - print( - f"[CONNECTORS] Loaded {loaded_count} persisted connection(s) on startup" - ) + logger.info("Loaded persisted connector connections on startup", loaded_count=loaded_count) except Exception as e: - print(f"[WARNING] Failed to load persisted connections on startup: {e}") + logger.warning("Failed to load persisted connections on startup", error=str(e)) else: - print(f"[CONNECTORS] Skipping connection loading in no-auth mode") + logger.info("Skipping connector loading in no-auth mode") return { "document_service": document_service, @@ -661,13 +656,13 @@ async def startup(): def cleanup(): """Cleanup on application shutdown""" # Cleanup process pools only (webhooks handled by Starlette shutdown) - print("[CLEANUP] Shutting down...") + logger.info("Application shutting down") pass async def cleanup_subscriptions_proper(services): """Cancel all active webhook subscriptions""" - print("[CLEANUP] Cancelling active webhook subscriptions...") + logger.info("Cancelling active webhook subscriptions") try: connector_service = services["connector_service"] @@ -683,25 +678,21 @@ async def cleanup_subscriptions_proper(services): for connection in active_connections: try: - print( - f"[CLEANUP] Cancelling subscription for connection {connection.connection_id}" - ) + logger.info("Cancelling subscription for connection", connection_id=connection.connection_id) connector = await connector_service.get_connector( connection.connection_id ) if connector: subscription_id = connection.config.get("webhook_channel_id") await connector.cleanup_subscription(subscription_id) - print(f"[CLEANUP] Cancelled subscription {subscription_id}") + logger.info("Cancelled subscription", subscription_id=subscription_id) except Exception as e: - print( - f"[ERROR] Failed to cancel subscription for {connection.connection_id}: {e}" - ) + logger.error("Failed to cancel subscription", connection_id=connection.connection_id, error=str(e)) - print(f"[CLEANUP] Finished cancelling {len(active_connections)} subscriptions") + logger.info("Finished cancelling subscriptions", 
subscription_count=len(active_connections)) except Exception as e: - print(f"[ERROR] Failed to cleanup subscriptions: {e}") + logger.error("Failed to cleanup subscriptions", error=str(e)) if __name__ == "__main__": diff --git a/src/services/chat_service.py b/src/services/chat_service.py index e25dd71b..669939bb 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -2,6 +2,9 @@ from config.settings import clients, LANGFLOW_URL, FLOW_ID from agent import async_chat, async_langflow, async_chat_stream, async_langflow_stream from auth_context import set_auth_context import json +from utils.logging_config import get_logger + +logger = get_logger(__name__) class ChatService: @@ -108,9 +111,7 @@ class ChatService: # Pass the complete filter expression as a single header to Langflow (only if we have something to send) if filter_expression: - print( - f"Sending OpenRAG query filter to Langflow: {json.dumps(filter_expression, indent=2)}" - ) + logger.info("Sending OpenRAG query filter to Langflow", filter_expression=filter_expression) extra_headers["X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER"] = json.dumps( filter_expression ) @@ -200,9 +201,7 @@ class ChatService: return {"error": "User ID is required", "conversations": []} conversations_dict = get_user_conversations(user_id) - print( - f"[DEBUG] get_chat_history for user {user_id}: found {len(conversations_dict)} conversations" - ) + logger.debug("Getting chat history for user", user_id=user_id, conversation_count=len(conversations_dict)) # Convert conversations dict to list format with metadata conversations = [] diff --git a/src/services/document_service.py b/src/services/document_service.py index f209d15f..9cb4dd7b 100644 --- a/src/services/document_service.py +++ b/src/services/document_service.py @@ -8,6 +8,9 @@ from docling_core.types.io import DocumentStream from typing import List import openai import tiktoken +from utils.logging_config import get_logger + +logger = get_logger(__name__) from config.settings import clients, INDEX_NAME, EMBED_MODEL from utils.document_processing import extract_relevant, process_document_sync @@ -91,7 +94,7 @@ class DocumentService: def _recreate_process_pool(self): """Recreate the process pool if it's broken""" if self._process_pool_broken and self.process_pool: - print("[WARNING] Attempting to recreate broken process pool...") + logger.warning("Attempting to recreate broken process pool") try: # Shutdown the old pool self.process_pool.shutdown(wait=False) @@ -102,10 +105,10 @@ class DocumentService: self.process_pool = ProcessPoolExecutor(max_workers=MAX_WORKERS) self._process_pool_broken = False - print(f"[INFO] Process pool recreated with {MAX_WORKERS} workers") + logger.info("Process pool recreated", worker_count=MAX_WORKERS) return True except Exception as e: - print(f"[ERROR] Failed to recreate process pool: {e}") + logger.error("Failed to recreate process pool", error=str(e)) return False return False @@ -193,8 +196,8 @@ class DocumentService: index=INDEX_NAME, id=chunk_id, body=chunk_doc ) except Exception as e: - print(f"[ERROR] OpenSearch indexing failed for chunk {chunk_id}: {e}") - print(f"[ERROR] Chunk document: {chunk_doc}") + logger.error("OpenSearch indexing failed for chunk", chunk_id=chunk_id, error=str(e)) + logger.error("Chunk document details", chunk_doc=chunk_doc) raise return {"status": "indexed", "id": file_hash} @@ -229,9 +232,7 @@ class DocumentService: try: exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash) except Exception as e: - 
print( - f"[ERROR] OpenSearch exists check failed for document {file_hash}: {e}" - ) + logger.error("OpenSearch exists check failed", file_hash=file_hash, error=str(e)) raise if exists: return {"status": "unchanged", "id": file_hash} @@ -371,10 +372,8 @@ class DocumentService: index=INDEX_NAME, id=chunk_id, body=chunk_doc ) except Exception as e: - print( - f"[ERROR] OpenSearch indexing failed for batch chunk {chunk_id}: {e}" - ) - print(f"[ERROR] Chunk document: {chunk_doc}") + logger.error("OpenSearch indexing failed for batch chunk", chunk_id=chunk_id, error=str(e)) + logger.error("Chunk document details", chunk_doc=chunk_doc) raise result = {"status": "indexed", "id": slim_doc["id"]} @@ -389,29 +388,25 @@ class DocumentService: from concurrent.futures import BrokenExecutor if isinstance(e, BrokenExecutor): - print(f"[CRITICAL] Process pool broken while processing {file_path}") - print(f"[INFO] This usually indicates a worker process crashed") - print( - f"[INFO] You should see detailed crash logs above from the worker process" - ) + logger.error("Process pool broken while processing file", file_path=file_path) + logger.info("Worker process likely crashed") + logger.info("You should see detailed crash logs above from the worker process") # Mark pool as broken for potential recreation self._process_pool_broken = True # Attempt to recreate the pool for future operations if self._recreate_process_pool(): - print(f"[INFO] Process pool successfully recreated") + logger.info("Process pool successfully recreated") else: - print( - f"[WARNING] Failed to recreate process pool - future operations may fail" - ) + logger.warning("Failed to recreate process pool - future operations may fail") file_task.error = f"Worker process crashed: {str(e)}" else: - print(f"[ERROR] Failed to process file {file_path}: {e}") + logger.error("Failed to process file", file_path=file_path, error=str(e)) file_task.error = str(e) - print(f"[ERROR] Full traceback:") + logger.error("Full traceback available") traceback.print_exc() file_task.status = TaskStatus.FAILED upload_task.failed_files += 1 diff --git a/src/services/monitor_service.py b/src/services/monitor_service.py index e19ccc01..6a1e1105 100644 --- a/src/services/monitor_service.py +++ b/src/services/monitor_service.py @@ -3,6 +3,9 @@ import json from typing import Any, Dict, Optional, List from datetime import datetime from config.settings import clients +from utils.logging_config import get_logger + +logger = get_logger(__name__) class MonitorService: @@ -192,7 +195,7 @@ class MonitorService: return monitors except Exception as e: - print(f"Error listing monitors for user {user_id}: {e}") + logger.error("Error listing monitors for user", user_id=user_id, error=str(e)) return [] async def list_monitors_for_filter( @@ -233,7 +236,7 @@ class MonitorService: return monitors except Exception as e: - print(f"Error listing monitors for filter {filter_id}: {e}") + logger.error("Error listing monitors for filter", filter_id=filter_id, error=str(e)) return [] async def _get_or_create_webhook_destination( diff --git a/src/services/search_service.py b/src/services/search_service.py index ba86435a..ca043097 100644 --- a/src/services/search_service.py +++ b/src/services/search_service.py @@ -2,6 +2,9 @@ from typing import Any, Dict, Optional from agentd.tool_decorator import tool from config.settings import clients, INDEX_NAME, EMBED_MODEL from auth_context import get_auth_context +from utils.logging_config import get_logger + +logger = get_logger(__name__) class 
SearchService:
@@ -135,13 +138,9 @@ class SearchService:
             search_body["min_score"] = score_threshold

         # Authentication required - DLS will handle document filtering automatically
-        print(
-            f"[DEBUG] search_service: user_id={user_id}, jwt_token={'None' if jwt_token is None else 'present'}"
-        )
+        logger.debug("search_service authentication info", user_id=user_id, has_jwt_token=jwt_token is not None)
         if not user_id:
-            print(
-                f"[DEBUG] search_service: user_id is None/empty, returning auth error"
-            )
+            logger.debug("search_service: user_id is None/empty, returning auth error")
             return {"results": [], "error": "Authentication required"}

         # Get user's OpenSearch client with JWT for OIDC auth through session manager
@@ -152,8 +151,7 @@
         try:
             results = await opensearch_client.search(index=INDEX_NAME, body=search_body)
         except Exception as e:
-            print(f"[ERROR] OpenSearch query failed: {e}")
-            print(f"[ERROR] Search body: {search_body}")
+            logger.error("OpenSearch query failed", error=str(e), search_body=search_body)
             # Re-raise the exception so the API returns the error to frontend
             raise

diff --git a/src/tui/screens/config.py b/src/tui/screens/config.py
index e3455d35..d31b02dc 100644
--- a/src/tui/screens/config.py
+++ b/src/tui/screens/config.py
@@ -162,6 +162,17 @@ class ConfigScreen(Screen):
         yield Label("Google OAuth Client ID")
         # Where to create Google OAuth credentials (helper above the box)
         yield Static(Text("Create credentials: https://console.cloud.google.com/apis/credentials", style="dim"), classes="helper-text")
+        # Callback URL guidance for Google OAuth
+        yield Static(
+            Text(
+                "Important: add an Authorized redirect URI to your Google OAuth app(s):\n"
+                " - Local: http://localhost:3000/auth/callback\n"
+                " - Prod: https://your-domain.com/auth/callback\n"
+                "If you use separate apps for login and connectors, add this URL to BOTH.",
+                style="dim"
+            ),
+            classes="helper-text"
+        )
         current_value = getattr(self.env_manager.config, "google_oauth_client_id", "")
         input_widget = Input(
             placeholder="xxx.apps.googleusercontent.com",
@@ -189,6 +200,17 @@ class ConfigScreen(Screen):
         yield Label("Microsoft Graph Client ID")
         # Where to create Microsoft app registrations (helper above the box)
         yield Static(Text("Create app: https://portal.azure.com/#view/Microsoft_AAD_RegisteredApps/ApplicationsListBlade", style="dim"), classes="helper-text")
+        # Callback URL guidance for Microsoft OAuth
+        yield Static(
+            Text(
+                "Important: configure a Web redirect URI for your Microsoft app(s):\n"
+                " - Local: http://localhost:3000/auth/callback\n"
+                " - Prod: https://your-domain.com/auth/callback\n"
+                "If you use separate apps for login and connectors, add this URI to BOTH.",
+                style="dim"
+            ),
+            classes="helper-text"
+        )
         current_value = getattr(self.env_manager.config, "microsoft_graph_oauth_client_id", "")
         input_widget = Input(
             placeholder="",
diff --git a/src/utils/logging_config.py b/src/utils/logging_config.py
new file mode 100644
index 00000000..4a522b13
--- /dev/null
+++ b/src/utils/logging_config.py
@@ -0,0 +1,81 @@
+import os
+import sys
+from typing import Any, Dict
+import structlog
+from structlog import processors
+
+
+def configure_logging(
+    log_level: str = "INFO",
+    json_logs: bool = False,
+    include_timestamps: bool = True,
+    service_name: str = "openrag"
+) -> None:
+    """Configure structlog for the application."""
+
+    # Convert string log level to actual level
+    level = getattr(structlog.stdlib.logging, log_level.upper(), structlog.stdlib.logging.INFO)
+
+    # Base processors
+    shared_processors = [
+        structlog.contextvars.merge_contextvars,
+        structlog.processors.add_log_level,
+        structlog.processors.StackInfoRenderer(),
+        structlog.dev.set_exc_info,
+    ]
+
+    if include_timestamps:
+        shared_processors.append(structlog.processors.TimeStamper(fmt="iso"))
+
+    # Add the callsite function name to all logs
+    shared_processors.append(
+        structlog.processors.CallsiteParameterAdder(
+            parameters=[structlog.processors.CallsiteParameter.FUNC_NAME]
+        )
+    )
+
+    # Console output configuration
+    if json_logs or os.getenv("LOG_FORMAT", "").lower() == "json":
+        # JSON output for production/containers; the renderer is added once as
+        # the final processor below (appending it here too would double-encode)
+        console_renderer = structlog.processors.JSONRenderer()
+    else:
+        # Pretty colored output for development
+        console_renderer = structlog.dev.ConsoleRenderer(
+            colors=sys.stderr.isatty(),
+            exception_formatter=structlog.dev.plain_traceback,
+        )
+
+    # Configure structlog
+    structlog.configure(
+        processors=shared_processors + [console_renderer],
+        wrapper_class=structlog.make_filtering_bound_logger(level),
+        context_class=dict,
+        logger_factory=structlog.WriteLoggerFactory(sys.stderr),
+        cache_logger_on_first_use=True,
+    )
+
+    # Add global context
+    structlog.contextvars.clear_contextvars()
+    structlog.contextvars.bind_contextvars(service=service_name)
+
+
+def get_logger(name: str = None) -> structlog.BoundLogger:
+    """Get a configured logger instance."""
+    if name:
+        return structlog.get_logger(name)
+    return structlog.get_logger()
+
+
+# Convenience function to configure logging from environment
+def configure_from_env() -> None:
+    """Configure logging from environment variables."""
+    log_level = os.getenv("LOG_LEVEL", "INFO")
+    json_logs = os.getenv("LOG_FORMAT", "").lower() == "json"
+    service_name = os.getenv("SERVICE_NAME", "openrag")
+
+    configure_logging(
+        log_level=log_level,
+        json_logs=json_logs,
+        service_name=service_name
+    )
\ No newline at end of file
diff --git a/uv.lock b/uv.lock
index 5a472795..a08b7457 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1425,6 +1425,7 @@ dependencies = [
     { name = "python-multipart" },
     { name = "rich" },
     { name = "starlette" },
+    { name = "structlog" },
     { name = "textual" },
     { name = "textual-fspicker" },
     { name = "torch", version = "2.7.1+cu128", source = { registry = "https://download.pytorch.org/whl/cu128" }, marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
@@ -1451,6 +1452,7 @@ requires-dist = [
     { name = "python-multipart", specifier = ">=0.0.20" },
     { name = "rich", specifier = ">=13.0.0" },
     { name = "starlette", specifier = ">=0.47.1" },
+    { name = "structlog", specifier = ">=25.4.0" },
     { name = "textual", specifier = ">=0.45.0" },
     { name = "textual-fspicker", specifier = ">=0.6.0" },
     { name = "torch", marker = "platform_machine != 'x86_64' or sys_platform != 'linux'", specifier = ">=2.7.1" },
@@ -2316,6 +2318,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/82/95/38ef0cd7fa11eaba6a99b3c4f5ac948d8bc6ff199aabd327a29cc000840c/starlette-0.47.1-py3-none-any.whl", hash = "sha256:5e11c9f5c7c3f24959edbf2dffdc01bba860228acf657129467d8a7468591527", size = 72747, upload-time = "2025-06-21T04:03:15.705Z" },
 ]

+[[package]]
+name = "structlog"
+version = "25.4.0"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/79/b9/6e672db4fec07349e7a8a8172c1a6ae235c58679ca29c3f86a61b5e59ff3/structlog-25.4.0.tar.gz", hash = "sha256:186cd1b0a8ae762e29417095664adf1d6a31702160a46dacb7796ea82f7409e4", size = 1369138, upload-time = "2025-06-02T08:21:12.971Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/a0/4a/97ee6973e3a73c74c8120d59829c3861ea52210667ec3e7a16045c62b64d/structlog-25.4.0-py3-none-any.whl", hash = "sha256:fe809ff5c27e557d14e613f45ca441aabda051d119ee5a0102aaba6ce40eed2c", size = 68720, upload-time = "2025-06-02T08:21:11.43Z" },
+]
+
 [[package]]
 name = "sympy"
 version = "1.14.0"