Compare commits
5 commits
main
...
fix/agent_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d53e6cd2ed | ||
|
|
343dc442dd | ||
|
|
44e4f3d0d6 | ||
|
|
c96d4943d5 | ||
|
|
de447a6ae5 |
7 changed files with 275 additions and 111 deletions
|
|
@ -74,6 +74,7 @@ services:
|
||||||
- ./documents:/app/documents:Z
|
- ./documents:/app/documents:Z
|
||||||
- ./keys:/app/keys:Z
|
- ./keys:/app/keys:Z
|
||||||
- ./flows:/app/flows:Z
|
- ./flows:/app/flows:Z
|
||||||
|
- ./config.yaml:/app/config.yaml:Z
|
||||||
|
|
||||||
openrag-frontend:
|
openrag-frontend:
|
||||||
image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}
|
image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}
|
||||||
|
|
|
||||||
|
|
@ -73,6 +73,7 @@ services:
|
||||||
- ./documents:/app/documents:Z
|
- ./documents:/app/documents:Z
|
||||||
- ./keys:/app/keys:Z
|
- ./keys:/app/keys:Z
|
||||||
- ./flows:/app/flows:z
|
- ./flows:/app/flows:z
|
||||||
|
- ./config.yaml:/app/config.yaml:Z
|
||||||
gpus: all
|
gpus: all
|
||||||
|
|
||||||
openrag-frontend:
|
openrag-frontend:
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,12 @@
|
||||||
|
|
||||||
from starlette.requests import Request
|
from starlette.requests import Request
|
||||||
|
|
||||||
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
|
from config.settings import (
|
||||||
|
DISABLE_INGEST_WITH_LANGFLOW,
|
||||||
|
clients,
|
||||||
|
INDEX_NAME,
|
||||||
|
INDEX_BODY,
|
||||||
|
)
|
||||||
from utils.logging_config import get_logger
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
@ -22,7 +27,7 @@ class ConnectorRouter:
|
||||||
self.openrag_connector_service = openrag_connector_service
|
self.openrag_connector_service = openrag_connector_service
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"ConnectorRouter initialized",
|
"ConnectorRouter initialized",
|
||||||
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
|
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_active_service(self):
|
def get_active_service(self):
|
||||||
|
|
@ -38,6 +43,8 @@ class ConnectorRouter:
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
"""Initialize the active connector service."""
|
"""Initialize the active connector service."""
|
||||||
|
# Initialize OpenSearch index if using traditional OpenRAG connector service
|
||||||
|
|
||||||
return await self.get_active_service().initialize()
|
return await self.get_active_service().initialize()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|
@ -49,7 +56,9 @@ class ConnectorRouter:
|
||||||
"""Get a connector instance from the active service."""
|
"""Get a connector instance from the active service."""
|
||||||
return await self.get_active_service().get_connector(connection_id)
|
return await self.get_active_service().get_connector(connection_id)
|
||||||
|
|
||||||
async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None):
|
async def sync_specific_files(
|
||||||
|
self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None
|
||||||
|
):
|
||||||
"""Sync specific files using the active service."""
|
"""Sync specific files using the active service."""
|
||||||
return await self.get_active_service().sync_specific_files(
|
return await self.get_active_service().sync_specific_files(
|
||||||
connection_id, user_id, file_list, jwt_token
|
connection_id, user_id, file_list, jwt_token
|
||||||
|
|
@ -64,4 +73,6 @@ class ConnectorRouter:
|
||||||
if hasattr(active_service, name):
|
if hasattr(active_service, name):
|
||||||
return getattr(active_service, name)
|
return getattr(active_service, name)
|
||||||
else:
|
else:
|
||||||
raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'")
|
raise AttributeError(
|
||||||
|
f"'{type(active_service).__name__}' object has no attribute '{name}'"
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ import platform
|
||||||
from starlette.responses import JSONResponse
|
from starlette.responses import JSONResponse
|
||||||
from utils.logging_config import get_logger
|
from utils.logging_config import get_logger
|
||||||
from config.settings import (
|
from config.settings import (
|
||||||
|
DISABLE_INGEST_WITH_LANGFLOW,
|
||||||
LANGFLOW_URL,
|
LANGFLOW_URL,
|
||||||
LANGFLOW_CHAT_FLOW_ID,
|
LANGFLOW_CHAT_FLOW_ID,
|
||||||
LANGFLOW_INGEST_FLOW_ID,
|
LANGFLOW_INGEST_FLOW_ID,
|
||||||
|
|
@ -412,7 +413,7 @@ async def onboarding(request, flows_service):
|
||||||
config_updated = True
|
config_updated = True
|
||||||
|
|
||||||
# Update knowledge settings
|
# Update knowledge settings
|
||||||
if "embedding_model" in body:
|
if "embedding_model" in body and not DISABLE_INGEST_WITH_LANGFLOW:
|
||||||
if (
|
if (
|
||||||
not isinstance(body["embedding_model"], str)
|
not isinstance(body["embedding_model"], str)
|
||||||
or not body["embedding_model"].strip()
|
or not body["embedding_model"].strip()
|
||||||
|
|
@ -561,11 +562,16 @@ async def onboarding(request, flows_service):
|
||||||
# Import here to avoid circular imports
|
# Import here to avoid circular imports
|
||||||
from main import init_index
|
from main import init_index
|
||||||
|
|
||||||
logger.info("Initializing OpenSearch index after onboarding configuration")
|
logger.info(
|
||||||
|
"Initializing OpenSearch index after onboarding configuration"
|
||||||
|
)
|
||||||
await init_index()
|
await init_index()
|
||||||
logger.info("OpenSearch index initialization completed successfully")
|
logger.info("OpenSearch index initialization completed successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Failed to initialize OpenSearch index after onboarding", error=str(e))
|
logger.error(
|
||||||
|
"Failed to initialize OpenSearch index after onboarding",
|
||||||
|
error=str(e),
|
||||||
|
)
|
||||||
# Don't fail the entire onboarding process if index creation fails
|
# Don't fail the entire onboarding process if index creation fails
|
||||||
# The application can still work, but document operations may fail
|
# The application can still work, but document operations may fail
|
||||||
|
|
||||||
|
|
|
||||||
57
src/main.py
57
src/main.py
|
|
@ -53,6 +53,7 @@ from auth_middleware import optional_auth, require_auth
|
||||||
from config.settings import (
|
from config.settings import (
|
||||||
DISABLE_INGEST_WITH_LANGFLOW,
|
DISABLE_INGEST_WITH_LANGFLOW,
|
||||||
EMBED_MODEL,
|
EMBED_MODEL,
|
||||||
|
INDEX_BODY,
|
||||||
INDEX_NAME,
|
INDEX_NAME,
|
||||||
SESSION_SECRET,
|
SESSION_SECRET,
|
||||||
clients,
|
clients,
|
||||||
|
|
@ -82,6 +83,7 @@ logger.info(
|
||||||
cuda_version=torch.version.cuda,
|
cuda_version=torch.version.cuda,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def wait_for_opensearch():
|
async def wait_for_opensearch():
|
||||||
"""Wait for OpenSearch to be ready with retries"""
|
"""Wait for OpenSearch to be ready with retries"""
|
||||||
max_retries = 30
|
max_retries = 30
|
||||||
|
|
@ -128,6 +130,34 @@ async def configure_alerting_security():
|
||||||
# Don't fail startup if alerting config fails
|
# Don't fail startup if alerting config fails
|
||||||
|
|
||||||
|
|
||||||
|
async def _ensure_opensearch_index(self=None):
    """Ensure the OpenSearch documents index exists for the traditional connector service.

    Checks whether ``INDEX_NAME`` already exists and, if it does not, creates
    it from the static ``INDEX_BODY`` mapping. Any failure is logged and
    swallowed so that the caller's initialization continues.

    Args:
        self: Unused. This is a module-level function that ``startup_tasks``
            awaits with no arguments; the parameter used to be required,
            which made that call raise ``TypeError``. It is kept as an
            optional placeholder only so that a caller passing one
            positional argument keeps working.

    Returns:
        None.
    """
    try:
        # Check if index already exists
        if await clients.opensearch.indices.exists(index=INDEX_NAME):
            logger.debug("OpenSearch index already exists", index_name=INDEX_NAME)
            return

        # Create the index with hard-coded INDEX_BODY (uses OpenAI embedding dimensions)
        await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY)
        logger.info(
            "Created OpenSearch index for traditional connector service",
            index_name=INDEX_NAME,
            vector_dimensions=INDEX_BODY["mappings"]["properties"]["chunk_embedding"][
                "dimension"
            ],
        )

    except Exception as e:
        logger.error(
            "Failed to initialize OpenSearch index for traditional connector service",
            error=str(e),
            index_name=INDEX_NAME,
        )
        # Don't raise the exception to avoid breaking the initialization
        # The service can still function, document operations might fail later
|
||||||
|
|
||||||
|
|
||||||
async def init_index():
|
async def init_index():
|
||||||
"""Initialize OpenSearch index and security roles"""
|
"""Initialize OpenSearch index and security roles"""
|
||||||
await wait_for_opensearch()
|
await wait_for_opensearch()
|
||||||
|
|
@ -141,10 +171,20 @@ async def init_index():
|
||||||
|
|
||||||
# Create documents index
|
# Create documents index
|
||||||
if not await clients.opensearch.indices.exists(index=INDEX_NAME):
|
if not await clients.opensearch.indices.exists(index=INDEX_NAME):
|
||||||
await clients.opensearch.indices.create(index=INDEX_NAME, body=dynamic_index_body)
|
await clients.opensearch.indices.create(
|
||||||
logger.info("Created OpenSearch index", index_name=INDEX_NAME, embedding_model=embedding_model)
|
index=INDEX_NAME, body=dynamic_index_body
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"Created OpenSearch index",
|
||||||
|
index_name=INDEX_NAME,
|
||||||
|
embedding_model=embedding_model,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.info("Index already exists, skipping creation", index_name=INDEX_NAME, embedding_model=embedding_model)
|
logger.info(
|
||||||
|
"Index already exists, skipping creation",
|
||||||
|
index_name=INDEX_NAME,
|
||||||
|
embedding_model=embedding_model,
|
||||||
|
)
|
||||||
|
|
||||||
# Create knowledge filters index
|
# Create knowledge filters index
|
||||||
knowledge_filter_index_name = "knowledge_filters"
|
knowledge_filter_index_name = "knowledge_filters"
|
||||||
|
|
@ -402,6 +442,9 @@ async def startup_tasks(services):
|
||||||
# Index will be created after onboarding when we know the embedding model
|
# Index will be created after onboarding when we know the embedding model
|
||||||
await wait_for_opensearch()
|
await wait_for_opensearch()
|
||||||
|
|
||||||
|
if DISABLE_INGEST_WITH_LANGFLOW:
|
||||||
|
await _ensure_opensearch_index()
|
||||||
|
|
||||||
# Configure alerting security
|
# Configure alerting security
|
||||||
await configure_alerting_security()
|
await configure_alerting_security()
|
||||||
|
|
||||||
|
|
@ -1075,14 +1118,6 @@ async def create_app():
|
||||||
return app
|
return app
|
||||||
|
|
||||||
|
|
||||||
async def startup():
|
|
||||||
"""Application startup tasks"""
|
|
||||||
await init_index()
|
|
||||||
# Get services from app state if needed for initialization
|
|
||||||
# services = app.state.services
|
|
||||||
# await services['connector_service'].initialize()
|
|
||||||
|
|
||||||
|
|
||||||
def cleanup():
|
def cleanup():
|
||||||
"""Cleanup on application shutdown"""
|
"""Cleanup on application shutdown"""
|
||||||
# Cleanup process pools only (webhooks handled by Starlette shutdown)
|
# Cleanup process pools only (webhooks handled by Starlette shutdown)
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
from config.settings import (
|
from config.settings import (
|
||||||
|
DISABLE_INGEST_WITH_LANGFLOW,
|
||||||
NUDGES_FLOW_ID,
|
NUDGES_FLOW_ID,
|
||||||
LANGFLOW_URL,
|
LANGFLOW_URL,
|
||||||
LANGFLOW_CHAT_FLOW_ID,
|
LANGFLOW_CHAT_FLOW_ID,
|
||||||
|
|
@ -73,17 +74,17 @@ class FlowsService:
|
||||||
# Scan all JSON files in the flows directory
|
# Scan all JSON files in the flows directory
|
||||||
try:
|
try:
|
||||||
for filename in os.listdir(flows_dir):
|
for filename in os.listdir(flows_dir):
|
||||||
if not filename.endswith('.json'):
|
if not filename.endswith(".json"):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
file_path = os.path.join(flows_dir, filename)
|
file_path = os.path.join(flows_dir, filename)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(file_path, 'r') as f:
|
with open(file_path, "r") as f:
|
||||||
flow_data = json.load(f)
|
flow_data = json.load(f)
|
||||||
|
|
||||||
# Check if this file contains the flow we're looking for
|
# Check if this file contains the flow we're looking for
|
||||||
if flow_data.get('id') == flow_id:
|
if flow_data.get("id") == flow_id:
|
||||||
# Cache the result
|
# Cache the result
|
||||||
self._flow_file_cache[flow_id] = file_path
|
self._flow_file_cache[flow_id] = file_path
|
||||||
logger.info(f"Found flow {flow_id} in file: {filename}")
|
logger.info(f"Found flow {flow_id} in file: {filename}")
|
||||||
|
|
@ -99,6 +100,7 @@ class FlowsService:
|
||||||
|
|
||||||
logger.warning(f"Flow with ID {flow_id} not found in flows directory")
|
logger.warning(f"Flow with ID {flow_id} not found in flows directory")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def reset_langflow_flow(self, flow_type: str):
|
async def reset_langflow_flow(self, flow_type: str):
|
||||||
"""Reset a Langflow flow by uploading the corresponding JSON file
|
"""Reset a Langflow flow by uploading the corresponding JSON file
|
||||||
|
|
||||||
|
|
@ -135,7 +137,9 @@ class FlowsService:
|
||||||
try:
|
try:
|
||||||
with open(flow_path, "r") as f:
|
with open(flow_path, "r") as f:
|
||||||
flow_data = json.load(f)
|
flow_data = json.load(f)
|
||||||
logger.info(f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}")
|
logger.info(
|
||||||
|
f"Successfully loaded flow data for {flow_type} from {os.path.basename(flow_path)}"
|
||||||
|
)
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
raise ValueError(f"Invalid JSON in flow file {flow_path}: {e}")
|
raise ValueError(f"Invalid JSON in flow file {flow_path}: {e}")
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
|
|
@ -161,43 +165,62 @@ class FlowsService:
|
||||||
|
|
||||||
# Check if configuration has been edited (onboarding completed)
|
# Check if configuration has been edited (onboarding completed)
|
||||||
if config.edited:
|
if config.edited:
|
||||||
logger.info(f"Updating {flow_type} flow with current configuration settings")
|
logger.info(
|
||||||
|
f"Updating {flow_type} flow with current configuration settings"
|
||||||
|
)
|
||||||
|
|
||||||
provider = config.provider.model_provider.lower()
|
provider = config.provider.model_provider.lower()
|
||||||
|
|
||||||
# Step 1: Assign model provider (replace components) if not OpenAI
|
# Step 1: Assign model provider (replace components) if not OpenAI
|
||||||
if provider != "openai":
|
if provider != "openai":
|
||||||
logger.info(f"Assigning {provider} components to {flow_type} flow")
|
logger.info(
|
||||||
|
f"Assigning {provider} components to {flow_type} flow"
|
||||||
|
)
|
||||||
provider_result = await self.assign_model_provider(provider)
|
provider_result = await self.assign_model_provider(provider)
|
||||||
|
|
||||||
if not provider_result.get("success"):
|
if not provider_result.get("success"):
|
||||||
logger.warning(f"Failed to assign {provider} components: {provider_result.get('error', 'Unknown error')}")
|
logger.warning(
|
||||||
|
f"Failed to assign {provider} components: {provider_result.get('error', 'Unknown error')}"
|
||||||
|
)
|
||||||
# Continue anyway, maybe just value updates will work
|
# Continue anyway, maybe just value updates will work
|
||||||
|
|
||||||
# Step 2: Update model values for the specific flow being reset
|
# Step 2: Update model values for the specific flow being reset
|
||||||
single_flow_config = [{
|
single_flow_config = [
|
||||||
|
{
|
||||||
"name": flow_type,
|
"name": flow_type,
|
||||||
"flow_id": flow_id,
|
"flow_id": flow_id,
|
||||||
}]
|
}
|
||||||
|
]
|
||||||
|
|
||||||
logger.info(f"Updating {flow_type} flow model values")
|
logger.info(f"Updating {flow_type} flow model values")
|
||||||
update_result = await self.change_langflow_model_value(
|
update_result = await self.change_langflow_model_value(
|
||||||
provider=provider,
|
provider=provider,
|
||||||
embedding_model=config.knowledge.embedding_model,
|
embedding_model=config.knowledge.embedding_model,
|
||||||
llm_model=config.agent.llm_model,
|
llm_model=config.agent.llm_model,
|
||||||
endpoint=config.provider.endpoint if config.provider.endpoint else None,
|
endpoint=config.provider.endpoint
|
||||||
flow_configs=single_flow_config
|
if config.provider.endpoint
|
||||||
|
else None,
|
||||||
|
flow_configs=single_flow_config,
|
||||||
)
|
)
|
||||||
|
|
||||||
if update_result.get("success"):
|
if update_result.get("success"):
|
||||||
logger.info(f"Successfully updated {flow_type} flow with current configuration")
|
logger.info(
|
||||||
|
f"Successfully updated {flow_type} flow with current configuration"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Failed to update {flow_type} flow with current configuration: {update_result.get('error', 'Unknown error')}")
|
logger.warning(
|
||||||
|
f"Failed to update {flow_type} flow with current configuration: {update_result.get('error', 'Unknown error')}"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.info(f"Configuration not yet edited (onboarding not completed), skipping model updates for {flow_type} flow")
|
logger.info(
|
||||||
|
f"Configuration not yet edited (onboarding not completed), skipping model updates for {flow_type} flow"
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error updating {flow_type} flow with current configuration", error=str(e))
|
logger.error(
|
||||||
|
f"Error updating {flow_type} flow with current configuration",
|
||||||
|
error=str(e),
|
||||||
|
)
|
||||||
# Don't fail the entire reset operation if configuration update fails
|
# Don't fail the entire reset operation if configuration update fails
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|
@ -243,7 +266,9 @@ class FlowsService:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Load component templates based on provider
|
# Load component templates based on provider
|
||||||
llm_template, embedding_template, llm_text_template = self._load_component_templates(provider)
|
llm_template, embedding_template, llm_text_template = (
|
||||||
|
self._load_component_templates(provider)
|
||||||
|
)
|
||||||
|
|
||||||
logger.info(f"Assigning {provider} components")
|
logger.info(f"Assigning {provider} components")
|
||||||
|
|
||||||
|
|
@ -358,7 +383,9 @@ class FlowsService:
|
||||||
logger.info(f"Loaded component templates for {provider}")
|
logger.info(f"Loaded component templates for {provider}")
|
||||||
return llm_template, embedding_template, llm_text_template
|
return llm_template, embedding_template, llm_text_template
|
||||||
|
|
||||||
async def _update_flow_components(self, config, llm_template, embedding_template, llm_text_template):
|
async def _update_flow_components(
|
||||||
|
self, config, llm_template, embedding_template, llm_text_template
|
||||||
|
):
|
||||||
"""Update components in a specific flow"""
|
"""Update components in a specific flow"""
|
||||||
flow_name = config["name"]
|
flow_name = config["name"]
|
||||||
flow_id = config["flow_id"]
|
flow_id = config["flow_id"]
|
||||||
|
|
@ -383,6 +410,7 @@ class FlowsService:
|
||||||
components_updated = []
|
components_updated = []
|
||||||
|
|
||||||
# Replace embedding component
|
# Replace embedding component
|
||||||
|
if not DISABLE_INGEST_WITH_LANGFLOW:
|
||||||
embedding_node = self._find_node_by_id(flow_data, old_embedding_id)
|
embedding_node = self._find_node_by_id(flow_data, old_embedding_id)
|
||||||
if embedding_node:
|
if embedding_node:
|
||||||
# Preserve position
|
# Preserve position
|
||||||
|
|
@ -393,7 +421,9 @@ class FlowsService:
|
||||||
new_embedding_node["position"] = original_position
|
new_embedding_node["position"] = original_position
|
||||||
|
|
||||||
# Replace in flow
|
# Replace in flow
|
||||||
self._replace_node_in_flow(flow_data, old_embedding_id, new_embedding_node)
|
self._replace_node_in_flow(
|
||||||
|
flow_data, old_embedding_id, new_embedding_node
|
||||||
|
)
|
||||||
components_updated.append(
|
components_updated.append(
|
||||||
f"embedding: {old_embedding_id} -> {new_embedding_id}"
|
f"embedding: {old_embedding_id} -> {new_embedding_id}"
|
||||||
)
|
)
|
||||||
|
|
@ -425,13 +455,18 @@ class FlowsService:
|
||||||
new_llm_text_node["position"] = original_position
|
new_llm_text_node["position"] = original_position
|
||||||
|
|
||||||
# Replace in flow
|
# Replace in flow
|
||||||
self._replace_node_in_flow(flow_data, old_llm_text_id, new_llm_text_node)
|
self._replace_node_in_flow(
|
||||||
components_updated.append(f"llm: {old_llm_text_id} -> {new_llm_text_id}")
|
flow_data, old_llm_text_id, new_llm_text_node
|
||||||
|
)
|
||||||
|
components_updated.append(
|
||||||
|
f"llm: {old_llm_text_id} -> {new_llm_text_id}"
|
||||||
|
)
|
||||||
|
|
||||||
# Update all edge references using regex replacement
|
# Update all edge references using regex replacement
|
||||||
flow_json_str = json.dumps(flow_data)
|
flow_json_str = json.dumps(flow_data)
|
||||||
|
|
||||||
# Replace embedding ID references
|
# Replace embedding ID references
|
||||||
|
if not DISABLE_INGEST_WITH_LANGFLOW:
|
||||||
flow_json_str = re.sub(
|
flow_json_str = re.sub(
|
||||||
re.escape(old_embedding_id), new_embedding_id, flow_json_str
|
re.escape(old_embedding_id), new_embedding_id, flow_json_str
|
||||||
)
|
)
|
||||||
|
|
@ -443,9 +478,7 @@ class FlowsService:
|
||||||
|
|
||||||
# Replace LLM ID references (if applicable)
|
# Replace LLM ID references (if applicable)
|
||||||
if old_llm_id:
|
if old_llm_id:
|
||||||
flow_json_str = re.sub(
|
flow_json_str = re.sub(re.escape(old_llm_id), new_llm_id, flow_json_str)
|
||||||
re.escape(old_llm_id), new_llm_id, flow_json_str
|
|
||||||
)
|
|
||||||
if old_llm_text_id:
|
if old_llm_text_id:
|
||||||
flow_json_str = re.sub(
|
flow_json_str = re.sub(
|
||||||
re.escape(old_llm_text_id), new_llm_text_id, flow_json_str
|
re.escape(old_llm_text_id), new_llm_text_id, flow_json_str
|
||||||
|
|
@ -506,7 +539,14 @@ class FlowsService:
|
||||||
|
|
||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
async def _update_flow_field(self, flow_id: str, field_name: str, field_value: str, node_display_name: str = None, node_id: str = None):
|
async def _update_flow_field(
|
||||||
|
self,
|
||||||
|
flow_id: str,
|
||||||
|
field_name: str,
|
||||||
|
field_value: str,
|
||||||
|
node_display_name: str = None,
|
||||||
|
node_id: str = None,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Generic helper function to update any field in any Langflow component.
|
Generic helper function to update any field in any Langflow component.
|
||||||
|
|
||||||
|
|
@ -521,22 +561,26 @@ class FlowsService:
|
||||||
raise ValueError("flow_id is required")
|
raise ValueError("flow_id is required")
|
||||||
|
|
||||||
# Get the current flow data from Langflow
|
# Get the current flow data from Langflow
|
||||||
response = await clients.langflow_request(
|
response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
|
||||||
"GET", f"/api/v1/flows/{flow_id}"
|
|
||||||
)
|
|
||||||
|
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
raise Exception(f"Failed to get flow: HTTP {response.status_code} - {response.text}")
|
raise Exception(
|
||||||
|
f"Failed to get flow: HTTP {response.status_code} - {response.text}"
|
||||||
|
)
|
||||||
|
|
||||||
flow_data = response.json()
|
flow_data = response.json()
|
||||||
|
|
||||||
# Find the target component by display name first, then by ID as fallback
|
# Find the target component by display name first, then by ID as fallback
|
||||||
target_node, target_node_index = None, None
|
target_node, target_node_index = None, None
|
||||||
if node_display_name:
|
if node_display_name:
|
||||||
target_node, target_node_index = self._find_node_in_flow(flow_data, display_name=node_display_name)
|
target_node, target_node_index = self._find_node_in_flow(
|
||||||
|
flow_data, display_name=node_display_name
|
||||||
|
)
|
||||||
|
|
||||||
if target_node is None and node_id:
|
if target_node is None and node_id:
|
||||||
target_node, target_node_index = self._find_node_in_flow(flow_data, node_id=node_id)
|
target_node, target_node_index = self._find_node_in_flow(
|
||||||
|
flow_data, node_id=node_id
|
||||||
|
)
|
||||||
|
|
||||||
if target_node is None:
|
if target_node is None:
|
||||||
identifier = node_display_name or node_id
|
identifier = node_display_name or node_id
|
||||||
|
|
@ -545,7 +589,9 @@ class FlowsService:
|
||||||
# Update the field value directly in the existing node
|
# Update the field value directly in the existing node
|
||||||
template = target_node.get("data", {}).get("node", {}).get("template", {})
|
template = target_node.get("data", {}).get("node", {}).get("template", {})
|
||||||
if template.get(field_name):
|
if template.get(field_name):
|
||||||
flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][field_name]["value"] = field_value
|
flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][
|
||||||
|
field_name
|
||||||
|
]["value"] = field_value
|
||||||
else:
|
else:
|
||||||
identifier = node_display_name or node_id
|
identifier = node_display_name or node_id
|
||||||
raise Exception(f"{field_name} field not found in {identifier} component")
|
raise Exception(f"{field_name} field not found in {identifier} component")
|
||||||
|
|
@ -556,21 +602,31 @@ class FlowsService:
|
||||||
)
|
)
|
||||||
|
|
||||||
if patch_response.status_code != 200:
|
if patch_response.status_code != 200:
|
||||||
raise Exception(f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}")
|
raise Exception(
|
||||||
|
f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}"
|
||||||
|
)
|
||||||
|
|
||||||
async def update_chat_flow_model(self, model_name: str):
|
async def update_chat_flow_model(self, model_name: str):
|
||||||
"""Helper function to update the model in the chat flow"""
|
"""Helper function to update the model in the chat flow"""
|
||||||
if not LANGFLOW_CHAT_FLOW_ID:
|
if not LANGFLOW_CHAT_FLOW_ID:
|
||||||
raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
|
raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
|
||||||
await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "model_name", model_name,
|
await self._update_flow_field(
|
||||||
node_display_name="Language Model")
|
LANGFLOW_CHAT_FLOW_ID,
|
||||||
|
"model_name",
|
||||||
|
model_name,
|
||||||
|
node_display_name="Language Model",
|
||||||
|
)
|
||||||
|
|
||||||
async def update_chat_flow_system_prompt(self, system_prompt: str):
|
async def update_chat_flow_system_prompt(self, system_prompt: str):
|
||||||
"""Helper function to update the system prompt in the chat flow"""
|
"""Helper function to update the system prompt in the chat flow"""
|
||||||
if not LANGFLOW_CHAT_FLOW_ID:
|
if not LANGFLOW_CHAT_FLOW_ID:
|
||||||
raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
|
raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
|
||||||
await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "system_prompt", system_prompt,
|
await self._update_flow_field(
|
||||||
node_display_name="Agent")
|
LANGFLOW_CHAT_FLOW_ID,
|
||||||
|
"system_prompt",
|
||||||
|
system_prompt,
|
||||||
|
node_display_name="Agent",
|
||||||
|
)
|
||||||
|
|
||||||
async def update_flow_docling_preset(self, preset: str, preset_config: dict):
|
async def update_flow_docling_preset(self, preset: str, preset_config: dict):
|
||||||
"""Helper function to update docling preset in the ingest flow"""
|
"""Helper function to update docling preset in the ingest flow"""
|
||||||
|
|
@ -578,29 +634,46 @@ class FlowsService:
|
||||||
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
||||||
|
|
||||||
from config.settings import DOCLING_COMPONENT_ID
|
from config.settings import DOCLING_COMPONENT_ID
|
||||||
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "docling_serve_opts", preset_config,
|
|
||||||
node_id=DOCLING_COMPONENT_ID)
|
await self._update_flow_field(
|
||||||
|
LANGFLOW_INGEST_FLOW_ID,
|
||||||
|
"docling_serve_opts",
|
||||||
|
preset_config,
|
||||||
|
node_id=DOCLING_COMPONENT_ID,
|
||||||
|
)
|
||||||
|
|
||||||
async def update_ingest_flow_chunk_size(self, chunk_size: int):
|
async def update_ingest_flow_chunk_size(self, chunk_size: int):
|
||||||
"""Helper function to update chunk size in the ingest flow"""
|
"""Helper function to update chunk size in the ingest flow"""
|
||||||
if not LANGFLOW_INGEST_FLOW_ID:
|
if not LANGFLOW_INGEST_FLOW_ID:
|
||||||
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
||||||
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_size", chunk_size,
|
await self._update_flow_field(
|
||||||
node_display_name="Split Text")
|
LANGFLOW_INGEST_FLOW_ID,
|
||||||
|
"chunk_size",
|
||||||
|
chunk_size,
|
||||||
|
node_display_name="Split Text",
|
||||||
|
)
|
||||||
|
|
||||||
async def update_ingest_flow_chunk_overlap(self, chunk_overlap: int):
|
async def update_ingest_flow_chunk_overlap(self, chunk_overlap: int):
|
||||||
"""Helper function to update chunk overlap in the ingest flow"""
|
"""Helper function to update chunk overlap in the ingest flow"""
|
||||||
if not LANGFLOW_INGEST_FLOW_ID:
|
if not LANGFLOW_INGEST_FLOW_ID:
|
||||||
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
||||||
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_overlap", chunk_overlap,
|
await self._update_flow_field(
|
||||||
node_display_name="Split Text")
|
LANGFLOW_INGEST_FLOW_ID,
|
||||||
|
"chunk_overlap",
|
||||||
|
chunk_overlap,
|
||||||
|
node_display_name="Split Text",
|
||||||
|
)
|
||||||
|
|
||||||
async def update_ingest_flow_embedding_model(self, embedding_model: str):
|
async def update_ingest_flow_embedding_model(self, embedding_model: str):
|
||||||
"""Helper function to update embedding model in the ingest flow"""
|
"""Helper function to update embedding model in the ingest flow"""
|
||||||
if not LANGFLOW_INGEST_FLOW_ID:
|
if not LANGFLOW_INGEST_FLOW_ID:
|
||||||
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
||||||
await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "model", embedding_model,
|
await self._update_flow_field(
|
||||||
node_display_name="Embedding Model")
|
LANGFLOW_INGEST_FLOW_ID,
|
||||||
|
"model",
|
||||||
|
embedding_model,
|
||||||
|
node_display_name="Embedding Model",
|
||||||
|
)
|
||||||
|
|
||||||
def _replace_node_in_flow(self, flow_data, old_id, new_node):
|
def _replace_node_in_flow(self, flow_data, old_id, new_node):
|
||||||
"""Replace a node in the flow data"""
|
"""Replace a node in the flow data"""
|
||||||
|
|
@ -612,7 +685,12 @@ class FlowsService:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def change_langflow_model_value(
|
async def change_langflow_model_value(
|
||||||
self, provider: str, embedding_model: str, llm_model: str, endpoint: str = None, flow_configs: list = None
|
self,
|
||||||
|
provider: str,
|
||||||
|
embedding_model: str,
|
||||||
|
llm_model: str,
|
||||||
|
endpoint: str = None,
|
||||||
|
flow_configs: list = None,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Change dropdown values for provider-specific components across flows
|
Change dropdown values for provider-specific components across flows
|
||||||
|
|
@ -656,8 +734,8 @@ class FlowsService:
|
||||||
]
|
]
|
||||||
|
|
||||||
# Determine target component IDs based on provider
|
# Determine target component IDs based on provider
|
||||||
target_embedding_id, target_llm_id, target_llm_text_id = self._get_provider_component_ids(
|
target_embedding_id, target_llm_id, target_llm_text_id = (
|
||||||
provider
|
self._get_provider_component_ids(provider)
|
||||||
)
|
)
|
||||||
|
|
||||||
results = []
|
results = []
|
||||||
|
|
@ -713,12 +791,24 @@ class FlowsService:
|
||||||
def _get_provider_component_ids(self, provider: str):
|
def _get_provider_component_ids(self, provider: str):
|
||||||
"""Get the component IDs for a specific provider"""
|
"""Get the component IDs for a specific provider"""
|
||||||
if provider == "watsonx":
|
if provider == "watsonx":
|
||||||
return WATSONX_EMBEDDING_COMPONENT_ID, WATSONX_LLM_COMPONENT_ID, WATSONX_LLM_TEXT_COMPONENT_ID
|
return (
|
||||||
|
WATSONX_EMBEDDING_COMPONENT_ID,
|
||||||
|
WATSONX_LLM_COMPONENT_ID,
|
||||||
|
WATSONX_LLM_TEXT_COMPONENT_ID,
|
||||||
|
)
|
||||||
elif provider == "ollama":
|
elif provider == "ollama":
|
||||||
return OLLAMA_EMBEDDING_COMPONENT_ID, OLLAMA_LLM_COMPONENT_ID, OLLAMA_LLM_TEXT_COMPONENT_ID
|
return (
|
||||||
|
OLLAMA_EMBEDDING_COMPONENT_ID,
|
||||||
|
OLLAMA_LLM_COMPONENT_ID,
|
||||||
|
OLLAMA_LLM_TEXT_COMPONENT_ID,
|
||||||
|
)
|
||||||
elif provider == "openai":
|
elif provider == "openai":
|
||||||
# OpenAI components are the default ones
|
# OpenAI components are the default ones
|
||||||
return OPENAI_EMBEDDING_COMPONENT_ID, OPENAI_LLM_COMPONENT_ID, OPENAI_LLM_TEXT_COMPONENT_ID
|
return (
|
||||||
|
OPENAI_EMBEDDING_COMPONENT_ID,
|
||||||
|
OPENAI_LLM_COMPONENT_ID,
|
||||||
|
OPENAI_LLM_TEXT_COMPONENT_ID,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unsupported provider: {provider}")
|
raise ValueError(f"Unsupported provider: {provider}")
|
||||||
|
|
||||||
|
|
@ -738,9 +828,7 @@ class FlowsService:
|
||||||
flow_id = config["flow_id"]
|
flow_id = config["flow_id"]
|
||||||
|
|
||||||
# Get flow data from Langflow API instead of file
|
# Get flow data from Langflow API instead of file
|
||||||
response = await clients.langflow_request(
|
response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
|
||||||
"GET", f"/api/v1/flows/{flow_id}"
|
|
||||||
)
|
|
||||||
|
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
|
|
@ -752,6 +840,7 @@ class FlowsService:
|
||||||
updates_made = []
|
updates_made = []
|
||||||
|
|
||||||
# Update embedding component
|
# Update embedding component
|
||||||
|
if not DISABLE_INGEST_WITH_LANGFLOW:
|
||||||
embedding_node = self._find_node_by_id(flow_data, target_embedding_id)
|
embedding_node = self._find_node_by_id(flow_data, target_embedding_id)
|
||||||
if embedding_node:
|
if embedding_node:
|
||||||
if self._update_component_fields(
|
if self._update_component_fields(
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,27 @@ class ModelsService:
|
||||||
"jina-embeddings-v2-base-en",
|
"jina-embeddings-v2-base-en",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
OPENAI_TOOL_CALLING_MODELS = [
|
||||||
|
"gpt-5",
|
||||||
|
"gpt-5-mini",
|
||||||
|
"gpt-5-nano",
|
||||||
|
"gpt-4o-mini",
|
||||||
|
"gpt-4o",
|
||||||
|
"gpt-4.1",
|
||||||
|
"gpt-4.1-mini",
|
||||||
|
"gpt-4.1-nano",
|
||||||
|
"gpt-4-turbo",
|
||||||
|
"gpt-4-turbo-preview",
|
||||||
|
"gpt-4",
|
||||||
|
"gpt-3.5-turbo",
|
||||||
|
"o1",
|
||||||
|
"o3-mini",
|
||||||
|
"o3",
|
||||||
|
"o3-pro",
|
||||||
|
"o4-mini",
|
||||||
|
"o4-mini-high",
|
||||||
|
]
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.session_manager = None
|
self.session_manager = None
|
||||||
|
|
||||||
|
|
@ -48,12 +69,12 @@ class ModelsService:
|
||||||
model_id = model.get("id", "")
|
model_id = model.get("id", "")
|
||||||
|
|
||||||
# Language models (GPT models)
|
# Language models (GPT models)
|
||||||
if any(prefix in model_id for prefix in ["gpt-4", "gpt-3.5"]):
|
if model_id in self.OPENAI_TOOL_CALLING_MODELS:
|
||||||
language_models.append(
|
language_models.append(
|
||||||
{
|
{
|
||||||
"value": model_id,
|
"value": model_id,
|
||||||
"label": model_id,
|
"label": model_id,
|
||||||
"default": model_id == "gpt-4o-mini",
|
"default": model_id == "gpt-5",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue