From ab4b62031f4e7b5261e7dd4d41af863ea5e46fd9 Mon Sep 17 00:00:00 2001 From: buua436 Date: Wed, 10 Dec 2025 16:44:06 +0800 Subject: [PATCH 1/4] Fix:csv parse in Table (#11870) ### What problem does this PR solve? change: csv parse in Table ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- rag/app/table.py | 30 ++++++++++++++++++- web/package-lock.json | 17 +++++++++++ web/package.json | 2 ++ .../document-preview/csv-preview.tsx | 18 ++++++----- 4 files changed, 59 insertions(+), 8 deletions(-) diff --git a/rag/app/table.py b/rag/app/table.py index 7a21a738a..a87a858bf 100644 --- a/rag/app/table.py +++ b/rag/app/table.py @@ -15,6 +15,8 @@ # import copy +import csv +import io import logging import re from io import BytesIO @@ -323,7 +325,7 @@ def chunk(filename, binary=None, from_page=0, to_page=10000000000, lang="Chinese callback(0.1, "Start to parse.") excel_parser = Excel() dfs = excel_parser(filename, binary, from_page=from_page, to_page=to_page, callback=callback) - elif re.search(r"\.(txt|csv)$", filename, re.IGNORECASE): + elif re.search(r"\.txt$", filename, re.IGNORECASE): callback(0.1, "Start to parse.") txt = get_text(filename, binary) lines = txt.split("\n") @@ -344,7 +346,33 @@ def chunk(filename, binary=None, from_page=0, to_page=10000000000, lang="Chinese callback(0.3, ("Extract records: {}~{}".format(from_page, min(len(lines), to_page)) + (f"{len(fails)} failure, line: %s..." 
% (",".join(fails[:3])) if fails else ""))) dfs = [pd.DataFrame(np.array(rows), columns=headers)] + elif re.search(r"\.csv$", filename, re.IGNORECASE): + callback(0.1, "Start to parse.") + txt = get_text(filename, binary) + delimiter = kwargs.get("delimiter", ",") + reader = csv.reader(io.StringIO(txt), delimiter=delimiter) + all_rows = list(reader) + if not all_rows: + raise ValueError("Empty CSV file") + + headers = all_rows[0] + fails = [] + rows = [] + + for i, row in enumerate(all_rows[1 + from_page : 1 + to_page]): + if len(row) != len(headers): + fails.append(str(i + from_page)) + continue + rows.append(row) + + callback( + 0.3, + (f"Extract records: {from_page}~{from_page + len(rows)}" + + (f"{len(fails)} failure, line: {','.join(fails[:3])}..." if fails else "")) + ) + + dfs = [pd.DataFrame(rows, columns=headers)] else: raise NotImplementedError("file type not supported yet(excel, text, csv supported)") diff --git a/web/package-lock.json b/web/package-lock.json index 880a1c9b4..ed94049b2 100644 --- a/web/package-lock.json +++ b/web/package-lock.json @@ -45,6 +45,7 @@ "@tanstack/react-query": "^5.40.0", "@tanstack/react-query-devtools": "^5.51.5", "@tanstack/react-table": "^8.20.5", + "@types/papaparse": "^5.5.1", "@uiw/react-markdown-preview": "^5.1.3", "@xyflow/react": "^12.3.6", "ahooks": "^3.7.10", @@ -73,6 +74,7 @@ "mammoth": "^1.7.2", "next-themes": "^0.4.6", "openai-speech-stream-player": "^1.0.8", + "papaparse": "^5.5.3", "pptx-preview": "^1.0.5", "rc-tween-one": "^3.0.6", "react": "^18.2.0", @@ -10632,6 +10634,15 @@ "integrity": "sha512-37i+OaWTh9qeK4LSHPsyRC7NahnGotNuZvjLSgcPzblpHB3rrCJxAOgI5gCdKm7coonsaX1Of0ILiTcnZjbfxA==", "peer": true }, + "node_modules/@types/papaparse": { + "version": "5.5.1", + "resolved": "https://registry.npmmirror.com/@types/papaparse/-/papaparse-5.5.1.tgz", + "integrity": "sha512-esEO+VISsLIyE+JZBmb89NzsYYbpwV8lmv2rPo6oX5y9KhBaIP7hhHgjuTut54qjdKVMufTEcrh5fUl9+58huw==", + "license": "MIT", + "dependencies": { + 
"@types/node": "*" + } + }, "node_modules/@types/parse-json": { "version": "4.0.2", "resolved": "https://registry.npmmirror.com/@types/parse-json/-/parse-json-4.0.2.tgz", @@ -27413,6 +27424,12 @@ "resolved": "https://registry.npmmirror.com/pako/-/pako-1.0.11.tgz", "integrity": "sha512-4hLB8Py4zZce5s4yd9XzopqwVv/yGNhV1Bl8NTmCq1763HeK2+EwVTv+leGeL13Dnh2wfbqowVPXCIO0z4taYw==" }, + "node_modules/papaparse": { + "version": "5.5.3", + "resolved": "https://registry.npmmirror.com/papaparse/-/papaparse-5.5.3.tgz", + "integrity": "sha512-5QvjGxYVjxO59MGU2lHVYpRWBBtKHnlIAcSe1uNFCkkptUh63NFRj0FJQm7nR67puEruUci/ZkjmEFrjCAyP4A==", + "license": "MIT" + }, "node_modules/param-case": { "version": "3.0.4", "resolved": "https://registry.npmmirror.com/param-case/-/param-case-3.0.4.tgz", diff --git a/web/package.json b/web/package.json index f183c8008..051c4b9d7 100644 --- a/web/package.json +++ b/web/package.json @@ -58,6 +58,7 @@ "@tanstack/react-query": "^5.40.0", "@tanstack/react-query-devtools": "^5.51.5", "@tanstack/react-table": "^8.20.5", + "@types/papaparse": "^5.5.1", "@uiw/react-markdown-preview": "^5.1.3", "@xyflow/react": "^12.3.6", "ahooks": "^3.7.10", @@ -86,6 +87,7 @@ "mammoth": "^1.7.2", "next-themes": "^0.4.6", "openai-speech-stream-player": "^1.0.8", + "papaparse": "^5.5.3", "pptx-preview": "^1.0.5", "rc-tween-one": "^3.0.6", "react": "^18.2.0", diff --git a/web/src/components/document-preview/csv-preview.tsx b/web/src/components/document-preview/csv-preview.tsx index 45b05454e..fa1cf1ed8 100644 --- a/web/src/components/document-preview/csv-preview.tsx +++ b/web/src/components/document-preview/csv-preview.tsx @@ -2,6 +2,7 @@ import message from '@/components/ui/message'; import { Spin } from '@/components/ui/spin'; import request from '@/utils/request'; import classNames from 'classnames'; +import Papa from 'papaparse'; import React, { useEffect, useRef, useState } from 'react'; interface CSVData { @@ -20,14 +21,17 @@ const CSVFileViewer: React.FC = ({ url }) => { 
const containerRef = useRef(null); // const url = useGetDocumentUrl(); const parseCSV = (csvText: string): CSVData => { - console.log('Parsing CSV data:', csvText); - const lines = csvText.split('\n'); - const headers = lines[0].split(',').map((header) => header.trim()); - const rows = lines - .slice(1) - .map((line) => line.split(',').map((cell) => cell.trim())); + const result = Papa.parse(csvText, { + header: false, + skipEmptyLines: false, + }); - return { headers, rows }; + const rows = result.data as string[][]; + + const headers = rows[0]; + const dataRows = rows.slice(1); + + return { headers, rows: dataRows }; }; useEffect(() => { From 3cb72377d7e8e98d287df347ff0b15abe72ff4b8 Mon Sep 17 00:00:00 2001 From: buua436 Date: Wed, 10 Dec 2025 19:08:45 +0800 Subject: [PATCH 2/4] Refa:remove sensitive information (#11873) ### What problem does this PR solve? change: remove sensitive information ### Type of change - [x] Refactoring --- .github/workflows/tests.yml | 2 ++ admin/client/admin_client.py | 12 ++++---- admin/server/auth.py | 6 ++-- api/apps/canvas_app.py | 10 ++++++- api/db/init_data.py | 2 +- api/db/joint_services/user_account_service.py | 2 +- api/db/services/llm_service.py | 4 +-- common/data_source/confluence_connector.py | 5 +++- common/data_source/jira/connector.py | 7 ++--- common/http_client.py | 28 +++++++++++++++++-- rag/utils/opendal_conn.py | 8 +++++- 11 files changed, 62 insertions(+), 24 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5341d83ae..a5bdc1735 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -1,4 +1,6 @@ name: tests +permissions: + contents: read on: push: diff --git a/admin/client/admin_client.py b/admin/client/admin_client.py index 4b210d2b5..8cad14bab 100644 --- a/admin/client/admin_client.py +++ b/admin/client/admin_client.py @@ -351,7 +351,7 @@ class AdminCLI(Cmd): def verify_admin(self, arguments: dict, single_command: bool): self.host = 
arguments['host'] self.port = arguments['port'] - print(f"Attempt to access ip: {self.host}, port: {self.port}") + print("Attempt to access server for admin login") url = f"http://{self.host}:{self.port}/api/v1/admin/login" attempt_count = 3 @@ -390,7 +390,7 @@ class AdminCLI(Cmd): print(f"Bad response,status: {response.status_code}, password is wrong") except Exception as e: print(str(e)) - print(f"Can't access {self.host}, port: {self.port}") + print("Can't access server for admin login (connection failed)") def _format_service_detail_table(self, data): if isinstance(data, list): @@ -674,7 +674,7 @@ class AdminCLI(Cmd): user_name: str = user_name_tree.children[0].strip("'\"") password_tree: Tree = command['password'] password: str = password_tree.children[0].strip("'\"") - print(f"Alter user: {user_name}, password: {password}") + print(f"Alter user: {user_name}, password: ******") url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/password' response = self.session.put(url, json={'new_password': encrypt(password)}) res_json = response.json() @@ -689,7 +689,7 @@ class AdminCLI(Cmd): password_tree: Tree = command['password'] password: str = password_tree.children[0].strip("'\"") role: str = command['role'] - print(f"Create user: {user_name}, password: {password}, role: {role}") + print(f"Create user: {user_name}, password: ******, role: {role}") url = f'http://{self.host}:{self.port}/api/v1/admin/users' response = self.session.post( url, @@ -951,7 +951,7 @@ def main(): args = cli.parse_connection_args(sys.argv) if 'error' in args: - print(f"Error: {args['error']}") + print("Error: Invalid connection arguments") return if 'command' in args: @@ -960,7 +960,7 @@ def main(): return if cli.verify_admin(args, single_command=True): command: str = args['command'] - print(f"Run single command: {command}") + # print(f"Run single command: {command}") cli.run_single_command(command) else: if cli.verify_admin(args, single_command=False): diff --git 
a/admin/server/auth.py b/admin/server/auth.py index 6c8bc2cb8..486b9a4fb 100644 --- a/admin/server/auth.py +++ b/admin/server/auth.py @@ -176,11 +176,11 @@ def login_verify(f): "message": "Access denied", "data": None }), 200 - except Exception as e: - error_msg = str(e) + except Exception: + logging.exception("An error occurred during admin login verification.") return jsonify({ "code": 500, - "message": error_msg + "message": "An internal server error occurred." }), 200 return f(*args, **kwargs) diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py index fe32dca0b..ed8c8c7a0 100644 --- a/api/apps/canvas_app.py +++ b/api/apps/canvas_app.py @@ -342,7 +342,15 @@ async def test_db_connect(): f"UID={req['username']};" f"PWD={req['password']};" ) - logging.info(conn_str) + redacted_conn_str = ( + f"DATABASE={req['database']};" + f"HOSTNAME={req['host']};" + f"PORT={req['port']};" + f"PROTOCOL=TCPIP;" + f"UID={req['username']};" + f"PWD=****;" + ) + logging.info(redacted_conn_str) conn = ibm_db.connect(conn_str, "", "") stmt = ibm_db.exec_immediate(conn, "SELECT 1 FROM sysibm.sysdummy1") ibm_db.fetch_assoc(stmt) diff --git a/api/db/init_data.py b/api/db/init_data.py index d4873d332..7454965eb 100644 --- a/api/db/init_data.py +++ b/api/db/init_data.py @@ -73,7 +73,7 @@ def init_superuser(nickname=DEFAULT_SUPERUSER_NICKNAME, email=DEFAULT_SUPERUSER_ UserTenantService.insert(**usr_tenant) TenantLLMService.insert_many(tenant_llm) logging.info( - f"Super user initialized. email: {email}, password: {password}. Changing the password after login is strongly recommended.") + f"Super user initialized. 
email: {email},A default password has been set; changing the password after login is strongly recommended.") chat_mdl = LLMBundle(tenant["id"], LLMType.CHAT, tenant["llm_id"]) msg = chat_mdl.chat(system="", history=[ diff --git a/api/db/joint_services/user_account_service.py b/api/db/joint_services/user_account_service.py index 34ceee648..48937653e 100644 --- a/api/db/joint_services/user_account_service.py +++ b/api/db/joint_services/user_account_service.py @@ -273,7 +273,7 @@ def delete_user_data(user_id: str) -> dict: except Exception as e: logging.exception(e) - return {"success": False, "message": f"Error: {str(e)}. Already done:\n{done_msg}"} + return {"success": False, "message": "An internal error occurred during user deletion. Some operations may have completed.","details": done_msg} def delete_user_agents(user_id: str) -> dict: diff --git a/api/db/services/llm_service.py b/api/db/services/llm_service.py index 86356a7a7..e4bf64aac 100644 --- a/api/db/services/llm_service.py +++ b/api/db/services/llm_service.py @@ -109,7 +109,7 @@ class LLMBundle(LLM4Tenant): llm_name = getattr(self, "llm_name", None) if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, used_tokens, llm_name): - logging.error("LLMBundle.encode can't update token usage for {}/EMBEDDING used_tokens: {}".format(self.tenant_id, used_tokens)) + logging.error("LLMBundle.encode can't update token usage for /EMBEDDING used_tokens: {}".format(used_tokens)) if self.langfuse: generation.update(usage_details={"total_tokens": used_tokens}) @@ -124,7 +124,7 @@ class LLMBundle(LLM4Tenant): emd, used_tokens = self.mdl.encode_queries(query) llm_name = getattr(self, "llm_name", None) if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, used_tokens, llm_name): - logging.error("LLMBundle.encode_queries can't update token usage for {}/EMBEDDING used_tokens: {}".format(self.tenant_id, used_tokens)) + logging.error("LLMBundle.encode_queries can't update token usage for /EMBEDDING 
used_tokens: {}".format(used_tokens)) if self.langfuse: generation.update(usage_details={"total_tokens": used_tokens}) diff --git a/common/data_source/confluence_connector.py b/common/data_source/confluence_connector.py index a057d0694..aff225703 100644 --- a/common/data_source/confluence_connector.py +++ b/common/data_source/confluence_connector.py @@ -1110,7 +1110,10 @@ def _make_attachment_link( ) -> str | None: download_link = "" - if "api.atlassian.com" in confluence_client.url: + from urllib.parse import urlparse + netloc =urlparse(confluence_client.url).hostname + if netloc == "api.atlassian.com" or (netloc and netloc.endswith(".api.atlassian.com")): + # if "api.atlassian.com" in confluence_client.url: # https://developer.atlassian.com/cloud/confluence/rest/v1/api-group-content---attachments/#api-wiki-rest-api-content-id-child-attachment-attachmentid-download-get if not parent_content_id: logging.warning( diff --git a/common/data_source/jira/connector.py b/common/data_source/jira/connector.py index 06a0a9069..2a93aaf51 100644 --- a/common/data_source/jira/connector.py +++ b/common/data_source/jira/connector.py @@ -135,7 +135,7 @@ class JiraConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync except ValueError as exc: raise ConnectorValidationError(str(exc)) from exc else: - logger.warning(f"[Jira] Scoped token requested but Jira base URL {self.jira_base_url} does not appear to be an Atlassian Cloud domain; scoped token ignored.") + logger.warning("[Jira] Scoped token requested but Jira base URL does not appear to be an Atlassian Cloud domain; scoped token ignored.") user_email = credentials.get("jira_user_email") or credentials.get("username") api_token = credentials.get("jira_api_token") or credentials.get("token") or credentials.get("api_token") @@ -245,7 +245,7 @@ class JiraConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync while True: attempt += 1 jql = self._build_jql(attempt_start, end) - logger.info(f"[Jira] 
Executing Jira JQL attempt {attempt} (start={attempt_start}, end={end}, buffered_retry={retried_with_buffer}): {jql}") + logger.info(f"[Jira] Executing Jira JQL attempt {attempt} (start={attempt_start}, end={end}, buffered_retry={retried_with_buffer})") try: return (yield from self._load_from_checkpoint_internal(jql, checkpoint, start_filter=start)) except Exception as exc: @@ -927,9 +927,6 @@ def main(config: dict[str, Any] | None = None) -> None: base_url = config.get("base_url") credentials = config.get("credentials", {}) - print(f"[Jira] {config=}", flush=True) - print(f"[Jira] {credentials=}", flush=True) - if not base_url: raise RuntimeError("Jira base URL must be provided via config or CLI arguments.") if not (credentials.get("jira_api_token") or (credentials.get("jira_user_email") and credentials.get("jira_password"))): diff --git a/common/http_client.py b/common/http_client.py index 91ac0cadc..5c57f8638 100644 --- a/common/http_client.py +++ b/common/http_client.py @@ -16,6 +16,7 @@ import logging import os import time from typing import Any, Dict, Optional +from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse import httpx @@ -52,6 +53,27 @@ def _get_delay(backoff_factor: float, attempt: int) -> float: return backoff_factor * (2**attempt) +# List of sensitive parameters to redact from URLs before logging +_SENSITIVE_QUERY_KEYS = {"client_secret", "secret", "code", "access_token", "refresh_token", "password", "token", "app_secret"} + +def _redact_sensitive_url_params(url: str) -> str: + try: + parsed = urlparse(url) + if not parsed.query: + return url + clean_query = [] + for k, v in parse_qsl(parsed.query, keep_blank_values=True): + if k.lower() in _SENSITIVE_QUERY_KEYS: + clean_query.append((k, "***REDACTED***")) + else: + clean_query.append((k, v)) + new_query = urlencode(clean_query, doseq=True) + redacted_url = urlunparse(parsed._replace(query=new_query)) + return redacted_url + except Exception: + return url + + async def async_request( 
method: str, url: str, @@ -94,19 +116,19 @@ async def async_request( ) duration = time.monotonic() - start logger.debug( - f"async_request {method} {url} -> {response.status_code} in {duration:.3f}s" + f"async_request {method} {_redact_sensitive_url_params(url)} -> {response.status_code} in {duration:.3f}s" ) return response except httpx.RequestError as exc: last_exc = exc if attempt >= retries: logger.warning( - f"async_request exhausted retries for {method} {url}: {exc}" + f"async_request exhausted retries for {method} {_redact_sensitive_url_params(url)}: {exc}" ) raise delay = _get_delay(backoff_factor, attempt) logger.warning( - f"async_request attempt {attempt + 1}/{retries + 1} failed for {method} {url}: {exc}; retrying in {delay:.2f}s" + f"async_request attempt {attempt + 1}/{retries + 1} failed for {method} {_redact_sensitive_url_params(url)}: {exc}; retrying in {delay:.2f}s" ) await asyncio.sleep(delay) raise last_exc # pragma: no cover diff --git a/rag/utils/opendal_conn.py b/rag/utils/opendal_conn.py index c6cebf9ca..a260daebc 100644 --- a/rag/utils/opendal_conn.py +++ b/rag/utils/opendal_conn.py @@ -41,7 +41,13 @@ def get_opendal_config(): scheme = opendal_config.get("scheme") config_data = opendal_config.get("config", {}) kwargs = {"scheme": scheme, **config_data} - logging.info("Loaded OpenDAL configuration from yaml: %s", kwargs) + redacted_kwargs = kwargs.copy() + if 'password' in redacted_kwargs: + redacted_kwargs['password'] = '***REDACTED***' + if 'connection_string' in redacted_kwargs and 'password' in redacted_kwargs: + import re + redacted_kwargs['connection_string'] = re.sub(r':[^@]+@', ':***REDACTED***@', redacted_kwargs['connection_string']) + logging.info("Loaded OpenDAL configuration from yaml: %s", redacted_kwargs) return kwargs except Exception as e: logging.error("Failed to load OpenDAL configuration from yaml: %s", str(e)) From badf33e3b9939bce42011cece8338e496ab81f66 Mon Sep 17 00:00:00 2001 From: He Wang Date: Wed, 10 Dec 2025 
19:13:37 +0800 Subject: [PATCH 3/4] feat: enhance OBConnection.search (#11876) ### What problem does this PR solve? Enhance OBConnection.search for better performance. Main changes: 1. Use string type of vector array in distance func for better parsing performance. 2. Manually set max_connections as pool size instead of using default value. 3. Set 'fulltext_search_columns' when starting. 4. Cache the results of the table existence check (we will never drop the table). 5. Remove unused 'group_results' logic. 6. Add the `USE_FULLTEXT_FIRST_FUSION_SEARCH` flag, and the corresponding fusion search SQL when it's false. ### Type of change - [x] Performance Improvement --- rag/utils/ob_conn.py | 266 ++++++++++++++++++++++++++----------------- 1 file changed, 164 insertions(+), 102 deletions(-) diff --git a/rag/utils/ob_conn.py b/rag/utils/ob_conn.py index 6218a8c4e..3c00be421 100644 --- a/rag/utils/ob_conn.py +++ b/rag/utils/ob_conn.py @@ -17,13 +17,16 @@ import json import logging import os import re +import threading import time from typing import Any, Optional +import numpy as np from elasticsearch_dsl import Q, Search from pydantic import BaseModel from pymysql.converters import escape_string from pyobvector import ObVecClient, FtsIndexParam, FtsParser, ARRAY, VECTOR +from pyobvector.client import ClusterVersionException from pyobvector.client.hybrid_search import HybridSearch from pyobvector.util import ObVersion from sqlalchemy import text, Column, String, Integer, JSON, Double, Row, Table @@ -106,17 +109,6 @@ index_columns: list[str] = [ "removed_kwd", ] -fulltext_search_columns: list[str] = [ - "docnm_kwd", - "content_with_weight", - "title_tks", - "title_sm_tks", - "important_tks", - "question_tks", - "content_ltks", - "content_sm_ltks" -] - fts_columns_origin: list[str] = [ "docnm_kwd^10", "content_with_weight", @@ -138,7 +130,7 @@ fulltext_index_name_template = "fts_idx_%s" # MATCH AGAINST: 
https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000002017607 fulltext_search_template = "MATCH (%s) AGAINST ('%s' IN NATURAL LANGUAGE MODE)" # cosine_distance: https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000002012938 -vector_search_template = "cosine_distance(%s, %s)" +vector_search_template = "cosine_distance(%s, '%s')" class SearchResult(BaseModel): @@ -362,18 +354,28 @@ class OBConnection(DocStoreConnection): port = mysql_config.get("port", 2881) self.username = mysql_config.get("user", "root@test") self.password = mysql_config.get("password", "infini_rag_flow") + max_connections = mysql_config.get("max_connections", 300) else: logger.info("Use customized config to create OceanBase connection.") host = ob_config.get("host", "localhost") port = ob_config.get("port", 2881) self.username = ob_config.get("user", "root@test") self.password = ob_config.get("password", "infini_rag_flow") + max_connections = ob_config.get("max_connections", 300) self.db_name = ob_config.get("db_name", "test") self.uri = f"{host}:{port}" logger.info(f"Use OceanBase '{self.uri}' as the doc engine.") + # Set the maximum number of connections that can be created above the pool_size. + # By default, this is half of max_connections, but at least 10. + # This allows the pool to handle temporary spikes in demand without exhausting resources. + max_overflow = int(os.environ.get("OB_MAX_OVERFLOW", max(max_connections // 2, 10))) + # Set the number of seconds to wait before giving up when trying to get a connection from the pool. + # Default is 30 seconds, but can be overridden with the OB_POOL_TIMEOUT environment variable. 
+ pool_timeout = int(os.environ.get("OB_POOL_TIMEOUT", "30")) + for _ in range(ATTEMPT_TIME): try: self.client = ObVecClient( @@ -383,6 +385,9 @@ class OBConnection(DocStoreConnection): db_name=self.db_name, pool_pre_ping=True, pool_recycle=3600, + pool_size=max_connections, + max_overflow=max_overflow, + pool_timeout=pool_timeout, ) break except Exception as e: @@ -398,6 +403,37 @@ class OBConnection(DocStoreConnection): self._check_ob_version() self._try_to_update_ob_query_timeout() + self.es = None + if self.enable_hybrid_search: + try: + self.es = HybridSearch( + uri=self.uri, + user=self.username, + password=self.password, + db_name=self.db_name, + pool_pre_ping=True, + pool_recycle=3600, + pool_size=max_connections, + max_overflow=max_overflow, + pool_timeout=pool_timeout, + ) + logger.info("OceanBase Hybrid Search feature is enabled") + except ClusterVersionException as e: + logger.info("Failed to initialize HybridSearch client, fallback to use SQL", exc_info=e) + self.es = None + + if self.es is not None and self.search_original_content: + logger.info("HybridSearch is enabled, forcing search_original_content to False") + self.search_original_content = False + # Determine which columns to use for full-text search dynamically: + # If HybridSearch is enabled (self.es is not None), we must use tokenized columns (fts_columns_tks) + # for compatibility and performance with HybridSearch. Otherwise, we use the original content columns + # (fts_columns_origin), which may be controlled by an environment variable. 
+ self.fulltext_search_columns = fts_columns_origin if self.search_original_content else fts_columns_tks + + self._table_exists_cache: set[str] = set() + self._table_exists_cache_lock = threading.RLock() + logger.info(f"OceanBase {self.uri} is healthy.") def _check_ob_version(self): @@ -417,18 +453,6 @@ class OBConnection(DocStoreConnection): f"The version of OceanBase needs to be higher than or equal to 4.3.5.1, current version is {version_str}" ) - self.es = None - if not ob_version < ObVersion.from_db_version_nums(4, 4, 1, 0) and self.enable_hybrid_search: - self.es = HybridSearch( - uri=self.uri, - user=self.username, - password=self.password, - db_name=self.db_name, - pool_pre_ping=True, - pool_recycle=3600, - ) - logger.info("OceanBase Hybrid Search feature is enabled") - def _try_to_update_ob_query_timeout(self): try: val = self._get_variable_value("ob_query_timeout") @@ -455,9 +479,19 @@ class OBConnection(DocStoreConnection): return os.getenv(var, default).lower() in ['true', '1', 'yes', 'y'] self.enable_fulltext_search = is_true('ENABLE_FULLTEXT_SEARCH', 'true') + logger.info(f"ENABLE_FULLTEXT_SEARCH={self.enable_fulltext_search}") + self.use_fulltext_hint = is_true('USE_FULLTEXT_HINT', 'true') + logger.info(f"USE_FULLTEXT_HINT={self.use_fulltext_hint}") + self.search_original_content = is_true("SEARCH_ORIGINAL_CONTENT", 'true') + logger.info(f"SEARCH_ORIGINAL_CONTENT={self.search_original_content}") + self.enable_hybrid_search = is_true('ENABLE_HYBRID_SEARCH', 'false') + logger.info(f"ENABLE_HYBRID_SEARCH={self.enable_hybrid_search}") + + self.use_fulltext_first_fusion_search = is_true('USE_FULLTEXT_FIRST_FUSION_SEARCH', 'true') + logger.info(f"USE_FULLTEXT_FIRST_FUSION_SEARCH={self.use_fulltext_first_fusion_search}") """ Database operations @@ -478,6 +512,43 @@ class OBConnection(DocStoreConnection): return row[1] raise Exception(f"Variable '{var_name}' not found.") + def _check_table_exists_cached(self, table_name: str) -> bool: + """ + Check table 
existence with cache to reduce INFORMATION_SCHEMA queries under high concurrency. + Only caches when table exists. Does not cache when table does not exist. + Thread-safe implementation: read operations are lock-free (GIL-protected), + write operations are protected by RLock to ensure cache consistency. + + Args: + table_name: Table name + + Returns: + Whether the table exists with all required indexes and columns + """ + if table_name in self._table_exists_cache: + return True + + try: + if not self.client.check_table_exists(table_name): + return False + for column_name in index_columns: + if not self._index_exists(table_name, index_name_template % (table_name, column_name)): + return False + for fts_column in self.fulltext_search_columns: + column_name = fts_column.split("^")[0] + if not self._index_exists(table_name, fulltext_index_name_template % column_name): + return False + for column in [column_order_id, column_group_id]: + if not self._column_exist(table_name, column.name): + return False + except Exception as e: + raise Exception(f"OBConnection._check_table_exists_cached error: {str(e)}") + + with self._table_exists_cache_lock: + if table_name not in self._table_exists_cache: + self._table_exists_cache.add(table_name) + return True + """ Table operations """ @@ -500,8 +571,7 @@ class OBConnection(DocStoreConnection): process_func=lambda: self._add_index(indexName, column_name), ) - fts_columns = fts_columns_origin if self.search_original_content else fts_columns_tks - for fts_column in fts_columns: + for fts_column in self.fulltext_search_columns: column_name = fts_column.split("^")[0] _try_with_lock( lock_name=f"ob_add_fulltext_idx_{indexName}_{column_name}", @@ -546,24 +616,7 @@ class OBConnection(DocStoreConnection): raise Exception(f"OBConnection.deleteIndex error: {str(e)}") def indexExist(self, indexName: str, knowledgebaseId: str = None) -> bool: - try: - if not self.client.check_table_exists(indexName): - return False - for column_name in 
index_columns: - if not self._index_exists(indexName, index_name_template % (indexName, column_name)): - return False - fts_columns = fts_columns_origin if self.search_original_content else fts_columns_tks - for fts_column in fts_columns: - column_name = fts_column.split("^")[0] - if not self._index_exists(indexName, fulltext_index_name_template % column_name): - return False - for column in [column_order_id, column_group_id]: - if not self._column_exist(indexName, column.name): - return False - except Exception as e: - raise Exception(f"OBConnection.indexExist error: {str(e)}") - - return True + return self._check_table_exists_cached(indexName) def _get_count(self, table_name: str, filter_list: list[str] = None) -> int: where_clause = "WHERE " + " AND ".join(filter_list) if len(filter_list) > 0 else "" @@ -853,10 +906,8 @@ class OBConnection(DocStoreConnection): fulltext_query = escape_string(fulltext_query.strip()) fulltext_topn = m.topn - fts_columns = fts_columns_origin if self.search_original_content else fts_columns_tks - # get fulltext match expression and weight values - for field in fts_columns: + for field in self.fulltext_search_columns: parts = field.split("^") column_name: str = parts[0] column_weight: float = float(parts[1]) if (len(parts) > 1 and parts[1]) else 1.0 @@ -885,7 +936,8 @@ class OBConnection(DocStoreConnection): fulltext_search_score_expr = f"({' + '.join(f'{expr} * {fulltext_search_weight.get(col, 0)}' for col, expr in fulltext_search_expr.items())})" if vector_data: - vector_search_expr = vector_search_template % (vector_column_name, vector_data) + vector_data_str = "[" + ",".join([str(np.float32(v)) for v in vector_data]) + "]" + vector_search_expr = vector_search_template % (vector_column_name, vector_data_str) # use (1 - cosine_distance) as score, which should be [-1, 1] # https://www.oceanbase.com/docs/common-oceanbase-database-standalone-1000000003577323 vector_search_score_expr = f"(1 - {vector_search_expr})" @@ -910,11 +962,15 @@ 
class OBConnection(DocStoreConnection): if search_type in ["fusion", "fulltext", "vector"] and "_score" not in output_fields: output_fields.append("_score") - group_results = kwargs.get("group_results", False) + if limit: + if vector_topn is not None: + limit = min(vector_topn, limit) + if fulltext_topn is not None: + limit = min(fulltext_topn, limit) for index_name in indexNames: - if not self.client.check_table_exists(index_name): + if not self._check_table_exists_cached(index_name): continue fulltext_search_hint = f"/*+ UNION_MERGE({index_name} {' '.join(fulltext_search_idx_list)}) */" if self.use_fulltext_hint else "" @@ -922,29 +978,7 @@ class OBConnection(DocStoreConnection): if search_type == "fusion": # fusion search, usually for chat num_candidates = vector_topn + fulltext_topn - if group_results: - count_sql = ( - f"WITH fulltext_results AS (" - f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {fulltext_search_filter}" - f" ORDER BY relevance DESC" - f" LIMIT {num_candidates}" - f")," - f" scored_results AS (" - f" SELECT *" - f" FROM fulltext_results" - f" WHERE {vector_search_filter}" - f")," - f" group_results AS (" - f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id) as rn" - f" FROM scored_results" - f")" - f" SELECT COUNT(*)" - f" FROM group_results" - f" WHERE rn = 1" - ) - else: + if self.use_fulltext_first_fusion_search: count_sql = ( f"WITH fulltext_results AS (" f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" @@ -955,6 +989,22 @@ class OBConnection(DocStoreConnection): f")" f" SELECT COUNT(*) FROM fulltext_results WHERE {vector_search_filter}" ) + else: + count_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} id FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY {fulltext_search_score_expr}" + f" LIMIT {fulltext_topn}" + f")," + f"vector_results AS (" + f" SELECT 
id FROM {index_name}" + f" WHERE {filters_expr} AND {vector_search_filter}" + f" ORDER BY {vector_search_expr}" + f" APPROXIMATE LIMIT {vector_topn}" + f")" + f" SELECT COUNT(*) FROM fulltext_results f FULL OUTER JOIN vector_results v ON f.id = v.id" + ) logger.debug("OBConnection.search with count sql: %s", count_sql) start_time = time.time() @@ -976,32 +1026,8 @@ class OBConnection(DocStoreConnection): if total_count == 0: continue - score_expr = f"(relevance * {1 - vector_similarity_weight} + {vector_search_score_expr} * {vector_similarity_weight} + {pagerank_score_expr})" - if group_results: - fusion_sql = ( - f"WITH fulltext_results AS (" - f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {fulltext_search_filter}" - f" ORDER BY relevance DESC" - f" LIMIT {num_candidates}" - f")," - f" scored_results AS (" - f" SELECT *, {score_expr} AS _score" - f" FROM fulltext_results" - f" WHERE {vector_search_filter}" - f")," - f" group_results AS (" - f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY _score DESC) as rn" - f" FROM scored_results" - f")" - f" SELECT {fields_expr}, _score" - f" FROM group_results" - f" WHERE rn = 1" - f" ORDER BY _score DESC" - f" LIMIT {offset}, {limit}" - ) - else: + if self.use_fulltext_first_fusion_search: + score_expr = f"(relevance * {1 - vector_similarity_weight} + {vector_search_score_expr} * {vector_similarity_weight} + {pagerank_score_expr})" fusion_sql = ( f"WITH fulltext_results AS (" f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" @@ -1016,6 +1042,38 @@ class OBConnection(DocStoreConnection): f" ORDER BY _score DESC" f" LIMIT {offset}, {limit}" ) + else: + pagerank_score_expr = f"(CAST(IFNULL(f.{PAGERANK_FLD}, 0) AS DECIMAL(10, 2)) / 100)" + score_expr = f"(f.relevance * {1 - vector_similarity_weight} + v.similarity * {vector_similarity_weight} + {pagerank_score_expr})" + fields_expr = ", 
".join([f"t.{f} as {f}" for f in output_fields if f != "_score"]) + fusion_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} id, pagerank_fea, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {fulltext_topn}" + f")," + f"vector_results AS (" + f" SELECT id, pagerank_fea, {vector_search_score_expr} AS similarity" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {vector_search_filter}" + f" ORDER BY {vector_search_expr}" + f" APPROXIMATE LIMIT {vector_topn}" + f")," + f"combined_results AS (" + f" SELECT COALESCE(f.id, v.id) AS id, {score_expr} AS score" + f" FROM fulltext_results f" + f" FULL OUTER JOIN vector_results v" + f" ON f.id = v.id" + f")" + f" SELECT {fields_expr}, c.score as _score" + f" FROM combined_results c" + f" JOIN {index_name} t" + f" ON c.id = t.id" + f" ORDER BY score DESC" + f" LIMIT {offset}, {limit}" + ) logger.debug("OBConnection.search with fusion sql: %s", fusion_sql) start_time = time.time() @@ -1234,10 +1292,14 @@ class OBConnection(DocStoreConnection): for row in rows: result.chunks.append(self._row_to_entity(row, output_fields)) + + if result.total == 0: + result.total = len(result.chunks) + return result def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None: - if not self.client.check_table_exists(indexName): + if not self._check_table_exists_cached(indexName): return None try: @@ -1336,7 +1398,7 @@ class OBConnection(DocStoreConnection): return res def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool: - if not self.client.check_table_exists(indexName): + if not self._check_table_exists_cached(indexName): return True condition["kb_id"] = knowledgebaseId @@ -1387,7 +1449,7 @@ class OBConnection(DocStoreConnection): return False def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int: - if not 
self.client.check_table_exists(indexName): + if not self._check_table_exists_cached(indexName): return 0 condition["kb_id"] = knowledgebaseId From 34d29d7e8b1b920ea13275ff66332bd27bc05f9c Mon Sep 17 00:00:00 2001 From: balibabu Date: Wed, 10 Dec 2025 19:13:57 +0800 Subject: [PATCH 4/4] Feat: Add configuration for webhook to the begin node. #10427 (#11875) ### What problem does this PR solve? Feat: Add configuration for webhook to the begin node. #10427 ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- .../schema-editor/add-field-button.tsx | 5 +- .../components/schema-editor/context.ts | 9 + .../components/schema-editor/interface.ts | 1 + .../schema-editor/schema-property-editor.tsx | 4 + .../schema-editor/schema-visual-editor.tsx | 21 +- web/src/constants/agent.tsx | 16 ++ web/src/locales/en.ts | 31 +++ web/src/locales/zh.ts | 31 +++ .../agent/canvas/node/variable-display.tsx | 26 +-- web/src/pages/agent/constant/index.tsx | 35 +++ web/src/pages/agent/form/agent-form/index.tsx | 12 +- web/src/pages/agent/form/begin-form/index.tsx | 156 ++++++++----- .../form/begin-form/use-handle-mode-change.ts | 76 +++++++ .../begin-form/use-show-schema-dialog.tsx | 28 +++ .../agent/form/begin-form/use-watch-change.ts | 13 ++ .../agent/form/begin-form/webhook/auth.tsx | 139 ++++++++++++ .../begin-form/webhook/dynamic-response.tsx | 213 ++++++++++++++++++ .../agent/form/begin-form/webhook/index.tsx | 134 +++++++++++ .../form/begin-form/webhook/response.tsx | 30 +++ .../form/components/dynamic-string-form.tsx | 46 ++++ .../schema-dialog.tsx} | 12 +- .../schema-panel.tsx} | 2 +- .../hooks/use-build-structured-output.ts | 130 +++++++++-- .../pages/agent/hooks/use-get-begin-query.tsx | 4 +- web/src/pages/agent/utils.ts | 40 ++++ 25 files changed, 1097 insertions(+), 117 deletions(-) create mode 100644 web/src/components/jsonjoy-builder/components/schema-editor/context.ts create mode 100644 
web/src/components/jsonjoy-builder/components/schema-editor/interface.ts create mode 100644 web/src/pages/agent/form/begin-form/use-handle-mode-change.ts create mode 100644 web/src/pages/agent/form/begin-form/use-show-schema-dialog.tsx create mode 100644 web/src/pages/agent/form/begin-form/webhook/auth.tsx create mode 100644 web/src/pages/agent/form/begin-form/webhook/dynamic-response.tsx create mode 100644 web/src/pages/agent/form/begin-form/webhook/index.tsx create mode 100644 web/src/pages/agent/form/begin-form/webhook/response.tsx create mode 100644 web/src/pages/agent/form/components/dynamic-string-form.tsx rename web/src/pages/agent/form/{agent-form/structured-output-dialog.tsx => components/schema-dialog.tsx} (81%) rename web/src/pages/agent/form/{agent-form/structured-output-panel.tsx => components/schema-panel.tsx} (78%) diff --git a/web/src/components/jsonjoy-builder/components/schema-editor/add-field-button.tsx b/web/src/components/jsonjoy-builder/components/schema-editor/add-field-button.tsx index 7a25705f9..fe06c1952 100644 --- a/web/src/components/jsonjoy-builder/components/schema-editor/add-field-button.tsx +++ b/web/src/components/jsonjoy-builder/components/schema-editor/add-field-button.tsx @@ -20,6 +20,7 @@ import { CirclePlus, HelpCircle, Info } from 'lucide-react'; import { useId, useState, type FC, type FormEvent } from 'react'; import { useTranslation } from '../../hooks/use-translation'; import type { NewField, SchemaType } from '../../types/json-schema'; +import { KeyInputProps } from './interface'; import SchemaTypeSelector from './schema-type-selector'; interface AddFieldButtonProps { @@ -27,9 +28,10 @@ interface AddFieldButtonProps { variant?: 'primary' | 'secondary'; } -const AddFieldButton: FC = ({ +const AddFieldButton: FC = ({ onAddField, variant = 'primary', + pattern, }) => { const [dialogOpen, setDialogOpen] = useState(false); const [fieldName, setFieldName] = useState(''); @@ -120,6 +122,7 @@ const AddFieldButton: FC = ({ 
placeholder={t.fieldNamePlaceholder} className="font-mono text-sm w-full" required + searchValue={pattern} /> diff --git a/web/src/components/jsonjoy-builder/components/schema-editor/context.ts b/web/src/components/jsonjoy-builder/components/schema-editor/context.ts new file mode 100644 index 000000000..3fbb14a26 --- /dev/null +++ b/web/src/components/jsonjoy-builder/components/schema-editor/context.ts @@ -0,0 +1,9 @@ +import React, { useContext } from 'react'; +import { KeyInputProps } from './interface'; + +export const KeyInputContext = React.createContext({}); + +export function useInputPattern() { + const x = useContext(KeyInputContext); + return x.pattern; +} diff --git a/web/src/components/jsonjoy-builder/components/schema-editor/interface.ts b/web/src/components/jsonjoy-builder/components/schema-editor/interface.ts new file mode 100644 index 000000000..39e74a641 --- /dev/null +++ b/web/src/components/jsonjoy-builder/components/schema-editor/interface.ts @@ -0,0 +1 @@ +export type KeyInputProps = { pattern?: RegExp | string }; diff --git a/web/src/components/jsonjoy-builder/components/schema-editor/schema-property-editor.tsx b/web/src/components/jsonjoy-builder/components/schema-editor/schema-property-editor.tsx index f95031e9c..347d69d26 100644 --- a/web/src/components/jsonjoy-builder/components/schema-editor/schema-property-editor.tsx +++ b/web/src/components/jsonjoy-builder/components/schema-editor/schema-property-editor.tsx @@ -16,6 +16,7 @@ import { withObjectSchema, } from '../../types/json-schema'; import type { ValidationTreeNode } from '../../types/validation'; +import { useInputPattern } from './context'; import TypeDropdown from './type-dropdown'; import TypeEditor from './type-editor'; @@ -54,6 +55,8 @@ export const SchemaPropertyEditor: React.FC = ({ 'object' as SchemaType, ); + const pattern = useInputPattern(); + // Update temp values when props change useEffect(() => { setTempName(name); @@ -123,6 +126,7 @@ export const SchemaPropertyEditor: 
React.FC = ({ className="h-8 text-sm font-medium min-w-[120px] max-w-full z-10" autoFocus onFocus={(e) => e.target.select()} + searchValue={pattern} /> ) : ( - + )} {structuredOutputDialogVisible && ( - + > )} ); diff --git a/web/src/pages/agent/form/begin-form/index.tsx b/web/src/pages/agent/form/begin-form/index.tsx index ad4eb9d3e..c86f24cac 100644 --- a/web/src/pages/agent/form/begin-form/index.tsx +++ b/web/src/pages/agent/form/begin-form/index.tsx @@ -12,6 +12,7 @@ import { RAGFlowSelect } from '@/components/ui/select'; import { Switch } from '@/components/ui/switch'; import { Textarea } from '@/components/ui/textarea'; import { FormTooltip } from '@/components/ui/tooltip'; +import { WebhookAlgorithmList } from '@/constants/agent'; import { zodResolver } from '@hookform/resolvers/zod'; import { t } from 'i18next'; import { Plus } from 'lucide-react'; @@ -24,37 +25,71 @@ import { INextOperatorForm } from '../../interface'; import { ParameterDialog } from './parameter-dialog'; import { QueryTable } from './query-table'; import { useEditQueryRecord } from './use-edit-query'; +import { useHandleModeChange } from './use-handle-mode-change'; import { useValues } from './use-values'; import { useWatchFormChange } from './use-watch-change'; +import { WebHook } from './webhook'; const ModeOptions = [ { value: AgentDialogueMode.Conversational, label: t('flow.conversational') }, { value: AgentDialogueMode.Task, label: t('flow.task') }, + { value: AgentDialogueMode.Webhook, label: t('flow.webhook.name') }, ]; +const FormSchema = z.object({ + enablePrologue: z.boolean().optional(), + prologue: z.string().trim().optional(), + mode: z.string(), + inputs: z + .array( + z.object({ + key: z.string(), + type: z.string(), + value: z.string(), + optional: z.boolean(), + name: z.string(), + options: z.array(z.union([z.number(), z.string(), z.boolean()])), + }), + ) + .optional(), + methods: z.string().optional(), + content_types: z.string().optional(), + security: z + .object({ + 
auth_type: z.string(), + ip_whitelist: z.array(z.object({ value: z.string() })), + rate_limit: z.object({ + limit: z.number(), + per: z.string().optional(), + }), + max_body_size: z.string(), + jwt: z + .object({ + algorithm: z.string().default(WebhookAlgorithmList[0]).optional(), + }) + .optional(), + }) + .optional(), + schema: z.record(z.any()).optional(), + response: z + .object({ + status: z.number(), + headers_template: z.array( + z.object({ key: z.string(), value: z.string() }), + ), + body_template: z.array(z.object({ key: z.string(), value: z.string() })), + }) + .optional(), + execution_mode: z.string().optional(), +}); + +export type BeginFormSchemaType = z.infer; + function BeginForm({ node }: INextOperatorForm) { const { t } = useTranslation(); const values = useValues(node); - const FormSchema = z.object({ - enablePrologue: z.boolean().optional(), - prologue: z.string().trim().optional(), - mode: z.string(), - inputs: z - .array( - z.object({ - key: z.string(), - type: z.string(), - value: z.string(), - optional: z.boolean(), - name: z.string(), - options: z.array(z.union([z.number(), z.string(), z.boolean()])), - }), - ) - .optional(), - }); - const form = useForm({ defaultValues: values, resolver: zodResolver(FormSchema), @@ -72,6 +107,8 @@ function BeginForm({ node }: INextOperatorForm) { const previousModeRef = useRef(mode); + const { handleModeChange } = useHandleModeChange(form); + useEffect(() => { if ( previousModeRef.current === AgentDialogueMode.Task && @@ -111,6 +148,10 @@ function BeginForm({ node }: INextOperatorForm) { placeholder={t('common.pleaseSelect')} options={ModeOptions} {...field} + onChange={(val) => { + handleModeChange(val); + field.onChange(val); + }} > @@ -158,44 +199,49 @@ function BeginForm({ node }: INextOperatorForm) { )} /> )} - {/* Create a hidden field to make Form instance record this */} -
} - /> - - {t('flow.input')} - - - } - rightContent={ - + } > - - - } - > - - - {visible && ( - + + + {visible && ( + + )} + )} diff --git a/web/src/pages/agent/form/begin-form/use-handle-mode-change.ts b/web/src/pages/agent/form/begin-form/use-handle-mode-change.ts new file mode 100644 index 000000000..e85ed5a6e --- /dev/null +++ b/web/src/pages/agent/form/begin-form/use-handle-mode-change.ts @@ -0,0 +1,76 @@ +import { useCallback } from 'react'; +import { UseFormReturn } from 'react-hook-form'; +import { + AgentDialogueMode, + RateLimitPerList, + WebhookExecutionMode, + WebhookMaxBodySize, + WebhookSecurityAuthType, +} from '../../constant'; + +// const WebhookSchema = { +// query: { +// type: 'object', +// required: [], +// properties: { +// // debug: { type: 'boolean' }, +// // event: { type: 'string' }, +// }, +// }, + +// headers: { +// type: 'object', +// required: [], +// properties: { +// // 'X-Trace-ID': { type: 'string' }, +// }, +// }, + +// body: { +// type: 'object', +// required: [], +// properties: { +// id: { type: 'string' }, +// payload: { type: 'object' }, +// }, +// }, +// }; + +const schema = { + properties: { + query: { + type: 'object', + description: '', + }, + headers: { + type: 'object', + description: '', + }, + body: { + type: 'object', + description: '', + }, + }, +}; + +const initialFormValuesMap = { + schema: schema, + 'security.auth_type': WebhookSecurityAuthType.Basic, + 'security.rate_limit.per': RateLimitPerList[0], + 'security.max_body_size': WebhookMaxBodySize[0], + execution_mode: WebhookExecutionMode.Immediately, +}; + +export function useHandleModeChange(form: UseFormReturn) { + const handleModeChange = useCallback( + (mode: AgentDialogueMode) => { + if (mode === AgentDialogueMode.Webhook) { + Object.entries(initialFormValuesMap).forEach(([key, value]) => { + form.setValue(key, value, { shouldDirty: true }); + }); + } + }, + [form], + ); + return { handleModeChange }; +} diff --git 
a/web/src/pages/agent/form/begin-form/use-show-schema-dialog.tsx b/web/src/pages/agent/form/begin-form/use-show-schema-dialog.tsx new file mode 100644 index 000000000..0bc6261e5 --- /dev/null +++ b/web/src/pages/agent/form/begin-form/use-show-schema-dialog.tsx @@ -0,0 +1,28 @@ +import { JSONSchema } from '@/components/jsonjoy-builder'; +import { useSetModalState } from '@/hooks/common-hooks'; +import { useCallback } from 'react'; +import { UseFormReturn } from 'react-hook-form'; + +export function useShowSchemaDialog(form: UseFormReturn) { + const { + visible: schemaDialogVisible, + showModal: showSchemaDialog, + hideModal: hideSchemaDialog, + } = useSetModalState(); + + const handleSchemaDialogOk = useCallback( + (values: JSONSchema) => { + // Sync data to canvas + form.setValue('schema', values); + hideSchemaDialog(); + }, + [form, hideSchemaDialog], + ); + + return { + schemaDialogVisible, + showSchemaDialog, + hideSchemaDialog, + handleSchemaDialogOk, + }; +} diff --git a/web/src/pages/agent/form/begin-form/use-watch-change.ts b/web/src/pages/agent/form/begin-form/use-watch-change.ts index f0da58068..02158e969 100644 --- a/web/src/pages/agent/form/begin-form/use-watch-change.ts +++ b/web/src/pages/agent/form/begin-form/use-watch-change.ts @@ -1,6 +1,7 @@ import { omit } from 'lodash'; import { useEffect } from 'react'; import { UseFormReturn, useWatch } from 'react-hook-form'; +import { AgentDialogueMode } from '../../constant'; import { BeginQuery } from '../../interface'; import useGraphStore from '../../store'; @@ -20,9 +21,21 @@ export function useWatchFormChange(id?: string, form?: UseFormReturn) { if (id) { values = form?.getValues() || {}; + let outputs: Record = {}; + + // For webhook mode, use schema properties as direct outputs + // Each property (body, headers, query) should be able to show secondary menu + if ( + values.mode === AgentDialogueMode.Webhook && + values.schema?.properties + ) { + outputs = { ...values.schema.properties }; + } + const 
nextValues = { ...values, inputs: transferInputsArrayToObject(values.inputs), + outputs, }; updateNodeForm(id, nextValues); diff --git a/web/src/pages/agent/form/begin-form/webhook/auth.tsx b/web/src/pages/agent/form/begin-form/webhook/auth.tsx new file mode 100644 index 000000000..4a739b491 --- /dev/null +++ b/web/src/pages/agent/form/begin-form/webhook/auth.tsx @@ -0,0 +1,139 @@ +import { SelectWithSearch } from '@/components/originui/select-with-search'; +import { RAGFlowFormItem } from '@/components/ragflow-form'; +import { Input } from '@/components/ui/input'; +import { WebhookAlgorithmList } from '@/constants/agent'; +import { WebhookSecurityAuthType } from '@/pages/agent/constant'; +import { buildOptions } from '@/utils/form'; +import { useCallback } from 'react'; +import { useFormContext, useWatch } from 'react-hook-form'; +import { useTranslation } from 'react-i18next'; + +const AlgorithmOptions = buildOptions(WebhookAlgorithmList); + +const RequiredClaimsOptions = buildOptions(['exp', 'sub']); + +export function Auth() { + const { t } = useTranslation(); + const form = useFormContext(); + + const authType = useWatch({ + name: 'security.auth_type', + control: form.control, + }); + + const renderTokenAuth = useCallback( + () => ( + <> + + + + + + + + ), + [t], + ); + + const renderBasicAuth = useCallback( + () => ( + <> + + + + + + + + ), + [t], + ); + + const renderJwtAuth = useCallback( + () => ( + <> + + + + + + + + + + + + + + + + + ), + [t], + ); + + const renderHmacAuth = useCallback( + () => ( + <> + + + + + + + + + + + ), + [t], + ); + + const AuthMap = { + [WebhookSecurityAuthType.Token]: renderTokenAuth, + [WebhookSecurityAuthType.Basic]: renderBasicAuth, + [WebhookSecurityAuthType.Jwt]: renderJwtAuth, + [WebhookSecurityAuthType.Hmac]: renderHmacAuth, + [WebhookSecurityAuthType.None]: () => null, + }; + + return AuthMap[ + (authType ?? 
WebhookSecurityAuthType.None) as WebhookSecurityAuthType + ](); +} diff --git a/web/src/pages/agent/form/begin-form/webhook/dynamic-response.tsx b/web/src/pages/agent/form/begin-form/webhook/dynamic-response.tsx new file mode 100644 index 000000000..18030feff --- /dev/null +++ b/web/src/pages/agent/form/begin-form/webhook/dynamic-response.tsx @@ -0,0 +1,213 @@ +import { BoolSegmented } from '@/components/bool-segmented'; +import { KeyInput } from '@/components/key-input'; +import { SelectWithSearch } from '@/components/originui/select-with-search'; +import { RAGFlowFormItem } from '@/components/ragflow-form'; +import { useIsDarkTheme } from '@/components/theme-provider'; +import { Button } from '@/components/ui/button'; +import { Input } from '@/components/ui/input'; +import { Separator } from '@/components/ui/separator'; +import { Textarea } from '@/components/ui/textarea'; +import { Editor, loader } from '@monaco-editor/react'; +import { X } from 'lucide-react'; +import { ReactNode, useCallback } from 'react'; +import { useFieldArray, useFormContext } from 'react-hook-form'; +import { InputMode, TypesWithArray } from '../../../constant'; +import { + InputModeOptions, + buildConversationVariableSelectOptions, +} from '../../../utils'; +import { DynamicFormHeader } from '../../components/dynamic-fom-header'; +import { QueryVariable } from '../../components/query-variable'; + +loader.config({ paths: { vs: '/vs' } }); + +type SelectKeysProps = { + name: string; + label: ReactNode; + tooltip?: string; + keyField?: string; + valueField?: string; + operatorField?: string; + nodeId?: string; +}; + +const VariableTypeOptions = buildConversationVariableSelectOptions(); + +const modeField = 'input_mode'; + +const ConstantValueMap = { + [TypesWithArray.Boolean]: true, + [TypesWithArray.Number]: 0, + [TypesWithArray.String]: '', + [TypesWithArray.ArrayBoolean]: '[]', + [TypesWithArray.ArrayNumber]: '[]', + [TypesWithArray.ArrayString]: '[]', + [TypesWithArray.ArrayObject]: 
'[]', + [TypesWithArray.Object]: '{}', +}; + +export function DynamicResponse({ + name, + label, + tooltip, + keyField = 'key', + valueField = 'value', + operatorField = 'type', +}: SelectKeysProps) { + const form = useFormContext(); + const isDarkTheme = useIsDarkTheme(); + + const { fields, remove, append } = useFieldArray({ + name: name, + control: form.control, + }); + + const initializeValue = useCallback( + (mode: string, variableType: string, valueFieldAlias: string) => { + if (mode === InputMode.Variable) { + form.setValue(valueFieldAlias, '', { shouldDirty: true }); + } else { + const val = ConstantValueMap[variableType as TypesWithArray]; + form.setValue(valueFieldAlias, val, { shouldDirty: true }); + } + }, + [form], + ); + + const handleModeChange = useCallback( + (mode: string, valueFieldAlias: string, operatorFieldAlias: string) => { + const variableType = form.getValues(operatorFieldAlias); + initializeValue(mode, variableType, valueFieldAlias); + }, + [form, initializeValue], + ); + + const handleVariableTypeChange = useCallback( + (variableType: string, valueFieldAlias: string, modeFieldAlias: string) => { + const mode = form.getValues(modeFieldAlias); + + initializeValue(mode, variableType, valueFieldAlias); + }, + [form, initializeValue], + ); + + const renderParameter = useCallback( + (operatorFieldName: string, modeFieldName: string) => { + const mode = form.getValues(modeFieldName); + const logicalOperator = form.getValues(operatorFieldName); + + if (mode === InputMode.Constant) { + if (logicalOperator === TypesWithArray.Boolean) { + return ; + } + + if (logicalOperator === TypesWithArray.Number) { + return ; + } + + if (logicalOperator === TypesWithArray.String) { + return ; + } + + return ( + + ); + } + + return ( + + ); + }, + [form, isDarkTheme], + ); + + return ( +
+ + append({ + [keyField]: '', + [valueField]: '', + [modeField]: InputMode.Constant, + [operatorField]: TypesWithArray.String, + }) + } + > +
+ {fields.map((field, index) => { + const keyFieldAlias = `${name}.${index}.${keyField}`; + const valueFieldAlias = `${name}.${index}.${valueField}`; + const operatorFieldAlias = `${name}.${index}.${operatorField}`; + const modeFieldAlias = `${name}.${index}.${modeField}`; + + return ( +
+
+
+ + + + + + {(field) => ( + { + handleVariableTypeChange( + val, + valueFieldAlias, + modeFieldAlias, + ); + field.onChange(val); + }} + options={VariableTypeOptions} + > + )} + + + + {(field) => ( + { + handleModeChange( + val, + valueFieldAlias, + operatorFieldAlias, + ); + field.onChange(val); + }} + options={InputModeOptions} + > + )} + +
+ + {renderParameter(operatorFieldAlias, modeFieldAlias)} + +
+ + +
+ ); + })} +
+
+ ); +} diff --git a/web/src/pages/agent/form/begin-form/webhook/index.tsx b/web/src/pages/agent/form/begin-form/webhook/index.tsx new file mode 100644 index 000000000..86e844b07 --- /dev/null +++ b/web/src/pages/agent/form/begin-form/webhook/index.tsx @@ -0,0 +1,134 @@ +import { Collapse } from '@/components/collapse'; +import { SelectWithSearch } from '@/components/originui/select-with-search'; +import { RAGFlowFormItem } from '@/components/ragflow-form'; +import { Button } from '@/components/ui/button'; +import { Input } from '@/components/ui/input'; +import { Separator } from '@/components/ui/separator'; +import { Textarea } from '@/components/ui/textarea'; +import { buildOptions } from '@/utils/form'; +import { useFormContext, useWatch } from 'react-hook-form'; +import { useTranslation } from 'react-i18next'; +import { + RateLimitPerList, + WebhookContentType, + WebhookExecutionMode, + WebhookMaxBodySize, + WebhookMethod, + WebhookSecurityAuthType, +} from '../../../constant'; +import { DynamicStringForm } from '../../components/dynamic-string-form'; +import { SchemaDialog } from '../../components/schema-dialog'; +import { SchemaPanel } from '../../components/schema-panel'; +import { useShowSchemaDialog } from '../use-show-schema-dialog'; +import { Auth } from './auth'; +import { WebhookResponse } from './response'; + +const RateLimitPerOptions = buildOptions(RateLimitPerList); + +export function WebHook() { + const { t } = useTranslation(); + const form = useFormContext(); + + const executionMode = useWatch({ + control: form.control, + name: 'execution_mode', + }); + + const { + showSchemaDialog, + schemaDialogVisible, + hideSchemaDialog, + handleSchemaDialogOk, + } = useShowSchemaDialog(form); + + const schema = form.getValues('schema'); + + return ( + <> + + + + + + + Security}> +
+ + + + + + + + + + + + + + +
+
+ + + + + + + {executionMode === WebhookExecutionMode.Immediately && ( + + )} + +
+ Schema + +
+ + {schemaDialogVisible && ( + + )} + + ); +} diff --git a/web/src/pages/agent/form/begin-form/webhook/response.tsx b/web/src/pages/agent/form/begin-form/webhook/response.tsx new file mode 100644 index 000000000..a50d212e0 --- /dev/null +++ b/web/src/pages/agent/form/begin-form/webhook/response.tsx @@ -0,0 +1,30 @@ +import { Collapse } from '@/components/collapse'; +import { RAGFlowFormItem } from '@/components/ragflow-form'; +import { Input } from '@/components/ui/input'; +import { useTranslation } from 'react-i18next'; +import { DynamicResponse } from './dynamic-response'; + +export function WebhookResponse() { + const { t } = useTranslation(); + + return ( + Response}> +
+ + + + + +
+
+ ); +} diff --git a/web/src/pages/agent/form/components/dynamic-string-form.tsx b/web/src/pages/agent/form/components/dynamic-string-form.tsx new file mode 100644 index 000000000..224e92310 --- /dev/null +++ b/web/src/pages/agent/form/components/dynamic-string-form.tsx @@ -0,0 +1,46 @@ +import { RAGFlowFormItem } from '@/components/ragflow-form'; +import { Button } from '@/components/ui/button'; +import { Input } from '@/components/ui/input'; +import { Trash2 } from 'lucide-react'; +import { useFieldArray, useFormContext } from 'react-hook-form'; +import { DynamicFormHeader, FormListHeaderProps } from './dynamic-fom-header'; + +type DynamicStringFormProps = { name: string } & FormListHeaderProps; +export function DynamicStringForm({ name, label }: DynamicStringFormProps) { + const form = useFormContext(); + + const { fields, append, remove } = useFieldArray({ + name: name, + control: form.control, + }); + + return ( +
+ append({ value: '' })} + > +
+ {fields.map((field, index) => ( +
+ + + + +
+ ))} +
+
+ ); +} diff --git a/web/src/pages/agent/form/agent-form/structured-output-dialog.tsx b/web/src/pages/agent/form/components/schema-dialog.tsx similarity index 81% rename from web/src/pages/agent/form/agent-form/structured-output-dialog.tsx rename to web/src/pages/agent/form/components/schema-dialog.tsx index 6ce305bff..4d67e00c0 100644 --- a/web/src/pages/agent/form/agent-form/structured-output-dialog.tsx +++ b/web/src/pages/agent/form/components/schema-dialog.tsx @@ -3,6 +3,7 @@ import { JsonSchemaVisualizer, SchemaVisualEditor, } from '@/components/jsonjoy-builder'; +import { KeyInputProps } from '@/components/jsonjoy-builder/components/schema-editor/interface'; import { Button } from '@/components/ui/button'; import { Dialog, @@ -16,11 +17,12 @@ import { IModalProps } from '@/interfaces/common'; import { useCallback, useState } from 'react'; import { useTranslation } from 'react-i18next'; -export function StructuredOutputDialog({ +export function SchemaDialog({ hideModal, onOk, initialValues, -}: IModalProps) { + pattern, +}: IModalProps & KeyInputProps) { const { t } = useTranslation(); const [schema, setSchema] = useState(initialValues); @@ -36,7 +38,11 @@ export function StructuredOutputDialog({
- +
diff --git a/web/src/pages/agent/form/agent-form/structured-output-panel.tsx b/web/src/pages/agent/form/components/schema-panel.tsx similarity index 78% rename from web/src/pages/agent/form/agent-form/structured-output-panel.tsx rename to web/src/pages/agent/form/components/schema-panel.tsx index 64e13c6eb..e76ff726e 100644 --- a/web/src/pages/agent/form/agent-form/structured-output-panel.tsx +++ b/web/src/pages/agent/form/components/schema-panel.tsx @@ -1,6 +1,6 @@ import { JSONSchema, JsonSchemaVisualizer } from '@/components/jsonjoy-builder'; -export function StructuredOutputPanel({ value }: { value: JSONSchema }) { +export function SchemaPanel({ value }: { value: JSONSchema }) { return (
state); + const { getOperatorTypeFromId, getNode } = useGraphStore((state) => state); const showSecondaryMenu = useCallback( (value: string, outputLabel: string) => { const nodeId = getNodeId(value); - return ( - getOperatorTypeFromId(nodeId) === Operator.Agent && + const operatorType = getOperatorTypeFromId(nodeId); + + // For Agent nodes, show secondary menu for 'structured' field + if ( + operatorType === Operator.Agent && outputLabel === AgentStructuredOutputField - ); + ) { + return true; + } + + // For Begin nodes in webhook mode, show secondary menu for schema properties (body, headers, query, etc.) + if (operatorType === Operator.Begin) { + const node = getNode(nodeId); + const mode = get(node, 'data.form.mode'); + if (mode === AgentDialogueMode.Webhook) { + // Check if this output field is from the schema + const outputs = get(node, 'data.form.outputs', {}); + const outputField = outputs[outputLabel]; + // Show secondary menu if the field is an object or has properties + return ( + outputField && + (outputField.type === 'object' || + (outputField.properties && + Object.keys(outputField.properties).length > 0)) + ); + } + } + + return false; }, - [getOperatorTypeFromId], + [getOperatorTypeFromId, getNode], ); return showSecondaryMenu; } +function useGetBeginOutputsOrSchema() { + const { getNode } = useGraphStore((state) => state); + + const getBeginOutputs = useCallback(() => { + const node = getNode(BeginId); + const outputs = get(node, 'data.form.outputs', {}); + return outputs; + }, [getNode]); + + const getBeginSchema = useCallback(() => { + const node = getNode(BeginId); + const outputs = get(node, 'data.form.schema', {}); + return outputs; + }, [getNode]); + + return { getBeginOutputs, getBeginSchema }; +} export function useGetStructuredOutputByValue() { - const { getNode } = useGraphStore((state) => state); + const { getNode, getOperatorTypeFromId } = useGraphStore((state) => state); + + const { getBeginOutputs } = useGetBeginOutputsOrSchema(); 
const getStructuredOutput = useCallback( (value: string) => { - const node = getNode(getNodeId(value)); - const structuredOutput = get( - node, - `data.form.outputs.${AgentStructuredOutputField}`, - ); + const nodeId = getNodeId(value); + const node = getNode(nodeId); + const operatorType = getOperatorTypeFromId(nodeId); + const fields = splitValue(value); + const outputLabel = fields.at(1); + + let structuredOutput; + if (operatorType === Operator.Agent) { + structuredOutput = get( + node, + `data.form.outputs.${AgentStructuredOutputField}`, + ); + } else if (operatorType === Operator.Begin) { + // For Begin nodes in webhook mode, get the specific schema property + const outputs = getBeginOutputs(); + if (outputLabel) { + structuredOutput = outputs[outputLabel]; + } + } return structuredOutput; }, - [getNode], + [getBeginOutputs, getNode, getOperatorTypeFromId], ); return getStructuredOutput; @@ -66,13 +126,14 @@ export function useFindAgentStructuredOutputLabel() { icon?: ReactNode; }>, ) => { - // agent structured output const fields = splitValue(value); + const operatorType = getOperatorTypeFromId(fields.at(0)); + + // Handle Agent structured fields if ( - getOperatorTypeFromId(fields.at(0)) === Operator.Agent && + operatorType === Operator.Agent && fields.at(1)?.startsWith(AgentStructuredOutputField) ) { - // is agent structured output const agentOption = options.find((x) => value.includes(x.value)); const jsonSchemaFields = fields .at(1) @@ -84,6 +145,19 @@ export function useFindAgentStructuredOutputLabel() { value: value, }; } + + // Handle Begin webhook fields + if (operatorType === Operator.Begin && fields.at(1)) { + const fieldOption = options + .filter((x) => x.parentLabel === BeginId) + .find((x) => value.startsWith(x.value)); + + return { + ...fieldOption, + label: fields.at(1), + value: value, + }; + } }, [getOperatorTypeFromId], ); @@ -94,6 +168,7 @@ export function useFindAgentStructuredOutputLabel() { export function 
useFindAgentStructuredOutputTypeByValue() { const { getOperatorTypeFromId } = useGraphStore((state) => state); const filterStructuredOutput = useGetStructuredOutputByValue(); + const { getBeginSchema } = useGetBeginOutputsOrSchema(); const findTypeByValue = useCallback( ( @@ -136,10 +211,12 @@ export function useFindAgentStructuredOutputTypeByValue() { } const fields = splitValue(value); const nodeId = fields.at(0); + const operatorType = getOperatorTypeFromId(nodeId); const jsonSchema = filterStructuredOutput(value); + // Handle Agent structured fields if ( - getOperatorTypeFromId(nodeId) === Operator.Agent && + operatorType === Operator.Agent && fields.at(1)?.startsWith(AgentStructuredOutputField) ) { const jsonSchemaFields = fields @@ -151,13 +228,32 @@ export function useFindAgentStructuredOutputTypeByValue() { return type; } } + + // Handle Begin webhook fields (body, headers, query, etc.) + if (operatorType === Operator.Begin) { + const outputLabel = fields.at(1); + const schema = getBeginSchema(); + if (outputLabel && schema) { + const jsonSchemaFields = fields.at(1); + if (jsonSchemaFields) { + const type = findTypeByValue(schema, jsonSchemaFields); + return type; + } + } + } }, - [filterStructuredOutput, findTypeByValue, getOperatorTypeFromId], + [ + filterStructuredOutput, + findTypeByValue, + getBeginSchema, + getOperatorTypeFromId, + ], ); return findAgentStructuredOutputTypeByValue; } +// TODO: Consider merging with useFindAgentStructuredOutputLabel export function useFindAgentStructuredOutputLabelByValue() { const { getNode } = useGraphStore((state) => state); diff --git a/web/src/pages/agent/hooks/use-get-begin-query.tsx b/web/src/pages/agent/hooks/use-get-begin-query.tsx index 387f59821..46825e5a4 100644 --- a/web/src/pages/agent/hooks/use-get-begin-query.tsx +++ b/web/src/pages/agent/hooks/use-get-begin-query.tsx @@ -314,10 +314,12 @@ export function useFilterQueryVariableOptionsByTypes({ ? 
toLower(y.type).includes(toLower(x)) : toLower(y.type) === toLower(x), ) || + // agent structured output isAgentStructured( y.value, y.value.slice(-AgentStructuredOutputField.length), - ), // agent structured output + ) || + y.value.startsWith(BeginId), // begin node outputs ), }; }) diff --git a/web/src/pages/agent/utils.ts b/web/src/pages/agent/utils.ts index 6825dd9f5..592d92e45 100644 --- a/web/src/pages/agent/utils.ts +++ b/web/src/pages/agent/utils.ts @@ -24,6 +24,7 @@ import { import pipe from 'lodash/fp/pipe'; import isObject from 'lodash/isObject'; import { + AgentDialogueMode, CategorizeAnchorPointPositions, FileType, FileTypeSuffixMap, @@ -34,6 +35,7 @@ import { Operator, TypesWithArray, } from './constant'; +import { BeginFormSchemaType } from './form/begin-form'; import { DataOperationsFormSchemaType } from './form/data-operations-form'; import { ExtractorFormSchemaType } from './form/extractor-form'; import { HierarchicalMergerFormSchemaType } from './form/hierarchical-merger-form'; @@ -312,6 +314,41 @@ function transformDataOperationsParams(params: DataOperationsFormSchemaType) { }; } +export function transformArrayToObject( + list?: Array<{ key: string; value: string }>, +) { + if (!Array.isArray(list)) return {}; + return list?.reduce<Record<string, string>>((pre, cur) => { + if (cur.key) { + pre[cur.key] = cur.value; + } + return pre; + }, {}); +} + +function transformBeginParams(params: BeginFormSchemaType) { + if (params.mode === AgentDialogueMode.Webhook) { + return { + ...params, + security: { + ...params.security, + ip_whitelist: params.security?.ip_whitelist.map((x) => x.value), + }, + response: { + ...params.response, + headers_template: transformArrayToObject( + params.response?.headers_template, + ), + body_template: transformArrayToObject(params.response?.body_template), + }, + }; + } + + return { + ...params, + }; +} + // construct a dsl based on the node information of the graph export const buildDslComponentsByGraph = ( nodes: RAGFlowNodeType[], @@ -361,6 
+398,9 @@ export const buildDslComponentsByGraph = ( case Operator.DataOperations: params = transformDataOperationsParams(params); break; + case Operator.Begin: + params = transformBeginParams(params); + break; default: break; }