Update backend to skip embedding model updates when DISABLE_INGEST_WITH_LANGFLOW is set

This commit is contained in:
Lucas Oliveira 2025-09-26 17:08:19 -03:00
parent c96d4943d5
commit 44e4f3d0d6
2 changed files with 180 additions and 85 deletions

View file

@ -3,6 +3,7 @@ import platform
from starlette.responses import JSONResponse
from utils.logging_config import get_logger
from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
LANGFLOW_URL,
LANGFLOW_CHAT_FLOW_ID,
LANGFLOW_INGEST_FLOW_ID,
@ -412,7 +413,7 @@ async def onboarding(request, flows_service):
config_updated = True
# Update knowledge settings
if "embedding_model" in body:
if "embedding_model" in body and not DISABLE_INGEST_WITH_LANGFLOW:
if (
not isinstance(body["embedding_model"], str)
or not body["embedding_model"].strip()
@ -561,11 +562,16 @@ async def onboarding(request, flows_service):
# Import here to avoid circular imports
from main import init_index
logger.info("Initializing OpenSearch index after onboarding configuration")
logger.info(
"Initializing OpenSearch index after onboarding configuration"
)
await init_index()
logger.info("OpenSearch index initialization completed successfully")
except Exception as e:
logger.error("Failed to initialize OpenSearch index after onboarding", error=str(e))
logger.error(
"Failed to initialize OpenSearch index after onboarding",
error=str(e),
)
# Don't fail the entire onboarding process if index creation fails
# The application can still work, but document operations may fail

View file

@ -1,5 +1,6 @@
import asyncio
from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
NUDGES_FLOW_ID,
LANGFLOW_URL,
LANGFLOW_CHAT_FLOW_ID,
@ -73,17 +74,17 @@ class FlowsService:
# Scan all JSON files in the flows directory
try:
for filename in os.listdir(flows_dir):
if not filename.endswith('.json'):
if not filename.endswith(".json"):
continue
file_path = os.path.join(flows_dir, filename)
try:
with open(file_path, 'r') as f:
with open(file_path, "r") as f:
flow_data = json.load(f)
# Check if this file contains the flow we're looking for
if flow_data.get('id') == flow_id:
if flow_data.get("id") == flow_id:
# Cache the result
self._flow_file_cache[flow_id] = file_path
logger.info(f"Found flow {flow_id} in file: {filename}")
@ -99,6 +100,7 @@ class FlowsService:
logger.warning(f"Flow with ID {flow_id} not found in flows directory")
return None
async def reset_langflow_flow(self, flow_type: str):
"""Reset a Langflow flow by uploading the corresponding JSON file
@ -135,7 +137,9 @@ class FlowsService:
try:
with open(flow_path, "r") as f:
flow_data = json.load(f)
logger.info(f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}")
logger.info(
f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}"
)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in flow file {flow_path}: {e}")
except FileNotFoundError:
@ -161,43 +165,62 @@ class FlowsService:
# Check if configuration has been edited (onboarding completed)
if config.edited:
logger.info(f"Updating {flow_type} flow with current configuration settings")
logger.info(
f"Updating {flow_type} flow with current configuration settings"
)
provider = config.provider.model_provider.lower()
# Step 1: Assign model provider (replace components) if not OpenAI
if provider != "openai":
logger.info(f"Assigning {provider} components to {flow_type} flow")
logger.info(
f"Assigning {provider} components to {flow_type} flow"
)
provider_result = await self.assign_model_provider(provider)
if not provider_result.get("success"):
logger.warning(f"Failed to assign {provider} components: {provider_result.get('error', 'Unknown error')}")
logger.warning(
f"Failed to assign {provider} components: {provider_result.get('error', 'Unknown error')}"
)
# Continue anyway, maybe just value updates will work
# Step 2: Update model values for the specific flow being reset
single_flow_config = [{
"name": flow_type,
"flow_id": flow_id,
}]
single_flow_config = [
{
"name": flow_type,
"flow_id": flow_id,
}
]
logger.info(f"Updating {flow_type} flow model values")
update_result = await self.change_langflow_model_value(
provider=provider,
embedding_model=config.knowledge.embedding_model,
llm_model=config.agent.llm_model,
endpoint=config.provider.endpoint if config.provider.endpoint else None,
flow_configs=single_flow_config
endpoint=config.provider.endpoint
if config.provider.endpoint
else None,
flow_configs=single_flow_config,
)
if update_result.get("success"):
logger.info(f"Successfully updated {flow_type} flow with current configuration")
logger.info(
f"Successfully updated {flow_type} flow with current configuration"
)
else:
logger.warning(f"Failed to update {flow_type} flow with current configuration: {update_result.get('error', 'Unknown error')}")
logger.warning(
f"Failed to update {flow_type} flow with current configuration: {update_result.get('error', 'Unknown error')}"
)
else:
logger.info(f"Configuration not yet edited (onboarding not completed), skipping model updates for {flow_type} flow")
logger.info(
f"Configuration not yet edited (onboarding not completed), skipping model updates for {flow_type} flow"
)
except Exception as e:
logger.error(f"Error updating {flow_type} flow with current configuration", error=str(e))
logger.error(
f"Error updating {flow_type} flow with current configuration",
error=str(e),
)
# Don't fail the entire reset operation if configuration update fails
return {
@ -243,7 +266,9 @@ class FlowsService:
try:
# Load component templates based on provider
llm_template, embedding_template, llm_text_template = self._load_component_templates(provider)
llm_template, embedding_template, llm_text_template = (
self._load_component_templates(provider)
)
logger.info(f"Assigning {provider} components")
@ -358,7 +383,9 @@ class FlowsService:
logger.info(f"Loaded component templates for {provider}")
return llm_template, embedding_template, llm_text_template
async def _update_flow_components(self, config, llm_template, embedding_template, llm_text_template):
async def _update_flow_components(
self, config, llm_template, embedding_template, llm_text_template
):
"""Update components in a specific flow"""
flow_name = config["name"]
flow_id = config["flow_id"]
@ -383,20 +410,23 @@ class FlowsService:
components_updated = []
# Replace embedding component
embedding_node = self._find_node_by_id(flow_data, old_embedding_id)
if embedding_node:
# Preserve position
original_position = embedding_node.get("position", {})
if not DISABLE_INGEST_WITH_LANGFLOW:
embedding_node = self._find_node_by_id(flow_data, old_embedding_id)
if embedding_node:
# Preserve position
original_position = embedding_node.get("position", {})
# Replace with new template
new_embedding_node = embedding_template.copy()
new_embedding_node["position"] = original_position
# Replace with new template
new_embedding_node = embedding_template.copy()
new_embedding_node["position"] = original_position
# Replace in flow
self._replace_node_in_flow(flow_data, old_embedding_id, new_embedding_node)
components_updated.append(
f"embedding: {old_embedding_id} -> {new_embedding_id}"
)
# Replace in flow
self._replace_node_in_flow(
flow_data, old_embedding_id, new_embedding_node
)
components_updated.append(
f"embedding: {old_embedding_id} -> {new_embedding_id}"
)
# Replace LLM component (if exists in this flow)
if old_llm_id:
@ -425,27 +455,30 @@ class FlowsService:
new_llm_text_node["position"] = original_position
# Replace in flow
self._replace_node_in_flow(flow_data, old_llm_text_id, new_llm_text_node)
components_updated.append(f"llm: {old_llm_text_id} -> {new_llm_text_id}")
self._replace_node_in_flow(
flow_data, old_llm_text_id, new_llm_text_node
)
components_updated.append(
f"llm: {old_llm_text_id} -> {new_llm_text_id}"
)
# Update all edge references using regex replacement
flow_json_str = json.dumps(flow_data)
# Replace embedding ID references
flow_json_str = re.sub(
re.escape(old_embedding_id), new_embedding_id, flow_json_str
)
flow_json_str = re.sub(
re.escape(old_embedding_id.split("-")[0]),
new_embedding_id.split("-")[0],
flow_json_str,
)
if not DISABLE_INGEST_WITH_LANGFLOW:
flow_json_str = re.sub(
re.escape(old_embedding_id), new_embedding_id, flow_json_str
)
flow_json_str = re.sub(
re.escape(old_embedding_id.split("-")[0]),
new_embedding_id.split("-")[0],
flow_json_str,
)
# Replace LLM ID references (if applicable)
if old_llm_id:
flow_json_str = re.sub(
re.escape(old_llm_id), new_llm_id, flow_json_str
)
flow_json_str = re.sub(re.escape(old_llm_id), new_llm_id, flow_json_str)
if old_llm_text_id:
flow_json_str = re.sub(
re.escape(old_llm_text_id), new_llm_text_id, flow_json_str
@ -506,7 +539,14 @@ class FlowsService:
return None, None
async def _update_flow_field(self, flow_id: str, field_name: str, field_value: str, node_display_name: str = None, node_id: str = None):
async def _update_flow_field(
self,
flow_id: str,
field_name: str,
field_value: str,
node_display_name: str = None,
node_id: str = None,
):
"""
Generic helper function to update any field in any Langflow component.
@ -521,22 +561,26 @@ class FlowsService:
raise ValueError("flow_id is required")
# Get the current flow data from Langflow
response = await clients.langflow_request(
"GET", f"/api/v1/flows/{flow_id}"
)
response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
if response.status_code != 200:
raise Exception(f"Failed to get flow: HTTP {response.status_code} - {response.text}")
raise Exception(
f"Failed to get flow: HTTP {response.status_code} - {response.text}"
)
flow_data = response.json()
# Find the target component by display name first, then by ID as fallback
target_node, target_node_index = None, None
if node_display_name:
target_node, target_node_index = self._find_node_in_flow(flow_data, display_name=node_display_name)
target_node, target_node_index = self._find_node_in_flow(
flow_data, display_name=node_display_name
)
if target_node is None and node_id:
target_node, target_node_index = self._find_node_in_flow(flow_data, node_id=node_id)
target_node, target_node_index = self._find_node_in_flow(
flow_data, node_id=node_id
)
if target_node is None:
identifier = node_display_name or node_id
@ -545,7 +589,9 @@ class FlowsService:
# Update the field value directly in the existing node
template = target_node.get("data", {}).get("node", {}).get("template", {})
if template.get(field_name):
flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][field_name]["value"] = field_value
flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][
field_name
]["value"] = field_value
else:
identifier = node_display_name or node_id
raise Exception(f"{field_name} field not found in {identifier} component")
@ -556,21 +602,31 @@ class FlowsService:
)
if patch_response.status_code != 200:
raise Exception(f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}")
raise Exception(
f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}"
)
async def update_chat_flow_model(self, model_name: str):
"""Helper function to update the model in the chat flow"""
if not LANGFLOW_CHAT_FLOW_ID:
raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "model_name", model_name,
node_display_name="Language Model")
await self._update_flow_field(
LANGFLOW_CHAT_FLOW_ID,
"model_name",
model_name,
node_display_name="Language Model",
)
async def update_chat_flow_system_prompt(self, system_prompt: str):
"""Helper function to update the system prompt in the chat flow"""
if not LANGFLOW_CHAT_FLOW_ID:
raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "system_prompt", system_prompt,
node_display_name="Agent")
await self._update_flow_field(
LANGFLOW_CHAT_FLOW_ID,
"system_prompt",
system_prompt,
node_display_name="Agent",
)
async def update_flow_docling_preset(self, preset: str, preset_config: dict):
"""Helper function to update docling preset in the ingest flow"""
@ -578,29 +634,46 @@ class FlowsService:
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
from config.settings import DOCLING_COMPONENT_ID
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "docling_serve_opts", preset_config,
node_id=DOCLING_COMPONENT_ID)
await self._update_flow_field(
LANGFLOW_INGEST_FLOW_ID,
"docling_serve_opts",
preset_config,
node_id=DOCLING_COMPONENT_ID,
)
async def update_ingest_flow_chunk_size(self, chunk_size: int):
"""Helper function to update chunk size in the ingest flow"""
if not LANGFLOW_INGEST_FLOW_ID:
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_size", chunk_size,
node_display_name="Split Text")
await self._update_flow_field(
LANGFLOW_INGEST_FLOW_ID,
"chunk_size",
chunk_size,
node_display_name="Split Text",
)
async def update_ingest_flow_chunk_overlap(self, chunk_overlap: int):
"""Helper function to update chunk overlap in the ingest flow"""
if not LANGFLOW_INGEST_FLOW_ID:
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_overlap", chunk_overlap,
node_display_name="Split Text")
await self._update_flow_field(
LANGFLOW_INGEST_FLOW_ID,
"chunk_overlap",
chunk_overlap,
node_display_name="Split Text",
)
async def update_ingest_flow_embedding_model(self, embedding_model: str):
"""Helper function to update embedding model in the ingest flow"""
if not LANGFLOW_INGEST_FLOW_ID:
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "model", embedding_model,
node_display_name="Embedding Model")
await self._update_flow_field(
LANGFLOW_INGEST_FLOW_ID,
"model",
embedding_model,
node_display_name="Embedding Model",
)
def _replace_node_in_flow(self, flow_data, old_id, new_node):
"""Replace a node in the flow data"""
@ -612,7 +685,12 @@ class FlowsService:
return False
async def change_langflow_model_value(
self, provider: str, embedding_model: str, llm_model: str, endpoint: str = None, flow_configs: list = None
self,
provider: str,
embedding_model: str,
llm_model: str,
endpoint: str = None,
flow_configs: list = None,
):
"""
Change dropdown values for provider-specific components across flows
@ -656,8 +734,8 @@ class FlowsService:
]
# Determine target component IDs based on provider
target_embedding_id, target_llm_id, target_llm_text_id = self._get_provider_component_ids(
provider
target_embedding_id, target_llm_id, target_llm_text_id = (
self._get_provider_component_ids(provider)
)
results = []
@ -713,12 +791,24 @@ class FlowsService:
def _get_provider_component_ids(self, provider: str):
"""Get the component IDs for a specific provider"""
if provider == "watsonx":
return WATSONX_EMBEDDING_COMPONENT_ID, WATSONX_LLM_COMPONENT_ID, WATSONX_LLM_TEXT_COMPONENT_ID
return (
WATSONX_EMBEDDING_COMPONENT_ID,
WATSONX_LLM_COMPONENT_ID,
WATSONX_LLM_TEXT_COMPONENT_ID,
)
elif provider == "ollama":
return OLLAMA_EMBEDDING_COMPONENT_ID, OLLAMA_LLM_COMPONENT_ID, OLLAMA_LLM_TEXT_COMPONENT_ID
return (
OLLAMA_EMBEDDING_COMPONENT_ID,
OLLAMA_LLM_COMPONENT_ID,
OLLAMA_LLM_TEXT_COMPONENT_ID,
)
elif provider == "openai":
# OpenAI components are the default ones
return OPENAI_EMBEDDING_COMPONENT_ID, OPENAI_LLM_COMPONENT_ID, OPENAI_LLM_TEXT_COMPONENT_ID
return (
OPENAI_EMBEDDING_COMPONENT_ID,
OPENAI_LLM_COMPONENT_ID,
OPENAI_LLM_TEXT_COMPONENT_ID,
)
else:
raise ValueError(f"Unsupported provider: {provider}")
@ -738,26 +828,25 @@ class FlowsService:
flow_id = config["flow_id"]
# Get flow data from Langflow API instead of file
response = await clients.langflow_request(
"GET", f"/api/v1/flows/{flow_id}"
)
response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
if response.status_code != 200:
raise Exception(
f"Failed to get flow from Langflow: HTTP {response.status_code} - {response.text}"
)
flow_data = response.json()
updates_made = []
# Update embedding component
embedding_node = self._find_node_by_id(flow_data, target_embedding_id)
if embedding_node:
if self._update_component_fields(
embedding_node, provider, embedding_model, endpoint
):
updates_made.append(f"embedding model: {embedding_model}")
if not DISABLE_INGEST_WITH_LANGFLOW:
embedding_node = self._find_node_by_id(flow_data, target_embedding_id)
if embedding_node:
if self._update_component_fields(
embedding_node, provider, embedding_model, endpoint
):
updates_made.append(f"embedding model: {embedding_model}")
# Update LLM component (if exists in this flow)
if target_llm_id: