Update agent.py
This commit is contained in:
parent
2660da9a93
commit
9358408285
1 changed files with 43 additions and 33 deletions
76
src/agent.py
76
src/agent.py
|
|
@ -8,6 +8,7 @@ from services.conversation_persistence_service import conversation_persistence
|
||||||
# In-memory storage for active conversation threads (preserves function calls)
|
# In-memory storage for active conversation threads (preserves function calls)
|
||||||
active_conversations = {}
|
active_conversations = {}
|
||||||
|
|
||||||
|
|
||||||
def get_user_conversations(user_id: str):
|
def get_user_conversations(user_id: str):
|
||||||
"""Get conversation metadata for a user from persistent storage"""
|
"""Get conversation metadata for a user from persistent storage"""
|
||||||
return conversation_persistence.get_user_conversations(user_id)
|
return conversation_persistence.get_user_conversations(user_id)
|
||||||
|
|
@ -23,7 +24,9 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
|
||||||
|
|
||||||
# If we have a previous_response_id, try to get the existing conversation
|
# If we have a previous_response_id, try to get the existing conversation
|
||||||
if previous_response_id and previous_response_id in active_conversations[user_id]:
|
if previous_response_id and previous_response_id in active_conversations[user_id]:
|
||||||
logger.debug(f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}")
|
logger.debug(
|
||||||
|
f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}"
|
||||||
|
)
|
||||||
return active_conversations[user_id][previous_response_id]
|
return active_conversations[user_id][previous_response_id]
|
||||||
|
|
||||||
# Create new conversation thread
|
# Create new conversation thread
|
||||||
|
|
@ -48,7 +51,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
|
||||||
if user_id not in active_conversations:
|
if user_id not in active_conversations:
|
||||||
active_conversations[user_id] = {}
|
active_conversations[user_id] = {}
|
||||||
active_conversations[user_id][response_id] = conversation_state
|
active_conversations[user_id][response_id] = conversation_state
|
||||||
|
|
||||||
# 2. Store only essential metadata to disk (simplified JSON)
|
# 2. Store only essential metadata to disk (simplified JSON)
|
||||||
messages = conversation_state.get("messages", [])
|
messages = conversation_state.get("messages", [])
|
||||||
first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None)
|
first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None)
|
||||||
|
|
@ -56,7 +59,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
|
||||||
if first_user_msg:
|
if first_user_msg:
|
||||||
content = first_user_msg.get("content", "")
|
content = first_user_msg.get("content", "")
|
||||||
title = content[:50] + "..." if len(content) > 50 else content
|
title = content[:50] + "..." if len(content) > 50 else content
|
||||||
|
|
||||||
metadata_only = {
|
metadata_only = {
|
||||||
"response_id": response_id,
|
"response_id": response_id,
|
||||||
"title": title,
|
"title": title,
|
||||||
|
|
@ -64,11 +67,15 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
|
||||||
"created_at": conversation_state.get("created_at"),
|
"created_at": conversation_state.get("created_at"),
|
||||||
"last_activity": conversation_state.get("last_activity"),
|
"last_activity": conversation_state.get("last_activity"),
|
||||||
"previous_response_id": conversation_state.get("previous_response_id"),
|
"previous_response_id": conversation_state.get("previous_response_id"),
|
||||||
"total_messages": len([msg for msg in messages if msg.get("role") in ["user", "assistant"]]),
|
"total_messages": len(
|
||||||
|
[msg for msg in messages if msg.get("role") in ["user", "assistant"]]
|
||||||
|
),
|
||||||
# Don't store actual messages - Langflow has them
|
# Don't store actual messages - Langflow has them
|
||||||
}
|
}
|
||||||
|
|
||||||
conversation_persistence.store_conversation_thread(user_id, response_id, metadata_only)
|
conversation_persistence.store_conversation_thread(
|
||||||
|
user_id, response_id, metadata_only
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# Legacy function for backward compatibility
|
# Legacy function for backward compatibility
|
||||||
|
|
@ -76,10 +83,12 @@ def get_user_conversation(user_id: str):
|
||||||
"""Get the most recent conversation for a user (for backward compatibility)"""
|
"""Get the most recent conversation for a user (for backward compatibility)"""
|
||||||
# Check in-memory conversations first (with function calls)
|
# Check in-memory conversations first (with function calls)
|
||||||
if user_id in active_conversations and active_conversations[user_id]:
|
if user_id in active_conversations and active_conversations[user_id]:
|
||||||
latest_response_id = max(active_conversations[user_id].keys(),
|
latest_response_id = max(
|
||||||
key=lambda k: active_conversations[user_id][k]["last_activity"])
|
active_conversations[user_id].keys(),
|
||||||
|
key=lambda k: active_conversations[user_id][k]["last_activity"],
|
||||||
|
)
|
||||||
return active_conversations[user_id][latest_response_id]
|
return active_conversations[user_id][latest_response_id]
|
||||||
|
|
||||||
# Fallback to metadata-only conversations
|
# Fallback to metadata-only conversations
|
||||||
conversations = get_user_conversations(user_id)
|
conversations = get_user_conversations(user_id)
|
||||||
if not conversations:
|
if not conversations:
|
||||||
|
|
@ -342,7 +351,9 @@ async def async_chat(
|
||||||
"content": response_text,
|
"content": response_text,
|
||||||
"response_id": response_id,
|
"response_id": response_id,
|
||||||
"timestamp": datetime.now(),
|
"timestamp": datetime.now(),
|
||||||
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
|
"response_data": response_obj.model_dump()
|
||||||
|
if hasattr(response_obj, "model_dump")
|
||||||
|
else str(response_obj), # Store complete response for function calls
|
||||||
}
|
}
|
||||||
conversation_state["messages"].append(assistant_message)
|
conversation_state["messages"].append(assistant_message)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
|
@ -428,7 +439,7 @@ async def async_chat_stream(
|
||||||
conversation_state["last_activity"] = datetime.now()
|
conversation_state["last_activity"] = datetime.now()
|
||||||
store_conversation_thread(user_id, response_id, conversation_state)
|
store_conversation_thread(user_id, response_id, conversation_state)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Stored conversation thread", user_id=user_id, response_id=response_id
|
f"Stored conversation thread for user {user_id} with response_id: {response_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -443,11 +454,8 @@ async def async_langflow_chat(
|
||||||
store_conversation: bool = True,
|
store_conversation: bool = True,
|
||||||
):
|
):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
|
||||||
"async_langflow_chat called",
|
"async_langflow_chat called",
|
||||||
|
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
|
|
||||||
previous_response_id=previous_response_id,
|
previous_response_id=previous_response_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -496,7 +504,9 @@ async def async_langflow_chat(
|
||||||
"content": response_text,
|
"content": response_text,
|
||||||
"response_id": response_id,
|
"response_id": response_id,
|
||||||
"timestamp": datetime.now(),
|
"timestamp": datetime.now(),
|
||||||
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
|
"response_data": response_obj.model_dump()
|
||||||
|
if hasattr(response_obj, "model_dump")
|
||||||
|
else str(response_obj), # Store complete response for function calls
|
||||||
}
|
}
|
||||||
conversation_state["messages"].append(assistant_message)
|
conversation_state["messages"].append(assistant_message)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
|
@ -511,17 +521,18 @@ async def async_langflow_chat(
|
||||||
if response_id:
|
if response_id:
|
||||||
conversation_state["last_activity"] = datetime.now()
|
conversation_state["last_activity"] = datetime.now()
|
||||||
store_conversation_thread(user_id, response_id, conversation_state)
|
store_conversation_thread(user_id, response_id, conversation_state)
|
||||||
|
|
||||||
# Claim session ownership for this user
|
# Claim session ownership for this user
|
||||||
try:
|
try:
|
||||||
from services.session_ownership_service import session_ownership_service
|
from services.session_ownership_service import session_ownership_service
|
||||||
session_ownership_service.claim_session(user_id, response_id)
|
|
||||||
print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[WARNING] Failed to claim session ownership: {e}")
|
|
||||||
|
|
||||||
print(
|
session_ownership_service.claim_session(user_id, response_id)
|
||||||
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
|
logger.debug(f"Claimed session {response_id} for user {user_id}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to claim session ownership: {e}")
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
|
||||||
)
|
)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Stored langflow conversation thread",
|
"Stored langflow conversation thread",
|
||||||
|
|
@ -570,7 +581,7 @@ async def async_langflow_chat_stream(
|
||||||
full_response = ""
|
full_response = ""
|
||||||
response_id = None
|
response_id = None
|
||||||
collected_chunks = [] # Store all chunks for function call data
|
collected_chunks = [] # Store all chunks for function call data
|
||||||
|
|
||||||
async for chunk in async_stream(
|
async for chunk in async_stream(
|
||||||
langflow_client,
|
langflow_client,
|
||||||
prompt,
|
prompt,
|
||||||
|
|
@ -585,7 +596,7 @@ async def async_langflow_chat_stream(
|
||||||
|
|
||||||
chunk_data = json.loads(chunk.decode("utf-8"))
|
chunk_data = json.loads(chunk.decode("utf-8"))
|
||||||
collected_chunks.append(chunk_data) # Collect all chunk data
|
collected_chunks.append(chunk_data) # Collect all chunk data
|
||||||
|
|
||||||
if "delta" in chunk_data and "content" in chunk_data["delta"]:
|
if "delta" in chunk_data and "content" in chunk_data["delta"]:
|
||||||
full_response += chunk_data["delta"]["content"]
|
full_response += chunk_data["delta"]["content"]
|
||||||
# Extract response_id from chunk
|
# Extract response_id from chunk
|
||||||
|
|
@ -612,20 +623,19 @@ async def async_langflow_chat_stream(
|
||||||
if response_id:
|
if response_id:
|
||||||
conversation_state["last_activity"] = datetime.now()
|
conversation_state["last_activity"] = datetime.now()
|
||||||
store_conversation_thread(user_id, response_id, conversation_state)
|
store_conversation_thread(user_id, response_id, conversation_state)
|
||||||
|
|
||||||
# Claim session ownership for this user
|
# Claim session ownership for this user
|
||||||
try:
|
try:
|
||||||
from services.session_ownership_service import session_ownership_service
|
from services.session_ownership_service import session_ownership_service
|
||||||
|
|
||||||
session_ownership_service.claim_session(user_id, response_id)
|
session_ownership_service.claim_session(user_id, response_id)
|
||||||
print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
|
logger.debug(f"Claimed session {response_id} for user {user_id}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[WARNING] Failed to claim session ownership: {e}")
|
logger.warning(f"Failed to claim session ownership: {e}")
|
||||||
|
|
||||||
print(
|
logger.debug(
|
||||||
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
|
f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
|
||||||
)
|
)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Stored langflow conversation thread",
|
f"Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
|
||||||
user_id=user_id,
|
|
||||||
response_id=response_id,
|
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue