LANGFLOW_URL

This commit is contained in:
phact 2025-08-27 23:09:11 -04:00
parent 262646c0b0
commit 859458efdf
7 changed files with 208 additions and 25 deletions

View file

@ -17,3 +17,6 @@ OPENAI_API_KEY=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
# OPTIONAL url for openrag link to langflow in the UI
LANGFLOW_PUBLIC_URL=

View file

@ -48,7 +48,10 @@ services:
environment:
- OPENSEARCH_HOST=opensearch
- LANGFLOW_URL=http://langflow:7860
- LANGFLOW_PUBLIC_URL=${LANGFLOW_PUBLIC_URL}
- LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- FLOW_ID=${FLOW_ID}
- OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin
@ -95,3 +98,8 @@ services:
- OPENRAG-QUERY-FILTER="{}"
- LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER
- LANGFLOW_LOG_LEVEL=DEBUG
- LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_NEW_USER_IS_ACTIVE=${LANGFLOW_NEW_USER_IS_ACTIVE}
- LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI}

View file

@ -49,7 +49,9 @@ services:
environment:
- OPENSEARCH_HOST=opensearch
- LANGFLOW_URL=http://langflow:7860
- LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY}
- LANGFLOW_PUBLIC_URL=${LANGFLOW_PUBLIC_URL}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- FLOW_ID=${FLOW_ID}
- OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin
@ -97,3 +99,8 @@ services:
- OPENRAG-QUERY-FILTER="{}"
- LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER
- LANGFLOW_LOG_LEVEL=DEBUG
- LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_NEW_USER_IS_ACTIVE=${LANGFLOW_NEW_USER_IS_ACTIVE}
- LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI}

View file

@ -51,8 +51,33 @@ function KnowledgeSourcesPage() {
const [isSyncing, setIsSyncing] = useState<string | null>(null)
const [syncResults, setSyncResults] = useState<{[key: string]: SyncResult | null}>({})
const [maxFiles, setMaxFiles] = useState<number>(10)
// Settings state
// Note: backend internal Langflow URL is not needed on the frontend
const [flowId, setFlowId] = useState<string>('1098eea1-6649-4e1d-aed1-b77249fb8dd0')
const [langflowEditUrl, setLangflowEditUrl] = useState<string>('')
const [publicLangflowUrl, setPublicLangflowUrl] = useState<string>('')
// Fetch settings from backend
const fetchSettings = useCallback(async () => {
try {
const response = await fetch('/api/settings')
if (response.ok) {
const settings = await response.json()
if (settings.flow_id) {
setFlowId(settings.flow_id)
}
if (settings.langflow_edit_url) {
setLangflowEditUrl(settings.langflow_edit_url)
}
if (settings.langflow_public_url) {
setPublicLangflowUrl(settings.langflow_public_url)
}
}
} catch (error) {
console.error('Failed to fetch settings:', error)
}
}, [])
// Helper function to get connector icon
const getConnectorIcon = (iconName: string) => {
@ -239,6 +264,13 @@ function KnowledgeSourcesPage() {
}
}
// Fetch settings on mount when authenticated
useEffect(() => {
if (isAuthenticated) {
fetchSettings()
}
}, [isAuthenticated, fetchSettings])
// Check connector status on mount and when returning from OAuth
useEffect(() => {
if (isAuthenticated) {
@ -291,7 +323,15 @@ function KnowledgeSourcesPage() {
<p className="text-sm text-muted-foreground">Adjust your retrieval agent flow</p>
</div>
<Button
onClick={() => window.open('http://localhost:7860/flow/1098eea1-6649-4e1d-aed1-b77249fb8dd0', '_blank')}
onClick={() => {
const derivedFromWindow = typeof window !== 'undefined'
? `${window.location.protocol}//${window.location.hostname}:7860`
: ''
const base = (publicLangflowUrl || derivedFromWindow || 'http://localhost:7860').replace(/\/$/, '')
const computed = flowId ? `${base}/flow/${flowId}` : base
const url = langflowEditUrl || computed
window.open(url, '_blank')
}}
>
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="22" viewBox="0 0 24 22" className="h-4 w-4 mr-2">
<path fill="currentColor" d="M13.0486 0.462158H9.75399C9.44371 0.462158 9.14614 0.586082 8.92674 0.806667L4.03751 5.72232C3.81811 5.9429 3.52054 6.06682 3.21026 6.06682H1.16992C0.511975 6.06682 -0.0165756 6.61212 0.000397655 7.2734L0.0515933 9.26798C0.0679586 9.90556 0.586745 10.4139 1.22111 10.4139H3.59097C3.90124 10.4139 4.19881 10.2899 4.41821 10.0694L9.34823 5.11269C9.56763 4.89211 9.8652 4.76818 10.1755 4.76818H13.0486C13.6947 4.76818 14.2185 4.24157 14.2185 3.59195V1.63839C14.2185 0.988773 13.6947 0.462158 13.0486 0.462158Z"></path>

View file

@ -1,4 +1,7 @@
import os
import requests
import asyncio
import time
from dotenv import load_dotenv
from opensearchpy import AsyncOpenSearch
from opensearchpy._async.http_aiohttp import AIOHttpConnection
@ -15,8 +18,14 @@ OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", "9200"))
OPENSEARCH_USERNAME = os.getenv("OPENSEARCH_USERNAME", "admin")
OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD")
LANGFLOW_URL = os.getenv("LANGFLOW_URL", "http://localhost:7860")
# Optional: public URL for browser links (e.g., http://localhost:7860)
LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL")
FLOW_ID = os.getenv("FLOW_ID")
LANGFLOW_KEY = os.getenv("LANGFLOW_SECRET_KEY")
# Langflow superuser credentials for API key generation
LANGFLOW_SUPERUSER = os.getenv("LANGFLOW_SUPERUSER")
LANGFLOW_SUPERUSER_PASSWORD = os.getenv("LANGFLOW_SUPERUSER_PASSWORD")
# Allow explicit key via environment; generation will be skipped if set
LANGFLOW_KEY = os.getenv("LANGFLOW_KEY")
SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-production")
GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID")
GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")
@ -70,6 +79,78 @@ INDEX_BODY = {
}
}
async def generate_langflow_api_key():
"""Generate Langflow API key using superuser credentials at startup"""
global LANGFLOW_KEY
# If key already provided via env, do not attempt generation
if LANGFLOW_KEY:
print("[INFO] Using LANGFLOW_KEY from environment; skipping generation")
return LANGFLOW_KEY
if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD:
print("[WARNING] LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation")
return None
try:
print("[INFO] Generating Langflow API key using superuser credentials...")
max_attempts = int(os.getenv("LANGFLOW_KEY_RETRIES", "15"))
delay_seconds = float(os.getenv("LANGFLOW_KEY_RETRY_DELAY", "2.0"))
last_error = None
for attempt in range(1, max_attempts + 1):
try:
# Login to get access token
login_response = requests.post(
f"{LANGFLOW_URL}/api/v1/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": LANGFLOW_SUPERUSER,
"password": LANGFLOW_SUPERUSER_PASSWORD
},
timeout=10
)
login_response.raise_for_status()
access_token = login_response.json().get("access_token")
if not access_token:
raise KeyError("access_token")
# Create API key
api_key_response = requests.post(
f"{LANGFLOW_URL}/api/v1/api_key/",
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}"
},
json={"name": "openrag-auto-generated"},
timeout=10
)
api_key_response.raise_for_status()
api_key = api_key_response.json().get("api_key")
if not api_key:
raise KeyError("api_key")
LANGFLOW_KEY = api_key
print(f"[INFO] Successfully generated Langflow API key: {api_key[:8]}...")
return api_key
except (requests.exceptions.RequestException, KeyError) as e:
last_error = e
print(f"[WARN] Attempt {attempt}/{max_attempts} to generate Langflow API key failed: {e}")
if attempt < max_attempts:
time.sleep(delay_seconds)
else:
raise
except requests.exceptions.RequestException as e:
print(f"[ERROR] Failed to generate Langflow API key: {e}")
return None
except KeyError as e:
print(f"[ERROR] Unexpected response format from Langflow: missing {e}")
return None
except Exception as e:
print(f"[ERROR] Unexpected error generating Langflow API key: {e}")
return None
class AppClients:
def __init__(self):
self.opensearch = None
@ -77,7 +158,10 @@ class AppClients:
self.patched_async_client = None
self.converter = None
def initialize(self):
async def initialize(self):
# Generate Langflow API key first
await generate_langflow_api_key()
# Initialize OpenSearch client
self.opensearch = AsyncOpenSearch(
hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}],
@ -90,11 +174,18 @@ class AppClients:
http_compress=True,
)
# Initialize Langflow client
self.langflow_client = AsyncOpenAI(
base_url=f"{LANGFLOW_URL}/api/v1",
api_key=LANGFLOW_KEY
)
# Initialize Langflow client with generated/provided API key
if LANGFLOW_KEY and self.langflow_client is None:
try:
self.langflow_client = AsyncOpenAI(
base_url=f"{LANGFLOW_URL}/api/v1",
api_key=LANGFLOW_KEY
)
except Exception as e:
print(f"[WARNING] Failed to initialize Langflow client: {e}")
self.langflow_client = None
if self.langflow_client is None:
print("[WARNING] No Langflow client initialized yet; will attempt later on first use")
# Initialize patched OpenAI client
self.patched_async_client = patch_openai_with_mcp(AsyncOpenAI())
@ -103,6 +194,24 @@ class AppClients:
self.converter = DocumentConverter()
return self
async def ensure_langflow_client(self):
"""Ensure Langflow client exists; try to generate key and create client lazily."""
if self.langflow_client is not None:
return self.langflow_client
# Try generating key again (with retries)
await generate_langflow_api_key()
if LANGFLOW_KEY and self.langflow_client is None:
try:
self.langflow_client = AsyncOpenAI(
base_url=f"{LANGFLOW_URL}/api/v1",
api_key=LANGFLOW_KEY
)
print("[INFO] Langflow client initialized on-demand")
except Exception as e:
print(f"[ERROR] Failed to initialize Langflow client on-demand: {e}")
self.langflow_client = None
return self.langflow_client
def create_user_opensearch_client(self, jwt_token: str):
"""Create OpenSearch client with user's JWT token for OIDC auth"""
@ -120,4 +229,4 @@ class AppClients:
)
# Global clients instance
clients = AppClients()
clients = AppClients()

