diff --git a/src/agent.py b/src/agent.py index 749eb299..5eba9762 100644 --- a/src/agent.py +++ b/src/agent.py @@ -8,6 +8,7 @@ from services.conversation_persistence_service import conversation_persistence # In-memory storage for active conversation threads (preserves function calls) active_conversations = {} + def get_user_conversations(user_id: str): """Get conversation metadata for a user from persistent storage""" return conversation_persistence.get_user_conversations(user_id) @@ -23,7 +24,9 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None): # If we have a previous_response_id, try to get the existing conversation if previous_response_id and previous_response_id in active_conversations[user_id]: - logger.debug(f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}") + logger.debug( + f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}" + ) return active_conversations[user_id][previous_response_id] # Create new conversation thread @@ -48,7 +51,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state if user_id not in active_conversations: active_conversations[user_id] = {} active_conversations[user_id][response_id] = conversation_state - + # 2. Store only essential metadata to disk (simplified JSON) messages = conversation_state.get("messages", []) first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None) @@ -56,7 +59,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state if first_user_msg: content = first_user_msg.get("content", "") title = content[:50] + "..." if len(content) > 50 else content - + metadata_only = { "response_id": response_id, "title": title, @@ -64,11 +67,15 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state "created_at": conversation_state.get("created_at"), "last_activity": conversation_state.get("last_activity"), "previous_response_id": conversation_state.get("previous_response_id"), - "total_messages": len([msg for msg in messages if msg.get("role") in ["user", "assistant"]]), + "total_messages": len( + [msg for msg in messages if msg.get("role") in ["user", "assistant"]] + ), # Don't store actual messages - Langflow has them } - - conversation_persistence.store_conversation_thread(user_id, response_id, metadata_only) + + conversation_persistence.store_conversation_thread( + user_id, response_id, metadata_only + ) # Legacy function for backward compatibility @@ -76,10 +83,12 @@ def get_user_conversation(user_id: str): """Get the most recent conversation for a user (for backward compatibility)""" # Check in-memory conversations first (with function calls) if user_id in active_conversations and active_conversations[user_id]: - latest_response_id = max(active_conversations[user_id].keys(), - key=lambda k: active_conversations[user_id][k]["last_activity"]) + latest_response_id = max( + active_conversations[user_id].keys(), + key=lambda k: active_conversations[user_id][k]["last_activity"], + ) return active_conversations[user_id][latest_response_id] - + # Fallback to metadata-only conversations conversations = get_user_conversations(user_id) if not conversations: @@ -342,7 +351,9 @@ async def async_chat( "content": response_text, "response_id": response_id, "timestamp": datetime.now(), - "response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls + "response_data": response_obj.model_dump() + if hasattr(response_obj, "model_dump") + else str(response_obj), # Store complete response for function calls } conversation_state["messages"].append(assistant_message) logger.debug( @@ -428,7 +439,7 @@ async def async_chat_stream( conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) logger.debug( - "Stored conversation thread", user_id=user_id, response_id=response_id + f"Stored conversation thread for user {user_id} with response_id: {response_id}" ) @@ -443,11 +454,8 @@ async def async_langflow_chat( store_conversation: bool = True, ): logger.debug( - "async_langflow_chat called", - user_id=user_id, - previous_response_id=previous_response_id, ) @@ -496,7 +504,9 @@ async def async_langflow_chat( "content": response_text, "response_id": response_id, "timestamp": datetime.now(), - "response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls + "response_data": response_obj.model_dump() + if hasattr(response_obj, "model_dump") + else str(response_obj), # Store complete response for function calls } conversation_state["messages"].append(assistant_message) logger.debug( @@ -511,17 +521,18 @@ async def async_langflow_chat( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - + # Claim session ownership for this user try: from services.session_ownership_service import session_ownership_service - session_ownership_service.claim_session(user_id, response_id) - print(f"[DEBUG] Claimed session {response_id} for user {user_id}") - except Exception as e: - print(f"[WARNING] Failed to claim session ownership: {e}") - print( - f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" + session_ownership_service.claim_session(user_id, response_id) + logger.debug(f"Claimed session {response_id} for user {user_id}") + except Exception as e: + logger.warning(f"Failed to claim session ownership: {e}") + + logger.debug( + f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) logger.debug( "Stored langflow conversation thread", @@ -570,7 +581,7 @@ async def async_langflow_chat_stream( full_response = "" response_id = None collected_chunks = [] # Store all chunks for function call data - + async for chunk in async_stream( langflow_client, prompt, @@ -585,7 +596,7 @@ async def async_langflow_chat_stream( chunk_data = json.loads(chunk.decode("utf-8")) collected_chunks.append(chunk_data) # Collect all chunk data - + if "delta" in chunk_data and "content" in chunk_data["delta"]: full_response += chunk_data["delta"]["content"] # Extract response_id from chunk @@ -612,20 +623,19 @@ async def async_langflow_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - + # Claim session ownership for this user try: from services.session_ownership_service import session_ownership_service + session_ownership_service.claim_session(user_id, response_id) - print(f"[DEBUG] Claimed session {response_id} for user {user_id}") + logger.debug(f"Claimed session {response_id} for user {user_id}") except Exception as e: - print(f"[WARNING] Failed to claim session ownership: {e}") - - print( - f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" + logger.warning(f"Failed to claim session ownership: {e}") + + logger.debug( + f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) logger.debug( - "Stored langflow conversation thread", - user_id=user_id, - response_id=response_id, + f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) diff --git a/src/api/settings.py b/src/api/settings.py index b5ce0b90..0f49f85e 100644 --- a/src/api/settings.py +++ b/src/api/settings.py @@ -1,4 +1,5 @@ from starlette.responses import JSONResponse +from utils.logging_config import get_logger from config.settings import ( LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, @@ -7,6 +8,9 @@ from config.settings import ( clients, ) +logger = get_logger(__name__) + + async def get_settings(request, session_manager): """Get application settings""" @@ -91,7 +95,7 @@ async def get_settings(request, session_manager): settings["ingestion_defaults"] = ingestion_defaults except Exception as e: - print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}") + logger.warning(f"Failed to fetch ingestion flow defaults: {e}") # Continue without ingestion defaults return JSONResponse(settings) diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py index 71eb24e0..0aa4234a 100644 --- a/src/connectors/google_drive/connector.py +++ b/src/connectors/google_drive/connector.py @@ -64,7 +64,6 @@ class GoogleDriveConnector(BaseConnector): Integration points: - `BaseConnector` is your project’s base class; minimum methods used here: * self.emit(doc: ConnectorDocument) -> None (or adapt to your ingestion pipeline) - * self.log/info/warn/error (optional) - Adjust paths, logging, and error handling to match your project style. """ @@ -81,8 +80,6 @@ class GoogleDriveConnector(BaseConnector): _FILE_ID_ALIASES = ("file_ids", "selected_file_ids", "selected_files") _FOLDER_ID_ALIASES = ("folder_ids", "selected_folder_ids", "selected_folders") - def log(self, message: str) -> None: - print(message) def emit(self, doc: ConnectorDocument) -> None: """ @@ -91,7 +88,7 @@ class GoogleDriveConnector(BaseConnector): """ # If BaseConnector has an emit method, call super().emit(doc) # Otherwise, implement your custom logic here. - print(f"Emitting document: {doc.id} ({doc.filename})") + logger.debug(f"Emitting document: {doc.id} ({doc.filename})") def __init__(self, config: Dict[str, Any]) -> None: # Read from config OR env (backend env, not NEXT_PUBLIC_*): @@ -433,7 +430,7 @@ class GoogleDriveConnector(BaseConnector): # If still not authenticated, bail (caller should kick off OAuth init) if not await self.oauth.is_authenticated(): - self.log("authenticate: no valid credentials; run OAuth init/callback first.") + logger.debug("authenticate: no valid credentials; run OAuth init/callback first.") return False # Build Drive service from OAuth helper @@ -482,7 +479,7 @@ class GoogleDriveConnector(BaseConnector): except Exception as e: # Optionally log error with your base class logger try: - self.log(f"GoogleDriveConnector.list_files failed: {e}") + logger.error(f"GoogleDriveConnector.list_files failed: {e}") except Exception: pass return {"files": [], "next_page_token": None} @@ -500,7 +497,7 @@ class GoogleDriveConnector(BaseConnector): except Exception as e: # Use your base class logger if available try: - self.log(f"Download failed for {file_id}: {e}") + logger.error(f"Download failed for {file_id}: {e}") except Exception: pass raise @@ -567,7 +564,7 @@ class GoogleDriveConnector(BaseConnector): except Exception as e: # Optional: use your base logger try: - self.log(f"Failed to get start page token: {e}") + logger.error(f"Failed to get start page token: {e}") except Exception: pass raise @@ -634,7 +631,7 @@ class GoogleDriveConnector(BaseConnector): ok = await self.authenticate() if not ok: try: - self.log("cleanup_subscription: not authenticated") + logger.error("cleanup_subscription: not authenticated") except Exception: pass return False @@ -662,7 +659,7 @@ class GoogleDriveConnector(BaseConnector): if not resource_id: try: - self.log( + logger.error( f"cleanup_subscription: missing resource_id for channel {subscription_id}. " f"Persist (channel_id, resource_id) when creating the subscription." ) @@ -684,7 +681,7 @@ class GoogleDriveConnector(BaseConnector): except Exception as e: try: - self.log(f"cleanup_subscription failed for {subscription_id}: {e}") + logger.error(f"cleanup_subscription failed for {subscription_id}: {e}") except Exception: pass return False @@ -708,7 +705,7 @@ class GoogleDriveConnector(BaseConnector): ok = await self.authenticate() if not ok: try: - self.log("handle_webhook: not authenticated") + logger.error("handle_webhook: not authenticated") except Exception: pass return affected @@ -728,7 +725,7 @@ class GoogleDriveConnector(BaseConnector): except Exception as e: selected_ids = set() try: - self.log(f"handle_webhook: scope build failed, proceeding unfiltered: {e}") + logger.error(f"handle_webhook: scope build failed, proceeding unfiltered: {e}") except Exception: pass @@ -797,7 +794,7 @@ class GoogleDriveConnector(BaseConnector): except Exception as e: try: - self.log(f"handle_webhook failed: {e}") + logger.error(f"handle_webhook failed: {e}") except Exception: pass return [] @@ -814,7 +811,7 @@ class GoogleDriveConnector(BaseConnector): blob = self._download_file_bytes(meta) except HttpError as e: # Skip/record failures - self.log(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}") + logger.error(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}") continue from datetime import datetime diff --git a/src/services/chat_service.py b/src/services/chat_service.py index 83e8fc4f..51da4b31 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -391,7 +391,7 @@ class ChatService: local_metadata[response_id] = conversation_metadata # 2. Get actual conversations from Langflow database (source of truth for messages) - print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}") + logger.debug(f"Attempting to fetch Langflow history for user: {user_id}") langflow_history = ( await langflow_history_service.get_user_conversation_history( user_id, flow_id=LANGFLOW_CHAT_FLOW_ID @@ -462,24 +462,24 @@ class ChatService: ) if langflow_history.get("conversations"): - print( - f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow" + logger.debug( + f"Added {len(langflow_history['conversations'])} historical conversations from Langflow" ) elif langflow_history.get("error"): - print( - f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}" + logger.debug( + f"Could not fetch Langflow history for user {user_id}: {langflow_history['error']}" ) else: - print(f"[DEBUG] No Langflow conversations found for user {user_id}") + logger.debug(f"No Langflow conversations found for user {user_id}") except Exception as e: - print(f"[ERROR] Failed to fetch Langflow history: {e}") + logger.error(f"Failed to fetch Langflow history: {e}") # Continue with just in-memory conversations # Sort by last activity (most recent first) all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) - print( + logger.debug( f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)" ) diff --git a/src/services/conversation_persistence_service.py b/src/services/conversation_persistence_service.py index 1b37eb4e..fa5717c1 100644 --- a/src/services/conversation_persistence_service.py +++ b/src/services/conversation_persistence_service.py @@ -8,7 +8,9 @@ import os from typing import Dict, Any from datetime import datetime import threading +from utils.logging_config import get_logger +logger = get_logger(__name__) class ConversationPersistenceService: """Simple service to persist conversations to disk""" @@ -24,10 +26,10 @@ class ConversationPersistenceService: try: with open(self.storage_file, 'r', encoding='utf-8') as f: data = json.load(f) - print(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}") + logger.debug(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}") return data except Exception as e: - print(f"Error loading conversations from {self.storage_file}: {e}") + logger.error(f"Error loading conversations from {self.storage_file}: {e}") return {} return {} @@ -37,9 +39,9 @@ class ConversationPersistenceService: with self.lock: with open(self.storage_file, 'w', encoding='utf-8') as f: json.dump(self._conversations, f, indent=2, ensure_ascii=False, default=str) - print(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}") + logger.debug(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}") except Exception as e: - print(f"Error saving conversations to {self.storage_file}: {e}") + logger.error(f"Error saving conversations to {self.storage_file}: {e}") def _count_total_conversations(self, data: Dict[str, Any]) -> int: """Count total conversations across all users""" @@ -89,14 +91,14 @@ class ConversationPersistenceService: if user_id in self._conversations and response_id in self._conversations[user_id]: del self._conversations[user_id][response_id] self._save_conversations() - print(f"Deleted conversation {response_id} for user {user_id}") + logger.debug(f"Deleted conversation {response_id} for user {user_id}") def clear_user_conversations(self, user_id: str): """Clear all conversations for a user""" if user_id in self._conversations: del self._conversations[user_id] self._save_conversations() - print(f"Cleared all conversations for user {user_id}") + logger.debug(f"Cleared all conversations for user {user_id}") def get_storage_stats(self) -> Dict[str, Any]: """Get statistics about stored conversations""" diff --git a/src/services/langflow_history_service.py b/src/services/langflow_history_service.py index 613c2113..ee3366c1 100644 --- a/src/services/langflow_history_service.py +++ b/src/services/langflow_history_service.py @@ -6,7 +6,9 @@ Simplified service that retrieves message history from Langflow using shared cli from typing import List, Dict, Optional, Any from config.settings import clients +from utils.logging_config import get_logger +logger = get_logger(__name__) class LangflowHistoryService: """Simplified service to retrieve message history from Langflow""" @@ -29,14 +31,14 @@ class LangflowHistoryService: if response.status_code == 200: session_ids = response.json() - print(f"Found {len(session_ids)} total sessions from Langflow") + logger.debug(f"Found {len(session_ids)} total sessions from Langflow") return session_ids else: - print(f"Failed to get sessions: {response.status_code} - {response.text}") + logger.error(f"Failed to get sessions: {response.status_code} - {response.text}") return [] except Exception as e: - print(f"Error getting user sessions: {e}") + logger.error(f"Error getting user sessions: {e}") return [] async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]: @@ -56,11 +58,11 @@ class LangflowHistoryService: # Convert to OpenRAG format return self._convert_langflow_messages(messages) else: - print(f"Failed to get messages for session {session_id}: {response.status_code}") + logger.error(f"Failed to get messages for session {session_id}: {response.status_code}") return [] except Exception as e: - print(f"Error getting session messages: {e}") + logger.error(f"Error getting session messages: {e}") return [] def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: @@ -114,7 +116,7 @@ class LangflowHistoryService: converted_messages.append(converted_msg) except Exception as e: - print(f"Error converting message: {e}") + logger.error(f"Error converting message: {e}") continue return converted_messages @@ -159,7 +161,7 @@ class LangflowHistoryService: } except Exception as e: - print(f"Error getting user conversation history: {e}") + logger.error(f"Error getting user conversation history: {e}") return { "error": str(e), "conversations": [] diff --git a/src/services/session_ownership_service.py b/src/services/session_ownership_service.py index 9e3677fd..220a6d96 100644 --- a/src/services/session_ownership_service.py +++ b/src/services/session_ownership_service.py @@ -7,7 +7,9 @@ import json import os from typing import Dict, List, Optional from datetime import datetime +from utils.logging_config import get_logger +logger = get_logger(__name__) class SessionOwnershipService: """Simple service to track which user owns which session""" @@ -23,7 +25,7 @@ class SessionOwnershipService: with open(self.ownership_file, 'r') as f: return json.load(f) except Exception as e: - print(f"Error loading session ownership data: {e}") + logger.error(f"Error loading session ownership data: {e}") return {} return {} @@ -32,9 +34,9 @@ class SessionOwnershipService: try: with open(self.ownership_file, 'w') as f: json.dump(self.ownership_data, f, indent=2) - print(f"Saved session ownership data to {self.ownership_file}") + logger.debug(f"Saved session ownership data to {self.ownership_file}") except Exception as e: - print(f"Error saving session ownership data: {e}") + logger.error(f"Error saving session ownership data: {e}") def claim_session(self, user_id: str, session_id: str): """Claim a session for a user""" @@ -45,7 +47,7 @@ class SessionOwnershipService: "last_accessed": datetime.now().isoformat() } self._save_ownership_data() - print(f"Claimed session {session_id} for user {user_id}") + logger.debug(f"Claimed session {session_id} for user {user_id}") else: # Update last accessed time self.ownership_data[session_id]["last_accessed"] = datetime.now().isoformat()