diff --git a/flows/ingestion_flow.json b/flows/ingestion_flow.json index 5ee5d1b2..5d872b42 100644 --- a/flows/ingestion_flow.json +++ b/flows/ingestion_flow.json @@ -884,7 +884,7 @@ ], "frozen": false, "icon": "file-text", - "last_updated": "2025-09-08T11:35:39.784Z", + "last_updated": "2025-09-08T17:45:33.714Z", "legacy": false, "lf_version": "1.5.0.post2", "metadata": {}, @@ -906,23 +906,6 @@ "Message" ], "value": "__UNDEFINED__" - }, - { - "allows_loop": false, - "cache": true, - "display_name": "File Path", - "group_outputs": false, - "hidden": null, - "method": "load_files_path", - "name": "path", - "options": null, - "required_inputs": null, - "selected": "Message", - "tool_mode": true, - "types": [ - "Message" - ], - "value": "__UNDEFINED__" } ], "pinned": false, @@ -1069,9 +1052,7 @@ "bz2", "gz" ], - "file_path": [ - "242b5797-4104-4a01-8da1-b8036813100d/test_ingestion.txt" - ], + "file_path": [], "info": "Supported file extensions: txt, md, mdx, csv, json, yaml, yml, xml, html, htm, pdf, docx, py, sh, sql, js, ts, tsx; optionally bundled in file extensions: zip, tar, tgz, bz2, gz", "list": true, "list_add_label": "Add More", @@ -1151,7 +1132,7 @@ "dragging": false, "id": "File-PSU37", "measured": { - "height": 234, + "height": 230, "width": 320 }, "position": { @@ -1735,7 +1716,7 @@ "x": 2218.9287723423276, "y": 1332.2598463956504 }, - "selected": true, + "selected": false, "type": "genericNode" } ], diff --git a/src/services/chat_service.py b/src/services/chat_service.py index 122c90fc..34b28c9a 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -1,4 +1,5 @@ import json + from utils.logging_config import get_logger logger = get_logger(__name__) @@ -178,9 +179,9 @@ class ChatService: "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY." ) response_text, response_id = await async_langflow( - langflow_client, - LANGFLOW_CHAT_FLOW_ID, - document_prompt, + langflow_client=langflow_client, + flow_id=LANGFLOW_CHAT_FLOW_ID, + prompt=document_prompt, extra_headers=extra_headers, previous_response_id=previous_response_id, ) @@ -199,17 +200,17 @@ class ChatService: async def get_chat_history(self, user_id: str): """Get chat conversation history for a user""" - from agent import get_user_conversations, active_conversations + from agent import active_conversations, get_user_conversations if not user_id: return {"error": "User ID is required", "conversations": []} # Get metadata from persistent storage conversations_dict = get_user_conversations(user_id) - + # Get in-memory conversations (with function calls) in_memory_conversations = active_conversations.get(user_id, {}) - + logger.debug( "Getting chat history for user", user_id=user_id, @@ -219,7 +220,7 @@ class ChatService: # Convert conversations dict to list format with metadata conversations = [] - + # First, process in-memory conversations (they have function calls) for response_id, conversation_state in in_memory_conversations.items(): # Filter out system messages @@ -235,13 +236,13 @@ class ChatService: } if msg.get("response_id"): message_data["response_id"] = msg["response_id"] - + # Include function call data if present if msg.get("chunks"): message_data["chunks"] = msg["chunks"] if msg.get("response_data"): message_data["response_data"] = msg["response_data"] - + messages.append(message_data) if messages: # Only include conversations with actual messages @@ -275,25 +276,27 @@ class ChatService: "previous_response_id" ), "total_messages": len(messages), - "source": "in_memory" + "source": "in_memory", } ) - + # Then, add any persistent metadata that doesn't have in-memory data for response_id, metadata in conversations_dict.items(): if response_id not in in_memory_conversations: # This is metadata-only conversation (no function calls) - conversations.append({ - "response_id": response_id, - "title": metadata.get("title", "New Chat"), - "endpoint": "chat", - "messages": [], # No messages in metadata-only - "created_at": metadata.get("created_at"), - "last_activity": metadata.get("last_activity"), - "previous_response_id": metadata.get("previous_response_id"), - "total_messages": metadata.get("total_messages", 0), - "source": "metadata_only" - }) + conversations.append( + { + "response_id": response_id, + "title": metadata.get("title", "New Chat"), + "endpoint": "chat", + "messages": [], # No messages in metadata-only + "created_at": metadata.get("created_at"), + "last_activity": metadata.get("last_activity"), + "previous_response_id": metadata.get("previous_response_id"), + "total_messages": metadata.get("total_messages", 0), + "source": "metadata_only", + } + ) # Sort by last activity (most recent first) conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) @@ -309,33 +312,37 @@ class ChatService: """Get langflow conversation history for a user - now fetches from both OpenRAG memory and Langflow database""" from agent import get_user_conversations from services.langflow_history_service import langflow_history_service - + if not user_id: return {"error": "User ID is required", "conversations": []} - + all_conversations = [] - + try: # 1. Get local conversation metadata (no actual messages stored here) conversations_dict = get_user_conversations(user_id) local_metadata = {} - + for response_id, conversation_metadata in conversations_dict.items(): # Store metadata for later use with Langflow data local_metadata[response_id] = conversation_metadata - + # 2. Get actual conversations from Langflow database (source of truth for messages) print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}") - langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID) - + langflow_history = ( + await langflow_history_service.get_user_conversation_history( + user_id, flow_id=LANGFLOW_CHAT_FLOW_ID + ) + ) + if langflow_history.get("conversations"): for conversation in langflow_history["conversations"]: session_id = conversation["session_id"] - + # Only process sessions that belong to this user (exist in local metadata) if session_id not in local_metadata: continue - + # Use Langflow messages (with function calls) as source of truth messages = [] for msg in conversation.get("messages", []): @@ -344,76 +351,91 @@ class ChatService: "content": msg["content"], "timestamp": msg.get("timestamp"), "langflow_message_id": msg.get("langflow_message_id"), - "source": "langflow" + "source": "langflow", } - + # Include function call data if present if msg.get("chunks"): message_data["chunks"] = msg["chunks"] if msg.get("response_data"): message_data["response_data"] = msg["response_data"] - + messages.append(message_data) - + if messages: # Use local metadata if available, otherwise generate from Langflow data metadata = local_metadata.get(session_id, {}) - + if not metadata.get("title"): - first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) + first_user_msg = next( + (msg for msg in messages if msg["role"] == "user"), None + ) title = ( first_user_msg["content"][:50] + "..." - if first_user_msg and len(first_user_msg["content"]) > 50 + if first_user_msg + and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "Langflow chat" ) else: title = metadata["title"] - - all_conversations.append({ - "response_id": session_id, - "title": title, - "endpoint": "langflow", - "messages": messages, # Function calls preserved from Langflow - "created_at": metadata.get("created_at") or conversation.get("created_at"), - "last_activity": metadata.get("last_activity") or conversation.get("last_activity"), - "total_messages": len(messages), - "source": "langflow_enhanced", - "langflow_session_id": session_id, - "langflow_flow_id": conversation.get("flow_id") - }) - + + all_conversations.append( + { + "response_id": session_id, + "title": title, + "endpoint": "langflow", + "messages": messages, # Function calls preserved from Langflow + "created_at": metadata.get("created_at") + or conversation.get("created_at"), + "last_activity": metadata.get("last_activity") + or conversation.get("last_activity"), + "total_messages": len(messages), + "source": "langflow_enhanced", + "langflow_session_id": session_id, + "langflow_flow_id": conversation.get("flow_id"), + } + ) + # 3. Add any local metadata that doesn't have Langflow data yet (recent conversations) for response_id, metadata in local_metadata.items(): if not any(c["response_id"] == response_id for c in all_conversations): - all_conversations.append({ - "response_id": response_id, - "title": metadata.get("title", "New Chat"), - "endpoint": "langflow", - "messages": [], # Will be filled when Langflow sync catches up - "created_at": metadata.get("created_at"), - "last_activity": metadata.get("last_activity"), - "total_messages": metadata.get("total_messages", 0), - "source": "metadata_only" - }) - + all_conversations.append( + { + "response_id": response_id, + "title": metadata.get("title", "New Chat"), + "endpoint": "langflow", + "messages": [], # Will be filled when Langflow sync catches up + "created_at": metadata.get("created_at"), + "last_activity": metadata.get("last_activity"), + "total_messages": metadata.get("total_messages", 0), + "source": "metadata_only", + } + ) + if langflow_history.get("conversations"): - print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow") + print( + f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow" + ) elif langflow_history.get("error"): - print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}") + print( + f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}" + ) else: print(f"[DEBUG] No Langflow conversations found for user {user_id}") - + except Exception as e: print(f"[ERROR] Failed to fetch Langflow history: {e}") # Continue with just in-memory conversations - + # Sort by last activity (most recent first) all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) - - print(f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)") - + + print( + f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)" + ) + return { "user_id": user_id, "endpoint": "langflow", diff --git a/src/tui/managers/env_manager.py b/src/tui/managers/env_manager.py index 6bff9305..8577054b 100644 --- a/src/tui/managers/env_manager.py +++ b/src/tui/managers/env_manager.py @@ -1,23 +1,23 @@ """Environment configuration manager for OpenRAG TUI.""" -import os import secrets import string +from dataclasses import dataclass, field from datetime import datetime from pathlib import Path -from typing import Dict, Optional, List -from dataclasses import dataclass, field +from typing import Dict, List, Optional + from utils.logging_config import get_logger logger = get_logger(__name__) from ..utils.validation import ( - validate_openai_api_key, + sanitize_env_value, + validate_documents_paths, validate_google_oauth_client_id, validate_non_empty, + validate_openai_api_key, validate_url, - validate_documents_paths, - sanitize_env_value, ) @@ -31,7 +31,7 @@ class EnvConfig: langflow_secret_key: str = "" langflow_superuser: str = "admin" langflow_superuser_password: str = "" - flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0" + langflow_chat_flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0" langflow_ingest_flow_id: str = "5488df7c-b93f-4f87-a446-b67028bc0813" # OAuth settings @@ -99,7 +99,7 @@ class EnvManager: "LANGFLOW_SECRET_KEY": "langflow_secret_key", "LANGFLOW_SUPERUSER": "langflow_superuser", "LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password", - "FLOW_ID": "flow_id", + "LANGFLOW_CHAT_FLOW_ID": "langflow_chat_flow_id", "LANGFLOW_INGEST_FLOW_ID": "langflow_ingest_flow_id", "GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id", "GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret", @@ -236,8 +236,10 @@ class EnvManager: f.write( f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n" ) - f.write(f"FLOW_ID={self.config.flow_id}\n") - f.write(f"LANGFLOW_INGEST_FLOW_ID={self.config.langflow_ingest_flow_id}\n") + f.write(f"LANGFLOW_CHAT_FLOW_ID={self.config.langflow_chat_flow_id}\n") + f.write( + f"LANGFLOW_INGEST_FLOW_ID={self.config.langflow_ingest_flow_id}\n" + ) f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n") f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n") f.write(