Add nudges service into backend

Lucas Oliveira 2025-09-04 18:15:46 -03:00
parent c87877bb80
commit aad89b9166
6 changed files with 639 additions and 240 deletions


@@ -95,7 +95,9 @@ async def async_response_stream(
chunk_count = 0
async for chunk in response:
chunk_count += 1
logger.debug("Stream chunk received", chunk_count=chunk_count, chunk=str(chunk))
logger.debug(
"Stream chunk received", chunk_count=chunk_count, chunk=str(chunk)
)
# Yield the raw event as JSON for the UI to process
import json
@@ -171,6 +173,10 @@ async def async_response(
if extra_headers:
request_params["extra_headers"] = extra_headers
if "x-api-key" not in client.default_headers:
if hasattr(client, "api_key") and extra_headers is not None:
extra_headers["x-api-key"] = client.api_key
response = await client.responses.create(**request_params)
response_text = response.output_text
@@ -241,7 +247,10 @@ async def async_langflow_stream(
previous_response_id=previous_response_id,
log_prefix="langflow",
):
logger.debug("Yielding chunk from langflow stream", chunk_preview=chunk[:100].decode('utf-8', errors='replace'))
logger.debug(
"Yielding chunk from langflow stream",
chunk_preview=chunk[:100].decode("utf-8", errors="replace"),
)
yield chunk
logger.debug("Langflow stream completed")
except Exception as e:
@@ -260,18 +269,24 @@ async def async_chat(
model: str = "gpt-4.1-mini",
previous_response_id: str = None,
):
logger.debug("async_chat called", user_id=user_id, previous_response_id=previous_response_id)
logger.debug(
"async_chat called", user_id=user_id, previous_response_id=previous_response_id
)
# Get the specific conversation thread (or create new one)
conversation_state = get_conversation_thread(user_id, previous_response_id)
logger.debug("Got conversation state", message_count=len(conversation_state['messages']))
logger.debug(
"Got conversation state", message_count=len(conversation_state["messages"])
)
# Add user message to conversation with timestamp
from datetime import datetime
user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
conversation_state["messages"].append(user_message)
logger.debug("Added user message", message_count=len(conversation_state['messages']))
logger.debug(
"Added user message", message_count=len(conversation_state["messages"])
)
response_text, response_id = await async_response(
async_client,
@@ -280,7 +295,9 @@ async def async_chat(
previous_response_id=previous_response_id,
log_prefix="agent",
)
logger.debug("Got response", response_preview=response_text[:50], response_id=response_id)
logger.debug(
"Got response", response_preview=response_text[:50], response_id=response_id
)
# Add assistant response to conversation with response_id and timestamp
assistant_message = {
@@ -290,17 +307,26 @@ async def async_chat(
"timestamp": datetime.now(),
}
conversation_state["messages"].append(assistant_message)
logger.debug("Added assistant message", message_count=len(conversation_state['messages']))
logger.debug(
"Added assistant message", message_count=len(conversation_state["messages"])
)
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
logger.debug("Stored conversation thread", user_id=user_id, response_id=response_id)
logger.debug(
"Stored conversation thread", user_id=user_id, response_id=response_id
)
# Debug: Check what's in user_conversations now
conversations = get_user_conversations(user_id)
logger.debug("User conversations updated", user_id=user_id, conversation_count=len(conversations), conversation_ids=list(conversations.keys()))
logger.debug(
"User conversations updated",
user_id=user_id,
conversation_count=len(conversations),
conversation_ids=list(conversations.keys()),
)
else:
logger.warning("No response_id received, conversation not stored")
@@ -363,7 +389,9 @@ async def async_chat_stream(
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
logger.debug("Stored conversation thread", user_id=user_id, response_id=response_id)
logger.debug(
"Stored conversation thread", user_id=user_id, response_id=response_id
)
# Async langflow function with conversation storage (non-streaming)
@@ -374,19 +402,32 @@ async def async_langflow_chat(
user_id: str,
extra_headers: dict = None,
previous_response_id: str = None,
store_conversation: bool = True,
):
logger.debug("async_langflow_chat called", user_id=user_id, previous_response_id=previous_response_id)
logger.debug(
"async_langflow_chat called",
user_id=user_id,
previous_response_id=previous_response_id,
)
# Get the specific conversation thread (or create new one)
conversation_state = get_conversation_thread(user_id, previous_response_id)
logger.debug("Got langflow conversation state", message_count=len(conversation_state['messages']))
if store_conversation:
# Get the specific conversation thread (or create new one)
conversation_state = get_conversation_thread(user_id, previous_response_id)
logger.debug(
"Got langflow conversation state",
message_count=len(conversation_state["messages"]),
)
# Add user message to conversation with timestamp
from datetime import datetime
user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
conversation_state["messages"].append(user_message)
logger.debug("Added user message to langflow", message_count=len(conversation_state['messages']))
if store_conversation:
user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
conversation_state["messages"].append(user_message)
logger.debug(
"Added user message to langflow",
message_count=len(conversation_state["messages"]),
)
response_text, response_id = await async_response(
langflow_client,
@@ -396,45 +437,69 @@ async def async_langflow_chat(
previous_response_id=previous_response_id,
log_prefix="langflow",
)
logger.debug("Got langflow response", response_preview=response_text[:50], response_id=response_id)
logger.debug(
"Got langflow response",
response_preview=response_text[:50],
response_id=response_id,
)
# Add assistant response to conversation with response_id and timestamp
assistant_message = {
"role": "assistant",
"content": response_text,
"response_id": response_id,
"timestamp": datetime.now(),
}
conversation_state["messages"].append(assistant_message)
logger.debug("Added assistant message to langflow", message_count=len(conversation_state['messages']))
if store_conversation:
# Add assistant response to conversation with response_id and timestamp
assistant_message = {
"role": "assistant",
"content": response_text,
"response_id": response_id,
"timestamp": datetime.now(),
}
conversation_state["messages"].append(assistant_message)
logger.debug(
"Added assistant message to langflow",
message_count=len(conversation_state["messages"]),
)
if not store_conversation:
return response_text, response_id
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership if this is a Google user
try:
from services.session_ownership_service import session_ownership_service
from services.user_binding_service import user_binding_service
# Check if this is a Google user (Google IDs are numeric, Langflow IDs are UUIDs)
if user_id.isdigit() and user_binding_service.has_binding(user_id):
langflow_user_id = user_binding_service.get_langflow_user_id(user_id)
if langflow_user_id:
session_ownership_service.claim_session(user_id, response_id, langflow_user_id)
print(f"[DEBUG] Claimed session {response_id} for Google user {user_id}")
session_ownership_service.claim_session(
user_id, response_id, langflow_user_id
)
print(
f"[DEBUG] Claimed session {response_id} for Google user {user_id}"
)
except Exception as e:
print(f"[WARNING] Failed to claim session ownership: {e}")
print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
)
logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id)
logger.debug(
"Stored langflow conversation thread",
user_id=user_id,
response_id=response_id,
)
# Debug: Check what's in user_conversations now
conversations = get_user_conversations(user_id)
logger.debug("User conversations updated", user_id=user_id, conversation_count=len(conversations), conversation_ids=list(conversations.keys()))
logger.debug(
"User conversations updated",
user_id=user_id,
conversation_count=len(conversations),
conversation_ids=list(conversations.keys()),
)
else:
logger.warning("No response_id received from langflow, conversation not stored")
@@ -450,7 +515,11 @@ async def async_langflow_chat_stream(
extra_headers: dict = None,
previous_response_id: str = None,
):
logger.debug("async_langflow_chat_stream called", user_id=user_id, previous_response_id=previous_response_id)
logger.debug(
"async_langflow_chat_stream called",
user_id=user_id,
previous_response_id=previous_response_id,
)
# Get the specific conversation thread (or create new one)
conversation_state = get_conversation_thread(user_id, previous_response_id)
@@ -501,22 +570,32 @@ async def async_langflow_chat_stream(
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership if this is a Google user
try:
from services.session_ownership_service import session_ownership_service
from services.user_binding_service import user_binding_service
# Check if this is a Google user (Google IDs are numeric, Langflow IDs are UUIDs)
if user_id.isdigit() and user_binding_service.has_binding(user_id):
langflow_user_id = user_binding_service.get_langflow_user_id(user_id)
langflow_user_id = user_binding_service.get_langflow_user_id(
user_id
)
if langflow_user_id:
session_ownership_service.claim_session(user_id, response_id, langflow_user_id)
print(f"[DEBUG] Claimed session {response_id} for Google user {user_id} (streaming)")
session_ownership_service.claim_session(
user_id, response_id, langflow_user_id
)
print(
f"[DEBUG] Claimed session {response_id} for Google user {user_id} (streaming)"
)
except Exception as e:
print(f"[WARNING] Failed to claim session ownership (streaming): {e}")
print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
)
logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id)
logger.debug(
"Stored langflow conversation thread",
user_id=user_id,
response_id=response_id,
)

src/api/nudges.py (new file, 43 lines)

@@ -0,0 +1,43 @@
from starlette.requests import Request
from starlette.responses import JSONResponse
from utils.logging_config import get_logger
logger = get_logger(__name__)
async def nudges_from_kb_endpoint(request: Request, chat_service, session_manager):
"""Get nudges for a user"""
user = request.state.user
user_id = user.user_id
jwt_token = request.state.jwt_token
try:
result = await chat_service.langflow_nudges_chat(
user_id,
jwt_token,
)
return JSONResponse(result)
except Exception as e:
return JSONResponse(
{"error": f"Failed to get nudges: {str(e)}"}, status_code=500
)
async def nudges_from_chat_id_endpoint(request: Request, chat_service, session_manager):
"""Get nudges for a user"""
user = request.state.user
user_id = user.user_id
chat_id = request.path_params["chat_id"]
jwt_token = request.state.jwt_token
try:
result = await chat_service.langflow_nudges_chat(
user_id,
jwt_token,
previous_response_id=chat_id,
)
return JSONResponse(result)
except Exception as e:
return JSONResponse(
{"error": f"Failed to get nudges: {str(e)}"}, status_code=500
)
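
For orientation, here is a minimal client sketch for the two new routes. The base URL is an assumption, and require_auth decides which credentials are actually accepted, so this only illustrates the shape of the API:

import httpx

async def fetch_nudges(chat_id: str | None = None) -> dict:
    # GET /nudges derives nudges from the knowledge base; GET /nudges/{chat_id}
    # seeds the nudges flow with that conversation's history instead.
    path = f"/nudges/{chat_id}" if chat_id else "/nudges"
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        response = await client.get(path)  # attach session auth as required
        response.raise_for_status()
        return response.json()  # {"response": ..., "response_id": ...}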


@@ -24,6 +24,7 @@ LANGFLOW_URL = os.getenv("LANGFLOW_URL", "http://localhost:7860")
# Optional: public URL for browser links (e.g., http://localhost:7860)
LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL")
FLOW_ID = os.getenv("FLOW_ID")
NUDGES_FLOW_ID = os.getenv("NUDGES_FLOW_ID")
# Langflow superuser credentials for API key generation
LANGFLOW_SUPERUSER = os.getenv("LANGFLOW_SUPERUSER")
LANGFLOW_SUPERUSER_PASSWORD = os.getenv("LANGFLOW_SUPERUSER_PASSWORD")
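
For reference, the new variable sits alongside FLOW_ID in .env; a sketch with a placeholder UUID (point NUDGES_FLOW_ID at whichever Langflow flow generates the nudges):

FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0
NUDGES_FLOW_ID=00000000-0000-0000-0000-000000000000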
@@ -37,7 +38,12 @@ GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")
def is_no_auth_mode():
"""Check if we're running in no-auth mode (OAuth credentials missing)"""
result = not (GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET)
logger.debug("Checking auth mode", no_auth_mode=result, has_client_id=GOOGLE_OAUTH_CLIENT_ID is not None, has_client_secret=GOOGLE_OAUTH_CLIENT_SECRET is not None)
logger.debug(
"Checking auth mode",
no_auth_mode=result,
has_client_id=GOOGLE_OAUTH_CLIENT_ID is not None,
has_client_secret=GOOGLE_OAUTH_CLIENT_SECRET is not None,
)
return result
@@ -100,7 +106,9 @@ async def generate_langflow_api_key():
return LANGFLOW_KEY
if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD:
logger.warning("LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation")
logger.warning(
"LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation"
)
return None
try:
@@ -142,11 +150,19 @@ async def generate_langflow_api_key():
raise KeyError("api_key")
LANGFLOW_KEY = api_key
logger.info("Successfully generated Langflow API key", api_key_preview=api_key[:8])
logger.info(
"Successfully generated Langflow API key",
api_key_preview=api_key[:8],
)
return api_key
except (requests.exceptions.RequestException, KeyError) as e:
last_error = e
logger.warning("Attempt to generate Langflow API key failed", attempt=attempt, max_attempts=max_attempts, error=str(e))
logger.warning(
"Attempt to generate Langflow API key failed",
attempt=attempt,
max_attempts=max_attempts,
error=str(e),
)
if attempt < max_attempts:
time.sleep(delay_seconds)
else:
@@ -196,7 +212,9 @@ class AppClients:
logger.warning("Failed to initialize Langflow client", error=str(e))
self.langflow_client = None
if self.langflow_client is None:
logger.warning("No Langflow client initialized yet, will attempt later on first use")
logger.warning(
"No Langflow client initialized yet, will attempt later on first use"
)
# Initialize patched OpenAI client
self.patched_async_client = patch_openai_with_mcp(AsyncOpenAI())
@@ -219,7 +237,9 @@ class AppClients:
)
logger.info("Langflow client initialized on-demand")
except Exception as e:
logger.error("Failed to initialize Langflow client on-demand", error=str(e))
logger.error(
"Failed to initialize Langflow client on-demand", error=str(e)
)
self.langflow_client = None
return self.langflow_client


@@ -3,11 +3,13 @@ import sys
# Check for TUI flag FIRST, before any heavy imports
if __name__ == "__main__" and len(sys.argv) > 1 and sys.argv[1] == "--tui":
from tui.main import run_tui
run_tui()
sys.exit(0)
# Configure structured logging early
from utils.logging_config import configure_from_env, get_logger
configure_from_env()
logger = get_logger(__name__)
@@ -48,6 +50,7 @@ from auth_middleware import require_auth, optional_auth
# API endpoints
from api import (
nudges,
upload,
search,
chat,
@@ -59,7 +62,11 @@ from api import (
settings,
)
logger.info("CUDA device information", cuda_available=torch.cuda.is_available(), cuda_version=torch.version.cuda)
logger.info(
"CUDA device information",
cuda_available=torch.cuda.is_available(),
cuda_version=torch.version.cuda,
)
async def wait_for_opensearch():
@@ -73,7 +80,12 @@ async def wait_for_opensearch():
logger.info("OpenSearch is ready")
return
except Exception as e:
logger.warning("OpenSearch not ready yet", attempt=attempt + 1, max_retries=max_retries, error=str(e))
logger.warning(
"OpenSearch not ready yet",
attempt=attempt + 1,
max_retries=max_retries,
error=str(e),
)
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
else:
@@ -95,7 +107,9 @@ async def configure_alerting_security():
# Use admin client (clients.opensearch uses admin credentials)
response = await clients.opensearch.cluster.put_settings(body=alerting_settings)
logger.info("Alerting security settings configured successfully", response=response)
logger.info(
"Alerting security settings configured successfully", response=response
)
except Exception as e:
logger.warning("Failed to configure alerting security settings", error=str(e))
# Don't fail startup if alerting config fails
@@ -135,9 +149,14 @@ async def init_index():
await clients.opensearch.indices.create(
index=knowledge_filter_index_name, body=knowledge_filter_index_body
)
logger.info("Created knowledge filters index", index_name=knowledge_filter_index_name)
logger.info(
"Created knowledge filters index", index_name=knowledge_filter_index_name
)
else:
logger.info("Knowledge filters index already exists, skipping creation", index_name=knowledge_filter_index_name)
logger.info(
"Knowledge filters index already exists, skipping creation",
index_name=knowledge_filter_index_name,
)
# Configure alerting plugin security settings
await configure_alerting_security()
@@ -192,7 +211,9 @@ async def init_index_when_ready():
logger.info("OpenSearch index initialization completed successfully")
except Exception as e:
logger.error("OpenSearch index initialization failed", error=str(e))
logger.warning("OIDC endpoints will still work, but document operations may fail until OpenSearch is ready")
logger.warning(
"OIDC endpoints will still work, but document operations may fail until OpenSearch is ready"
)
async def initialize_services():
@@ -239,9 +260,14 @@ async def initialize_services():
try:
await connector_service.initialize()
loaded_count = len(connector_service.connection_manager.connections)
logger.info("Loaded persisted connector connections on startup", loaded_count=loaded_count)
logger.info(
"Loaded persisted connector connections on startup",
loaded_count=loaded_count,
)
except Exception as e:
logger.warning("Failed to load persisted connections on startup", error=str(e))
logger.warning(
"Failed to load persisted connections on startup", error=str(e)
)
else:
logger.info("Skipping connector loading in no-auth mode")
@@ -626,6 +652,28 @@ async def create_app():
),
methods=["GET"],
),
Route(
"/nudges",
require_auth(services["session_manager"])(
partial(
nudges.nudges_from_kb_endpoint,
chat_service=services["chat_service"],
session_manager=services["session_manager"],
)
),
methods=["GET"],
),
Route(
"/nudges/{chat_id}",
require_auth(services["session_manager"])(
partial(
nudges.nudges_from_chat_id_endpoint,
chat_service=services["chat_service"],
session_manager=services["session_manager"],
)
),
methods=["GET"],
),
]
app = Starlette(debug=True, routes=routes)
@@ -678,18 +726,30 @@ async def cleanup_subscriptions_proper(services):
for connection in active_connections:
try:
logger.info("Cancelling subscription for connection", connection_id=connection.connection_id)
logger.info(
"Cancelling subscription for connection",
connection_id=connection.connection_id,
)
connector = await connector_service.get_connector(
connection.connection_id
)
if connector:
subscription_id = connection.config.get("webhook_channel_id")
await connector.cleanup_subscription(subscription_id)
logger.info("Cancelled subscription", subscription_id=subscription_id)
logger.info(
"Cancelled subscription", subscription_id=subscription_id
)
except Exception as e:
logger.error("Failed to cancel subscription", connection_id=connection.connection_id, error=str(e))
logger.error(
"Failed to cancel subscription",
connection_id=connection.connection_id,
error=str(e),
)
logger.info("Finished cancelling subscriptions", subscription_count=len(active_connections))
logger.info(
"Finished cancelling subscriptions",
subscription_count=len(active_connections),
)
except Exception as e:
logger.error("Failed to cleanup subscriptions", error=str(e))


@@ -1,5 +1,9 @@
from config.settings import clients, LANGFLOW_URL, FLOW_ID
from agent import async_chat, async_langflow, async_chat_stream, async_langflow_stream
from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL, FLOW_ID
from agent import (
async_chat,
async_langflow,
async_chat_stream,
)
from auth_context import set_auth_context
import json
from utils.logging_config import get_logger
@@ -111,7 +115,10 @@ class ChatService:
# Pass the complete filter expression as a single header to Langflow (only if we have something to send)
if filter_expression:
logger.info("Sending OpenRAG query filter to Langflow", filter_expression=filter_expression)
logger.info(
"Sending OpenRAG query filter to Langflow",
filter_expression=filter_expression,
)
extra_headers["X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER"] = json.dumps(
filter_expression
)
@@ -150,6 +157,62 @@ class ChatService:
response_data["response_id"] = response_id
return response_data
async def langflow_nudges_chat(
self,
user_id: str = None,
jwt_token: str = None,
previous_response_id: str = None,
):
"""Handle Langflow chat requests"""
if not LANGFLOW_URL or not NUDGES_FLOW_ID:
raise ValueError(
"LANGFLOW_URL and NUDGES_FLOW_ID environment variables are required"
)
# Prepare extra headers for JWT authentication
extra_headers = {}
if jwt_token:
extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token
# Ensure the Langflow client exists; try lazy init if needed
langflow_client = await clients.ensure_langflow_client()
if not langflow_client:
raise ValueError(
"Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
)
prompt = ""
if previous_response_id:
from agent import get_conversation_thread
conversation_history = get_conversation_thread(
user_id, previous_response_id
)
if conversation_history:
conversation_history = "\n".join(
[
f"{msg['role']}: {msg['content']}"
for msg in conversation_history["messages"]
if msg["role"] in ["user", "assistant"]
]
)
prompt = f"{conversation_history}"
from agent import async_langflow_chat
response_text, response_id = await async_langflow_chat(
langflow_client,
NUDGES_FLOW_ID,
prompt,
user_id,
extra_headers=extra_headers,
store_conversation=False,
)
response_data = {"response": response_text}
if response_id:
response_data["response_id"] = response_id
return response_data
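
A hypothetical call site for this method, assuming an authenticated request and a constructed ChatService; the IDs below are placeholders:

result = await chat_service.langflow_nudges_chat(
    user_id="user-123",                 # placeholder
    jwt_token=request.state.jwt_token,  # forwarded as X-LANGFLOW-GLOBAL-VAR-JWT
    previous_response_id="resp-abc",    # optional: flattens that thread into the prompt
)
# result is {"response": ...} plus "response_id" when Langflow returns one;
# store_conversation=False keeps the nudges exchange out of chat history.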
async def upload_context_chat(
self,
document_content: str,
@@ -201,7 +264,11 @@ class ChatService:
return {"error": "User ID is required", "conversations": []}
conversations_dict = get_user_conversations(user_id)
logger.debug("Getting chat history for user", user_id=user_id, conversation_count=len(conversations_dict))
logger.debug(
"Getting chat history for user",
user_id=user_id,
conversation_count=len(conversations_dict),
)
# Convert conversations dict to list format with metadata
conversations = []
@@ -270,16 +337,16 @@ class ChatService:
from agent import get_user_conversations
from services.langflow_history_service import langflow_history_service
from services.user_binding_service import user_binding_service
if not user_id:
return {"error": "User ID is required", "conversations": []}
all_conversations = []
try:
# 1. Get in-memory OpenRAG conversations (current session)
conversations_dict = get_user_conversations(user_id)
for response_id, conversation_state in conversations_dict.items():
# Filter out system messages
messages = []
@@ -295,7 +362,7 @@ class ChatService:
if msg.get("response_id"):
message_data["response_id"] = msg["response_id"]
messages.append(message_data)
if messages: # Only include conversations with actual messages
# Generate title from first user message
first_user_msg = next(
@@ -308,43 +375,59 @@ class ChatService:
if first_user_msg
else "New chat"
)
all_conversations.append({
"response_id": response_id,
"title": title,
"endpoint": "langflow",
"messages": messages,
"created_at": conversation_state.get("created_at").isoformat()
if conversation_state.get("created_at")
else None,
"last_activity": conversation_state.get("last_activity").isoformat()
if conversation_state.get("last_activity")
else None,
"previous_response_id": conversation_state.get("previous_response_id"),
"total_messages": len(messages),
"source": "openrag_memory"
})
# 2. Get historical conversations from Langflow database
all_conversations.append(
{
"response_id": response_id,
"title": title,
"endpoint": "langflow",
"messages": messages,
"created_at": conversation_state.get(
"created_at"
).isoformat()
if conversation_state.get("created_at")
else None,
"last_activity": conversation_state.get(
"last_activity"
).isoformat()
if conversation_state.get("last_activity")
else None,
"previous_response_id": conversation_state.get(
"previous_response_id"
),
"total_messages": len(messages),
"source": "openrag_memory",
}
)
# 2. Get historical conversations from Langflow database
# (works with both Google-bound users and direct Langflow users)
print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID)
langflow_history = (
await langflow_history_service.get_user_conversation_history(
user_id, flow_id=FLOW_ID
)
)
if langflow_history.get("conversations"):
for conversation in langflow_history["conversations"]:
# Convert Langflow format to OpenRAG format
messages = []
for msg in conversation.get("messages", []):
messages.append({
"role": msg["role"],
"content": msg["content"],
"timestamp": msg.get("timestamp"),
"langflow_message_id": msg.get("langflow_message_id"),
"source": "langflow"
})
messages.append(
{
"role": msg["role"],
"content": msg["content"],
"timestamp": msg.get("timestamp"),
"langflow_message_id": msg.get("langflow_message_id"),
"source": "langflow",
}
)
if messages:
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
first_user_msg = next(
(msg for msg in messages if msg["role"] == "user"), None
)
title = (
first_user_msg["content"][:50] + "..."
if first_user_msg and len(first_user_msg["content"]) > 50
@@ -352,33 +435,39 @@ class ChatService:
if first_user_msg
else "Langflow chat"
)
all_conversations.append({
"response_id": conversation["session_id"],
"title": title,
"endpoint": "langflow",
"messages": messages,
"created_at": conversation.get("created_at"),
"last_activity": conversation.get("last_activity"),
"total_messages": len(messages),
"source": "langflow_database",
"langflow_session_id": conversation["session_id"],
"langflow_flow_id": conversation.get("flow_id")
})
print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow")
all_conversations.append(
{
"response_id": conversation["session_id"],
"title": title,
"endpoint": "langflow",
"messages": messages,
"created_at": conversation.get("created_at"),
"last_activity": conversation.get("last_activity"),
"total_messages": len(messages),
"source": "langflow_database",
"langflow_session_id": conversation["session_id"],
"langflow_flow_id": conversation.get("flow_id"),
}
)
print(
f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow"
)
elif langflow_history.get("error"):
print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}")
print(
f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}"
)
else:
print(f"[DEBUG] No Langflow conversations found for user {user_id}")
except Exception as e:
print(f"[ERROR] Failed to fetch Langflow history: {e}")
# Continue with just in-memory conversations
# Deduplicate conversations by response_id (in-memory takes priority over database)
deduplicated_conversations = {}
for conversation in all_conversations:
response_id = conversation.get("response_id")
if response_id:
@@ -390,37 +479,52 @@ class ChatService:
existing = deduplicated_conversations[response_id]
current_source = conversation.get("source")
existing_source = existing.get("source")
if current_source == "openrag_memory" and existing_source == "langflow_database":
if (
current_source == "openrag_memory"
and existing_source == "langflow_database"
):
# Replace database version with in-memory version
deduplicated_conversations[response_id] = conversation
print(f"[DEBUG] Replaced database conversation {response_id} with in-memory version")
print(
f"[DEBUG] Replaced database conversation {response_id} with in-memory version"
)
# Otherwise keep existing (in-memory has priority, or first database entry)
else:
# No response_id - add with unique key based on content and timestamp
unique_key = f"no_id_{hash(conversation.get('title', ''))}{conversation.get('created_at', '')}"
if unique_key not in deduplicated_conversations:
deduplicated_conversations[unique_key] = conversation
final_conversations = list(deduplicated_conversations.values())
# Sort by last activity (most recent first)
final_conversations.sort(key=lambda c: c.get("last_activity") or "", reverse=True)
# Calculate source statistics after deduplication
sources = {
"memory": len([c for c in final_conversations if c.get("source") == "openrag_memory"]),
"langflow_db": len([c for c in final_conversations if c.get("source") == "langflow_database"]),
"duplicates_removed": len(all_conversations) - len(final_conversations)
"memory": len(
[c for c in final_conversations if c.get("source") == "openrag_memory"]
),
"langflow_db": len(
[
c
for c in final_conversations
if c.get("source") == "langflow_database"
]
),
"duplicates_removed": len(all_conversations) - len(final_conversations),
}
if sources["duplicates_removed"] > 0:
print(f"[DEBUG] Removed {sources['duplicates_removed']} duplicate conversations")
print(
f"[DEBUG] Removed {sources['duplicates_removed']} duplicate conversations"
)
return {
"user_id": user_id,
"endpoint": "langflow",
"conversations": final_conversations,
"total_conversations": len(final_conversations),
"sources": sources
"sources": sources,
}
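
A worked example of the deduplication rule above, using two hypothetical records that share a response_id:

memory_copy = {"response_id": "r1", "source": "openrag_memory"}
db_copy = {"response_id": "r1", "source": "langflow_database"}

deduplicated = {}
for conv in [db_copy, memory_copy]:
    rid = conv["response_id"]
    if rid not in deduplicated:
        deduplicated[rid] = conv
    elif (
        conv["source"] == "openrag_memory"
        and deduplicated[rid]["source"] == "langflow_database"
    ):
        deduplicated[rid] = conv

# deduplicated["r1"] is memory_copy: the in-memory record replaces the
# database one, never the reverse.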


@@ -13,13 +13,14 @@ from ..utils.validation import (
validate_non_empty,
validate_url,
validate_documents_paths,
sanitize_env_value
sanitize_env_value,
)
@dataclass
class EnvConfig:
"""Environment configuration data."""
# Core settings
openai_api_key: str = ""
opensearch_password: str = ""
@@ -27,155 +28,187 @@ class EnvConfig:
langflow_superuser: str = "admin"
langflow_superuser_password: str = ""
flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0"
nudges_flow_id: str = ""  # referenced by attr_map and save_env_file below
# OAuth settings
google_oauth_client_id: str = ""
google_oauth_client_secret: str = ""
microsoft_graph_oauth_client_id: str = ""
microsoft_graph_oauth_client_secret: str = ""
# Optional settings
webhook_base_url: str = ""
aws_access_key_id: str = ""
aws_secret_access_key: str = ""
langflow_public_url: str = ""
# Langflow auth settings
langflow_auto_login: str = "False"
langflow_new_user_is_active: str = "False"
langflow_enable_superuser_cli: str = "False"
# Document paths (comma-separated)
openrag_documents_paths: str = "./documents"
# Validation errors
validation_errors: Dict[str, str] = field(default_factory=dict)
class EnvManager:
"""Manages environment configuration for OpenRAG."""
def __init__(self, env_file: Optional[Path] = None):
self.env_file = env_file or Path(".env")
self.config = EnvConfig()
def generate_secure_password(self) -> str:
"""Generate a secure password for OpenSearch."""
# Generate a 16-character password with letters, digits, and symbols
alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
return ''.join(secrets.choice(alphabet) for _ in range(16))
return "".join(secrets.choice(alphabet) for _ in range(16))
def generate_langflow_secret_key(self) -> str:
"""Generate a secure secret key for Langflow."""
return secrets.token_urlsafe(32)
def load_existing_env(self) -> bool:
"""Load existing .env file if it exists."""
if not self.env_file.exists():
return False
try:
with open(self.env_file, 'r') as f:
with open(self.env_file, "r") as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
if not line or line.startswith("#"):
continue
if '=' in line:
key, value = line.split('=', 1)
if "=" in line:
key, value = line.split("=", 1)
key = key.strip()
value = sanitize_env_value(value)
# Map env vars to config attributes
attr_map = {
'OPENAI_API_KEY': 'openai_api_key',
'OPENSEARCH_PASSWORD': 'opensearch_password',
'LANGFLOW_SECRET_KEY': 'langflow_secret_key',
'LANGFLOW_SUPERUSER': 'langflow_superuser',
'LANGFLOW_SUPERUSER_PASSWORD': 'langflow_superuser_password',
'FLOW_ID': 'flow_id',
'GOOGLE_OAUTH_CLIENT_ID': 'google_oauth_client_id',
'GOOGLE_OAUTH_CLIENT_SECRET': 'google_oauth_client_secret',
'MICROSOFT_GRAPH_OAUTH_CLIENT_ID': 'microsoft_graph_oauth_client_id',
'MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET': 'microsoft_graph_oauth_client_secret',
'WEBHOOK_BASE_URL': 'webhook_base_url',
'AWS_ACCESS_KEY_ID': 'aws_access_key_id',
'AWS_SECRET_ACCESS_KEY': 'aws_secret_access_key',
'LANGFLOW_PUBLIC_URL': 'langflow_public_url',
'OPENRAG_DOCUMENTS_PATHS': 'openrag_documents_paths',
'LANGFLOW_AUTO_LOGIN': 'langflow_auto_login',
'LANGFLOW_NEW_USER_IS_ACTIVE': 'langflow_new_user_is_active',
'LANGFLOW_ENABLE_SUPERUSER_CLI': 'langflow_enable_superuser_cli',
"OPENAI_API_KEY": "openai_api_key",
"OPENSEARCH_PASSWORD": "opensearch_password",
"LANGFLOW_SECRET_KEY": "langflow_secret_key",
"LANGFLOW_SUPERUSER": "langflow_superuser",
"LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password",
"FLOW_ID": "flow_id",
"NUDGES_FLOW_ID": "nudges_flow_id",
"GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id",
"GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret",
"MICROSOFT_GRAPH_OAUTH_CLIENT_ID": "microsoft_graph_oauth_client_id",
"MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET": "microsoft_graph_oauth_client_secret",
"WEBHOOK_BASE_URL": "webhook_base_url",
"AWS_ACCESS_KEY_ID": "aws_access_key_id",
"AWS_SECRET_ACCESS_KEY": "aws_secret_access_key",
"LANGFLOW_PUBLIC_URL": "langflow_public_url",
"OPENRAG_DOCUMENTS_PATHS": "openrag_documents_paths",
"LANGFLOW_AUTO_LOGIN": "langflow_auto_login",
"LANGFLOW_NEW_USER_IS_ACTIVE": "langflow_new_user_is_active",
"LANGFLOW_ENABLE_SUPERUSER_CLI": "langflow_enable_superuser_cli",
}
if key in attr_map:
setattr(self.config, attr_map[key], value)
return True
except Exception as e:
print(f"Error loading .env file: {e}")
return False
def setup_secure_defaults(self) -> None:
"""Set up secure default values for passwords and keys."""
if not self.config.opensearch_password:
self.config.opensearch_password = self.generate_secure_password()
if not self.config.langflow_secret_key:
self.config.langflow_secret_key = self.generate_langflow_secret_key()
if not self.config.langflow_superuser_password:
self.config.langflow_superuser_password = self.generate_secure_password()
def validate_config(self, mode: str = "full") -> bool:
"""
Validate the current configuration.
Args:
mode: "no_auth" for minimal validation, "full" for complete validation
"""
self.config.validation_errors.clear()
# Always validate OpenAI API key
if not validate_openai_api_key(self.config.openai_api_key):
self.config.validation_errors['openai_api_key'] = "Invalid OpenAI API key format (should start with sk-)"
self.config.validation_errors["openai_api_key"] = (
"Invalid OpenAI API key format (should start with sk-)"
)
# Validate documents paths only if provided (optional)
if self.config.openrag_documents_paths:
is_valid, error_msg, _ = validate_documents_paths(self.config.openrag_documents_paths)
is_valid, error_msg, _ = validate_documents_paths(
self.config.openrag_documents_paths
)
if not is_valid:
self.config.validation_errors['openrag_documents_paths'] = error_msg
self.config.validation_errors["openrag_documents_paths"] = error_msg
# Validate required fields
if not validate_non_empty(self.config.opensearch_password):
self.config.validation_errors['opensearch_password'] = "OpenSearch password is required"
self.config.validation_errors["opensearch_password"] = (
"OpenSearch password is required"
)
# Langflow secret key is auto-generated; no user input required
if not validate_non_empty(self.config.langflow_superuser_password):
self.config.validation_errors['langflow_superuser_password'] = "Langflow superuser password is required"
self.config.validation_errors["langflow_superuser_password"] = (
"Langflow superuser password is required"
)
if mode == "full":
# Validate OAuth settings if provided
if self.config.google_oauth_client_id and not validate_google_oauth_client_id(self.config.google_oauth_client_id):
self.config.validation_errors['google_oauth_client_id'] = "Invalid Google OAuth client ID format"
if self.config.google_oauth_client_id and not validate_non_empty(self.config.google_oauth_client_secret):
self.config.validation_errors['google_oauth_client_secret'] = "Google OAuth client secret required when client ID is provided"
if self.config.microsoft_graph_oauth_client_id and not validate_non_empty(self.config.microsoft_graph_oauth_client_secret):
self.config.validation_errors['microsoft_graph_oauth_client_secret'] = "Microsoft Graph client secret required when client ID is provided"
if (
self.config.google_oauth_client_id
and not validate_google_oauth_client_id(
self.config.google_oauth_client_id
)
):
self.config.validation_errors["google_oauth_client_id"] = (
"Invalid Google OAuth client ID format"
)
if self.config.google_oauth_client_id and not validate_non_empty(
self.config.google_oauth_client_secret
):
self.config.validation_errors["google_oauth_client_secret"] = (
"Google OAuth client secret required when client ID is provided"
)
if self.config.microsoft_graph_oauth_client_id and not validate_non_empty(
self.config.microsoft_graph_oauth_client_secret
):
self.config.validation_errors["microsoft_graph_oauth_client_secret"] = (
"Microsoft Graph client secret required when client ID is provided"
)
# Validate optional URLs if provided
if self.config.webhook_base_url and not validate_url(self.config.webhook_base_url):
self.config.validation_errors['webhook_base_url'] = "Invalid webhook URL format"
if self.config.langflow_public_url and not validate_url(self.config.langflow_public_url):
self.config.validation_errors['langflow_public_url'] = "Invalid Langflow public URL format"
if self.config.webhook_base_url and not validate_url(
self.config.webhook_base_url
):
self.config.validation_errors["webhook_base_url"] = (
"Invalid webhook URL format"
)
if self.config.langflow_public_url and not validate_url(
self.config.langflow_public_url
):
self.config.validation_errors["langflow_public_url"] = (
"Invalid Langflow public URL format"
)
return len(self.config.validation_errors) == 0
def save_env_file(self) -> bool:
"""Save current configuration to .env file."""
try:
@@ -183,44 +216,67 @@ class EnvManager:
self.setup_secure_defaults()
# Create backup if file exists
if self.env_file.exists():
backup_file = self.env_file.with_suffix('.env.backup')
backup_file = self.env_file.with_suffix(".env.backup")
self.env_file.rename(backup_file)
with open(self.env_file, 'w') as f:
with open(self.env_file, "w") as f:
f.write("# OpenRAG Environment Configuration\n")
f.write("# Generated by OpenRAG TUI\n\n")
# Core settings
f.write("# Core settings\n")
f.write(f"LANGFLOW_SECRET_KEY={self.config.langflow_secret_key}\n")
f.write(f"LANGFLOW_SUPERUSER={self.config.langflow_superuser}\n")
f.write(f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n")
f.write(
f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n"
)
f.write(f"FLOW_ID={self.config.flow_id}\n")
f.write(f"NUDGES_FLOW_ID={self.config.nudges_flow_id}\n")
f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n")
f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n")
f.write(f"OPENRAG_DOCUMENTS_PATHS={self.config.openrag_documents_paths}\n")
f.write(
f"OPENRAG_DOCUMENTS_PATHS={self.config.openrag_documents_paths}\n"
)
f.write("\n")
# Langflow auth settings
f.write("# Langflow auth settings\n")
f.write(f"LANGFLOW_AUTO_LOGIN={self.config.langflow_auto_login}\n")
f.write(f"LANGFLOW_NEW_USER_IS_ACTIVE={self.config.langflow_new_user_is_active}\n")
f.write(f"LANGFLOW_ENABLE_SUPERUSER_CLI={self.config.langflow_enable_superuser_cli}\n")
f.write(
f"LANGFLOW_NEW_USER_IS_ACTIVE={self.config.langflow_new_user_is_active}\n"
)
f.write(
f"LANGFLOW_ENABLE_SUPERUSER_CLI={self.config.langflow_enable_superuser_cli}\n"
)
f.write("\n")
# OAuth settings
if self.config.google_oauth_client_id or self.config.google_oauth_client_secret:
if (
self.config.google_oauth_client_id
or self.config.google_oauth_client_secret
):
f.write("# Google OAuth settings\n")
f.write(f"GOOGLE_OAUTH_CLIENT_ID={self.config.google_oauth_client_id}\n")
f.write(f"GOOGLE_OAUTH_CLIENT_SECRET={self.config.google_oauth_client_secret}\n")
f.write(
f"GOOGLE_OAUTH_CLIENT_ID={self.config.google_oauth_client_id}\n"
)
f.write(
f"GOOGLE_OAUTH_CLIENT_SECRET={self.config.google_oauth_client_secret}\n"
)
f.write("\n")
if self.config.microsoft_graph_oauth_client_id or self.config.microsoft_graph_oauth_client_secret:
if (
self.config.microsoft_graph_oauth_client_id
or self.config.microsoft_graph_oauth_client_secret
):
f.write("# Microsoft Graph OAuth settings\n")
f.write(f"MICROSOFT_GRAPH_OAUTH_CLIENT_ID={self.config.microsoft_graph_oauth_client_id}\n")
f.write(f"MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET={self.config.microsoft_graph_oauth_client_secret}\n")
f.write(
f"MICROSOFT_GRAPH_OAUTH_CLIENT_ID={self.config.microsoft_graph_oauth_client_id}\n"
)
f.write(
f"MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET={self.config.microsoft_graph_oauth_client_secret}\n"
)
f.write("\n")
# Optional settings
optional_vars = [
("WEBHOOK_BASE_URL", self.config.webhook_base_url),
@@ -228,7 +284,7 @@ class EnvManager:
("AWS_SECRET_ACCESS_KEY", self.config.aws_secret_access_key),
("LANGFLOW_PUBLIC_URL", self.config.langflow_public_url),
]
optional_written = False
for var_name, var_value in optional_vars:
if var_value:
@@ -236,52 +292,89 @@ class EnvManager:
f.write("# Optional settings\n")
optional_written = True
f.write(f"{var_name}={var_value}\n")
if optional_written:
f.write("\n")
return True
except Exception as e:
print(f"Error saving .env file: {e}")
return False
def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]:
"""Get fields required for no-auth setup mode. Returns (field_name, display_name, placeholder, can_generate)."""
return [
("openai_api_key", "OpenAI API Key", "sk-...", False),
("opensearch_password", "OpenSearch Password", "Will be auto-generated if empty", True),
("langflow_superuser_password", "Langflow Superuser Password", "Will be auto-generated if empty", True),
("openrag_documents_paths", "Documents Paths", "./documents,/path/to/more/docs", False),
(
"opensearch_password",
"OpenSearch Password",
"Will be auto-generated if empty",
True,
),
(
"langflow_superuser_password",
"Langflow Superuser Password",
"Will be auto-generated if empty",
True,
),
(
"openrag_documents_paths",
"Documents Paths",
"./documents,/path/to/more/docs",
False,
),
]
def get_full_setup_fields(self) -> List[tuple[str, str, str, bool]]:
"""Get all fields for full setup mode."""
base_fields = self.get_no_auth_setup_fields()
oauth_fields = [
("google_oauth_client_id", "Google OAuth Client ID", "xxx.apps.googleusercontent.com", False),
(
"google_oauth_client_id",
"Google OAuth Client ID",
"xxx.apps.googleusercontent.com",
False,
),
("google_oauth_client_secret", "Google OAuth Client Secret", "", False),
("microsoft_graph_oauth_client_id", "Microsoft Graph Client ID", "", False),
("microsoft_graph_oauth_client_secret", "Microsoft Graph Client Secret", "", False),
(
"microsoft_graph_oauth_client_secret",
"Microsoft Graph Client Secret",
"",
False,
),
]
optional_fields = [
("webhook_base_url", "Webhook Base URL (optional)", "https://your-domain.com", False),
(
"webhook_base_url",
"Webhook Base URL (optional)",
"https://your-domain.com",
False,
),
("aws_access_key_id", "AWS Access Key ID (optional)", "", False),
("aws_secret_access_key", "AWS Secret Access Key (optional)", "", False),
("langflow_public_url", "Langflow Public URL (optional)", "http://localhost:7860", False),
(
"langflow_public_url",
"Langflow Public URL (optional)",
"http://localhost:7860",
False,
),
]
return base_fields + oauth_fields + optional_fields
def generate_compose_volume_mounts(self) -> List[str]:
"""Generate Docker Compose volume mount strings from documents paths."""
is_valid, _, validated_paths = validate_documents_paths(self.config.openrag_documents_paths)
is_valid, _, validated_paths = validate_documents_paths(
self.config.openrag_documents_paths
)
if not is_valid:
return ["./documents:/app/documents:Z"] # fallback
volume_mounts = []
for i, path in enumerate(validated_paths):
if i == 0:
@@ -289,6 +382,6 @@ class EnvManager:
volume_mounts.append(f"{path}:/app/documents:Z")
else:
# Additional paths map to numbered directories
volume_mounts.append(f"{path}:/app/documents{i+1}:Z")
volume_mounts.append(f"{path}:/app/documents{i + 1}:Z")
return volume_mounts
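
A worked example of the mapping, assuming both hypothetical paths pass validation (validate_documents_paths may normalize them; they are shown unmodified here):

manager = EnvManager()
manager.config.openrag_documents_paths = "./documents,/data/reports"
print(manager.generate_compose_volume_mounts())
# ['./documents:/app/documents:Z', '/data/reports:/app/documents2:Z']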