From aad89b91667a8ae9c15e83f916ccb8246600cdad Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Thu, 4 Sep 2025 18:15:46 -0300 Subject: [PATCH 01/26] Add nudges service into backend --- src/agent.py | 163 ++++++++++++----- src/api/nudges.py | 43 +++++ src/config/settings.py | 32 +++- src/main.py | 84 +++++++-- src/services/chat_service.py | 244 ++++++++++++++++++------- src/tui/managers/env_manager.py | 313 +++++++++++++++++++++----------- 6 files changed, 639 insertions(+), 240 deletions(-) create mode 100644 src/api/nudges.py diff --git a/src/agent.py b/src/agent.py index 07fd911e..778607bc 100644 --- a/src/agent.py +++ b/src/agent.py @@ -95,7 +95,9 @@ async def async_response_stream( chunk_count = 0 async for chunk in response: chunk_count += 1 - logger.debug("Stream chunk received", chunk_count=chunk_count, chunk=str(chunk)) + logger.debug( + "Stream chunk received", chunk_count=chunk_count, chunk=str(chunk) + ) # Yield the raw event as JSON for the UI to process import json @@ -171,6 +173,10 @@ async def async_response( if extra_headers: request_params["extra_headers"] = extra_headers + if "x-api-key" not in client.default_headers: + if hasattr(client, "api_key") and extra_headers is not None: + extra_headers["x-api-key"] = client.api_key + response = await client.responses.create(**request_params) response_text = response.output_text @@ -241,7 +247,10 @@ async def async_langflow_stream( previous_response_id=previous_response_id, log_prefix="langflow", ): - logger.debug("Yielding chunk from langflow stream", chunk_preview=chunk[:100].decode('utf-8', errors='replace')) + logger.debug( + "Yielding chunk from langflow stream", + chunk_preview=chunk[:100].decode("utf-8", errors="replace"), + ) yield chunk logger.debug("Langflow stream completed") except Exception as e: @@ -260,18 +269,24 @@ async def async_chat( model: str = "gpt-4.1-mini", previous_response_id: str = None, ): - logger.debug("async_chat called", user_id=user_id, previous_response_id=previous_response_id) + logger.debug( + "async_chat called", user_id=user_id, previous_response_id=previous_response_id + ) # Get the specific conversation thread (or create new one) conversation_state = get_conversation_thread(user_id, previous_response_id) - logger.debug("Got conversation state", message_count=len(conversation_state['messages'])) + logger.debug( + "Got conversation state", message_count=len(conversation_state["messages"]) + ) # Add user message to conversation with timestamp from datetime import datetime user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()} conversation_state["messages"].append(user_message) - logger.debug("Added user message", message_count=len(conversation_state['messages'])) + logger.debug( + "Added user message", message_count=len(conversation_state["messages"]) + ) response_text, response_id = await async_response( async_client, @@ -280,7 +295,9 @@ async def async_chat( previous_response_id=previous_response_id, log_prefix="agent", ) - logger.debug("Got response", response_preview=response_text[:50], response_id=response_id) + logger.debug( + "Got response", response_preview=response_text[:50], response_id=response_id + ) # Add assistant response to conversation with response_id and timestamp assistant_message = { @@ -290,17 +307,26 @@ async def async_chat( "timestamp": datetime.now(), } conversation_state["messages"].append(assistant_message) - logger.debug("Added assistant message", message_count=len(conversation_state['messages'])) + logger.debug( + "Added assistant message", message_count=len(conversation_state["messages"]) + ) # Store the conversation thread with its response_id if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - logger.debug("Stored conversation thread", user_id=user_id, response_id=response_id) + logger.debug( + "Stored conversation thread", user_id=user_id, response_id=response_id + ) # Debug: Check what's in user_conversations now conversations = get_user_conversations(user_id) - logger.debug("User conversations updated", user_id=user_id, conversation_count=len(conversations), conversation_ids=list(conversations.keys())) + logger.debug( + "User conversations updated", + user_id=user_id, + conversation_count=len(conversations), + conversation_ids=list(conversations.keys()), + ) else: logger.warning("No response_id received, conversation not stored") @@ -363,7 +389,9 @@ async def async_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - logger.debug("Stored conversation thread", user_id=user_id, response_id=response_id) + logger.debug( + "Stored conversation thread", user_id=user_id, response_id=response_id + ) # Async langflow function with conversation storage (non-streaming) @@ -374,19 +402,32 @@ async def async_langflow_chat( user_id: str, extra_headers: dict = None, previous_response_id: str = None, + store_conversation: bool = True, ): - logger.debug("async_langflow_chat called", user_id=user_id, previous_response_id=previous_response_id) + logger.debug( + "async_langflow_chat called", + user_id=user_id, + previous_response_id=previous_response_id, + ) - # Get the specific conversation thread (or create new one) - conversation_state = get_conversation_thread(user_id, previous_response_id) - logger.debug("Got langflow conversation state", message_count=len(conversation_state['messages'])) + if store_conversation: + # Get the specific conversation thread (or create new one) + conversation_state = get_conversation_thread(user_id, previous_response_id) + logger.debug( + "Got langflow conversation state", + message_count=len(conversation_state["messages"]), + ) # Add user message to conversation with timestamp from datetime import datetime - user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()} - conversation_state["messages"].append(user_message) - logger.debug("Added user message to langflow", message_count=len(conversation_state['messages'])) + if store_conversation: + user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()} + conversation_state["messages"].append(user_message) + logger.debug( + "Added user message to langflow", + message_count=len(conversation_state["messages"]), + ) response_text, response_id = await async_response( langflow_client, @@ -396,45 +437,69 @@ async def async_langflow_chat( previous_response_id=previous_response_id, log_prefix="langflow", ) - logger.debug("Got langflow response", response_preview=response_text[:50], response_id=response_id) + logger.debug( + "Got langflow response", + response_preview=response_text[:50], + response_id=response_id, + ) - # Add assistant response to conversation with response_id and timestamp - assistant_message = { - "role": "assistant", - "content": response_text, - "response_id": response_id, - "timestamp": datetime.now(), - } - conversation_state["messages"].append(assistant_message) - logger.debug("Added assistant message to langflow", message_count=len(conversation_state['messages'])) + if store_conversation: + # Add assistant response to conversation with response_id and timestamp + assistant_message = { + "role": "assistant", + "content": response_text, + "response_id": response_id, + "timestamp": datetime.now(), + } + conversation_state["messages"].append(assistant_message) + logger.debug( + "Added assistant message to langflow", + message_count=len(conversation_state["messages"]), + ) + + if not store_conversation: + return response_text, response_id # Store the conversation thread with its response_id if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - + # Claim session ownership if this is a Google user try: from services.session_ownership_service import session_ownership_service from services.user_binding_service import user_binding_service - + # Check if this is a Google user (Google IDs are numeric, Langflow IDs are UUID) if user_id.isdigit() and user_binding_service.has_binding(user_id): langflow_user_id = user_binding_service.get_langflow_user_id(user_id) if langflow_user_id: - session_ownership_service.claim_session(user_id, response_id, langflow_user_id) - print(f"[DEBUG] Claimed session {response_id} for Google user {user_id}") + session_ownership_service.claim_session( + user_id, response_id, langflow_user_id + ) + print( + f"[DEBUG] Claimed session {response_id} for Google user {user_id}" + ) except Exception as e: print(f"[WARNING] Failed to claim session ownership: {e}") - + print( f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) - logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id) + logger.debug( + "Stored langflow conversation thread", + user_id=user_id, + response_id=response_id, + ) # Debug: Check what's in user_conversations now conversations = get_user_conversations(user_id) - logger.debug("User conversations updated", user_id=user_id, conversation_count=len(conversations), conversation_ids=list(conversations.keys())) + logger.debug( + "User conversations updated", + user_id=user_id, + conversation_count=len(conversations), + conversation_ids=list(conversations.keys()), + ) else: logger.warning("No response_id received from langflow, conversation not stored") @@ -450,7 +515,11 @@ async def async_langflow_chat_stream( extra_headers: dict = None, previous_response_id: str = None, ): - logger.debug("async_langflow_chat_stream called", user_id=user_id, previous_response_id=previous_response_id) + logger.debug( + "async_langflow_chat_stream called", + user_id=user_id, + previous_response_id=previous_response_id, + ) # Get the specific conversation thread (or create new one) conversation_state = get_conversation_thread(user_id, previous_response_id) @@ -501,22 +570,32 @@ async def async_langflow_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - + # Claim session ownership if this is a Google user try: from services.session_ownership_service import session_ownership_service from services.user_binding_service import user_binding_service - + # Check if this is a Google user (Google IDs are numeric, Langflow IDs are UUID) if user_id.isdigit() and user_binding_service.has_binding(user_id): - langflow_user_id = user_binding_service.get_langflow_user_id(user_id) + langflow_user_id = user_binding_service.get_langflow_user_id( + user_id + ) if langflow_user_id: - session_ownership_service.claim_session(user_id, response_id, langflow_user_id) - print(f"[DEBUG] Claimed session {response_id} for Google user {user_id} (streaming)") + session_ownership_service.claim_session( + user_id, response_id, langflow_user_id + ) + print( + f"[DEBUG] Claimed session {response_id} for Google user {user_id} (streaming)" + ) except Exception as e: print(f"[WARNING] Failed to claim session ownership (streaming): {e}") - + print( f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) - logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id) + logger.debug( + "Stored langflow conversation thread", + user_id=user_id, + response_id=response_id, + ) diff --git a/src/api/nudges.py b/src/api/nudges.py new file mode 100644 index 00000000..910542da --- /dev/null +++ b/src/api/nudges.py @@ -0,0 +1,43 @@ +from starlette.requests import Request +from starlette.responses import JSONResponse +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +async def nudges_from_kb_endpoint(request: Request, chat_service, session_manager): + """Get nudges for a user""" + user = request.state.user + user_id = user.user_id + jwt_token = request.state.jwt_token + + try: + result = await chat_service.langflow_nudges_chat( + user_id, + jwt_token, + ) + return JSONResponse(result) + except Exception as e: + return JSONResponse( + {"error": f"Failed to get nudges: {str(e)}"}, status_code=500 + ) + + +async def nudges_from_chat_id_endpoint(request: Request, chat_service, session_manager): + """Get nudges for a user""" + user = request.state.user + user_id = user.user_id + chat_id = request.path_params["chat_id"] + jwt_token = request.state.jwt_token + + try: + result = await chat_service.langflow_nudges_chat( + user_id, + jwt_token, + previous_response_id=chat_id, + ) + return JSONResponse(result) + except Exception as e: + return JSONResponse( + {"error": f"Failed to get nudges: {str(e)}"}, status_code=500 + ) diff --git a/src/config/settings.py b/src/config/settings.py index c9a02e89..b3592f66 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -24,6 +24,7 @@ LANGFLOW_URL = os.getenv("LANGFLOW_URL", "http://localhost:7860") # Optional: public URL for browser links (e.g., http://localhost:7860) LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL") FLOW_ID = os.getenv("FLOW_ID") +NUDGES_FLOW_ID = os.getenv("NUDGES_FLOW_ID") # Langflow superuser credentials for API key generation LANGFLOW_SUPERUSER = os.getenv("LANGFLOW_SUPERUSER") LANGFLOW_SUPERUSER_PASSWORD = os.getenv("LANGFLOW_SUPERUSER_PASSWORD") @@ -37,7 +38,12 @@ GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET") def is_no_auth_mode(): """Check if we're running in no-auth mode (OAuth credentials missing)""" result = not (GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET) - logger.debug("Checking auth mode", no_auth_mode=result, has_client_id=GOOGLE_OAUTH_CLIENT_ID is not None, has_client_secret=GOOGLE_OAUTH_CLIENT_SECRET is not None) + logger.debug( + "Checking auth mode", + no_auth_mode=result, + has_client_id=GOOGLE_OAUTH_CLIENT_ID is not None, + has_client_secret=GOOGLE_OAUTH_CLIENT_SECRET is not None, + ) return result @@ -100,7 +106,9 @@ async def generate_langflow_api_key(): return LANGFLOW_KEY if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD: - logger.warning("LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation") + logger.warning( + "LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation" + ) return None try: @@ -142,11 +150,19 @@ async def generate_langflow_api_key(): raise KeyError("api_key") LANGFLOW_KEY = api_key - logger.info("Successfully generated Langflow API key", api_key_preview=api_key[:8]) + logger.info( + "Successfully generated Langflow API key", + api_key_preview=api_key[:8], + ) return api_key except (requests.exceptions.RequestException, KeyError) as e: last_error = e - logger.warning("Attempt to generate Langflow API key failed", attempt=attempt, max_attempts=max_attempts, error=str(e)) + logger.warning( + "Attempt to generate Langflow API key failed", + attempt=attempt, + max_attempts=max_attempts, + error=str(e), + ) if attempt < max_attempts: time.sleep(delay_seconds) else: @@ -196,7 +212,9 @@ class AppClients: logger.warning("Failed to initialize Langflow client", error=str(e)) self.langflow_client = None if self.langflow_client is None: - logger.warning("No Langflow client initialized yet, will attempt later on first use") + logger.warning( + "No Langflow client initialized yet, will attempt later on first use" + ) # Initialize patched OpenAI client self.patched_async_client = patch_openai_with_mcp(AsyncOpenAI()) @@ -219,7 +237,9 @@ class AppClients: ) logger.info("Langflow client initialized on-demand") except Exception as e: - logger.error("Failed to initialize Langflow client on-demand", error=str(e)) + logger.error( + "Failed to initialize Langflow client on-demand", error=str(e) + ) self.langflow_client = None return self.langflow_client diff --git a/src/main.py b/src/main.py index 7230e201..5e22dae7 100644 --- a/src/main.py +++ b/src/main.py @@ -3,11 +3,13 @@ import sys # Check for TUI flag FIRST, before any heavy imports if __name__ == "__main__" and len(sys.argv) > 1 and sys.argv[1] == "--tui": from tui.main import run_tui + run_tui() sys.exit(0) # Configure structured logging early from utils.logging_config import configure_from_env, get_logger + configure_from_env() logger = get_logger(__name__) @@ -48,6 +50,7 @@ from auth_middleware import require_auth, optional_auth # API endpoints from api import ( + nudges, upload, search, chat, @@ -59,7 +62,11 @@ from api import ( settings, ) -logger.info("CUDA device information", cuda_available=torch.cuda.is_available(), cuda_version=torch.version.cuda) +logger.info( + "CUDA device information", + cuda_available=torch.cuda.is_available(), + cuda_version=torch.version.cuda, +) async def wait_for_opensearch(): @@ -73,7 +80,12 @@ async def wait_for_opensearch(): logger.info("OpenSearch is ready") return except Exception as e: - logger.warning("OpenSearch not ready yet", attempt=attempt + 1, max_retries=max_retries, error=str(e)) + logger.warning( + "OpenSearch not ready yet", + attempt=attempt + 1, + max_retries=max_retries, + error=str(e), + ) if attempt < max_retries - 1: await asyncio.sleep(retry_delay) else: @@ -95,7 +107,9 @@ async def configure_alerting_security(): # Use admin client (clients.opensearch uses admin credentials) response = await clients.opensearch.cluster.put_settings(body=alerting_settings) - logger.info("Alerting security settings configured successfully", response=response) + logger.info( + "Alerting security settings configured successfully", response=response + ) except Exception as e: logger.warning("Failed to configure alerting security settings", error=str(e)) # Don't fail startup if alerting config fails @@ -135,9 +149,14 @@ async def init_index(): await clients.opensearch.indices.create( index=knowledge_filter_index_name, body=knowledge_filter_index_body ) - logger.info("Created knowledge filters index", index_name=knowledge_filter_index_name) + logger.info( + "Created knowledge filters index", index_name=knowledge_filter_index_name + ) else: - logger.info("Knowledge filters index already exists, skipping creation", index_name=knowledge_filter_index_name) + logger.info( + "Knowledge filters index already exists, skipping creation", + index_name=knowledge_filter_index_name, + ) # Configure alerting plugin security settings await configure_alerting_security() @@ -192,7 +211,9 @@ async def init_index_when_ready(): logger.info("OpenSearch index initialization completed successfully") except Exception as e: logger.error("OpenSearch index initialization failed", error=str(e)) - logger.warning("OIDC endpoints will still work, but document operations may fail until OpenSearch is ready") + logger.warning( + "OIDC endpoints will still work, but document operations may fail until OpenSearch is ready" + ) async def initialize_services(): @@ -239,9 +260,14 @@ async def initialize_services(): try: await connector_service.initialize() loaded_count = len(connector_service.connection_manager.connections) - logger.info("Loaded persisted connector connections on startup", loaded_count=loaded_count) + logger.info( + "Loaded persisted connector connections on startup", + loaded_count=loaded_count, + ) except Exception as e: - logger.warning("Failed to load persisted connections on startup", error=str(e)) + logger.warning( + "Failed to load persisted connections on startup", error=str(e) + ) else: logger.info("Skipping connector loading in no-auth mode") @@ -626,6 +652,28 @@ async def create_app(): ), methods=["GET"], ), + Route( + "/nudges", + require_auth(services["session_manager"])( + partial( + nudges.nudges_from_kb_endpoint, + chat_service=services["chat_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/nudges/{chat_id}", + require_auth(services["session_manager"])( + partial( + nudges.nudges_from_chat_id_endpoint, + chat_service=services["chat_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), ] app = Starlette(debug=True, routes=routes) @@ -678,18 +726,30 @@ async def cleanup_subscriptions_proper(services): for connection in active_connections: try: - logger.info("Cancelling subscription for connection", connection_id=connection.connection_id) + logger.info( + "Cancelling subscription for connection", + connection_id=connection.connection_id, + ) connector = await connector_service.get_connector( connection.connection_id ) if connector: subscription_id = connection.config.get("webhook_channel_id") await connector.cleanup_subscription(subscription_id) - logger.info("Cancelled subscription", subscription_id=subscription_id) + logger.info( + "Cancelled subscription", subscription_id=subscription_id + ) except Exception as e: - logger.error("Failed to cancel subscription", connection_id=connection.connection_id, error=str(e)) + logger.error( + "Failed to cancel subscription", + connection_id=connection.connection_id, + error=str(e), + ) - logger.info("Finished cancelling subscriptions", subscription_count=len(active_connections)) + logger.info( + "Finished cancelling subscriptions", + subscription_count=len(active_connections), + ) except Exception as e: logger.error("Failed to cleanup subscriptions", error=str(e)) diff --git a/src/services/chat_service.py b/src/services/chat_service.py index 93fddcc8..bd2d5b90 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -1,5 +1,9 @@ -from config.settings import clients, LANGFLOW_URL, FLOW_ID -from agent import async_chat, async_langflow, async_chat_stream, async_langflow_stream +from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL, FLOW_ID +from agent import ( + async_chat, + async_langflow, + async_chat_stream, +) from auth_context import set_auth_context import json from utils.logging_config import get_logger @@ -111,7 +115,10 @@ class ChatService: # Pass the complete filter expression as a single header to Langflow (only if we have something to send) if filter_expression: - logger.info("Sending OpenRAG query filter to Langflow", filter_expression=filter_expression) + logger.info( + "Sending OpenRAG query filter to Langflow", + filter_expression=filter_expression, + ) extra_headers["X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER"] = json.dumps( filter_expression ) @@ -150,6 +157,62 @@ class ChatService: response_data["response_id"] = response_id return response_data + async def langflow_nudges_chat( + self, + user_id: str = None, + jwt_token: str = None, + previous_response_id: str = None, + ): + """Handle Langflow chat requests""" + + if not LANGFLOW_URL or not NUDGES_FLOW_ID: + raise ValueError( + "LANGFLOW_URL and NUDGES_FLOW_ID environment variables are required" + ) + + # Prepare extra headers for JWT authentication + extra_headers = {} + if jwt_token: + extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token + + # Ensure the Langflow client exists; try lazy init if needed + langflow_client = await clients.ensure_langflow_client() + if not langflow_client: + raise ValueError( + "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY." + ) + prompt = "" + if previous_response_id: + from agent import get_conversation_thread + + conversation_history = get_conversation_thread( + user_id, previous_response_id + ) + if conversation_history: + conversation_history = "\n".join( + [ + f"{msg['role']}: {msg['content']}" + for msg in conversation_history["messages"] + if msg["role"] in ["user", "assistant"] + ] + ) + prompt = f"{conversation_history}" + + from agent import async_langflow_chat + + response_text, response_id = await async_langflow_chat( + langflow_client, + NUDGES_FLOW_ID, + prompt, + user_id, + extra_headers=extra_headers, + store_conversation=False, + ) + response_data = {"response": response_text} + if response_id: + response_data["response_id"] = response_id + return response_data + async def upload_context_chat( self, document_content: str, @@ -201,7 +264,11 @@ class ChatService: return {"error": "User ID is required", "conversations": []} conversations_dict = get_user_conversations(user_id) - logger.debug("Getting chat history for user", user_id=user_id, conversation_count=len(conversations_dict)) + logger.debug( + "Getting chat history for user", + user_id=user_id, + conversation_count=len(conversations_dict), + ) # Convert conversations dict to list format with metadata conversations = [] @@ -270,16 +337,16 @@ class ChatService: from agent import get_user_conversations from services.langflow_history_service import langflow_history_service from services.user_binding_service import user_binding_service - + if not user_id: return {"error": "User ID is required", "conversations": []} - + all_conversations = [] - + try: # 1. Get in-memory OpenRAG conversations (current session) conversations_dict = get_user_conversations(user_id) - + for response_id, conversation_state in conversations_dict.items(): # Filter out system messages messages = [] @@ -295,7 +362,7 @@ class ChatService: if msg.get("response_id"): message_data["response_id"] = msg["response_id"] messages.append(message_data) - + if messages: # Only include conversations with actual messages # Generate title from first user message first_user_msg = next( @@ -308,43 +375,59 @@ class ChatService: if first_user_msg else "New chat" ) - - all_conversations.append({ - "response_id": response_id, - "title": title, - "endpoint": "langflow", - "messages": messages, - "created_at": conversation_state.get("created_at").isoformat() - if conversation_state.get("created_at") - else None, - "last_activity": conversation_state.get("last_activity").isoformat() - if conversation_state.get("last_activity") - else None, - "previous_response_id": conversation_state.get("previous_response_id"), - "total_messages": len(messages), - "source": "openrag_memory" - }) - - # 2. Get historical conversations from Langflow database + + all_conversations.append( + { + "response_id": response_id, + "title": title, + "endpoint": "langflow", + "messages": messages, + "created_at": conversation_state.get( + "created_at" + ).isoformat() + if conversation_state.get("created_at") + else None, + "last_activity": conversation_state.get( + "last_activity" + ).isoformat() + if conversation_state.get("last_activity") + else None, + "previous_response_id": conversation_state.get( + "previous_response_id" + ), + "total_messages": len(messages), + "source": "openrag_memory", + } + ) + + # 2. Get historical conversations from Langflow database # (works with both Google-bound users and direct Langflow users) print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}") - langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID) - + langflow_history = ( + await langflow_history_service.get_user_conversation_history( + user_id, flow_id=FLOW_ID + ) + ) + if langflow_history.get("conversations"): for conversation in langflow_history["conversations"]: # Convert Langflow format to OpenRAG format messages = [] for msg in conversation.get("messages", []): - messages.append({ - "role": msg["role"], - "content": msg["content"], - "timestamp": msg.get("timestamp"), - "langflow_message_id": msg.get("langflow_message_id"), - "source": "langflow" - }) - + messages.append( + { + "role": msg["role"], + "content": msg["content"], + "timestamp": msg.get("timestamp"), + "langflow_message_id": msg.get("langflow_message_id"), + "source": "langflow", + } + ) + if messages: - first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) + first_user_msg = next( + (msg for msg in messages if msg["role"] == "user"), None + ) title = ( first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 @@ -352,33 +435,39 @@ class ChatService: if first_user_msg else "Langflow chat" ) - - all_conversations.append({ - "response_id": conversation["session_id"], - "title": title, - "endpoint": "langflow", - "messages": messages, - "created_at": conversation.get("created_at"), - "last_activity": conversation.get("last_activity"), - "total_messages": len(messages), - "source": "langflow_database", - "langflow_session_id": conversation["session_id"], - "langflow_flow_id": conversation.get("flow_id") - }) - - print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow") + + all_conversations.append( + { + "response_id": conversation["session_id"], + "title": title, + "endpoint": "langflow", + "messages": messages, + "created_at": conversation.get("created_at"), + "last_activity": conversation.get("last_activity"), + "total_messages": len(messages), + "source": "langflow_database", + "langflow_session_id": conversation["session_id"], + "langflow_flow_id": conversation.get("flow_id"), + } + ) + + print( + f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow" + ) elif langflow_history.get("error"): - print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}") + print( + f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}" + ) else: print(f"[DEBUG] No Langflow conversations found for user {user_id}") - + except Exception as e: print(f"[ERROR] Failed to fetch Langflow history: {e}") # Continue with just in-memory conversations - + # Deduplicate conversations by response_id (in-memory takes priority over database) deduplicated_conversations = {} - + for conversation in all_conversations: response_id = conversation.get("response_id") if response_id: @@ -390,37 +479,52 @@ class ChatService: existing = deduplicated_conversations[response_id] current_source = conversation.get("source") existing_source = existing.get("source") - - if current_source == "openrag_memory" and existing_source == "langflow_database": + + if ( + current_source == "openrag_memory" + and existing_source == "langflow_database" + ): # Replace database version with in-memory version deduplicated_conversations[response_id] = conversation - print(f"[DEBUG] Replaced database conversation {response_id} with in-memory version") + print( + f"[DEBUG] Replaced database conversation {response_id} with in-memory version" + ) # Otherwise keep existing (in-memory has priority, or first database entry) else: # No response_id - add with unique key based on content and timestamp unique_key = f"no_id_{hash(conversation.get('title', ''))}{conversation.get('created_at', '')}" if unique_key not in deduplicated_conversations: deduplicated_conversations[unique_key] = conversation - + final_conversations = list(deduplicated_conversations.values()) - + # Sort by last activity (most recent first) final_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) - + # Calculate source statistics after deduplication sources = { - "memory": len([c for c in final_conversations if c.get("source") == "openrag_memory"]), - "langflow_db": len([c for c in final_conversations if c.get("source") == "langflow_database"]), - "duplicates_removed": len(all_conversations) - len(final_conversations) + "memory": len( + [c for c in final_conversations if c.get("source") == "openrag_memory"] + ), + "langflow_db": len( + [ + c + for c in final_conversations + if c.get("source") == "langflow_database" + ] + ), + "duplicates_removed": len(all_conversations) - len(final_conversations), } - + if sources["duplicates_removed"] > 0: - print(f"[DEBUG] Removed {sources['duplicates_removed']} duplicate conversations") - + print( + f"[DEBUG] Removed {sources['duplicates_removed']} duplicate conversations" + ) + return { "user_id": user_id, "endpoint": "langflow", "conversations": final_conversations, "total_conversations": len(final_conversations), - "sources": sources + "sources": sources, } diff --git a/src/tui/managers/env_manager.py b/src/tui/managers/env_manager.py index 61ec2f07..b163fd78 100644 --- a/src/tui/managers/env_manager.py +++ b/src/tui/managers/env_manager.py @@ -13,13 +13,14 @@ from ..utils.validation import ( validate_non_empty, validate_url, validate_documents_paths, - sanitize_env_value + sanitize_env_value, ) @dataclass class EnvConfig: """Environment configuration data.""" + # Core settings openai_api_key: str = "" opensearch_password: str = "" @@ -27,155 +28,187 @@ class EnvConfig: langflow_superuser: str = "admin" langflow_superuser_password: str = "" flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0" - + # OAuth settings google_oauth_client_id: str = "" google_oauth_client_secret: str = "" microsoft_graph_oauth_client_id: str = "" microsoft_graph_oauth_client_secret: str = "" - + # Optional settings webhook_base_url: str = "" aws_access_key_id: str = "" aws_secret_access_key: str = "" langflow_public_url: str = "" - + # Langflow auth settings langflow_auto_login: str = "False" langflow_new_user_is_active: str = "False" langflow_enable_superuser_cli: str = "False" - + # Document paths (comma-separated) openrag_documents_paths: str = "./documents" - + # Validation errors validation_errors: Dict[str, str] = field(default_factory=dict) class EnvManager: """Manages environment configuration for OpenRAG.""" - + def __init__(self, env_file: Optional[Path] = None): self.env_file = env_file or Path(".env") self.config = EnvConfig() - + def generate_secure_password(self) -> str: """Generate a secure password for OpenSearch.""" # Generate a 16-character password with letters, digits, and symbols alphabet = string.ascii_letters + string.digits + "!@#$%^&*" - return ''.join(secrets.choice(alphabet) for _ in range(16)) - + return "".join(secrets.choice(alphabet) for _ in range(16)) + def generate_langflow_secret_key(self) -> str: """Generate a secure secret key for Langflow.""" return secrets.token_urlsafe(32) - + def load_existing_env(self) -> bool: """Load existing .env file if it exists.""" if not self.env_file.exists(): return False - + try: - with open(self.env_file, 'r') as f: + with open(self.env_file, "r") as f: for line in f: line = line.strip() - if not line or line.startswith('#'): + if not line or line.startswith("#"): continue - - if '=' in line: - key, value = line.split('=', 1) + + if "=" in line: + key, value = line.split("=", 1) key = key.strip() value = sanitize_env_value(value) - + # Map env vars to config attributes attr_map = { - 'OPENAI_API_KEY': 'openai_api_key', - 'OPENSEARCH_PASSWORD': 'opensearch_password', - 'LANGFLOW_SECRET_KEY': 'langflow_secret_key', - 'LANGFLOW_SUPERUSER': 'langflow_superuser', - 'LANGFLOW_SUPERUSER_PASSWORD': 'langflow_superuser_password', - 'FLOW_ID': 'flow_id', - 'GOOGLE_OAUTH_CLIENT_ID': 'google_oauth_client_id', - 'GOOGLE_OAUTH_CLIENT_SECRET': 'google_oauth_client_secret', - 'MICROSOFT_GRAPH_OAUTH_CLIENT_ID': 'microsoft_graph_oauth_client_id', - 'MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET': 'microsoft_graph_oauth_client_secret', - 'WEBHOOK_BASE_URL': 'webhook_base_url', - 'AWS_ACCESS_KEY_ID': 'aws_access_key_id', - 'AWS_SECRET_ACCESS_KEY': 'aws_secret_access_key', - 'LANGFLOW_PUBLIC_URL': 'langflow_public_url', - 'OPENRAG_DOCUMENTS_PATHS': 'openrag_documents_paths', - 'LANGFLOW_AUTO_LOGIN': 'langflow_auto_login', - 'LANGFLOW_NEW_USER_IS_ACTIVE': 'langflow_new_user_is_active', - 'LANGFLOW_ENABLE_SUPERUSER_CLI': 'langflow_enable_superuser_cli', + "OPENAI_API_KEY": "openai_api_key", + "OPENSEARCH_PASSWORD": "opensearch_password", + "LANGFLOW_SECRET_KEY": "langflow_secret_key", + "LANGFLOW_SUPERUSER": "langflow_superuser", + "LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password", + "FLOW_ID": "flow_id", + "NUDGES_FLOW_ID": "nudges_flow_id", + "GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id", + "GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret", + "MICROSOFT_GRAPH_OAUTH_CLIENT_ID": "microsoft_graph_oauth_client_id", + "MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET": "microsoft_graph_oauth_client_secret", + "WEBHOOK_BASE_URL": "webhook_base_url", + "AWS_ACCESS_KEY_ID": "aws_access_key_id", + "AWS_SECRET_ACCESS_KEY": "aws_secret_access_key", + "LANGFLOW_PUBLIC_URL": "langflow_public_url", + "OPENRAG_DOCUMENTS_PATHS": "openrag_documents_paths", + "LANGFLOW_AUTO_LOGIN": "langflow_auto_login", + "LANGFLOW_NEW_USER_IS_ACTIVE": "langflow_new_user_is_active", + "LANGFLOW_ENABLE_SUPERUSER_CLI": "langflow_enable_superuser_cli", } - + if key in attr_map: setattr(self.config, attr_map[key], value) - + return True - + except Exception as e: print(f"Error loading .env file: {e}") return False - + def setup_secure_defaults(self) -> None: """Set up secure default values for passwords and keys.""" if not self.config.opensearch_password: self.config.opensearch_password = self.generate_secure_password() - + if not self.config.langflow_secret_key: self.config.langflow_secret_key = self.generate_langflow_secret_key() - + if not self.config.langflow_superuser_password: self.config.langflow_superuser_password = self.generate_secure_password() - + def validate_config(self, mode: str = "full") -> bool: """ Validate the current configuration. - + Args: mode: "no_auth" for minimal validation, "full" for complete validation """ self.config.validation_errors.clear() - + # Always validate OpenAI API key if not validate_openai_api_key(self.config.openai_api_key): - self.config.validation_errors['openai_api_key'] = "Invalid OpenAI API key format (should start with sk-)" - + self.config.validation_errors["openai_api_key"] = ( + "Invalid OpenAI API key format (should start with sk-)" + ) + # Validate documents paths only if provided (optional) if self.config.openrag_documents_paths: - is_valid, error_msg, _ = validate_documents_paths(self.config.openrag_documents_paths) + is_valid, error_msg, _ = validate_documents_paths( + self.config.openrag_documents_paths + ) if not is_valid: - self.config.validation_errors['openrag_documents_paths'] = error_msg - + self.config.validation_errors["openrag_documents_paths"] = error_msg + # Validate required fields if not validate_non_empty(self.config.opensearch_password): - self.config.validation_errors['opensearch_password'] = "OpenSearch password is required" - + self.config.validation_errors["opensearch_password"] = ( + "OpenSearch password is required" + ) + # Langflow secret key is auto-generated; no user input required if not validate_non_empty(self.config.langflow_superuser_password): - self.config.validation_errors['langflow_superuser_password'] = "Langflow superuser password is required" - + self.config.validation_errors["langflow_superuser_password"] = ( + "Langflow superuser password is required" + ) + if mode == "full": # Validate OAuth settings if provided - if self.config.google_oauth_client_id and not validate_google_oauth_client_id(self.config.google_oauth_client_id): - self.config.validation_errors['google_oauth_client_id'] = "Invalid Google OAuth client ID format" - - if self.config.google_oauth_client_id and not validate_non_empty(self.config.google_oauth_client_secret): - self.config.validation_errors['google_oauth_client_secret'] = "Google OAuth client secret required when client ID is provided" - - if self.config.microsoft_graph_oauth_client_id and not validate_non_empty(self.config.microsoft_graph_oauth_client_secret): - self.config.validation_errors['microsoft_graph_oauth_client_secret'] = "Microsoft Graph client secret required when client ID is provided" - + if ( + self.config.google_oauth_client_id + and not validate_google_oauth_client_id( + self.config.google_oauth_client_id + ) + ): + self.config.validation_errors["google_oauth_client_id"] = ( + "Invalid Google OAuth client ID format" + ) + + if self.config.google_oauth_client_id and not validate_non_empty( + self.config.google_oauth_client_secret + ): + self.config.validation_errors["google_oauth_client_secret"] = ( + "Google OAuth client secret required when client ID is provided" + ) + + if self.config.microsoft_graph_oauth_client_id and not validate_non_empty( + self.config.microsoft_graph_oauth_client_secret + ): + self.config.validation_errors["microsoft_graph_oauth_client_secret"] = ( + "Microsoft Graph client secret required when client ID is provided" + ) + # Validate optional URLs if provided - if self.config.webhook_base_url and not validate_url(self.config.webhook_base_url): - self.config.validation_errors['webhook_base_url'] = "Invalid webhook URL format" - - if self.config.langflow_public_url and not validate_url(self.config.langflow_public_url): - self.config.validation_errors['langflow_public_url'] = "Invalid Langflow public URL format" - + if self.config.webhook_base_url and not validate_url( + self.config.webhook_base_url + ): + self.config.validation_errors["webhook_base_url"] = ( + "Invalid webhook URL format" + ) + + if self.config.langflow_public_url and not validate_url( + self.config.langflow_public_url + ): + self.config.validation_errors["langflow_public_url"] = ( + "Invalid Langflow public URL format" + ) + return len(self.config.validation_errors) == 0 - + def save_env_file(self) -> bool: """Save current configuration to .env file.""" try: @@ -183,44 +216,67 @@ class EnvManager: self.setup_secure_defaults() # Create backup if file exists if self.env_file.exists(): - backup_file = self.env_file.with_suffix('.env.backup') + backup_file = self.env_file.with_suffix(".env.backup") self.env_file.rename(backup_file) - - with open(self.env_file, 'w') as f: + + with open(self.env_file, "w") as f: f.write("# OpenRAG Environment Configuration\n") f.write("# Generated by OpenRAG TUI\n\n") - + # Core settings f.write("# Core settings\n") f.write(f"LANGFLOW_SECRET_KEY={self.config.langflow_secret_key}\n") f.write(f"LANGFLOW_SUPERUSER={self.config.langflow_superuser}\n") - f.write(f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n") + f.write( + f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n" + ) f.write(f"FLOW_ID={self.config.flow_id}\n") + f.write(f"NUDGES_FLOW_ID={self.config.nudges_flow_id}\n") f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n") f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n") - f.write(f"OPENRAG_DOCUMENTS_PATHS={self.config.openrag_documents_paths}\n") + f.write( + f"OPENRAG_DOCUMENTS_PATHS={self.config.openrag_documents_paths}\n" + ) f.write("\n") - + # Langflow auth settings f.write("# Langflow auth settings\n") f.write(f"LANGFLOW_AUTO_LOGIN={self.config.langflow_auto_login}\n") - f.write(f"LANGFLOW_NEW_USER_IS_ACTIVE={self.config.langflow_new_user_is_active}\n") - f.write(f"LANGFLOW_ENABLE_SUPERUSER_CLI={self.config.langflow_enable_superuser_cli}\n") + f.write( + f"LANGFLOW_NEW_USER_IS_ACTIVE={self.config.langflow_new_user_is_active}\n" + ) + f.write( + f"LANGFLOW_ENABLE_SUPERUSER_CLI={self.config.langflow_enable_superuser_cli}\n" + ) f.write("\n") - + # OAuth settings - if self.config.google_oauth_client_id or self.config.google_oauth_client_secret: + if ( + self.config.google_oauth_client_id + or self.config.google_oauth_client_secret + ): f.write("# Google OAuth settings\n") - f.write(f"GOOGLE_OAUTH_CLIENT_ID={self.config.google_oauth_client_id}\n") - f.write(f"GOOGLE_OAUTH_CLIENT_SECRET={self.config.google_oauth_client_secret}\n") + f.write( + f"GOOGLE_OAUTH_CLIENT_ID={self.config.google_oauth_client_id}\n" + ) + f.write( + f"GOOGLE_OAUTH_CLIENT_SECRET={self.config.google_oauth_client_secret}\n" + ) f.write("\n") - - if self.config.microsoft_graph_oauth_client_id or self.config.microsoft_graph_oauth_client_secret: + + if ( + self.config.microsoft_graph_oauth_client_id + or self.config.microsoft_graph_oauth_client_secret + ): f.write("# Microsoft Graph OAuth settings\n") - f.write(f"MICROSOFT_GRAPH_OAUTH_CLIENT_ID={self.config.microsoft_graph_oauth_client_id}\n") - f.write(f"MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET={self.config.microsoft_graph_oauth_client_secret}\n") + f.write( + f"MICROSOFT_GRAPH_OAUTH_CLIENT_ID={self.config.microsoft_graph_oauth_client_id}\n" + ) + f.write( + f"MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET={self.config.microsoft_graph_oauth_client_secret}\n" + ) f.write("\n") - + # Optional settings optional_vars = [ ("WEBHOOK_BASE_URL", self.config.webhook_base_url), @@ -228,7 +284,7 @@ class EnvManager: ("AWS_SECRET_ACCESS_KEY", self.config.aws_secret_access_key), ("LANGFLOW_PUBLIC_URL", self.config.langflow_public_url), ] - + optional_written = False for var_name, var_value in optional_vars: if var_value: @@ -236,52 +292,89 @@ class EnvManager: f.write("# Optional settings\n") optional_written = True f.write(f"{var_name}={var_value}\n") - + if optional_written: f.write("\n") - + return True - + except Exception as e: print(f"Error saving .env file: {e}") return False - + def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]: """Get fields required for no-auth setup mode. Returns (field_name, display_name, placeholder, can_generate).""" return [ ("openai_api_key", "OpenAI API Key", "sk-...", False), - ("opensearch_password", "OpenSearch Password", "Will be auto-generated if empty", True), - ("langflow_superuser_password", "Langflow Superuser Password", "Will be auto-generated if empty", True), - ("openrag_documents_paths", "Documents Paths", "./documents,/path/to/more/docs", False), + ( + "opensearch_password", + "OpenSearch Password", + "Will be auto-generated if empty", + True, + ), + ( + "langflow_superuser_password", + "Langflow Superuser Password", + "Will be auto-generated if empty", + True, + ), + ( + "openrag_documents_paths", + "Documents Paths", + "./documents,/path/to/more/docs", + False, + ), ] - + def get_full_setup_fields(self) -> List[tuple[str, str, str, bool]]: """Get all fields for full setup mode.""" base_fields = self.get_no_auth_setup_fields() - + oauth_fields = [ - ("google_oauth_client_id", "Google OAuth Client ID", "xxx.apps.googleusercontent.com", False), + ( + "google_oauth_client_id", + "Google OAuth Client ID", + "xxx.apps.googleusercontent.com", + False, + ), ("google_oauth_client_secret", "Google OAuth Client Secret", "", False), ("microsoft_graph_oauth_client_id", "Microsoft Graph Client ID", "", False), - ("microsoft_graph_oauth_client_secret", "Microsoft Graph Client Secret", "", False), + ( + "microsoft_graph_oauth_client_secret", + "Microsoft Graph Client Secret", + "", + False, + ), ] - + optional_fields = [ - ("webhook_base_url", "Webhook Base URL (optional)", "https://your-domain.com", False), + ( + "webhook_base_url", + "Webhook Base URL (optional)", + "https://your-domain.com", + False, + ), ("aws_access_key_id", "AWS Access Key ID (optional)", "", False), ("aws_secret_access_key", "AWS Secret Access Key (optional)", "", False), - ("langflow_public_url", "Langflow Public URL (optional)", "http://localhost:7860", False), + ( + "langflow_public_url", + "Langflow Public URL (optional)", + "http://localhost:7860", + False, + ), ] - + return base_fields + oauth_fields + optional_fields - + def generate_compose_volume_mounts(self) -> List[str]: """Generate Docker Compose volume mount strings from documents paths.""" - is_valid, _, validated_paths = validate_documents_paths(self.config.openrag_documents_paths) - + is_valid, _, validated_paths = validate_documents_paths( + self.config.openrag_documents_paths + ) + if not is_valid: return ["./documents:/app/documents:Z"] # fallback - + volume_mounts = [] for i, path in enumerate(validated_paths): if i == 0: @@ -289,6 +382,6 @@ class EnvManager: volume_mounts.append(f"{path}:/app/documents:Z") else: # Additional paths map to numbered directories - volume_mounts.append(f"{path}:/app/documents{i+1}:Z") - + volume_mounts.append(f"{path}:/app/documents{i + 1}:Z") + return volume_mounts From 33ba6abf49ae47f2fe83948eec8c10e4a2d7721a Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Thu, 4 Sep 2025 18:15:54 -0300 Subject: [PATCH 02/26] Add react query --- frontend/package-lock.json | 27 +++++++++++++++++++++++++++ frontend/package.json | 1 + 2 files changed, 28 insertions(+) diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 103dd7aa..3883b3b2 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -22,6 +22,7 @@ "@radix-ui/react-switch": "^1.2.5", "@tailwindcss/forms": "^0.5.10", "@tailwindcss/typography": "^0.5.16", + "@tanstack/react-query": "^5.86.0", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "cmdk": "^1.1.1", @@ -2203,6 +2204,32 @@ "node": ">=4" } }, + "node_modules/@tanstack/query-core": { + "version": "5.86.0", + "resolved": "https://registry.npmjs.org/@tanstack/query-core/-/query-core-5.86.0.tgz", + "integrity": "sha512-Y6ibQm6BXbw6w1p3a5LrPn8Ae64M0dx7hGmnhrm9P+XAkCCKXOwZN0J5Z1wK/0RdNHtR9o+sWHDXd4veNI60tQ==", + "license": "MIT", + "funding": { + "type": "github", + "url": "https://github.com/sponsors/tannerlinsley" + } + }, + "node_modules/@tanstack/react-query": { + "version": "5.86.0", + "resolved": "https://registry.npmjs.org/@tanstack/react-query/-/react-query-5.86.0.tgz", + "integrity": "sha512-jgS/v0oSJkGHucv9zxOS8rL7mjATh1XO3K4eqAV4WMpAly8okcBrGi1YxRZN5S4B59F54x9JFjWrK5vMAvJYqA==", + "license": "MIT", + "dependencies": { + "@tanstack/query-core": "5.86.0" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/tannerlinsley" + }, + "peerDependencies": { + "react": "^18 || ^19" + } + }, "node_modules/@tybys/wasm-util": { "version": "0.10.0", "resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.0.tgz", diff --git a/frontend/package.json b/frontend/package.json index 4f1ebfd8..0731820b 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -23,6 +23,7 @@ "@radix-ui/react-switch": "^1.2.5", "@tailwindcss/forms": "^0.5.10", "@tailwindcss/typography": "^0.5.16", + "@tanstack/react-query": "^5.86.0", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "cmdk": "^1.1.1", From 1197f51518076d779f8a220d208afa19216ddb03 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Thu, 4 Sep 2025 18:16:06 -0300 Subject: [PATCH 03/26] create useGetNudgesQuery to get nudges easily --- .../src/app/api/queries/useGetNudgesQuery.ts | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 frontend/src/app/api/queries/useGetNudgesQuery.ts diff --git a/frontend/src/app/api/queries/useGetNudgesQuery.ts b/frontend/src/app/api/queries/useGetNudgesQuery.ts new file mode 100644 index 00000000..809ca71e --- /dev/null +++ b/frontend/src/app/api/queries/useGetNudgesQuery.ts @@ -0,0 +1,42 @@ +import { + useQuery, + useQueryClient, + UseQueryOptions, +} from "@tanstack/react-query"; + +type Nudge = string; + +const DEFAULT_NUDGES = [ + "Show me this quarter's top 10 deals", + "Summarize recent client interactions", + "Search OpenSearch for mentions of our competitors", +]; + +export const useGetNudgesQuery = ( + chatId?: string | null, + options?: Omit, +) => { + const queryClient = useQueryClient(); + console.log(chatId); + async function getNudges(): Promise { + try { + const response = await fetch(`/api/nudges${chatId ? `/${chatId}` : ""}`); + const data = await response.json(); + return data.response.split("\n").filter(Boolean) || DEFAULT_NUDGES; + } catch (error) { + console.error("Error getting nudges", error); + return DEFAULT_NUDGES; + } + } + + const queryResult = useQuery( + { + queryKey: ["nudges", chatId], + queryFn: getNudges, + ...options, + }, + queryClient, + ); + + return queryResult; +}; From 70d3434e5bd4c2c3a82f06d2f764902b0bedb769 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Thu, 4 Sep 2025 18:16:16 -0300 Subject: [PATCH 04/26] Create get query client to work on next --- frontend/src/app/api/get-query-client.ts | 37 ++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 frontend/src/app/api/get-query-client.ts diff --git a/frontend/src/app/api/get-query-client.ts b/frontend/src/app/api/get-query-client.ts new file mode 100644 index 00000000..4602496c --- /dev/null +++ b/frontend/src/app/api/get-query-client.ts @@ -0,0 +1,37 @@ +import { + QueryClient, + defaultShouldDehydrateQuery, + isServer, +} from "@tanstack/react-query"; + +function makeQueryClient() { + return new QueryClient({ + defaultOptions: { + queries: { + staleTime: 60 * 1000, + }, + dehydrate: { + // include pending queries in dehydration + shouldDehydrateQuery: (query) => + defaultShouldDehydrateQuery(query) || + query.state.status === "pending", + }, + }, + }); +} + +let browserQueryClient: QueryClient | undefined = undefined; + +export function getQueryClient() { + if (isServer) { + // Server: always make a new query client + return makeQueryClient(); + } else { + // Browser: make a new query client if we don't already have one + // This is very important, so we don't re-make a new client if React + // suspends during the initial render. This may not be needed if we + // have a suspense boundary BELOW the creation of the query client + if (!browserQueryClient) browserQueryClient = makeQueryClient(); + return browserQueryClient; + } +} From 047fb305c6a86065c9128a8ee1c24a5c63adad66 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Thu, 4 Sep 2025 18:16:28 -0300 Subject: [PATCH 05/26] Add react query provider to app --- frontend/src/app/chat/page.tsx | 151 +++++++++++++++++---------------- frontend/src/app/layout.tsx | 24 +++--- frontend/src/app/providers.tsx | 12 +++ 3 files changed, 100 insertions(+), 87 deletions(-) create mode 100644 frontend/src/app/providers.tsx diff --git a/frontend/src/app/chat/page.tsx b/frontend/src/app/chat/page.tsx index 19703214..b3cfca4a 100644 --- a/frontend/src/app/chat/page.tsx +++ b/frontend/src/app/chat/page.tsx @@ -22,6 +22,7 @@ import { Zap, } from "lucide-react"; import { useEffect, useRef, useState } from "react"; +import { useGetNudgesQuery } from "../api/queries/useGetNudgesQuery"; interface Message { role: "user" | "assistant"; @@ -191,7 +192,7 @@ function ChatPage() { "Upload failed with status:", response.status, "Response:", - errorText + errorText, ); throw new Error("Failed to process document"); } @@ -244,7 +245,7 @@ function ChatPage() { ...prev, [endpoint]: result.response_id, })); - + // If this is a new conversation (no currentConversationId), set it now if (!currentConversationId) { setCurrentConversationId(result.response_id); @@ -436,8 +437,8 @@ function ChatPage() { // 2. It's different from the last loaded conversation AND // 3. User is not in the middle of an interaction if ( - conversationData && - conversationData.messages && + conversationData && + conversationData.messages && lastLoadedConversationRef.current !== conversationData.response_id && !isUserInteracting && !isForkingInProgress @@ -445,7 +446,7 @@ function ChatPage() { console.log( "Loading conversation with", conversationData.messages.length, - "messages" + "messages", ); // Convert backend message format to frontend Message interface const convertedMessages: Message[] = conversationData.messages.map( @@ -459,7 +460,7 @@ function ChatPage() { content: msg.content, timestamp: new Date(msg.timestamp || new Date()), // Add any other necessary properties - }) + }), ); setMessages(convertedMessages); @@ -471,11 +472,7 @@ function ChatPage() { [conversationData.endpoint]: conversationData.response_id, })); } - }, [ - conversationData, - isUserInteracting, - isForkingInProgress, - ]); + }, [conversationData, isUserInteracting, isForkingInProgress]); // Handle new conversation creation - only reset messages when placeholderConversation is set useEffect(() => { @@ -547,7 +544,7 @@ function ChatPage() { console.log( "Chat page received file upload error event:", filename, - error + error, ); // Replace the last message with error message @@ -561,37 +558,37 @@ function ChatPage() { window.addEventListener( "fileUploadStart", - handleFileUploadStart as EventListener + handleFileUploadStart as EventListener, ); window.addEventListener( "fileUploaded", - handleFileUploaded as EventListener + handleFileUploaded as EventListener, ); window.addEventListener( "fileUploadComplete", - handleFileUploadComplete as EventListener + handleFileUploadComplete as EventListener, ); window.addEventListener( "fileUploadError", - handleFileUploadError as EventListener + handleFileUploadError as EventListener, ); return () => { window.removeEventListener( "fileUploadStart", - handleFileUploadStart as EventListener + handleFileUploadStart as EventListener, ); window.removeEventListener( "fileUploaded", - handleFileUploaded as EventListener + handleFileUploaded as EventListener, ); window.removeEventListener( "fileUploadComplete", - handleFileUploadComplete as EventListener + handleFileUploadComplete as EventListener, ); window.removeEventListener( "fileUploadError", - handleFileUploadError as EventListener + handleFileUploadError as EventListener, ); }; }, [endpoint, setPreviousResponseIds]); @@ -617,6 +614,10 @@ function ChatPage() { }; }, [isFilterDropdownOpen]); + const { data: nudges = [], refetch: refetchNudges } = useGetNudgesQuery( + previousResponseIds[endpoint], + ); + const handleSSEStream = async (userMessage: Message) => { const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow"; @@ -719,7 +720,7 @@ function ChatPage() { console.log( "Received chunk:", chunk.type || chunk.object, - chunk + chunk, ); // Extract response ID if present @@ -735,14 +736,14 @@ function ChatPage() { if (chunk.delta.function_call) { console.log( "Function call in delta:", - chunk.delta.function_call + chunk.delta.function_call, ); // Check if this is a new function call if (chunk.delta.function_call.name) { console.log( "New function call:", - chunk.delta.function_call.name + chunk.delta.function_call.name, ); const functionCall: FunctionCall = { name: chunk.delta.function_call.name, @@ -758,7 +759,7 @@ function ChatPage() { else if (chunk.delta.function_call.arguments) { console.log( "Function call arguments delta:", - chunk.delta.function_call.arguments + chunk.delta.function_call.arguments, ); const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]; @@ -770,14 +771,14 @@ function ChatPage() { chunk.delta.function_call.arguments; console.log( "Accumulated arguments:", - lastFunctionCall.argumentsString + lastFunctionCall.argumentsString, ); // Try to parse arguments if they look complete if (lastFunctionCall.argumentsString.includes("}")) { try { const parsed = JSON.parse( - lastFunctionCall.argumentsString + lastFunctionCall.argumentsString, ); lastFunctionCall.arguments = parsed; lastFunctionCall.status = "completed"; @@ -785,7 +786,7 @@ function ChatPage() { } catch (e) { console.log( "Arguments not yet complete or invalid JSON:", - e + e, ); } } @@ -818,7 +819,7 @@ function ChatPage() { else if (toolCall.function.arguments) { console.log( "Tool call arguments delta:", - toolCall.function.arguments + toolCall.function.arguments, ); const lastFunctionCall = currentFunctionCalls[ @@ -832,7 +833,7 @@ function ChatPage() { toolCall.function.arguments; console.log( "Accumulated tool arguments:", - lastFunctionCall.argumentsString + lastFunctionCall.argumentsString, ); // Try to parse arguments if they look complete @@ -841,7 +842,7 @@ function ChatPage() { ) { try { const parsed = JSON.parse( - lastFunctionCall.argumentsString + lastFunctionCall.argumentsString, ); lastFunctionCall.arguments = parsed; lastFunctionCall.status = "completed"; @@ -849,7 +850,7 @@ function ChatPage() { } catch (e) { console.log( "Tool arguments not yet complete or invalid JSON:", - e + e, ); } } @@ -881,7 +882,7 @@ function ChatPage() { console.log( "Error parsing function call on finish:", fc, - e + e, ); } } @@ -897,12 +898,12 @@ function ChatPage() { console.log( "🟢 CREATING function call (added):", chunk.item.id, - chunk.item.tool_name || chunk.item.name + chunk.item.tool_name || chunk.item.name, ); // Try to find an existing pending call to update (created by earlier deltas) let existing = currentFunctionCalls.find( - (fc) => fc.id === chunk.item.id + (fc) => fc.id === chunk.item.id, ); if (!existing) { existing = [...currentFunctionCalls] @@ -911,7 +912,7 @@ function ChatPage() { (fc) => fc.status === "pending" && !fc.id && - fc.name === (chunk.item.tool_name || chunk.item.name) + fc.name === (chunk.item.tool_name || chunk.item.name), ); } @@ -924,7 +925,7 @@ function ChatPage() { chunk.item.inputs || existing.arguments; console.log( "🟢 UPDATED existing pending function call with id:", - existing.id + existing.id, ); } else { const functionCall: FunctionCall = { @@ -942,7 +943,7 @@ function ChatPage() { currentFunctionCalls.map((fc) => ({ id: fc.id, name: fc.name, - })) + })), ); } } @@ -953,7 +954,7 @@ function ChatPage() { ) { console.log( "Function args delta (Realtime API):", - chunk.delta + chunk.delta, ); const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]; @@ -964,7 +965,7 @@ function ChatPage() { lastFunctionCall.argumentsString += chunk.delta || ""; console.log( "Accumulated arguments (Realtime API):", - lastFunctionCall.argumentsString + lastFunctionCall.argumentsString, ); } } @@ -975,26 +976,26 @@ function ChatPage() { ) { console.log( "Function args done (Realtime API):", - chunk.arguments + chunk.arguments, ); const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { try { lastFunctionCall.arguments = JSON.parse( - chunk.arguments || "{}" + chunk.arguments || "{}", ); lastFunctionCall.status = "completed"; console.log( "Parsed function arguments (Realtime API):", - lastFunctionCall.arguments + lastFunctionCall.arguments, ); } catch (e) { lastFunctionCall.arguments = { raw: chunk.arguments }; lastFunctionCall.status = "error"; console.log( "Error parsing function arguments (Realtime API):", - e + e, ); } } @@ -1008,14 +1009,14 @@ function ChatPage() { console.log( "🔵 UPDATING function call (done):", chunk.item.id, - chunk.item.tool_name || chunk.item.name + chunk.item.tool_name || chunk.item.name, ); console.log( "🔵 Looking for existing function calls:", currentFunctionCalls.map((fc) => ({ id: fc.id, name: fc.name, - })) + })), ); // Find existing function call by ID or name @@ -1023,14 +1024,14 @@ function ChatPage() { (fc) => fc.id === chunk.item.id || fc.name === chunk.item.tool_name || - fc.name === chunk.item.name + fc.name === chunk.item.name, ); if (functionCall) { console.log( "🔵 FOUND existing function call, updating:", functionCall.id, - functionCall.name + functionCall.name, ); // Update existing function call with completion data functionCall.status = @@ -1053,7 +1054,7 @@ function ChatPage() { "🔴 WARNING: Could not find existing function call to update:", chunk.item.id, chunk.item.tool_name, - chunk.item.name + chunk.item.name, ); } } @@ -1074,7 +1075,7 @@ function ChatPage() { fc.name === chunk.item.name || fc.name === chunk.item.type || fc.name.includes(chunk.item.type.replace("_call", "")) || - chunk.item.type.includes(fc.name) + chunk.item.type.includes(fc.name), ); if (functionCall) { @@ -1118,12 +1119,12 @@ function ChatPage() { "🟡 CREATING tool call (added):", chunk.item.id, chunk.item.tool_name || chunk.item.name, - chunk.item.type + chunk.item.type, ); // Dedupe by id or pending with same name let existing = currentFunctionCalls.find( - (fc) => fc.id === chunk.item.id + (fc) => fc.id === chunk.item.id, ); if (!existing) { existing = [...currentFunctionCalls] @@ -1135,7 +1136,7 @@ function ChatPage() { fc.name === (chunk.item.tool_name || chunk.item.name || - chunk.item.type) + chunk.item.type), ); } @@ -1151,7 +1152,7 @@ function ChatPage() { chunk.item.inputs || existing.arguments; console.log( "🟡 UPDATED existing pending tool call with id:", - existing.id + existing.id, ); } else { const functionCall = { @@ -1172,7 +1173,7 @@ function ChatPage() { id: fc.id, name: fc.name, type: fc.type, - })) + })), ); } } @@ -1268,6 +1269,9 @@ function ChatPage() { if (!controller.signal.aborted && thisStreamId === streamIdRef.current) { setMessages((prev) => [...prev, finalMessage]); setStreamingMessage(null); + if (previousResponseIds[endpoint]) { + refetchNudges(); + } } // Store the response ID for the next request for this endpoint @@ -1280,7 +1284,7 @@ function ChatPage() { ...prev, [endpoint]: newResponseId, })); - + // If this is a new conversation (no currentConversationId), set it now if (!currentConversationId) { setCurrentConversationId(newResponseId); @@ -1385,6 +1389,9 @@ function ChatPage() { timestamp: new Date(), }; setMessages((prev) => [...prev, assistantMessage]); + if (result.response_id) { + refetchNudges(); + } // Store the response ID if present for this endpoint if (result.response_id) { @@ -1392,7 +1399,7 @@ function ChatPage() { ...prev, [endpoint]: result.response_id, })); - + // If this is a new conversation (no currentConversationId), set it now if (!currentConversationId) { setCurrentConversationId(result.response_id); @@ -1440,7 +1447,7 @@ function ChatPage() { const handleForkConversation = ( messageIndex: number, - event?: React.MouseEvent + event?: React.MouseEvent, ) => { // Prevent any default behavior and stop event propagation if (event) { @@ -1508,7 +1515,7 @@ function ChatPage() { const renderFunctionCalls = ( functionCalls: FunctionCall[], - messageIndex?: number + messageIndex?: number, ) => { if (!functionCalls || functionCalls.length === 0) return null; @@ -1737,12 +1744,6 @@ function ChatPage() { ); }; - const suggestionChips = [ - "Show me this quarter's top 10 deals", - "Summarize recent client interactions", - "Search OpenSearch for mentions of our competitors", - ]; - const handleSuggestionClick = (suggestion: string) => { setInput(suggestion); inputRef.current?.focus(); @@ -1883,7 +1884,7 @@ function ChatPage() {
{renderFunctionCalls( message.functionCalls || [], - index + index, )}

{message.content} @@ -1914,7 +1915,7 @@ function ChatPage() {

{renderFunctionCalls( streamingMessage.functionCalls, - messages.length + messages.length, )}

{streamingMessage.content} @@ -1963,8 +1964,8 @@ function ChatPage() { {!streamingMessage && (

-
- {suggestionChips.map((suggestion, index) => ( +
+ {(nudges as string[]).map((suggestion: string, index: number) => ( + ))} +
+ {/* Fade out gradient on the right */} +
+
+
+ ); +} From 06b3850057280e472cb07ddb6f1aebb55f2b3bde Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Fri, 5 Sep 2025 10:20:17 -0300 Subject: [PATCH 10/26] Componentize message send, and send message when clicking nudges --- frontend/src/app/chat/page.tsx | 50 ++++++++++++++-------------------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/frontend/src/app/chat/page.tsx b/frontend/src/app/chat/page.tsx index b3cfca4a..34b27647 100644 --- a/frontend/src/app/chat/page.tsx +++ b/frontend/src/app/chat/page.tsx @@ -23,6 +23,7 @@ import { } from "lucide-react"; import { useEffect, useRef, useState } from "react"; import { useGetNudgesQuery } from "../api/queries/useGetNudgesQuery"; +import Nudges from "./nudges"; interface Message { role: "user" | "assistant"; @@ -128,7 +129,6 @@ function ChatPage() { const [dropdownDismissed, setDropdownDismissed] = useState(false); const [isUserInteracting, setIsUserInteracting] = useState(false); const [isForkingInProgress, setIsForkingInProgress] = useState(false); - const [lastForkTimestamp, setLastForkTimestamp] = useState(0); const dragCounterRef = useRef(0); const messagesEndRef = useRef(null); const inputRef = useRef(null); @@ -472,7 +472,12 @@ function ChatPage() { [conversationData.endpoint]: conversationData.response_id, })); } - }, [conversationData, isUserInteracting, isForkingInProgress]); + }, [ + conversationData, + isUserInteracting, + isForkingInProgress, + setPreviousResponseIds, + ]); // Handle new conversation creation - only reset messages when placeholderConversation is set useEffect(() => { @@ -1312,13 +1317,12 @@ function ChatPage() { } }; - const handleSubmit = async (e: React.FormEvent) => { - e.preventDefault(); - if (!input.trim() || loading) return; + const handleSendMessage = async (inputMessage: string) => { + if (!inputMessage.trim() || loading) return; const userMessage: Message = { role: "user", - content: input.trim(), + content: inputMessage.trim(), timestamp: new Date(), }; @@ -1433,6 +1437,11 @@ function ChatPage() { setLoading(false); }; + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault(); + handleSendMessage(input); + }; + const toggleFunctionCall = (functionCallId: string) => { setExpandedFunctionCalls((prev) => { const newSet = new Set(prev); @@ -1456,10 +1465,8 @@ function ChatPage() { } // Set interaction state to prevent auto-scroll interference - const forkTimestamp = Date.now(); setIsUserInteracting(true); setIsForkingInProgress(true); - setLastForkTimestamp(forkTimestamp); console.log("Fork conversation called for message index:", messageIndex); @@ -1472,7 +1479,6 @@ function ChatPage() { console.error("Fork button should only be on assistant messages"); setIsUserInteracting(false); setIsForkingInProgress(false); - setLastForkTimestamp(0); return; } @@ -1745,8 +1751,7 @@ function ChatPage() { }; const handleSuggestionClick = (suggestion: string) => { - setInput(suggestion); - inputRef.current?.focus(); + handleSendMessage(suggestion); }; return ( @@ -1962,27 +1967,14 @@ function ChatPage() { {/* Suggestion chips - always show unless streaming */} {!streamingMessage && ( -
-
-
- {(nudges as string[]).map((suggestion: string, index: number) => ( - - ))} -
- {/* Fade out gradient on the right */} -
-
-
+ )} {/* Input Area - Fixed at bottom */} -
+
From 92519792076ba801f1c97cfd4ac91c062566b4a6 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Fri, 5 Sep 2025 10:23:44 -0300 Subject: [PATCH 11/26] Added padding top --- frontend/src/app/chat/nudges.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/app/chat/nudges.tsx b/frontend/src/app/chat/nudges.tsx index f817d4db..0d713c89 100644 --- a/frontend/src/app/chat/nudges.tsx +++ b/frontend/src/app/chat/nudges.tsx @@ -6,7 +6,7 @@ export default function Nudges({ handleSuggestionClick: (suggestion: string) => void; }) { return ( -
+
{nudges.map((suggestion: string, index: number) => ( From 9e78798a0490ca4a2cbaa014ab9193e4ac0cacd0 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Fri, 5 Sep 2025 17:00:29 -0300 Subject: [PATCH 12/26] added animate to package json --- frontend/package-lock.json | 69 ++++++++++++++++++++++++++++++++++++++ frontend/package.json | 1 + 2 files changed, 70 insertions(+) diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 3883b3b2..03f63e53 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -27,6 +27,7 @@ "clsx": "^2.1.1", "cmdk": "^1.1.1", "lucide-react": "^0.525.0", + "motion": "^12.23.12", "next": "15.3.5", "next-themes": "^0.4.6", "react": "^19.0.0", @@ -4574,6 +4575,33 @@ "url": "https://github.com/sponsors/rawify" } }, + "node_modules/framer-motion": { + "version": "12.23.12", + "resolved": "https://registry.npmjs.org/framer-motion/-/framer-motion-12.23.12.tgz", + "integrity": "sha512-6e78rdVtnBvlEVgu6eFEAgG9v3wLnYEboM8I5O5EXvfKC8gxGQB8wXJdhkMy10iVcn05jl6CNw7/HTsTCfwcWg==", + "license": "MIT", + "dependencies": { + "motion-dom": "^12.23.12", + "motion-utils": "^12.23.6", + "tslib": "^2.4.0" + }, + "peerDependencies": { + "@emotion/is-prop-valid": "*", + "react": "^18.0.0 || ^19.0.0", + "react-dom": "^18.0.0 || ^19.0.0" + }, + "peerDependenciesMeta": { + "@emotion/is-prop-valid": { + "optional": true + }, + "react": { + "optional": true + }, + "react-dom": { + "optional": true + } + } + }, "node_modules/fsevents": { "version": "2.3.3", "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.3.tgz", @@ -5708,6 +5736,47 @@ "node": ">=16 || 14 >=14.17" } }, + "node_modules/motion": { + "version": "12.23.12", + "resolved": "https://registry.npmjs.org/motion/-/motion-12.23.12.tgz", + "integrity": "sha512-8jCD8uW5GD1csOoqh1WhH1A6j5APHVE15nuBkFeRiMzYBdRwyAHmSP/oXSuW0WJPZRXTFdBoG4hY9TFWNhhwng==", + "license": "MIT", + "dependencies": { + "framer-motion": "^12.23.12", + "tslib": "^2.4.0" + }, + "peerDependencies": { + "@emotion/is-prop-valid": "*", + "react": "^18.0.0 || ^19.0.0", + "react-dom": "^18.0.0 || ^19.0.0" + }, + "peerDependenciesMeta": { + "@emotion/is-prop-valid": { + "optional": true + }, + "react": { + "optional": true + }, + "react-dom": { + "optional": true + } + } + }, + "node_modules/motion-dom": { + "version": "12.23.12", + "resolved": "https://registry.npmjs.org/motion-dom/-/motion-dom-12.23.12.tgz", + "integrity": "sha512-RcR4fvMCTESQBD/uKQe49D5RUeDOokkGRmz4ceaJKDBgHYtZtntC/s2vLvY38gqGaytinij/yi3hMcWVcEF5Kw==", + "license": "MIT", + "dependencies": { + "motion-utils": "^12.23.6" + } + }, + "node_modules/motion-utils": { + "version": "12.23.6", + "resolved": "https://registry.npmjs.org/motion-utils/-/motion-utils-12.23.6.tgz", + "integrity": "sha512-eAWoPgr4eFEOFfg2WjIsMoqJTW6Z8MTUCgn/GZ3VRpClWBdnbjryiA3ZSNLyxCTmCQx4RmYX6jX1iWHbenUPNQ==", + "license": "MIT" + }, "node_modules/ms": { "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", diff --git a/frontend/package.json b/frontend/package.json index 0731820b..8c037ea2 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -28,6 +28,7 @@ "clsx": "^2.1.1", "cmdk": "^1.1.1", "lucide-react": "^0.525.0", + "motion": "^12.23.12", "next": "15.3.5", "next-themes": "^0.4.6", "react": "^19.0.0", From 01b10d403a0f3cb720c79ecc6712a1b28ef6ea4a Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Fri, 5 Sep 2025 17:00:52 -0300 Subject: [PATCH 13/26] Create cancelNudges, to remove the query result when sending a message --- frontend/src/app/api/queries/useGetNudgesQuery.ts | 8 ++++++-- frontend/src/app/chat/page.tsx | 8 ++++---- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/frontend/src/app/api/queries/useGetNudgesQuery.ts b/frontend/src/app/api/queries/useGetNudgesQuery.ts index 809ca71e..38cc9502 100644 --- a/frontend/src/app/api/queries/useGetNudgesQuery.ts +++ b/frontend/src/app/api/queries/useGetNudgesQuery.ts @@ -17,7 +17,11 @@ export const useGetNudgesQuery = ( options?: Omit, ) => { const queryClient = useQueryClient(); - console.log(chatId); + + function cancel() { + queryClient.removeQueries({ queryKey: ["nudges", chatId] }); + } + async function getNudges(): Promise { try { const response = await fetch(`/api/nudges${chatId ? `/${chatId}` : ""}`); @@ -38,5 +42,5 @@ export const useGetNudgesQuery = ( queryClient, ); - return queryResult; + return { ...queryResult, cancel }; }; diff --git a/frontend/src/app/chat/page.tsx b/frontend/src/app/chat/page.tsx index 34b27647..987786d9 100644 --- a/frontend/src/app/chat/page.tsx +++ b/frontend/src/app/chat/page.tsx @@ -619,7 +619,7 @@ function ChatPage() { }; }, [isFilterDropdownOpen]); - const { data: nudges = [], refetch: refetchNudges } = useGetNudgesQuery( + const { data: nudges = [], cancel: cancelNudges } = useGetNudgesQuery( previousResponseIds[endpoint], ); @@ -1275,7 +1275,7 @@ function ChatPage() { setMessages((prev) => [...prev, finalMessage]); setStreamingMessage(null); if (previousResponseIds[endpoint]) { - refetchNudges(); + cancelNudges(); } } @@ -1394,7 +1394,7 @@ function ChatPage() { }; setMessages((prev) => [...prev, assistantMessage]); if (result.response_id) { - refetchNudges(); + cancelNudges(); } // Store the response ID if present for this endpoint @@ -1968,7 +1968,7 @@ function ChatPage() { {/* Suggestion chips - always show unless streaming */} {!streamingMessage && ( )} From 560683d782fb8f993868cc845c2fa8c20ec996ad Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Fri, 5 Sep 2025 17:01:03 -0300 Subject: [PATCH 14/26] Added animation to displaying nudges --- frontend/src/app/chat/nudges.tsx | 51 ++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/frontend/src/app/chat/nudges.tsx b/frontend/src/app/chat/nudges.tsx index 0d713c89..c5929924 100644 --- a/frontend/src/app/chat/nudges.tsx +++ b/frontend/src/app/chat/nudges.tsx @@ -1,27 +1,46 @@ +import { motion, AnimatePresence } from "motion/react"; + export default function Nudges({ nudges, handleSuggestionClick, }: { nudges: string[]; + handleSuggestionClick: (suggestion: string) => void; }) { return ( -
-
-
- {nudges.map((suggestion: string, index: number) => ( - - ))} -
- {/* Fade out gradient on the right */} -
-
+
+ + {nudges.length > 0 && ( + +
+
+
+ {nudges.map((suggestion: string, index: number) => ( + + ))} +
+ {/* Fade out gradient on the right */} +
+
+
+
+ )} +
); } From b9da6437f8c60ba986a95810030cf62d17f9654d Mon Sep 17 00:00:00 2001 From: Eric Hare Date: Mon, 8 Sep 2025 11:08:45 -0700 Subject: [PATCH 15/26] Properly set the project root for the gdrive token --- src/connectors/google_drive/connector.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py index 887ffeca..c0ef2ff3 100644 --- a/src/connectors/google_drive/connector.py +++ b/src/connectors/google_drive/connector.py @@ -102,9 +102,8 @@ class GoogleDriveConnector(BaseConnector): client_secret = config.get("client_secret") or env_client_secret # Token file default (so callback & workers don’t need to pass it) - token_file = config.get("token_file") or os.getenv("GOOGLE_DRIVE_TOKEN_FILE") - if not token_file: - token_file = str(Path.home() / ".config" / "openrag" / "google_drive" / "token.json") + project_root = Path(__file__).resolve().parent.parent.parent.parent + token_file = config.get("token_file") or str(project_root / "google_drive_token.json") Path(token_file).parent.mkdir(parents=True, exist_ok=True) if not isinstance(client_id, str) or not client_id.strip(): From 192693d82dcda80ecd6fd75501e9fa5a5af94115 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Mon, 8 Sep 2025 16:36:03 -0300 Subject: [PATCH 16/26] fixed agent.py extra comma --- src/agent.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/agent.py b/src/agent.py index 7c8f000c..6d94094f 100644 --- a/src/agent.py +++ b/src/agent.py @@ -442,7 +442,6 @@ async def async_langflow_chat( user_id=user_id, previous_response_id=previous_response_id, - , ) if store_conversation: From 327b4358147fe9b6d6c469fde1a8f453d96e08e1 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 8 Sep 2025 16:39:41 -0300 Subject: [PATCH 17/26] fix imports in task_service --- src/services/task_service.py | 61 ++++++++++-------------------------- 1 file changed, 17 insertions(+), 44 deletions(-) diff --git a/src/services/task_service.py b/src/services/task_service.py index 0537e933..705f6f3c 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -1,10 +1,11 @@ import asyncio import random -from typing import Dict, Optional +import time +import uuid -from models.tasks import TaskStatus, UploadTask, FileTask -from utils.gpu_detection import get_worker_count +from models.tasks import FileTask, TaskStatus, UploadTask from session_manager import AnonymousUser +from utils.gpu_detection import get_worker_count from utils.logging_config import get_logger logger = get_logger(__name__) @@ -14,9 +15,7 @@ class TaskService: def __init__(self, document_service=None, process_pool=None): self.document_service = document_service self.process_pool = process_pool - self.task_store: Dict[ - str, Dict[str, UploadTask] - ] = {} # user_id -> {task_id -> UploadTask} + self.task_store: dict[str, dict[str, UploadTask]] = {} # user_id -> {task_id -> UploadTask} self.background_tasks = set() if self.process_pool is None: @@ -67,9 +66,7 @@ class TaskService: self.task_store[user_id][task_id] = upload_task # Start background processing - background_task = asyncio.create_task( - self.background_custom_processor(user_id, task_id, items) - ) + background_task = asyncio.create_task(self.background_custom_processor(user_id, task_id, items)) self.background_tasks.add(background_task) background_task.add_done_callback(self.background_tasks.discard) @@ -87,27 +84,18 @@ class TaskService: # Process files with limited concurrency to avoid overwhelming the system max_workers = get_worker_count() - semaphore = asyncio.Semaphore( - max_workers * 2 - ) # Allow 2x process pool size for async I/O + semaphore = asyncio.Semaphore(max_workers * 2) # Allow 2x process pool size for async I/O async def process_with_semaphore(file_path: str): async with semaphore: - await self.document_service.process_single_file_task( - upload_task, file_path - ) + await self.document_service.process_single_file_task(upload_task, file_path) - tasks = [ - process_with_semaphore(file_path) - for file_path in upload_task.file_tasks.keys() - ] + tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()] await asyncio.gather(*tasks, return_exceptions=True) except Exception as e: - logger.error( - "Background upload processor failed", task_id=task_id, error=str(e) - ) + logger.error("Background upload processor failed", task_id=task_id, error=str(e)) import traceback traceback.print_exc() @@ -115,9 +103,7 @@ class TaskService: self.task_store[user_id][task_id].status = TaskStatus.FAILED self.task_store[user_id][task_id].updated_at = time.time() - async def background_custom_processor( - self, user_id: str, task_id: str, items: list - ) -> None: + async def background_custom_processor(self, user_id: str, task_id: str, items: list) -> None: """Background task to process items using custom processor""" try: upload_task = self.task_store[user_id][task_id] @@ -139,9 +125,7 @@ class TaskService: try: await processor.process_item(upload_task, item, file_task) except Exception as e: - logger.error( - "Failed to process item", item=str(item), error=str(e) - ) + logger.error("Failed to process item", item=str(item), error=str(e)) import traceback traceback.print_exc() @@ -168,9 +152,7 @@ class TaskService: pass raise # Re-raise to properly handle cancellation except Exception as e: - logger.error( - "Background custom processor failed", task_id=task_id, error=str(e) - ) + logger.error("Background custom processor failed", task_id=task_id, error=str(e)) import traceback traceback.print_exc() @@ -178,7 +160,7 @@ class TaskService: self.task_store[user_id][task_id].status = TaskStatus.FAILED self.task_store[user_id][task_id].updated_at = time.time() - def get_task_status(self, user_id: str, task_id: str) -> Optional[dict]: + def get_task_status(self, user_id: str, task_id: str) -> dict | None: """Get the status of a specific upload task Includes fallback to shared tasks stored under the "anonymous" user key @@ -192,10 +174,7 @@ class TaskService: upload_task = None for candidate_user_id in candidate_user_ids: - if ( - candidate_user_id in self.task_store - and task_id in self.task_store[candidate_user_id] - ): + if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]: upload_task = self.task_store[candidate_user_id][task_id] break @@ -269,10 +248,7 @@ class TaskService: store_user_id = None for candidate_user_id in candidate_user_ids: - if ( - candidate_user_id in self.task_store - and task_id in self.task_store[candidate_user_id] - ): + if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]: store_user_id = candidate_user_id break @@ -286,10 +262,7 @@ class TaskService: return False # Cancel the background task to stop scheduling new work - if ( - hasattr(upload_task, "background_task") - and not upload_task.background_task.done() - ): + if hasattr(upload_task, "background_task") and not upload_task.background_task.done(): upload_task.background_task.cancel() # Mark task as failed (cancelled) From 542082e5bf2a96a760c3b7e4d6c455e5b81b97d6 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 8 Sep 2025 16:40:06 -0300 Subject: [PATCH 18/26] Add LangflowConnectorFileProcessor for handling connector file uploads This commit introduces the LangflowConnectorFileProcessor class, which processes connector file uploads using the Langflow service. It includes initialization parameters for user and connection details, and implements the process_item method to handle file processing asynchronously. Additionally, it cleans up the existing ConnectorFileProcessor by removing unused imports and streamlining file info retrieval. These changes enhance the code's robustness and maintainability in line with async development practices. --- src/models/processors.py | 78 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 75 insertions(+), 3 deletions(-) diff --git a/src/models/processors.py b/src/models/processors.py index ed5a1bb4..02836020 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Any, Dict +from typing import Any from .tasks import UploadTask, FileTask from utils.logging_config import get_logger @@ -91,10 +91,9 @@ class ConnectorFileProcessor(TaskProcessor): ) -> None: """Process a connector file using ConnectorService""" from models.tasks import TaskStatus - import time file_id = item # item is the connector file ID - file_info = self.file_info_map.get(file_id) + self.file_info_map.get(file_id) # Get the connector and connection info connector = await self.connector_service.get_connector(self.connection_id) @@ -126,6 +125,79 @@ class ConnectorFileProcessor(TaskProcessor): upload_task.successful_files += 1 +class LangflowConnectorFileProcessor(TaskProcessor): + """Processor for connector file uploads using Langflow""" + + def __init__( + self, + langflow_connector_service, + connection_id: str, + files_to_process: list, + user_id: str = None, + jwt_token: str = None, + owner_name: str = None, + owner_email: str = None, + ): + self.langflow_connector_service = langflow_connector_service + self.connection_id = connection_id + self.files_to_process = files_to_process + self.user_id = user_id + self.jwt_token = jwt_token + self.owner_name = owner_name + self.owner_email = owner_email + # Create lookup map for file info - handle both file objects and file IDs + self.file_info_map = {} + for f in files_to_process: + if isinstance(f, dict): + # Full file info objects + self.file_info_map[f["id"]] = f + else: + # Just file IDs - will need to fetch metadata during processing + self.file_info_map[f] = None + + async def process_item( + self, upload_task: UploadTask, item: str, file_task: FileTask + ) -> None: + """Process a connector file using LangflowConnectorService""" + from models.tasks import TaskStatus + + file_id = item # item is the connector file ID + self.file_info_map.get(file_id) + + # Get the connector and connection info + connector = await self.langflow_connector_service.get_connector( + self.connection_id + ) + connection = ( + await self.langflow_connector_service.connection_manager.get_connection( + self.connection_id + ) + ) + if not connector or not connection: + raise ValueError(f"Connection '{self.connection_id}' not found") + + # Get file content from connector (the connector will fetch metadata if needed) + document = await connector.get_file_content(file_id) + + # Use the user_id passed during initialization + if not self.user_id: + raise ValueError("user_id not provided to LangflowConnectorFileProcessor") + + # Process using Langflow pipeline + result = await self.langflow_connector_service.process_connector_document( + document, + self.user_id, + connection.connector_type, + jwt_token=self.jwt_token, + owner_name=self.owner_name, + owner_email=self.owner_email, + ) + + file_task.status = TaskStatus.COMPLETED + file_task.result = result + upload_task.successful_files += 1 + + class S3FileProcessor(TaskProcessor): """Processor for files stored in S3 buckets""" From beee47068c3ed89520707d7fcc632d22d2e8e7a0 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 8 Sep 2025 16:40:24 -0300 Subject: [PATCH 19/26] Add LangflowConnectorService for managing connector documents This commit introduces the LangflowConnectorService class, which facilitates the management and processing of connector documents using the Langflow service. It includes methods for initializing the service, retrieving connectors, processing documents, and syncing files. The implementation emphasizes asynchronous processing and robust error handling, enhancing the overall maintainability and documentation of the codebase. --- src/connectors/langflow_connector_service.py | 290 +++++++++++++++++++ 1 file changed, 290 insertions(+) create mode 100644 src/connectors/langflow_connector_service.py diff --git a/src/connectors/langflow_connector_service.py b/src/connectors/langflow_connector_service.py new file mode 100644 index 00000000..fa82eee7 --- /dev/null +++ b/src/connectors/langflow_connector_service.py @@ -0,0 +1,290 @@ +import os +import tempfile +from typing import Any, Dict, List, Optional + +# Create custom processor for connector files using Langflow +from models.processors import LangflowConnectorFileProcessor +from services.langflow_file_service import LangflowFileService +from utils.logging_config import get_logger + +from .base import BaseConnector, ConnectorDocument +from .connection_manager import ConnectionManager + +logger = get_logger(__name__) + + +class LangflowConnectorService: + """Service to manage connector documents and process them via Langflow""" + + def __init__( + self, + task_service=None, + session_manager=None, + ): + self.task_service = task_service + self.session_manager = session_manager + self.connection_manager = ConnectionManager() + + # Initialize LangflowFileService for processing connector documents + self.langflow_service = LangflowFileService() + + async def initialize(self): + """Initialize the service by loading existing connections""" + await self.connection_manager.load_connections() + + async def get_connector(self, connection_id: str) -> Optional[BaseConnector]: + """Get a connector by connection ID""" + return await self.connection_manager.get_connector(connection_id) + + async def process_connector_document( + self, + document: ConnectorDocument, + owner_user_id: str, + connector_type: str, + jwt_token: str = None, + owner_name: str = None, + owner_email: str = None, + ) -> Dict[str, Any]: + """Process a document from a connector using LangflowFileService pattern""" + + logger.debug( + "Processing connector document via Langflow", + document_id=document.id, + filename=document.filename, + ) + + # Create temporary file from document content + with tempfile.NamedTemporaryFile( + delete=False, suffix=self._get_file_extension(document.mimetype) + ) as tmp_file: + tmp_file.write(document.content) + tmp_file.flush() + + try: + # Step 1: Upload file to Langflow + logger.debug("Uploading file to Langflow", filename=document.filename) + content = document.content + file_tuple = ( + document.filename, + content, + document.mimetype or "application/octet-stream", + ) + + upload_result = await self.langflow_service.upload_user_file( + file_tuple, jwt_token + ) + langflow_file_id = upload_result["id"] + langflow_file_path = upload_result["path"] + + logger.debug( + "File uploaded to Langflow", + file_id=langflow_file_id, + path=langflow_file_path, + ) + + # Step 2: Run ingestion flow with the uploaded file + logger.debug( + "Running Langflow ingestion flow", file_path=langflow_file_path + ) + + # Use the same tweaks pattern as LangflowFileService + tweaks = {} # Let Langflow handle the ingestion with default settings + + ingestion_result = await self.langflow_service.run_ingestion_flow( + file_paths=[langflow_file_path], jwt_token=jwt_token, tweaks=tweaks + ) + + logger.debug("Ingestion flow completed", result=ingestion_result) + + # Step 3: Delete the file from Langflow + logger.debug("Deleting file from Langflow", file_id=langflow_file_id) + await self.langflow_service.delete_user_file(langflow_file_id) + logger.debug("File deleted from Langflow", file_id=langflow_file_id) + + return { + "status": "indexed", + "filename": document.filename, + "source_url": document.source_url, + "document_id": document.id, + "connector_type": connector_type, + "langflow_result": ingestion_result, + } + + except Exception as e: + logger.error( + "Failed to process connector document via Langflow", + document_id=document.id, + error=str(e), + ) + # Try to clean up Langflow file if upload succeeded but processing failed + if "langflow_file_id" in locals(): + try: + await self.langflow_service.delete_user_file(langflow_file_id) + logger.debug( + "Cleaned up Langflow file after error", + file_id=langflow_file_id, + ) + except Exception as cleanup_error: + logger.warning( + "Failed to cleanup Langflow file", + file_id=langflow_file_id, + error=str(cleanup_error), + ) + raise + + finally: + # Clean up temporary file + os.unlink(tmp_file.name) + + def _get_file_extension(self, mimetype: str) -> str: + """Get file extension based on MIME type""" + mime_to_ext = { + "application/pdf": ".pdf", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx", + "application/msword": ".doc", + "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx", + "application/vnd.ms-powerpoint": ".ppt", + "text/plain": ".txt", + "text/html": ".html", + "application/rtf": ".rtf", + "application/vnd.google-apps.document": ".pdf", # Exported as PDF + "application/vnd.google-apps.presentation": ".pdf", + "application/vnd.google-apps.spreadsheet": ".pdf", + } + return mime_to_ext.get(mimetype, ".bin") + + async def sync_connector_files( + self, + connection_id: str, + user_id: str, + max_files: int = None, + jwt_token: str = None, + ) -> str: + """Sync files from a connector connection using Langflow processing""" + if not self.task_service: + raise ValueError( + "TaskService not available - connector sync requires task service dependency" + ) + + logger.debug( + "Starting Langflow-based sync for connection", + connection_id=connection_id, + max_files=max_files, + ) + + connector = await self.get_connector(connection_id) + if not connector: + raise ValueError( + f"Connection '{connection_id}' not found or not authenticated" + ) + + logger.debug("Got connector", authenticated=connector.is_authenticated) + + if not connector.is_authenticated: + raise ValueError(f"Connection '{connection_id}' not authenticated") + + # Collect files to process (limited by max_files) + files_to_process = [] + page_token = None + + # Calculate page size to minimize API calls + page_size = min(max_files or 100, 1000) if max_files else 100 + + while True: + # List files from connector with limit + logger.debug( + "Calling list_files", page_size=page_size, page_token=page_token + ) + file_list = await connector.list_files(page_token, limit=page_size) + logger.debug( + "Got files from connector", file_count=len(file_list.get("files", [])) + ) + files = file_list["files"] + + if not files: + break + + for file_info in files: + if max_files and len(files_to_process) >= max_files: + break + files_to_process.append(file_info) + + # Stop if we have enough files or no more pages + if (max_files and len(files_to_process) >= max_files) or not file_list.get( + "nextPageToken" + ): + break + + page_token = file_list.get("nextPageToken") + + # Get user information + user = self.session_manager.get_user(user_id) if self.session_manager else None + owner_name = user.name if user else None + owner_email = user.email if user else None + + processor = LangflowConnectorFileProcessor( + self, + connection_id, + files_to_process, + user_id, + jwt_token=jwt_token, + owner_name=owner_name, + owner_email=owner_email, + ) + + # Use file IDs as items + file_ids = [file_info["id"] for file_info in files_to_process] + + # Create custom task using TaskService + task_id = await self.task_service.create_custom_task( + user_id, file_ids, processor + ) + + return task_id + + async def sync_specific_files( + self, + connection_id: str, + user_id: str, + file_ids: List[str], + jwt_token: str = None, + ) -> str: + """Sync specific files by their IDs using Langflow processing""" + if not self.task_service: + raise ValueError( + "TaskService not available - connector sync requires task service dependency" + ) + + connector = await self.get_connector(connection_id) + if not connector: + raise ValueError( + f"Connection '{connection_id}' not found or not authenticated" + ) + + if not connector.is_authenticated: + raise ValueError(f"Connection '{connection_id}' not authenticated") + + if not file_ids: + raise ValueError("No file IDs provided") + + # Get user information + user = self.session_manager.get_user(user_id) if self.session_manager else None + owner_name = user.name if user else None + owner_email = user.email if user else None + + processor = LangflowConnectorFileProcessor( + self, + connection_id, + file_ids, + user_id, + jwt_token=jwt_token, + owner_name=owner_name, + owner_email=owner_email, + ) + + # Create custom task using TaskService + task_id = await self.task_service.create_custom_task( + user_id, file_ids, processor + ) + + return task_id From 6d0a94e4b44b98fed60c75815c47fbe1d8331372 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 8 Sep 2025 16:42:07 -0300 Subject: [PATCH 20/26] Refactor connector service initialization to use LangflowConnectorService This commit updates the service initialization in main.py to replace the existing ConnectorService with LangflowConnectorService. This change enhances the management of connector documents by leveraging the new service's capabilities, aligning with the ongoing improvements in asynchronous processing and code maintainability. --- src/main.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main.py b/src/main.py index 9810ca24..890faa98 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,7 @@ import sys # Configure structured logging early +from connectors.langflow_connector_service import LangflowConnectorService from utils.logging_config import configure_from_env, get_logger configure_from_env() @@ -49,7 +50,6 @@ from config.settings import ( ) # Existing services -from connectors.service import ConnectorService from services.auth_service import AuthService from services.chat_service import ChatService @@ -301,11 +301,7 @@ async def initialize_services(): document_service.process_pool = process_pool # Initialize connector service - connector_service = ConnectorService( - patched_async_client=clients.patched_async_client, - process_pool=process_pool, - embed_model="text-embedding-3-small", - index_name=INDEX_NAME, + connector_service = LangflowConnectorService( task_service=task_service, session_manager=session_manager, ) From cff28fc4f94b43c45524401f7564954e7f4acd33 Mon Sep 17 00:00:00 2001 From: Eric Hare Date: Mon, 8 Sep 2025 12:56:38 -0700 Subject: [PATCH 21/26] feat: Cleanup duplicate connections --- src/connectors/connection_manager.py | 187 +++++++++++++++++++++++---- 1 file changed, 161 insertions(+), 26 deletions(-) diff --git a/src/connectors/connection_manager.py b/src/connectors/connection_manager.py index 2d0d08bd..9ce9ffd3 100644 --- a/src/connectors/connection_manager.py +++ b/src/connectors/connection_manager.py @@ -1,6 +1,5 @@ import json import uuid -import asyncio import aiofiles from typing import Dict, List, Any, Optional from datetime import datetime @@ -62,6 +61,9 @@ class ConnectionManager: config = ConnectionConfig(**conn_data) self.connections[config.connection_id] = config + # Now that connections are loaded, clean up duplicates + await self.cleanup_duplicate_connections(remove_duplicates=True) + async def save_connections(self): """Save connections to persistent storage""" data = {"connections": []} @@ -78,33 +80,95 @@ class ConnectionManager: async with aiofiles.open(self.connections_file, "w") as f: await f.write(json.dumps(data, indent=2)) - async def create_connection( - self, - connector_type: str, - name: str, - config: Dict[str, Any], - user_id: Optional[str] = None, - ) -> str: - """Create a new connection configuration""" - connection_id = str(uuid.uuid4()) + async def _get_existing_connection( + self, connector_type: str, user_id: Optional[str] = None + ) -> Optional[ConnectionConfig]: + """Find existing active connection for the same connector type and user""" + for connection in self.connections.values(): + if ( + connection.connector_type == connector_type + and connection.user_id == user_id + and connection.is_active + ): + return connection + return None - connection_config = ConnectionConfig( - connection_id=connection_id, - connector_type=connector_type, - name=name, - config=config, - user_id=user_id, - ) - - self.connections[connection_id] = connection_config - await self.save_connections() - - return connection_id - - async def get_connection(self, connection_id: str) -> Optional[ConnectionConfig]: - """Get connection configuration""" - return self.connections.get(connection_id) + async def _delete_old_connections( + self, connector_type: str, user_id: Optional[str] = None, exclude_id: str = None + ): + """Delete old connections for the same connector type and user""" + connections_to_delete = [] + + for connection_id, connection in self.connections.items(): + if ( + connection.connector_type == connector_type + and connection.user_id == user_id + and connection.is_active + and connection_id != exclude_id + ): + connections_to_delete.append(connection_id) + + # Delete old connections + for connection_id in connections_to_delete: + logger.info( + f"Deleting old connection for {connector_type}", + connection_id=connection_id + ) + await self.delete_connection(connection_id) + async def cleanup_duplicate_connections(self, remove_duplicates=False): + """ + Clean up duplicate connections, keeping only the most recent connection + per provider per user + + Args: + remove_duplicates: If True, physically removes duplicates from connections.json + If False (default), just deactivates them + """ + logger.info("Starting cleanup of duplicate connections") + + # Group connections by (connector_type, user_id) + grouped_connections = {} + + for connection_id, connection in self.connections.items(): + if not connection.is_active: + continue # Skip inactive connections + + key = (connection.connector_type, connection.user_id) + + if key not in grouped_connections: + grouped_connections[key] = [] + + grouped_connections[key].append((connection_id, connection)) + + # For each group, keep only the most recent connection + connections_to_remove = [] + + for (connector_type, user_id), connections in grouped_connections.items(): + if len(connections) <= 1: + continue # No duplicates + + logger.info(f"Found {len(connections)} duplicate connections for {connector_type}, user {user_id}") + + # Sort by created_at, keep the most recent + connections.sort(key=lambda x: x[1].created_at, reverse=True) + + # Keep the first (most recent), remove/deactivate the rest + for connection_id, connection in connections[1:]: + connections_to_remove.append((connection_id, connection)) + logger.info(f"Marking connection {connection_id} for {'removal' if remove_duplicates else 'deactivation'}") + + # Remove or deactivate duplicate connections + for connection_id, connection in connections_to_remove: + if remove_duplicates: + await self.delete_connection(connection_id) # Handles token cleanup + else: + await self.deactivate_connection(connection_id) + + action = "Removed" if remove_duplicates else "Deactivated" + logger.info(f"Cleanup complete. {action} {len(connections_to_remove)} duplicate connections") + return len(connections_to_remove) + async def update_connection( self, connection_id: str, @@ -146,6 +210,61 @@ class ConnectionManager: return True + async def create_connection( + self, + connector_type: str, + name: str, + config: Dict[str, Any], + user_id: Optional[str] = None, + ) -> str: + """Create a new connection configuration, ensuring only one per provider per user""" + + # Check if we already have an active connection for this provider and user + existing_connection = await self._get_existing_connection(connector_type, user_id) + + if existing_connection: + # Check if the existing connection has a valid token + try: + connector = self._create_connector(existing_connection) + if await connector.authenticate(): + logger.info( + f"Using existing valid connection for {connector_type}", + connection_id=existing_connection.connection_id + ) + # Update the existing connection with new config if needed + if config != existing_connection.config: + logger.info("Updating existing connection config") + await self.update_connection( + existing_connection.connection_id, + config=config + ) + return existing_connection.connection_id + except Exception as e: + logger.warning( + f"Existing connection authentication failed: {e}", + connection_id=existing_connection.connection_id + ) + # If authentication fails, we'll create a new connection and clean up the old one + + # Create new connection + connection_id = str(uuid.uuid4()) + + connection_config = ConnectionConfig( + connection_id=connection_id, + connector_type=connector_type, + name=name, + config=config, + user_id=user_id, + ) + + self.connections[connection_id] = connection_config + + # Deactivate/remove any old connections for this provider and user + await self._delete_old_connections(connector_type, user_id, connection_id) + await self.save_connections() + + return connection_id + async def list_connections( self, user_id: Optional[str] = None, connector_type: Optional[str] = None ) -> List[ConnectionConfig]: @@ -165,6 +284,18 @@ class ConnectionManager: if connection_id not in self.connections: return False + connection = self.connections[connection_id] + + # Clean up token file if it exists + if connection.config.get("token_file"): + token_file = Path(connection.config["token_file"]) + if token_file.exists(): + try: + token_file.unlink() + logger.info(f"Deleted token file: {token_file}") + except Exception as e: + logger.warning(f"Failed to delete token file {token_file}: {e}") + # Clean up active connector if exists if connection_id in self.active_connectors: connector = self.active_connectors[connection_id] @@ -296,6 +427,10 @@ class ConnectionManager: return True return False + + async def get_connection(self, connection_id: str) -> Optional[ConnectionConfig]: + """Get connection configuration""" + return self.connections.get(connection_id) async def get_connection_by_webhook_id( self, webhook_id: str From 617615532d16f9b9746066fa5836c15195278385 Mon Sep 17 00:00:00 2001 From: Eric Hare Date: Mon, 8 Sep 2025 13:00:00 -0700 Subject: [PATCH 22/26] Update connection_manager.py --- src/connectors/connection_manager.py | 29 +++------------------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/src/connectors/connection_manager.py b/src/connectors/connection_manager.py index 9ce9ffd3..05cc85c9 100644 --- a/src/connectors/connection_manager.py +++ b/src/connectors/connection_manager.py @@ -93,29 +93,6 @@ class ConnectionManager: return connection return None - async def _delete_old_connections( - self, connector_type: str, user_id: Optional[str] = None, exclude_id: str = None - ): - """Delete old connections for the same connector type and user""" - connections_to_delete = [] - - for connection_id, connection in self.connections.items(): - if ( - connection.connector_type == connector_type - and connection.user_id == user_id - and connection.is_active - and connection_id != exclude_id - ): - connections_to_delete.append(connection_id) - - # Delete old connections - for connection_id in connections_to_delete: - logger.info( - f"Deleting old connection for {connector_type}", - connection_id=connection_id - ) - await self.delete_connection(connection_id) - async def cleanup_duplicate_connections(self, remove_duplicates=False): """ Clean up duplicate connections, keeping only the most recent connection @@ -259,10 +236,10 @@ class ConnectionManager: self.connections[connection_id] = connection_config - # Deactivate/remove any old connections for this provider and user - await self._delete_old_connections(connector_type, user_id, connection_id) + # Clean up duplicates (will keep the newest, which is the one we just created) + await self.cleanup_duplicate_connections(remove_duplicates=True) + await self.save_connections() - return connection_id async def list_connections( From 73f039afb5b1cd026a18f9296f25a996b6bc7d6d Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 8 Sep 2025 16:07:06 -0400 Subject: [PATCH 23/26] logging --- src/agent.py | 53 +++++++++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/src/agent.py b/src/agent.py index 6d94094f..749eb299 100644 --- a/src/agent.py +++ b/src/agent.py @@ -189,35 +189,42 @@ async def async_response( previous_response_id: str = None, log_prefix: str = "response", ): - logger.info("User prompt received", prompt=prompt) + try: + logger.info("User prompt received", prompt=prompt) - # Build request parameters - request_params = { - "model": model, - "input": prompt, - "stream": False, - "include": ["tool_call.results"], - } - if previous_response_id is not None: - request_params["previous_response_id"] = previous_response_id - if extra_headers: - request_params["extra_headers"] = extra_headers + # Build request parameters + request_params = { + "model": model, + "input": prompt, + "stream": False, + "include": ["tool_call.results"], + } + if previous_response_id is not None: + request_params["previous_response_id"] = previous_response_id + if extra_headers: + request_params["extra_headers"] = extra_headers - if "x-api-key" not in client.default_headers: - if hasattr(client, "api_key") and extra_headers is not None: - extra_headers["x-api-key"] = client.api_key + if "x-api-key" not in client.default_headers: + if hasattr(client, "api_key") and extra_headers is not None: + extra_headers["x-api-key"] = client.api_key - response = await client.responses.create(**request_params) + response = await client.responses.create(**request_params) - response_text = response.output_text - logger.info("Response generated", log_prefix=log_prefix, response=response_text) + response_text = response.output_text + logger.info("Response generated", log_prefix=log_prefix, response=response_text) - # Extract and store response_id if available - response_id = getattr(response, "id", None) or getattr( - response, "response_id", None - ) + # Extract and store response_id if available + response_id = getattr(response, "id", None) or getattr( + response, "response_id", None + ) - return response_text, response_id, response + return response_text, response_id, response + except Exception as e: + logger.error("Exception in non-streaming response", error=str(e)) + import traceback + + traceback.print_exc() + raise # Unified streaming function for both chat and langflow From 3c6c8f999be6491aff389b32803ceb43cbc2ce63 Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 8 Sep 2025 16:22:20 -0400 Subject: [PATCH 24/26] compose versions env vars --- docker-compose-cpu.yml | 13 +++++++------ docker-compose.yml | 10 +++++----- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index 3279561a..0414a523 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -1,8 +1,9 @@ services: opensearch: - build: - context: . - dockerfile: Dockerfile + image: phact/openrag-opensearch:${OPENRAG_VERSION:-latest} + #build: + # context: . + # dockerfile: Dockerfile container_name: os depends_on: - openrag-backend @@ -38,7 +39,7 @@ services: - "5601:5601" openrag-backend: - image: phact/openrag-backend:latest + image: phact/openrag-backend:${OPENRAG_VERSION:-latest} #build: #context: . #dockerfile: Dockerfile.backend @@ -72,7 +73,7 @@ services: - ./keys:/app/keys:Z openrag-frontend: - image: phact/openrag-frontend:latest + image: phact/openrag-frontend:${OPENRAG_VERSION:-latest} #build: #context: . #dockerfile: Dockerfile.frontend @@ -87,7 +88,7 @@ services: langflow: volumes: - ./flows:/app/flows:Z - image: phact/langflow:responses + image: phact/langflow:${LANGFLOW_VERSION:-responses} container_name: langflow ports: - "7860:7860" diff --git a/docker-compose.yml b/docker-compose.yml index 16e9046f..23499cb9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: opensearch: - image: phact/openrag-opensearch:latest + image: phact/openrag-opensearch:${OPENRAG_VERSION:-latest} #build: #context: . #dockerfile: Dockerfile @@ -39,7 +39,7 @@ services: - "5601:5601" openrag-backend: - image: phact/openrag-backend:latest + image: phact/openrag-backend:${OPENRAG_VERSION:-latest} #build: #context: . #dockerfile: Dockerfile.backend @@ -73,7 +73,7 @@ services: gpus: all openrag-frontend: - image: phact/openrag-frontend:latest + image: phact/openrag-frontend:${OPENRAG_VERSION:-latest} #build: #context: . #dockerfile: Dockerfile.frontend @@ -88,7 +88,7 @@ services: langflow: volumes: - ./flows:/app/flows:Z - image: phact/langflow:responses + image: phact/langflow:${LANGFLOW_VERSION:-responses} container_name: langflow ports: - "7860:7860" @@ -104,4 +104,4 @@ services: - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_NEW_USER_IS_ACTIVE=${LANGFLOW_NEW_USER_IS_ACTIVE} - - LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI} \ No newline at end of file + - LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI} From 5a5541ce00c3774ac48f2207192c3fd1d83ec5c1 Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 8 Sep 2025 16:22:29 -0400 Subject: [PATCH 25/26] 0.1.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 3ed39646..2b4e821d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "openrag" -version = "0.1.0" +version = "0.1.1" description = "Add your description here" readme = "README.md" requires-python = ">=3.13" From f3616db4cd829aca5606520d5592a4e0bf790d88 Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 8 Sep 2025 16:33:06 -0400 Subject: [PATCH 26/26] FLOW_ID --- src/services/chat_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/chat_service.py b/src/services/chat_service.py index 6c0bb825..d2fe7ca9 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -1,4 +1,4 @@ -from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL, FLOW_ID +from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL from agent import ( async_chat, async_langflow,