View file

@ -34,7 +34,7 @@ from session_manager import SessionManager
from auth_middleware import require_auth, optional_auth
# API endpoints
from api import upload, search, chat, auth, connectors, tasks, oidc, knowledge_filter
from api import upload, search, chat, auth, connectors, tasks, oidc, knowledge_filter, settings
print("CUDA available:", torch.cuda.is_available())
print("CUDA version PyTorch was built with:", torch.version.cuda)
@ -155,13 +155,13 @@ async def init_index_when_ready():
print("OIDC endpoints will still work, but document operations may fail until OpenSearch is ready")
def initialize_services():
async def initialize_services():
"""Initialize all services and their dependencies"""
# Generate JWT keys if they don't exist
generate_jwt_keys()
# Initialize clients
clients.initialize()
# Initialize clients (now async to generate Langflow API key)
await clients.initialize()
# Initialize session manager
session_manager = SessionManager(SESSION_SECRET)
@ -202,9 +202,9 @@ def initialize_services():
'session_manager': session_manager
}
def create_app():
async def create_app():
"""Create and configure the Starlette application"""
services = initialize_services()
services = await initialize_services()
# Create route handlers with service dependencies injected
routes = [
@ -441,6 +441,13 @@ def create_app():
partial(oidc.token_introspection,
session_manager=services['session_manager']),
methods=["POST"]),
# Settings endpoint
Route("/settings",
require_auth(services['session_manager'])(
partial(settings.get_settings,
session_manager=services['session_manager'])
), methods=["GET"]),
]
app = Starlette(debug=True, routes=routes)
@ -507,8 +514,8 @@ if __name__ == "__main__":
# Register cleanup function
atexit.register(cleanup)
# Create app
app = create_app()
# Create app asynchronously
app = asyncio.run(create_app())
# Run the server (startup tasks now handled by Starlette startup event)
uvicorn.run(

View file

@ -1,4 +1,4 @@
from config.settings import clients, LANGFLOW_URL, FLOW_ID, LANGFLOW_KEY
from config.settings import clients, LANGFLOW_URL, FLOW_ID
from agent import async_chat, async_langflow, async_chat_stream, async_langflow_stream
from auth_context import set_auth_context
import json
@ -28,8 +28,8 @@ class ChatService:
if not prompt:
raise ValueError("Prompt is required")
if not LANGFLOW_URL or not FLOW_ID or not LANGFLOW_KEY:
raise ValueError("LANGFLOW_URL, FLOW_ID, and LANGFLOW_KEY environment variables are required")
if not LANGFLOW_URL or not FLOW_ID:
raise ValueError("LANGFLOW_URL and FLOW_ID environment variables are required")
# Prepare extra headers for JWT authentication
extra_headers = {}
@ -80,12 +80,17 @@ class ChatService:
print(f"Sending OpenRAG query filter to Langflow: {json.dumps(filter_expression, indent=2)}")
extra_headers['X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER'] = json.dumps(filter_expression)
# Ensure the Langflow client exists; try lazy init if needed
langflow_client = await clients.ensure_langflow_client()
if not langflow_client:
raise ValueError("Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY.")
if stream:
from agent import async_langflow_chat_stream
return async_langflow_chat_stream(clients.langflow_client, FLOW_ID, prompt, user_id, extra_headers=extra_headers, previous_response_id=previous_response_id)
return async_langflow_chat_stream(langflow_client, FLOW_ID, prompt, user_id, extra_headers=extra_headers, previous_response_id=previous_response_id)
else:
from agent import async_langflow_chat
response_text, response_id = await async_langflow_chat(clients.langflow_client, FLOW_ID, prompt, user_id, extra_headers=extra_headers, previous_response_id=previous_response_id)
response_text, response_id = await async_langflow_chat(langflow_client, FLOW_ID, prompt, user_id, extra_headers=extra_headers, previous_response_id=previous_response_id)
response_data = {"response": response_text}
if response_id:
response_data["response_id"] = response_id
@ -101,7 +106,11 @@ class ChatService:
extra_headers = {}
if jwt_token:
extra_headers['X-LANGFLOW-GLOBAL-VAR-JWT'] = jwt_token
response_text, response_id = await async_langflow(clients.langflow_client, FLOW_ID, document_prompt, extra_headers=extra_headers, previous_response_id=previous_response_id)
# Ensure the Langflow client exists; try lazy init if needed
langflow_client = await clients.ensure_langflow_client()
if not langflow_client:
raise ValueError("Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY.")
response_text, response_id = await async_langflow(langflow_client, FLOW_ID, document_prompt, extra_headers=extra_headers, previous_response_id=previous_response_id)
else: # chat
# Set auth context for chat tools and provide user_id
if user_id and jwt_token: