Compare commits

...
Sign in to create a new pull request.

5 commits

Author SHA1 Message Date
Lucas Oliveira
d53e6cd2ed put config.yaml on docker compose 2025-09-26 17:40:50 -03:00
Lucas Oliveira
343dc442dd initialize index on startup when feature flag is enabled 2025-09-26 17:13:53 -03:00
Lucas Oliveira
44e4f3d0d6 update backend to not update embedding model when flag is disabled 2025-09-26 17:08:19 -03:00
Lucas Oliveira
c96d4943d5 ensure index if disable ingest with langflow is active 2025-09-26 17:04:20 -03:00
Lucas Oliveira
de447a6ae5 hard-coded openai models 2025-09-26 14:21:20 -03:00
7 changed files with 275 additions and 111 deletions

View file

@ -74,6 +74,7 @@ services:
- ./documents:/app/documents:Z - ./documents:/app/documents:Z
- ./keys:/app/keys:Z - ./keys:/app/keys:Z
- ./flows:/app/flows:Z - ./flows:/app/flows:Z
- ./config.yaml:/app/config.yaml:Z
openrag-frontend: openrag-frontend:
image: phact/openrag-frontend:${OPENRAG_VERSION:-latest} image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}

View file

@ -73,6 +73,7 @@ services:
- ./documents:/app/documents:Z - ./documents:/app/documents:Z
- ./keys:/app/keys:Z - ./keys:/app/keys:Z
- ./flows:/app/flows:z - ./flows:/app/flows:z
- ./config.yaml:/app/config.yaml:Z
gpus: all gpus: all
openrag-frontend: openrag-frontend:

View file

@ -2,7 +2,12 @@
from starlette.requests import Request from starlette.requests import Request
from config.settings import DISABLE_INGEST_WITH_LANGFLOW from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
clients,
INDEX_NAME,
INDEX_BODY,
)
from utils.logging_config import get_logger from utils.logging_config import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
@ -22,7 +27,7 @@ class ConnectorRouter:
self.openrag_connector_service = openrag_connector_service self.openrag_connector_service = openrag_connector_service
logger.debug( logger.debug(
"ConnectorRouter initialized", "ConnectorRouter initialized",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
) )
def get_active_service(self): def get_active_service(self):
@ -38,6 +43,8 @@ class ConnectorRouter:
async def initialize(self): async def initialize(self):
"""Initialize the active connector service.""" """Initialize the active connector service."""
# Initialize OpenSearch index if using traditional OpenRAG connector service
return await self.get_active_service().initialize() return await self.get_active_service().initialize()
@property @property
@ -49,7 +56,9 @@ class ConnectorRouter:
"""Get a connector instance from the active service.""" """Get a connector instance from the active service."""
return await self.get_active_service().get_connector(connection_id) return await self.get_active_service().get_connector(connection_id)
async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None): async def sync_specific_files(
self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None
):
"""Sync specific files using the active service.""" """Sync specific files using the active service."""
return await self.get_active_service().sync_specific_files( return await self.get_active_service().sync_specific_files(
connection_id, user_id, file_list, jwt_token connection_id, user_id, file_list, jwt_token
@ -64,4 +73,6 @@ class ConnectorRouter:
if hasattr(active_service, name): if hasattr(active_service, name):
return getattr(active_service, name) return getattr(active_service, name)
else: else:
raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'") raise AttributeError(
f"'{type(active_service).__name__}' object has no attribute '{name}'"
)

View file

@ -3,6 +3,7 @@ import platform
from starlette.responses import JSONResponse from starlette.responses import JSONResponse
from utils.logging_config import get_logger from utils.logging_config import get_logger
from config.settings import ( from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
LANGFLOW_URL, LANGFLOW_URL,
LANGFLOW_CHAT_FLOW_ID, LANGFLOW_CHAT_FLOW_ID,
LANGFLOW_INGEST_FLOW_ID, LANGFLOW_INGEST_FLOW_ID,
@ -412,7 +413,7 @@ async def onboarding(request, flows_service):
config_updated = True config_updated = True
# Update knowledge settings # Update knowledge settings
if "embedding_model" in body: if "embedding_model" in body and not DISABLE_INGEST_WITH_LANGFLOW:
if ( if (
not isinstance(body["embedding_model"], str) not isinstance(body["embedding_model"], str)
or not body["embedding_model"].strip() or not body["embedding_model"].strip()
@ -561,11 +562,16 @@ async def onboarding(request, flows_service):
# Import here to avoid circular imports # Import here to avoid circular imports
from main import init_index from main import init_index
logger.info("Initializing OpenSearch index after onboarding configuration") logger.info(
"Initializing OpenSearch index after onboarding configuration"
)
await init_index() await init_index()
logger.info("OpenSearch index initialization completed successfully") logger.info("OpenSearch index initialization completed successfully")
except Exception as e: except Exception as e:
logger.error("Failed to initialize OpenSearch index after onboarding", error=str(e)) logger.error(
"Failed to initialize OpenSearch index after onboarding",
error=str(e),
)
# Don't fail the entire onboarding process if index creation fails # Don't fail the entire onboarding process if index creation fails
# The application can still work, but document operations may fail # The application can still work, but document operations may fail

View file

@ -53,6 +53,7 @@ from auth_middleware import optional_auth, require_auth
from config.settings import ( from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW, DISABLE_INGEST_WITH_LANGFLOW,
EMBED_MODEL, EMBED_MODEL,
INDEX_BODY,
INDEX_NAME, INDEX_NAME,
SESSION_SECRET, SESSION_SECRET,
clients, clients,
@ -82,6 +83,7 @@ logger.info(
cuda_version=torch.version.cuda, cuda_version=torch.version.cuda,
) )
async def wait_for_opensearch(): async def wait_for_opensearch():
"""Wait for OpenSearch to be ready with retries""" """Wait for OpenSearch to be ready with retries"""
max_retries = 30 max_retries = 30
@ -128,6 +130,34 @@ async def configure_alerting_security():
# Don't fail startup if alerting config fails # Don't fail startup if alerting config fails
async def _ensure_opensearch_index(self):
"""Ensure OpenSearch index exists when using traditional connector service."""
try:
# Check if index already exists
if await clients.opensearch.indices.exists(index=INDEX_NAME):
logger.debug("OpenSearch index already exists", index_name=INDEX_NAME)
return
# Create the index with hard-coded INDEX_BODY (uses OpenAI embedding dimensions)
await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY)
logger.info(
"Created OpenSearch index for traditional connector service",
index_name=INDEX_NAME,
vector_dimensions=INDEX_BODY["mappings"]["properties"]["chunk_embedding"][
"dimension"
],
)
except Exception as e:
logger.error(
"Failed to initialize OpenSearch index for traditional connector service",
error=str(e),
index_name=INDEX_NAME,
)
# Don't raise the exception to avoid breaking the initialization
# The service can still function, document operations might fail later
async def init_index(): async def init_index():
"""Initialize OpenSearch index and security roles""" """Initialize OpenSearch index and security roles"""
await wait_for_opensearch() await wait_for_opensearch()
@ -141,10 +171,20 @@ async def init_index():
# Create documents index # Create documents index
if not await clients.opensearch.indices.exists(index=INDEX_NAME): if not await clients.opensearch.indices.exists(index=INDEX_NAME):
await clients.opensearch.indices.create(index=INDEX_NAME, body=dynamic_index_body) await clients.opensearch.indices.create(
logger.info("Created OpenSearch index", index_name=INDEX_NAME, embedding_model=embedding_model) index=INDEX_NAME, body=dynamic_index_body
)
logger.info(
"Created OpenSearch index",
index_name=INDEX_NAME,
embedding_model=embedding_model,
)
else: else:
logger.info("Index already exists, skipping creation", index_name=INDEX_NAME, embedding_model=embedding_model) logger.info(
"Index already exists, skipping creation",
index_name=INDEX_NAME,
embedding_model=embedding_model,
)
# Create knowledge filters index # Create knowledge filters index
knowledge_filter_index_name = "knowledge_filters" knowledge_filter_index_name = "knowledge_filters"
@ -402,6 +442,9 @@ async def startup_tasks(services):
# Index will be created after onboarding when we know the embedding model # Index will be created after onboarding when we know the embedding model
await wait_for_opensearch() await wait_for_opensearch()
if DISABLE_INGEST_WITH_LANGFLOW:
await _ensure_opensearch_index()
# Configure alerting security # Configure alerting security
await configure_alerting_security() await configure_alerting_security()
@ -1075,14 +1118,6 @@ async def create_app():
return app return app
async def startup():
"""Application startup tasks"""
await init_index()
# Get services from app state if needed for initialization
# services = app.state.services
# await services['connector_service'].initialize()
def cleanup(): def cleanup():
"""Cleanup on application shutdown""" """Cleanup on application shutdown"""
# Cleanup process pools only (webhooks handled by Starlette shutdown) # Cleanup process pools only (webhooks handled by Starlette shutdown)

View file

@ -1,5 +1,6 @@
import asyncio import asyncio
from config.settings import ( from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
NUDGES_FLOW_ID, NUDGES_FLOW_ID,
LANGFLOW_URL, LANGFLOW_URL,
LANGFLOW_CHAT_FLOW_ID, LANGFLOW_CHAT_FLOW_ID,
@ -73,17 +74,17 @@ class FlowsService:
# Scan all JSON files in the flows directory # Scan all JSON files in the flows directory
try: try:
for filename in os.listdir(flows_dir): for filename in os.listdir(flows_dir):
if not filename.endswith('.json'): if not filename.endswith(".json"):
continue continue
file_path = os.path.join(flows_dir, filename) file_path = os.path.join(flows_dir, filename)
try: try:
with open(file_path, 'r') as f: with open(file_path, "r") as f:
flow_data = json.load(f) flow_data = json.load(f)
# Check if this file contains the flow we're looking for # Check if this file contains the flow we're looking for
if flow_data.get('id') == flow_id: if flow_data.get("id") == flow_id:
# Cache the result # Cache the result
self._flow_file_cache[flow_id] = file_path self._flow_file_cache[flow_id] = file_path
logger.info(f"Found flow {flow_id} in file: {filename}") logger.info(f"Found flow {flow_id} in file: {filename}")
@ -99,6 +100,7 @@ class FlowsService:
logger.warning(f"Flow with ID {flow_id} not found in flows directory") logger.warning(f"Flow with ID {flow_id} not found in flows directory")
return None return None
async def reset_langflow_flow(self, flow_type: str): async def reset_langflow_flow(self, flow_type: str):
"""Reset a Langflow flow by uploading the corresponding JSON file """Reset a Langflow flow by uploading the corresponding JSON file
@ -135,7 +137,9 @@ class FlowsService:
try: try:
with open(flow_path, "r") as f: with open(flow_path, "r") as f:
flow_data = json.load(f) flow_data = json.load(f)
logger.info(f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}") logger.info(
f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}"
)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in flow file {flow_path}: {e}") raise ValueError(f"Invalid JSON in flow file {flow_path}: {e}")
except FileNotFoundError: except FileNotFoundError:
@ -161,43 +165,62 @@ class FlowsService:
# Check if configuration has been edited (onboarding completed) # Check if configuration has been edited (onboarding completed)
if config.edited: if config.edited:
logger.info(f"Updating {flow_type} flow with current configuration settings") logger.info(
f"Updating {flow_type} flow with current configuration settings"
)
provider = config.provider.model_provider.lower() provider = config.provider.model_provider.lower()
# Step 1: Assign model provider (replace components) if not OpenAI # Step 1: Assign model provider (replace components) if not OpenAI
if provider != "openai": if provider != "openai":
logger.info(f"Assigning {provider} components to {flow_type} flow") logger.info(
f"Assigning {provider} components to {flow_type} flow"
)
provider_result = await self.assign_model_provider(provider) provider_result = await self.assign_model_provider(provider)
if not provider_result.get("success"): if not provider_result.get("success"):
logger.warning(f"Failed to assign {provider} components: {provider_result.get('error', 'Unknown error')}") logger.warning(
f"Failed to assign {provider} components: {provider_result.get('error', 'Unknown error')}"
)
# Continue anyway, maybe just value updates will work # Continue anyway, maybe just value updates will work
# Step 2: Update model values for the specific flow being reset # Step 2: Update model values for the specific flow being reset
single_flow_config = [{ single_flow_config = [
"name": flow_type, {
"flow_id": flow_id, "name": flow_type,
}] "flow_id": flow_id,
}
]
logger.info(f"Updating {flow_type} flow model values") logger.info(f"Updating {flow_type} flow model values")
update_result = await self.change_langflow_model_value( update_result = await self.change_langflow_model_value(
provider=provider, provider=provider,
embedding_model=config.knowledge.embedding_model, embedding_model=config.knowledge.embedding_model,
llm_model=config.agent.llm_model, llm_model=config.agent.llm_model,
endpoint=config.provider.endpoint if config.provider.endpoint else None, endpoint=config.provider.endpoint
flow_configs=single_flow_config if config.provider.endpoint
else None,
flow_configs=single_flow_config,
) )
if update_result.get("success"): if update_result.get("success"):
logger.info(f"Successfully updated {flow_type} flow with current configuration") logger.info(
f"Successfully updated {flow_type} flow with current configuration"
)
else: else:
logger.warning(f"Failed to update {flow_type} flow with current configuration: {update_result.get('error', 'Unknown error')}") logger.warning(
f"Failed to update {flow_type} flow with current configuration: {update_result.get('error', 'Unknown error')}"
)
else: else:
logger.info(f"Configuration not yet edited (onboarding not completed), skipping model updates for {flow_type} flow") logger.info(
f"Configuration not yet edited (onboarding not completed), skipping model updates for {flow_type} flow"
)
except Exception as e: except Exception as e:
logger.error(f"Error updating {flow_type} flow with current configuration", error=str(e)) logger.error(
f"Error updating {flow_type} flow with current configuration",
error=str(e),
)
# Don't fail the entire reset operation if configuration update fails # Don't fail the entire reset operation if configuration update fails
return { return {
@ -243,7 +266,9 @@ class FlowsService:
try: try:
# Load component templates based on provider # Load component templates based on provider
llm_template, embedding_template, llm_text_template = self._load_component_templates(provider) llm_template, embedding_template, llm_text_template = (
self._load_component_templates(provider)
)
logger.info(f"Assigning {provider} components") logger.info(f"Assigning {provider} components")
@ -358,7 +383,9 @@ class FlowsService:
logger.info(f"Loaded component templates for {provider}") logger.info(f"Loaded component templates for {provider}")
return llm_template, embedding_template, llm_text_template return llm_template, embedding_template, llm_text_template
async def _update_flow_components(self, config, llm_template, embedding_template, llm_text_template): async def _update_flow_components(
self, config, llm_template, embedding_template, llm_text_template
):
"""Update components in a specific flow""" """Update components in a specific flow"""
flow_name = config["name"] flow_name = config["name"]
flow_id = config["flow_id"] flow_id = config["flow_id"]
@ -383,20 +410,23 @@ class FlowsService:
components_updated = [] components_updated = []
# Replace embedding component # Replace embedding component
embedding_node = self._find_node_by_id(flow_data, old_embedding_id) if not DISABLE_INGEST_WITH_LANGFLOW:
if embedding_node: embedding_node = self._find_node_by_id(flow_data, old_embedding_id)
# Preserve position if embedding_node:
original_position = embedding_node.get("position", {}) # Preserve position
original_position = embedding_node.get("position", {})
# Replace with new template # Replace with new template
new_embedding_node = embedding_template.copy() new_embedding_node = embedding_template.copy()
new_embedding_node["position"] = original_position new_embedding_node["position"] = original_position
# Replace in flow # Replace in flow
self._replace_node_in_flow(flow_data, old_embedding_id, new_embedding_node) self._replace_node_in_flow(
components_updated.append( flow_data, old_embedding_id, new_embedding_node
f"embedding: {old_embedding_id} -> {new_embedding_id}" )
) components_updated.append(
f"embedding: {old_embedding_id} -> {new_embedding_id}"
)
# Replace LLM component (if exists in this flow) # Replace LLM component (if exists in this flow)
if old_llm_id: if old_llm_id:
@ -425,27 +455,30 @@ class FlowsService:
new_llm_text_node["position"] = original_position new_llm_text_node["position"] = original_position
# Replace in flow # Replace in flow
self._replace_node_in_flow(flow_data, old_llm_text_id, new_llm_text_node) self._replace_node_in_flow(
components_updated.append(f"llm: {old_llm_text_id} -> {new_llm_text_id}") flow_data, old_llm_text_id, new_llm_text_node
)
components_updated.append(
f"llm: {old_llm_text_id} -> {new_llm_text_id}"
)
# Update all edge references using regex replacement # Update all edge references using regex replacement
flow_json_str = json.dumps(flow_data) flow_json_str = json.dumps(flow_data)
# Replace embedding ID references # Replace embedding ID references
flow_json_str = re.sub( if not DISABLE_INGEST_WITH_LANGFLOW:
re.escape(old_embedding_id), new_embedding_id, flow_json_str flow_json_str = re.sub(
) re.escape(old_embedding_id), new_embedding_id, flow_json_str
flow_json_str = re.sub( )
re.escape(old_embedding_id.split("-")[0]), flow_json_str = re.sub(
new_embedding_id.split("-")[0], re.escape(old_embedding_id.split("-")[0]),
flow_json_str, new_embedding_id.split("-")[0],
) flow_json_str,
)
# Replace LLM ID references (if applicable) # Replace LLM ID references (if applicable)
if old_llm_id: if old_llm_id:
flow_json_str = re.sub( flow_json_str = re.sub(re.escape(old_llm_id), new_llm_id, flow_json_str)
re.escape(old_llm_id), new_llm_id, flow_json_str
)
if old_llm_text_id: if old_llm_text_id:
flow_json_str = re.sub( flow_json_str = re.sub(
re.escape(old_llm_text_id), new_llm_text_id, flow_json_str re.escape(old_llm_text_id), new_llm_text_id, flow_json_str
@ -506,7 +539,14 @@ class FlowsService:
return None, None return None, None
async def _update_flow_field(self, flow_id: str, field_name: str, field_value: str, node_display_name: str = None, node_id: str = None): async def _update_flow_field(
self,
flow_id: str,
field_name: str,
field_value: str,
node_display_name: str = None,
node_id: str = None,
):
""" """
Generic helper function to update any field in any Langflow component. Generic helper function to update any field in any Langflow component.
@ -521,22 +561,26 @@ class FlowsService:
raise ValueError("flow_id is required") raise ValueError("flow_id is required")
# Get the current flow data from Langflow # Get the current flow data from Langflow
response = await clients.langflow_request( response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
"GET", f"/api/v1/flows/{flow_id}"
)
if response.status_code != 200: if response.status_code != 200:
raise Exception(f"Failed to get flow: HTTP {response.status_code} - {response.text}") raise Exception(
f"Failed to get flow: HTTP {response.status_code} - {response.text}"
)
flow_data = response.json() flow_data = response.json()
# Find the target component by display name first, then by ID as fallback # Find the target component by display name first, then by ID as fallback
target_node, target_node_index = None, None target_node, target_node_index = None, None
if node_display_name: if node_display_name:
target_node, target_node_index = self._find_node_in_flow(flow_data, display_name=node_display_name) target_node, target_node_index = self._find_node_in_flow(
flow_data, display_name=node_display_name
)
if target_node is None and node_id: if target_node is None and node_id:
target_node, target_node_index = self._find_node_in_flow(flow_data, node_id=node_id) target_node, target_node_index = self._find_node_in_flow(
flow_data, node_id=node_id
)
if target_node is None: if target_node is None:
identifier = node_display_name or node_id identifier = node_display_name or node_id
@ -545,7 +589,9 @@ class FlowsService:
# Update the field value directly in the existing node # Update the field value directly in the existing node
template = target_node.get("data", {}).get("node", {}).get("template", {}) template = target_node.get("data", {}).get("node", {}).get("template", {})
if template.get(field_name): if template.get(field_name):
flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][field_name]["value"] = field_value flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][
field_name
]["value"] = field_value
else: else:
identifier = node_display_name or node_id identifier = node_display_name or node_id
raise Exception(f"{field_name} field not found in {identifier} component") raise Exception(f"{field_name} field not found in {identifier} component")
@ -556,21 +602,31 @@ class FlowsService:
) )
if patch_response.status_code != 200: if patch_response.status_code != 200:
raise Exception(f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}") raise Exception(
f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}"
)
async def update_chat_flow_model(self, model_name: str): async def update_chat_flow_model(self, model_name: str):
"""Helper function to update the model in the chat flow""" """Helper function to update the model in the chat flow"""
if not LANGFLOW_CHAT_FLOW_ID: if not LANGFLOW_CHAT_FLOW_ID:
raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured") raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "model_name", model_name, await self._update_flow_field(
node_display_name="Language Model") LANGFLOW_CHAT_FLOW_ID,
"model_name",
model_name,
node_display_name="Language Model",
)
async def update_chat_flow_system_prompt(self, system_prompt: str): async def update_chat_flow_system_prompt(self, system_prompt: str):
"""Helper function to update the system prompt in the chat flow""" """Helper function to update the system prompt in the chat flow"""
if not LANGFLOW_CHAT_FLOW_ID: if not LANGFLOW_CHAT_FLOW_ID:
raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured") raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "system_prompt", system_prompt, await self._update_flow_field(
node_display_name="Agent") LANGFLOW_CHAT_FLOW_ID,
"system_prompt",
system_prompt,
node_display_name="Agent",
)
async def update_flow_docling_preset(self, preset: str, preset_config: dict): async def update_flow_docling_preset(self, preset: str, preset_config: dict):
"""Helper function to update docling preset in the ingest flow""" """Helper function to update docling preset in the ingest flow"""
@ -578,29 +634,46 @@ class FlowsService:
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
from config.settings import DOCLING_COMPONENT_ID from config.settings import DOCLING_COMPONENT_ID
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "docling_serve_opts", preset_config,
node_id=DOCLING_COMPONENT_ID) await self._update_flow_field(
LANGFLOW_INGEST_FLOW_ID,
"docling_serve_opts",
preset_config,
node_id=DOCLING_COMPONENT_ID,
)
async def update_ingest_flow_chunk_size(self, chunk_size: int): async def update_ingest_flow_chunk_size(self, chunk_size: int):
"""Helper function to update chunk size in the ingest flow""" """Helper function to update chunk size in the ingest flow"""
if not LANGFLOW_INGEST_FLOW_ID: if not LANGFLOW_INGEST_FLOW_ID:
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_size", chunk_size, await self._update_flow_field(
node_display_name="Split Text") LANGFLOW_INGEST_FLOW_ID,
"chunk_size",
chunk_size,
node_display_name="Split Text",
)
async def update_ingest_flow_chunk_overlap(self, chunk_overlap: int): async def update_ingest_flow_chunk_overlap(self, chunk_overlap: int):
"""Helper function to update chunk overlap in the ingest flow""" """Helper function to update chunk overlap in the ingest flow"""
if not LANGFLOW_INGEST_FLOW_ID: if not LANGFLOW_INGEST_FLOW_ID:
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_overlap", chunk_overlap, await self._update_flow_field(
node_display_name="Split Text") LANGFLOW_INGEST_FLOW_ID,
"chunk_overlap",
chunk_overlap,
node_display_name="Split Text",
)
async def update_ingest_flow_embedding_model(self, embedding_model: str): async def update_ingest_flow_embedding_model(self, embedding_model: str):
"""Helper function to update embedding model in the ingest flow""" """Helper function to update embedding model in the ingest flow"""
if not LANGFLOW_INGEST_FLOW_ID: if not LANGFLOW_INGEST_FLOW_ID:
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "model", embedding_model, await self._update_flow_field(
node_display_name="Embedding Model") LANGFLOW_INGEST_FLOW_ID,
"model",
embedding_model,
node_display_name="Embedding Model",
)
def _replace_node_in_flow(self, flow_data, old_id, new_node): def _replace_node_in_flow(self, flow_data, old_id, new_node):
"""Replace a node in the flow data""" """Replace a node in the flow data"""
@ -612,7 +685,12 @@ class FlowsService:
return False return False
async def change_langflow_model_value( async def change_langflow_model_value(
self, provider: str, embedding_model: str, llm_model: str, endpoint: str = None, flow_configs: list = None self,
provider: str,
embedding_model: str,
llm_model: str,
endpoint: str = None,
flow_configs: list = None,
): ):
""" """
Change dropdown values for provider-specific components across flows Change dropdown values for provider-specific components across flows
@ -656,8 +734,8 @@ class FlowsService:
] ]
# Determine target component IDs based on provider # Determine target component IDs based on provider
target_embedding_id, target_llm_id, target_llm_text_id = self._get_provider_component_ids( target_embedding_id, target_llm_id, target_llm_text_id = (
provider self._get_provider_component_ids(provider)
) )
results = [] results = []
@ -713,12 +791,24 @@ class FlowsService:
def _get_provider_component_ids(self, provider: str): def _get_provider_component_ids(self, provider: str):
"""Get the component IDs for a specific provider""" """Get the component IDs for a specific provider"""
if provider == "watsonx": if provider == "watsonx":
return WATSONX_EMBEDDING_COMPONENT_ID, WATSONX_LLM_COMPONENT_ID, WATSONX_LLM_TEXT_COMPONENT_ID return (
WATSONX_EMBEDDING_COMPONENT_ID,
WATSONX_LLM_COMPONENT_ID,
WATSONX_LLM_TEXT_COMPONENT_ID,
)
elif provider == "ollama": elif provider == "ollama":
return OLLAMA_EMBEDDING_COMPONENT_ID, OLLAMA_LLM_COMPONENT_ID, OLLAMA_LLM_TEXT_COMPONENT_ID return (
OLLAMA_EMBEDDING_COMPONENT_ID,
OLLAMA_LLM_COMPONENT_ID,
OLLAMA_LLM_TEXT_COMPONENT_ID,
)
elif provider == "openai": elif provider == "openai":
# OpenAI components are the default ones # OpenAI components are the default ones
return OPENAI_EMBEDDING_COMPONENT_ID, OPENAI_LLM_COMPONENT_ID, OPENAI_LLM_TEXT_COMPONENT_ID return (
OPENAI_EMBEDDING_COMPONENT_ID,
OPENAI_LLM_COMPONENT_ID,
OPENAI_LLM_TEXT_COMPONENT_ID,
)
else: else:
raise ValueError(f"Unsupported provider: {provider}") raise ValueError(f"Unsupported provider: {provider}")
@ -738,9 +828,7 @@ class FlowsService:
flow_id = config["flow_id"] flow_id = config["flow_id"]
# Get flow data from Langflow API instead of file # Get flow data from Langflow API instead of file
response = await clients.langflow_request( response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
"GET", f"/api/v1/flows/{flow_id}"
)
if response.status_code != 200: if response.status_code != 200:
raise Exception( raise Exception(
@ -752,12 +840,13 @@ class FlowsService:
updates_made = [] updates_made = []
# Update embedding component # Update embedding component
embedding_node = self._find_node_by_id(flow_data, target_embedding_id) if not DISABLE_INGEST_WITH_LANGFLOW:
if embedding_node: embedding_node = self._find_node_by_id(flow_data, target_embedding_id)
if self._update_component_fields( if embedding_node:
embedding_node, provider, embedding_model, endpoint if self._update_component_fields(
): embedding_node, provider, embedding_model, endpoint
updates_made.append(f"embedding model: {embedding_model}") ):
updates_made.append(f"embedding model: {embedding_model}")
# Update LLM component (if exists in this flow) # Update LLM component (if exists in this flow)
if target_llm_id: if target_llm_id:

View file

@ -20,6 +20,27 @@ class ModelsService:
"jina-embeddings-v2-base-en", "jina-embeddings-v2-base-en",
] ]
OPENAI_TOOL_CALLING_MODELS = [
"gpt-5",
"gpt-5-mini",
"gpt-5-nano",
"gpt-4o-mini",
"gpt-4o",
"gpt-4.1",
"gpt-4.1-mini",
"gpt-4.1-nano",
"gpt-4-turbo",
"gpt-4-turbo-preview",
"gpt-4",
"gpt-3.5-turbo",
"o1",
"o3-mini",
"o3",
"o3-pro",
"o4-mini",
"o4-mini-high",
]
def __init__(self): def __init__(self):
self.session_manager = None self.session_manager = None
@ -48,12 +69,12 @@ class ModelsService:
model_id = model.get("id", "") model_id = model.get("id", "")
# Language models (GPT models) # Language models (GPT models)
if any(prefix in model_id for prefix in ["gpt-4", "gpt-3.5"]): if model_id in self.OPENAI_TOOL_CALLING_MODELS:
language_models.append( language_models.append(
{ {
"value": model_id, "value": model_id,
"label": model_id, "label": model_id,
"default": model_id == "gpt-4o-mini", "default": model_id == "gpt-5",
} }
) )