From 52896a8da7d18d16e119a4995a2ac1a19cceed6e Mon Sep 17 00:00:00 2001
From: Lucas Oliveira <62335616+lucaseduoli@users.noreply.github.com>
Date: Wed, 1 Oct 2025 16:50:21 -0300
Subject: [PATCH] fix: use a defined OpenAI model list and keep the default
 embedding model when the disable-ingest-with-Langflow feature flag is
 enabled (#128)

* hard-coded openai models

* ensure index if disable ingest with langflow is active

* update backend to not update embedding model when flag is enabled

* initialize index on startup when feature flag is enabled

* put config.yaml on docker compose
---
 docker-compose-cpu.yml         |   1 +
 docker-compose.yml             |   1 +
 src/api/connector_router.py    |  37 +++--
 src/api/settings.py            |  12 +-
 src/main.py                    |  57 ++++++--
 src/services/flows_service.py  | 253 ++++++++++++++++++++++-----------
 src/services/models_service.py |  25 +++-
 7 files changed, 275 insertions(+), 111 deletions(-)

diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml
index d0de6ce9..0bcf66ac 100644
--- a/docker-compose-cpu.yml
+++ b/docker-compose-cpu.yml
@@ -74,6 +74,7 @@ services:
       - ./documents:/app/documents:Z
       - ./keys:/app/keys:Z
       - ./flows:/app/flows:Z
+      - ./config.yaml:/app/config.yaml:Z

   openrag-frontend:
     image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}
diff --git a/docker-compose.yml b/docker-compose.yml
index daa921ae..d7ac3ac3 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -73,6 +73,7 @@ services:
       - ./documents:/app/documents:Z
       - ./keys:/app/keys:Z
       - ./flows:/app/flows:z
+      - ./config.yaml:/app/config.yaml:Z
     gpus: all

   openrag-frontend:
diff --git a/src/api/connector_router.py b/src/api/connector_router.py
index 2a692ae4..dd98e474 100644
--- a/src/api/connector_router.py
+++ b/src/api/connector_router.py
@@ -2,7 +2,12 @@

 from starlette.requests import Request

-from config.settings import DISABLE_INGEST_WITH_LANGFLOW
+from config.settings import (
+    DISABLE_INGEST_WITH_LANGFLOW,
+    clients,
+    INDEX_NAME,
+    INDEX_BODY,
+)
 from utils.logging_config import get_logger

 logger = get_logger(__name__)
@@ -12,19 +17,19 @@ class ConnectorRouter:
     """
     Router that automatically chooses between LangflowConnectorService and ConnectorService
    based on the DISABLE_INGEST_WITH_LANGFLOW configuration.
-
+
     - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses LangflowConnectorService
     - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional ConnectorService
     """
-
+
     def __init__(self, langflow_connector_service, openrag_connector_service):
         self.langflow_connector_service = langflow_connector_service
         self.openrag_connector_service = openrag_connector_service
         logger.debug(
             "ConnectorRouter initialized",
-            disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
+            disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
         )
-
+
     def get_active_service(self):
         """Get the currently active connector service based on configuration."""
         if DISABLE_INGEST_WITH_LANGFLOW:
@@ -33,28 +38,32 @@ class ConnectorRouter:
         else:
             logger.debug("Using Langflow connector service")
             return self.langflow_connector_service
-
+
     # Proxy all connector service methods to the active service
-
+
     async def initialize(self):
         """Initialize the active connector service."""
+        # Index creation for the traditional service happens at startup in main.py
+
         return await self.get_active_service().initialize()
-
+
     @property
     def connection_manager(self):
         """Get the connection manager from the active service."""
         return self.get_active_service().connection_manager
-
+
     async def get_connector(self, connection_id: str):
         """Get a connector instance from the active service."""
         return await self.get_active_service().get_connector(connection_id)
-
-    async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None):
+
+    async def sync_specific_files(
+        self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None
+    ):
         """Sync specific files using the active service."""
         return await self.get_active_service().sync_specific_files(
             connection_id, user_id, file_list, jwt_token
         )
-
+
     def __getattr__(self, name):
         """
        Proxy any other method calls to the active service.
@@ -64,4 +73,6 @@ class ConnectorRouter:
         if hasattr(active_service, name):
             return getattr(active_service, name)
         else:
-            raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'")
+            raise AttributeError(
+                f"'{type(active_service).__name__}' object has no attribute '{name}'"
+            )
diff --git a/src/api/settings.py b/src/api/settings.py
index a3fdeee3..686912b6 100644
--- a/src/api/settings.py
+++ b/src/api/settings.py
@@ -4,6 +4,7 @@ from starlette.responses import JSONResponse
 from utils.container_utils import transform_localhost_url
 from utils.logging_config import get_logger
 from config.settings import (
+    DISABLE_INGEST_WITH_LANGFLOW,
     LANGFLOW_URL,
     LANGFLOW_CHAT_FLOW_ID,
     LANGFLOW_INGEST_FLOW_ID,
@@ -450,7 +451,7 @@ async def onboarding(request, flows_service):
             config_updated = True

     # Update knowledge settings
-    if "embedding_model" in body:
+    if "embedding_model" in body and not DISABLE_INGEST_WITH_LANGFLOW:
         if (
             not isinstance(body["embedding_model"], str)
             or not body["embedding_model"].strip()
@@ -600,11 +601,16 @@ async def onboarding(request, flows_service):
             # Import here to avoid circular imports
             from main import init_index

-            logger.info("Initializing OpenSearch index after onboarding configuration")
+            logger.info(
+                "Initializing OpenSearch index after onboarding configuration"
+            )
             await init_index()
             logger.info("OpenSearch index initialization completed successfully")
         except Exception as e:
-            logger.error("Failed to initialize OpenSearch index after onboarding", error=str(e))
+            logger.error(
+                "Failed to initialize OpenSearch index after onboarding",
+                error=str(e),
+            )
             # Don't fail the entire onboarding process if index creation fails
             # The application can still work, but document operations may fail

diff --git a/src/main.py b/src/main.py
index 69f2ad9f..4ed57b75 100644
--- a/src/main.py
+++ b/src/main.py
@@ -53,6 +53,7 @@ from auth_middleware import optional_auth, require_auth
 from config.settings import (
     DISABLE_INGEST_WITH_LANGFLOW,
     EMBED_MODEL,
+    INDEX_BODY,
     INDEX_NAME,
     SESSION_SECRET,
     clients,
@@ -82,6 +83,7 @@ logger.info(
     cuda_version=torch.version.cuda,
 )

+
 async def wait_for_opensearch():
     """Wait for OpenSearch to be ready with retries"""
     max_retries = 30
@@ -128,6 +130,34 @@ async def configure_alerting_security():
         # Don't fail startup if alerting config fails


+async def _ensure_opensearch_index():
+    """Ensure the OpenSearch index exists when using the traditional connector service."""
+    try:
+        # Check if index already exists
+        if await clients.opensearch.indices.exists(index=INDEX_NAME):
+            logger.debug("OpenSearch index already exists", index_name=INDEX_NAME)
+            return
+
+        # Create the index with hard-coded INDEX_BODY (uses OpenAI embedding dimensions)
+        await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY)
+        logger.info(
+            "Created OpenSearch index for traditional connector service",
+            index_name=INDEX_NAME,
+            vector_dimensions=INDEX_BODY["mappings"]["properties"]["chunk_embedding"][
+                "dimension"
+            ],
+        )
+
+    except Exception as e:
+        logger.error(
+            "Failed to initialize OpenSearch index for traditional connector service",
+            error=str(e),
+            index_name=INDEX_NAME,
+        )
+        # Don't raise the exception, to avoid breaking initialization; the service
+        # can still function, but document operations might fail later
+
+
 async def init_index():
     """Initialize OpenSearch index and security roles"""
     await wait_for_opensearch()
@@ -141,10 +171,20 @@ async def init_index():

         # Create documents index
         if not await clients.opensearch.indices.exists(index=INDEX_NAME):
-            await clients.opensearch.indices.create(index=INDEX_NAME, body=dynamic_index_body)
-            logger.info("Created OpenSearch index", index_name=INDEX_NAME, embedding_model=embedding_model)
+            await clients.opensearch.indices.create(
+                index=INDEX_NAME, body=dynamic_index_body
+            )
+            logger.info(
+                "Created OpenSearch index",
+                index_name=INDEX_NAME,
+                embedding_model=embedding_model,
+            )
         else:
-            logger.info("Index already exists, skipping creation", index_name=INDEX_NAME, embedding_model=embedding_model)
+            logger.info(
+                "Index already exists, skipping creation",
+                index_name=INDEX_NAME,
+                embedding_model=embedding_model,
+            )

         # Create knowledge filters index
         knowledge_filter_index_name = "knowledge_filters"
@@ -402,6 +442,9 @@ async def startup_tasks(services):
     # Index will be created after onboarding when we know the embedding model
     await wait_for_opensearch()

+    if DISABLE_INGEST_WITH_LANGFLOW:
+        await _ensure_opensearch_index()
+
     # Configure alerting security
     await configure_alerting_security()

@@ -1075,14 +1118,6 @@ async def create_app():

     return app

-
-async def startup():
-    """Application startup tasks"""
-    await init_index()
-    # Get services from app state if needed for initialization
-    # services = app.state.services
-    # await services['connector_service'].initialize()
-
-
 def cleanup():
     """Cleanup on application shutdown"""
     # Cleanup process pools only (webhooks handled by Starlette shutdown)
diff --git a/src/services/flows_service.py b/src/services/flows_service.py
index 164fc122..13b75a82 100644
--- a/src/services/flows_service.py
+++ b/src/services/flows_service.py
@@ -1,5 +1,6 @@
 import asyncio
 from config.settings import (
+    DISABLE_INGEST_WITH_LANGFLOW,
     NUDGES_FLOW_ID,
     LANGFLOW_URL,
     LANGFLOW_CHAT_FLOW_ID,
@@ -73,17 +74,17 @@ class FlowsService:
         # Scan all JSON files in the flows directory
         try:
             for filename in os.listdir(flows_dir):
-                if not filename.endswith('.json'):
+                if not filename.endswith(".json"):
                     continue

                 file_path = os.path.join(flows_dir, filename)

                 try:
-                    with open(file_path, 'r') as f:
+                    with open(file_path, "r") as f:
                         flow_data = json.load(f)

                     # Check if this file contains the flow we're looking for
-                    if flow_data.get('id') == flow_id:
+                    if flow_data.get("id") == flow_id:
                         # Cache the result
                         self._flow_file_cache[flow_id] = file_path
                         logger.info(f"Found flow {flow_id} in file: {filename}")
@@ -99,6 +100,7 @@ class FlowsService:
         logger.warning(f"Flow with ID {flow_id} not found in flows directory")
         return None

+
     async def reset_langflow_flow(self, flow_type: str):
         """Reset a Langflow flow by uploading the corresponding JSON file

@@ -135,7 +137,9 @@ class FlowsService:
         try:
             with open(flow_path, "r") as f:
                 flow_data = json.load(f)
-            logger.info(f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}")
+            logger.info(
+                f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}"
+            )
         except json.JSONDecodeError as e:
             raise ValueError(f"Invalid JSON in flow file {flow_path}: {e}")
         except FileNotFoundError:
@@ -161,43 +165,62 @@ class FlowsService:

             # Check if configuration has been edited (onboarding completed)
             if config.edited:
-                logger.info(f"Updating {flow_type} flow with current configuration settings")
+                logger.info(
+                    f"Updating {flow_type} flow with current configuration settings"
+                )

                 provider = config.provider.model_provider.lower()

                 # Step 1: Assign model provider (replace components) if not OpenAI
                 if provider != "openai":
{flow_type} flow") + logger.info( + f"Assigning {provider} components to {flow_type} flow" + ) provider_result = await self.assign_model_provider(provider) if not provider_result.get("success"): - logger.warning(f"Failed to assign {provider} components: {provider_result.get('error', 'Unknown error')}") + logger.warning( + f"Failed to assign {provider} components: {provider_result.get('error', 'Unknown error')}" + ) # Continue anyway, maybe just value updates will work # Step 2: Update model values for the specific flow being reset - single_flow_config = [{ - "name": flow_type, - "flow_id": flow_id, - }] + single_flow_config = [ + { + "name": flow_type, + "flow_id": flow_id, + } + ] logger.info(f"Updating {flow_type} flow model values") update_result = await self.change_langflow_model_value( provider=provider, embedding_model=config.knowledge.embedding_model, llm_model=config.agent.llm_model, - endpoint=config.provider.endpoint if config.provider.endpoint else None, - flow_configs=single_flow_config + endpoint=config.provider.endpoint + if config.provider.endpoint + else None, + flow_configs=single_flow_config, ) if update_result.get("success"): - logger.info(f"Successfully updated {flow_type} flow with current configuration") + logger.info( + f"Successfully updated {flow_type} flow with current configuration" + ) else: - logger.warning(f"Failed to update {flow_type} flow with current configuration: {update_result.get('error', 'Unknown error')}") + logger.warning( + f"Failed to update {flow_type} flow with current configuration: {update_result.get('error', 'Unknown error')}" + ) else: - logger.info(f"Configuration not yet edited (onboarding not completed), skipping model updates for {flow_type} flow") + logger.info( + f"Configuration not yet edited (onboarding not completed), skipping model updates for {flow_type} flow" + ) except Exception as e: - logger.error(f"Error updating {flow_type} flow with current configuration", error=str(e)) + logger.error( + f"Error updating {flow_type} flow with current configuration", + error=str(e), + ) # Don't fail the entire reset operation if configuration update fails return { @@ -243,7 +266,9 @@ class FlowsService: try: # Load component templates based on provider - llm_template, embedding_template, llm_text_template = self._load_component_templates(provider) + llm_template, embedding_template, llm_text_template = ( + self._load_component_templates(provider) + ) logger.info(f"Assigning {provider} components") @@ -358,7 +383,9 @@ class FlowsService: logger.info(f"Loaded component templates for {provider}") return llm_template, embedding_template, llm_text_template - async def _update_flow_components(self, config, llm_template, embedding_template, llm_text_template): + async def _update_flow_components( + self, config, llm_template, embedding_template, llm_text_template + ): """Update components in a specific flow""" flow_name = config["name"] flow_id = config["flow_id"] @@ -383,20 +410,23 @@ class FlowsService: components_updated = [] # Replace embedding component - embedding_node = self._find_node_by_id(flow_data, old_embedding_id) - if embedding_node: - # Preserve position - original_position = embedding_node.get("position", {}) + if not DISABLE_INGEST_WITH_LANGFLOW: + embedding_node = self._find_node_by_id(flow_data, old_embedding_id) + if embedding_node: + # Preserve position + original_position = embedding_node.get("position", {}) - # Replace with new template - new_embedding_node = embedding_template.copy() - new_embedding_node["position"] = 
+                # Replace with new template
+                new_embedding_node = embedding_template.copy()
+                new_embedding_node["position"] = original_position

-            # Replace in flow
-            self._replace_node_in_flow(flow_data, old_embedding_id, new_embedding_node)
-            components_updated.append(
-                f"embedding: {old_embedding_id} -> {new_embedding_id}"
-            )
+                # Replace in flow
+                self._replace_node_in_flow(
+                    flow_data, old_embedding_id, new_embedding_node
+                )
+                components_updated.append(
+                    f"embedding: {old_embedding_id} -> {new_embedding_id}"
+                )

         # Replace LLM component (if exists in this flow)
         if old_llm_id:
@@ -425,27 +455,30 @@ class FlowsService:
                 new_llm_text_node["position"] = original_position

                 # Replace in flow
-                self._replace_node_in_flow(flow_data, old_llm_text_id, new_llm_text_node)
-                components_updated.append(f"llm: {old_llm_text_id} -> {new_llm_text_id}")
+                self._replace_node_in_flow(
+                    flow_data, old_llm_text_id, new_llm_text_node
+                )
+                components_updated.append(
+                    f"llm: {old_llm_text_id} -> {new_llm_text_id}"
+                )

         # Update all edge references using regex replacement
         flow_json_str = json.dumps(flow_data)

         # Replace embedding ID references
-        flow_json_str = re.sub(
-            re.escape(old_embedding_id), new_embedding_id, flow_json_str
-        )
-        flow_json_str = re.sub(
-            re.escape(old_embedding_id.split("-")[0]),
-            new_embedding_id.split("-")[0],
-            flow_json_str,
-        )
+        if not DISABLE_INGEST_WITH_LANGFLOW:
+            flow_json_str = re.sub(
+                re.escape(old_embedding_id), new_embedding_id, flow_json_str
+            )
+            flow_json_str = re.sub(
+                re.escape(old_embedding_id.split("-")[0]),
+                new_embedding_id.split("-")[0],
+                flow_json_str,
+            )

         # Replace LLM ID references (if applicable)
         if old_llm_id:
-            flow_json_str = re.sub(
-                re.escape(old_llm_id), new_llm_id, flow_json_str
-            )
+            flow_json_str = re.sub(re.escape(old_llm_id), new_llm_id, flow_json_str)
         if old_llm_text_id:
             flow_json_str = re.sub(
                 re.escape(old_llm_text_id), new_llm_text_id, flow_json_str
@@ -506,7 +539,14 @@ class FlowsService:

         return None, None

-    async def _update_flow_field(self, flow_id: str, field_name: str, field_value: str, node_display_name: str = None, node_id: str = None):
+    async def _update_flow_field(
+        self,
+        flow_id: str,
+        field_name: str,
+        field_value: str,
+        node_display_name: str = None,
+        node_id: str = None,
+    ):
         """
        Generic helper function to update any field in any Langflow component.
@@ -521,22 +561,26 @@ class FlowsService:
             raise ValueError("flow_id is required")

         # Get the current flow data from Langflow
-        response = await clients.langflow_request(
-            "GET", f"/api/v1/flows/{flow_id}"
-        )
+        response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")

         if response.status_code != 200:
-            raise Exception(f"Failed to get flow: HTTP {response.status_code} - {response.text}")
+            raise Exception(
+                f"Failed to get flow: HTTP {response.status_code} - {response.text}"
+            )

         flow_data = response.json()

         # Find the target component by display name first, then by ID as fallback
         target_node, target_node_index = None, None
         if node_display_name:
-            target_node, target_node_index = self._find_node_in_flow(flow_data, display_name=node_display_name)
+            target_node, target_node_index = self._find_node_in_flow(
+                flow_data, display_name=node_display_name
+            )

         if target_node is None and node_id:
-            target_node, target_node_index = self._find_node_in_flow(flow_data, node_id=node_id)
+            target_node, target_node_index = self._find_node_in_flow(
+                flow_data, node_id=node_id
+            )

         if target_node is None:
             identifier = node_display_name or node_id
@@ -545,7 +589,9 @@ class FlowsService:
         # Update the field value directly in the existing node
         template = target_node.get("data", {}).get("node", {}).get("template", {})
         if template.get(field_name):
-            flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][field_name]["value"] = field_value
+            flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][
+                field_name
+            ]["value"] = field_value
         else:
             identifier = node_display_name or node_id
             raise Exception(f"{field_name} field not found in {identifier} component")
@@ -556,21 +602,31 @@ class FlowsService:
         )

         if patch_response.status_code != 200:
-            raise Exception(f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}")
+            raise Exception(
+                f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}"
+            )

     async def update_chat_flow_model(self, model_name: str):
         """Helper function to update the model in the chat flow"""
         if not LANGFLOW_CHAT_FLOW_ID:
             raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
-        await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "model_name", model_name,
-                                      node_display_name="Language Model")
+        await self._update_flow_field(
+            LANGFLOW_CHAT_FLOW_ID,
+            "model_name",
+            model_name,
+            node_display_name="Language Model",
+        )

     async def update_chat_flow_system_prompt(self, system_prompt: str):
         """Helper function to update the system prompt in the chat flow"""
         if not LANGFLOW_CHAT_FLOW_ID:
             raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
-        await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "system_prompt", system_prompt,
-                                      node_display_name="Agent")
+        await self._update_flow_field(
+            LANGFLOW_CHAT_FLOW_ID,
+            "system_prompt",
+            system_prompt,
+            node_display_name="Agent",
+        )

     async def update_flow_docling_preset(self, preset: str, preset_config: dict):
         """Helper function to update docling preset in the ingest flow"""
@@ -578,29 +634,46 @@ class FlowsService:
             raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")

         from config.settings import DOCLING_COMPONENT_ID
-        await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "docling_serve_opts", preset_config,
-                                      node_id=DOCLING_COMPONENT_ID)
+
+        await self._update_flow_field(
+            LANGFLOW_INGEST_FLOW_ID,
+            "docling_serve_opts",
+            preset_config,
+            node_id=DOCLING_COMPONENT_ID,
+        )

     async def update_ingest_flow_chunk_size(self, chunk_size: int):
"""Helper function to update chunk size in the ingest flow""" if not LANGFLOW_INGEST_FLOW_ID: raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") - await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_size", chunk_size, - node_display_name="Split Text") + await self._update_flow_field( + LANGFLOW_INGEST_FLOW_ID, + "chunk_size", + chunk_size, + node_display_name="Split Text", + ) async def update_ingest_flow_chunk_overlap(self, chunk_overlap: int): """Helper function to update chunk overlap in the ingest flow""" if not LANGFLOW_INGEST_FLOW_ID: raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") - await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_overlap", chunk_overlap, - node_display_name="Split Text") + await self._update_flow_field( + LANGFLOW_INGEST_FLOW_ID, + "chunk_overlap", + chunk_overlap, + node_display_name="Split Text", + ) async def update_ingest_flow_embedding_model(self, embedding_model: str): """Helper function to update embedding model in the ingest flow""" if not LANGFLOW_INGEST_FLOW_ID: raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") - await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "model", embedding_model, - node_display_name="Embedding Model") + await self._update_flow_field( + LANGFLOW_INGEST_FLOW_ID, + "model", + embedding_model, + node_display_name="Embedding Model", + ) def _replace_node_in_flow(self, flow_data, old_id, new_node): """Replace a node in the flow data""" @@ -612,7 +685,12 @@ class FlowsService: return False async def change_langflow_model_value( - self, provider: str, embedding_model: str, llm_model: str, endpoint: str = None, flow_configs: list = None + self, + provider: str, + embedding_model: str, + llm_model: str, + endpoint: str = None, + flow_configs: list = None, ): """ Change dropdown values for provider-specific components across flows @@ -656,8 +734,8 @@ class FlowsService: ] # Determine target component IDs based on provider - target_embedding_id, target_llm_id, target_llm_text_id = self._get_provider_component_ids( - provider + target_embedding_id, target_llm_id, target_llm_text_id = ( + self._get_provider_component_ids(provider) ) results = [] @@ -713,12 +791,24 @@ class FlowsService: def _get_provider_component_ids(self, provider: str): """Get the component IDs for a specific provider""" if provider == "watsonx": - return WATSONX_EMBEDDING_COMPONENT_ID, WATSONX_LLM_COMPONENT_ID, WATSONX_LLM_TEXT_COMPONENT_ID + return ( + WATSONX_EMBEDDING_COMPONENT_ID, + WATSONX_LLM_COMPONENT_ID, + WATSONX_LLM_TEXT_COMPONENT_ID, + ) elif provider == "ollama": - return OLLAMA_EMBEDDING_COMPONENT_ID, OLLAMA_LLM_COMPONENT_ID, OLLAMA_LLM_TEXT_COMPONENT_ID + return ( + OLLAMA_EMBEDDING_COMPONENT_ID, + OLLAMA_LLM_COMPONENT_ID, + OLLAMA_LLM_TEXT_COMPONENT_ID, + ) elif provider == "openai": # OpenAI components are the default ones - return OPENAI_EMBEDDING_COMPONENT_ID, OPENAI_LLM_COMPONENT_ID, OPENAI_LLM_TEXT_COMPONENT_ID + return ( + OPENAI_EMBEDDING_COMPONENT_ID, + OPENAI_LLM_COMPONENT_ID, + OPENAI_LLM_TEXT_COMPONENT_ID, + ) else: raise ValueError(f"Unsupported provider: {provider}") @@ -738,26 +828,25 @@ class FlowsService: flow_id = config["flow_id"] # Get flow data from Langflow API instead of file - response = await clients.langflow_request( - "GET", f"/api/v1/flows/{flow_id}" - ) - + response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}") + if response.status_code != 200: raise Exception( f"Failed to get flow from Langflow: HTTP {response.status_code} - {response.text}" ) - 
+
         flow_data = response.json()
         updates_made = []

         # Update embedding component
-        embedding_node = self._find_node_by_id(flow_data, target_embedding_id)
-        if embedding_node:
-            if self._update_component_fields(
-                embedding_node, provider, embedding_model, endpoint
-            ):
-                updates_made.append(f"embedding model: {embedding_model}")
+        if not DISABLE_INGEST_WITH_LANGFLOW:
+            embedding_node = self._find_node_by_id(flow_data, target_embedding_id)
+            if embedding_node:
+                if self._update_component_fields(
+                    embedding_node, provider, embedding_model, endpoint
+                ):
+                    updates_made.append(f"embedding model: {embedding_model}")

         # Update LLM component (if exists in this flow)
         if target_llm_id:
diff --git a/src/services/models_service.py b/src/services/models_service.py
index a90a74f4..8c779940 100644
--- a/src/services/models_service.py
+++ b/src/services/models_service.py
@@ -21,6 +21,27 @@ class ModelsService:
         "jina-embeddings-v2-base-en",
     ]

+    OPENAI_TOOL_CALLING_MODELS = [
+        "gpt-5",
+        "gpt-5-mini",
+        "gpt-5-nano",
+        "gpt-4o-mini",
+        "gpt-4o",
+        "gpt-4.1",
+        "gpt-4.1-mini",
+        "gpt-4.1-nano",
+        "gpt-4-turbo",
+        "gpt-4-turbo-preview",
+        "gpt-4",
+        "gpt-3.5-turbo",
+        "o1",
+        "o3-mini",
+        "o3",
+        "o3-pro",
+        "o4-mini",
+        "o4-mini-high",
+    ]
+
     def __init__(self):
         self.session_manager = None

@@ -49,12 +70,12 @@ class ModelsService:
                 model_id = model.get("id", "")

                 # Language models (GPT models)
-                if any(prefix in model_id for prefix in ["gpt-4", "gpt-3.5"]):
+                if model_id in self.OPENAI_TOOL_CALLING_MODELS:
                     language_models.append(
                         {
                             "value": model_id,
                             "label": model_id,
-                            "default": model_id == "gpt-4o-mini",
+                            "default": model_id == "gpt-5",
                         }
                     )
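
Reviewer note: the startup behavior added in src/main.py reduces to the sketch
below. This is a standalone approximation for review only; the `client`
parameter, `ensure_index`, and `startup` names are illustrative stand-ins,
while the real code uses `clients.opensearch`, module-level settings, and
structured logging.

    # Idempotent index bootstrap, gated by the feature flag (Python sketch).
    DISABLE_INGEST_WITH_LANGFLOW = True  # normally read from config/env

    async def ensure_index(client, index_name: str, index_body: dict) -> None:
        # Safe to run on every boot: an existing index is left untouched.
        if await client.indices.exists(index=index_name):
            return
        await client.indices.create(index=index_name, body=index_body)

    async def startup(client, index_name: str, index_body: dict) -> None:
        if DISABLE_INGEST_WITH_LANGFLOW:
            # Traditional connector path: OpenRAG owns the index, so create it
            # eagerly with the hard-coded (OpenAI-dimension) INDEX_BODY mapping.
            await ensure_index(client, index_name, index_body)
        # Otherwise the index is created after onboarding, once the chosen
        # embedding model (and therefore the vector dimension) is known.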
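
Likewise, the src/services/models_service.py change swaps prefix matching for
explicit allowlist membership. A minimal sketch of the resulting selection
logic follows; the `available` argument and the abbreviated allowlist are
illustrative, and the patch itself defines the full OPENAI_TOOL_CALLING_MODELS
list with "gpt-5" as the new default.

    # Build the language-model options from an explicit allowlist instead of
    # matching "gpt-4"/"gpt-3.5" prefixes, so o-series models are offered and
    # unsupported model IDs are excluded.
    OPENAI_TOOL_CALLING_MODELS = {"gpt-5", "gpt-5-mini", "gpt-4o", "o3", "o4-mini"}

    def language_model_options(available: list[dict]) -> list[dict]:
        return [
            {
                "value": m["id"],
                "label": m["id"],
                "default": m["id"] == "gpt-5",  # new default model
            }
            for m in available
            if m["id"] in OPENAI_TOOL_CALLING_MODELS
        ]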