import json
import traceback
from datetime import datetime

from utils.logging_config import get_logger

logger = get_logger(__name__)

# Import persistent storage
from services.conversation_persistence_service import conversation_persistence

# In-memory storage for active conversation threads (preserves function calls)
active_conversations = {}

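# For illustration, a populated active_conversations might look like this
# (the ids and values are hypothetical):
#
#   {
#       "user-1": {
#           "resp_abc123": {
#               "messages": [...],           # full history, incl. function calls
#               "previous_response_id": None,
#               "created_at": datetime(...),
#               "last_activity": datetime(...),
#           },
#       },
#   }
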
def get_user_conversations(user_id: str):
    """Get conversation metadata for a user from persistent storage."""
    return conversation_persistence.get_user_conversations(user_id)

def get_conversation_thread(user_id: str, previous_response_id: str = None):
    """Get or create a specific conversation thread with function call preservation."""
    # Create the user namespace if it doesn't exist
    if user_id not in active_conversations:
        active_conversations[user_id] = {}

    # If we have a previous_response_id, try to reuse the existing conversation
    if previous_response_id and previous_response_id in active_conversations[user_id]:
        logger.debug(
            f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}"
        )
        return active_conversations[user_id][previous_response_id]

    # Create a new conversation thread
    new_conversation = {
        "messages": [
            {
                "role": "system",
                "content": "You are a helpful assistant. Always use the search_tools to answer questions.",
            }
        ],
        "previous_response_id": previous_response_id,  # Parent response_id for branching
        "created_at": datetime.now(),
        "last_activity": datetime.now(),
    }

    return new_conversation

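# Branching sketch: a known previous_response_id resumes that in-memory thread,
# while an unknown or absent one starts a fresh thread whose
# previous_response_id records the parent (the ids below are illustrative):
#
#   thread = get_conversation_thread("user-1")                  # brand-new thread
#   resumed = get_conversation_thread("user-1", "resp_abc123")  # resume, if cached
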
def store_conversation_thread(
    user_id: str, response_id: str, conversation_state: dict
):
    """Store the conversation in memory (with function calls) and persist metadata to disk."""
    # 1. Keep the full conversation in memory so function-call data survives
    if user_id not in active_conversations:
        active_conversations[user_id] = {}
    active_conversations[user_id][response_id] = conversation_state

    # 2. Persist only essential metadata to disk (simplified JSON)
    messages = conversation_state.get("messages", [])
    first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None)
    title = "New Chat"
    if first_user_msg:
        content = first_user_msg.get("content", "")
        title = content[:50] + "..." if len(content) > 50 else content

    metadata_only = {
        "response_id": response_id,
        "title": title,
        "endpoint": "langflow",
        "created_at": conversation_state.get("created_at"),
        "last_activity": conversation_state.get("last_activity"),
        "previous_response_id": conversation_state.get("previous_response_id"),
        "total_messages": len(
            [msg for msg in messages if msg.get("role") in ["user", "assistant"]]
        ),
        # Don't store the actual messages - Langflow has them
    }

    conversation_persistence.store_conversation_thread(
        user_id, response_id, metadata_only
    )

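# For illustration, the persisted JSON for one thread might look like this
# (values are hypothetical; the exact on-disk layout, including how datetimes
# are serialized, is up to conversation_persistence):
#
#   {
#       "response_id": "resp_abc123",
#       "title": "What is the capital of France?",
#       "endpoint": "langflow",
#       "created_at": "2024-05-01T12:00:00",
#       "last_activity": "2024-05-01T12:05:00",
#       "previous_response_id": null,
#       "total_messages": 4
#   }
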
# Legacy function for backward compatibility
def get_user_conversation(user_id: str):
    """Get the most recent conversation for a user (for backward compatibility)."""
    # Check in-memory conversations first (these include function calls)
    if user_id in active_conversations and active_conversations[user_id]:
        latest_response_id = max(
            active_conversations[user_id].keys(),
            key=lambda k: active_conversations[user_id][k]["last_activity"],
        )
        return active_conversations[user_id][latest_response_id]

    # Fall back to metadata-only conversations
    conversations = get_user_conversations(user_id)
    if not conversations:
        return get_conversation_thread(user_id)

    # Return the most recently active conversation metadata
    return max(conversations.values(), key=lambda c: c["last_activity"])

# Generic async response function for streaming
async def async_response_stream(
    client,
    prompt: str,
    model: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
    log_prefix: str = "response",
):
    logger.info("User prompt received", prompt=prompt)

    try:
        # Build request parameters
        request_params = {
            "model": model,
            "input": prompt,
            "stream": True,
            "include": ["tool_call.results"],
        }
        if previous_response_id is not None:
            request_params["previous_response_id"] = previous_response_id

        # Inject the client's API key as an x-api-key header if it isn't already set
        if "x-api-key" not in client.default_headers:
            if hasattr(client, "api_key") and extra_headers is not None:
                extra_headers["x-api-key"] = client.api_key

        if extra_headers:
            request_params["extra_headers"] = extra_headers

        response = await client.responses.create(**request_params)

        full_response = ""
        chunk_count = 0
        async for chunk in response:
            chunk_count += 1
            logger.debug(
                "Stream chunk received", chunk_count=chunk_count, chunk=str(chunk)
            )

            # Extract text content for logging
            if hasattr(chunk, "output_text") and chunk.output_text:
                full_response += chunk.output_text
            elif hasattr(chunk, "delta") and chunk.delta:
                # Handle delta properly - it might be a dict or a string
                if isinstance(chunk.delta, dict):
                    delta_text = (
                        chunk.delta.get("content", "")
                        or chunk.delta.get("text", "")
                        or str(chunk.delta)
                    )
                else:
                    delta_text = str(chunk.delta)
                full_response += delta_text

            # Yield the raw event as JSON followed by a newline for easy parsing
            try:
                # Serialize the chunk object
                if hasattr(chunk, "model_dump"):
                    # Pydantic model
                    chunk_data = chunk.model_dump()
                elif hasattr(chunk, "__dict__"):
                    chunk_data = chunk.__dict__
                else:
                    chunk_data = str(chunk)

                yield (json.dumps(chunk_data, default=str) + "\n").encode("utf-8")
            except Exception as e:
                # Fall back to a string representation
                logger.warning("JSON serialization failed", error=str(e))
                yield (
                    json.dumps(
                        {"error": f"Serialization failed: {e}", "raw": str(chunk)}
                    )
                    + "\n"
                ).encode("utf-8")

        logger.debug("Stream complete", total_chunks=chunk_count)
        logger.info("Response generated", log_prefix=log_prefix, response=full_response)

    except Exception as e:
        logger.error("Exception in streaming", error=str(e))
        traceback.print_exc()
        raise

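# A minimal consumption sketch, assuming `client` is an AsyncOpenAI-compatible
# client exposing client.responses.create (the model name is illustrative):
#
#   async def consume():
#       async for line in async_response_stream(client, "Hello", "gpt-4.1-mini"):
#           event = json.loads(line.decode("utf-8"))  # one JSON event per line
#           print(event)
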
# Generic async response function for non-streaming
async def async_response(
    client,
    prompt: str,
    model: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
    log_prefix: str = "response",
):
    try:
        logger.info("User prompt received", prompt=prompt)

        # Build request parameters
        request_params = {
            "model": model,
            "input": prompt,
            "stream": False,
            "include": ["tool_call.results"],
        }
        if previous_response_id is not None:
            request_params["previous_response_id"] = previous_response_id

        # Inject the client's API key before attaching the headers, so the
        # injection takes effect even when extra_headers starts out empty
        if "x-api-key" not in client.default_headers:
            if hasattr(client, "api_key") and extra_headers is not None:
                extra_headers["x-api-key"] = client.api_key

        if extra_headers:
            request_params["extra_headers"] = extra_headers

        response = await client.responses.create(**request_params)

        response_text = response.output_text
        logger.info("Response generated", log_prefix=log_prefix, response=response_text)

        # Extract the response_id if available
        response_id = getattr(response, "id", None) or getattr(
            response, "response_id", None
        )

        return response_text, response_id, response
    except Exception as e:
        logger.error("Exception in non-streaming response", error=str(e))
        traceback.print_exc()
        raise

# Unified streaming function for both chat and langflow
async def async_stream(
    client,
    prompt: str,
    model: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
    log_prefix: str = "response",
):
    async for chunk in async_response_stream(
        client,
        prompt,
        model,
        extra_headers=extra_headers,
        previous_response_id=previous_response_id,
        log_prefix=log_prefix,
    ):
        yield chunk

# Async langflow function (non-streaming only)
async def async_langflow(
    langflow_client,
    flow_id: str,
    prompt: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
):
    response_text, response_id, _response_obj = await async_response(
        langflow_client,
        prompt,
        flow_id,
        extra_headers=extra_headers,
        previous_response_id=previous_response_id,
        log_prefix="langflow",
    )
    return response_text, response_id

# Async langflow function for streaming (alias for compatibility)
async def async_langflow_stream(
    langflow_client,
    flow_id: str,
    prompt: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
):
    logger.debug("Starting langflow stream", prompt=prompt)
    try:
        async for chunk in async_stream(
            langflow_client,
            prompt,
            flow_id,
            extra_headers=extra_headers,
            previous_response_id=previous_response_id,
            log_prefix="langflow",
        ):
            logger.debug(
                "Yielding chunk from langflow stream",
                chunk_preview=chunk[:100].decode("utf-8", errors="replace"),
            )
            yield chunk
        logger.debug("Langflow stream completed")
    except Exception as e:
        logger.error("Exception in langflow stream", error=str(e))
        traceback.print_exc()
        raise

# Async chat function (non-streaming only)
async def async_chat(
    async_client,
    prompt: str,
    user_id: str,
    model: str = "gpt-4.1-mini",
    previous_response_id: str = None,
):
    logger.debug(
        "async_chat called", user_id=user_id, previous_response_id=previous_response_id
    )

    # Get the specific conversation thread (or create a new one)
    conversation_state = get_conversation_thread(user_id, previous_response_id)
    logger.debug(
        "Got conversation state", message_count=len(conversation_state["messages"])
    )

    # Add the user message to the conversation with a timestamp
    user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
    conversation_state["messages"].append(user_message)
    logger.debug(
        "Added user message", message_count=len(conversation_state["messages"])
    )

    response_text, response_id, response_obj = await async_response(
        async_client,
        prompt,
        model,
        previous_response_id=previous_response_id,
        log_prefix="agent",
    )
    logger.debug(
        "Got response", response_preview=response_text[:50], response_id=response_id
    )

    # Add the assistant response to the conversation with response_id,
    # timestamp, and the full response object
    assistant_message = {
        "role": "assistant",
        "content": response_text,
        "response_id": response_id,
        "timestamp": datetime.now(),
        "response_data": response_obj.model_dump()
        if hasattr(response_obj, "model_dump")
        else str(response_obj),  # Store the complete response for function calls
    }
    conversation_state["messages"].append(assistant_message)
    logger.debug(
        "Added assistant message", message_count=len(conversation_state["messages"])
    )

    # Store the conversation thread under its response_id
    if response_id:
        conversation_state["last_activity"] = datetime.now()
        store_conversation_thread(user_id, response_id, conversation_state)
        logger.debug(
            "Stored conversation thread", user_id=user_id, response_id=response_id
        )

        # Debug: check what's in user_conversations now
        conversations = get_user_conversations(user_id)
        logger.debug(
            "User conversations updated",
            user_id=user_id,
            conversation_count=len(conversations),
            conversation_ids=list(conversations.keys()),
        )
    else:
        logger.warning("No response_id received, conversation not stored")

    return response_text, response_id

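# Usage sketch (hypothetical ids; `client` is an AsyncOpenAI-style client):
#
#   text, rid = await async_chat(client, "Hi", user_id="user-1")
#   # Continue the same thread by passing the returned response_id back in:
#   text2, rid2 = await async_chat(
#       client, "And then?", user_id="user-1", previous_response_id=rid
#   )
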
# Async chat function for streaming (alias for compatibility)
async def async_chat_stream(
    async_client,
    prompt: str,
    user_id: str,
    model: str = "gpt-4.1-mini",
    previous_response_id: str = None,
):
    # Get the specific conversation thread (or create a new one)
    conversation_state = get_conversation_thread(user_id, previous_response_id)

    # Add the user message to the conversation with a timestamp
    user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
    conversation_state["messages"].append(user_message)

    full_response = ""
    response_id = None
    async for chunk in async_stream(
        async_client,
        prompt,
        model,
        previous_response_id=previous_response_id,
        log_prefix="agent",
    ):
        # Extract text content to build the full response for history
        try:
            chunk_data = json.loads(chunk.decode("utf-8"))
            delta = chunk_data.get("delta")
            if isinstance(delta, dict) and "content" in delta:
                full_response += delta["content"]
            # Extract the response_id from the chunk
            if "id" in chunk_data:
                response_id = chunk_data["id"]
            elif "response_id" in chunk_data:
                response_id = chunk_data["response_id"]
        except Exception:
            pass  # Non-JSON chunks are streamed through untouched
        yield chunk

    # Add the complete assistant response to the message history with
    # response_id and timestamp
    if full_response:
        assistant_message = {
            "role": "assistant",
            "content": full_response,
            "response_id": response_id,
            "timestamp": datetime.now(),
        }
        conversation_state["messages"].append(assistant_message)

    # Store the conversation thread under its response_id
    if response_id:
        conversation_state["last_activity"] = datetime.now()
        store_conversation_thread(user_id, response_id, conversation_state)
        logger.debug(
            f"Stored conversation thread for user {user_id} with response_id: {response_id}"
        )

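# The parser above only assumes chunks of roughly this shape (these are the
# fields the code probes for, not a guaranteed schema):
#
#   {"id": "resp_abc123", "delta": {"content": "partial text"}, ...}
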
# Async langflow function with conversation storage (non-streaming)
async def async_langflow_chat(
    langflow_client,
    flow_id: str,
    prompt: str,
    user_id: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
    store_conversation: bool = True,
):
    logger.debug(
        "async_langflow_chat called",
        user_id=user_id,
        previous_response_id=previous_response_id,
    )

    conversation_state = None
    if store_conversation:
        # Get the specific conversation thread (or create a new one)
        conversation_state = get_conversation_thread(user_id, previous_response_id)
        logger.debug(
            "Got langflow conversation state",
            message_count=len(conversation_state["messages"]),
        )

        # Add the user message to the conversation with a timestamp
        user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
        conversation_state["messages"].append(user_message)
        logger.debug(
            "Added user message to langflow",
            message_count=len(conversation_state["messages"]),
        )

    response_text, response_id, response_obj = await async_response(
        langflow_client,
        prompt,
        flow_id,
        extra_headers=extra_headers,
        previous_response_id=previous_response_id,
        log_prefix="langflow",
    )
    logger.debug(
        "Got langflow response",
        response_preview=response_text[:50],
        response_id=response_id,
    )

    if not store_conversation:
        return response_text, response_id

    # Add the assistant response to the conversation with response_id and timestamp
    assistant_message = {
        "role": "assistant",
        "content": response_text,
        "response_id": response_id,
        "timestamp": datetime.now(),
        "response_data": response_obj.model_dump()
        if hasattr(response_obj, "model_dump")
        else str(response_obj),  # Store the complete response for function calls
    }
    conversation_state["messages"].append(assistant_message)
    logger.debug(
        "Added assistant message to langflow",
        message_count=len(conversation_state["messages"]),
    )

    # Store the conversation thread under its response_id
    if response_id:
        conversation_state["last_activity"] = datetime.now()
        store_conversation_thread(user_id, response_id, conversation_state)

        # Claim session ownership for this user
        try:
            from services.session_ownership_service import session_ownership_service

            session_ownership_service.claim_session(user_id, response_id)
            logger.debug(f"Claimed session {response_id} for user {user_id}")
        except Exception as e:
            logger.warning(f"Failed to claim session ownership: {e}")

        logger.debug(
            "Stored langflow conversation thread",
            user_id=user_id,
            response_id=response_id,
        )

        # Debug: check what's in user_conversations now
        conversations = get_user_conversations(user_id)
        logger.debug(
            "User conversations updated",
            user_id=user_id,
            conversation_count=len(conversations),
            conversation_ids=list(conversations.keys()),
        )
    else:
        logger.warning("No response_id received from langflow, conversation not stored")

    return response_text, response_id

# Async langflow function with conversation storage (streaming)
async def async_langflow_chat_stream(
    langflow_client,
    flow_id: str,
    prompt: str,
    user_id: str,
    extra_headers: dict = None,
    previous_response_id: str = None,
):
    logger.debug(
        "async_langflow_chat_stream called",
        user_id=user_id,
        previous_response_id=previous_response_id,
    )

    # Get the specific conversation thread (or create a new one)
    conversation_state = get_conversation_thread(user_id, previous_response_id)

    # Add the user message to the conversation with a timestamp
    user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
    conversation_state["messages"].append(user_message)

    full_response = ""
    response_id = None
    collected_chunks = []  # Store all chunks for function call data

    async for chunk in async_stream(
        langflow_client,
        prompt,
        flow_id,
        extra_headers=extra_headers,
        previous_response_id=previous_response_id,
        log_prefix="langflow",
    ):
        # Extract text content to build the full response for history
        try:
            chunk_data = json.loads(chunk.decode("utf-8"))
            collected_chunks.append(chunk_data)  # Collect all chunk data

            delta = chunk_data.get("delta")
            if isinstance(delta, dict) and "content" in delta:
                full_response += delta["content"]
            # Extract the response_id from the chunk
            if "id" in chunk_data:
                response_id = chunk_data["id"]
            elif "response_id" in chunk_data:
                response_id = chunk_data["response_id"]
        except Exception:
            pass  # Non-JSON chunks are streamed through untouched
        yield chunk

    # Add the complete assistant response to the message history with
    # response_id, timestamp, and function call data
    if full_response:
        assistant_message = {
            "role": "assistant",
            "content": full_response,
            "response_id": response_id,
            "timestamp": datetime.now(),
            "chunks": collected_chunks,  # Complete chunk data for function calls
        }
        conversation_state["messages"].append(assistant_message)

    # Store the conversation thread under its response_id
    if response_id:
        conversation_state["last_activity"] = datetime.now()
        store_conversation_thread(user_id, response_id, conversation_state)

        # Claim session ownership for this user
        try:
            from services.session_ownership_service import session_ownership_service

            session_ownership_service.claim_session(user_id, response_id)
            logger.debug(f"Claimed session {response_id} for user {user_id}")
        except Exception as e:
            logger.warning(f"Failed to claim session ownership: {e}")

        logger.debug(
            f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
        )

def delete_user_conversation(user_id: str, response_id: str) -> bool:
    """Delete a conversation for a user from both memory and persistent storage."""
    deleted = False

    try:
        # Delete from in-memory storage
        user_threads = active_conversations.get(user_id, {})
        if response_id in user_threads:
            del user_threads[response_id]
            logger.debug(
                f"Deleted conversation {response_id} from memory for user {user_id}"
            )
            deleted = True

        # Delete from persistent storage
        if conversation_persistence.delete_conversation_thread(user_id, response_id):
            logger.debug(
                f"Deleted conversation {response_id} from persistent storage for user {user_id}"
            )
            deleted = True

        # Release session ownership
        try:
            from services.session_ownership_service import session_ownership_service

            session_ownership_service.release_session(user_id, response_id)
            logger.debug(
                f"Released session ownership for {response_id} for user {user_id}"
            )
        except Exception as e:
            logger.warning(f"Failed to release session ownership: {e}")

        return deleted
    except Exception as e:
        logger.error(
            f"Error deleting conversation {response_id} for user {user_id}: {e}"
        )
        return False

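# Example wiring for a delete-session endpoint (an illustrative FastAPI sketch;
# the route, app object, and auth handling are assumptions, not part of this
# module):
#
#   @app.delete("/conversations/{response_id}")
#   async def delete_conversation(response_id: str, user_id: str):
#       if delete_user_conversation(user_id, response_id):
#           return {"deleted": True}
#       raise HTTPException(status_code=404, detail="Conversation not found")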