diff --git a/src/agent.py b/src/agent.py index 749eb299..5eba9762 100644 --- a/src/agent.py +++ b/src/agent.py @@ -8,6 +8,7 @@ from services.conversation_persistence_service import conversation_persistence # In-memory storage for active conversation threads (preserves function calls) active_conversations = {} + def get_user_conversations(user_id: str): """Get conversation metadata for a user from persistent storage""" return conversation_persistence.get_user_conversations(user_id) @@ -23,7 +24,9 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None): # If we have a previous_response_id, try to get the existing conversation if previous_response_id and previous_response_id in active_conversations[user_id]: - logger.debug(f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}") + logger.debug( + f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}" + ) return active_conversations[user_id][previous_response_id] # Create new conversation thread @@ -48,7 +51,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state if user_id not in active_conversations: active_conversations[user_id] = {} active_conversations[user_id][response_id] = conversation_state - + # 2. Store only essential metadata to disk (simplified JSON) messages = conversation_state.get("messages", []) first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None) @@ -56,7 +59,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state if first_user_msg: content = first_user_msg.get("content", "") title = content[:50] + "..." if len(content) > 50 else content - + metadata_only = { "response_id": response_id, "title": title, @@ -64,11 +67,15 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state "created_at": conversation_state.get("created_at"), "last_activity": conversation_state.get("last_activity"), "previous_response_id": conversation_state.get("previous_response_id"), - "total_messages": len([msg for msg in messages if msg.get("role") in ["user", "assistant"]]), + "total_messages": len( + [msg for msg in messages if msg.get("role") in ["user", "assistant"]] + ), # Don't store actual messages - Langflow has them } - - conversation_persistence.store_conversation_thread(user_id, response_id, metadata_only) + + conversation_persistence.store_conversation_thread( + user_id, response_id, metadata_only + ) # Legacy function for backward compatibility @@ -76,10 +83,12 @@ def get_user_conversation(user_id: str): """Get the most recent conversation for a user (for backward compatibility)""" # Check in-memory conversations first (with function calls) if user_id in active_conversations and active_conversations[user_id]: - latest_response_id = max(active_conversations[user_id].keys(), - key=lambda k: active_conversations[user_id][k]["last_activity"]) + latest_response_id = max( + active_conversations[user_id].keys(), + key=lambda k: active_conversations[user_id][k]["last_activity"], + ) return active_conversations[user_id][latest_response_id] - + # Fallback to metadata-only conversations conversations = get_user_conversations(user_id) if not conversations: @@ -342,7 +351,9 @@ async def async_chat( "content": response_text, "response_id": response_id, "timestamp": datetime.now(), - "response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls + "response_data": response_obj.model_dump() + if hasattr(response_obj, "model_dump") + else str(response_obj), # Store complete response for function calls } conversation_state["messages"].append(assistant_message) logger.debug( @@ -428,7 +439,7 @@ async def async_chat_stream( conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) logger.debug( - "Stored conversation thread", user_id=user_id, response_id=response_id + f"Stored conversation thread for user {user_id} with response_id: {response_id}" ) @@ -443,11 +454,8 @@ async def async_langflow_chat( store_conversation: bool = True, ): logger.debug( - "async_langflow_chat called", - user_id=user_id, - previous_response_id=previous_response_id, ) @@ -496,7 +504,9 @@ async def async_langflow_chat( "content": response_text, "response_id": response_id, "timestamp": datetime.now(), - "response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls + "response_data": response_obj.model_dump() + if hasattr(response_obj, "model_dump") + else str(response_obj), # Store complete response for function calls } conversation_state["messages"].append(assistant_message) logger.debug( @@ -511,17 +521,18 @@ async def async_langflow_chat( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - + # Claim session ownership for this user try: from services.session_ownership_service import session_ownership_service - session_ownership_service.claim_session(user_id, response_id) - print(f"[DEBUG] Claimed session {response_id} for user {user_id}") - except Exception as e: - print(f"[WARNING] Failed to claim session ownership: {e}") - print( - f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" + session_ownership_service.claim_session(user_id, response_id) + logger.debug(f"Claimed session {response_id} for user {user_id}") + except Exception as e: + logger.warning(f"Failed to claim session ownership: {e}") + + logger.debug( + f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) logger.debug( "Stored langflow conversation thread", @@ -570,7 +581,7 @@ async def async_langflow_chat_stream( full_response = "" response_id = None collected_chunks = [] # Store all chunks for function call data - + async for chunk in async_stream( langflow_client, prompt, @@ -585,7 +596,7 @@ async def async_langflow_chat_stream( chunk_data = json.loads(chunk.decode("utf-8")) collected_chunks.append(chunk_data) # Collect all chunk data - + if "delta" in chunk_data and "content" in chunk_data["delta"]: full_response += chunk_data["delta"]["content"] # Extract response_id from chunk @@ -612,20 +623,19 @@ async def async_langflow_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - + # Claim session ownership for this user try: from services.session_ownership_service import session_ownership_service + session_ownership_service.claim_session(user_id, response_id) - print(f"[DEBUG] Claimed session {response_id} for user {user_id}") + logger.debug(f"Claimed session {response_id} for user {user_id}") except Exception as e: - print(f"[WARNING] Failed to claim session ownership: {e}") - - print( - f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" + logger.warning(f"Failed to claim session ownership: {e}") + + logger.debug( + f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) logger.debug( - "Stored langflow conversation thread", - user_id=user_id, - response_id=response_id, + f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}" )