update FLOW_ID to LANGFLOW_CHAT_FLOW_ID

parent e1d58c7421
commit d77ebb5f37

1 changed file with 193 additions and 77 deletions
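The diff below only touches the chat service. For orientation, the renamed setting presumably lives in config/settings.py (the module imported below) and is read from the environment; a minimal sketch, assuming standard os.getenv usage — the env-var names and defaults are assumptions, not part of this commit:

import os

# Assumed shape of config/settings.py after the rename (illustrative only).
LANGFLOW_URL = os.getenv("LANGFLOW_URL", "")
LANGFLOW_CHAT_FLOW_ID = os.getenv("LANGFLOW_CHAT_FLOW_ID", "")  # formerly FLOW_ID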
@@ -1,43 +1,75 @@
-from config.settings import clients, LANGFLOW_URL, FLOW_ID
-from agent import async_chat, async_langflow, async_chat_stream, async_langflow_stream
-from auth_context import set_auth_context
 import json
+
+from agent import async_chat, async_chat_stream, async_langflow
+from auth_context import set_auth_context
+from config.settings import LANGFLOW_CHAT_FLOW_ID, LANGFLOW_URL, clients
 
+
 class ChatService:
-    async def chat(self, prompt: str, user_id: str = None, jwt_token: str = None, previous_response_id: str = None, stream: bool = False):
+    async def chat(
+        self,
+        prompt: str,
+        user_id: str = None,
+        jwt_token: str = None,
+        previous_response_id: str = None,
+        stream: bool = False,
+    ):
         """Handle chat requests using the patched OpenAI client"""
         if not prompt:
             raise ValueError("Prompt is required")
 
         # Set authentication context for this request so tools can access it
         if user_id and jwt_token:
             set_auth_context(user_id, jwt_token)
 
         if stream:
-            return async_chat_stream(clients.patched_async_client, prompt, user_id, previous_response_id=previous_response_id)
+            return async_chat_stream(
+                clients.patched_async_client,
+                prompt,
+                user_id,
+                previous_response_id=previous_response_id,
+            )
         else:
-            response_text, response_id = await async_chat(clients.patched_async_client, prompt, user_id, previous_response_id=previous_response_id)
+            response_text, response_id = await async_chat(
+                clients.patched_async_client,
+                prompt,
+                user_id,
+                previous_response_id=previous_response_id,
+            )
             response_data = {"response": response_text}
             if response_id:
                 response_data["response_id"] = response_id
             return response_data
 
-    async def langflow_chat(self, prompt: str, user_id: str = None, jwt_token: str = None, previous_response_id: str = None, stream: bool = False):
+    async def langflow_chat(
+        self,
+        prompt: str,
+        user_id: str = None,
+        jwt_token: str = None,
+        previous_response_id: str = None,
+        stream: bool = False,
+    ):
         """Handle Langflow chat requests"""
         if not prompt:
             raise ValueError("Prompt is required")
 
-        if not LANGFLOW_URL or not FLOW_ID:
-            raise ValueError("LANGFLOW_URL and FLOW_ID environment variables are required")
+        if not LANGFLOW_URL or not LANGFLOW_CHAT_FLOW_ID:
+            raise ValueError(
+                "LANGFLOW_URL and LANGFLOW_CHAT_FLOW_ID environment variables are required"
+            )
 
         # Prepare extra headers for JWT authentication
         extra_headers = {}
         if jwt_token:
-            extra_headers['X-LANGFLOW-GLOBAL-VAR-JWT'] = jwt_token
+            extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token
 
         # Get context variables for filters, limit, and threshold
-        from auth_context import get_search_filters, get_search_limit, get_score_threshold
+        from auth_context import (
+            get_score_threshold,
+            get_search_filters,
+            get_search_limit,
+        )
 
         filters = get_search_filters()
         limit = get_search_limit()
         score_threshold = get_score_threshold()
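Note on the chat() contract above, which the reformatting leaves unchanged: with stream=True the method returns the async generator from async_chat_stream without awaiting it; otherwise it awaits async_chat and wraps the text and response id in a dict. A minimal caller sketch, assuming ChatService is importable from this module and that the stream yields text chunks (neither is shown in the diff):

# Illustrative only; how the service is constructed and the chunk type are assumptions.
async def demo(service):
    # Non-streaming: a dict with "response" and, when present, "response_id".
    result = await service.chat("hello", user_id="u1", jwt_token="token", stream=False)
    print(result["response"], result.get("response_id"))

    # Streaming: awaiting chat() returns the async generator itself.
    stream = await service.chat("hello", user_id="u1", jwt_token="token", stream=True)
    async for chunk in stream:
        print(chunk, end="")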
@@ -49,86 +81,130 @@ class ChatService:
         # Map frontend filter names to backend field names
         field_mapping = {
             "data_sources": "filename",
-            "document_types": "mimetype",
-            "owners": "owner"
+            "document_types": "mimetype",
+            "owners": "owner",
         }
 
         for filter_key, values in filters.items():
             if values is not None and isinstance(values, list) and len(values) > 0:
                 # Map frontend key to backend field name
                 field_name = field_mapping.get(filter_key, filter_key)
 
                 if len(values) == 1:
                     # Single value filter
                     filter_clauses.append({"term": {field_name: values[0]}})
                 else:
                     # Multiple values filter
                     filter_clauses.append({"terms": {field_name: values}})
 
         if filter_clauses:
             filter_expression["filter"] = filter_clauses
 
         # Add limit and score threshold to the filter expression (only if different from defaults)
         if limit and limit != 10:  # 10 is the default limit
             filter_expression["limit"] = limit
 
         if score_threshold and score_threshold != 0:  # 0 is the default threshold
             filter_expression["score_threshold"] = score_threshold
 
         # Pass the complete filter expression as a single header to Langflow (only if we have something to send)
         if filter_expression:
-            print(f"Sending OpenRAG query filter to Langflow: {json.dumps(filter_expression, indent=2)}")
-            extra_headers['X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER'] = json.dumps(filter_expression)
+            print(
+                f"Sending OpenRAG query filter to Langflow: {json.dumps(filter_expression, indent=2)}"
+            )
+            extra_headers["X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER"] = json.dumps(
+                filter_expression
+            )
 
         # Ensure the Langflow client exists; try lazy init if needed
         langflow_client = await clients.ensure_langflow_client()
         if not langflow_client:
-            raise ValueError("Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY.")
+            raise ValueError(
+                "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
+            )
 
         if stream:
             from agent import async_langflow_chat_stream
-            return async_langflow_chat_stream(langflow_client, FLOW_ID, prompt, user_id, extra_headers=extra_headers, previous_response_id=previous_response_id)
+
+            return async_langflow_chat_stream(
+                langflow_client,
+                LANGFLOW_CHAT_FLOW_ID,
+                prompt,
+                user_id,
+                extra_headers=extra_headers,
+                previous_response_id=previous_response_id,
+            )
         else:
             from agent import async_langflow_chat
-            response_text, response_id = await async_langflow_chat(langflow_client, FLOW_ID, prompt, user_id, extra_headers=extra_headers, previous_response_id=previous_response_id)
+
+            response_text, response_id = await async_langflow_chat(
+                langflow_client,
+                LANGFLOW_CHAT_FLOW_ID,
+                prompt,
+                user_id,
+                extra_headers=extra_headers,
+                previous_response_id=previous_response_id,
+            )
             response_data = {"response": response_text}
             if response_id:
                 response_data["response_id"] = response_id
             return response_data
 
-    async def upload_context_chat(self, document_content: str, filename: str,
-                                 user_id: str = None, jwt_token: str = None, previous_response_id: str = None, endpoint: str = "langflow"):
+    async def upload_context_chat(
+        self,
+        document_content: str,
+        filename: str,
+        user_id: str = None,
+        jwt_token: str = None,
+        previous_response_id: str = None,
+        endpoint: str = "langflow",
+    ):
         """Send document content as user message to get proper response_id"""
         document_prompt = f"I'm uploading a document called '{filename}'. Here is its content:\n\n{document_content}\n\nPlease confirm you've received this document and are ready to answer questions about it."
 
         if endpoint == "langflow":
             # Prepare extra headers for JWT authentication
             extra_headers = {}
             if jwt_token:
-                extra_headers['X-LANGFLOW-GLOBAL-VAR-JWT'] = jwt_token
+                extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token
             # Ensure the Langflow client exists; try lazy init if needed
             langflow_client = await clients.ensure_langflow_client()
             if not langflow_client:
-                raise ValueError("Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY.")
-            response_text, response_id = await async_langflow(langflow_client, FLOW_ID, document_prompt, extra_headers=extra_headers, previous_response_id=previous_response_id)
+                raise ValueError(
+                    "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
+                )
+            response_text, response_id = await async_langflow(
+                langflow_client,
+                LANGFLOW_CHAT_FLOW_ID,
+                document_prompt,
+                extra_headers=extra_headers,
+                previous_response_id=previous_response_id,
+            )
         else:  # chat
             # Set auth context for chat tools and provide user_id
             if user_id and jwt_token:
                 set_auth_context(user_id, jwt_token)
-            response_text, response_id = await async_chat(clients.patched_async_client, document_prompt, user_id, previous_response_id=previous_response_id)
+            response_text, response_id = await async_chat(
+                clients.patched_async_client,
+                document_prompt,
+                user_id,
+                previous_response_id=previous_response_id,
+            )
 
         return response_text, response_id
 
     async def get_chat_history(self, user_id: str):
         """Get chat conversation history for a user"""
         from agent import get_user_conversations
 
         if not user_id:
             return {"error": "User ID is required", "conversations": []}
 
         conversations_dict = get_user_conversations(user_id)
-        print(f"[DEBUG] get_chat_history for user {user_id}: found {len(conversations_dict)} conversations")
+        print(
+            f"[DEBUG] get_chat_history for user {user_id}: found {len(conversations_dict)} conversations"
+        )
 
         # Convert conversations dict to list format with metadata
         conversations = []
         for response_id, conversation_state in conversations_dict.items():
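The filter handling above collapses everything into a single JSON header. A standalone sketch of the payload that ends up in X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER, mirroring the clause-building in langflow_chat() — the filter values, limit, and threshold are made-up examples; in the service they come from get_search_filters, get_search_limit, and get_score_threshold:

import json

field_mapping = {"data_sources": "filename", "document_types": "mimetype", "owners": "owner"}
filters = {"data_sources": ["report.pdf"], "owners": ["alice", "bob"]}  # example input

filter_clauses = []
for key, values in filters.items():
    field = field_mapping.get(key, key)
    if len(values) == 1:
        filter_clauses.append({"term": {field: values[0]}})  # single value
    else:
        filter_clauses.append({"terms": {field: values}})  # multiple values

# limit and score_threshold are only included when they differ from the defaults (10 and 0).
filter_expression = {"filter": filter_clauses, "limit": 25, "score_threshold": 0.4}
print(json.dumps(filter_expression, indent=2))  # this exact string is sent as the header value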
@@ -139,47 +215,67 @@ class ChatService:
                 message_data = {
                     "role": msg["role"],
                     "content": msg["content"],
-                    "timestamp": msg.get("timestamp").isoformat() if msg.get("timestamp") else None
+                    "timestamp": msg.get("timestamp").isoformat()
+                    if msg.get("timestamp")
+                    else None,
                 }
                 if msg.get("response_id"):
                     message_data["response_id"] = msg["response_id"]
                 messages.append(message_data)
 
             if messages:  # Only include conversations with actual messages
                 # Generate title from first user message
-                first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
-                title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "New chat"
-
-                conversations.append({
-                    "response_id": response_id,
-                    "title": title,
-                    "endpoint": "chat",
-                    "messages": messages,
-                    "created_at": conversation_state.get("created_at").isoformat() if conversation_state.get("created_at") else None,
-                    "last_activity": conversation_state.get("last_activity").isoformat() if conversation_state.get("last_activity") else None,
-                    "previous_response_id": conversation_state.get("previous_response_id"),
-                    "total_messages": len(messages)
-                })
+                first_user_msg = next(
+                    (msg for msg in messages if msg["role"] == "user"), None
+                )
+                title = (
+                    first_user_msg["content"][:50] + "..."
+                    if first_user_msg and len(first_user_msg["content"]) > 50
+                    else first_user_msg["content"]
+                    if first_user_msg
+                    else "New chat"
+                )
+
+                conversations.append(
+                    {
+                        "response_id": response_id,
+                        "title": title,
+                        "endpoint": "chat",
+                        "messages": messages,
+                        "created_at": conversation_state.get("created_at").isoformat()
+                        if conversation_state.get("created_at")
+                        else None,
+                        "last_activity": conversation_state.get(
+                            "last_activity"
+                        ).isoformat()
+                        if conversation_state.get("last_activity")
+                        else None,
+                        "previous_response_id": conversation_state.get(
+                            "previous_response_id"
+                        ),
+                        "total_messages": len(messages),
+                    }
+                )
 
         # Sort by last activity (most recent first)
         conversations.sort(key=lambda c: c["last_activity"], reverse=True)
 
         return {
             "user_id": user_id,
             "endpoint": "chat",
             "conversations": conversations,
-            "total_conversations": len(conversations)
+            "total_conversations": len(conversations),
         }
 
     async def get_langflow_history(self, user_id: str):
         """Get langflow conversation history for a user"""
         from agent import get_user_conversations
 
         if not user_id:
             return {"error": "User ID is required", "conversations": []}
 
         conversations_dict = get_user_conversations(user_id)
 
         # Convert conversations dict to list format with metadata
         conversations = []
         for response_id, conversation_state in conversations_dict.items():
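Both history methods build the same envelope; only the endpoint value differs between get_chat_history ("chat") and get_langflow_history ("langflow"). A sketch of the returned shape, with placeholder ids, timestamps, and text:

# Shape only; every value below is a placeholder.
{
    "user_id": "u1",
    "endpoint": "chat",  # "langflow" for get_langflow_history
    "conversations": [
        {
            "response_id": "resp_123",
            "title": "first user message truncated to 50 characters...",
            "endpoint": "chat",
            "messages": [
                {"role": "user", "content": "...", "timestamp": "2024-01-01T00:00:00"}
            ],
            "created_at": "2024-01-01T00:00:00",
            "last_activity": "2024-01-01T00:05:00",
            "previous_response_id": None,
            "total_messages": 1,
        }
    ],
    "total_conversations": 1,
}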
@@ -190,34 +286,54 @@ class ChatService:
                 message_data = {
                     "role": msg["role"],
                     "content": msg["content"],
-                    "timestamp": msg.get("timestamp").isoformat() if msg.get("timestamp") else None
+                    "timestamp": msg.get("timestamp").isoformat()
+                    if msg.get("timestamp")
+                    else None,
                 }
                 if msg.get("response_id"):
                     message_data["response_id"] = msg["response_id"]
                 messages.append(message_data)
 
             if messages:  # Only include conversations with actual messages
                 # Generate title from first user message
-                first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
-                title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "New chat"
-
-                conversations.append({
-                    "response_id": response_id,
-                    "title": title,
-                    "endpoint": "langflow",
-                    "messages": messages,
-                    "created_at": conversation_state.get("created_at").isoformat() if conversation_state.get("created_at") else None,
-                    "last_activity": conversation_state.get("last_activity").isoformat() if conversation_state.get("last_activity") else None,
-                    "previous_response_id": conversation_state.get("previous_response_id"),
-                    "total_messages": len(messages)
-                })
+                first_user_msg = next(
+                    (msg for msg in messages if msg["role"] == "user"), None
+                )
+                title = (
+                    first_user_msg["content"][:50] + "..."
+                    if first_user_msg and len(first_user_msg["content"]) > 50
+                    else first_user_msg["content"]
+                    if first_user_msg
+                    else "New chat"
+                )
+
+                conversations.append(
+                    {
+                        "response_id": response_id,
+                        "title": title,
+                        "endpoint": "langflow",
+                        "messages": messages,
+                        "created_at": conversation_state.get("created_at").isoformat()
+                        if conversation_state.get("created_at")
+                        else None,
+                        "last_activity": conversation_state.get(
+                            "last_activity"
+                        ).isoformat()
+                        if conversation_state.get("last_activity")
+                        else None,
+                        "previous_response_id": conversation_state.get(
+                            "previous_response_id"
+                        ),
+                        "total_messages": len(messages),
+                    }
+                )
 
         # Sort by last activity (most recent first)
         conversations.sort(key=lambda c: c["last_activity"], reverse=True)
 
         return {
             "user_id": user_id,
             "endpoint": "langflow",
             "conversations": conversations,
-            "total_conversations": len(conversations)
+            "total_conversations": len(conversations),
         }