Merge branch 'ingestion-flow' into langflow-ingestion-modes

This commit is contained in:
Edwin Jose 2025-09-08 14:44:33 -04:00
commit 1e97ec693a
3 changed files with 108 additions and 103 deletions

View file

@ -884,7 +884,7 @@
],
"frozen": false,
"icon": "file-text",
"last_updated": "2025-09-08T11:35:39.784Z",
"last_updated": "2025-09-08T17:45:33.714Z",
"legacy": false,
"lf_version": "1.5.0.post2",
"metadata": {},
@ -906,23 +906,6 @@
"Message"
],
"value": "__UNDEFINED__"
},
{
"allows_loop": false,
"cache": true,
"display_name": "File Path",
"group_outputs": false,
"hidden": null,
"method": "load_files_path",
"name": "path",
"options": null,
"required_inputs": null,
"selected": "Message",
"tool_mode": true,
"types": [
"Message"
],
"value": "__UNDEFINED__"
}
],
"pinned": false,
@ -1069,9 +1052,7 @@
"bz2",
"gz"
],
"file_path": [
"242b5797-4104-4a01-8da1-b8036813100d/test_ingestion.txt"
],
"file_path": [],
"info": "Supported file extensions: txt, md, mdx, csv, json, yaml, yml, xml, html, htm, pdf, docx, py, sh, sql, js, ts, tsx; optionally bundled in file extensions: zip, tar, tgz, bz2, gz",
"list": true,
"list_add_label": "Add More",
@ -1151,7 +1132,7 @@
"dragging": false,
"id": "File-PSU37",
"measured": {
"height": 234,
"height": 230,
"width": 320
},
"position": {
@ -1735,7 +1716,7 @@
"x": 2218.9287723423276,
"y": 1332.2598463956504
},
"selected": true,
"selected": false,
"type": "genericNode"
}
],

View file

@ -1,4 +1,5 @@
import json
from utils.logging_config import get_logger
logger = get_logger(__name__)
@ -178,9 +179,9 @@ class ChatService:
"Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
)
response_text, response_id = await async_langflow(
langflow_client,
LANGFLOW_CHAT_FLOW_ID,
document_prompt,
langflow_client=langflow_client,
flow_id=LANGFLOW_CHAT_FLOW_ID,
prompt=document_prompt,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
)
@ -199,17 +200,17 @@ class ChatService:
async def get_chat_history(self, user_id: str):
"""Get chat conversation history for a user"""
from agent import get_user_conversations, active_conversations
from agent import active_conversations, get_user_conversations
if not user_id:
return {"error": "User ID is required", "conversations": []}
# Get metadata from persistent storage
conversations_dict = get_user_conversations(user_id)
# Get in-memory conversations (with function calls)
in_memory_conversations = active_conversations.get(user_id, {})
logger.debug(
"Getting chat history for user",
user_id=user_id,
@ -219,7 +220,7 @@ class ChatService:
# Convert conversations dict to list format with metadata
conversations = []
# First, process in-memory conversations (they have function calls)
for response_id, conversation_state in in_memory_conversations.items():
# Filter out system messages
@ -235,13 +236,13 @@ class ChatService:
}
if msg.get("response_id"):
message_data["response_id"] = msg["response_id"]
# Include function call data if present
if msg.get("chunks"):
message_data["chunks"] = msg["chunks"]
if msg.get("response_data"):
message_data["response_data"] = msg["response_data"]
messages.append(message_data)
if messages: # Only include conversations with actual messages
@ -275,25 +276,27 @@ class ChatService:
"previous_response_id"
),
"total_messages": len(messages),
"source": "in_memory"
"source": "in_memory",
}
)
# Then, add any persistent metadata that doesn't have in-memory data
for response_id, metadata in conversations_dict.items():
if response_id not in in_memory_conversations:
# This is metadata-only conversation (no function calls)
conversations.append({
"response_id": response_id,
"title": metadata.get("title", "New Chat"),
"endpoint": "chat",
"messages": [], # No messages in metadata-only
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"previous_response_id": metadata.get("previous_response_id"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only"
})
conversations.append(
{
"response_id": response_id,
"title": metadata.get("title", "New Chat"),
"endpoint": "chat",
"messages": [], # No messages in metadata-only
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"previous_response_id": metadata.get("previous_response_id"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only",
}
)
# Sort by last activity (most recent first)
conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
@ -309,33 +312,37 @@ class ChatService:
"""Get langflow conversation history for a user - now fetches from both OpenRAG memory and Langflow database"""
from agent import get_user_conversations
from services.langflow_history_service import langflow_history_service
if not user_id:
return {"error": "User ID is required", "conversations": []}
all_conversations = []
try:
# 1. Get local conversation metadata (no actual messages stored here)
conversations_dict = get_user_conversations(user_id)
local_metadata = {}
for response_id, conversation_metadata in conversations_dict.items():
# Store metadata for later use with Langflow data
local_metadata[response_id] = conversation_metadata
# 2. Get actual conversations from Langflow database (source of truth for messages)
print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID)
langflow_history = (
await langflow_history_service.get_user_conversation_history(
user_id, flow_id=LANGFLOW_CHAT_FLOW_ID
)
)
if langflow_history.get("conversations"):
for conversation in langflow_history["conversations"]:
session_id = conversation["session_id"]
# Only process sessions that belong to this user (exist in local metadata)
if session_id not in local_metadata:
continue
# Use Langflow messages (with function calls) as source of truth
messages = []
for msg in conversation.get("messages", []):
@ -344,76 +351,91 @@ class ChatService:
"content": msg["content"],
"timestamp": msg.get("timestamp"),
"langflow_message_id": msg.get("langflow_message_id"),
"source": "langflow"
"source": "langflow",
}
# Include function call data if present
if msg.get("chunks"):
message_data["chunks"] = msg["chunks"]
if msg.get("response_data"):
message_data["response_data"] = msg["response_data"]
messages.append(message_data)
if messages:
# Use local metadata if available, otherwise generate from Langflow data
metadata = local_metadata.get(session_id, {})
if not metadata.get("title"):
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
first_user_msg = next(
(msg for msg in messages if msg["role"] == "user"), None
)
title = (
first_user_msg["content"][:50] + "..."
if first_user_msg and len(first_user_msg["content"]) > 50
if first_user_msg
and len(first_user_msg["content"]) > 50
else first_user_msg["content"]
if first_user_msg
else "Langflow chat"
)
else:
title = metadata["title"]
all_conversations.append({
"response_id": session_id,
"title": title,
"endpoint": "langflow",
"messages": messages, # Function calls preserved from Langflow
"created_at": metadata.get("created_at") or conversation.get("created_at"),
"last_activity": metadata.get("last_activity") or conversation.get("last_activity"),
"total_messages": len(messages),
"source": "langflow_enhanced",
"langflow_session_id": session_id,
"langflow_flow_id": conversation.get("flow_id")
})
all_conversations.append(
{
"response_id": session_id,
"title": title,
"endpoint": "langflow",
"messages": messages, # Function calls preserved from Langflow
"created_at": metadata.get("created_at")
or conversation.get("created_at"),
"last_activity": metadata.get("last_activity")
or conversation.get("last_activity"),
"total_messages": len(messages),
"source": "langflow_enhanced",
"langflow_session_id": session_id,
"langflow_flow_id": conversation.get("flow_id"),
}
)
# 3. Add any local metadata that doesn't have Langflow data yet (recent conversations)
for response_id, metadata in local_metadata.items():
if not any(c["response_id"] == response_id for c in all_conversations):
all_conversations.append({
"response_id": response_id,
"title": metadata.get("title", "New Chat"),
"endpoint": "langflow",
"messages": [], # Will be filled when Langflow sync catches up
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only"
})
all_conversations.append(
{
"response_id": response_id,
"title": metadata.get("title", "New Chat"),
"endpoint": "langflow",
"messages": [], # Will be filled when Langflow sync catches up
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only",
}
)
if langflow_history.get("conversations"):
print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow")
print(
f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow"
)
elif langflow_history.get("error"):
print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}")
print(
f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}"
)
else:
print(f"[DEBUG] No Langflow conversations found for user {user_id}")
except Exception as e:
print(f"[ERROR] Failed to fetch Langflow history: {e}")
# Continue with just in-memory conversations
# Sort by last activity (most recent first)
all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
print(f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)")
print(
f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)"
)
return {
"user_id": user_id,
"endpoint": "langflow",

View file

@ -1,23 +1,23 @@
"""Environment configuration manager for OpenRAG TUI."""
import os
import secrets
import string
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, List
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from utils.logging_config import get_logger
logger = get_logger(__name__)
from ..utils.validation import (
validate_openai_api_key,
sanitize_env_value,
validate_documents_paths,
validate_google_oauth_client_id,
validate_non_empty,
validate_openai_api_key,
validate_url,
validate_documents_paths,
sanitize_env_value,
)
@ -31,7 +31,7 @@ class EnvConfig:
langflow_secret_key: str = ""
langflow_superuser: str = "admin"
langflow_superuser_password: str = ""
flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0"
langflow_chat_flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0"
langflow_ingest_flow_id: str = "5488df7c-b93f-4f87-a446-b67028bc0813"
# OAuth settings
@ -99,7 +99,7 @@ class EnvManager:
"LANGFLOW_SECRET_KEY": "langflow_secret_key",
"LANGFLOW_SUPERUSER": "langflow_superuser",
"LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password",
"FLOW_ID": "flow_id",
"LANGFLOW_CHAT_FLOW_ID": "langflow_chat_flow_id",
"LANGFLOW_INGEST_FLOW_ID": "langflow_ingest_flow_id",
"GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id",
"GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret",
@ -236,8 +236,10 @@ class EnvManager:
f.write(
f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n"
)
f.write(f"FLOW_ID={self.config.flow_id}\n")
f.write(f"LANGFLOW_INGEST_FLOW_ID={self.config.langflow_ingest_flow_id}\n")
f.write(f"LANGFLOW_CHAT_FLOW_ID={self.config.langflow_chat_flow_id}\n")
f.write(
f"LANGFLOW_INGEST_FLOW_ID={self.config.langflow_ingest_flow_id}\n"
)
f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n")
f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n")
f.write(