From 3d2e0f1a1b7fb9410221cf382efede124d3d5645 Mon Sep 17 00:00:00 2001 From: Zhichang Yu Date: Fri, 28 Nov 2025 17:09:58 +0800 Subject: [PATCH 1/5] fix: tolerate null mergeable status in tests workflow --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4be4b06be..a5a44a29c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -31,7 +31,7 @@ jobs: name: ragflow_tests # https://docs.github.com/en/actions/using-jobs/using-conditions-to-control-job-execution # https://github.com/orgs/community/discussions/26261 - if: ${{ github.event_name != 'pull_request_target' || (contains(github.event.pull_request.labels.*.name, 'ci') && github.event.pull_request.mergeable == true) }} + if: ${{ github.event_name != 'pull_request_target' || (contains(github.event.pull_request.labels.*.name, 'ci') && github.event.pull_request.mergeable != false) }} runs-on: [ "self-hosted", "ragflow-test" ] steps: # https://github.com/hmarr/debug-action From ccce8beeeb6d2b99db280977b9cef07d44a38447 Mon Sep 17 00:00:00 2001 From: balibabu Date: Fri, 28 Nov 2025 17:15:01 +0800 Subject: [PATCH 2/5] Feat: Replace antd in the chat message with shadcn. #10427 (#11590) ### What problem does this PR solve? Feat: Replace antd in the chat message with shadcn. #10427 ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- web/src/components/floating-chat-widget.tsx | 6 +- web/src/components/markdown-content/index.tsx | 49 +++++++++------ web/src/components/message-item/index.tsx | 61 ++++++++++--------- web/src/components/pdf-drawer/index.tsx | 38 +++++++----- web/src/pages/agent/chat/box.tsx | 6 +- web/src/pages/agent/share/index.tsx | 6 +- .../chat/chat-box/multiple-chat-box.tsx | 6 +- .../chat/chat-box/single-chat-box.tsx | 6 +- web/src/pages/next-chats/share/index.tsx | 6 +- 9 files changed, 101 insertions(+), 83 deletions(-) diff --git a/web/src/components/floating-chat-widget.tsx b/web/src/components/floating-chat-widget.tsx index 51aab028a..c31cb41bd 100644 --- a/web/src/components/floating-chat-widget.tsx +++ b/web/src/components/floating-chat-widget.tsx @@ -1,4 +1,4 @@ -import PdfDrawer from '@/components/pdf-drawer'; +import PdfSheet from '@/components/pdf-drawer'; import { useClickDrawer } from '@/components/pdf-drawer/hooks'; import { MessageType } from '@/constants/chat'; import { useFetchExternalChatInfo } from '@/hooks/use-chat-request'; @@ -494,7 +494,7 @@ const FloatingChatWidget = () => { - { )} - Number(match); @@ -145,20 +149,20 @@ const MarkdownContent = ({ return (
{imageId && ( - + + + + - } - > - - + + )}
{documentId && ( - +
{fileThumbnail ? ( )} - +
)}
@@ -228,9 +232,14 @@ const MarkdownContent = ({ } > ) : ( - - - + + + + + + {getPopoverContent(chunkIndex)} + + ); }); diff --git a/web/src/components/message-item/index.tsx b/web/src/components/message-item/index.tsx index ff5b08b7d..dbd38aef4 100644 --- a/web/src/components/message-item/index.tsx +++ b/web/src/components/message-item/index.tsx @@ -14,10 +14,10 @@ import { } from '@/hooks/document-hooks'; import { IRegenerateMessage, IRemoveMessageById } from '@/hooks/logic-hooks'; import { cn } from '@/lib/utils'; -import { Avatar, Flex, Space } from 'antd'; import MarkdownContent from '../markdown-content'; import { ReferenceDocumentList } from '../next-message-item/reference-document-list'; import { InnerUploadedMessageFiles } from '../next-message-item/uploaded-message-files'; +import { RAGFlowAvatar } from '../ragflow-avatar'; import { useTheme } from '../theme-provider'; import { AssistantGroupButton, UserGroupButton } from './group-button'; import styles from './index.less'; @@ -98,40 +98,43 @@ const MessageItem = ({ > {visibleAvatar && (item.role === MessageType.User ? ( - + ) : avatarDialog ? ( - + ) : ( ))} - - - {isAssistant ? ( - index !== 0 && ( - - ) - ) : ( - + {isAssistant ? ( + index !== 0 && ( + - )} + content={item.content} + prompt={item.prompt} + showLikeButton={showLikeButton} + audioBinary={item.audio_binary} + showLoudspeaker={showLoudspeaker} + > + ) + ) : ( + + )} - {/* {isAssistant ? '' : nickname} */} -
)} - +
diff --git a/web/src/components/pdf-drawer/index.tsx b/web/src/components/pdf-drawer/index.tsx index 680aacc32..4b6caeca9 100644 --- a/web/src/components/pdf-drawer/index.tsx +++ b/web/src/components/pdf-drawer/index.tsx @@ -1,8 +1,9 @@ import { IModalProps } from '@/interfaces/common'; import { IReferenceChunk } from '@/interfaces/database/chat'; import { IChunk } from '@/interfaces/database/knowledge'; -import { Drawer } from 'antd'; +import { cn } from '@/lib/utils'; import DocumentPreviewer from '../pdf-previewer'; +import { Sheet, SheetContent, SheetHeader, SheetTitle } from '../ui/sheet'; interface IProps extends IModalProps { documentId: string; @@ -11,7 +12,7 @@ interface IProps extends IModalProps { height?: string | number; } -export const PdfDrawer = ({ +export const PdfSheet = ({ visible = false, hideModal, documentId, @@ -20,20 +21,25 @@ export const PdfDrawer = ({ height, }: IProps) => { return ( - - - + + + + Document Previewer + + + + ); }; -export default PdfDrawer; \ No newline at end of file +export default PdfSheet; diff --git a/web/src/pages/agent/chat/box.tsx b/web/src/pages/agent/chat/box.tsx index f44e219fc..85abf6827 100644 --- a/web/src/pages/agent/chat/box.tsx +++ b/web/src/pages/agent/chat/box.tsx @@ -5,7 +5,7 @@ import { useSendAgentMessage } from './use-send-agent-message'; import { FileUploadProps } from '@/components/file-upload'; import { NextMessageInput } from '@/components/message-input/next'; import MessageItem from '@/components/next-message-item'; -import PdfDrawer from '@/components/pdf-drawer'; +import PdfSheet from '@/components/pdf-drawer'; import { useClickDrawer } from '@/components/pdf-drawer/hooks'; import { useFetchAgent, @@ -127,12 +127,12 @@ function AgentChatBox() { /> )} - + > ); } diff --git a/web/src/pages/agent/share/index.tsx b/web/src/pages/agent/share/index.tsx index 24308ea66..af3393ad2 100644 --- a/web/src/pages/agent/share/index.tsx +++ b/web/src/pages/agent/share/index.tsx @@ -2,7 +2,7 @@ import { EmbedContainer } from '@/components/embed-container'; import { FileUploadProps } from '@/components/file-upload'; import { NextMessageInput } from '@/components/message-input/next'; import MessageItem from '@/components/next-message-item'; -import PdfDrawer from '@/components/pdf-drawer'; +import PdfSheet from '@/components/pdf-drawer'; import { useClickDrawer } from '@/components/pdf-drawer/hooks'; import { MessageType } from '@/constants/chat'; import { useUploadCanvasFileWithProgress } from '@/hooks/use-agent-request'; @@ -204,12 +204,12 @@ const ChatContainer = () => { {visible && ( - + > )} {parameterDialogVisible && ( {visible && ( - + > )} ); diff --git a/web/src/pages/next-chats/chat/chat-box/single-chat-box.tsx b/web/src/pages/next-chats/chat/chat-box/single-chat-box.tsx index a63348e0a..8d14eb82f 100644 --- a/web/src/pages/next-chats/chat/chat-box/single-chat-box.tsx +++ b/web/src/pages/next-chats/chat/chat-box/single-chat-box.tsx @@ -1,6 +1,6 @@ import { NextMessageInput } from '@/components/message-input/next'; import MessageItem from '@/components/message-item'; -import PdfDrawer from '@/components/pdf-drawer'; +import PdfSheet from '@/components/pdf-drawer'; import { useClickDrawer } from '@/components/pdf-drawer/hooks'; import { MessageType } from '@/constants/chat'; import { @@ -101,12 +101,12 @@ export function SingleChatBox({ controller, stopOutputMessage }: IProps) { removeFile={removeFile} /> {visible && ( - + > )} ); diff --git a/web/src/pages/next-chats/share/index.tsx b/web/src/pages/next-chats/share/index.tsx index 648b22f72..e01671eed 100644 --- a/web/src/pages/next-chats/share/index.tsx +++ b/web/src/pages/next-chats/share/index.tsx @@ -1,7 +1,7 @@ import { EmbedContainer } from '@/components/embed-container'; import { NextMessageInput } from '@/components/message-input/next'; import MessageItem from '@/components/message-item'; -import PdfDrawer from '@/components/pdf-drawer'; +import PdfSheet from '@/components/pdf-drawer'; import { useClickDrawer } from '@/components/pdf-drawer/hooks'; import { MessageType, SharedFrom } from '@/constants/chat'; import { useFetchNextConversationSSE } from '@/hooks/chat-hooks'; @@ -123,12 +123,12 @@ const ChatContainer = () => { {visible && ( - + > )} ); From d2915f6984fad826d7255ffe32d307a470b98ba8 Mon Sep 17 00:00:00 2001 From: balibabu Date: Fri, 28 Nov 2025 19:05:43 +0800 Subject: [PATCH 3/5] Fix: Error 102 "Can't find dialog by ID" when embedding agent with from=agent** #11552 (#11594) ### What problem does this PR solve? Fix: Error 102 "Can't find dialog by ID" when embedding agent with from=agent** #11552 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- web/src/components/floating-chat-widget.tsx | 42 ++++++++++++++------- web/src/pages/agent/chat/box.tsx | 14 ++++--- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/web/src/components/floating-chat-widget.tsx b/web/src/components/floating-chat-widget.tsx index c31cb41bd..c02d28f0a 100644 --- a/web/src/components/floating-chat-widget.tsx +++ b/web/src/components/floating-chat-widget.tsx @@ -1,8 +1,10 @@ import PdfSheet from '@/components/pdf-drawer'; import { useClickDrawer } from '@/components/pdf-drawer/hooks'; -import { MessageType } from '@/constants/chat'; +import { MessageType, SharedFrom } from '@/constants/chat'; +import { useFetchExternalAgentInputs } from '@/hooks/use-agent-request'; import { useFetchExternalChatInfo } from '@/hooks/use-chat-request'; import i18n from '@/locales/config'; +import { useSendNextSharedMessage } from '@/pages/agent/hooks/use-send-shared-message'; import { MessageCircle, Minimize2, Send, X } from 'lucide-react'; import React, { useCallback, useEffect, useRef, useState } from 'react'; import { @@ -20,7 +22,13 @@ const FloatingChatWidget = () => { const [isLoaded, setIsLoaded] = useState(false); const messagesEndRef = useRef(null); - const { sharedId: conversationId, locale } = useGetSharedChatSearchParams(); + const { + sharedId: conversationId, + locale, + from, + } = useGetSharedChatSearchParams(); + + const isFromAgent = from === SharedFrom.Agent; // Check if we're in button-only mode or window-only mode const urlParams = new URLSearchParams(window.location.search); @@ -34,7 +42,7 @@ const FloatingChatWidget = () => { sendLoading, derivedMessages, hasError, - } = useSendSharedMessage(); + } = (isFromAgent ? useSendNextSharedMessage : useSendSharedMessage)(() => {}); // Sync our local input with the hook's value when needed useEffect(() => { @@ -43,7 +51,11 @@ const FloatingChatWidget = () => { } }, [hookValue, inputValue]); - const { data: chatInfo } = useFetchExternalChatInfo(); + const { data } = ( + isFromAgent ? useFetchExternalAgentInputs : useFetchExternalChatInfo + )(); + + const title = data.title; const { visible, hideModal, documentId, selectedChunk, clickDocumentButton } = useClickDrawer(); @@ -372,7 +384,7 @@ const FloatingChatWidget = () => {

- {chatInfo?.title || 'Chat Support'} + {title || 'Chat Support'}

We typically reply instantly @@ -494,14 +506,16 @@ const FloatingChatWidget = () => {

- + {visible && ( + + )} ); } // Full mode - render everything together (original behavior) @@ -524,7 +538,7 @@ const FloatingChatWidget = () => {

- {chatInfo?.title || 'Chat Support'} + {title || 'Chat Support'}

We typically reply instantly diff --git a/web/src/pages/agent/chat/box.tsx b/web/src/pages/agent/chat/box.tsx index 85abf6827..df4d006b2 100644 --- a/web/src/pages/agent/chat/box.tsx +++ b/web/src/pages/agent/chat/box.tsx @@ -127,12 +127,14 @@ function AgentChatBox() { /> )} - + {visible && ( + + )} ); } From 14616cf8457e688ea30275920612c5ec72a68777 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Fri, 28 Nov 2025 19:25:32 +0800 Subject: [PATCH 4/5] Feat: add child parent chunking method in backend. (#11598) ### What problem does this PR solve? #7996 ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- agent/canvas.py | 20 +------- agent/component/begin.py | 3 +- api/apps/canvas_app.py | 71 ++------------------------ api/apps/document_app.py | 11 +++- api/db/services/dialog_service.py | 8 ++- api/db/services/file_service.py | 84 ++++++++++++++++++++++++++++++- rag/app/naive.py | 42 ++++++---------- rag/flow/splitter/splitter.py | 50 ++++++++++++++++-- rag/nlp/__init__.py | 26 ++++++++-- rag/svr/task_executor.py | 31 ++++++++++-- 10 files changed, 216 insertions(+), 130 deletions(-) diff --git a/agent/canvas.py b/agent/canvas.py index 9e95a5611..5344d70c3 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import base64 import json import logging import re @@ -25,6 +24,7 @@ from typing import Any, Union, Tuple from agent.component import component_class from agent.component.base import ComponentBase +from api.db.services.file_service import FileService from api.db.services.task_service import has_canceled from common.misc_utils import get_uuid, hash_str2int from common.exceptions import TaskCanceledException @@ -372,7 +372,7 @@ class Canvas(Graph): for k in kwargs.keys(): if k in ["query", "user_id", "files"] and kwargs[k]: if k == "files": - self.globals[f"sys.{k}"] = self.get_files(kwargs[k]) + self.globals[f"sys.{k}"] = FileService.get_files(kwargs[k]) else: self.globals[f"sys.{k}"] = kwargs[k] if not self.globals["sys.conversation_turns"] : @@ -621,22 +621,6 @@ class Canvas(Graph): def get_component_input_elements(self, cpnnm): return self.components[cpnnm]["obj"].get_input_elements() - def get_files(self, files: Union[None, list[dict]]) -> list[str]: - from api.db.services.file_service import FileService - if not files: - return [] - def image_to_base64(file): - return "data:{};base64,{}".format(file["mime_type"], - base64.b64encode(FileService.get_blob(file["created_by"], file["id"])).decode("utf-8")) - exe = ThreadPoolExecutor(max_workers=5) - threads = [] - for file in files: - if file["mime_type"].find("image") >=0: - threads.append(exe.submit(image_to_base64, file)) - continue - threads.append(exe.submit(FileService.parse, file["name"], FileService.get_blob(file["created_by"], file["id"]), True, file["created_by"])) - return [th.result() for th in threads] - def tool_use_callback(self, agent_id: str, func_name: str, params: dict, result: Any, elapsed_time=None): agent_ids = agent_id.split("-->") agent_name = self.get_component_name(agent_ids[0]) diff --git a/agent/component/begin.py b/agent/component/begin.py index b5985bb7a..1314aff74 100644 --- a/agent/component/begin.py +++ b/agent/component/begin.py @@ -14,6 +14,7 @@ # limitations under the License. # from agent.component.fillup import UserFillUpParam, UserFillUp +from api.db.services.file_service import FileService class BeginParam(UserFillUpParam): @@ -48,7 +49,7 @@ class Begin(UserFillUp): if v.get("optional") and v.get("value", None) is None: v = None else: - v = self._canvas.get_files([v["value"]]) + v = FileService.get_files([v["value"]]) else: v = v.get("value") self.set_output(k, v) diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py index 86ffaedb1..afdb3269b 100644 --- a/api/apps/canvas_app.py +++ b/api/apps/canvas_app.py @@ -15,13 +15,10 @@ # import json import logging -import re -import sys from functools import partial -import trio from quart import request, Response, make_response from agent.component import LLM -from api.db import CanvasCategory, FileType +from api.db import CanvasCategory from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService, API4ConversationService from api.db.services.document_service import DocumentService from api.db.services.file_service import FileService @@ -38,7 +35,6 @@ from peewee import MySQLDatabase, PostgresqlDatabase from api.db.db_models import APIToken, Task import time -from api.utils.file_utils import filename_type, read_potential_broken_pdf from rag.flow.pipeline import Pipeline from rag.nlp import search from rag.utils.redis_conn import REDIS_CONN @@ -250,71 +246,10 @@ async def upload(canvas_id): return get_data_error_result(message="canvas not found.") user_id = cvs["user_id"] - def structured(filename, filetype, blob, content_type): - nonlocal user_id - if filetype == FileType.PDF.value: - blob = read_potential_broken_pdf(blob) - - location = get_uuid() - FileService.put_blob(user_id, location, blob) - - return { - "id": location, - "name": filename, - "size": sys.getsizeof(blob), - "extension": filename.split(".")[-1].lower(), - "mime_type": content_type, - "created_by": user_id, - "created_at": time.time(), - "preview_url": None - } - - if request.args.get("url"): - from crawl4ai import ( - AsyncWebCrawler, - BrowserConfig, - CrawlerRunConfig, - DefaultMarkdownGenerator, - PruningContentFilter, - CrawlResult - ) - try: - url = request.args.get("url") - filename = re.sub(r"\?.*", "", url.split("/")[-1]) - async def adownload(): - browser_config = BrowserConfig( - headless=True, - verbose=False, - ) - async with AsyncWebCrawler(config=browser_config) as crawler: - crawler_config = CrawlerRunConfig( - markdown_generator=DefaultMarkdownGenerator( - content_filter=PruningContentFilter() - ), - pdf=True, - screenshot=False - ) - result: CrawlResult = await crawler.arun( - url=url, - config=crawler_config - ) - return result - page = trio.run(adownload()) - if page.pdf: - if filename.split(".")[-1].lower() != "pdf": - filename += ".pdf" - return get_json_result(data=structured(filename, "pdf", page.pdf, page.response_headers["content-type"])) - - return get_json_result(data=structured(filename, "html", str(page.markdown).encode("utf-8"), page.response_headers["content-type"], user_id)) - - except Exception as e: - return server_error_response(e) - files = await request.files - file = files['file'] + file = files['file'] if files and files.get("file") else None try: - DocumentService.check_doc_health(user_id, file.filename) - return get_json_result(data=structured(file.filename, filename_type(file.filename), file.read(), file.content_type)) + return get_json_result(data=FileService.upload_info(user_id, file, request.args.get("url"))) except Exception as e: return server_error_response(e) diff --git a/api/apps/document_app.py b/api/apps/document_app.py index 7ec8c1587..bd2262919 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -607,7 +607,7 @@ async def get_image(image_id): @login_required @validate_request("conversation_id") async def upload_and_parse(): - files = await request.file + files = await request.files if "file" not in files: return get_json_result(data=False, message="No file part!", code=RetCode.ARGUMENT_ERROR) @@ -705,3 +705,12 @@ async def set_meta(): return get_json_result(data=True) except Exception as e: return server_error_response(e) + +@manager.route("/upload_info", methods=["POST"]) # noqa: F821 +async def upload_info(): + files = await request.files + file = files['file'] if files and files.get("file") else None + try: + return get_json_result(data=FileService.upload_info(current_user.id, file, request.args.get("url"))) + except Exception as e: + return server_error_response(e) diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index 558ba1b0f..ae79b45a6 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -25,6 +25,7 @@ import trio from langfuse import Langfuse from peewee import fn from agentic_reasoning import DeepResearcher +from api.db.services.file_service import FileService from common.constants import LLMType, ParserType, StatusEnum from api.db.db_models import DB, Dialog from api.db.services.common_service import CommonService @@ -380,8 +381,11 @@ def chat(dialog, messages, stream=True, **kwargs): retriever = settings.retriever questions = [m["content"] for m in messages if m["role"] == "user"][-3:] attachments = kwargs["doc_ids"].split(",") if "doc_ids" in kwargs else [] + attachments_= "" if "doc_ids" in messages[-1]: attachments = messages[-1]["doc_ids"] + if "files" in messages[-1]: + attachments_ = "\n\n".join(FileService.get_files(messages[-1]["files"])) prompt_config = dialog.prompt_config field_map = KnowledgebaseService.get_field_map(dialog.kb_ids) @@ -451,7 +455,7 @@ def chat(dialog, messages, stream=True, **kwargs): ), ) - for think in reasoner.thinking(kbinfos, " ".join(questions)): + for think in reasoner.thinking(kbinfos, attachments_ + " ".join(questions)): if isinstance(think, str): thought = think knowledges = [t for t in think.split("\n") if t] @@ -503,7 +507,7 @@ def chat(dialog, messages, stream=True, **kwargs): kwargs["knowledge"] = "\n------\n" + "\n\n------\n\n".join(knowledges) gen_conf = dialog.llm_setting - msg = [{"role": "system", "content": prompt_config["system"].format(**kwargs)}] + msg = [{"role": "system", "content": prompt_config["system"].format(**kwargs)+attachments_}] prompt4citation = "" if knowledges and (prompt_config.get("quote", True) and kwargs.get("quote", True)): prompt4citation = citation_prompt() diff --git a/api/db/services/file_service.py b/api/db/services/file_service.py index 1fbecdafe..11ef5b454 100644 --- a/api/db/services/file_service.py +++ b/api/db/services/file_service.py @@ -13,10 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import asyncio +import base64 import logging import re +import sys +import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path +from typing import Union from peewee import fn @@ -520,7 +525,7 @@ class FileService(CommonService): if img_base64 and file_type == FileType.VISUAL.value: return GptV4.image2base64(blob) cks = FACTORY.get(FileService.get_parser(filename_type(filename), filename, ""), naive).chunk(filename, blob, **kwargs) - return "\n".join([ck["content_with_weight"] for ck in cks]) + return f"\n -----------------\nFile: {filename}\nContent as following: \n" + "\n".join([ck["content_with_weight"] for ck in cks]) @staticmethod def get_parser(doc_type, filename, default): @@ -588,3 +593,80 @@ class FileService(CommonService): errors += str(e) return errors + + @staticmethod + def upload_info(user_id, file, url: str|None=None): + def structured(filename, filetype, blob, content_type): + nonlocal user_id + if filetype == FileType.PDF.value: + blob = read_potential_broken_pdf(blob) + + location = get_uuid() + FileService.put_blob(user_id, location, blob) + + return { + "id": location, + "name": filename, + "size": sys.getsizeof(blob), + "extension": filename.split(".")[-1].lower(), + "mime_type": content_type, + "created_by": user_id, + "created_at": time.time(), + "preview_url": None + } + + if url: + from crawl4ai import ( + AsyncWebCrawler, + BrowserConfig, + CrawlerRunConfig, + DefaultMarkdownGenerator, + PruningContentFilter, + CrawlResult + ) + filename = re.sub(r"\?.*", "", url.split("/")[-1]) + async def adownload(): + browser_config = BrowserConfig( + headless=True, + verbose=False, + ) + async with AsyncWebCrawler(config=browser_config) as crawler: + crawler_config = CrawlerRunConfig( + markdown_generator=DefaultMarkdownGenerator( + content_filter=PruningContentFilter() + ), + pdf=True, + screenshot=False + ) + result: CrawlResult = await crawler.arun( + url=url, + config=crawler_config + ) + return result + page = asyncio.run(adownload()) + if page.pdf: + if filename.split(".")[-1].lower() != "pdf": + filename += ".pdf" + return structured(filename, "pdf", page.pdf, page.response_headers["content-type"]) + + return structured(filename, "html", str(page.markdown).encode("utf-8"), page.response_headers["content-type"], user_id) + + DocumentService.check_doc_health(user_id, file.filename) + return structured(file.filename, filename_type(file.filename), file.read(), file.content_type) + + @staticmethod + def get_files(self, files: Union[None, list[dict]]) -> list[str]: + if not files: + return [] + def image_to_base64(file): + return "data:{};base64,{}".format(file["mime_type"], + base64.b64encode(FileService.get_blob(file["created_by"], file["id"])).decode("utf-8")) + exe = ThreadPoolExecutor(max_workers=5) + threads = [] + for file in files: + if file["mime_type"].find("image") >=0: + threads.append(exe.submit(image_to_base64, file)) + continue + threads.append(exe.submit(FileService.parse, file["name"], FileService.get_blob(file["created_by"], file["id"]), True, file["created_by"])) + return [th.result() for th in threads] + diff --git a/rag/app/naive.py b/rag/app/naive.py index 7872ebc22..18a956beb 100644 --- a/rag/app/naive.py +++ b/rag/app/naive.py @@ -39,6 +39,7 @@ from deepdoc.parser.docling_parser import DoclingParser from deepdoc.parser.tcadp_parser import TCADPParser from rag.nlp import concat_img, find_codec, naive_merge, naive_merge_with_images, naive_merge_docx, rag_tokenizer, tokenize_chunks, tokenize_chunks_with_images, tokenize_table, attach_media_context + def by_deepdoc(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", callback=None, pdf_cls = None ,**kwargs): callback = callback binary = binary @@ -600,8 +601,7 @@ def load_from_xml_v2(baseURI, rels_item_xml): srels._srels.append(_SerializedRelationship(baseURI, rel_elm)) return srels -def chunk(filename, binary=None, from_page=0, to_page=100000, - lang="Chinese", callback=None, **kwargs): +def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", callback=None, **kwargs): """ Supported file formats are docx, pdf, excel, txt. This method apply the naive ways to chunk files. @@ -611,14 +611,18 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, urls = set() url_res = [] - is_english = lang.lower() == "english" # is_english(cks) parser_config = kwargs.get( "parser_config", { "chunk_token_num": 512, "delimiter": "\n!?。;!?", "layout_recognize": "DeepDOC", "analyze_hyperlink": True}) + + child_deli = re.findall(r"`([^`]+)`", parser_config.get("children_delimiter", "")) + child_deli = sorted(set(child_deli), key=lambda x: -len(x)) + child_deli = "|".join(re.escape(t) for t in child_deli if t) + is_markdown = False table_context_size = max(0, int(parser_config.get("table_context_size", 0) or 0)) image_context_size = max(0, int(parser_config.get("image_context_size", 0) or 0)) - final_sections = False + doc = { "docnm_kwd": filename, "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename)) @@ -679,12 +683,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, "chunk_token_num", 128)), parser_config.get( "delimiter", "\n!?。;!?")) - if kwargs.get("section_only", False): - chunks.extend(embed_res) - chunks.extend(url_res) - return chunks - - res.extend(tokenize_chunks_with_images(chunks, doc, is_english, images)) + res.extend(tokenize_chunks_with_images(chunks, doc, is_english, images, child_delimiters_pattern=child_deli)) logging.info("naive_merge({}): {}".format(filename, timer() - st)) res.extend(embed_res) res.extend(url_res) @@ -780,7 +779,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, return_section_images=True, ) - final_sections = True + is_markdown = True try: vision_model = LLMBundle(kwargs["tenant_id"], LLMType.IMAGE2TEXT) @@ -857,7 +856,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, "file type not supported yet(pdf, xlsx, doc, docx, txt supported)") st = timer() - if final_sections: + if is_markdown: merged_chunks = [] merged_images = [] chunk_limit = max(0, int(parser_config.get("chunk_token_num", 128))) @@ -900,13 +899,11 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, chunks = merged_chunks has_images = merged_images and any(img is not None for img in merged_images) - if kwargs.get("section_only", False): - chunks.extend(embed_res) - return chunks + if has_images: - res.extend(tokenize_chunks_with_images(chunks, doc, is_english, merged_images)) + res.extend(tokenize_chunks_with_images(chunks, doc, is_english, merged_images, child_delimiters_pattern=child_deli)) else: - res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser)) + res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser, child_delimiters_pattern=child_deli)) else: if section_images: if all(image is None for image in section_images): @@ -917,21 +914,14 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, int(parser_config.get( "chunk_token_num", 128)), parser_config.get( "delimiter", "\n!?。;!?")) - if kwargs.get("section_only", False): - chunks.extend(embed_res) - return chunks - - res.extend(tokenize_chunks_with_images(chunks, doc, is_english, images)) + res.extend(tokenize_chunks_with_images(chunks, doc, is_english, images, child_delimiters_pattern=child_deli)) else: chunks = naive_merge( sections, int(parser_config.get( "chunk_token_num", 128)), parser_config.get( "delimiter", "\n!?。;!?")) - if kwargs.get("section_only", False): - chunks.extend(embed_res) - return chunks - res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser)) + res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser, child_delimiters_pattern=child_deli)) if urls and parser_config.get("analyze_hyperlink", False) and is_root: for index, url in enumerate(urls): diff --git a/rag/flow/splitter/splitter.py b/rag/flow/splitter/splitter.py index 7e687ad71..c790790cb 100644 --- a/rag/flow/splitter/splitter.py +++ b/rag/flow/splitter/splitter.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. import random +import re +from copy import deepcopy from functools import partial - import trio - from common.misc_utils import get_uuid from rag.utils.base64_image import id2image, image2id from deepdoc.parser.pdf_parser import RAGFlowPdfParser @@ -32,6 +32,7 @@ class SplitterParam(ProcessParamBase): self.chunk_token_size = 512 self.delimiters = ["\n"] self.overlapped_percent = 0 + self.children_delimiters = [] def check(self): self.check_empty(self.delimiters, "Delimiters.") @@ -58,6 +59,14 @@ class Splitter(ProcessBase): deli += f"`{d}`" else: deli += d + child_deli = "" + for d in self._param.children_delimiters: + if len(d) > 1: + child_deli += f"`{d}`" + else: + child_deli += d + child_deli = [m.group(1) for m in re.finditer(r"`([^`]+)`", child_deli)] + custom_pattern = "|".join(re.escape(t) for t in sorted(set(child_deli), key=len, reverse=True)) self.set_output("output_format", "chunks") self.callback(random.randint(1, 5) / 100.0, "Start to split into chunks.") @@ -78,7 +87,23 @@ class Splitter(ProcessBase): deli, self._param.overlapped_percent, ) - self.set_output("chunks", [{"text": c.strip()} for c in cks if c.strip()]) + if custom_pattern: + docs = [] + for c in cks: + if not c.strip(): + continue + split_sec = re.split(r"(%s)" % custom_pattern, c, flags=re.DOTALL) + if split_sec: + for txt in split_sec: + docs.append({ + "text": txt, + "mom": c + }) + else: + docs.append({"text": c}) + self.set_output("chunks", docs) + else: + self.set_output("chunks", [{"text": c.strip()} for c in cks if c.strip()]) self.callback(1, "Done.") return @@ -100,12 +125,27 @@ class Splitter(ProcessBase): { "text": RAGFlowPdfParser.remove_tag(c), "image": img, - "positions": [[pos[0][-1]+1, *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)], + "positions": [[pos[0][-1]+1, *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)] } for c, img in zip(chunks, images) if c.strip() ] async with trio.open_nursery() as nursery: for d in cks: nursery.start_soon(image2id, d, partial(settings.STORAGE_IMPL.put, tenant_id=self._canvas._tenant_id), get_uuid()) - self.set_output("chunks", cks) + + if custom_pattern: + docs = [] + for c in cks: + split_sec = re.split(r"(%s)" % custom_pattern, c["text"], flags=re.DOTALL) + if split_sec: + c["mom"] = c["text"] + for txt in split_sec: + cc = deepcopy(c) + cc["text"] = txt + docs.append(cc) + else: + docs.append(c) + self.set_output("chunks", docs) + else: + self.set_output("chunks", cks) self.callback(1, "Done.") diff --git a/rag/nlp/__init__.py b/rag/nlp/__init__.py index 6f36a927a..ae932be85 100644 --- a/rag/nlp/__init__.py +++ b/rag/nlp/__init__.py @@ -264,14 +264,14 @@ def is_chinese(text): return False -def tokenize(d, t, eng): - d["content_with_weight"] = t - t = re.sub(r"]{0,12})?>", " ", t) +def tokenize(d, txt, eng): + d["content_with_weight"] = txt + t = re.sub(r"]{0,12})?>", " ", txt) d["content_ltks"] = rag_tokenizer.tokenize(t) d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"]) -def tokenize_chunks(chunks, doc, eng, pdf_parser=None): +def tokenize_chunks(chunks, doc, eng, pdf_parser=None, child_delimiters_pattern=None): res = [] # wrap up as es documents for ii, ck in enumerate(chunks): @@ -288,12 +288,21 @@ def tokenize_chunks(chunks, doc, eng, pdf_parser=None): pass else: add_positions(d, [[ii]*5]) + + if child_delimiters_pattern: + d["mom_with_weight"] = ck + for txt in re.split(r"(%s)" % child_delimiters_pattern, ck, flags=re.DOTALL): + dd = copy.deepcopy(d) + tokenize(dd, txt, eng) + res.append(dd) + continue + tokenize(d, ck, eng) res.append(d) return res -def tokenize_chunks_with_images(chunks, doc, eng, images): +def tokenize_chunks_with_images(chunks, doc, eng, images, child_delimiters_pattern=None): res = [] # wrap up as es documents for ii, (ck, image) in enumerate(zip(chunks, images)): @@ -303,6 +312,13 @@ def tokenize_chunks_with_images(chunks, doc, eng, images): d = copy.deepcopy(doc) d["image"] = image add_positions(d, [[ii]*5]) + if child_delimiters_pattern: + d["mom_with_weight"] = ck + for txt in re.split(r"(%s)" % child_delimiters_pattern, ck, flags=re.DOTALL): + dd = copy.deepcopy(d) + tokenize(dd, txt, eng) + res.append(dd) + continue tokenize(d, ck, eng) res.append(d) return res diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 370bd2a10..d7cbced0c 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -128,9 +128,6 @@ def signal_handler(sig, frame): sys.exit(0) - - - def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."): try: if prog is not None and prog < 0: @@ -720,6 +717,34 @@ async def delete_image(kb_id, chunk_id): async def insert_es(task_id, task_tenant_id, task_dataset_id, chunks, progress_callback): + mothers = [] + mother_ids = set([]) + for ck in chunks: + mom = ck.get("mom") or ck.get("mom_with_weight") or "" + if not mom: + continue + id = xxhash.xxh64(mom.encode("utf-8")).hexdigest() + if id in mother_ids: + continue + mother_ids.add(id) + ck["mom_id"] = id + mom_ck = copy.deepcopy(ck) + mom_ck["id"] = id + mom_ck["content_with_weight"] = mom + mom_ck["available_int"] = 0 + flds = list(mom_ck.keys()) + for fld in flds: + if fld not in ["id", "content_with_weight", "doc_id", "kb_id", "available_int"]: + del mom_ck[fld] + mothers.append(mom_ck) + + for b in range(0, len(mothers), settings.DOC_BULK_SIZE): + await trio.to_thread.run_sync(lambda: settings.docStoreConn.insert(mothers[b:b + settings.DOC_BULK_SIZE], search.index_name(task_tenant_id), task_dataset_id)) + task_canceled = has_canceled(task_id) + if task_canceled: + progress_callback(-1, msg="Task has been canceled.") + return False + for b in range(0, len(chunks), settings.DOC_BULK_SIZE): doc_store_result = await trio.to_thread.run_sync(lambda: settings.docStoreConn.insert(chunks[b:b + settings.DOC_BULK_SIZE], search.index_name(task_tenant_id), task_dataset_id)) task_canceled = has_canceled(task_id) From fa9b7b259c46c01bc22fea7a302bc41d14a9b13a Mon Sep 17 00:00:00 2001 From: Billy Bao Date: Fri, 28 Nov 2025 19:55:24 +0800 Subject: [PATCH 5/5] Feat: create datasets from http api supports ingestion pipeline (#11597) ### What problem does this PR solve? Feat: create datasets from http api supports ingestion pipeline ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- api/utils/validation_utils.py | 96 +++++++++++++++++++++++++-- docs/references/http_api_reference.md | 40 ++++++++++- 2 files changed, 131 insertions(+), 5 deletions(-) diff --git a/api/utils/validation_utils.py b/api/utils/validation_utils.py index 630b64feb..6c426f6f8 100644 --- a/api/utils/validation_utils.py +++ b/api/utils/validation_utils.py @@ -14,6 +14,7 @@ # limitations under the License. # from collections import Counter +import string from typing import Annotated, Any, Literal from uuid import UUID @@ -25,6 +26,7 @@ from pydantic import ( StringConstraints, ValidationError, field_validator, + model_validator, ) from pydantic_core import PydanticCustomError from werkzeug.exceptions import BadRequest, UnsupportedMediaType @@ -361,10 +363,9 @@ class CreateDatasetReq(Base): description: Annotated[str | None, Field(default=None, max_length=65535)] embedding_model: Annotated[str | None, Field(default=None, max_length=255, serialization_alias="embd_id")] permission: Annotated[Literal["me", "team"], Field(default="me", min_length=1, max_length=16)] - chunk_method: Annotated[ - Literal["naive", "book", "email", "laws", "manual", "one", "paper", "picture", "presentation", "qa", "table", "tag"], - Field(default="naive", min_length=1, max_length=32, serialization_alias="parser_id"), - ] + chunk_method: Annotated[str | None, Field(default=None, serialization_alias="parser_id")] + parse_type: Annotated[int | None, Field(default=None, ge=0, le=64)] + pipeline_id: Annotated[str | None, Field(default=None, min_length=32, max_length=32, serialization_alias="pipeline_id")] parser_config: Annotated[ParserConfig | None, Field(default=None)] @field_validator("avatar", mode="after") @@ -525,6 +526,93 @@ class CreateDatasetReq(Base): raise PydanticCustomError("string_too_long", "Parser config exceeds size limit (max 65,535 characters). Current size: {actual}", {"actual": len(json_str)}) return v + @field_validator("pipeline_id", mode="after") + @classmethod + def validate_pipeline_id(cls, v: str | None) -> str | None: + """Validate pipeline_id as 32-char lowercase hex string if provided. + + Rules: + - None or empty string: treat as None (not set) + - Must be exactly length 32 + - Must contain only hex digits (0-9a-fA-F); normalized to lowercase + """ + if v is None: + return None + if v == "": + return None + if len(v) != 32: + raise PydanticCustomError("format_invalid", "pipeline_id must be 32 hex characters") + if any(ch not in string.hexdigits for ch in v): + raise PydanticCustomError("format_invalid", "pipeline_id must be hexadecimal") + return v.lower() + + @model_validator(mode="after") + def validate_parser_dependency(self) -> "CreateDatasetReq": + """ + Mixed conditional validation: + - If parser_id is omitted (field not set): + * If both parse_type and pipeline_id are omitted → default chunk_method = "naive" + * If both parse_type and pipeline_id are provided → allow ingestion pipeline mode + - If parser_id is provided (valid enum) → parse_type and pipeline_id must be None (disallow mixed usage) + + Raises: + PydanticCustomError with code 'dependency_error' on violation. + """ + # Omitted chunk_method (not in fields) logic + if self.chunk_method is None and "chunk_method" not in self.model_fields_set: + # All three absent → default naive + if self.parse_type is None and self.pipeline_id is None: + object.__setattr__(self, "chunk_method", "naive") + return self + # parser_id omitted: require BOTH parse_type & pipeline_id present (no partial allowed) + if self.parse_type is None or self.pipeline_id is None: + missing = [] + if self.parse_type is None: + missing.append("parse_type") + if self.pipeline_id is None: + missing.append("pipeline_id") + raise PydanticCustomError( + "dependency_error", + "parser_id omitted → required fields missing: {fields}", + {"fields": ", ".join(missing)}, + ) + # Both provided → allow pipeline mode + return self + + # parser_id provided (valid): MUST NOT have parse_type or pipeline_id + if isinstance(self.chunk_method, str): + if self.parse_type is not None or self.pipeline_id is not None: + invalid = [] + if self.parse_type is not None: + invalid.append("parse_type") + if self.pipeline_id is not None: + invalid.append("pipeline_id") + raise PydanticCustomError( + "dependency_error", + "parser_id provided → disallowed fields present: {fields}", + {"fields": ", ".join(invalid)}, + ) + return self + + @field_validator("chunk_method", mode="wrap") + @classmethod + def validate_chunk_method(cls, v: Any, handler) -> Any: + """Wrap validation to unify error messages, including type errors (e.g. list).""" + allowed = {"naive", "book", "email", "laws", "manual", "one", "paper", "picture", "presentation", "qa", "table", "tag"} + error_msg = "Input should be 'naive', 'book', 'email', 'laws', 'manual', 'one', 'paper', 'picture', 'presentation', 'qa', 'table' or 'tag'" + # Omitted field: handler won't be invoked (wrap still gets value); None treated as explicit invalid + if v is None: + raise PydanticCustomError("literal_error", error_msg) + try: + # Run inner validation (type checking) + result = handler(v) + except Exception: + raise PydanticCustomError("literal_error", error_msg) + # After handler, enforce enumeration + if not isinstance(result, str) or result == "" or result not in allowed: + raise PydanticCustomError("literal_error", error_msg) + return result + class UpdateDatasetReq(CreateDatasetReq): dataset_id: Annotated[str, Field(...)] diff --git a/docs/references/http_api_reference.md b/docs/references/http_api_reference.md index 3c73cf58c..7f006ec3d 100644 --- a/docs/references/http_api_reference.md +++ b/docs/references/http_api_reference.md @@ -419,7 +419,15 @@ Creates a dataset. - `"embedding_model"`: `string` - `"permission"`: `string` - `"chunk_method"`: `string` - - `"parser_config"`: `object` + - "parser_config": `object` + - "parse_type": `int` + - "pipeline_id": `string` + +Note: Choose exactly one ingestion mode when creating a dataset. +- Chunking method: provide `"chunk_method"` (optionally with `"parser_config"`). +- Ingestion pipeline: provide both `"parse_type"` and `"pipeline_id"` and do not provide `"chunk_method"`. + +These options are mutually exclusive. If all three of `chunk_method`, `parse_type`, and `pipeline_id` are omitted, the system defaults to `chunk_method = "naive"`. ##### Request example @@ -433,6 +441,26 @@ curl --request POST \ }' ``` +##### Request example (ingestion pipeline) + +Use this form when specifying an ingestion pipeline (do not include `chunk_method`). + +```bash +curl --request POST \ + --url http://{address}/api/v1/datasets \ + --header 'Content-Type: application/json' \ + --header 'Authorization: Bearer ' \ + --data '{ + "name": "test-sdk", + "parse_type": , + "pipeline_id": "" + }' +``` + +Notes: +- `parse_type` is an integer. Replace `` with your pipeline's parse-type value. +- `pipeline_id` must be a 32-character lowercase hexadecimal string. + ##### Request parameters - `"name"`: (*Body parameter*), `string`, *Required* @@ -473,6 +501,7 @@ curl --request POST \ - `"qa"`: Q&A - `"table"`: Table - `"tag"`: Tag + - Mutually exclusive with `parse_type` and `pipeline_id`. If you set `chunk_method`, do not include `parse_type` or `pipeline_id`. - `"parser_config"`: (*Body parameter*), `object` The configuration settings for the dataset parser. The attributes in this JSON object vary with the selected `"chunk_method"`: @@ -509,6 +538,15 @@ curl --request POST \ - Defaults to: `{"use_raptor": false}`. - If `"chunk_method"` is `"table"`, `"picture"`, `"one"`, or `"email"`, `"parser_config"` is an empty JSON object. +- "parse_type": (*Body parameter*), `int` + The ingestion pipeline parse type identifier. Required if and only if you are using an ingestion pipeline (together with `"pipeline_id"`). Must not be provided when `"chunk_method"` is set. + +- "pipeline_id": (*Body parameter*), `string` + The ingestion pipeline ID. Required if and only if you are using an ingestion pipeline (together with `"parse_type"`). + - Must not be provided when `"chunk_method"` is set. + +Note: If none of `chunk_method`, `parse_type`, and `pipeline_id` are provided, the system will default to `chunk_method = "naive"`. + #### Response Success: