From aad89b91667a8ae9c15e83f916ccb8246600cdad Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Thu, 4 Sep 2025 18:15:46 -0300 Subject: [PATCH 01/21] Add nudges service into backend --- src/agent.py | 163 ++++++++++++----- src/api/nudges.py | 43 +++++ src/config/settings.py | 32 +++- src/main.py | 84 +++++++-- src/services/chat_service.py | 244 ++++++++++++++++++------- src/tui/managers/env_manager.py | 313 +++++++++++++++++++++----------- 6 files changed, 639 insertions(+), 240 deletions(-) create mode 100644 src/api/nudges.py diff --git a/src/agent.py b/src/agent.py index 07fd911e..778607bc 100644 --- a/src/agent.py +++ b/src/agent.py @@ -95,7 +95,9 @@ async def async_response_stream( chunk_count = 0 async for chunk in response: chunk_count += 1 - logger.debug("Stream chunk received", chunk_count=chunk_count, chunk=str(chunk)) + logger.debug( + "Stream chunk received", chunk_count=chunk_count, chunk=str(chunk) + ) # Yield the raw event as JSON for the UI to process import json @@ -171,6 +173,10 @@ async def async_response( if extra_headers: request_params["extra_headers"] = extra_headers + if "x-api-key" not in client.default_headers: + if hasattr(client, "api_key") and extra_headers is not None: + extra_headers["x-api-key"] = client.api_key + response = await client.responses.create(**request_params) response_text = response.output_text @@ -241,7 +247,10 @@ async def async_langflow_stream( previous_response_id=previous_response_id, log_prefix="langflow", ): - logger.debug("Yielding chunk from langflow stream", chunk_preview=chunk[:100].decode('utf-8', errors='replace')) + logger.debug( + "Yielding chunk from langflow stream", + chunk_preview=chunk[:100].decode("utf-8", errors="replace"), + ) yield chunk logger.debug("Langflow stream completed") except Exception as e: @@ -260,18 +269,24 @@ async def async_chat( model: str = "gpt-4.1-mini", previous_response_id: str = None, ): - logger.debug("async_chat called", user_id=user_id, previous_response_id=previous_response_id) + logger.debug( + "async_chat called", user_id=user_id, previous_response_id=previous_response_id + ) # Get the specific conversation thread (or create new one) conversation_state = get_conversation_thread(user_id, previous_response_id) - logger.debug("Got conversation state", message_count=len(conversation_state['messages'])) + logger.debug( + "Got conversation state", message_count=len(conversation_state["messages"]) + ) # Add user message to conversation with timestamp from datetime import datetime user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()} conversation_state["messages"].append(user_message) - logger.debug("Added user message", message_count=len(conversation_state['messages'])) + logger.debug( + "Added user message", message_count=len(conversation_state["messages"]) + ) response_text, response_id = await async_response( async_client, @@ -280,7 +295,9 @@ async def async_chat( previous_response_id=previous_response_id, log_prefix="agent", ) - logger.debug("Got response", response_preview=response_text[:50], response_id=response_id) + logger.debug( + "Got response", response_preview=response_text[:50], response_id=response_id + ) # Add assistant response to conversation with response_id and timestamp assistant_message = { @@ -290,17 +307,26 @@ async def async_chat( "timestamp": datetime.now(), } conversation_state["messages"].append(assistant_message) - logger.debug("Added assistant message", message_count=len(conversation_state['messages'])) + logger.debug( + "Added assistant 
message", message_count=len(conversation_state["messages"]) + ) # Store the conversation thread with its response_id if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - logger.debug("Stored conversation thread", user_id=user_id, response_id=response_id) + logger.debug( + "Stored conversation thread", user_id=user_id, response_id=response_id + ) # Debug: Check what's in user_conversations now conversations = get_user_conversations(user_id) - logger.debug("User conversations updated", user_id=user_id, conversation_count=len(conversations), conversation_ids=list(conversations.keys())) + logger.debug( + "User conversations updated", + user_id=user_id, + conversation_count=len(conversations), + conversation_ids=list(conversations.keys()), + ) else: logger.warning("No response_id received, conversation not stored") @@ -363,7 +389,9 @@ async def async_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - logger.debug("Stored conversation thread", user_id=user_id, response_id=response_id) + logger.debug( + "Stored conversation thread", user_id=user_id, response_id=response_id + ) # Async langflow function with conversation storage (non-streaming) @@ -374,19 +402,32 @@ async def async_langflow_chat( user_id: str, extra_headers: dict = None, previous_response_id: str = None, + store_conversation: bool = True, ): - logger.debug("async_langflow_chat called", user_id=user_id, previous_response_id=previous_response_id) + logger.debug( + "async_langflow_chat called", + user_id=user_id, + previous_response_id=previous_response_id, + ) - # Get the specific conversation thread (or create new one) - conversation_state = get_conversation_thread(user_id, previous_response_id) - logger.debug("Got langflow conversation state", message_count=len(conversation_state['messages'])) + if store_conversation: + # Get the specific conversation thread (or create new one) + conversation_state = get_conversation_thread(user_id, previous_response_id) + logger.debug( + "Got langflow conversation state", + message_count=len(conversation_state["messages"]), + ) # Add user message to conversation with timestamp from datetime import datetime - user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()} - conversation_state["messages"].append(user_message) - logger.debug("Added user message to langflow", message_count=len(conversation_state['messages'])) + if store_conversation: + user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()} + conversation_state["messages"].append(user_message) + logger.debug( + "Added user message to langflow", + message_count=len(conversation_state["messages"]), + ) response_text, response_id = await async_response( langflow_client, @@ -396,45 +437,69 @@ async def async_langflow_chat( previous_response_id=previous_response_id, log_prefix="langflow", ) - logger.debug("Got langflow response", response_preview=response_text[:50], response_id=response_id) + logger.debug( + "Got langflow response", + response_preview=response_text[:50], + response_id=response_id, + ) - # Add assistant response to conversation with response_id and timestamp - assistant_message = { - "role": "assistant", - "content": response_text, - "response_id": response_id, - "timestamp": datetime.now(), - } - conversation_state["messages"].append(assistant_message) - logger.debug("Added assistant message to 
langflow", message_count=len(conversation_state['messages'])) + if store_conversation: + # Add assistant response to conversation with response_id and timestamp + assistant_message = { + "role": "assistant", + "content": response_text, + "response_id": response_id, + "timestamp": datetime.now(), + } + conversation_state["messages"].append(assistant_message) + logger.debug( + "Added assistant message to langflow", + message_count=len(conversation_state["messages"]), + ) + + if not store_conversation: + return response_text, response_id # Store the conversation thread with its response_id if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - + # Claim session ownership if this is a Google user try: from services.session_ownership_service import session_ownership_service from services.user_binding_service import user_binding_service - + # Check if this is a Google user (Google IDs are numeric, Langflow IDs are UUID) if user_id.isdigit() and user_binding_service.has_binding(user_id): langflow_user_id = user_binding_service.get_langflow_user_id(user_id) if langflow_user_id: - session_ownership_service.claim_session(user_id, response_id, langflow_user_id) - print(f"[DEBUG] Claimed session {response_id} for Google user {user_id}") + session_ownership_service.claim_session( + user_id, response_id, langflow_user_id + ) + print( + f"[DEBUG] Claimed session {response_id} for Google user {user_id}" + ) except Exception as e: print(f"[WARNING] Failed to claim session ownership: {e}") - + print( f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) - logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id) + logger.debug( + "Stored langflow conversation thread", + user_id=user_id, + response_id=response_id, + ) # Debug: Check what's in user_conversations now conversations = get_user_conversations(user_id) - logger.debug("User conversations updated", user_id=user_id, conversation_count=len(conversations), conversation_ids=list(conversations.keys())) + logger.debug( + "User conversations updated", + user_id=user_id, + conversation_count=len(conversations), + conversation_ids=list(conversations.keys()), + ) else: logger.warning("No response_id received from langflow, conversation not stored") @@ -450,7 +515,11 @@ async def async_langflow_chat_stream( extra_headers: dict = None, previous_response_id: str = None, ): - logger.debug("async_langflow_chat_stream called", user_id=user_id, previous_response_id=previous_response_id) + logger.debug( + "async_langflow_chat_stream called", + user_id=user_id, + previous_response_id=previous_response_id, + ) # Get the specific conversation thread (or create new one) conversation_state = get_conversation_thread(user_id, previous_response_id) @@ -501,22 +570,32 @@ async def async_langflow_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - + # Claim session ownership if this is a Google user try: from services.session_ownership_service import session_ownership_service from services.user_binding_service import user_binding_service - + # Check if this is a Google user (Google IDs are numeric, Langflow IDs are UUID) if user_id.isdigit() and user_binding_service.has_binding(user_id): - langflow_user_id = user_binding_service.get_langflow_user_id(user_id) + langflow_user_id = 
user_binding_service.get_langflow_user_id( + user_id + ) if langflow_user_id: - session_ownership_service.claim_session(user_id, response_id, langflow_user_id) - print(f"[DEBUG] Claimed session {response_id} for Google user {user_id} (streaming)") + session_ownership_service.claim_session( + user_id, response_id, langflow_user_id + ) + print( + f"[DEBUG] Claimed session {response_id} for Google user {user_id} (streaming)" + ) except Exception as e: print(f"[WARNING] Failed to claim session ownership (streaming): {e}") - + print( f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) - logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id) + logger.debug( + "Stored langflow conversation thread", + user_id=user_id, + response_id=response_id, + ) diff --git a/src/api/nudges.py b/src/api/nudges.py new file mode 100644 index 00000000..910542da --- /dev/null +++ b/src/api/nudges.py @@ -0,0 +1,43 @@ +from starlette.requests import Request +from starlette.responses import JSONResponse +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +async def nudges_from_kb_endpoint(request: Request, chat_service, session_manager): + """Get nudges for a user""" + user = request.state.user + user_id = user.user_id + jwt_token = request.state.jwt_token + + try: + result = await chat_service.langflow_nudges_chat( + user_id, + jwt_token, + ) + return JSONResponse(result) + except Exception as e: + return JSONResponse( + {"error": f"Failed to get nudges: {str(e)}"}, status_code=500 + ) + + +async def nudges_from_chat_id_endpoint(request: Request, chat_service, session_manager): + """Get nudges for a user""" + user = request.state.user + user_id = user.user_id + chat_id = request.path_params["chat_id"] + jwt_token = request.state.jwt_token + + try: + result = await chat_service.langflow_nudges_chat( + user_id, + jwt_token, + previous_response_id=chat_id, + ) + return JSONResponse(result) + except Exception as e: + return JSONResponse( + {"error": f"Failed to get nudges: {str(e)}"}, status_code=500 + ) diff --git a/src/config/settings.py b/src/config/settings.py index c9a02e89..b3592f66 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -24,6 +24,7 @@ LANGFLOW_URL = os.getenv("LANGFLOW_URL", "http://localhost:7860") # Optional: public URL for browser links (e.g., http://localhost:7860) LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL") FLOW_ID = os.getenv("FLOW_ID") +NUDGES_FLOW_ID = os.getenv("NUDGES_FLOW_ID") # Langflow superuser credentials for API key generation LANGFLOW_SUPERUSER = os.getenv("LANGFLOW_SUPERUSER") LANGFLOW_SUPERUSER_PASSWORD = os.getenv("LANGFLOW_SUPERUSER_PASSWORD") @@ -37,7 +38,12 @@ GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET") def is_no_auth_mode(): """Check if we're running in no-auth mode (OAuth credentials missing)""" result = not (GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET) - logger.debug("Checking auth mode", no_auth_mode=result, has_client_id=GOOGLE_OAUTH_CLIENT_ID is not None, has_client_secret=GOOGLE_OAUTH_CLIENT_SECRET is not None) + logger.debug( + "Checking auth mode", + no_auth_mode=result, + has_client_id=GOOGLE_OAUTH_CLIENT_ID is not None, + has_client_secret=GOOGLE_OAUTH_CLIENT_SECRET is not None, + ) return result @@ -100,7 +106,9 @@ async def generate_langflow_api_key(): return LANGFLOW_KEY if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD: - logger.warning("LANGFLOW_SUPERUSER and 
LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation") + logger.warning( + "LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation" + ) return None try: @@ -142,11 +150,19 @@ async def generate_langflow_api_key(): raise KeyError("api_key") LANGFLOW_KEY = api_key - logger.info("Successfully generated Langflow API key", api_key_preview=api_key[:8]) + logger.info( + "Successfully generated Langflow API key", + api_key_preview=api_key[:8], + ) return api_key except (requests.exceptions.RequestException, KeyError) as e: last_error = e - logger.warning("Attempt to generate Langflow API key failed", attempt=attempt, max_attempts=max_attempts, error=str(e)) + logger.warning( + "Attempt to generate Langflow API key failed", + attempt=attempt, + max_attempts=max_attempts, + error=str(e), + ) if attempt < max_attempts: time.sleep(delay_seconds) else: @@ -196,7 +212,9 @@ class AppClients: logger.warning("Failed to initialize Langflow client", error=str(e)) self.langflow_client = None if self.langflow_client is None: - logger.warning("No Langflow client initialized yet, will attempt later on first use") + logger.warning( + "No Langflow client initialized yet, will attempt later on first use" + ) # Initialize patched OpenAI client self.patched_async_client = patch_openai_with_mcp(AsyncOpenAI()) @@ -219,7 +237,9 @@ class AppClients: ) logger.info("Langflow client initialized on-demand") except Exception as e: - logger.error("Failed to initialize Langflow client on-demand", error=str(e)) + logger.error( + "Failed to initialize Langflow client on-demand", error=str(e) + ) self.langflow_client = None return self.langflow_client diff --git a/src/main.py b/src/main.py index 7230e201..5e22dae7 100644 --- a/src/main.py +++ b/src/main.py @@ -3,11 +3,13 @@ import sys # Check for TUI flag FIRST, before any heavy imports if __name__ == "__main__" and len(sys.argv) > 1 and sys.argv[1] == "--tui": from tui.main import run_tui + run_tui() sys.exit(0) # Configure structured logging early from utils.logging_config import configure_from_env, get_logger + configure_from_env() logger = get_logger(__name__) @@ -48,6 +50,7 @@ from auth_middleware import require_auth, optional_auth # API endpoints from api import ( + nudges, upload, search, chat, @@ -59,7 +62,11 @@ from api import ( settings, ) -logger.info("CUDA device information", cuda_available=torch.cuda.is_available(), cuda_version=torch.version.cuda) +logger.info( + "CUDA device information", + cuda_available=torch.cuda.is_available(), + cuda_version=torch.version.cuda, +) async def wait_for_opensearch(): @@ -73,7 +80,12 @@ async def wait_for_opensearch(): logger.info("OpenSearch is ready") return except Exception as e: - logger.warning("OpenSearch not ready yet", attempt=attempt + 1, max_retries=max_retries, error=str(e)) + logger.warning( + "OpenSearch not ready yet", + attempt=attempt + 1, + max_retries=max_retries, + error=str(e), + ) if attempt < max_retries - 1: await asyncio.sleep(retry_delay) else: @@ -95,7 +107,9 @@ async def configure_alerting_security(): # Use admin client (clients.opensearch uses admin credentials) response = await clients.opensearch.cluster.put_settings(body=alerting_settings) - logger.info("Alerting security settings configured successfully", response=response) + logger.info( + "Alerting security settings configured successfully", response=response + ) except Exception as e: logger.warning("Failed to configure alerting security settings", error=str(e)) # Don't fail startup if alerting config 
fails @@ -135,9 +149,14 @@ async def init_index(): await clients.opensearch.indices.create( index=knowledge_filter_index_name, body=knowledge_filter_index_body ) - logger.info("Created knowledge filters index", index_name=knowledge_filter_index_name) + logger.info( + "Created knowledge filters index", index_name=knowledge_filter_index_name + ) else: - logger.info("Knowledge filters index already exists, skipping creation", index_name=knowledge_filter_index_name) + logger.info( + "Knowledge filters index already exists, skipping creation", + index_name=knowledge_filter_index_name, + ) # Configure alerting plugin security settings await configure_alerting_security() @@ -192,7 +211,9 @@ async def init_index_when_ready(): logger.info("OpenSearch index initialization completed successfully") except Exception as e: logger.error("OpenSearch index initialization failed", error=str(e)) - logger.warning("OIDC endpoints will still work, but document operations may fail until OpenSearch is ready") + logger.warning( + "OIDC endpoints will still work, but document operations may fail until OpenSearch is ready" + ) async def initialize_services(): @@ -239,9 +260,14 @@ async def initialize_services(): try: await connector_service.initialize() loaded_count = len(connector_service.connection_manager.connections) - logger.info("Loaded persisted connector connections on startup", loaded_count=loaded_count) + logger.info( + "Loaded persisted connector connections on startup", + loaded_count=loaded_count, + ) except Exception as e: - logger.warning("Failed to load persisted connections on startup", error=str(e)) + logger.warning( + "Failed to load persisted connections on startup", error=str(e) + ) else: logger.info("Skipping connector loading in no-auth mode") @@ -626,6 +652,28 @@ async def create_app(): ), methods=["GET"], ), + Route( + "/nudges", + require_auth(services["session_manager"])( + partial( + nudges.nudges_from_kb_endpoint, + chat_service=services["chat_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/nudges/{chat_id}", + require_auth(services["session_manager"])( + partial( + nudges.nudges_from_chat_id_endpoint, + chat_service=services["chat_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), ] app = Starlette(debug=True, routes=routes) @@ -678,18 +726,30 @@ async def cleanup_subscriptions_proper(services): for connection in active_connections: try: - logger.info("Cancelling subscription for connection", connection_id=connection.connection_id) + logger.info( + "Cancelling subscription for connection", + connection_id=connection.connection_id, + ) connector = await connector_service.get_connector( connection.connection_id ) if connector: subscription_id = connection.config.get("webhook_channel_id") await connector.cleanup_subscription(subscription_id) - logger.info("Cancelled subscription", subscription_id=subscription_id) + logger.info( + "Cancelled subscription", subscription_id=subscription_id + ) except Exception as e: - logger.error("Failed to cancel subscription", connection_id=connection.connection_id, error=str(e)) + logger.error( + "Failed to cancel subscription", + connection_id=connection.connection_id, + error=str(e), + ) - logger.info("Finished cancelling subscriptions", subscription_count=len(active_connections)) + logger.info( + "Finished cancelling subscriptions", + subscription_count=len(active_connections), + ) except Exception as e: logger.error("Failed to cleanup subscriptions", 
error=str(e)) diff --git a/src/services/chat_service.py b/src/services/chat_service.py index 93fddcc8..bd2d5b90 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -1,5 +1,9 @@ -from config.settings import clients, LANGFLOW_URL, FLOW_ID -from agent import async_chat, async_langflow, async_chat_stream, async_langflow_stream +from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL, FLOW_ID +from agent import ( + async_chat, + async_langflow, + async_chat_stream, +) from auth_context import set_auth_context import json from utils.logging_config import get_logger @@ -111,7 +115,10 @@ class ChatService: # Pass the complete filter expression as a single header to Langflow (only if we have something to send) if filter_expression: - logger.info("Sending OpenRAG query filter to Langflow", filter_expression=filter_expression) + logger.info( + "Sending OpenRAG query filter to Langflow", + filter_expression=filter_expression, + ) extra_headers["X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER"] = json.dumps( filter_expression ) @@ -150,6 +157,62 @@ class ChatService: response_data["response_id"] = response_id return response_data + async def langflow_nudges_chat( + self, + user_id: str = None, + jwt_token: str = None, + previous_response_id: str = None, + ): + """Handle Langflow chat requests""" + + if not LANGFLOW_URL or not NUDGES_FLOW_ID: + raise ValueError( + "LANGFLOW_URL and NUDGES_FLOW_ID environment variables are required" + ) + + # Prepare extra headers for JWT authentication + extra_headers = {} + if jwt_token: + extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token + + # Ensure the Langflow client exists; try lazy init if needed + langflow_client = await clients.ensure_langflow_client() + if not langflow_client: + raise ValueError( + "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY." + ) + prompt = "" + if previous_response_id: + from agent import get_conversation_thread + + conversation_history = get_conversation_thread( + user_id, previous_response_id + ) + if conversation_history: + conversation_history = "\n".join( + [ + f"{msg['role']}: {msg['content']}" + for msg in conversation_history["messages"] + if msg["role"] in ["user", "assistant"] + ] + ) + prompt = f"{conversation_history}" + + from agent import async_langflow_chat + + response_text, response_id = await async_langflow_chat( + langflow_client, + NUDGES_FLOW_ID, + prompt, + user_id, + extra_headers=extra_headers, + store_conversation=False, + ) + response_data = {"response": response_text} + if response_id: + response_data["response_id"] = response_id + return response_data + async def upload_context_chat( self, document_content: str, @@ -201,7 +264,11 @@ class ChatService: return {"error": "User ID is required", "conversations": []} conversations_dict = get_user_conversations(user_id) - logger.debug("Getting chat history for user", user_id=user_id, conversation_count=len(conversations_dict)) + logger.debug( + "Getting chat history for user", + user_id=user_id, + conversation_count=len(conversations_dict), + ) # Convert conversations dict to list format with metadata conversations = [] @@ -270,16 +337,16 @@ class ChatService: from agent import get_user_conversations from services.langflow_history_service import langflow_history_service from services.user_binding_service import user_binding_service - + if not user_id: return {"error": "User ID is required", "conversations": []} - + all_conversations = [] - + try: # 1. 
Get in-memory OpenRAG conversations (current session) conversations_dict = get_user_conversations(user_id) - + for response_id, conversation_state in conversations_dict.items(): # Filter out system messages messages = [] @@ -295,7 +362,7 @@ class ChatService: if msg.get("response_id"): message_data["response_id"] = msg["response_id"] messages.append(message_data) - + if messages: # Only include conversations with actual messages # Generate title from first user message first_user_msg = next( @@ -308,43 +375,59 @@ class ChatService: if first_user_msg else "New chat" ) - - all_conversations.append({ - "response_id": response_id, - "title": title, - "endpoint": "langflow", - "messages": messages, - "created_at": conversation_state.get("created_at").isoformat() - if conversation_state.get("created_at") - else None, - "last_activity": conversation_state.get("last_activity").isoformat() - if conversation_state.get("last_activity") - else None, - "previous_response_id": conversation_state.get("previous_response_id"), - "total_messages": len(messages), - "source": "openrag_memory" - }) - - # 2. Get historical conversations from Langflow database + + all_conversations.append( + { + "response_id": response_id, + "title": title, + "endpoint": "langflow", + "messages": messages, + "created_at": conversation_state.get( + "created_at" + ).isoformat() + if conversation_state.get("created_at") + else None, + "last_activity": conversation_state.get( + "last_activity" + ).isoformat() + if conversation_state.get("last_activity") + else None, + "previous_response_id": conversation_state.get( + "previous_response_id" + ), + "total_messages": len(messages), + "source": "openrag_memory", + } + ) + + # 2. Get historical conversations from Langflow database # (works with both Google-bound users and direct Langflow users) print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}") - langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID) - + langflow_history = ( + await langflow_history_service.get_user_conversation_history( + user_id, flow_id=FLOW_ID + ) + ) + if langflow_history.get("conversations"): for conversation in langflow_history["conversations"]: # Convert Langflow format to OpenRAG format messages = [] for msg in conversation.get("messages", []): - messages.append({ - "role": msg["role"], - "content": msg["content"], - "timestamp": msg.get("timestamp"), - "langflow_message_id": msg.get("langflow_message_id"), - "source": "langflow" - }) - + messages.append( + { + "role": msg["role"], + "content": msg["content"], + "timestamp": msg.get("timestamp"), + "langflow_message_id": msg.get("langflow_message_id"), + "source": "langflow", + } + ) + if messages: - first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) + first_user_msg = next( + (msg for msg in messages if msg["role"] == "user"), None + ) title = ( first_user_msg["content"][:50] + "..." 
if first_user_msg and len(first_user_msg["content"]) > 50 @@ -352,33 +435,39 @@ class ChatService: if first_user_msg else "Langflow chat" ) - - all_conversations.append({ - "response_id": conversation["session_id"], - "title": title, - "endpoint": "langflow", - "messages": messages, - "created_at": conversation.get("created_at"), - "last_activity": conversation.get("last_activity"), - "total_messages": len(messages), - "source": "langflow_database", - "langflow_session_id": conversation["session_id"], - "langflow_flow_id": conversation.get("flow_id") - }) - - print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow") + + all_conversations.append( + { + "response_id": conversation["session_id"], + "title": title, + "endpoint": "langflow", + "messages": messages, + "created_at": conversation.get("created_at"), + "last_activity": conversation.get("last_activity"), + "total_messages": len(messages), + "source": "langflow_database", + "langflow_session_id": conversation["session_id"], + "langflow_flow_id": conversation.get("flow_id"), + } + ) + + print( + f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow" + ) elif langflow_history.get("error"): - print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}") + print( + f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}" + ) else: print(f"[DEBUG] No Langflow conversations found for user {user_id}") - + except Exception as e: print(f"[ERROR] Failed to fetch Langflow history: {e}") # Continue with just in-memory conversations - + # Deduplicate conversations by response_id (in-memory takes priority over database) deduplicated_conversations = {} - + for conversation in all_conversations: response_id = conversation.get("response_id") if response_id: @@ -390,37 +479,52 @@ class ChatService: existing = deduplicated_conversations[response_id] current_source = conversation.get("source") existing_source = existing.get("source") - - if current_source == "openrag_memory" and existing_source == "langflow_database": + + if ( + current_source == "openrag_memory" + and existing_source == "langflow_database" + ): # Replace database version with in-memory version deduplicated_conversations[response_id] = conversation - print(f"[DEBUG] Replaced database conversation {response_id} with in-memory version") + print( + f"[DEBUG] Replaced database conversation {response_id} with in-memory version" + ) # Otherwise keep existing (in-memory has priority, or first database entry) else: # No response_id - add with unique key based on content and timestamp unique_key = f"no_id_{hash(conversation.get('title', ''))}{conversation.get('created_at', '')}" if unique_key not in deduplicated_conversations: deduplicated_conversations[unique_key] = conversation - + final_conversations = list(deduplicated_conversations.values()) - + # Sort by last activity (most recent first) final_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) - + # Calculate source statistics after deduplication sources = { - "memory": len([c for c in final_conversations if c.get("source") == "openrag_memory"]), - "langflow_db": len([c for c in final_conversations if c.get("source") == "langflow_database"]), - "duplicates_removed": len(all_conversations) - len(final_conversations) + "memory": len( + [c for c in final_conversations if c.get("source") == "openrag_memory"] + ), + "langflow_db": len( + [ + c + for c in 
final_conversations + if c.get("source") == "langflow_database" + ] + ), + "duplicates_removed": len(all_conversations) - len(final_conversations), } - + if sources["duplicates_removed"] > 0: - print(f"[DEBUG] Removed {sources['duplicates_removed']} duplicate conversations") - + print( + f"[DEBUG] Removed {sources['duplicates_removed']} duplicate conversations" + ) + return { "user_id": user_id, "endpoint": "langflow", "conversations": final_conversations, "total_conversations": len(final_conversations), - "sources": sources + "sources": sources, } diff --git a/src/tui/managers/env_manager.py b/src/tui/managers/env_manager.py index 61ec2f07..b163fd78 100644 --- a/src/tui/managers/env_manager.py +++ b/src/tui/managers/env_manager.py @@ -13,13 +13,14 @@ from ..utils.validation import ( validate_non_empty, validate_url, validate_documents_paths, - sanitize_env_value + sanitize_env_value, ) @dataclass class EnvConfig: """Environment configuration data.""" + # Core settings openai_api_key: str = "" opensearch_password: str = "" @@ -27,155 +28,187 @@ class EnvConfig: langflow_superuser: str = "admin" langflow_superuser_password: str = "" flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0" - + # OAuth settings google_oauth_client_id: str = "" google_oauth_client_secret: str = "" microsoft_graph_oauth_client_id: str = "" microsoft_graph_oauth_client_secret: str = "" - + # Optional settings webhook_base_url: str = "" aws_access_key_id: str = "" aws_secret_access_key: str = "" langflow_public_url: str = "" - + # Langflow auth settings langflow_auto_login: str = "False" langflow_new_user_is_active: str = "False" langflow_enable_superuser_cli: str = "False" - + # Document paths (comma-separated) openrag_documents_paths: str = "./documents" - + # Validation errors validation_errors: Dict[str, str] = field(default_factory=dict) class EnvManager: """Manages environment configuration for OpenRAG.""" - + def __init__(self, env_file: Optional[Path] = None): self.env_file = env_file or Path(".env") self.config = EnvConfig() - + def generate_secure_password(self) -> str: """Generate a secure password for OpenSearch.""" # Generate a 16-character password with letters, digits, and symbols alphabet = string.ascii_letters + string.digits + "!@#$%^&*" - return ''.join(secrets.choice(alphabet) for _ in range(16)) - + return "".join(secrets.choice(alphabet) for _ in range(16)) + def generate_langflow_secret_key(self) -> str: """Generate a secure secret key for Langflow.""" return secrets.token_urlsafe(32) - + def load_existing_env(self) -> bool: """Load existing .env file if it exists.""" if not self.env_file.exists(): return False - + try: - with open(self.env_file, 'r') as f: + with open(self.env_file, "r") as f: for line in f: line = line.strip() - if not line or line.startswith('#'): + if not line or line.startswith("#"): continue - - if '=' in line: - key, value = line.split('=', 1) + + if "=" in line: + key, value = line.split("=", 1) key = key.strip() value = sanitize_env_value(value) - + # Map env vars to config attributes attr_map = { - 'OPENAI_API_KEY': 'openai_api_key', - 'OPENSEARCH_PASSWORD': 'opensearch_password', - 'LANGFLOW_SECRET_KEY': 'langflow_secret_key', - 'LANGFLOW_SUPERUSER': 'langflow_superuser', - 'LANGFLOW_SUPERUSER_PASSWORD': 'langflow_superuser_password', - 'FLOW_ID': 'flow_id', - 'GOOGLE_OAUTH_CLIENT_ID': 'google_oauth_client_id', - 'GOOGLE_OAUTH_CLIENT_SECRET': 'google_oauth_client_secret', - 'MICROSOFT_GRAPH_OAUTH_CLIENT_ID': 'microsoft_graph_oauth_client_id', - 
'MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET': 'microsoft_graph_oauth_client_secret', - 'WEBHOOK_BASE_URL': 'webhook_base_url', - 'AWS_ACCESS_KEY_ID': 'aws_access_key_id', - 'AWS_SECRET_ACCESS_KEY': 'aws_secret_access_key', - 'LANGFLOW_PUBLIC_URL': 'langflow_public_url', - 'OPENRAG_DOCUMENTS_PATHS': 'openrag_documents_paths', - 'LANGFLOW_AUTO_LOGIN': 'langflow_auto_login', - 'LANGFLOW_NEW_USER_IS_ACTIVE': 'langflow_new_user_is_active', - 'LANGFLOW_ENABLE_SUPERUSER_CLI': 'langflow_enable_superuser_cli', + "OPENAI_API_KEY": "openai_api_key", + "OPENSEARCH_PASSWORD": "opensearch_password", + "LANGFLOW_SECRET_KEY": "langflow_secret_key", + "LANGFLOW_SUPERUSER": "langflow_superuser", + "LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password", + "FLOW_ID": "flow_id", + "NUDGES_FLOW_ID": "nudges_flow_id", + "GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id", + "GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret", + "MICROSOFT_GRAPH_OAUTH_CLIENT_ID": "microsoft_graph_oauth_client_id", + "MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET": "microsoft_graph_oauth_client_secret", + "WEBHOOK_BASE_URL": "webhook_base_url", + "AWS_ACCESS_KEY_ID": "aws_access_key_id", + "AWS_SECRET_ACCESS_KEY": "aws_secret_access_key", + "LANGFLOW_PUBLIC_URL": "langflow_public_url", + "OPENRAG_DOCUMENTS_PATHS": "openrag_documents_paths", + "LANGFLOW_AUTO_LOGIN": "langflow_auto_login", + "LANGFLOW_NEW_USER_IS_ACTIVE": "langflow_new_user_is_active", + "LANGFLOW_ENABLE_SUPERUSER_CLI": "langflow_enable_superuser_cli", } - + if key in attr_map: setattr(self.config, attr_map[key], value) - + return True - + except Exception as e: print(f"Error loading .env file: {e}") return False - + def setup_secure_defaults(self) -> None: """Set up secure default values for passwords and keys.""" if not self.config.opensearch_password: self.config.opensearch_password = self.generate_secure_password() - + if not self.config.langflow_secret_key: self.config.langflow_secret_key = self.generate_langflow_secret_key() - + if not self.config.langflow_superuser_password: self.config.langflow_superuser_password = self.generate_secure_password() - + def validate_config(self, mode: str = "full") -> bool: """ Validate the current configuration. 
- + Args: mode: "no_auth" for minimal validation, "full" for complete validation """ self.config.validation_errors.clear() - + # Always validate OpenAI API key if not validate_openai_api_key(self.config.openai_api_key): - self.config.validation_errors['openai_api_key'] = "Invalid OpenAI API key format (should start with sk-)" - + self.config.validation_errors["openai_api_key"] = ( + "Invalid OpenAI API key format (should start with sk-)" + ) + # Validate documents paths only if provided (optional) if self.config.openrag_documents_paths: - is_valid, error_msg, _ = validate_documents_paths(self.config.openrag_documents_paths) + is_valid, error_msg, _ = validate_documents_paths( + self.config.openrag_documents_paths + ) if not is_valid: - self.config.validation_errors['openrag_documents_paths'] = error_msg - + self.config.validation_errors["openrag_documents_paths"] = error_msg + # Validate required fields if not validate_non_empty(self.config.opensearch_password): - self.config.validation_errors['opensearch_password'] = "OpenSearch password is required" - + self.config.validation_errors["opensearch_password"] = ( + "OpenSearch password is required" + ) + # Langflow secret key is auto-generated; no user input required if not validate_non_empty(self.config.langflow_superuser_password): - self.config.validation_errors['langflow_superuser_password'] = "Langflow superuser password is required" - + self.config.validation_errors["langflow_superuser_password"] = ( + "Langflow superuser password is required" + ) + if mode == "full": # Validate OAuth settings if provided - if self.config.google_oauth_client_id and not validate_google_oauth_client_id(self.config.google_oauth_client_id): - self.config.validation_errors['google_oauth_client_id'] = "Invalid Google OAuth client ID format" - - if self.config.google_oauth_client_id and not validate_non_empty(self.config.google_oauth_client_secret): - self.config.validation_errors['google_oauth_client_secret'] = "Google OAuth client secret required when client ID is provided" - - if self.config.microsoft_graph_oauth_client_id and not validate_non_empty(self.config.microsoft_graph_oauth_client_secret): - self.config.validation_errors['microsoft_graph_oauth_client_secret'] = "Microsoft Graph client secret required when client ID is provided" - + if ( + self.config.google_oauth_client_id + and not validate_google_oauth_client_id( + self.config.google_oauth_client_id + ) + ): + self.config.validation_errors["google_oauth_client_id"] = ( + "Invalid Google OAuth client ID format" + ) + + if self.config.google_oauth_client_id and not validate_non_empty( + self.config.google_oauth_client_secret + ): + self.config.validation_errors["google_oauth_client_secret"] = ( + "Google OAuth client secret required when client ID is provided" + ) + + if self.config.microsoft_graph_oauth_client_id and not validate_non_empty( + self.config.microsoft_graph_oauth_client_secret + ): + self.config.validation_errors["microsoft_graph_oauth_client_secret"] = ( + "Microsoft Graph client secret required when client ID is provided" + ) + # Validate optional URLs if provided - if self.config.webhook_base_url and not validate_url(self.config.webhook_base_url): - self.config.validation_errors['webhook_base_url'] = "Invalid webhook URL format" - - if self.config.langflow_public_url and not validate_url(self.config.langflow_public_url): - self.config.validation_errors['langflow_public_url'] = "Invalid Langflow public URL format" - + if self.config.webhook_base_url and not validate_url( + 
self.config.webhook_base_url + ): + self.config.validation_errors["webhook_base_url"] = ( + "Invalid webhook URL format" + ) + + if self.config.langflow_public_url and not validate_url( + self.config.langflow_public_url + ): + self.config.validation_errors["langflow_public_url"] = ( + "Invalid Langflow public URL format" + ) + return len(self.config.validation_errors) == 0 - + def save_env_file(self) -> bool: """Save current configuration to .env file.""" try: @@ -183,44 +216,67 @@ class EnvManager: self.setup_secure_defaults() # Create backup if file exists if self.env_file.exists(): - backup_file = self.env_file.with_suffix('.env.backup') + backup_file = self.env_file.with_suffix(".env.backup") self.env_file.rename(backup_file) - - with open(self.env_file, 'w') as f: + + with open(self.env_file, "w") as f: f.write("# OpenRAG Environment Configuration\n") f.write("# Generated by OpenRAG TUI\n\n") - + # Core settings f.write("# Core settings\n") f.write(f"LANGFLOW_SECRET_KEY={self.config.langflow_secret_key}\n") f.write(f"LANGFLOW_SUPERUSER={self.config.langflow_superuser}\n") - f.write(f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n") + f.write( + f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n" + ) f.write(f"FLOW_ID={self.config.flow_id}\n") + f.write(f"NUDGES_FLOW_ID={self.config.nudges_flow_id}\n") f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n") f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n") - f.write(f"OPENRAG_DOCUMENTS_PATHS={self.config.openrag_documents_paths}\n") + f.write( + f"OPENRAG_DOCUMENTS_PATHS={self.config.openrag_documents_paths}\n" + ) f.write("\n") - + # Langflow auth settings f.write("# Langflow auth settings\n") f.write(f"LANGFLOW_AUTO_LOGIN={self.config.langflow_auto_login}\n") - f.write(f"LANGFLOW_NEW_USER_IS_ACTIVE={self.config.langflow_new_user_is_active}\n") - f.write(f"LANGFLOW_ENABLE_SUPERUSER_CLI={self.config.langflow_enable_superuser_cli}\n") + f.write( + f"LANGFLOW_NEW_USER_IS_ACTIVE={self.config.langflow_new_user_is_active}\n" + ) + f.write( + f"LANGFLOW_ENABLE_SUPERUSER_CLI={self.config.langflow_enable_superuser_cli}\n" + ) f.write("\n") - + # OAuth settings - if self.config.google_oauth_client_id or self.config.google_oauth_client_secret: + if ( + self.config.google_oauth_client_id + or self.config.google_oauth_client_secret + ): f.write("# Google OAuth settings\n") - f.write(f"GOOGLE_OAUTH_CLIENT_ID={self.config.google_oauth_client_id}\n") - f.write(f"GOOGLE_OAUTH_CLIENT_SECRET={self.config.google_oauth_client_secret}\n") + f.write( + f"GOOGLE_OAUTH_CLIENT_ID={self.config.google_oauth_client_id}\n" + ) + f.write( + f"GOOGLE_OAUTH_CLIENT_SECRET={self.config.google_oauth_client_secret}\n" + ) f.write("\n") - - if self.config.microsoft_graph_oauth_client_id or self.config.microsoft_graph_oauth_client_secret: + + if ( + self.config.microsoft_graph_oauth_client_id + or self.config.microsoft_graph_oauth_client_secret + ): f.write("# Microsoft Graph OAuth settings\n") - f.write(f"MICROSOFT_GRAPH_OAUTH_CLIENT_ID={self.config.microsoft_graph_oauth_client_id}\n") - f.write(f"MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET={self.config.microsoft_graph_oauth_client_secret}\n") + f.write( + f"MICROSOFT_GRAPH_OAUTH_CLIENT_ID={self.config.microsoft_graph_oauth_client_id}\n" + ) + f.write( + f"MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET={self.config.microsoft_graph_oauth_client_secret}\n" + ) f.write("\n") - + # Optional settings optional_vars = [ ("WEBHOOK_BASE_URL", self.config.webhook_base_url), @@ -228,7 
+284,7 @@ class EnvManager: ("AWS_SECRET_ACCESS_KEY", self.config.aws_secret_access_key), ("LANGFLOW_PUBLIC_URL", self.config.langflow_public_url), ] - + optional_written = False for var_name, var_value in optional_vars: if var_value: @@ -236,52 +292,89 @@ class EnvManager: f.write("# Optional settings\n") optional_written = True f.write(f"{var_name}={var_value}\n") - + if optional_written: f.write("\n") - + return True - + except Exception as e: print(f"Error saving .env file: {e}") return False - + def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]: """Get fields required for no-auth setup mode. Returns (field_name, display_name, placeholder, can_generate).""" return [ ("openai_api_key", "OpenAI API Key", "sk-...", False), - ("opensearch_password", "OpenSearch Password", "Will be auto-generated if empty", True), - ("langflow_superuser_password", "Langflow Superuser Password", "Will be auto-generated if empty", True), - ("openrag_documents_paths", "Documents Paths", "./documents,/path/to/more/docs", False), + ( + "opensearch_password", + "OpenSearch Password", + "Will be auto-generated if empty", + True, + ), + ( + "langflow_superuser_password", + "Langflow Superuser Password", + "Will be auto-generated if empty", + True, + ), + ( + "openrag_documents_paths", + "Documents Paths", + "./documents,/path/to/more/docs", + False, + ), ] - + def get_full_setup_fields(self) -> List[tuple[str, str, str, bool]]: """Get all fields for full setup mode.""" base_fields = self.get_no_auth_setup_fields() - + oauth_fields = [ - ("google_oauth_client_id", "Google OAuth Client ID", "xxx.apps.googleusercontent.com", False), + ( + "google_oauth_client_id", + "Google OAuth Client ID", + "xxx.apps.googleusercontent.com", + False, + ), ("google_oauth_client_secret", "Google OAuth Client Secret", "", False), ("microsoft_graph_oauth_client_id", "Microsoft Graph Client ID", "", False), - ("microsoft_graph_oauth_client_secret", "Microsoft Graph Client Secret", "", False), + ( + "microsoft_graph_oauth_client_secret", + "Microsoft Graph Client Secret", + "", + False, + ), ] - + optional_fields = [ - ("webhook_base_url", "Webhook Base URL (optional)", "https://your-domain.com", False), + ( + "webhook_base_url", + "Webhook Base URL (optional)", + "https://your-domain.com", + False, + ), ("aws_access_key_id", "AWS Access Key ID (optional)", "", False), ("aws_secret_access_key", "AWS Secret Access Key (optional)", "", False), - ("langflow_public_url", "Langflow Public URL (optional)", "http://localhost:7860", False), + ( + "langflow_public_url", + "Langflow Public URL (optional)", + "http://localhost:7860", + False, + ), ] - + return base_fields + oauth_fields + optional_fields - + def generate_compose_volume_mounts(self) -> List[str]: """Generate Docker Compose volume mount strings from documents paths.""" - is_valid, _, validated_paths = validate_documents_paths(self.config.openrag_documents_paths) - + is_valid, _, validated_paths = validate_documents_paths( + self.config.openrag_documents_paths + ) + if not is_valid: return ["./documents:/app/documents:Z"] # fallback - + volume_mounts = [] for i, path in enumerate(validated_paths): if i == 0: @@ -289,6 +382,6 @@ class EnvManager: volume_mounts.append(f"{path}:/app/documents:Z") else: # Additional paths map to numbered directories - volume_mounts.append(f"{path}:/app/documents{i+1}:Z") - + volume_mounts.append(f"{path}:/app/documents{i + 1}:Z") + return volume_mounts From 33ba6abf49ae47f2fe83948eec8c10e4a2d7721a Mon Sep 17 00:00:00 2001 From: Lucas 
Oliveira
Date: Thu, 4 Sep 2025 18:15:54 -0300
Subject: [PATCH 02/21] Add react query

---
 frontend/package-lock.json | 27 +++++++++++++++++++++++++++
 frontend/package.json      |  1 +
 2 files changed, 28 insertions(+)

diff --git a/frontend/package-lock.json b/frontend/package-lock.json
index 103dd7aa..3883b3b2 100644
--- a/frontend/package-lock.json
+++ b/frontend/package-lock.json
@@ -22,6 +22,7 @@
         "@radix-ui/react-switch": "^1.2.5",
         "@tailwindcss/forms": "^0.5.10",
         "@tailwindcss/typography": "^0.5.16",
+        "@tanstack/react-query": "^5.86.0",
         "class-variance-authority": "^0.7.1",
         "clsx": "^2.1.1",
         "cmdk": "^1.1.1",
@@ -2203,6 +2204,32 @@
         "node": ">=4"
       }
     },
+    "node_modules/@tanstack/query-core": {
+      "version": "5.86.0",
+      "resolved": "https://registry.npmjs.org/@tanstack/query-core/-/query-core-5.86.0.tgz",
+      "integrity": "sha512-Y6ibQm6BXbw6w1p3a5LrPn8Ae64M0dx7hGmnhrm9P+XAkCCKXOwZN0J5Z1wK/0RdNHtR9o+sWHDXd4veNI60tQ==",
+      "license": "MIT",
+      "funding": {
+        "type": "github",
+        "url": "https://github.com/sponsors/tannerlinsley"
+      }
+    },
+    "node_modules/@tanstack/react-query": {
+      "version": "5.86.0",
+      "resolved": "https://registry.npmjs.org/@tanstack/react-query/-/react-query-5.86.0.tgz",
+      "integrity": "sha512-jgS/v0oSJkGHucv9zxOS8rL7mjATh1XO3K4eqAV4WMpAly8okcBrGi1YxRZN5S4B59F54x9JFjWrK5vMAvJYqA==",
+      "license": "MIT",
+      "dependencies": {
+        "@tanstack/query-core": "5.86.0"
+      },
+      "funding": {
+        "type": "github",
+        "url": "https://github.com/sponsors/tannerlinsley"
+      },
+      "peerDependencies": {
+        "react": "^18 || ^19"
+      }
+    },
     "node_modules/@tybys/wasm-util": {
       "version": "0.10.0",
       "resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.0.tgz",
diff --git a/frontend/package.json b/frontend/package.json
index 4f1ebfd8..0731820b 100644
--- a/frontend/package.json
+++ b/frontend/package.json
@@ -23,6 +23,7 @@
     "@radix-ui/react-switch": "^1.2.5",
     "@tailwindcss/forms": "^0.5.10",
     "@tailwindcss/typography": "^0.5.16",
+    "@tanstack/react-query": "^5.86.0",
     "class-variance-authority": "^0.7.1",
     "clsx": "^2.1.1",
     "cmdk": "^1.1.1",

From 1197f51518076d779f8a220d208afa19216ddb03 Mon Sep 17 00:00:00 2001
From: Lucas Oliveira
Date: Thu, 4 Sep 2025 18:16:06 -0300
Subject: [PATCH 03/21] Create useGetNudgesQuery to get nudges easily

---
 .../src/app/api/queries/useGetNudgesQuery.ts  | 42 +++++++++++++++++++
 1 file changed, 42 insertions(+)
 create mode 100644 frontend/src/app/api/queries/useGetNudgesQuery.ts

diff --git a/frontend/src/app/api/queries/useGetNudgesQuery.ts b/frontend/src/app/api/queries/useGetNudgesQuery.ts
new file mode 100644
index 00000000..809ca71e
--- /dev/null
+++ b/frontend/src/app/api/queries/useGetNudgesQuery.ts
@@ -0,0 +1,42 @@
+import {
+  useQuery,
+  useQueryClient,
+  UseQueryOptions,
+} from "@tanstack/react-query";
+
+type Nudge = string;
+
+const DEFAULT_NUDGES = [
+  "Show me this quarter's top 10 deals",
+  "Summarize recent client interactions",
+  "Search OpenSearch for mentions of our competitors",
+];
+
+export const useGetNudgesQuery = (
+  chatId?: string | null,
+  options?: Omit<UseQueryOptions<Nudge[]>, "queryKey" | "queryFn">,
+) => {
+  const queryClient = useQueryClient();
+  console.log(chatId);
+  async function getNudges(): Promise<Nudge[]> {
+    try {
+      const response = await fetch(`/api/nudges${chatId ? 
`/${chatId}` : ""}`); + const data = await response.json(); + return data.response.split("\n").filter(Boolean) || DEFAULT_NUDGES; + } catch (error) { + console.error("Error getting nudges", error); + return DEFAULT_NUDGES; + } + } + + const queryResult = useQuery( + { + queryKey: ["nudges", chatId], + queryFn: getNudges, + ...options, + }, + queryClient, + ); + + return queryResult; +}; From 70d3434e5bd4c2c3a82f06d2f764902b0bedb769 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Thu, 4 Sep 2025 18:16:16 -0300 Subject: [PATCH 04/21] Create get query client to work on next --- frontend/src/app/api/get-query-client.ts | 37 ++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 frontend/src/app/api/get-query-client.ts diff --git a/frontend/src/app/api/get-query-client.ts b/frontend/src/app/api/get-query-client.ts new file mode 100644 index 00000000..4602496c --- /dev/null +++ b/frontend/src/app/api/get-query-client.ts @@ -0,0 +1,37 @@ +import { + QueryClient, + defaultShouldDehydrateQuery, + isServer, +} from "@tanstack/react-query"; + +function makeQueryClient() { + return new QueryClient({ + defaultOptions: { + queries: { + staleTime: 60 * 1000, + }, + dehydrate: { + // include pending queries in dehydration + shouldDehydrateQuery: (query) => + defaultShouldDehydrateQuery(query) || + query.state.status === "pending", + }, + }, + }); +} + +let browserQueryClient: QueryClient | undefined = undefined; + +export function getQueryClient() { + if (isServer) { + // Server: always make a new query client + return makeQueryClient(); + } else { + // Browser: make a new query client if we don't already have one + // This is very important, so we don't re-make a new client if React + // suspends during the initial render. This may not be needed if we + // have a suspense boundary BELOW the creation of the query client + if (!browserQueryClient) browserQueryClient = makeQueryClient(); + return browserQueryClient; + } +} From 047fb305c6a86065c9128a8ee1c24a5c63adad66 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Thu, 4 Sep 2025 18:16:28 -0300 Subject: [PATCH 05/21] Add react query provider to app --- frontend/src/app/chat/page.tsx | 151 +++++++++++++++++---------------- frontend/src/app/layout.tsx | 24 +++--- frontend/src/app/providers.tsx | 12 +++ 3 files changed, 100 insertions(+), 87 deletions(-) create mode 100644 frontend/src/app/providers.tsx diff --git a/frontend/src/app/chat/page.tsx b/frontend/src/app/chat/page.tsx index 19703214..b3cfca4a 100644 --- a/frontend/src/app/chat/page.tsx +++ b/frontend/src/app/chat/page.tsx @@ -22,6 +22,7 @@ import { Zap, } from "lucide-react"; import { useEffect, useRef, useState } from "react"; +import { useGetNudgesQuery } from "../api/queries/useGetNudgesQuery"; interface Message { role: "user" | "assistant"; @@ -191,7 +192,7 @@ function ChatPage() { "Upload failed with status:", response.status, "Response:", - errorText + errorText, ); throw new Error("Failed to process document"); } @@ -244,7 +245,7 @@ function ChatPage() { ...prev, [endpoint]: result.response_id, })); - + // If this is a new conversation (no currentConversationId), set it now if (!currentConversationId) { setCurrentConversationId(result.response_id); @@ -436,8 +437,8 @@ function ChatPage() { // 2. It's different from the last loaded conversation AND // 3. 
User is not in the middle of an interaction if ( - conversationData && - conversationData.messages && + conversationData && + conversationData.messages && lastLoadedConversationRef.current !== conversationData.response_id && !isUserInteracting && !isForkingInProgress @@ -445,7 +446,7 @@ function ChatPage() { console.log( "Loading conversation with", conversationData.messages.length, - "messages" + "messages", ); // Convert backend message format to frontend Message interface const convertedMessages: Message[] = conversationData.messages.map( @@ -459,7 +460,7 @@ function ChatPage() { content: msg.content, timestamp: new Date(msg.timestamp || new Date()), // Add any other necessary properties - }) + }), ); setMessages(convertedMessages); @@ -471,11 +472,7 @@ function ChatPage() { [conversationData.endpoint]: conversationData.response_id, })); } - }, [ - conversationData, - isUserInteracting, - isForkingInProgress, - ]); + }, [conversationData, isUserInteracting, isForkingInProgress]); // Handle new conversation creation - only reset messages when placeholderConversation is set useEffect(() => { @@ -547,7 +544,7 @@ function ChatPage() { console.log( "Chat page received file upload error event:", filename, - error + error, ); // Replace the last message with error message @@ -561,37 +558,37 @@ function ChatPage() { window.addEventListener( "fileUploadStart", - handleFileUploadStart as EventListener + handleFileUploadStart as EventListener, ); window.addEventListener( "fileUploaded", - handleFileUploaded as EventListener + handleFileUploaded as EventListener, ); window.addEventListener( "fileUploadComplete", - handleFileUploadComplete as EventListener + handleFileUploadComplete as EventListener, ); window.addEventListener( "fileUploadError", - handleFileUploadError as EventListener + handleFileUploadError as EventListener, ); return () => { window.removeEventListener( "fileUploadStart", - handleFileUploadStart as EventListener + handleFileUploadStart as EventListener, ); window.removeEventListener( "fileUploaded", - handleFileUploaded as EventListener + handleFileUploaded as EventListener, ); window.removeEventListener( "fileUploadComplete", - handleFileUploadComplete as EventListener + handleFileUploadComplete as EventListener, ); window.removeEventListener( "fileUploadError", - handleFileUploadError as EventListener + handleFileUploadError as EventListener, ); }; }, [endpoint, setPreviousResponseIds]); @@ -617,6 +614,10 @@ function ChatPage() { }; }, [isFilterDropdownOpen]); + const { data: nudges = [], refetch: refetchNudges } = useGetNudgesQuery( + previousResponseIds[endpoint], + ); + const handleSSEStream = async (userMessage: Message) => { const apiEndpoint = endpoint === "chat" ? 
"/api/chat" : "/api/langflow"; @@ -719,7 +720,7 @@ function ChatPage() { console.log( "Received chunk:", chunk.type || chunk.object, - chunk + chunk, ); // Extract response ID if present @@ -735,14 +736,14 @@ function ChatPage() { if (chunk.delta.function_call) { console.log( "Function call in delta:", - chunk.delta.function_call + chunk.delta.function_call, ); // Check if this is a new function call if (chunk.delta.function_call.name) { console.log( "New function call:", - chunk.delta.function_call.name + chunk.delta.function_call.name, ); const functionCall: FunctionCall = { name: chunk.delta.function_call.name, @@ -758,7 +759,7 @@ function ChatPage() { else if (chunk.delta.function_call.arguments) { console.log( "Function call arguments delta:", - chunk.delta.function_call.arguments + chunk.delta.function_call.arguments, ); const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]; @@ -770,14 +771,14 @@ function ChatPage() { chunk.delta.function_call.arguments; console.log( "Accumulated arguments:", - lastFunctionCall.argumentsString + lastFunctionCall.argumentsString, ); // Try to parse arguments if they look complete if (lastFunctionCall.argumentsString.includes("}")) { try { const parsed = JSON.parse( - lastFunctionCall.argumentsString + lastFunctionCall.argumentsString, ); lastFunctionCall.arguments = parsed; lastFunctionCall.status = "completed"; @@ -785,7 +786,7 @@ function ChatPage() { } catch (e) { console.log( "Arguments not yet complete or invalid JSON:", - e + e, ); } } @@ -818,7 +819,7 @@ function ChatPage() { else if (toolCall.function.arguments) { console.log( "Tool call arguments delta:", - toolCall.function.arguments + toolCall.function.arguments, ); const lastFunctionCall = currentFunctionCalls[ @@ -832,7 +833,7 @@ function ChatPage() { toolCall.function.arguments; console.log( "Accumulated tool arguments:", - lastFunctionCall.argumentsString + lastFunctionCall.argumentsString, ); // Try to parse arguments if they look complete @@ -841,7 +842,7 @@ function ChatPage() { ) { try { const parsed = JSON.parse( - lastFunctionCall.argumentsString + lastFunctionCall.argumentsString, ); lastFunctionCall.arguments = parsed; lastFunctionCall.status = "completed"; @@ -849,7 +850,7 @@ function ChatPage() { } catch (e) { console.log( "Tool arguments not yet complete or invalid JSON:", - e + e, ); } } @@ -881,7 +882,7 @@ function ChatPage() { console.log( "Error parsing function call on finish:", fc, - e + e, ); } } @@ -897,12 +898,12 @@ function ChatPage() { console.log( "🟢 CREATING function call (added):", chunk.item.id, - chunk.item.tool_name || chunk.item.name + chunk.item.tool_name || chunk.item.name, ); // Try to find an existing pending call to update (created by earlier deltas) let existing = currentFunctionCalls.find( - (fc) => fc.id === chunk.item.id + (fc) => fc.id === chunk.item.id, ); if (!existing) { existing = [...currentFunctionCalls] @@ -911,7 +912,7 @@ function ChatPage() { (fc) => fc.status === "pending" && !fc.id && - fc.name === (chunk.item.tool_name || chunk.item.name) + fc.name === (chunk.item.tool_name || chunk.item.name), ); } @@ -924,7 +925,7 @@ function ChatPage() { chunk.item.inputs || existing.arguments; console.log( "🟢 UPDATED existing pending function call with id:", - existing.id + existing.id, ); } else { const functionCall: FunctionCall = { @@ -942,7 +943,7 @@ function ChatPage() { currentFunctionCalls.map((fc) => ({ id: fc.id, name: fc.name, - })) + })), ); } } @@ -953,7 +954,7 @@ function ChatPage() { ) { console.log( "Function 
args delta (Realtime API):", - chunk.delta + chunk.delta, ); const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]; @@ -964,7 +965,7 @@ function ChatPage() { lastFunctionCall.argumentsString += chunk.delta || ""; console.log( "Accumulated arguments (Realtime API):", - lastFunctionCall.argumentsString + lastFunctionCall.argumentsString, ); } } @@ -975,26 +976,26 @@ function ChatPage() { ) { console.log( "Function args done (Realtime API):", - chunk.arguments + chunk.arguments, ); const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { try { lastFunctionCall.arguments = JSON.parse( - chunk.arguments || "{}" + chunk.arguments || "{}", ); lastFunctionCall.status = "completed"; console.log( "Parsed function arguments (Realtime API):", - lastFunctionCall.arguments + lastFunctionCall.arguments, ); } catch (e) { lastFunctionCall.arguments = { raw: chunk.arguments }; lastFunctionCall.status = "error"; console.log( "Error parsing function arguments (Realtime API):", - e + e, ); } } @@ -1008,14 +1009,14 @@ function ChatPage() { console.log( "🔵 UPDATING function call (done):", chunk.item.id, - chunk.item.tool_name || chunk.item.name + chunk.item.tool_name || chunk.item.name, ); console.log( "🔵 Looking for existing function calls:", currentFunctionCalls.map((fc) => ({ id: fc.id, name: fc.name, - })) + })), ); // Find existing function call by ID or name @@ -1023,14 +1024,14 @@ function ChatPage() { (fc) => fc.id === chunk.item.id || fc.name === chunk.item.tool_name || - fc.name === chunk.item.name + fc.name === chunk.item.name, ); if (functionCall) { console.log( "🔵 FOUND existing function call, updating:", functionCall.id, - functionCall.name + functionCall.name, ); // Update existing function call with completion data functionCall.status = @@ -1053,7 +1054,7 @@ function ChatPage() { "🔴 WARNING: Could not find existing function call to update:", chunk.item.id, chunk.item.tool_name, - chunk.item.name + chunk.item.name, ); } } @@ -1074,7 +1075,7 @@ function ChatPage() { fc.name === chunk.item.name || fc.name === chunk.item.type || fc.name.includes(chunk.item.type.replace("_call", "")) || - chunk.item.type.includes(fc.name) + chunk.item.type.includes(fc.name), ); if (functionCall) { @@ -1118,12 +1119,12 @@ function ChatPage() { "🟡 CREATING tool call (added):", chunk.item.id, chunk.item.tool_name || chunk.item.name, - chunk.item.type + chunk.item.type, ); // Dedupe by id or pending with same name let existing = currentFunctionCalls.find( - (fc) => fc.id === chunk.item.id + (fc) => fc.id === chunk.item.id, ); if (!existing) { existing = [...currentFunctionCalls] @@ -1135,7 +1136,7 @@ function ChatPage() { fc.name === (chunk.item.tool_name || chunk.item.name || - chunk.item.type) + chunk.item.type), ); } @@ -1151,7 +1152,7 @@ function ChatPage() { chunk.item.inputs || existing.arguments; console.log( "🟡 UPDATED existing pending tool call with id:", - existing.id + existing.id, ); } else { const functionCall = { @@ -1172,7 +1173,7 @@ function ChatPage() { id: fc.id, name: fc.name, type: fc.type, - })) + })), ); } } @@ -1268,6 +1269,9 @@ function ChatPage() { if (!controller.signal.aborted && thisStreamId === streamIdRef.current) { setMessages((prev) => [...prev, finalMessage]); setStreamingMessage(null); + if (previousResponseIds[endpoint]) { + refetchNudges(); + } } // Store the response ID for the next request for this endpoint @@ -1280,7 +1284,7 @@ function ChatPage() { ...prev, [endpoint]: newResponseId, })); - + // If this is a new 
conversation (no currentConversationId), set it now if (!currentConversationId) { setCurrentConversationId(newResponseId); @@ -1385,6 +1389,9 @@ function ChatPage() { timestamp: new Date(), }; setMessages((prev) => [...prev, assistantMessage]); + if (result.response_id) { + refetchNudges(); + } // Store the response ID if present for this endpoint if (result.response_id) { @@ -1392,7 +1399,7 @@ function ChatPage() { ...prev, [endpoint]: result.response_id, })); - + // If this is a new conversation (no currentConversationId), set it now if (!currentConversationId) { setCurrentConversationId(result.response_id); @@ -1440,7 +1447,7 @@ function ChatPage() { const handleForkConversation = ( messageIndex: number, - event?: React.MouseEvent + event?: React.MouseEvent, ) => { // Prevent any default behavior and stop event propagation if (event) { @@ -1508,7 +1515,7 @@ function ChatPage() { const renderFunctionCalls = ( functionCalls: FunctionCall[], - messageIndex?: number + messageIndex?: number, ) => { if (!functionCalls || functionCalls.length === 0) return null; @@ -1737,12 +1744,6 @@ function ChatPage() { ); }; - const suggestionChips = [ - "Show me this quarter's top 10 deals", - "Summarize recent client interactions", - "Search OpenSearch for mentions of our competitors", - ]; - const handleSuggestionClick = (suggestion: string) => { setInput(suggestion); inputRef.current?.focus(); @@ -1883,7 +1884,7 @@ function ChatPage() {
{renderFunctionCalls( message.functionCalls || [], - index + index, )}

{message.content} @@ -1914,7 +1915,7 @@ function ChatPage() {

{renderFunctionCalls( streamingMessage.functionCalls, - messages.length + messages.length, )}

{streamingMessage.content} @@ -1963,8 +1964,8 @@ function ChatPage() { {!streamingMessage && (

-
- {suggestionChips.map((suggestion, index) => ( +
+ {(nudges as string[]).map((suggestion: string, index: number) => ( + ))} +
+ {/* Fade out gradient on the right */} +
+
+
+ ); +} From 06b3850057280e472cb07ddb6f1aebb55f2b3bde Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Fri, 5 Sep 2025 10:20:17 -0300 Subject: [PATCH 10/21] Componentize message send, and send message when clicking nudges --- frontend/src/app/chat/page.tsx | 50 ++++++++++++++-------------------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/frontend/src/app/chat/page.tsx b/frontend/src/app/chat/page.tsx index b3cfca4a..34b27647 100644 --- a/frontend/src/app/chat/page.tsx +++ b/frontend/src/app/chat/page.tsx @@ -23,6 +23,7 @@ import { } from "lucide-react"; import { useEffect, useRef, useState } from "react"; import { useGetNudgesQuery } from "../api/queries/useGetNudgesQuery"; +import Nudges from "./nudges"; interface Message { role: "user" | "assistant"; @@ -128,7 +129,6 @@ function ChatPage() { const [dropdownDismissed, setDropdownDismissed] = useState(false); const [isUserInteracting, setIsUserInteracting] = useState(false); const [isForkingInProgress, setIsForkingInProgress] = useState(false); - const [lastForkTimestamp, setLastForkTimestamp] = useState(0); const dragCounterRef = useRef(0); const messagesEndRef = useRef(null); const inputRef = useRef(null); @@ -472,7 +472,12 @@ function ChatPage() { [conversationData.endpoint]: conversationData.response_id, })); } - }, [conversationData, isUserInteracting, isForkingInProgress]); + }, [ + conversationData, + isUserInteracting, + isForkingInProgress, + setPreviousResponseIds, + ]); // Handle new conversation creation - only reset messages when placeholderConversation is set useEffect(() => { @@ -1312,13 +1317,12 @@ function ChatPage() { } }; - const handleSubmit = async (e: React.FormEvent) => { - e.preventDefault(); - if (!input.trim() || loading) return; + const handleSendMessage = async (inputMessage: string) => { + if (!inputMessage.trim() || loading) return; const userMessage: Message = { role: "user", - content: input.trim(), + content: inputMessage.trim(), timestamp: new Date(), }; @@ -1433,6 +1437,11 @@ function ChatPage() { setLoading(false); }; + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault(); + handleSendMessage(input); + }; + const toggleFunctionCall = (functionCallId: string) => { setExpandedFunctionCalls((prev) => { const newSet = new Set(prev); @@ -1456,10 +1465,8 @@ function ChatPage() { } // Set interaction state to prevent auto-scroll interference - const forkTimestamp = Date.now(); setIsUserInteracting(true); setIsForkingInProgress(true); - setLastForkTimestamp(forkTimestamp); console.log("Fork conversation called for message index:", messageIndex); @@ -1472,7 +1479,6 @@ function ChatPage() { console.error("Fork button should only be on assistant messages"); setIsUserInteracting(false); setIsForkingInProgress(false); - setLastForkTimestamp(0); return; } @@ -1745,8 +1751,7 @@ function ChatPage() { }; const handleSuggestionClick = (suggestion: string) => { - setInput(suggestion); - inputRef.current?.focus(); + handleSendMessage(suggestion); }; return ( @@ -1962,27 +1967,14 @@ function ChatPage() { {/* Suggestion chips - always show unless streaming */} {!streamingMessage && ( -
-
-
- {(nudges as string[]).map((suggestion: string, index: number) => ( - - ))} -
- {/* Fade out gradient on the right */} -
-
-
+ )} {/* Input Area - Fixed at bottom */} -
+
From 92519792076ba801f1c97cfd4ac91c062566b4a6 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Fri, 5 Sep 2025 10:23:44 -0300 Subject: [PATCH 11/21] Added padding top --- frontend/src/app/chat/nudges.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/app/chat/nudges.tsx b/frontend/src/app/chat/nudges.tsx index f817d4db..0d713c89 100644 --- a/frontend/src/app/chat/nudges.tsx +++ b/frontend/src/app/chat/nudges.tsx @@ -6,7 +6,7 @@ export default function Nudges({ handleSuggestionClick: (suggestion: string) => void; }) { return ( -
+
{nudges.map((suggestion: string, index: number) => ( From 9e78798a0490ca4a2cbaa014ab9193e4ac0cacd0 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Fri, 5 Sep 2025 17:00:29 -0300 Subject: [PATCH 12/21] added animate to package json --- frontend/package-lock.json | 69 ++++++++++++++++++++++++++++++++++++++ frontend/package.json | 1 + 2 files changed, 70 insertions(+) diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 3883b3b2..03f63e53 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -27,6 +27,7 @@ "clsx": "^2.1.1", "cmdk": "^1.1.1", "lucide-react": "^0.525.0", + "motion": "^12.23.12", "next": "15.3.5", "next-themes": "^0.4.6", "react": "^19.0.0", @@ -4574,6 +4575,33 @@ "url": "https://github.com/sponsors/rawify" } }, + "node_modules/framer-motion": { + "version": "12.23.12", + "resolved": "https://registry.npmjs.org/framer-motion/-/framer-motion-12.23.12.tgz", + "integrity": "sha512-6e78rdVtnBvlEVgu6eFEAgG9v3wLnYEboM8I5O5EXvfKC8gxGQB8wXJdhkMy10iVcn05jl6CNw7/HTsTCfwcWg==", + "license": "MIT", + "dependencies": { + "motion-dom": "^12.23.12", + "motion-utils": "^12.23.6", + "tslib": "^2.4.0" + }, + "peerDependencies": { + "@emotion/is-prop-valid": "*", + "react": "^18.0.0 || ^19.0.0", + "react-dom": "^18.0.0 || ^19.0.0" + }, + "peerDependenciesMeta": { + "@emotion/is-prop-valid": { + "optional": true + }, + "react": { + "optional": true + }, + "react-dom": { + "optional": true + } + } + }, "node_modules/fsevents": { "version": "2.3.3", "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.3.tgz", @@ -5708,6 +5736,47 @@ "node": ">=16 || 14 >=14.17" } }, + "node_modules/motion": { + "version": "12.23.12", + "resolved": "https://registry.npmjs.org/motion/-/motion-12.23.12.tgz", + "integrity": "sha512-8jCD8uW5GD1csOoqh1WhH1A6j5APHVE15nuBkFeRiMzYBdRwyAHmSP/oXSuW0WJPZRXTFdBoG4hY9TFWNhhwng==", + "license": "MIT", + "dependencies": { + "framer-motion": "^12.23.12", + "tslib": "^2.4.0" + }, + "peerDependencies": { + "@emotion/is-prop-valid": "*", + "react": "^18.0.0 || ^19.0.0", + "react-dom": "^18.0.0 || ^19.0.0" + }, + "peerDependenciesMeta": { + "@emotion/is-prop-valid": { + "optional": true + }, + "react": { + "optional": true + }, + "react-dom": { + "optional": true + } + } + }, + "node_modules/motion-dom": { + "version": "12.23.12", + "resolved": "https://registry.npmjs.org/motion-dom/-/motion-dom-12.23.12.tgz", + "integrity": "sha512-RcR4fvMCTESQBD/uKQe49D5RUeDOokkGRmz4ceaJKDBgHYtZtntC/s2vLvY38gqGaytinij/yi3hMcWVcEF5Kw==", + "license": "MIT", + "dependencies": { + "motion-utils": "^12.23.6" + } + }, + "node_modules/motion-utils": { + "version": "12.23.6", + "resolved": "https://registry.npmjs.org/motion-utils/-/motion-utils-12.23.6.tgz", + "integrity": "sha512-eAWoPgr4eFEOFfg2WjIsMoqJTW6Z8MTUCgn/GZ3VRpClWBdnbjryiA3ZSNLyxCTmCQx4RmYX6jX1iWHbenUPNQ==", + "license": "MIT" + }, "node_modules/ms": { "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", diff --git a/frontend/package.json b/frontend/package.json index 0731820b..8c037ea2 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -28,6 +28,7 @@ "clsx": "^2.1.1", "cmdk": "^1.1.1", "lucide-react": "^0.525.0", + "motion": "^12.23.12", "next": "15.3.5", "next-themes": "^0.4.6", "react": "^19.0.0", From 01b10d403a0f3cb720c79ecc6712a1b28ef6ea4a Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Fri, 5 Sep 2025 17:00:52 -0300 Subject: [PATCH 13/21] Create cancelNudges, to remove the query result when sending a message --- 
frontend/src/app/api/queries/useGetNudgesQuery.ts |  8 ++++++--
 frontend/src/app/chat/page.tsx                    |  8 ++++----
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/frontend/src/app/api/queries/useGetNudgesQuery.ts b/frontend/src/app/api/queries/useGetNudgesQuery.ts
index 809ca71e..38cc9502 100644
--- a/frontend/src/app/api/queries/useGetNudgesQuery.ts
+++ b/frontend/src/app/api/queries/useGetNudgesQuery.ts
@@ -17,7 +17,11 @@ export const useGetNudgesQuery = (
   options?: Omit<UseQueryOptions, "queryKey" | "queryFn">,
 ) => {
   const queryClient = useQueryClient();
-  console.log(chatId);
+
+  function cancel() {
+    queryClient.removeQueries({ queryKey: ["nudges", chatId] });
+  }
+
   async function getNudges(): Promise<string[]> {
     try {
       const response = await fetch(`/api/nudges${chatId ? `/${chatId}` : ""}`);
@@ -39,5 +43,5 @@ export const useGetNudgesQuery = (
     queryClient,
   );

-  return queryResult;
+  return { ...queryResult, cancel };
 };
diff --git a/frontend/src/app/chat/page.tsx b/frontend/src/app/chat/page.tsx
index 34b27647..987786d9 100644
--- a/frontend/src/app/chat/page.tsx
+++ b/frontend/src/app/chat/page.tsx
@@ -619,7 +619,7 @@ function ChatPage() {
     };
   }, [isFilterDropdownOpen]);

-  const { data: nudges = [], refetch: refetchNudges } = useGetNudgesQuery(
+  const { data: nudges = [], cancel: cancelNudges } = useGetNudgesQuery(
     previousResponseIds[endpoint],
   );

@@ -1275,7 +1275,7 @@ function ChatPage() {
         setMessages((prev) => [...prev, finalMessage]);
         setStreamingMessage(null);
         if (previousResponseIds[endpoint]) {
-          refetchNudges();
+          cancelNudges();
         }
       }

@@ -1394,7 +1394,7 @@ function ChatPage() {
       };
       setMessages((prev) => [...prev, assistantMessage]);
       if (result.response_id) {
-        refetchNudges();
+        cancelNudges();
       }

       // Store the response ID if present for this endpoint
@@ -1968,7 +1968,7 @@ function ChatPage() {
       {/* Suggestion chips - always show unless streaming */}
       {!streamingMessage && (
         <Nudges
-          nudges={nudges}
+          nudges={nudges as string[]}
           handleSuggestionClick={handleSuggestionClick}
         />
       )}

From 560683d782fb8f993868cc845c2fa8c20ec996ad Mon Sep 17 00:00:00 2001
From: Lucas Oliveira
Date: Fri, 5 Sep 2025 17:01:03 -0300
Subject: [PATCH 14/21] Added animation when displaying nudges

---
 frontend/src/app/chat/nudges.tsx | 51 ++++++++++++++++++++++----------
 1 file changed, 35 insertions(+), 16 deletions(-)

diff --git a/frontend/src/app/chat/nudges.tsx b/frontend/src/app/chat/nudges.tsx
index 0d713c89..c5929924 100644
--- a/frontend/src/app/chat/nudges.tsx
+++ b/frontend/src/app/chat/nudges.tsx
@@ -1,27 +1,46 @@
+import { motion, AnimatePresence } from "motion/react";
+
 export default function Nudges({
   nudges,
   handleSuggestionClick,
 }: {
   nudges: string[];
   handleSuggestionClick: (suggestion: string) => void;
 }) {
   return (
-
-
-
- {nudges.map((suggestion: string, index: number) => ( - - ))} -
- {/* Fade out gradient on the right */} -
-
+
+ + {nudges.length > 0 && ( + +
+
+
+ {nudges.map((suggestion: string, index: number) => ( + + ))} +
+ {/* Fade out gradient on the right */} +
+
+
+
+ )} +
); } From b9da6437f8c60ba986a95810030cf62d17f9654d Mon Sep 17 00:00:00 2001 From: Eric Hare Date: Mon, 8 Sep 2025 11:08:45 -0700 Subject: [PATCH 15/21] Properly set the project root for the gdrive token --- src/connectors/google_drive/connector.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py index 887ffeca..c0ef2ff3 100644 --- a/src/connectors/google_drive/connector.py +++ b/src/connectors/google_drive/connector.py @@ -102,9 +102,8 @@ class GoogleDriveConnector(BaseConnector): client_secret = config.get("client_secret") or env_client_secret # Token file default (so callback & workers don’t need to pass it) - token_file = config.get("token_file") or os.getenv("GOOGLE_DRIVE_TOKEN_FILE") - if not token_file: - token_file = str(Path.home() / ".config" / "openrag" / "google_drive" / "token.json") + project_root = Path(__file__).resolve().parent.parent.parent.parent + token_file = config.get("token_file") or str(project_root / "google_drive_token.json") Path(token_file).parent.mkdir(parents=True, exist_ok=True) if not isinstance(client_id, str) or not client_id.strip(): From 192693d82dcda80ecd6fd75501e9fa5a5af94115 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Mon, 8 Sep 2025 16:36:03 -0300 Subject: [PATCH 16/21] fixed agent.py extra comma --- src/agent.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/agent.py b/src/agent.py index 7c8f000c..6d94094f 100644 --- a/src/agent.py +++ b/src/agent.py @@ -442,7 +442,6 @@ async def async_langflow_chat( user_id=user_id, previous_response_id=previous_response_id, - , ) if store_conversation: From cff28fc4f94b43c45524401f7564954e7f4acd33 Mon Sep 17 00:00:00 2001 From: Eric Hare Date: Mon, 8 Sep 2025 12:56:38 -0700 Subject: [PATCH 17/21] feat: Cleanup duplicate connections --- src/connectors/connection_manager.py | 187 +++++++++++++++++++++++---- 1 file changed, 161 insertions(+), 26 deletions(-) diff --git a/src/connectors/connection_manager.py b/src/connectors/connection_manager.py index 2d0d08bd..9ce9ffd3 100644 --- a/src/connectors/connection_manager.py +++ b/src/connectors/connection_manager.py @@ -1,6 +1,5 @@ import json import uuid -import asyncio import aiofiles from typing import Dict, List, Any, Optional from datetime import datetime @@ -62,6 +61,9 @@ class ConnectionManager: config = ConnectionConfig(**conn_data) self.connections[config.connection_id] = config + # Now that connections are loaded, clean up duplicates + await self.cleanup_duplicate_connections(remove_duplicates=True) + async def save_connections(self): """Save connections to persistent storage""" data = {"connections": []} @@ -78,33 +80,95 @@ class ConnectionManager: async with aiofiles.open(self.connections_file, "w") as f: await f.write(json.dumps(data, indent=2)) - async def create_connection( - self, - connector_type: str, - name: str, - config: Dict[str, Any], - user_id: Optional[str] = None, - ) -> str: - """Create a new connection configuration""" - connection_id = str(uuid.uuid4()) + async def _get_existing_connection( + self, connector_type: str, user_id: Optional[str] = None + ) -> Optional[ConnectionConfig]: + """Find existing active connection for the same connector type and user""" + for connection in self.connections.values(): + if ( + connection.connector_type == connector_type + and connection.user_id == user_id + and connection.is_active + ): + return connection + return None - connection_config = ConnectionConfig( - connection_id=connection_id, 
- connector_type=connector_type, - name=name, - config=config, - user_id=user_id, - ) - - self.connections[connection_id] = connection_config - await self.save_connections() - - return connection_id - - async def get_connection(self, connection_id: str) -> Optional[ConnectionConfig]: - """Get connection configuration""" - return self.connections.get(connection_id) + async def _delete_old_connections( + self, connector_type: str, user_id: Optional[str] = None, exclude_id: str = None + ): + """Delete old connections for the same connector type and user""" + connections_to_delete = [] + + for connection_id, connection in self.connections.items(): + if ( + connection.connector_type == connector_type + and connection.user_id == user_id + and connection.is_active + and connection_id != exclude_id + ): + connections_to_delete.append(connection_id) + + # Delete old connections + for connection_id in connections_to_delete: + logger.info( + f"Deleting old connection for {connector_type}", + connection_id=connection_id + ) + await self.delete_connection(connection_id) + async def cleanup_duplicate_connections(self, remove_duplicates=False): + """ + Clean up duplicate connections, keeping only the most recent connection + per provider per user + + Args: + remove_duplicates: If True, physically removes duplicates from connections.json + If False (default), just deactivates them + """ + logger.info("Starting cleanup of duplicate connections") + + # Group connections by (connector_type, user_id) + grouped_connections = {} + + for connection_id, connection in self.connections.items(): + if not connection.is_active: + continue # Skip inactive connections + + key = (connection.connector_type, connection.user_id) + + if key not in grouped_connections: + grouped_connections[key] = [] + + grouped_connections[key].append((connection_id, connection)) + + # For each group, keep only the most recent connection + connections_to_remove = [] + + for (connector_type, user_id), connections in grouped_connections.items(): + if len(connections) <= 1: + continue # No duplicates + + logger.info(f"Found {len(connections)} duplicate connections for {connector_type}, user {user_id}") + + # Sort by created_at, keep the most recent + connections.sort(key=lambda x: x[1].created_at, reverse=True) + + # Keep the first (most recent), remove/deactivate the rest + for connection_id, connection in connections[1:]: + connections_to_remove.append((connection_id, connection)) + logger.info(f"Marking connection {connection_id} for {'removal' if remove_duplicates else 'deactivation'}") + + # Remove or deactivate duplicate connections + for connection_id, connection in connections_to_remove: + if remove_duplicates: + await self.delete_connection(connection_id) # Handles token cleanup + else: + await self.deactivate_connection(connection_id) + + action = "Removed" if remove_duplicates else "Deactivated" + logger.info(f"Cleanup complete. 
{action} {len(connections_to_remove)} duplicate connections") + return len(connections_to_remove) + async def update_connection( self, connection_id: str, @@ -146,6 +210,61 @@ class ConnectionManager: return True + async def create_connection( + self, + connector_type: str, + name: str, + config: Dict[str, Any], + user_id: Optional[str] = None, + ) -> str: + """Create a new connection configuration, ensuring only one per provider per user""" + + # Check if we already have an active connection for this provider and user + existing_connection = await self._get_existing_connection(connector_type, user_id) + + if existing_connection: + # Check if the existing connection has a valid token + try: + connector = self._create_connector(existing_connection) + if await connector.authenticate(): + logger.info( + f"Using existing valid connection for {connector_type}", + connection_id=existing_connection.connection_id + ) + # Update the existing connection with new config if needed + if config != existing_connection.config: + logger.info("Updating existing connection config") + await self.update_connection( + existing_connection.connection_id, + config=config + ) + return existing_connection.connection_id + except Exception as e: + logger.warning( + f"Existing connection authentication failed: {e}", + connection_id=existing_connection.connection_id + ) + # If authentication fails, we'll create a new connection and clean up the old one + + # Create new connection + connection_id = str(uuid.uuid4()) + + connection_config = ConnectionConfig( + connection_id=connection_id, + connector_type=connector_type, + name=name, + config=config, + user_id=user_id, + ) + + self.connections[connection_id] = connection_config + + # Deactivate/remove any old connections for this provider and user + await self._delete_old_connections(connector_type, user_id, connection_id) + await self.save_connections() + + return connection_id + async def list_connections( self, user_id: Optional[str] = None, connector_type: Optional[str] = None ) -> List[ConnectionConfig]: @@ -165,6 +284,18 @@ class ConnectionManager: if connection_id not in self.connections: return False + connection = self.connections[connection_id] + + # Clean up token file if it exists + if connection.config.get("token_file"): + token_file = Path(connection.config["token_file"]) + if token_file.exists(): + try: + token_file.unlink() + logger.info(f"Deleted token file: {token_file}") + except Exception as e: + logger.warning(f"Failed to delete token file {token_file}: {e}") + # Clean up active connector if exists if connection_id in self.active_connectors: connector = self.active_connectors[connection_id] @@ -296,6 +427,10 @@ class ConnectionManager: return True return False + + async def get_connection(self, connection_id: str) -> Optional[ConnectionConfig]: + """Get connection configuration""" + return self.connections.get(connection_id) async def get_connection_by_webhook_id( self, webhook_id: str From 617615532d16f9b9746066fa5836c15195278385 Mon Sep 17 00:00:00 2001 From: Eric Hare Date: Mon, 8 Sep 2025 13:00:00 -0700 Subject: [PATCH 18/21] Update connection_manager.py --- src/connectors/connection_manager.py | 29 +++------------------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/src/connectors/connection_manager.py b/src/connectors/connection_manager.py index 9ce9ffd3..05cc85c9 100644 --- a/src/connectors/connection_manager.py +++ b/src/connectors/connection_manager.py @@ -93,29 +93,6 @@ class ConnectionManager: return connection return 
None - async def _delete_old_connections( - self, connector_type: str, user_id: Optional[str] = None, exclude_id: str = None - ): - """Delete old connections for the same connector type and user""" - connections_to_delete = [] - - for connection_id, connection in self.connections.items(): - if ( - connection.connector_type == connector_type - and connection.user_id == user_id - and connection.is_active - and connection_id != exclude_id - ): - connections_to_delete.append(connection_id) - - # Delete old connections - for connection_id in connections_to_delete: - logger.info( - f"Deleting old connection for {connector_type}", - connection_id=connection_id - ) - await self.delete_connection(connection_id) - async def cleanup_duplicate_connections(self, remove_duplicates=False): """ Clean up duplicate connections, keeping only the most recent connection @@ -259,10 +236,10 @@ class ConnectionManager: self.connections[connection_id] = connection_config - # Deactivate/remove any old connections for this provider and user - await self._delete_old_connections(connector_type, user_id, connection_id) + # Clean up duplicates (will keep the newest, which is the one we just created) + await self.cleanup_duplicate_connections(remove_duplicates=True) + await self.save_connections() - return connection_id async def list_connections( From 73f039afb5b1cd026a18f9296f25a996b6bc7d6d Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 8 Sep 2025 16:07:06 -0400 Subject: [PATCH 19/21] logging --- src/agent.py | 53 +++++++++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/src/agent.py b/src/agent.py index 6d94094f..749eb299 100644 --- a/src/agent.py +++ b/src/agent.py @@ -189,35 +189,42 @@ async def async_response( previous_response_id: str = None, log_prefix: str = "response", ): - logger.info("User prompt received", prompt=prompt) + try: + logger.info("User prompt received", prompt=prompt) - # Build request parameters - request_params = { - "model": model, - "input": prompt, - "stream": False, - "include": ["tool_call.results"], - } - if previous_response_id is not None: - request_params["previous_response_id"] = previous_response_id - if extra_headers: - request_params["extra_headers"] = extra_headers + # Build request parameters + request_params = { + "model": model, + "input": prompt, + "stream": False, + "include": ["tool_call.results"], + } + if previous_response_id is not None: + request_params["previous_response_id"] = previous_response_id + if extra_headers: + request_params["extra_headers"] = extra_headers - if "x-api-key" not in client.default_headers: - if hasattr(client, "api_key") and extra_headers is not None: - extra_headers["x-api-key"] = client.api_key + if "x-api-key" not in client.default_headers: + if hasattr(client, "api_key") and extra_headers is not None: + extra_headers["x-api-key"] = client.api_key - response = await client.responses.create(**request_params) + response = await client.responses.create(**request_params) - response_text = response.output_text - logger.info("Response generated", log_prefix=log_prefix, response=response_text) + response_text = response.output_text + logger.info("Response generated", log_prefix=log_prefix, response=response_text) - # Extract and store response_id if available - response_id = getattr(response, "id", None) or getattr( - response, "response_id", None - ) + # Extract and store response_id if available + response_id = getattr(response, "id", None) or getattr( + response, "response_id", None + ) - 
return response_text, response_id, response + return response_text, response_id, response + except Exception as e: + logger.error("Exception in non-streaming response", error=str(e)) + import traceback + + traceback.print_exc() + raise # Unified streaming function for both chat and langflow From 3c6c8f999be6491aff389b32803ceb43cbc2ce63 Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 8 Sep 2025 16:22:20 -0400 Subject: [PATCH 20/21] compose versions env vars --- docker-compose-cpu.yml | 13 +++++++------ docker-compose.yml | 10 +++++----- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index 3279561a..0414a523 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -1,8 +1,9 @@ services: opensearch: - build: - context: . - dockerfile: Dockerfile + image: phact/openrag-opensearch:${OPENRAG_VERSION:-latest} + #build: + # context: . + # dockerfile: Dockerfile container_name: os depends_on: - openrag-backend @@ -38,7 +39,7 @@ services: - "5601:5601" openrag-backend: - image: phact/openrag-backend:latest + image: phact/openrag-backend:${OPENRAG_VERSION:-latest} #build: #context: . #dockerfile: Dockerfile.backend @@ -72,7 +73,7 @@ services: - ./keys:/app/keys:Z openrag-frontend: - image: phact/openrag-frontend:latest + image: phact/openrag-frontend:${OPENRAG_VERSION:-latest} #build: #context: . #dockerfile: Dockerfile.frontend @@ -87,7 +88,7 @@ services: langflow: volumes: - ./flows:/app/flows:Z - image: phact/langflow:responses + image: phact/langflow:${LANGFLOW_VERSION:-responses} container_name: langflow ports: - "7860:7860" diff --git a/docker-compose.yml b/docker-compose.yml index 16e9046f..23499cb9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: opensearch: - image: phact/openrag-opensearch:latest + image: phact/openrag-opensearch:${OPENRAG_VERSION:-latest} #build: #context: . #dockerfile: Dockerfile @@ -39,7 +39,7 @@ services: - "5601:5601" openrag-backend: - image: phact/openrag-backend:latest + image: phact/openrag-backend:${OPENRAG_VERSION:-latest} #build: #context: . #dockerfile: Dockerfile.backend @@ -73,7 +73,7 @@ services: gpus: all openrag-frontend: - image: phact/openrag-frontend:latest + image: phact/openrag-frontend:${OPENRAG_VERSION:-latest} #build: #context: . #dockerfile: Dockerfile.frontend @@ -88,7 +88,7 @@ services: langflow: volumes: - ./flows:/app/flows:Z - image: phact/langflow:responses + image: phact/langflow:${LANGFLOW_VERSION:-responses} container_name: langflow ports: - "7860:7860" @@ -104,4 +104,4 @@ services: - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_NEW_USER_IS_ACTIVE=${LANGFLOW_NEW_USER_IS_ACTIVE} - - LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI} \ No newline at end of file + - LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI} From 5a5541ce00c3774ac48f2207192c3fd1d83ec5c1 Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 8 Sep 2025 16:22:29 -0400 Subject: [PATCH 21/21] 0.1.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 3ed39646..2b4e821d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "openrag" -version = "0.1.0" +version = "0.1.1" description = "Add your description here" readme = "README.md" requires-python = ">=3.13"
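
Taken together, patches 03, 10, 13, and 14 define the nudges data flow: a TanStack Query hook fetches suggestions keyed by the current chat's response id, the Nudges component renders them as clickable chips, and the cancel helper drops the cached result once a reply with a new response_id lands, so stale suggestions never linger. The sketch below is a minimal consumer of that flow, assuming the hook and component live at the paths used in this series; the MiniChat wrapper and its sendMessage prop are hypothetical, for illustration only.

import { useGetNudgesQuery } from "../api/queries/useGetNudgesQuery";
import Nudges from "./nudges";

// Hypothetical minimal consumer of the nudges flow added in this series.
export default function MiniChat({
  responseId,
  sendMessage,
}: {
  responseId?: string;
  sendMessage: (text: string) => void;
}) {
  // Re-fetches whenever the conversation's response id changes; the hook
  // falls back to its built-in DEFAULT_NUDGES when the request fails.
  const { data: nudges = [], cancel } = useGetNudgesQuery(responseId);

  // Clicking a nudge behaves like typing it. In the series, cancelNudges()
  // runs after a reply with a new response_id arrives; calling cancel() on
  // click is a simplification of the same idea.
  const handleSuggestionClick = (suggestion: string) => {
    cancel();
    sendMessage(suggestion);
  };

  return (
    <Nudges
      nudges={nudges as string[]}
      handleSuggestionClick={handleSuggestionClick}
    />
  );
}

Because the query key is ["nudges", chatId], each conversation caches its own suggestions, and removeQueries clears only the thread being advanced.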