fix: set defined models for OpenAI; use the default embedding model when the disable-ingest-with-Langflow feature flag is enabled (#128)

* hard-code the supported OpenAI models
* ensure the index exists when disable ingest with Langflow is active
* update the backend to not change the embedding model while Langflow ingest is disabled
* initialize the index on startup when the feature flag is enabled
* mount config.yaml in docker compose
parent 3347181615
commit 52896a8da7
7 changed files with 275 additions and 111 deletions
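Everything below keys off the DISABLE_INGEST_WITH_LANGFLOW flag from config.settings. The diff never shows how that flag is defined, so the following is only a plausible sketch of an environment-driven boolean:

    import os

    # Hypothetical definition; the actual config.settings parsing is not part of this diff.
    DISABLE_INGEST_WITH_LANGFLOW = os.getenv(
        "DISABLE_INGEST_WITH_LANGFLOW", "false"
    ).strip().lower() in ("1", "true", "yes")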
@@ -74,6 +74,7 @@ services:
       - ./documents:/app/documents:Z
       - ./keys:/app/keys:Z
       - ./flows:/app/flows:Z
+      - ./config.yaml:/app/config.yaml:Z

   openrag-frontend:
     image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}
@@ -73,6 +73,7 @@ services:
       - ./documents:/app/documents:Z
       - ./keys:/app/keys:Z
      - ./flows:/app/flows:z
+      - ./config.yaml:/app/config.yaml:Z
     gpus: all

   openrag-frontend:
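Both compose files gain the same bind mount, so the backend container sees the host's config.yaml at /app/config.yaml (the :Z/:z suffixes are SELinux relabel options). How the app consumes that file is not shown in this diff; a minimal sketch, assuming PyYAML:

    import yaml  # assumption: PyYAML is available in the backend image

    # Read the mounted configuration; the path matches the compose bind mount.
    with open("/app/config.yaml") as f:
        config = yaml.safe_load(f)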
@@ -2,7 +2,12 @@
 from starlette.requests import Request

-from config.settings import DISABLE_INGEST_WITH_LANGFLOW
+from config.settings import (
+    DISABLE_INGEST_WITH_LANGFLOW,
+    clients,
+    INDEX_NAME,
+    INDEX_BODY,
+)
 from utils.logging_config import get_logger

 logger = get_logger(__name__)
@@ -12,19 +17,19 @@ class ConnectorRouter:
     """
     Router that automatically chooses between LangflowConnectorService and ConnectorService
     based on the DISABLE_INGEST_WITH_LANGFLOW configuration.

     - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses LangflowConnectorService
     - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional ConnectorService
     """

     def __init__(self, langflow_connector_service, openrag_connector_service):
         self.langflow_connector_service = langflow_connector_service
         self.openrag_connector_service = openrag_connector_service
         logger.debug(
             "ConnectorRouter initialized",
-            disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
+            disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
         )

     def get_active_service(self):
         """Get the currently active connector service based on configuration."""
         if DISABLE_INGEST_WITH_LANGFLOW:
@@ -33,28 +38,32 @@ class ConnectorRouter:
         else:
             logger.debug("Using Langflow connector service")
             return self.langflow_connector_service

     # Proxy all connector service methods to the active service

     async def initialize(self):
         """Initialize the active connector service."""
+        # Initialize OpenSearch index if using traditional OpenRAG connector service
         return await self.get_active_service().initialize()

     @property
     def connection_manager(self):
         """Get the connection manager from the active service."""
         return self.get_active_service().connection_manager

     async def get_connector(self, connection_id: str):
         """Get a connector instance from the active service."""
         return await self.get_active_service().get_connector(connection_id)

-    async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None):
+    async def sync_specific_files(
+        self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None
+    ):
         """Sync specific files using the active service."""
         return await self.get_active_service().sync_specific_files(
             connection_id, user_id, file_list, jwt_token
         )

     def __getattr__(self, name):
         """
         Proxy any other method calls to the active service.
@@ -64,4 +73,6 @@ class ConnectorRouter:
         if hasattr(active_service, name):
             return getattr(active_service, name)
         else:
-            raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'")
+            raise AttributeError(
+                f"'{type(active_service).__name__}' object has no attribute '{name}'"
+            )
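The router's contract is easiest to see in isolation: it forwards every call, property, and unknown attribute to whichever service the flag selects. A sketch with stand-in services (the real ones are wired up in main.py; the import path for ConnectorRouter is assumed, not shown in this diff):

    import asyncio

    # from services.connector_router import ConnectorRouter  # module path assumed

    class FakeLangflowService:
        async def initialize(self):
            return "langflow ingest path"

    class FakeOpenRAGService:
        async def initialize(self):
            return "traditional ingest path"

    async def main():
        router = ConnectorRouter(FakeLangflowService(), FakeOpenRAGService())
        # Dispatch follows DISABLE_INGEST_WITH_LANGFLOW; anything the router
        # doesn't define falls through __getattr__ to the active service.
        print(await router.initialize())

    asyncio.run(main())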
@@ -4,6 +4,7 @@ from starlette.responses import JSONResponse
 from utils.container_utils import transform_localhost_url
 from utils.logging_config import get_logger
 from config.settings import (
+    DISABLE_INGEST_WITH_LANGFLOW,
     LANGFLOW_URL,
     LANGFLOW_CHAT_FLOW_ID,
     LANGFLOW_INGEST_FLOW_ID,
@@ -450,7 +451,7 @@ async def onboarding(request, flows_service):
         config_updated = True

     # Update knowledge settings
-    if "embedding_model" in body:
+    if "embedding_model" in body and not DISABLE_INGEST_WITH_LANGFLOW:
         if (
             not isinstance(body["embedding_model"], str)
             or not body["embedding_model"].strip()
@@ -600,11 +601,16 @@ async def onboarding(request, flows_service):
             # Import here to avoid circular imports
             from main import init_index

-            logger.info("Initializing OpenSearch index after onboarding configuration")
+            logger.info(
+                "Initializing OpenSearch index after onboarding configuration"
+            )
             await init_index()
             logger.info("OpenSearch index initialization completed successfully")
         except Exception as e:
-            logger.error("Failed to initialize OpenSearch index after onboarding", error=str(e))
+            logger.error(
+                "Failed to initialize OpenSearch index after onboarding",
+                error=str(e),
+            )
             # Don't fail the entire onboarding process if index creation fails
             # The application can still work, but document operations may fail
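With the guard in place, an onboarding payload that includes embedding_model is simply ignored while Langflow ingest is disabled, so the hard-coded default stays in effect. An illustrative call (route path and port are placeholders, not taken from this diff):

    import httpx

    payload = {"embedding_model": "text-embedding-3-small"}
    # When DISABLE_INGEST_WITH_LANGFLOW is true the handler skips this key
    # entirely; the index keeps its default (OpenAI) embedding dimensions.
    httpx.post("http://localhost:8000/onboarding", json=payload)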
src/main.py (57 changes)
@@ -53,6 +53,7 @@ from auth_middleware import optional_auth, require_auth
 from config.settings import (
+    DISABLE_INGEST_WITH_LANGFLOW,
     EMBED_MODEL,
     INDEX_BODY,
     INDEX_NAME,
     SESSION_SECRET,
     clients,
@@ -82,6 +83,7 @@ logger.info(
     cuda_version=torch.version.cuda,
 )

+
 async def wait_for_opensearch():
     """Wait for OpenSearch to be ready with retries"""
     max_retries = 30
@@ -128,6 +130,34 @@ async def configure_alerting_security():
         # Don't fail startup if alerting config fails


+async def _ensure_opensearch_index():
+    """Ensure OpenSearch index exists when using traditional connector service."""
+    try:
+        # Check if index already exists
+        if await clients.opensearch.indices.exists(index=INDEX_NAME):
+            logger.debug("OpenSearch index already exists", index_name=INDEX_NAME)
+            return
+
+        # Create the index with hard-coded INDEX_BODY (uses OpenAI embedding dimensions)
+        await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY)
+        logger.info(
+            "Created OpenSearch index for traditional connector service",
+            index_name=INDEX_NAME,
+            vector_dimensions=INDEX_BODY["mappings"]["properties"]["chunk_embedding"][
+                "dimension"
+            ],
+        )
+
+    except Exception as e:
+        logger.error(
+            "Failed to initialize OpenSearch index for traditional connector service",
+            error=str(e),
+            index_name=INDEX_NAME,
+        )
+        # Don't raise the exception to avoid breaking the initialization
+        # The service can still function, document operations might fail later
+
+
 async def init_index():
     """Initialize OpenSearch index and security roles"""
     await wait_for_opensearch()
@@ -141,10 +171,20 @@ async def init_index():

     # Create documents index
     if not await clients.opensearch.indices.exists(index=INDEX_NAME):
-        await clients.opensearch.indices.create(index=INDEX_NAME, body=dynamic_index_body)
-        logger.info("Created OpenSearch index", index_name=INDEX_NAME, embedding_model=embedding_model)
+        await clients.opensearch.indices.create(
+            index=INDEX_NAME, body=dynamic_index_body
+        )
+        logger.info(
+            "Created OpenSearch index",
+            index_name=INDEX_NAME,
+            embedding_model=embedding_model,
+        )
     else:
-        logger.info("Index already exists, skipping creation", index_name=INDEX_NAME, embedding_model=embedding_model)
+        logger.info(
+            "Index already exists, skipping creation",
+            index_name=INDEX_NAME,
+            embedding_model=embedding_model,
+        )

     # Create knowledge filters index
     knowledge_filter_index_name = "knowledge_filters"
@@ -402,6 +442,9 @@ async def startup_tasks(services):
     # Index will be created after onboarding when we know the embedding model
     await wait_for_opensearch()

+    if DISABLE_INGEST_WITH_LANGFLOW:
+        await _ensure_opensearch_index()
+
     # Configure alerting security
     await configure_alerting_security()
@@ -1075,14 +1118,6 @@ async def create_app():
     return app


-async def startup():
-    """Application startup tasks"""
-    await init_index()
-    # Get services from app state if needed for initialization
-    # services = app.state.services
-    # await services['connector_service'].initialize()
-
-
 def cleanup():
     """Cleanup on application shutdown"""
     # Cleanup process pools only (webhooks handled by Starlette shutdown)
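The new startup path boils down to an idempotent exists-then-create call against OpenSearch. A self-contained sketch with opensearch-py (host, TLS settings, index name, and the 1536 dimension are placeholder values; OpenRAG takes its equivalents from config.settings):

    import asyncio
    from opensearchpy import AsyncOpenSearch

    INDEX_NAME = "documents"  # placeholder
    INDEX_BODY = {
        "settings": {"index": {"knn": True}},
        "mappings": {
            "properties": {
                "chunk_embedding": {"type": "knn_vector", "dimension": 1536}
            }
        },
    }

    async def ensure_index(client: AsyncOpenSearch) -> None:
        # exists/create makes the startup hook safe to run repeatedly.
        if await client.indices.exists(index=INDEX_NAME):
            return
        await client.indices.create(index=INDEX_NAME, body=INDEX_BODY)

    async def main() -> None:
        client = AsyncOpenSearch(hosts=["https://localhost:9200"], verify_certs=False)
        try:
            await ensure_index(client)
        finally:
            await client.close()

    asyncio.run(main())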
@@ -1,5 +1,6 @@
 import asyncio
 from config.settings import (
+    DISABLE_INGEST_WITH_LANGFLOW,
     NUDGES_FLOW_ID,
     LANGFLOW_URL,
     LANGFLOW_CHAT_FLOW_ID,
@@ -73,17 +74,17 @@ class FlowsService:
         # Scan all JSON files in the flows directory
         try:
             for filename in os.listdir(flows_dir):
-                if not filename.endswith('.json'):
+                if not filename.endswith(".json"):
                     continue

                 file_path = os.path.join(flows_dir, filename)

                 try:
-                    with open(file_path, 'r') as f:
+                    with open(file_path, "r") as f:
                         flow_data = json.load(f)

                     # Check if this file contains the flow we're looking for
-                    if flow_data.get('id') == flow_id:
+                    if flow_data.get("id") == flow_id:
                         # Cache the result
                         self._flow_file_cache[flow_id] = file_path
                         logger.info(f"Found flow {flow_id} in file: {filename}")
@@ -99,6 +100,7 @@ class FlowsService:
         logger.warning(f"Flow with ID {flow_id} not found in flows directory")
         return None

+
     async def reset_langflow_flow(self, flow_type: str):
         """Reset a Langflow flow by uploading the corresponding JSON file
@@ -135,7 +137,9 @@ class FlowsService:
         try:
             with open(flow_path, "r") as f:
                 flow_data = json.load(f)
-            logger.info(f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}")
+            logger.info(
+                f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}"
+            )
         except json.JSONDecodeError as e:
             raise ValueError(f"Invalid JSON in flow file {flow_path}: {e}")
         except FileNotFoundError:
@@ -161,43 +165,62 @@ class FlowsService:
             # Check if configuration has been edited (onboarding completed)
             if config.edited:
-                logger.info(f"Updating {flow_type} flow with current configuration settings")
+                logger.info(
+                    f"Updating {flow_type} flow with current configuration settings"
+                )

                 provider = config.provider.model_provider.lower()

                 # Step 1: Assign model provider (replace components) if not OpenAI
                 if provider != "openai":
-                    logger.info(f"Assigning {provider} components to {flow_type} flow")
+                    logger.info(
+                        f"Assigning {provider} components to {flow_type} flow"
+                    )
                     provider_result = await self.assign_model_provider(provider)

                     if not provider_result.get("success"):
-                        logger.warning(f"Failed to assign {provider} components: {provider_result.get('error', 'Unknown error')}")
+                        logger.warning(
+                            f"Failed to assign {provider} components: {provider_result.get('error', 'Unknown error')}"
+                        )
                         # Continue anyway, maybe just value updates will work

                 # Step 2: Update model values for the specific flow being reset
-                single_flow_config = [{
-                    "name": flow_type,
-                    "flow_id": flow_id,
-                }]
+                single_flow_config = [
+                    {
+                        "name": flow_type,
+                        "flow_id": flow_id,
+                    }
+                ]

                 logger.info(f"Updating {flow_type} flow model values")
                 update_result = await self.change_langflow_model_value(
                     provider=provider,
                     embedding_model=config.knowledge.embedding_model,
                     llm_model=config.agent.llm_model,
-                    endpoint=config.provider.endpoint if config.provider.endpoint else None,
-                    flow_configs=single_flow_config
+                    endpoint=config.provider.endpoint
+                    if config.provider.endpoint
+                    else None,
+                    flow_configs=single_flow_config,
                 )

                 if update_result.get("success"):
-                    logger.info(f"Successfully updated {flow_type} flow with current configuration")
+                    logger.info(
+                        f"Successfully updated {flow_type} flow with current configuration"
+                    )
                 else:
-                    logger.warning(f"Failed to update {flow_type} flow with current configuration: {update_result.get('error', 'Unknown error')}")
+                    logger.warning(
+                        f"Failed to update {flow_type} flow with current configuration: {update_result.get('error', 'Unknown error')}"
+                    )
             else:
-                logger.info(f"Configuration not yet edited (onboarding not completed), skipping model updates for {flow_type} flow")
+                logger.info(
+                    f"Configuration not yet edited (onboarding not completed), skipping model updates for {flow_type} flow"
+                )

         except Exception as e:
-            logger.error(f"Error updating {flow_type} flow with current configuration", error=str(e))
+            logger.error(
+                f"Error updating {flow_type} flow with current configuration",
+                error=str(e),
+            )
             # Don't fail the entire reset operation if configuration update fails

         return {
@@ -243,7 +266,9 @@ class FlowsService:
         try:
             # Load component templates based on provider
-            llm_template, embedding_template, llm_text_template = self._load_component_templates(provider)
+            llm_template, embedding_template, llm_text_template = (
+                self._load_component_templates(provider)
+            )

             logger.info(f"Assigning {provider} components")
@@ -358,7 +383,9 @@ class FlowsService:
         logger.info(f"Loaded component templates for {provider}")
         return llm_template, embedding_template, llm_text_template

-    async def _update_flow_components(self, config, llm_template, embedding_template, llm_text_template):
+    async def _update_flow_components(
+        self, config, llm_template, embedding_template, llm_text_template
+    ):
         """Update components in a specific flow"""
         flow_name = config["name"]
         flow_id = config["flow_id"]
@@ -383,20 +410,23 @@ class FlowsService:
         components_updated = []

         # Replace embedding component
-        embedding_node = self._find_node_by_id(flow_data, old_embedding_id)
-        if embedding_node:
-            # Preserve position
-            original_position = embedding_node.get("position", {})
+        if not DISABLE_INGEST_WITH_LANGFLOW:
+            embedding_node = self._find_node_by_id(flow_data, old_embedding_id)
+            if embedding_node:
+                # Preserve position
+                original_position = embedding_node.get("position", {})

-            # Replace with new template
-            new_embedding_node = embedding_template.copy()
-            new_embedding_node["position"] = original_position
+                # Replace with new template
+                new_embedding_node = embedding_template.copy()
+                new_embedding_node["position"] = original_position

-            # Replace in flow
-            self._replace_node_in_flow(flow_data, old_embedding_id, new_embedding_node)
-            components_updated.append(
-                f"embedding: {old_embedding_id} -> {new_embedding_id}"
-            )
+                # Replace in flow
+                self._replace_node_in_flow(
+                    flow_data, old_embedding_id, new_embedding_node
+                )
+                components_updated.append(
+                    f"embedding: {old_embedding_id} -> {new_embedding_id}"
+                )

         # Replace LLM component (if exists in this flow)
         if old_llm_id:
@@ -425,27 +455,30 @@ class FlowsService:
             new_llm_text_node["position"] = original_position

             # Replace in flow
-            self._replace_node_in_flow(flow_data, old_llm_text_id, new_llm_text_node)
-            components_updated.append(f"llm: {old_llm_text_id} -> {new_llm_text_id}")
+            self._replace_node_in_flow(
+                flow_data, old_llm_text_id, new_llm_text_node
+            )
+            components_updated.append(
+                f"llm: {old_llm_text_id} -> {new_llm_text_id}"
+            )

         # Update all edge references using regex replacement
         flow_json_str = json.dumps(flow_data)

         # Replace embedding ID references
-        flow_json_str = re.sub(
-            re.escape(old_embedding_id), new_embedding_id, flow_json_str
-        )
-        flow_json_str = re.sub(
-            re.escape(old_embedding_id.split("-")[0]),
-            new_embedding_id.split("-")[0],
-            flow_json_str,
-        )
+        if not DISABLE_INGEST_WITH_LANGFLOW:
+            flow_json_str = re.sub(
+                re.escape(old_embedding_id), new_embedding_id, flow_json_str
+            )
+            flow_json_str = re.sub(
+                re.escape(old_embedding_id.split("-")[0]),
+                new_embedding_id.split("-")[0],
+                flow_json_str,
+            )

         # Replace LLM ID references (if applicable)
         if old_llm_id:
-            flow_json_str = re.sub(
-                re.escape(old_llm_id), new_llm_id, flow_json_str
-            )
+            flow_json_str = re.sub(re.escape(old_llm_id), new_llm_id, flow_json_str)
         if old_llm_text_id:
             flow_json_str = re.sub(
                 re.escape(old_llm_text_id), new_llm_text_id, flow_json_str
@@ -506,7 +539,14 @@ class FlowsService:
         return None, None

-    async def _update_flow_field(self, flow_id: str, field_name: str, field_value: str, node_display_name: str = None, node_id: str = None):
+    async def _update_flow_field(
+        self,
+        flow_id: str,
+        field_name: str,
+        field_value: str,
+        node_display_name: str = None,
+        node_id: str = None,
+    ):
         """
         Generic helper function to update any field in any Langflow component.
@@ -521,22 +561,26 @@ class FlowsService:
             raise ValueError("flow_id is required")

         # Get the current flow data from Langflow
-        response = await clients.langflow_request(
-            "GET", f"/api/v1/flows/{flow_id}"
-        )
+        response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")

         if response.status_code != 200:
-            raise Exception(f"Failed to get flow: HTTP {response.status_code} - {response.text}")
+            raise Exception(
+                f"Failed to get flow: HTTP {response.status_code} - {response.text}"
+            )

         flow_data = response.json()

         # Find the target component by display name first, then by ID as fallback
         target_node, target_node_index = None, None
         if node_display_name:
-            target_node, target_node_index = self._find_node_in_flow(flow_data, display_name=node_display_name)
+            target_node, target_node_index = self._find_node_in_flow(
+                flow_data, display_name=node_display_name
+            )

         if target_node is None and node_id:
-            target_node, target_node_index = self._find_node_in_flow(flow_data, node_id=node_id)
+            target_node, target_node_index = self._find_node_in_flow(
+                flow_data, node_id=node_id
+            )

         if target_node is None:
             identifier = node_display_name or node_id
@@ -545,7 +589,9 @@ class FlowsService:
         # Update the field value directly in the existing node
         template = target_node.get("data", {}).get("node", {}).get("template", {})
         if template.get(field_name):
-            flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][field_name]["value"] = field_value
+            flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][
+                field_name
+            ]["value"] = field_value
         else:
             identifier = node_display_name or node_id
             raise Exception(f"{field_name} field not found in {identifier} component")
@@ -556,21 +602,31 @@ class FlowsService:
         )

         if patch_response.status_code != 200:
-            raise Exception(f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}")
+            raise Exception(
+                f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}"
+            )

     async def update_chat_flow_model(self, model_name: str):
         """Helper function to update the model in the chat flow"""
         if not LANGFLOW_CHAT_FLOW_ID:
             raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
-        await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "model_name", model_name,
-                                      node_display_name="Language Model")
+        await self._update_flow_field(
+            LANGFLOW_CHAT_FLOW_ID,
+            "model_name",
+            model_name,
+            node_display_name="Language Model",
+        )

     async def update_chat_flow_system_prompt(self, system_prompt: str):
         """Helper function to update the system prompt in the chat flow"""
         if not LANGFLOW_CHAT_FLOW_ID:
             raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
-        await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "system_prompt", system_prompt,
-                                      node_display_name="Agent")
+        await self._update_flow_field(
+            LANGFLOW_CHAT_FLOW_ID,
+            "system_prompt",
+            system_prompt,
+            node_display_name="Agent",
+        )

     async def update_flow_docling_preset(self, preset: str, preset_config: dict):
         """Helper function to update docling preset in the ingest flow"""
@@ -578,29 +634,46 @@ class FlowsService:
             raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")

         from config.settings import DOCLING_COMPONENT_ID
-        await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "docling_serve_opts", preset_config,
-                                      node_id=DOCLING_COMPONENT_ID)
+
+        await self._update_flow_field(
+            LANGFLOW_INGEST_FLOW_ID,
+            "docling_serve_opts",
+            preset_config,
+            node_id=DOCLING_COMPONENT_ID,
+        )

     async def update_ingest_flow_chunk_size(self, chunk_size: int):
         """Helper function to update chunk size in the ingest flow"""
         if not LANGFLOW_INGEST_FLOW_ID:
             raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
-        await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_size", chunk_size,
-                                      node_display_name="Split Text")
+        await self._update_flow_field(
+            LANGFLOW_INGEST_FLOW_ID,
+            "chunk_size",
+            chunk_size,
+            node_display_name="Split Text",
+        )

     async def update_ingest_flow_chunk_overlap(self, chunk_overlap: int):
         """Helper function to update chunk overlap in the ingest flow"""
         if not LANGFLOW_INGEST_FLOW_ID:
             raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
-        await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_overlap", chunk_overlap,
-                                      node_display_name="Split Text")
+        await self._update_flow_field(
+            LANGFLOW_INGEST_FLOW_ID,
+            "chunk_overlap",
+            chunk_overlap,
+            node_display_name="Split Text",
+        )

     async def update_ingest_flow_embedding_model(self, embedding_model: str):
         """Helper function to update embedding model in the ingest flow"""
         if not LANGFLOW_INGEST_FLOW_ID:
             raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
-        await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "model", embedding_model,
-                                      node_display_name="Embedding Model")
+        await self._update_flow_field(
+            LANGFLOW_INGEST_FLOW_ID,
+            "model",
+            embedding_model,
+            node_display_name="Embedding Model",
+        )

     def _replace_node_in_flow(self, flow_data, old_id, new_node):
         """Replace a node in the flow data"""
@@ -612,7 +685,12 @@ class FlowsService:
         return False

     async def change_langflow_model_value(
-        self, provider: str, embedding_model: str, llm_model: str, endpoint: str = None, flow_configs: list = None
+        self,
+        provider: str,
+        embedding_model: str,
+        llm_model: str,
+        endpoint: str = None,
+        flow_configs: list = None,
     ):
         """
         Change dropdown values for provider-specific components across flows
@@ -656,8 +734,8 @@ class FlowsService:
         ]

         # Determine target component IDs based on provider
-        target_embedding_id, target_llm_id, target_llm_text_id = self._get_provider_component_ids(
-            provider
-        )
+        target_embedding_id, target_llm_id, target_llm_text_id = (
+            self._get_provider_component_ids(provider)
+        )

         results = []
@@ -713,12 +791,24 @@ class FlowsService:
     def _get_provider_component_ids(self, provider: str):
         """Get the component IDs for a specific provider"""
         if provider == "watsonx":
-            return WATSONX_EMBEDDING_COMPONENT_ID, WATSONX_LLM_COMPONENT_ID, WATSONX_LLM_TEXT_COMPONENT_ID
+            return (
+                WATSONX_EMBEDDING_COMPONENT_ID,
+                WATSONX_LLM_COMPONENT_ID,
+                WATSONX_LLM_TEXT_COMPONENT_ID,
+            )
         elif provider == "ollama":
-            return OLLAMA_EMBEDDING_COMPONENT_ID, OLLAMA_LLM_COMPONENT_ID, OLLAMA_LLM_TEXT_COMPONENT_ID
+            return (
+                OLLAMA_EMBEDDING_COMPONENT_ID,
+                OLLAMA_LLM_COMPONENT_ID,
+                OLLAMA_LLM_TEXT_COMPONENT_ID,
+            )
         elif provider == "openai":
             # OpenAI components are the default ones
-            return OPENAI_EMBEDDING_COMPONENT_ID, OPENAI_LLM_COMPONENT_ID, OPENAI_LLM_TEXT_COMPONENT_ID
+            return (
+                OPENAI_EMBEDDING_COMPONENT_ID,
+                OPENAI_LLM_COMPONENT_ID,
+                OPENAI_LLM_TEXT_COMPONENT_ID,
+            )
         else:
             raise ValueError(f"Unsupported provider: {provider}")
@@ -738,26 +828,25 @@ class FlowsService:
         flow_id = config["flow_id"]

         # Get flow data from Langflow API instead of file
-        response = await clients.langflow_request(
-            "GET", f"/api/v1/flows/{flow_id}"
-        )
+        response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")

         if response.status_code != 200:
             raise Exception(
                 f"Failed to get flow from Langflow: HTTP {response.status_code} - {response.text}"
             )

         flow_data = response.json()

         updates_made = []

         # Update embedding component
-        embedding_node = self._find_node_by_id(flow_data, target_embedding_id)
-        if embedding_node:
-            if self._update_component_fields(
-                embedding_node, provider, embedding_model, endpoint
-            ):
-                updates_made.append(f"embedding model: {embedding_model}")
+        if not DISABLE_INGEST_WITH_LANGFLOW:
+            embedding_node = self._find_node_by_id(flow_data, target_embedding_id)
+            if embedding_node:
+                if self._update_component_fields(
+                    embedding_node, provider, embedding_model, endpoint
+                ):
+                    updates_made.append(f"embedding model: {embedding_model}")

         # Update LLM component (if exists in this flow)
         if target_llm_id:
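Every helper above funnels into the same read-modify-PATCH cycle against the Langflow flows API. A condensed sketch of that cycle with httpx (base URL and flow ID are placeholders; the node-lookup details, such as where display_name lives, are assumptions extrapolated from the template path the code indexes into):

    import asyncio
    import httpx

    LANGFLOW_URL = "http://localhost:7860"  # placeholder
    FLOW_ID = "00000000-0000-0000-0000-000000000000"  # placeholder

    async def set_field(field_name: str, field_value, display_name: str) -> None:
        async with httpx.AsyncClient(base_url=LANGFLOW_URL) as client:
            resp = await client.get(f"/api/v1/flows/{FLOW_ID}")
            resp.raise_for_status()
            flow = resp.json()
            # Find the node and rewrite its template value in place.
            for node in flow["data"]["nodes"]:
                if node["data"]["node"].get("display_name") == display_name:
                    node["data"]["node"]["template"][field_name]["value"] = field_value
                    break
            patch = await client.patch(f"/api/v1/flows/{FLOW_ID}", json=flow)
            patch.raise_for_status()

    asyncio.run(set_field("model_name", "gpt-5", "Language Model"))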
@@ -21,6 +21,27 @@ class ModelsService:
         "jina-embeddings-v2-base-en",
     ]

+    OPENAI_TOOL_CALLING_MODELS = [
+        "gpt-5",
+        "gpt-5-mini",
+        "gpt-5-nano",
+        "gpt-4o-mini",
+        "gpt-4o",
+        "gpt-4.1",
+        "gpt-4.1-mini",
+        "gpt-4.1-nano",
+        "gpt-4-turbo",
+        "gpt-4-turbo-preview",
+        "gpt-4",
+        "gpt-3.5-turbo",
+        "o1",
+        "o3-mini",
+        "o3",
+        "o3-pro",
+        "o4-mini",
+        "o4-mini-high",
+    ]
+
     def __init__(self):
         self.session_manager = None
@@ -49,12 +70,12 @@ class ModelsService:
             model_id = model.get("id", "")

             # Language models (GPT models)
-            if any(prefix in model_id for prefix in ["gpt-4", "gpt-3.5"]):
+            if model_id in self.OPENAI_TOOL_CALLING_MODELS:
                 language_models.append(
                     {
                         "value": model_id,
                         "label": model_id,
-                        "default": model_id == "gpt-4o-mini",
+                        "default": model_id == "gpt-5",
                     }
                 )
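The switch from substring matching to an explicit allow-list is worth spelling out: the old check let dated snapshots and non-chat variants through, and could never surface gpt-5 or the o-series. A small comparison (model list abridged):

    OPENAI_TOOL_CALLING_MODELS = {"gpt-5", "gpt-4o", "gpt-4o-mini", "o3"}  # abridged

    available = ["gpt-4o", "gpt-4-0314", "gpt-3.5-turbo-instruct", "gpt-5", "o3"]

    old_style = [m for m in available if any(p in m for p in ["gpt-4", "gpt-3.5"])]
    # ['gpt-4o', 'gpt-4-0314', 'gpt-3.5-turbo-instruct'] -- snapshots leak in; gpt-5 and o3 missed

    new_style = [m for m in available if m in OPENAI_TOOL_CALLING_MODELS]
    # ['gpt-4o', 'gpt-5', 'o3']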