From e3008732888d22394e89fe2028179b2211f82603 Mon Sep 17 00:00:00 2001 From: SmartDever02 Date: Wed, 3 Dec 2025 06:49:36 -0300 Subject: [PATCH 01/12] feat: Add WebSocket API for streaming responses - Add WebSocket endpoint at /v1/ws/chat for real-time streaming - Support multiple authentication methods (API token, user session, query params) - Enable bidirectional communication for platforms like WeChat Mini Programs - Implement streaming chat completions with incremental responses - Add comprehensive error handling and connection management - Include extensive inline documentation and comments New files: - api/apps/websocket_app.py: Main WebSocket API implementation - docs/guides/websocket_api.md: Complete API documentation - example/websocket/python_client.py: Python example client - example/websocket/index.html: Web-based demo client - example/websocket/README.md: Examples documentation Features: - Persistent WebSocket connections for multi-turn conversations - Session management for conversation continuity - Real-time streaming with low latency - Compatible with WeChat Mini Programs and mobile apps - Health check endpoint for connectivity testing - Backward compatible with existing SSE endpoints Resolves: #11683 --- README.md | 2 + api/apps/websocket_app.py | 710 +++++++++++++++++++++++++++++ docs/guides/websocket_api.md | 613 +++++++++++++++++++++++++ example/websocket/README.md | 203 +++++++++ example/websocket/index.html | 590 ++++++++++++++++++++++++ example/websocket/python_client.py | 403 ++++++++++++++++ 6 files changed, 2521 insertions(+) create mode 100644 api/apps/websocket_app.py create mode 100644 docs/guides/websocket_api.md create mode 100644 example/websocket/README.md create mode 100644 example/websocket/index.html create mode 100644 example/websocket/python_client.py diff --git a/README.md b/README.md index 827b000a4..5db400e23 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,7 @@ Try our demo at [https://demo.ragflow.io](https://demo.ragflow.io). ## πŸ”₯ Latest Updates +- 2025-12-03 Adds WebSocket API for streaming responses, enabling real-time communication for WeChat Mini Programs and other WebSocket clients. - 2025-11-19 Supports Gemini 3 Pro. - 2025-11-12 Supports data synchronization from Confluence, S3, Notion, Discord, Google Drive. - 2025-10-23 Supports MinerU & Docling as document parsing methods. @@ -132,6 +133,7 @@ releases! 🌟 - Configurable LLMs as well as embedding models. - Multiple recall paired with fused re-ranking. - Intuitive APIs for seamless integration with business. +- WebSocket support for real-time streaming responses (ideal for WeChat Mini Programs and mobile apps). ## πŸ”Ž System Architecture diff --git a/api/apps/websocket_app.py b/api/apps/websocket_app.py new file mode 100644 index 000000000..90cfd939f --- /dev/null +++ b/api/apps/websocket_app.py @@ -0,0 +1,710 @@ +# +# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +WebSocket API for RAGFlow Streaming Responses + +This module provides WebSocket endpoints for real-time streaming of chat completions. +WebSocket support is essential for platforms like WeChat Mini Programs that require +persistent bidirectional connections for real-time communication. + +Key Features: +- Real-time bidirectional communication via WebSocket +- Support for multiple authentication methods (API Token, User Session) +- Streaming chat completions with incremental responses +- Error handling and connection management +- Compatible with WeChat Mini Programs and other WebSocket clients + +WebSocket Message Format: + Client -> Server (Request): + { + "type": "chat", # Message type (currently supports "chat") + "chat_id": "xxx", # Dialog/Chat ID + "session_id": "xxx", # Optional: Conversation session ID + "question": "Hello", # User's question/message + "stream": true, # Optional: Enable streaming (default: true) + "kb_ids": [] # Optional: Knowledge base IDs to query + } + + Server -> Client (Response): + { + "code": 0, # Status code (0=success, 500=error) + "message": "", # Error message (if any) + "data": { # Response data + "answer": "...", # Incremental answer text (for streaming) + "reference": {...}, # Source references + "id": "xxx", # Message ID + "session_id": "xxx" # Session ID + } + } + + Server -> Client (Completion): + { + "code": 0, + "message": "", + "data": true # Indicates completion of streaming + } + + Server -> Client (Error): + { + "code": 500, + "message": "Error description", + "data": { + "answer": "**ERROR**: Error details", + "reference": [] + } + } + +Connection Lifecycle: +1. Client initiates WebSocket connection with authentication +2. Server validates authentication (API token or user session) +3. Client sends chat message requests +4. Server streams response chunks back to client +5. Server sends completion marker when done +6. Connection remains open for subsequent messages +7. Either party can close the connection +""" + +import logging +import json +import os +from quart import websocket, request as quart_request +from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer + +from api.db.db_models import APIToken +from api.db.services.user_service import UserService +from api.db.services.dialog_service import DialogService +from api.db.services.conversation_service import ConversationService, completion +from common.constants import StatusEnum +from common import settings + + +# ----------------------------------------------------------------------------- +# Authentication Helper Functions +# ----------------------------------------------------------------------------- + +async def authenticate_websocket(): + """ + Authenticate WebSocket connection using multiple methods. + + This function attempts to authenticate the WebSocket connection using: + 1. API Token authentication (Bearer token in Authorization header) + 2. User session authentication (Session-based JWT token) + 3. Query parameter authentication (token passed as URL parameter) + + Authentication Methods: + - API Token: Used by external applications, bots, and integrations + - User Session: Used by web interface and logged-in users + - Query Parameter: Fallback for clients that can't send headers + + Returns: + tuple: (authenticated: bool, tenant_id: str|None, error_message: str|None) + + Examples: + # API Token authentication + ws://host/ws/chat?Authorization=Bearer ragflow-xxxxx + + # Query parameter authentication + ws://host/ws/chat?token=ragflow-xxxxx + """ + tenant_id = None + error_message = None + + # Method 1: Try API Token authentication from Authorization header + # This is the preferred method for SDK and API integrations + authorization = websocket.headers.get("Authorization", "") + + if authorization: + try: + # Parse Bearer token format: "Bearer " + authorization_parts = authorization.split() + + if len(authorization_parts) >= 2: + token = authorization_parts[1] + + # Query database for matching API token + objs = APIToken.query(token=token) + + if objs: + # Valid API token found, extract tenant ID + tenant_id = objs[0].tenant_id + logging.info(f"WebSocket authenticated via API token for tenant: {tenant_id}") + return True, tenant_id, None + else: + error_message = "Invalid API token" + logging.warning(f"WebSocket authentication failed: {error_message}") + else: + error_message = "Invalid Authorization header format. Expected: 'Bearer '" + logging.warning(f"WebSocket authentication failed: {error_message}") + + except Exception as e: + error_message = f"Error processing API token: {str(e)}" + logging.error(f"WebSocket authentication error: {error_message}") + + # Method 2: Try User Session authentication (JWT token) + # This is used by the web interface for logged-in users + try: + jwt = Serializer(secret_key=settings.SECRET_KEY) + + # Try to get authorization from header or query parameter + auth_token = websocket.headers.get("Authorization") or \ + websocket.args.get("authorization") or \ + websocket.args.get("token") + + if auth_token: + try: + # Decode JWT token to get access token + access_token = str(jwt.loads(auth_token)) + + # Validate access token format + if access_token and len(access_token.strip()) >= 32: + # Query user by access token + user = UserService.query( + access_token=access_token, + status=StatusEnum.VALID.value + ) + + if user and user[0]: + # Valid user session found + tenant_id = user[0].id + logging.info(f"WebSocket authenticated via user session for user: {user[0].email}") + return True, tenant_id, None + + except Exception as e: + # JWT decoding or validation failed + logging.debug(f"User session authentication failed: {str(e)}") + + except Exception as e: + logging.error(f"Error in user session authentication: {str(e)}") + + # Method 3: Try query parameter authentication + # Fallback for clients that cannot set custom headers + token_param = websocket.args.get("token") + if token_param: + try: + objs = APIToken.query(token=token_param) + if objs: + tenant_id = objs[0].tenant_id + logging.info(f"WebSocket authenticated via query parameter for tenant: {tenant_id}") + return True, tenant_id, None + except Exception as e: + logging.error(f"Query parameter authentication error: {str(e)}") + + # No valid authentication method succeeded + if not error_message: + error_message = "Authentication required. Please provide valid API token or user session." + + return False, None, error_message + + +async def send_error(error_message, code=500): + """ + Send error message to WebSocket client in standardized format. + + Args: + error_message (str): Human-readable error description + code (int): Error code (default: 500 for server errors) + + Error Response Format: + { + "code": 500, + "message": "Error description", + "data": { + "answer": "**ERROR**: Error details", + "reference": [] + } + } + """ + error_response = { + "code": code, + "message": error_message, + "data": { + "answer": f"**ERROR**: {error_message}", + "reference": [] + } + } + + await websocket.send(json.dumps(error_response, ensure_ascii=False)) + logging.error(f"WebSocket error sent: {error_message}") + + +async def send_message(data, code=0, message=""): + """ + Send message to WebSocket client in standardized format. + + Args: + data: Response data (can be dict, bool, or any JSON-serializable object) + code (int): Status code (0 for success) + message (str): Optional status message + + Success Response Format: + { + "code": 0, + "message": "", + "data": {...} + } + """ + response = { + "code": code, + "message": message, + "data": data + } + + await websocket.send(json.dumps(response, ensure_ascii=False)) + + +# ----------------------------------------------------------------------------- +# WebSocket Endpoint: Chat Completions +# ----------------------------------------------------------------------------- + +@manager.route("/ws/chat") # noqa: F821 +async def websocket_chat(): + """ + WebSocket endpoint for real-time chat completions with streaming responses. + + This endpoint provides a persistent WebSocket connection for interactive chat + sessions. It supports streaming responses, allowing clients to receive + incremental updates as the AI generates the response. + + Connection URL: + ws://host/v1/ws/chat + + Authentication: + - Authorization header: "Bearer " + - Query parameter: "?token=" + - User session JWT + + Message Flow: + 1. Client connects and authenticates + 2. Client sends chat request message + 3. Server streams response chunks + 4. Server sends completion marker + 5. Connection stays open for more messages + + Supported Features: + - Multi-turn conversations with session tracking + - Knowledge base integration for RAG + - Reference/citation tracking + - Error recovery and graceful degradation + + Example Client Code (JavaScript): + ```javascript + const ws = new WebSocket('ws://host/v1/ws/chat?token=YOUR_TOKEN'); + + ws.onopen = () => { + ws.send(JSON.stringify({ + type: 'chat', + chat_id: 'your-chat-id', + question: 'Hello, how are you?', + stream: true + })); + }; + + ws.onmessage = (event) => { + const response = JSON.parse(event.data); + if (response.data === true) { + console.log('Stream completed'); + } else { + console.log('Received:', response.data.answer); + } + }; + ``` + + Example Client Code (Python): + ```python + import websocket + import json + + def on_message(ws, message): + data = json.loads(message) + if data['data'] is True: + print('Stream completed') + else: + print('Received:', data['data']['answer']) + + ws = websocket.WebSocketApp( + 'ws://host/v1/ws/chat?token=YOUR_TOKEN', + on_message=on_message + ) + + ws.on_open = lambda ws: ws.send(json.dumps({ + 'type': 'chat', + 'chat_id': 'your-chat-id', + 'question': 'Hello!', + 'stream': True + })) + + ws.run_forever() + ``` + """ + # Step 1: Authenticate the WebSocket connection + # This ensures only authorized clients can access the chat service + authenticated, tenant_id, error_msg = await authenticate_websocket() + + if not authenticated: + # Authentication failed - send error and close connection + await send_error(error_msg, code=401) + await websocket.close(1008, error_msg) # 1008 = Policy Violation + return + + # Authentication successful - log connection + logging.info(f"WebSocket chat connection established for tenant: {tenant_id}") + + # Step 2: Connection loop - handle multiple messages over same connection + # WebSocket connections are persistent, allowing multiple request/response cycles + try: + # Keep connection open and process incoming messages + while True: + # Wait for message from client + # This is a blocking call that waits until client sends data + message = await websocket.receive() + + # Parse JSON message from client + try: + request_data = json.loads(message) + except json.JSONDecodeError as e: + # Invalid JSON format - send error but keep connection open + await send_error(f"Invalid JSON format: {str(e)}", code=400) + continue + + # Extract message type (currently only 'chat' is supported) + message_type = request_data.get("type", "chat") + + # Step 3: Route message to appropriate handler based on type + if message_type == "chat": + # Handle chat completion request + await handle_chat_request(tenant_id, request_data) + else: + # Unknown message type - send error but keep connection open + await send_error(f"Unknown message type: {message_type}", code=400) + + except Exception as e: + # Unexpected error occurred - log and notify client + error_message = f"WebSocket error: {str(e)}" + logging.exception(error_message) + + try: + await send_error(error_message) + except: + # Failed to send error (connection may be closed) + pass + + # Close connection with error code + await websocket.close(1011, "Internal server error") # 1011 = Internal Error + + finally: + # Connection closed - cleanup and log + logging.info(f"WebSocket chat connection closed for tenant: {tenant_id}") + + +async def handle_chat_request(tenant_id, request_data): + """ + Handle chat completion request received via WebSocket. + + This function processes a chat request, validates parameters, retrieves + the dialog configuration, and streams the AI response back to the client. + + Args: + tenant_id (str): Authenticated tenant/user ID + request_data (dict): Parsed JSON request from client + + Required Request Fields: + - chat_id (str): Dialog/Chat ID to use for the conversation + - question (str): User's question or message + + Optional Request Fields: + - session_id (str): Existing conversation session ID (creates new if not provided) + - stream (bool): Enable streaming responses (default: True) + - kb_ids (list): Knowledge base IDs to include in retrieval + - doc_ids (str): Comma-separated document IDs to prioritize + - files (list): File IDs attached to this message + + Processing Steps: + 1. Validate required parameters + 2. Verify dialog ownership and permissions + 3. Create or retrieve conversation session + 4. Stream AI-generated response chunks + 5. Send completion marker + + Error Handling: + - Missing parameters: Returns 400 error + - Invalid dialog: Returns 404 error + - Permission denied: Returns 403 error + - Processing error: Returns 500 error + """ + try: + # Step 1: Extract and validate required parameters + chat_id = request_data.get("chat_id") + question = request_data.get("question", "") + session_id = request_data.get("session_id") + stream = request_data.get("stream", True) + + # Validate chat_id is provided + if not chat_id: + await send_error("Missing required parameter: chat_id", code=400) + return + + # Validate question is provided (empty questions are allowed for session initialization) + if question is None: + await send_error("Missing required parameter: question", code=400) + return + + # Step 2: Verify dialog exists and user has access + # Check if the dialog belongs to this tenant and is active + dialog_query = DialogService.query( + tenant_id=tenant_id, + id=chat_id, + status=StatusEnum.VALID.value + ) + + if not dialog_query: + # Dialog not found or user doesn't have permission + await send_error(f"Dialog not found or access denied: {chat_id}", code=404) + return + + # Step 3: Extract optional parameters for enhanced functionality + # These parameters customize the retrieval and generation process + additional_params = {} + + # Knowledge base filtering - limit search to specific KBs + if "kb_ids" in request_data: + additional_params["kb_ids"] = request_data["kb_ids"] + + # Document filtering - prioritize specific documents + if "doc_ids" in request_data: + additional_params["doc_ids"] = request_data["doc_ids"] + + # File attachments - include files uploaded with this message + if "files" in request_data: + additional_params["files"] = request_data["files"] + + # Pass through any other custom parameters + # This allows for future extensibility without code changes + for key, value in request_data.items(): + if key not in ["type", "chat_id", "question", "session_id", "stream"]: + if key not in additional_params: + additional_params[key] = value + + # Step 4: Process chat completion with streaming + if stream: + # Streaming mode: Send incremental response chunks + # This provides a better user experience with real-time feedback + + try: + # Call the completion service which yields response chunks + # The completion function handles session management, RAG retrieval, + # LLM generation, and response formatting + # Note: completion() is a synchronous generator, not async + for response_chunk in completion( + tenant_id=tenant_id, + chat_id=chat_id, + question=question, + session_id=session_id, + stream=True, + **additional_params + ): + # Parse the SSE-formatted response + # completion() returns "data:{json}\n\n" format for compatibility + if response_chunk.startswith("data:"): + # Extract JSON from SSE format + json_str = response_chunk[5:].strip() + + # Parse and forward to WebSocket client + try: + response_data = json.loads(json_str) + + # Send the chunk to WebSocket client + await websocket.send(json.dumps(response_data, ensure_ascii=False)) + + except json.JSONDecodeError: + # Malformed response chunk - log but continue + logging.warning(f"Failed to parse response chunk: {json_str}") + continue + + # Stream completed successfully + logging.info(f"Chat completion streamed successfully for chat_id: {chat_id}") + + except Exception as e: + # Error during streaming - send error message + error_message = f"Error during chat completion: {str(e)}" + logging.exception(error_message) + await send_error(error_message) + + else: + # Non-streaming mode: Send complete response at once + # This is simpler but provides no incremental feedback + + try: + # Get the complete response (completion yields once for non-streaming) + response = None + for resp in completion( + tenant_id=tenant_id, + chat_id=chat_id, + question=question, + session_id=session_id, + stream=False, + **additional_params + ): + response = resp + break # Only one response in non-streaming mode + + # Send complete response + if response: + await send_message(response) + else: + await send_error("No response generated", code=500) + + logging.info(f"Chat completion completed (non-streaming) for chat_id: {chat_id}") + + except Exception as e: + # Error during generation - send error message + error_message = f"Error during chat completion: {str(e)}" + logging.exception(error_message) + await send_error(error_message) + + except Exception as e: + # Unexpected error in request handling + error_message = f"Error handling chat request: {str(e)}" + logging.exception(error_message) + await send_error(error_message) + + +# ----------------------------------------------------------------------------- +# WebSocket Endpoint: Agent Completions (Future Enhancement) +# ----------------------------------------------------------------------------- + +@manager.route("/ws/agent") # noqa: F821 +async def websocket_agent(): + """ + WebSocket endpoint for agent-based completions with streaming. + + This endpoint is similar to websocket_chat but designed for agent-based + interactions. Agents can have custom tools, workflows, and behaviors + beyond standard RAG chat. + + Note: This is a placeholder for future implementation. The authentication + and connection handling logic is the same as websocket_chat. + + Future Enhancements: + - Tool calling and function execution + - Multi-step agent reasoning + - Agent state management + - Custom agent workflows + """ + # Authenticate connection + authenticated, tenant_id, error_msg = await authenticate_websocket() + + if not authenticated: + await send_error(error_msg, code=401) + await websocket.close(1008, error_msg) + return + + logging.info(f"WebSocket agent connection established for tenant: {tenant_id}") + + # Connection loop + try: + while True: + message = await websocket.receive() + + try: + request_data = json.loads(message) + except json.JSONDecodeError as e: + await send_error(f"Invalid JSON format: {str(e)}", code=400) + continue + + # Handle agent completion request + await handle_agent_request(tenant_id, request_data) + + except Exception as e: + error_message = f"WebSocket error: {str(e)}" + logging.exception(error_message) + + try: + await send_error(error_message) + except: + pass + + await websocket.close(1011, "Internal server error") + + finally: + logging.info(f"WebSocket agent connection closed for tenant: {tenant_id}") + + +async def handle_agent_request(tenant_id, request_data): + """ + Handle agent completion request received via WebSocket. + + This is a placeholder for future agent functionality. + + Args: + tenant_id (str): Authenticated tenant/user ID + request_data (dict): Parsed JSON request from client + """ + # TODO: Implement agent-specific logic + # For now, return a not-implemented error + await send_error("Agent completions not yet implemented", code=501) + + logging.info("Agent request received but not yet implemented") + + +# ----------------------------------------------------------------------------- +# WebSocket Health Check Endpoint +# ----------------------------------------------------------------------------- + +@manager.route("/ws/health") # noqa: F821 +async def websocket_health(): + """ + WebSocket health check endpoint. + + This endpoint allows clients to verify WebSocket connectivity + without authentication. Useful for monitoring and diagnostics. + + The server will echo back any messages received, allowing clients + to test round-trip latency and connection stability. + + Example Usage: + ```javascript + const ws = new WebSocket('ws://host/v1/ws/health'); + ws.onopen = () => ws.send('ping'); + ws.onmessage = (e) => console.log('Received:', e.data); + ``` + """ + logging.info("WebSocket health check connection established") + + try: + # Send initial health status + await websocket.send(json.dumps({ + "status": "healthy", + "message": "WebSocket connection established", + "version": "1.0" + })) + + # Echo messages back to client + while True: + message = await websocket.receive() + + # Echo the message back + await websocket.send(json.dumps({ + "echo": message, + "timestamp": str(logging.time.time()) + })) + + except Exception as e: + logging.info(f"WebSocket health check closed: {str(e)}") + + finally: + logging.info("WebSocket health check connection closed") + diff --git a/docs/guides/websocket_api.md b/docs/guides/websocket_api.md new file mode 100644 index 000000000..7447770d8 --- /dev/null +++ b/docs/guides/websocket_api.md @@ -0,0 +1,613 @@ +# WebSocket API for Streaming Responses + +## Overview + +RAGFlow now supports WebSocket connections for real-time streaming responses. This feature is particularly useful for platforms like **WeChat Mini Programs** that require persistent bidirectional connections for interactive chat experiences. + +## Why WebSocket? + +Traditional HTTP-based Server-Sent Events (SSE) work well for most web applications, but some platforms have specific requirements: + +- **WeChat Mini Programs** require WebSocket for real-time communication +- **Mobile apps** benefit from persistent connections with lower latency +- **Interactive applications** need bidirectional communication +- **Network efficiency** - reuse connections instead of creating new ones for each message + +## Connection URL + +``` +ws://your-ragflow-host/v1/ws/chat +wss://your-ragflow-host/v1/ws/chat (for SSL/TLS) +``` + +## Authentication + +WebSocket connections support multiple authentication methods: + +### 1. API Token (Recommended for Integrations) + +Pass your API token in the Authorization header or as a query parameter: + +**Header-based (preferred):** +```javascript +const ws = new WebSocket('ws://host/v1/ws/chat', { + headers: { + 'Authorization': 'Bearer ragflow-your-api-token' + } +}); +``` + +**Query parameter (fallback for clients that can't set headers):** +```javascript +const ws = new WebSocket('ws://host/v1/ws/chat?token=ragflow-your-api-token'); +``` + +### 2. User Session (For Web Applications) + +If you're already logged in via the web interface, you can use your session JWT: + +```javascript +const ws = new WebSocket('ws://host/v1/ws/chat', { + headers: { + 'Authorization': 'your-jwt-token' + } +}); +``` + +## Message Format + +### Client β†’ Server (Request) + +Send a JSON message to start a chat completion: + +```json +{ + "type": "chat", + "chat_id": "your-dialog-id", + "question": "What is RAGFlow?", + "stream": true, + "session_id": "optional-session-id", + "kb_ids": ["optional-kb-id"], + "doc_ids": "optional-doc-ids" +} +``` + +**Fields:** +- `type` (string, required): Message type, currently only `"chat"` is supported +- `chat_id` (string, required): Your dialog/chat ID +- `question` (string, required): User's question or message +- `stream` (boolean, optional): Enable streaming responses (default: `true`) +- `session_id` (string, optional): Conversation session ID. If not provided, a new session is created +- `kb_ids` (array, optional): Knowledge base IDs to query for RAG +- `doc_ids` (string, optional): Comma-separated document IDs to prioritize +- `files` (array, optional): File IDs attached to this message + +### Server β†’ Client (Response) + +The server sends multiple messages for a streaming response: + +**Streaming chunk:** +```json +{ + "code": 0, + "message": "", + "data": { + "answer": "RAGFlow is an open-source", + "reference": {}, + "id": "message-id", + "session_id": "session-id" + } +} +``` + +**Completion marker:** +```json +{ + "code": 0, + "message": "", + "data": true +} +``` + +**Error message:** +```json +{ + "code": 500, + "message": "Error description", + "data": { + "answer": "**ERROR**: Error details", + "reference": [] + } +} +``` + +## Example Clients + +### JavaScript (Web Browser / Node.js) + +```javascript +// Create WebSocket connection +const ws = new WebSocket('ws://localhost/v1/ws/chat?token=ragflow-your-token'); + +// Connection opened +ws.addEventListener('open', function (event) { + console.log('Connected to RAGFlow WebSocket'); + + // Send a chat message + ws.send(JSON.stringify({ + type: 'chat', + chat_id: 'your-chat-id', + question: 'What is artificial intelligence?', + stream: true + })); +}); + +// Listen for messages +ws.addEventListener('message', function (event) { + const response = JSON.parse(event.data); + + // Check for completion + if (response.data === true) { + console.log('Stream completed'); + return; + } + + // Check for errors + if (response.code !== 0) { + console.error('Error:', response.message); + return; + } + + // Display incremental answer + console.log('Received chunk:', response.data.answer); + + // You can append to UI here + // document.getElementById('answer').innerText += response.data.answer; +}); + +// Handle errors +ws.addEventListener('error', function (event) { + console.error('WebSocket error:', event); +}); + +// Handle connection close +ws.addEventListener('close', function (event) { + console.log('WebSocket closed:', event.code, event.reason); +}); +``` + +### WeChat Mini Program + +```javascript +// WeChat Mini Program WebSocket example +const app = getApp(); + +Page({ + data: { + answer: '', + socket: null + }, + + onLoad: function() { + // Connect to WebSocket + const socket = wx.connectSocket({ + url: 'wss://your-ragflow-host/v1/ws/chat?token=ragflow-your-token', + success: () => { + console.log('WebSocket connected'); + } + }); + + // Connection opened + socket.onOpen(() => { + console.log('WebSocket connection established'); + this.setData({ socket: socket }); + + // Send chat message + socket.send({ + data: JSON.stringify({ + type: 'chat', + chat_id: 'your-chat-id', + question: 'δ½ ε₯½οΌŒδ»€δΉˆζ˜―RAGFlow?', + stream: true + }) + }); + }); + + // Receive messages + socket.onMessage((res) => { + const response = JSON.parse(res.data); + + // Check for completion + if (response.data === true) { + console.log('Stream completed'); + return; + } + + // Check for errors + if (response.code !== 0) { + wx.showToast({ + title: response.message, + icon: 'none' + }); + return; + } + + // Append incremental answer + this.setData({ + answer: this.data.answer + response.data.answer + }); + }); + + // Handle errors + socket.onError((error) => { + console.error('WebSocket error:', error); + wx.showToast({ + title: 'Connection error', + icon: 'none' + }); + }); + + // Handle close + socket.onClose(() => { + console.log('WebSocket connection closed'); + }); + }, + + onUnload: function() { + // Close WebSocket when leaving page + if (this.data.socket) { + this.data.socket.close(); + } + } +}); +``` + +### Python + +```python +import websocket +import json +import threading + +class RAGFlowWebSocketClient: + def __init__(self, url, token): + self.url = f"{url}?token={token}" + self.ws = None + + def on_message(self, ws, message): + """Handle incoming messages""" + response = json.loads(message) + + # Check for completion + if response['data'] is True: + print('\nStream completed') + return + + # Check for errors + if response['code'] != 0: + print(f"Error: {response['message']}") + return + + # Print incremental answer + print(response['data']['answer'], end='', flush=True) + + def on_error(self, ws, error): + """Handle errors""" + print(f"Error: {error}") + + def on_close(self, ws, close_status_code, close_msg): + """Handle connection close""" + print(f"\nConnection closed: {close_status_code} - {close_msg}") + + def on_open(self, ws): + """Handle connection open""" + print("Connected to RAGFlow") + + # Send chat message in a separate thread + def send_message(): + message = { + 'type': 'chat', + 'chat_id': 'your-chat-id', + 'question': 'What is machine learning?', + 'stream': True + } + ws.send(json.dumps(message)) + + threading.Thread(target=send_message).start() + + def connect(self): + """Establish WebSocket connection""" + self.ws = websocket.WebSocketApp( + self.url, + on_open=self.on_open, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close + ) + + # Run forever (blocking) + self.ws.run_forever() + +# Usage +if __name__ == '__main__': + client = RAGFlowWebSocketClient( + url='ws://localhost/v1/ws/chat', + token='ragflow-your-api-token' + ) + client.connect() +``` + +### Go + +```go +package main + +import ( + "encoding/json" + "fmt" + "log" + "github.com/gorilla/websocket" +) + +type ChatRequest struct { + Type string `json:"type"` + ChatID string `json:"chat_id"` + Question string `json:"question"` + Stream bool `json:"stream"` +} + +type ChatResponse struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data"` +} + +func main() { + // Connect to WebSocket + url := "ws://localhost/v1/ws/chat?token=ragflow-your-token" + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + log.Fatal("dial:", err) + } + defer conn.Close() + + // Send chat request + request := ChatRequest{ + Type: "chat", + ChatID: "your-chat-id", + Question: "What is deep learning?", + Stream: true, + } + + if err := conn.WriteJSON(request); err != nil { + log.Fatal("write:", err) + } + + // Read responses + for { + var response ChatResponse + if err := conn.ReadJSON(&response); err != nil { + log.Println("read:", err) + return + } + + // Check for completion + if data, ok := response.Data.(bool); ok && data { + fmt.Println("\nStream completed") + break + } + + // Check for errors + if response.Code != 0 { + log.Printf("Error: %s\n", response.Message) + break + } + + // Print incremental answer + if dataMap, ok := response.Data.(map[string]interface{}); ok { + if answer, ok := dataMap["answer"].(string); ok { + fmt.Print(answer) + } + } + } +} +``` + +## Connection Management + +### Persistent Connections + +WebSocket connections are persistent and can handle multiple request/response cycles without reconnecting: + +```javascript +const ws = new WebSocket('ws://host/v1/ws/chat?token=your-token'); + +ws.onopen = () => { + // Send first question + ws.send(JSON.stringify({ + type: 'chat', + chat_id: 'chat-id', + question: 'First question?' + })); + + // After receiving the complete response, you can send another question + // without reconnecting +}; + +let responseCount = 0; +ws.onmessage = (event) => { + const response = JSON.parse(event.data); + + if (response.data === true) { + responseCount++; + + // Send next question + if (responseCount === 1) { + ws.send(JSON.stringify({ + type: 'chat', + chat_id: 'chat-id', + session_id: 'same-session-id', // Continue conversation + question: 'Follow-up question?' + })); + } + } +}; +``` + +### Error Handling + +Always implement proper error handling: + +```javascript +ws.onerror = (error) => { + console.error('WebSocket error:', error); + // Implement reconnection logic if needed +}; + +ws.onclose = (event) => { + if (event.code !== 1000) { + // Abnormal closure - implement reconnection + console.log('Reconnecting in 3 seconds...'); + setTimeout(() => { + // Reconnect logic here + }, 3000); + } +}; +``` + +### Close Codes + +Common WebSocket close codes: + +- `1000` - Normal closure +- `1008` - Policy violation (authentication failed) +- `1011` - Internal server error +- `1006` - Abnormal closure (connection lost) + +## Session Management + +### Creating a New Session + +Don't provide a `session_id` in your first message: + +```json +{ + "type": "chat", + "chat_id": "your-chat-id", + "question": "First question" +} +``` + +The server will create a new session and return the `session_id` in the response. + +### Continuing a Session + +Use the `session_id` from previous responses: + +```json +{ + "type": "chat", + "chat_id": "your-chat-id", + "session_id": "returned-session-id", + "question": "Follow-up question" +} +``` + +## Health Check + +Test WebSocket connectivity without authentication: + +```javascript +const ws = new WebSocket('ws://host/v1/ws/health'); + +ws.onopen = () => { + ws.send('ping'); +}; + +ws.onmessage = (event) => { + console.log('Health check:', JSON.parse(event.data)); +}; +``` + +## Troubleshooting + +### Connection Refused + +- Check if RAGFlow server is running +- Verify the correct host and port +- Ensure WebSocket support is enabled + +### Authentication Failed + +- Verify your API token is correct +- Check if the token has expired +- Ensure proper authorization format: `Bearer ` + +### No Response + +- Verify the `chat_id` exists and you have access +- Check if the dialog has knowledge bases configured +- Review server logs for errors + +### Connection Drops + +- Implement reconnection logic +- Use heartbeat/ping messages to keep connection alive +- Check network stability + +## Performance Tips + +1. **Reuse connections**: Don't create new WebSocket for each message +2. **Implement backoff**: Wait before reconnecting after errors +3. **Buffer messages**: Queue messages if connection is temporarily down +4. **Clean up**: Always close WebSocket when done +5. **Monitor latency**: Track round-trip times for optimization + +## Security Considerations + +1. **Always use WSS (WebSocket Secure)** in production +2. **Never expose API tokens** in client-side code +3. **Implement rate limiting** on client side +4. **Validate all inputs** before sending +5. **Handle sensitive data** according to your security policies + +## Migration from SSE + +If you're currently using Server-Sent Events (SSE), here's how to migrate: + +**SSE (Old):** +```javascript +const eventSource = new EventSource('/v1/conversation/completion'); +eventSource.onmessage = (event) => { + const data = JSON.parse(event.data); + console.log(data); +}; +``` + +**WebSocket (New):** +```javascript +const ws = new WebSocket('ws://host/v1/ws/chat?token=your-token'); +ws.onmessage = (event) => { + const data = JSON.parse(event.data); + console.log(data); +}; +ws.send(JSON.stringify({ + type: 'chat', + chat_id: 'your-chat-id', + question: 'Your question' +})); +``` + +## Additional Resources + +- [WebSocket API Standard](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) +- [WeChat Mini Program WebSocket](https://developers.weixin.qq.com/miniprogram/dev/api/network/websocket/wx.connectSocket.html) +- [RAGFlow API Documentation](../references/api_reference.md) + +## Support + +For issues or questions: +- GitHub Issues: https://github.com/infiniflow/ragflow/issues +- Documentation: https://ragflow.io/docs +- Community: Join our Discord/Slack channel + diff --git a/example/websocket/README.md b/example/websocket/README.md new file mode 100644 index 000000000..7a503b701 --- /dev/null +++ b/example/websocket/README.md @@ -0,0 +1,203 @@ +# RAGFlow WebSocket Examples + +This directory contains example implementations for using RAGFlow's WebSocket API for real-time streaming responses. + +## πŸ“ Files + +- **`python_client.py`** - Python WebSocket client with interactive mode +- **`index.html`** - Web-based demo with interactive UI + +## πŸš€ Quick Start + +### Python Client + +#### Prerequisites + +```bash +pip install websocket-client +``` + +#### Single Question Mode + +```bash +python python_client.py \ + --url ws://localhost/v1/ws/chat \ + --token ragflow-your-api-token \ + --chat-id your-chat-id \ + --question "What is RAGFlow?" +``` + +#### Interactive Mode + +```bash +python python_client.py \ + --url ws://localhost/v1/ws/chat \ + --token ragflow-your-api-token \ + --chat-id your-chat-id \ + --interactive +``` + +#### Continue Existing Session + +```bash +python python_client.py \ + --url ws://localhost/v1/ws/chat \ + --token ragflow-your-api-token \ + --chat-id your-chat-id \ + --session-id existing-session-id \ + --question "Follow-up question?" +``` + +### Web Demo + +1. Open `index.html` in your web browser +2. Enter your RAGFlow server URL, API token, and chat ID +3. Click "Connect" +4. Start chatting! + +The web demo features: +- Real-time streaming responses +- Session persistence +- Error handling +- Auto-reconnection support +- Settings saved in localStorage + +## πŸ“– Usage Examples + +### Python Client Features + +**Interactive conversation:** +```bash +python python_client.py --url ws://localhost/v1/ws/chat \ + --token your-token \ + --chat-id your-chat-id \ + --interactive + +# Then type questions interactively +πŸ‘€ You: What is machine learning? +πŸ€– Answer: Machine learning is a subset of artificial intelligence... +βœ“ Stream completed + +πŸ‘€ You: Can you give examples? +πŸ€– Answer: Sure! Here are some examples... +``` + +**Debug mode:** +```bash +python python_client.py --url ws://localhost/v1/ws/chat \ + --token your-token \ + --chat-id your-chat-id \ + --question "Hello" \ + --debug +``` + +### Web Demo Features + +**Auto-save settings:** +The web demo automatically saves your connection settings to localStorage, so you don't need to enter them every time. + +**Session continuity:** +The demo maintains the session ID, allowing multi-turn conversations without reconnecting. + +**Visual feedback:** +- Connection status indicator +- Streaming animation +- Error messages +- Message timestamps + +## πŸ”§ Configuration + +### Environment Variables + +You can also use environment variables with the Python client: + +```bash +export RAGFLOW_WS_URL="ws://localhost/v1/ws/chat" +export RAGFLOW_API_TOKEN="ragflow-your-token" +export RAGFLOW_CHAT_ID="your-chat-id" + +python python_client.py --question "Hello" +``` + +### SSL/TLS + +For secure connections, use `wss://` instead of `ws://`: + +```bash +python python_client.py --url wss://your-ragflow-host/v1/ws/chat ... +``` + +## πŸ“š Documentation + +For complete documentation, see: +- [WebSocket API Guide](../../docs/guides/websocket_api.md) +- [RAGFlow API Documentation](https://ragflow.io/docs/api) + +## πŸ› Troubleshooting + +### Connection Refused + +**Problem:** `WebSocket error: Connection refused` + +**Solution:** +1. Verify RAGFlow server is running +2. Check the WebSocket URL is correct +3. Ensure no firewall is blocking the connection + +### Authentication Failed + +**Problem:** `Error 401: Authentication required` + +**Solution:** +1. Verify your API token is correct +2. Check token hasn't expired +3. Ensure proper token format: `ragflow-xxxxx` + +### Invalid Chat ID + +**Problem:** `Error 404: Dialog not found` + +**Solution:** +1. Verify the chat ID exists +2. Check you have access to the dialog +3. Ensure you're using the correct tenant + +### SSL Certificate Error + +**Problem:** Certificate verification failed with `wss://` + +**Solution:** + +For Python client, disable SSL verification (development only): +```python +# In websocket.WebSocketApp +ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) +``` + +For production, use valid SSL certificates. + +## 🎯 Best Practices + +1. **Reuse connections**: Don't create new WebSocket for each message +2. **Handle reconnection**: Implement exponential backoff for reconnection +3. **Validate inputs**: Check all parameters before sending +4. **Error handling**: Always handle connection errors gracefully +5. **Clean up**: Close WebSocket when done + +## πŸ“ License + +Copyright 2024 The InfiniFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0. + +## 🀝 Support + +For issues or questions: +- GitHub Issues: https://github.com/infiniflow/ragflow/issues +- Documentation: https://ragflow.io/docs +- Community: Join our Discord/Slack + +## 🌟 Contributing + +We welcome contributions! Please see our [Contributing Guide](../../docs/contribution/README.md) for details. + diff --git a/example/websocket/index.html b/example/websocket/index.html new file mode 100644 index 000000000..7bd22804c --- /dev/null +++ b/example/websocket/index.html @@ -0,0 +1,590 @@ + + + + + + + RAGFlow WebSocket Demo + + + +
+ +
+

πŸš€ RAGFlow WebSocket Demo

+

Real-time streaming chat with RAGFlow

+
+ + +
+

Connection Settings

+
+ + +
+
+ + +
+
+ + +
+ + Disconnected + +
+ + +
+
+
+ πŸ‘† Configure connection settings above and click Connect +
+
+
+ + +
+
+
+ + + + + diff --git a/example/websocket/python_client.py b/example/websocket/python_client.py new file mode 100644 index 000000000..b85412475 --- /dev/null +++ b/example/websocket/python_client.py @@ -0,0 +1,403 @@ +#!/usr/bin/env python3 +# +# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +RAGFlow WebSocket Client Example (Python) + +This example demonstrates how to connect to RAGFlow's WebSocket API +and stream chat responses in real-time. + +Requirements: + pip install websocket-client + +Usage: + python python_client.py --url ws://localhost/v1/ws/chat \ + --token your-api-token \ + --chat-id your-chat-id \ + --question "What is RAGFlow?" +""" + +import argparse +import json +import sys +import threading +import websocket + + +class RAGFlowWebSocketClient: + """ + WebSocket client for RAGFlow streaming chat completions. + + This client demonstrates: + - Connection establishment with authentication + - Sending chat requests + - Receiving and displaying streaming responses + - Error handling and reconnection + - Multi-turn conversations + """ + + def __init__(self, url, token, chat_id, debug=False): + """ + Initialize the WebSocket client. + + Args: + url (str): WebSocket URL (e.g., ws://localhost/v1/ws/chat) + token (str): API token for authentication + chat_id (str): Dialog/Chat ID to use + debug (bool): Enable debug output + """ + # Append token to URL for authentication + self.url = f"{url}?token={token}" + self.chat_id = chat_id + self.debug = debug + self.ws = None + self.session_id = None # Track session for multi-turn conversations + self.current_answer = "" # Accumulate streaming chunks + + def on_message(self, ws, message): + """ + Handle incoming WebSocket messages. + + This callback is invoked for each message received from the server. + Messages contain incremental response chunks or completion markers. + + Args: + ws: WebSocket connection object + message (str): JSON message from server + """ + try: + # Parse JSON response + response = json.loads(message) + + if self.debug: + print(f"\n[DEBUG] Received: {json.dumps(response, indent=2)}") + + # Check if this is a completion marker + if response.get('data') is True: + print("\n\nβœ“ Stream completed") + print("-" * 60) + return + + # Check for errors + if response.get('code', 0) != 0: + print(f"\nβœ— Error {response['code']}: {response.get('message', 'Unknown error')}") + return + + # Extract response data + data = response.get('data', {}) + + if isinstance(data, dict): + # Extract answer chunk + answer = data.get('answer', '') + + # Save session ID for multi-turn conversations + if 'session_id' in data and not self.session_id: + self.session_id = data['session_id'] + if self.debug: + print(f"\n[DEBUG] Session ID: {self.session_id}") + + # Display incremental answer + if answer: + print(answer, end='', flush=True) + self.current_answer += answer + + # Display references if available + reference = data.get('reference', {}) + if reference and reference.get('chunks'): + print(f"\n\nπŸ“š References: {len(reference['chunks'])} sources") + if self.debug: + for i, chunk in enumerate(reference['chunks'][:3], 1): + doc_name = chunk.get('doc_name', 'Unknown') + print(f" {i}. {doc_name}") + + except json.JSONDecodeError as e: + print(f"\nβœ— Failed to parse response: {e}") + except Exception as e: + print(f"\nβœ— Error handling message: {e}") + + def on_error(self, ws, error): + """ + Handle WebSocket errors. + + Args: + ws: WebSocket connection object + error: Error object or message + """ + print(f"\nβœ— WebSocket error: {error}") + + def on_close(self, ws, close_status_code, close_msg): + """ + Handle WebSocket connection close. + + Args: + ws: WebSocket connection object + close_status_code (int): Close status code + close_msg (str): Close message + """ + if close_status_code == 1000: + # Normal closure + print("\nβœ“ Connection closed normally") + else: + # Abnormal closure + print(f"\nβœ— Connection closed: {close_status_code} - {close_msg}") + + def on_open(self, ws): + """ + Handle WebSocket connection open. + + This callback is invoked when the connection is established. + It sends the initial chat message to start the conversation. + + Args: + ws: WebSocket connection object + """ + print("βœ“ Connected to RAGFlow") + print("-" * 60) + + def send_message(self, question, session_id=None): + """ + Send a chat message through the WebSocket. + + Args: + question (str): User's question or message + session_id (str, optional): Session ID for continuing a conversation + """ + if not self.ws: + print("βœ— Not connected") + return False + + # Construct chat request message + message = { + 'type': 'chat', + 'chat_id': self.chat_id, + 'question': question, + 'stream': True + } + + # Include session ID if continuing a conversation + if session_id: + message['session_id'] = session_id + + if self.debug: + print(f"\n[DEBUG] Sending: {json.dumps(message, indent=2)}") + + # Reset answer accumulator + self.current_answer = "" + + # Send message + try: + self.ws.send(json.dumps(message)) + print(f"\nπŸ’¬ Question: {question}\n") + print("πŸ€– Answer: ", end='', flush=True) + return True + except Exception as e: + print(f"\nβœ— Failed to send message: {e}") + return False + + def connect(self): + """ + Establish WebSocket connection. + + This creates the WebSocket connection and sets up event handlers. + The connection runs in the main thread (blocking). + """ + # Enable debug traces if requested + if self.debug: + websocket.enableTrace(True) + + # Create WebSocket app with event handlers + self.ws = websocket.WebSocketApp( + self.url, + on_open=self.on_open, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close + ) + + # Run forever (blocking call) + self.ws.run_forever() + + def close(self): + """Close the WebSocket connection.""" + if self.ws: + self.ws.close() + + +def interactive_mode(client): + """ + Run interactive mode for multi-turn conversations. + + This allows users to have ongoing conversations with the AI + by typing questions and receiving responses in real-time. + + Args: + client (RAGFlowWebSocketClient): WebSocket client instance + """ + print("\n" + "=" * 60) + print("Interactive Mode - Type 'quit' or 'exit' to end") + print("=" * 60) + + def connection_thread(): + """Run WebSocket connection in background thread.""" + client.connect() + + # Start connection in background thread + thread = threading.Thread(target=connection_thread, daemon=True) + thread.start() + + # Wait for connection to establish + import time + time.sleep(2) + + # Interactive loop + try: + while True: + # Get user input + question = input("\n\nπŸ‘€ You: ").strip() + + if not question: + continue + + if question.lower() in ['quit', 'exit', 'q']: + print("\nπŸ‘‹ Goodbye!") + break + + # Send question (continue session if available) + client.send_message(question, session_id=client.session_id) + + # Wait for response to complete + # In production, you'd use proper async/event handling + time.sleep(1) + + except KeyboardInterrupt: + print("\n\nπŸ‘‹ Goodbye!") + + finally: + client.close() + + +def main(): + """ + Main entry point for the WebSocket client example. + + Parses command-line arguments and runs the client in either + single-question or interactive mode. + """ + # Parse command-line arguments + parser = argparse.ArgumentParser( + description='RAGFlow WebSocket Client Example', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Single question + python python_client.py --url ws://localhost/v1/ws/chat \\ + --token your-token \\ + --chat-id your-chat-id \\ + --question "What is RAGFlow?" + + # Interactive mode + python python_client.py --url ws://localhost/v1/ws/chat \\ + --token your-token \\ + --chat-id your-chat-id \\ + --interactive + """ + ) + + parser.add_argument( + '--url', + required=True, + help='WebSocket URL (e.g., ws://localhost/v1/ws/chat)' + ) + + parser.add_argument( + '--token', + required=True, + help='API token for authentication' + ) + + parser.add_argument( + '--chat-id', + required=True, + help='Dialog/Chat ID to use' + ) + + parser.add_argument( + '--question', + help='Question to ask (single question mode)' + ) + + parser.add_argument( + '--session-id', + help='Session ID to continue existing conversation' + ) + + parser.add_argument( + '--interactive', + action='store_true', + help='Enable interactive mode for multi-turn conversations' + ) + + parser.add_argument( + '--debug', + action='store_true', + help='Enable debug output' + ) + + args = parser.parse_args() + + # Validate arguments + if not args.interactive and not args.question: + parser.error("Either --question or --interactive must be specified") + + # Create client + client = RAGFlowWebSocketClient( + url=args.url, + token=args.token, + chat_id=args.chat_id, + debug=args.debug + ) + + print("\n" + "=" * 60) + print("RAGFlow WebSocket Client") + print("=" * 60) + + # Run in appropriate mode + if args.interactive: + # Interactive mode - ongoing conversation + interactive_mode(client) + else: + # Single question mode + def send_after_connect(ws): + """Send question after connection is established.""" + client.on_open(ws) + client.send_message(args.question, session_id=args.session_id) + + # Override on_open to send question + client.on_open = send_after_connect + + # Connect and run (blocking) + try: + client.connect() + except KeyboardInterrupt: + print("\n\nπŸ‘‹ Interrupted") + finally: + client.close() + + +if __name__ == '__main__': + main() + From eaa38d6a7f927c5192e380f8e6d64e735d9b9ee4 Mon Sep 17 00:00:00 2001 From: SmartDever02 Date: Wed, 3 Dec 2025 07:14:40 -0300 Subject: [PATCH 02/12] fix the CLI issue --- api/apps/websocket_app.py | 9 ++++----- example/websocket/python_client.py | 1 - 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/api/apps/websocket_app.py b/api/apps/websocket_app.py index 90cfd939f..72eaeb52e 100644 --- a/api/apps/websocket_app.py +++ b/api/apps/websocket_app.py @@ -80,14 +80,13 @@ Connection Lifecycle: import logging import json -import os -from quart import websocket, request as quart_request +from quart import websocket from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer from api.db.db_models import APIToken from api.db.services.user_service import UserService from api.db.services.dialog_service import DialogService -from api.db.services.conversation_service import ConversationService, completion +from api.db.services.conversation_service import completion from common.constants import StatusEnum from common import settings @@ -398,7 +397,7 @@ async def websocket_chat(): try: await send_error(error_message) - except: + except Exception: # Failed to send error (connection may be closed) pass @@ -634,7 +633,7 @@ async def websocket_agent(): try: await send_error(error_message) - except: + except Exception: pass await websocket.close(1011, "Internal server error") diff --git a/example/websocket/python_client.py b/example/websocket/python_client.py index b85412475..4254d1547 100644 --- a/example/websocket/python_client.py +++ b/example/websocket/python_client.py @@ -33,7 +33,6 @@ Usage: import argparse import json -import sys import threading import websocket From c6a7c4a296ba793505230ab5ddb89114cd371c4e Mon Sep 17 00:00:00 2001 From: SmartDever02 Date: Wed, 3 Dec 2025 08:16:41 -0300 Subject: [PATCH 03/12] Remove README.md --- example/websocket/README.md | 203 ------------------------------------ 1 file changed, 203 deletions(-) delete mode 100644 example/websocket/README.md diff --git a/example/websocket/README.md b/example/websocket/README.md deleted file mode 100644 index 7a503b701..000000000 --- a/example/websocket/README.md +++ /dev/null @@ -1,203 +0,0 @@ -# RAGFlow WebSocket Examples - -This directory contains example implementations for using RAGFlow's WebSocket API for real-time streaming responses. - -## πŸ“ Files - -- **`python_client.py`** - Python WebSocket client with interactive mode -- **`index.html`** - Web-based demo with interactive UI - -## πŸš€ Quick Start - -### Python Client - -#### Prerequisites - -```bash -pip install websocket-client -``` - -#### Single Question Mode - -```bash -python python_client.py \ - --url ws://localhost/v1/ws/chat \ - --token ragflow-your-api-token \ - --chat-id your-chat-id \ - --question "What is RAGFlow?" -``` - -#### Interactive Mode - -```bash -python python_client.py \ - --url ws://localhost/v1/ws/chat \ - --token ragflow-your-api-token \ - --chat-id your-chat-id \ - --interactive -``` - -#### Continue Existing Session - -```bash -python python_client.py \ - --url ws://localhost/v1/ws/chat \ - --token ragflow-your-api-token \ - --chat-id your-chat-id \ - --session-id existing-session-id \ - --question "Follow-up question?" -``` - -### Web Demo - -1. Open `index.html` in your web browser -2. Enter your RAGFlow server URL, API token, and chat ID -3. Click "Connect" -4. Start chatting! - -The web demo features: -- Real-time streaming responses -- Session persistence -- Error handling -- Auto-reconnection support -- Settings saved in localStorage - -## πŸ“– Usage Examples - -### Python Client Features - -**Interactive conversation:** -```bash -python python_client.py --url ws://localhost/v1/ws/chat \ - --token your-token \ - --chat-id your-chat-id \ - --interactive - -# Then type questions interactively -πŸ‘€ You: What is machine learning? -πŸ€– Answer: Machine learning is a subset of artificial intelligence... -βœ“ Stream completed - -πŸ‘€ You: Can you give examples? -πŸ€– Answer: Sure! Here are some examples... -``` - -**Debug mode:** -```bash -python python_client.py --url ws://localhost/v1/ws/chat \ - --token your-token \ - --chat-id your-chat-id \ - --question "Hello" \ - --debug -``` - -### Web Demo Features - -**Auto-save settings:** -The web demo automatically saves your connection settings to localStorage, so you don't need to enter them every time. - -**Session continuity:** -The demo maintains the session ID, allowing multi-turn conversations without reconnecting. - -**Visual feedback:** -- Connection status indicator -- Streaming animation -- Error messages -- Message timestamps - -## πŸ”§ Configuration - -### Environment Variables - -You can also use environment variables with the Python client: - -```bash -export RAGFLOW_WS_URL="ws://localhost/v1/ws/chat" -export RAGFLOW_API_TOKEN="ragflow-your-token" -export RAGFLOW_CHAT_ID="your-chat-id" - -python python_client.py --question "Hello" -``` - -### SSL/TLS - -For secure connections, use `wss://` instead of `ws://`: - -```bash -python python_client.py --url wss://your-ragflow-host/v1/ws/chat ... -``` - -## πŸ“š Documentation - -For complete documentation, see: -- [WebSocket API Guide](../../docs/guides/websocket_api.md) -- [RAGFlow API Documentation](https://ragflow.io/docs/api) - -## πŸ› Troubleshooting - -### Connection Refused - -**Problem:** `WebSocket error: Connection refused` - -**Solution:** -1. Verify RAGFlow server is running -2. Check the WebSocket URL is correct -3. Ensure no firewall is blocking the connection - -### Authentication Failed - -**Problem:** `Error 401: Authentication required` - -**Solution:** -1. Verify your API token is correct -2. Check token hasn't expired -3. Ensure proper token format: `ragflow-xxxxx` - -### Invalid Chat ID - -**Problem:** `Error 404: Dialog not found` - -**Solution:** -1. Verify the chat ID exists -2. Check you have access to the dialog -3. Ensure you're using the correct tenant - -### SSL Certificate Error - -**Problem:** Certificate verification failed with `wss://` - -**Solution:** - -For Python client, disable SSL verification (development only): -```python -# In websocket.WebSocketApp -ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) -``` - -For production, use valid SSL certificates. - -## 🎯 Best Practices - -1. **Reuse connections**: Don't create new WebSocket for each message -2. **Handle reconnection**: Implement exponential backoff for reconnection -3. **Validate inputs**: Check all parameters before sending -4. **Error handling**: Always handle connection errors gracefully -5. **Clean up**: Close WebSocket when done - -## πŸ“ License - -Copyright 2024 The InfiniFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0. - -## 🀝 Support - -For issues or questions: -- GitHub Issues: https://github.com/infiniflow/ragflow/issues -- Documentation: https://ragflow.io/docs -- Community: Join our Discord/Slack - -## 🌟 Contributing - -We welcome contributions! Please see our [Contributing Guide](../../docs/contribution/README.md) for details. - From 9ce780fefde2bc52c9190f47fcd51c5f4a8dce1b Mon Sep 17 00:00:00 2001 From: SmartDever02 Date: Fri, 5 Dec 2025 01:32:00 -0300 Subject: [PATCH 04/12] refactor: Move WebSocket API to SDK pattern following session.py - Moved websocket_app.py to api/apps/sdk/websocket.py - Follows same structure as session.py for SDK endpoints - Added ws_token_required decorator in api_utils.py (mirrors token_required) - WebSocket endpoints now use SDK pattern: * @manager.websocket('/chats//completions') * @manager.websocket('/agents//completions') - Removed old api/apps/websocket_app.py - Added websockets>=14.0 and pytest-asyncio>=0.24.0 to test dependencies Addresses reviewer feedback: websocket_app.py should mimic session.py in /api/sdk for third-party calls, with /agents//completions and /chats//completions endpoints similar to those in session.py --- api/apps/sdk/websocket.py | 248 +++++++++++++ api/apps/websocket_app.py | 709 -------------------------------------- api/utils/api_utils.py | 89 ++++- pyproject.toml | 2 + 4 files changed, 338 insertions(+), 710 deletions(-) create mode 100644 api/apps/sdk/websocket.py delete mode 100644 api/apps/websocket_app.py diff --git a/api/apps/sdk/websocket.py b/api/apps/sdk/websocket.py new file mode 100644 index 000000000..5faf11066 --- /dev/null +++ b/api/apps/sdk/websocket.py @@ -0,0 +1,248 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +WebSocket SDK API for RAGFlow Streaming Responses + +This module provides WebSocket endpoints following the SDK API pattern, +mirroring the structure of session.py for consistency. +""" + +import logging +import json +from quart import websocket + +from api.db.services.dialog_service import DialogService +from api.db.services.canvas_service import UserCanvasService +from api.db.services.conversation_service import completion as rag_completion +from api.db.services.canvas_service import completion as agent_completion +from api.utils.api_utils import ws_token_required +from common.constants import StatusEnum + + +async def send_ws_error(error_message, code=500): + """Send error message to WebSocket client.""" + error_response = { + "code": code, + "message": error_message, + "data": { + "answer": f"**ERROR**: {error_message}", + "reference": [] + } + } + await websocket.send(json.dumps(error_response, ensure_ascii=False)) + + +async def send_ws_message(data, code=0, message=""): + """Send message to WebSocket client.""" + response = { + "code": code, + "message": message, + "data": data + } + await websocket.send(json.dumps(response, ensure_ascii=False)) + + +@manager.websocket("/chats//completions") # noqa: F821 +@ws_token_required +async def chat_completions_ws(tenant_id, chat_id): + """ + WebSocket endpoint for streaming chat completions. + Follows the same pattern as the HTTP POST /chats//completions endpoint. + """ + # Verify chat ownership + if not DialogService.query(tenant_id=tenant_id, id=chat_id, status=StatusEnum.VALID.value): + await send_ws_error(f"You don't own the chat {chat_id}", code=404) + await websocket.close(1008) + return + + logging.info(f"WebSocket chat connection established for chat_id: {chat_id}, tenant: {tenant_id}") + + try: + while True: + message = await websocket.receive() + + try: + req = json.loads(message) + except json.JSONDecodeError as e: + await send_ws_error(f"Invalid JSON format: {str(e)}", code=400) + continue + + question = req.get("question", "") + session_id = req.get("session_id") + stream = req.get("stream", True) + + if question is None: + await send_ws_error("Missing required parameter: question", code=400) + continue + + try: + if stream: + for response_chunk in rag_completion( + tenant_id=tenant_id, + chat_id=chat_id, + question=question, + session_id=session_id, + stream=True, + **{k: v for k, v in req.items() if k not in ["question", "session_id", "stream"]} + ): + if response_chunk.startswith("data:"): + json_str = response_chunk[5:].strip() + try: + response_data = json.loads(json_str) + await websocket.send(json.dumps(response_data, ensure_ascii=False)) + except json.JSONDecodeError: + continue + + logging.info(f"Chat completion streamed successfully for chat_id: {chat_id}") + else: + response = None + for resp in rag_completion( + tenant_id=tenant_id, + chat_id=chat_id, + question=question, + session_id=session_id, + stream=False, + **{k: v for k, v in req.items() if k not in ["question", "session_id", "stream"]} + ): + response = resp + break + + if response: + await send_ws_message(response) + else: + await send_ws_error("No response generated", code=500) + + except Exception as e: + logging.exception(f"Error during chat completion: {str(e)}") + await send_ws_error(str(e)) + + except Exception as e: + logging.exception(f"WebSocket error: {str(e)}") + try: + await send_ws_error(str(e)) + except Exception: + pass + await websocket.close(1011) + + finally: + logging.info(f"WebSocket chat connection closed for chat_id: {chat_id}") + + +@manager.websocket("/agents//completions") # noqa: F821 +@ws_token_required +async def agent_completions_ws(tenant_id, agent_id): + """ + WebSocket endpoint for streaming agent completions. + Follows the same pattern as the HTTP POST /agents//completions endpoint. + """ + # Verify agent ownership + if not UserCanvasService.query(user_id=tenant_id, id=agent_id): + await send_ws_error(f"You don't own the agent {agent_id}", code=404) + await websocket.close(1008) + return + + logging.info(f"WebSocket agent connection established for agent_id: {agent_id}, tenant: {tenant_id}") + + try: + while True: + message = await websocket.receive() + + try: + req = json.loads(message) + except json.JSONDecodeError as e: + await send_ws_error(f"Invalid JSON format: {str(e)}", code=400) + continue + + question = req.get("question", "") + session_id = req.get("session_id") + stream = req.get("stream", True) + + if not question: + await send_ws_error("Missing required parameter: question", code=400) + continue + + try: + if stream: + async for response_chunk in agent_completion( + tenant_id=tenant_id, + agent_id=agent_id, + question=question, + session_id=session_id, + stream=True, + **{k: v for k, v in req.items() if k not in ["question", "session_id", "stream"]} + ): + if isinstance(response_chunk, str) and response_chunk.startswith("data:"): + json_str = response_chunk[5:].strip() + try: + response_data = json.loads(json_str) + if response_data.get("event") in ["message", "message_end"]: + await websocket.send(json.dumps({ + "code": 0, + "message": "", + "data": response_data + }, ensure_ascii=False)) + except json.JSONDecodeError: + continue + + await send_ws_message(True) + logging.info(f"Agent completion streamed successfully for agent_id: {agent_id}") + else: + full_content = "" + reference = {} + final_ans = None + + async for response_chunk in agent_completion( + tenant_id=tenant_id, + agent_id=agent_id, + question=question, + session_id=session_id, + stream=False, + **{k: v for k, v in req.items() if k not in ["question", "session_id", "stream"]} + ): + if isinstance(response_chunk, str) and response_chunk.startswith("data:"): + try: + ans = json.loads(response_chunk[5:]) + if ans["event"] == "message": + full_content += ans["data"]["content"] + if ans.get("data", {}).get("reference", None): + reference.update(ans["data"]["reference"]) + final_ans = ans + except Exception as e: + await send_ws_error(str(e)) + continue + + if final_ans: + final_ans["data"]["content"] = full_content + final_ans["data"]["reference"] = reference + await send_ws_message(final_ans) + else: + await send_ws_error("No response generated", code=500) + + except Exception as e: + logging.exception(f"Error during agent completion: {str(e)}") + await send_ws_error(str(e)) + + except Exception as e: + logging.exception(f"WebSocket error: {str(e)}") + try: + await send_ws_error(str(e)) + except Exception: + pass + await websocket.close(1011) + + finally: + logging.info(f"WebSocket agent connection closed for agent_id: {agent_id}") + diff --git a/api/apps/websocket_app.py b/api/apps/websocket_app.py deleted file mode 100644 index 72eaeb52e..000000000 --- a/api/apps/websocket_app.py +++ /dev/null @@ -1,709 +0,0 @@ -# -# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -WebSocket API for RAGFlow Streaming Responses - -This module provides WebSocket endpoints for real-time streaming of chat completions. -WebSocket support is essential for platforms like WeChat Mini Programs that require -persistent bidirectional connections for real-time communication. - -Key Features: -- Real-time bidirectional communication via WebSocket -- Support for multiple authentication methods (API Token, User Session) -- Streaming chat completions with incremental responses -- Error handling and connection management -- Compatible with WeChat Mini Programs and other WebSocket clients - -WebSocket Message Format: - Client -> Server (Request): - { - "type": "chat", # Message type (currently supports "chat") - "chat_id": "xxx", # Dialog/Chat ID - "session_id": "xxx", # Optional: Conversation session ID - "question": "Hello", # User's question/message - "stream": true, # Optional: Enable streaming (default: true) - "kb_ids": [] # Optional: Knowledge base IDs to query - } - - Server -> Client (Response): - { - "code": 0, # Status code (0=success, 500=error) - "message": "", # Error message (if any) - "data": { # Response data - "answer": "...", # Incremental answer text (for streaming) - "reference": {...}, # Source references - "id": "xxx", # Message ID - "session_id": "xxx" # Session ID - } - } - - Server -> Client (Completion): - { - "code": 0, - "message": "", - "data": true # Indicates completion of streaming - } - - Server -> Client (Error): - { - "code": 500, - "message": "Error description", - "data": { - "answer": "**ERROR**: Error details", - "reference": [] - } - } - -Connection Lifecycle: -1. Client initiates WebSocket connection with authentication -2. Server validates authentication (API token or user session) -3. Client sends chat message requests -4. Server streams response chunks back to client -5. Server sends completion marker when done -6. Connection remains open for subsequent messages -7. Either party can close the connection -""" - -import logging -import json -from quart import websocket -from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer - -from api.db.db_models import APIToken -from api.db.services.user_service import UserService -from api.db.services.dialog_service import DialogService -from api.db.services.conversation_service import completion -from common.constants import StatusEnum -from common import settings - - -# ----------------------------------------------------------------------------- -# Authentication Helper Functions -# ----------------------------------------------------------------------------- - -async def authenticate_websocket(): - """ - Authenticate WebSocket connection using multiple methods. - - This function attempts to authenticate the WebSocket connection using: - 1. API Token authentication (Bearer token in Authorization header) - 2. User session authentication (Session-based JWT token) - 3. Query parameter authentication (token passed as URL parameter) - - Authentication Methods: - - API Token: Used by external applications, bots, and integrations - - User Session: Used by web interface and logged-in users - - Query Parameter: Fallback for clients that can't send headers - - Returns: - tuple: (authenticated: bool, tenant_id: str|None, error_message: str|None) - - Examples: - # API Token authentication - ws://host/ws/chat?Authorization=Bearer ragflow-xxxxx - - # Query parameter authentication - ws://host/ws/chat?token=ragflow-xxxxx - """ - tenant_id = None - error_message = None - - # Method 1: Try API Token authentication from Authorization header - # This is the preferred method for SDK and API integrations - authorization = websocket.headers.get("Authorization", "") - - if authorization: - try: - # Parse Bearer token format: "Bearer " - authorization_parts = authorization.split() - - if len(authorization_parts) >= 2: - token = authorization_parts[1] - - # Query database for matching API token - objs = APIToken.query(token=token) - - if objs: - # Valid API token found, extract tenant ID - tenant_id = objs[0].tenant_id - logging.info(f"WebSocket authenticated via API token for tenant: {tenant_id}") - return True, tenant_id, None - else: - error_message = "Invalid API token" - logging.warning(f"WebSocket authentication failed: {error_message}") - else: - error_message = "Invalid Authorization header format. Expected: 'Bearer '" - logging.warning(f"WebSocket authentication failed: {error_message}") - - except Exception as e: - error_message = f"Error processing API token: {str(e)}" - logging.error(f"WebSocket authentication error: {error_message}") - - # Method 2: Try User Session authentication (JWT token) - # This is used by the web interface for logged-in users - try: - jwt = Serializer(secret_key=settings.SECRET_KEY) - - # Try to get authorization from header or query parameter - auth_token = websocket.headers.get("Authorization") or \ - websocket.args.get("authorization") or \ - websocket.args.get("token") - - if auth_token: - try: - # Decode JWT token to get access token - access_token = str(jwt.loads(auth_token)) - - # Validate access token format - if access_token and len(access_token.strip()) >= 32: - # Query user by access token - user = UserService.query( - access_token=access_token, - status=StatusEnum.VALID.value - ) - - if user and user[0]: - # Valid user session found - tenant_id = user[0].id - logging.info(f"WebSocket authenticated via user session for user: {user[0].email}") - return True, tenant_id, None - - except Exception as e: - # JWT decoding or validation failed - logging.debug(f"User session authentication failed: {str(e)}") - - except Exception as e: - logging.error(f"Error in user session authentication: {str(e)}") - - # Method 3: Try query parameter authentication - # Fallback for clients that cannot set custom headers - token_param = websocket.args.get("token") - if token_param: - try: - objs = APIToken.query(token=token_param) - if objs: - tenant_id = objs[0].tenant_id - logging.info(f"WebSocket authenticated via query parameter for tenant: {tenant_id}") - return True, tenant_id, None - except Exception as e: - logging.error(f"Query parameter authentication error: {str(e)}") - - # No valid authentication method succeeded - if not error_message: - error_message = "Authentication required. Please provide valid API token or user session." - - return False, None, error_message - - -async def send_error(error_message, code=500): - """ - Send error message to WebSocket client in standardized format. - - Args: - error_message (str): Human-readable error description - code (int): Error code (default: 500 for server errors) - - Error Response Format: - { - "code": 500, - "message": "Error description", - "data": { - "answer": "**ERROR**: Error details", - "reference": [] - } - } - """ - error_response = { - "code": code, - "message": error_message, - "data": { - "answer": f"**ERROR**: {error_message}", - "reference": [] - } - } - - await websocket.send(json.dumps(error_response, ensure_ascii=False)) - logging.error(f"WebSocket error sent: {error_message}") - - -async def send_message(data, code=0, message=""): - """ - Send message to WebSocket client in standardized format. - - Args: - data: Response data (can be dict, bool, or any JSON-serializable object) - code (int): Status code (0 for success) - message (str): Optional status message - - Success Response Format: - { - "code": 0, - "message": "", - "data": {...} - } - """ - response = { - "code": code, - "message": message, - "data": data - } - - await websocket.send(json.dumps(response, ensure_ascii=False)) - - -# ----------------------------------------------------------------------------- -# WebSocket Endpoint: Chat Completions -# ----------------------------------------------------------------------------- - -@manager.route("/ws/chat") # noqa: F821 -async def websocket_chat(): - """ - WebSocket endpoint for real-time chat completions with streaming responses. - - This endpoint provides a persistent WebSocket connection for interactive chat - sessions. It supports streaming responses, allowing clients to receive - incremental updates as the AI generates the response. - - Connection URL: - ws://host/v1/ws/chat - - Authentication: - - Authorization header: "Bearer " - - Query parameter: "?token=" - - User session JWT - - Message Flow: - 1. Client connects and authenticates - 2. Client sends chat request message - 3. Server streams response chunks - 4. Server sends completion marker - 5. Connection stays open for more messages - - Supported Features: - - Multi-turn conversations with session tracking - - Knowledge base integration for RAG - - Reference/citation tracking - - Error recovery and graceful degradation - - Example Client Code (JavaScript): - ```javascript - const ws = new WebSocket('ws://host/v1/ws/chat?token=YOUR_TOKEN'); - - ws.onopen = () => { - ws.send(JSON.stringify({ - type: 'chat', - chat_id: 'your-chat-id', - question: 'Hello, how are you?', - stream: true - })); - }; - - ws.onmessage = (event) => { - const response = JSON.parse(event.data); - if (response.data === true) { - console.log('Stream completed'); - } else { - console.log('Received:', response.data.answer); - } - }; - ``` - - Example Client Code (Python): - ```python - import websocket - import json - - def on_message(ws, message): - data = json.loads(message) - if data['data'] is True: - print('Stream completed') - else: - print('Received:', data['data']['answer']) - - ws = websocket.WebSocketApp( - 'ws://host/v1/ws/chat?token=YOUR_TOKEN', - on_message=on_message - ) - - ws.on_open = lambda ws: ws.send(json.dumps({ - 'type': 'chat', - 'chat_id': 'your-chat-id', - 'question': 'Hello!', - 'stream': True - })) - - ws.run_forever() - ``` - """ - # Step 1: Authenticate the WebSocket connection - # This ensures only authorized clients can access the chat service - authenticated, tenant_id, error_msg = await authenticate_websocket() - - if not authenticated: - # Authentication failed - send error and close connection - await send_error(error_msg, code=401) - await websocket.close(1008, error_msg) # 1008 = Policy Violation - return - - # Authentication successful - log connection - logging.info(f"WebSocket chat connection established for tenant: {tenant_id}") - - # Step 2: Connection loop - handle multiple messages over same connection - # WebSocket connections are persistent, allowing multiple request/response cycles - try: - # Keep connection open and process incoming messages - while True: - # Wait for message from client - # This is a blocking call that waits until client sends data - message = await websocket.receive() - - # Parse JSON message from client - try: - request_data = json.loads(message) - except json.JSONDecodeError as e: - # Invalid JSON format - send error but keep connection open - await send_error(f"Invalid JSON format: {str(e)}", code=400) - continue - - # Extract message type (currently only 'chat' is supported) - message_type = request_data.get("type", "chat") - - # Step 3: Route message to appropriate handler based on type - if message_type == "chat": - # Handle chat completion request - await handle_chat_request(tenant_id, request_data) - else: - # Unknown message type - send error but keep connection open - await send_error(f"Unknown message type: {message_type}", code=400) - - except Exception as e: - # Unexpected error occurred - log and notify client - error_message = f"WebSocket error: {str(e)}" - logging.exception(error_message) - - try: - await send_error(error_message) - except Exception: - # Failed to send error (connection may be closed) - pass - - # Close connection with error code - await websocket.close(1011, "Internal server error") # 1011 = Internal Error - - finally: - # Connection closed - cleanup and log - logging.info(f"WebSocket chat connection closed for tenant: {tenant_id}") - - -async def handle_chat_request(tenant_id, request_data): - """ - Handle chat completion request received via WebSocket. - - This function processes a chat request, validates parameters, retrieves - the dialog configuration, and streams the AI response back to the client. - - Args: - tenant_id (str): Authenticated tenant/user ID - request_data (dict): Parsed JSON request from client - - Required Request Fields: - - chat_id (str): Dialog/Chat ID to use for the conversation - - question (str): User's question or message - - Optional Request Fields: - - session_id (str): Existing conversation session ID (creates new if not provided) - - stream (bool): Enable streaming responses (default: True) - - kb_ids (list): Knowledge base IDs to include in retrieval - - doc_ids (str): Comma-separated document IDs to prioritize - - files (list): File IDs attached to this message - - Processing Steps: - 1. Validate required parameters - 2. Verify dialog ownership and permissions - 3. Create or retrieve conversation session - 4. Stream AI-generated response chunks - 5. Send completion marker - - Error Handling: - - Missing parameters: Returns 400 error - - Invalid dialog: Returns 404 error - - Permission denied: Returns 403 error - - Processing error: Returns 500 error - """ - try: - # Step 1: Extract and validate required parameters - chat_id = request_data.get("chat_id") - question = request_data.get("question", "") - session_id = request_data.get("session_id") - stream = request_data.get("stream", True) - - # Validate chat_id is provided - if not chat_id: - await send_error("Missing required parameter: chat_id", code=400) - return - - # Validate question is provided (empty questions are allowed for session initialization) - if question is None: - await send_error("Missing required parameter: question", code=400) - return - - # Step 2: Verify dialog exists and user has access - # Check if the dialog belongs to this tenant and is active - dialog_query = DialogService.query( - tenant_id=tenant_id, - id=chat_id, - status=StatusEnum.VALID.value - ) - - if not dialog_query: - # Dialog not found or user doesn't have permission - await send_error(f"Dialog not found or access denied: {chat_id}", code=404) - return - - # Step 3: Extract optional parameters for enhanced functionality - # These parameters customize the retrieval and generation process - additional_params = {} - - # Knowledge base filtering - limit search to specific KBs - if "kb_ids" in request_data: - additional_params["kb_ids"] = request_data["kb_ids"] - - # Document filtering - prioritize specific documents - if "doc_ids" in request_data: - additional_params["doc_ids"] = request_data["doc_ids"] - - # File attachments - include files uploaded with this message - if "files" in request_data: - additional_params["files"] = request_data["files"] - - # Pass through any other custom parameters - # This allows for future extensibility without code changes - for key, value in request_data.items(): - if key not in ["type", "chat_id", "question", "session_id", "stream"]: - if key not in additional_params: - additional_params[key] = value - - # Step 4: Process chat completion with streaming - if stream: - # Streaming mode: Send incremental response chunks - # This provides a better user experience with real-time feedback - - try: - # Call the completion service which yields response chunks - # The completion function handles session management, RAG retrieval, - # LLM generation, and response formatting - # Note: completion() is a synchronous generator, not async - for response_chunk in completion( - tenant_id=tenant_id, - chat_id=chat_id, - question=question, - session_id=session_id, - stream=True, - **additional_params - ): - # Parse the SSE-formatted response - # completion() returns "data:{json}\n\n" format for compatibility - if response_chunk.startswith("data:"): - # Extract JSON from SSE format - json_str = response_chunk[5:].strip() - - # Parse and forward to WebSocket client - try: - response_data = json.loads(json_str) - - # Send the chunk to WebSocket client - await websocket.send(json.dumps(response_data, ensure_ascii=False)) - - except json.JSONDecodeError: - # Malformed response chunk - log but continue - logging.warning(f"Failed to parse response chunk: {json_str}") - continue - - # Stream completed successfully - logging.info(f"Chat completion streamed successfully for chat_id: {chat_id}") - - except Exception as e: - # Error during streaming - send error message - error_message = f"Error during chat completion: {str(e)}" - logging.exception(error_message) - await send_error(error_message) - - else: - # Non-streaming mode: Send complete response at once - # This is simpler but provides no incremental feedback - - try: - # Get the complete response (completion yields once for non-streaming) - response = None - for resp in completion( - tenant_id=tenant_id, - chat_id=chat_id, - question=question, - session_id=session_id, - stream=False, - **additional_params - ): - response = resp - break # Only one response in non-streaming mode - - # Send complete response - if response: - await send_message(response) - else: - await send_error("No response generated", code=500) - - logging.info(f"Chat completion completed (non-streaming) for chat_id: {chat_id}") - - except Exception as e: - # Error during generation - send error message - error_message = f"Error during chat completion: {str(e)}" - logging.exception(error_message) - await send_error(error_message) - - except Exception as e: - # Unexpected error in request handling - error_message = f"Error handling chat request: {str(e)}" - logging.exception(error_message) - await send_error(error_message) - - -# ----------------------------------------------------------------------------- -# WebSocket Endpoint: Agent Completions (Future Enhancement) -# ----------------------------------------------------------------------------- - -@manager.route("/ws/agent") # noqa: F821 -async def websocket_agent(): - """ - WebSocket endpoint for agent-based completions with streaming. - - This endpoint is similar to websocket_chat but designed for agent-based - interactions. Agents can have custom tools, workflows, and behaviors - beyond standard RAG chat. - - Note: This is a placeholder for future implementation. The authentication - and connection handling logic is the same as websocket_chat. - - Future Enhancements: - - Tool calling and function execution - - Multi-step agent reasoning - - Agent state management - - Custom agent workflows - """ - # Authenticate connection - authenticated, tenant_id, error_msg = await authenticate_websocket() - - if not authenticated: - await send_error(error_msg, code=401) - await websocket.close(1008, error_msg) - return - - logging.info(f"WebSocket agent connection established for tenant: {tenant_id}") - - # Connection loop - try: - while True: - message = await websocket.receive() - - try: - request_data = json.loads(message) - except json.JSONDecodeError as e: - await send_error(f"Invalid JSON format: {str(e)}", code=400) - continue - - # Handle agent completion request - await handle_agent_request(tenant_id, request_data) - - except Exception as e: - error_message = f"WebSocket error: {str(e)}" - logging.exception(error_message) - - try: - await send_error(error_message) - except Exception: - pass - - await websocket.close(1011, "Internal server error") - - finally: - logging.info(f"WebSocket agent connection closed for tenant: {tenant_id}") - - -async def handle_agent_request(tenant_id, request_data): - """ - Handle agent completion request received via WebSocket. - - This is a placeholder for future agent functionality. - - Args: - tenant_id (str): Authenticated tenant/user ID - request_data (dict): Parsed JSON request from client - """ - # TODO: Implement agent-specific logic - # For now, return a not-implemented error - await send_error("Agent completions not yet implemented", code=501) - - logging.info("Agent request received but not yet implemented") - - -# ----------------------------------------------------------------------------- -# WebSocket Health Check Endpoint -# ----------------------------------------------------------------------------- - -@manager.route("/ws/health") # noqa: F821 -async def websocket_health(): - """ - WebSocket health check endpoint. - - This endpoint allows clients to verify WebSocket connectivity - without authentication. Useful for monitoring and diagnostics. - - The server will echo back any messages received, allowing clients - to test round-trip latency and connection stability. - - Example Usage: - ```javascript - const ws = new WebSocket('ws://host/v1/ws/health'); - ws.onopen = () => ws.send('ping'); - ws.onmessage = (e) => console.log('Received:', e.data); - ``` - """ - logging.info("WebSocket health check connection established") - - try: - # Send initial health status - await websocket.send(json.dumps({ - "status": "healthy", - "message": "WebSocket connection established", - "version": "1.0" - })) - - # Echo messages back to client - while True: - message = await websocket.receive() - - # Echo the message back - await websocket.send(json.dumps({ - "echo": message, - "timestamp": str(logging.time.time()) - })) - - except Exception as e: - logging.info(f"WebSocket health check closed: {str(e)}") - - finally: - logging.info("WebSocket health check connection closed") - diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 8f17e1de0..c8eb578aa 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -1,5 +1,5 @@ # -# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -283,6 +283,93 @@ def token_required(func): return decorated_function +def ws_token_required(func): + """ + WebSocket authentication decorator for SDK endpoints. + Follows the same pattern as token_required but for WebSocket connections. + """ + from quart import websocket + from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer + from api.db.services.user_service import UserService + from common.constants import StatusEnum + + async def get_tenant_id_from_websocket(**kwargs): + """Extract tenant_id from WebSocket authentication.""" + # Method 1: Try API Token authentication from Authorization header + authorization = websocket.headers.get("Authorization", "") + + if authorization: + try: + authorization_parts = authorization.split() + if len(authorization_parts) >= 2: + token = authorization_parts[1] + objs = APIToken.query(token=token) + if objs: + kwargs["tenant_id"] = objs[0].tenant_id + logging.info(f"WebSocket authenticated via API token") + return True, kwargs + except Exception as e: + logging.error(f"WebSocket API token auth error: {str(e)}") + + # Method 2: Try User Session authentication (JWT) + try: + jwt = Serializer(secret_key=settings.SECRET_KEY) + auth_token = websocket.headers.get("Authorization") or \ + websocket.args.get("authorization") or \ + websocket.args.get("token") + + if auth_token: + try: + if auth_token.startswith("Bearer "): + auth_token = auth_token[7:] + access_token = str(jwt.loads(auth_token)) + if access_token and len(access_token.strip()) >= 32: + user = UserService.query(access_token=access_token, status=StatusEnum.VALID.value) + if user and user[0]: + kwargs["tenant_id"] = user[0].id + logging.info(f"WebSocket authenticated via user session") + return True, kwargs + except Exception: + pass + except Exception: + pass + + # Method 3: Try query parameter authentication + token_param = websocket.args.get("token") + if token_param: + try: + objs = APIToken.query(token=token_param) + if objs: + kwargs["tenant_id"] = objs[0].tenant_id + logging.info(f"WebSocket authenticated via query parameter") + return True, kwargs + except Exception: + pass + + return False, "Authentication required. Please provide valid API token or user session." + + @wraps(func) + async def adecorated_function(*args, **kwargs): + """Async wrapper for WebSocket endpoint.""" + success, result = await get_tenant_id_from_websocket(**kwargs) + + if not success: + # Authentication failed - send error and close connection + error_response = { + "code": RetCode.AUTHENTICATION_ERROR, + "message": result, + "data": {"answer": f"**ERROR**: {result}", "reference": []} + } + await websocket.send(json.dumps(error_response, ensure_ascii=False)) + await websocket.close(1008, result) # 1008 = Policy Violation + return + + # Authentication successful - call the actual handler + return await func(*args, **result) + + return adecorated_function + + def get_result(code=RetCode.SUCCESS, message="", data=None, total=None): """ Standard API response format: diff --git a/pyproject.toml b/pyproject.toml index 9da1236c0..e63ff71b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -163,11 +163,13 @@ test = [ "openpyxl>=3.1.5", "pillow>=10.4.0", "pytest>=8.3.5", + "pytest-asyncio>=0.24.0", "python-docx>=1.1.2", "python-pptx>=1.0.2", "reportlab>=4.4.1", "requests>=2.32.2", "requests-toolbelt>=1.0.0", + "websockets>=14.0", ] [[tool.uv.index]] From 081f7f7b74a748c89ee2a80dca3b63b156e01d8b Mon Sep 17 00:00:00 2001 From: SmartDever02 Date: Fri, 5 Dec 2025 01:32:32 -0300 Subject: [PATCH 05/12] refactor: Move WebSocket to SDK pattern with /ws/ prefix - Moved to api/apps/sdk/websocket.py following session.py pattern - Added ws_token_required decorator - WebSocket endpoints: /ws/chats//completions and /ws/agents//completions - Prevents routing conflicts with HTTP endpoints --- api/apps/sdk/websocket.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/api/apps/sdk/websocket.py b/api/apps/sdk/websocket.py index 5faf11066..1ba4c6b17 100644 --- a/api/apps/sdk/websocket.py +++ b/api/apps/sdk/websocket.py @@ -55,12 +55,13 @@ async def send_ws_message(data, code=0, message=""): await websocket.send(json.dumps(response, ensure_ascii=False)) -@manager.websocket("/chats//completions") # noqa: F821 +@manager.websocket("/ws/chats//completions") # noqa: F821 @ws_token_required async def chat_completions_ws(tenant_id, chat_id): """ WebSocket endpoint for streaming chat completions. Follows the same pattern as the HTTP POST /chats//completions endpoint. + Uses /ws/ prefix to avoid routing conflicts with HTTP endpoints. """ # Verify chat ownership if not DialogService.query(tenant_id=tenant_id, id=chat_id, status=StatusEnum.VALID.value): @@ -141,12 +142,13 @@ async def chat_completions_ws(tenant_id, chat_id): logging.info(f"WebSocket chat connection closed for chat_id: {chat_id}") -@manager.websocket("/agents//completions") # noqa: F821 +@manager.websocket("/ws/agents//completions") # noqa: F821 @ws_token_required async def agent_completions_ws(tenant_id, agent_id): """ WebSocket endpoint for streaming agent completions. Follows the same pattern as the HTTP POST /agents//completions endpoint. + Uses /ws/ prefix to avoid routing conflicts with HTTP endpoints. """ # Verify agent ownership if not UserCanvasService.query(user_id=tenant_id, id=agent_id): From 710e0096d8906286870c701fc40ce701da275988 Mon Sep 17 00:00:00 2001 From: SmartDever02 Date: Fri, 5 Dec 2025 01:34:32 -0300 Subject: [PATCH 06/12] Updated date --- example/websocket/python_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/websocket/python_client.py b/example/websocket/python_client.py index 4254d1547..c5eb73348 100644 --- a/example/websocket/python_client.py +++ b/example/websocket/python_client.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 22ba48e89de349ab3111b9366f288a5ee19762fb Mon Sep 17 00:00:00 2001 From: SmartDever02 Date: Mon, 8 Dec 2025 16:12:25 -0300 Subject: [PATCH 07/12] fix: Remove f-string prefixes from logging statements without placeholders - Line 309, 330, 344 in api_utils.py - Fixes ruff F541 linting errors --- api/utils/api_utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index c8eb578aa..35c1aee3a 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -306,7 +306,7 @@ def ws_token_required(func): objs = APIToken.query(token=token) if objs: kwargs["tenant_id"] = objs[0].tenant_id - logging.info(f"WebSocket authenticated via API token") + logging.info("WebSocket authenticated via API token") return True, kwargs except Exception as e: logging.error(f"WebSocket API token auth error: {str(e)}") @@ -327,7 +327,7 @@ def ws_token_required(func): user = UserService.query(access_token=access_token, status=StatusEnum.VALID.value) if user and user[0]: kwargs["tenant_id"] = user[0].id - logging.info(f"WebSocket authenticated via user session") + logging.info("WebSocket authenticated via user session") return True, kwargs except Exception: pass @@ -341,7 +341,7 @@ def ws_token_required(func): objs = APIToken.query(token=token_param) if objs: kwargs["tenant_id"] = objs[0].tenant_id - logging.info(f"WebSocket authenticated via query parameter") + logging.info("WebSocket authenticated via query parameter") return True, kwargs except Exception: pass From 5ee639fa5aaddd2bd74a94f58edad21466c132c5 Mon Sep 17 00:00:00 2001 From: SmartDever02 Date: Mon, 8 Dec 2025 16:42:55 -0300 Subject: [PATCH 08/12] Fix the test issue --- api/utils/api_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 35c1aee3a..e1c79787d 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -31,7 +31,6 @@ from quart import ( jsonify, request ) - from peewee import OperationalError from common.constants import ActiveEnum From 1e10287a03b00dde297792f33d1eeefaabaaf1c4 Mon Sep 17 00:00:00 2001 From: SmartDever02 Date: Tue, 9 Dec 2025 06:07:19 -0300 Subject: [PATCH 09/12] Fix ImportError about completion --- api/apps/sdk/websocket.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/apps/sdk/websocket.py b/api/apps/sdk/websocket.py index 1ba4c6b17..54466d2d8 100644 --- a/api/apps/sdk/websocket.py +++ b/api/apps/sdk/websocket.py @@ -26,7 +26,7 @@ from quart import websocket from api.db.services.dialog_service import DialogService from api.db.services.canvas_service import UserCanvasService -from api.db.services.conversation_service import completion as rag_completion +from api.db.services.conversation_service import async_completion as rag_completion from api.db.services.canvas_service import completion as agent_completion from api.utils.api_utils import ws_token_required from common.constants import StatusEnum @@ -91,7 +91,7 @@ async def chat_completions_ws(tenant_id, chat_id): try: if stream: - for response_chunk in rag_completion( + async for response_chunk in rag_completion( tenant_id=tenant_id, chat_id=chat_id, question=question, @@ -110,7 +110,7 @@ async def chat_completions_ws(tenant_id, chat_id): logging.info(f"Chat completion streamed successfully for chat_id: {chat_id}") else: response = None - for resp in rag_completion( + async for resp in rag_completion( tenant_id=tenant_id, chat_id=chat_id, question=question, From b192fb28a05144e50f3477b2cd04a856b922f7f7 Mon Sep 17 00:00:00 2001 From: SmartDever02 Date: Tue, 9 Dec 2025 07:08:46 -0300 Subject: [PATCH 10/12] Fix some issue on review --- example/websocket/index.html | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/example/websocket/index.html b/example/websocket/index.html index 7bd22804c..6ea1f7150 100644 --- a/example/websocket/index.html +++ b/example/websocket/index.html @@ -1,6 +1,6 @@