import json
import traceback
from datetime import datetime

from utils.logging_config import get_logger

# Import persistent storage
from services.conversation_persistence_service import conversation_persistence

logger = get_logger(__name__)

# In-memory storage for active conversation threads (preserves function calls)
active_conversations = {}


def get_user_conversations(user_id: str):
    """Get conversation metadata for a user from persistent storage"""
    return conversation_persistence.get_user_conversations(user_id)


def get_conversation_thread(user_id: str, previous_response_id: str = None):
    """Get or create a specific conversation thread with function call preservation"""
    # Create the user namespace if it doesn't exist
    if user_id not in active_conversations:
        active_conversations[user_id] = {}

    # If we have a previous_response_id, try to get the existing conversation
    if previous_response_id and previous_response_id in active_conversations[user_id]:
        logger.debug(
            f"Retrieved existing conversation for user {user_id}, "
            f"response_id {previous_response_id}"
        )
        return active_conversations[user_id][previous_response_id]

    # Create a new conversation thread
    new_conversation = {
        "messages": [
            {
                "role": "system",
                "content": "You are a helpful assistant. Always use the search_tools to answer questions.",
            }
        ],
        "previous_response_id": previous_response_id,  # Parent response_id for branching
        "created_at": datetime.now(),
        "last_activity": datetime.now(),
    }
    return new_conversation
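
# Illustrative sketch (not executed anywhere): how a thread is fetched,
# extended, and stored. The user_id and response_id values are hypothetical
# placeholders, not values produced by this module.
#
#   thread = get_conversation_thread("user-123", previous_response_id="resp_abc")
#   thread["messages"].append({"role": "user", "content": "Follow-up question"})
#   # Once the model replies with a new response_id, persist the thread under
#   # that id so later requests can branch from it:
#   store_conversation_thread("user-123", "resp_def", thread)
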
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
    """Store conversation both in memory (with function calls) and persist metadata to disk"""
    # 1. Store the full conversation in memory for function call preservation
    if user_id not in active_conversations:
        active_conversations[user_id] = {}
    active_conversations[user_id][response_id] = conversation_state

    # 2. Store only essential metadata to disk (simplified JSON)
    messages = conversation_state.get("messages", [])
    first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None)
    title = "New Chat"
    if first_user_msg:
        content = first_user_msg.get("content", "")
        title = content[:50] + "..." if len(content) > 50 else content

    metadata_only = {
        "response_id": response_id,
        "title": title,
        "endpoint": "langflow",
        "created_at": conversation_state.get("created_at"),
        "last_activity": conversation_state.get("last_activity"),
        "previous_response_id": conversation_state.get("previous_response_id"),
        "total_messages": len(
            [msg for msg in messages if msg.get("role") in ["user", "assistant"]]
        ),
        # Don't store the actual messages - Langflow has them
    }
    conversation_persistence.store_conversation_thread(
        user_id, response_id, metadata_only
    )


# Legacy function for backward compatibility
def get_user_conversation(user_id: str):
    """Get the most recent conversation for a user (for backward compatibility)"""
    # Check in-memory conversations first (with function calls)
    if user_id in active_conversations and active_conversations[user_id]:
        latest_response_id = max(
            active_conversations[user_id].keys(),
            key=lambda k: active_conversations[user_id][k]["last_activity"],
        )
        return active_conversations[user_id][latest_response_id]

    # Fall back to metadata-only conversations
    conversations = get_user_conversations(user_id)
    if not conversations:
        return get_conversation_thread(user_id)

    # Return the most recently active conversation metadata
    latest_conversation = max(conversations.values(), key=lambda c: c["last_activity"])
    return latest_conversation
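
# For reference, the metadata record persisted by store_conversation_thread
# looks roughly like this (values are hypothetical; the datetime fields are
# serialized by the persistence layer, not here):
#
#   {
#       "response_id": "resp_def",
#       "title": "What is the weather in Paris?",
#       "endpoint": "langflow",
#       "created_at": "2024-01-01T12:00:00",
#       "last_activity": "2024-01-01T12:00:05",
#       "previous_response_id": "resp_abc",
#       "total_messages": 2,
#   }
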
# Generic async response function for streaming
async def async_response_stream(
    client,
    prompt: str,
    model: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
    log_prefix: str = "response",
):
    logger.info("User prompt received", prompt=prompt)
    try:
        # Build request parameters
        request_params = {
            "model": model,
            "input": prompt,
            "stream": True,
            "include": ["tool_call.results"],
        }
        if previous_response_id is not None:
            request_params["previous_response_id"] = previous_response_id

        # Propagate the client's API key as an x-api-key header when it is not
        # already set in the client's default headers
        if "x-api-key" not in client.default_headers:
            if hasattr(client, "api_key") and extra_headers is not None:
                extra_headers["x-api-key"] = client.api_key
        if extra_headers:
            request_params["extra_headers"] = extra_headers

        response = await client.responses.create(**request_params)

        full_response = ""
        chunk_count = 0
        async for chunk in response:
            chunk_count += 1
            logger.debug(
                "Stream chunk received", chunk_count=chunk_count, chunk=str(chunk)
            )

            # Extract text content for logging
            if hasattr(chunk, "output_text") and chunk.output_text:
                full_response += chunk.output_text
            elif hasattr(chunk, "delta") and chunk.delta:
                # Handle delta properly - it might be a dict or a string
                if isinstance(chunk.delta, dict):
                    delta_text = (
                        chunk.delta.get("content", "")
                        or chunk.delta.get("text", "")
                        or str(chunk.delta)
                    )
                else:
                    delta_text = str(chunk.delta)
                full_response += delta_text

            # Yield the raw event as JSON followed by a newline for easy parsing
            try:
                if hasattr(chunk, "model_dump"):  # Pydantic model
                    chunk_data = chunk.model_dump()
                elif hasattr(chunk, "__dict__"):
                    chunk_data = chunk.__dict__
                else:
                    chunk_data = str(chunk)
                yield (json.dumps(chunk_data, default=str) + "\n").encode("utf-8")
            except Exception as e:
                # Fall back to a string representation
                logger.warning("JSON serialization failed", error=str(e))
                yield (
                    json.dumps(
                        {"error": f"Serialization failed: {e}", "raw": str(chunk)}
                    )
                    + "\n"
                ).encode("utf-8")

        logger.debug("Stream complete", total_chunks=chunk_count)
        logger.info("Response generated", log_prefix=log_prefix, response=full_response)
    except Exception as e:
        logger.error("Exception in streaming", error=str(e))
        traceback.print_exc()
        raise


# Generic async response function for non-streaming
async def async_response(
    client,
    prompt: str,
    model: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
    log_prefix: str = "response",
):
    try:
        logger.info("User prompt received", prompt=prompt)

        # Build request parameters
        request_params = {
            "model": model,
            "input": prompt,
            "stream": False,
            "include": ["tool_call.results"],
        }
        if previous_response_id is not None:
            request_params["previous_response_id"] = previous_response_id

        # Propagate the client's API key as an x-api-key header when it is not
        # already set in the client's default headers (done before attaching
        # the headers to the request parameters)
        if "x-api-key" not in client.default_headers:
            if hasattr(client, "api_key") and extra_headers is not None:
                extra_headers["x-api-key"] = client.api_key
        if extra_headers:
            request_params["extra_headers"] = extra_headers

        response = await client.responses.create(**request_params)
        response_text = response.output_text
        logger.info("Response generated", log_prefix=log_prefix, response=response_text)

        # Extract and return the response_id if available
        response_id = getattr(response, "id", None) or getattr(
            response, "response_id", None
        )
        return response_text, response_id, response
    except Exception as e:
        logger.error("Exception in non-streaming response", error=str(e))
        traceback.print_exc()
        raise


# Unified streaming function for both chat and langflow
async def async_stream(
    client,
    prompt: str,
    model: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
    log_prefix: str = "response",
):
    async for chunk in async_response_stream(
        client,
        prompt,
        model,
        extra_headers=extra_headers,
        previous_response_id=previous_response_id,
        log_prefix=log_prefix,
    ):
        yield chunk


# Async langflow function (non-streaming only)
async def async_langflow(
    langflow_client,
    flow_id: str,
    prompt: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
):
    response_text, response_id, _ = await async_response(
        langflow_client,
        prompt,
        flow_id,
        extra_headers=extra_headers,
        previous_response_id=previous_response_id,
        log_prefix="langflow",
    )
    return response_text, response_id


# Async langflow function for streaming (alias for compatibility)
async def async_langflow_stream(
    langflow_client,
    flow_id: str,
    prompt: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
):
    logger.debug("Starting langflow stream", prompt=prompt)
    try:
        async for chunk in async_stream(
            langflow_client,
            prompt,
            flow_id,
            extra_headers=extra_headers,
            previous_response_id=previous_response_id,
            log_prefix="langflow",
        ):
            logger.debug(
                "Yielding chunk from langflow stream",
                chunk_preview=chunk[:100].decode("utf-8", errors="replace"),
            )
            yield chunk
        logger.debug("Langflow stream completed")
    except Exception as e:
        logger.error("Exception in langflow stream", error=str(e))
        traceback.print_exc()
        raise
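
# Illustrative consumer sketch: each chunk yielded by the streaming functions
# above is one JSON document terminated by a newline (NDJSON), so a client can
# split on newlines and json.loads each line. The names below are hypothetical.
#
#   async def consume(stream):
#       async for raw in stream:
#           for line in raw.decode("utf-8").splitlines():
#               if line.strip():
#                   event = json.loads(line)
#                   print(event)
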
# Async chat function (non-streaming only)
async def async_chat(
    async_client,
    prompt: str,
    user_id: str,
    model: str = "gpt-4.1-mini",
    previous_response_id: str = None,
):
    logger.debug(
        "async_chat called", user_id=user_id, previous_response_id=previous_response_id
    )

    # Get the specific conversation thread (or create a new one)
    conversation_state = get_conversation_thread(user_id, previous_response_id)
    logger.debug(
        "Got conversation state", message_count=len(conversation_state["messages"])
    )

    # Add the user message to the conversation with a timestamp
    user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
    conversation_state["messages"].append(user_message)
    logger.debug(
        "Added user message", message_count=len(conversation_state["messages"])
    )

    response_text, response_id, response_obj = await async_response(
        async_client,
        prompt,
        model,
        previous_response_id=previous_response_id,
        log_prefix="agent",
    )
    logger.debug(
        "Got response", response_preview=response_text[:50], response_id=response_id
    )

    # Add the assistant response to the conversation with response_id,
    # timestamp, and the full response object
    assistant_message = {
        "role": "assistant",
        "content": response_text,
        "response_id": response_id,
        "timestamp": datetime.now(),
        # Store the complete response for function calls
        "response_data": response_obj.model_dump()
        if hasattr(response_obj, "model_dump")
        else str(response_obj),
    }
    conversation_state["messages"].append(assistant_message)
    logger.debug(
        "Added assistant message", message_count=len(conversation_state["messages"])
    )

    # Store the conversation thread with its response_id
    if response_id:
        conversation_state["last_activity"] = datetime.now()
        store_conversation_thread(user_id, response_id, conversation_state)
        logger.debug(
            "Stored conversation thread", user_id=user_id, response_id=response_id
        )

        # Debug: check what's in user_conversations now
        conversations = get_user_conversations(user_id)
        logger.debug(
            "User conversations updated",
            user_id=user_id,
            conversation_count=len(conversations),
            conversation_ids=list(conversations.keys()),
        )
    else:
        logger.warning("No response_id received, conversation not stored")

    return response_text, response_id


# Async chat function for streaming (alias for compatibility)
async def async_chat_stream(
    async_client,
    prompt: str,
    user_id: str,
    model: str = "gpt-4.1-mini",
    previous_response_id: str = None,
):
    # Get the specific conversation thread (or create a new one)
    conversation_state = get_conversation_thread(user_id, previous_response_id)

    # Add the user message to the conversation with a timestamp
    user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
    conversation_state["messages"].append(user_message)

    full_response = ""
    response_id = None
    async for chunk in async_stream(
        async_client,
        prompt,
        model,
        previous_response_id=previous_response_id,
        log_prefix="agent",
    ):
        # Extract text content to build the full response for history
        try:
            chunk_data = json.loads(chunk.decode("utf-8"))
            if isinstance(chunk_data, dict):
                delta = chunk_data.get("delta")
                if isinstance(delta, dict) and "content" in delta:
                    full_response += delta["content"]
                # Extract the response_id from the chunk
                if "id" in chunk_data:
                    response_id = chunk_data["id"]
                elif "response_id" in chunk_data:
                    response_id = chunk_data["response_id"]
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Non-JSON chunks are passed through without bookkeeping
            pass
        yield chunk

    # Add the complete assistant response to the message history with
    # response_id and timestamp
    if full_response:
        assistant_message = {
            "role": "assistant",
            "content": full_response,
            "response_id": response_id,
            "timestamp": datetime.now(),
        }
        conversation_state["messages"].append(assistant_message)

    # Store the conversation thread with its response_id
    if response_id:
        conversation_state["last_activity"] = datetime.now()
        store_conversation_thread(user_id, response_id, conversation_state)
        logger.debug(
            f"Stored conversation thread for user {user_id} "
            f"with response_id: {response_id}"
        )
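
# Illustrative usage sketch, assuming an AsyncOpenAI-compatible client that
# exposes `responses.create` (as the calls above require). The user_id is a
# hypothetical placeholder.
#
#   import asyncio
#   from openai import AsyncOpenAI
#
#   async def main():
#       client = AsyncOpenAI()
#       text, response_id = await async_chat(client, "Hello!", user_id="user-123")
#       # Continue the same thread by passing the returned response_id back in:
#       text, response_id = await async_chat(
#           client,
#           "And a follow-up.",
#           user_id="user-123",
#           previous_response_id=response_id,
#       )
#
#   asyncio.run(main())
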
# Async langflow function with conversation storage (non-streaming)
async def async_langflow_chat(
    langflow_client,
    flow_id: str,
    prompt: str,
    user_id: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
    store_conversation: bool = True,
):
    logger.debug(
        "async_langflow_chat called",
        user_id=user_id,
        previous_response_id=previous_response_id,
    )

    if store_conversation:
        # Get the specific conversation thread (or create a new one)
        conversation_state = get_conversation_thread(user_id, previous_response_id)
        logger.debug(
            "Got langflow conversation state",
            message_count=len(conversation_state["messages"]),
        )

        # Add the user message to the conversation with a timestamp
        user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
        conversation_state["messages"].append(user_message)
        logger.debug(
            "Added user message to langflow",
            message_count=len(conversation_state["messages"]),
        )

    response_text, response_id, response_obj = await async_response(
        langflow_client,
        prompt,
        flow_id,
        extra_headers=extra_headers,
        previous_response_id=previous_response_id,
        log_prefix="langflow",
    )
    logger.debug(
        "Got langflow response",
        response_preview=response_text[:50],
        response_id=response_id,
    )

    if not store_conversation:
        return response_text, response_id

    # Add the assistant response to the conversation with response_id and timestamp
    assistant_message = {
        "role": "assistant",
        "content": response_text,
        "response_id": response_id,
        "timestamp": datetime.now(),
        # Store the complete response for function calls
        "response_data": response_obj.model_dump()
        if hasattr(response_obj, "model_dump")
        else str(response_obj),
    }
    conversation_state["messages"].append(assistant_message)
    logger.debug(
        "Added assistant message to langflow",
        message_count=len(conversation_state["messages"]),
    )

    # Store the conversation thread with its response_id
    if response_id:
        conversation_state["last_activity"] = datetime.now()
        store_conversation_thread(user_id, response_id, conversation_state)

        # Claim session ownership for this user
        try:
            from services.session_ownership_service import session_ownership_service

            session_ownership_service.claim_session(user_id, response_id)
            logger.debug(f"Claimed session {response_id} for user {user_id}")
        except Exception as e:
            logger.warning(f"Failed to claim session ownership: {e}")

        logger.debug(
            "Stored langflow conversation thread",
            user_id=user_id,
            response_id=response_id,
        )

        # Debug: check what's in user_conversations now
        conversations = get_user_conversations(user_id)
        logger.debug(
            "User conversations updated",
            user_id=user_id,
            conversation_count=len(conversations),
            conversation_ids=list(conversations.keys()),
        )
    else:
        logger.warning("No response_id received from langflow, conversation not stored")

    return response_text, response_id
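
# Illustrative sketch: passing store_conversation=False skips all thread
# bookkeeping and returns the raw (text, response_id) pair, which suits
# one-off calls. The flow id and user id below are hypothetical placeholders.
#
#   text, response_id = await async_langflow_chat(
#       langflow_client,
#       "my-flow-id",
#       "One-off question",
#       user_id="user-123",
#       store_conversation=False,
#   )
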
# Async langflow function with conversation storage (streaming)
async def async_langflow_chat_stream(
    langflow_client,
    flow_id: str,
    prompt: str,
    user_id: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
):
    logger.debug(
        "async_langflow_chat_stream called",
        user_id=user_id,
        previous_response_id=previous_response_id,
    )

    # Get the specific conversation thread (or create a new one)
    conversation_state = get_conversation_thread(user_id, previous_response_id)

    # Add the user message to the conversation with a timestamp
    user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
    conversation_state["messages"].append(user_message)

    full_response = ""
    response_id = None
    collected_chunks = []  # Store all chunks for function call data
    async for chunk in async_stream(
        langflow_client,
        prompt,
        flow_id,
        extra_headers=extra_headers,
        previous_response_id=previous_response_id,
        log_prefix="langflow",
    ):
        # Extract text content to build the full response for history
        try:
            chunk_data = json.loads(chunk.decode("utf-8"))
            collected_chunks.append(chunk_data)  # Collect all chunk data
            if isinstance(chunk_data, dict):
                delta = chunk_data.get("delta")
                if isinstance(delta, dict) and "content" in delta:
                    full_response += delta["content"]
                # Extract the response_id from the chunk
                if "id" in chunk_data:
                    response_id = chunk_data["id"]
                elif "response_id" in chunk_data:
                    response_id = chunk_data["response_id"]
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Non-JSON chunks are passed through without bookkeeping
            pass
        yield chunk

    # Add the complete assistant response to the message history with
    # response_id, timestamp, and function call data
    if full_response:
        assistant_message = {
            "role": "assistant",
            "content": full_response,
            "response_id": response_id,
            "timestamp": datetime.now(),
            "chunks": collected_chunks,  # Store complete chunk data for function calls
        }
        conversation_state["messages"].append(assistant_message)

    # Store the conversation thread with its response_id
    if response_id:
        conversation_state["last_activity"] = datetime.now()
        store_conversation_thread(user_id, response_id, conversation_state)

        # Claim session ownership for this user
        try:
            from services.session_ownership_service import session_ownership_service

            session_ownership_service.claim_session(user_id, response_id)
            logger.debug(f"Claimed session {response_id} for user {user_id}")
        except Exception as e:
            logger.warning(f"Failed to claim session ownership: {e}")

        logger.debug(
            f"Stored langflow conversation thread for user {user_id} "
            f"with response_id: {response_id}"
        )


def delete_user_conversation(user_id: str, response_id: str) -> bool:
    """Delete a conversation for a user from both memory and persistent storage"""
    deleted = False
    try:
        # Delete from in-memory storage
        if (
            user_id in active_conversations
            and response_id in active_conversations[user_id]
        ):
            del active_conversations[user_id][response_id]
            logger.debug(
                f"Deleted conversation {response_id} from memory for user {user_id}"
            )
            deleted = True

        # Delete from persistent storage
        if conversation_persistence.delete_conversation_thread(user_id, response_id):
            logger.debug(
                f"Deleted conversation {response_id} from persistent storage "
                f"for user {user_id}"
            )
            deleted = True

        # Release session ownership
        try:
            from services.session_ownership_service import session_ownership_service

            session_ownership_service.release_session(user_id, response_id)
            logger.debug(
                f"Released session ownership for {response_id} for user {user_id}"
            )
        except Exception as e:
            logger.warning(f"Failed to release session ownership: {e}")

        return deleted
    except Exception as e:
        logger.error(
            f"Error deleting conversation {response_id} for user {user_id}: {e}"
        )
        return False
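
# Illustrative sketch: deleting a thread removes it from the in-memory cache
# and persistent storage and releases session ownership; the return value is
# True if either store actually held the conversation. IDs are placeholders.
#
#   if delete_user_conversation("user-123", "resp_def"):
#       logger.info("Conversation removed")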