From 850c504c55e2823c363f04fed7d5ee0dd402ea1e Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 22 Dec 2025 22:00:42 -0500 Subject: [PATCH 1/4] sdk chat endpoint fix --- src/api/v1/chat.py | 215 +++++++-------------------------------------- 1 file changed, 31 insertions(+), 184 deletions(-) diff --git a/src/api/v1/chat.py b/src/api/v1/chat.py index 4a8d3e3d..39d9b179 100644 --- a/src/api/v1/chat.py +++ b/src/api/v1/chat.py @@ -2,144 +2,36 @@ Public API v1 Chat endpoint. Provides chat functionality with streaming support and conversation history. -Uses API key authentication. +Uses API key authentication. Routes through Langflow endpoint. """ import json from starlette.requests import Request -from starlette.responses import JSONResponse, StreamingResponse +from starlette.responses import JSONResponse from utils.logging_config import get_logger -from auth_context import set_search_filters, set_search_limit, set_score_threshold, set_auth_context +from api.chat import langflow_endpoint logger = get_logger(__name__) -async def _transform_stream_to_sse(raw_stream, chat_id_container: dict): - """ - Transform the raw internal streaming format to clean SSE events. - - Yields SSE events in the format: - event: content - data: {"type": "content", "delta": "..."} - - event: sources - data: {"type": "sources", "sources": [...]} - - event: done - data: {"type": "done", "chat_id": "..."} - """ - full_text = "" - sources = [] - chat_id = None - - async for chunk in raw_stream: - try: - # Decode the chunk - if isinstance(chunk, bytes): - chunk_str = chunk.decode("utf-8").strip() - else: - chunk_str = str(chunk).strip() - - if not chunk_str: - continue - - # Parse the JSON chunk - chunk_data = json.loads(chunk_str) - - # Extract text delta - delta_text = None - if "delta" in chunk_data: - delta = chunk_data["delta"] - if isinstance(delta, dict): - delta_text = delta.get("content") or delta.get("text") or "" - elif isinstance(delta, str): - delta_text = delta - - if "output_text" in chunk_data and chunk_data["output_text"]: - delta_text = chunk_data["output_text"] - - # Yield content event if we have text - if delta_text: - full_text += delta_text - event = {"type": "content", "delta": delta_text} - yield f"event: content\ndata: {json.dumps(event)}\n\n" - - # Extract chat_id/response_id - if "id" in chunk_data and chunk_data["id"]: - chat_id = chunk_data["id"] - elif "response_id" in chunk_data and chunk_data["response_id"]: - chat_id = chunk_data["response_id"] - - # Extract sources from tool call results - if "item" in chunk_data and isinstance(chunk_data["item"], dict): - item = chunk_data["item"] - if item.get("type") in ("retrieval_call", "tool_call", "function_call"): - results = item.get("results", []) - if results: - for result in results: - if isinstance(result, dict): - source = { - "filename": result.get("filename", result.get("title", "Unknown")), - "text": result.get("text", result.get("content", "")), - "score": result.get("score", 0), - "page": result.get("page"), - } - sources.append(source) - - except json.JSONDecodeError: - # Not JSON, might be raw text - if chunk_str and not chunk_str.startswith("{"): - event = {"type": "content", "delta": chunk_str} - yield f"event: content\ndata: {json.dumps(event)}\n\n" - full_text += chunk_str - except Exception as e: - logger.warning("Error processing stream chunk", error=str(e)) - continue - - # Yield sources event if we have any - if sources: - event = {"type": "sources", "sources": sources} - yield f"event: sources\ndata: {json.dumps(event)}\n\n" - - # Yield done event with chat_id - event = {"type": "done", "chat_id": chat_id} - yield f"event: done\ndata: {json.dumps(event)}\n\n" - - # Store chat_id for caller - chat_id_container["chat_id"] = chat_id +def _transform_v1_request_to_internal(data: dict) -> dict: + """Transform v1 API request format to internal Langflow format.""" + return { + "prompt": data.get("message", ""), # v1 uses "message", internal uses "prompt" + "previous_response_id": data.get("chat_id"), # v1 uses "chat_id" + "stream": data.get("stream", False), + "filters": data.get("filters"), + "limit": data.get("limit", 10), + "scoreThreshold": data.get("score_threshold", 0), # v1 uses snake_case + "filter_id": data.get("filter_id"), + } async def chat_create_endpoint(request: Request, chat_service, session_manager): """ - Send a chat message. + Send a chat message. Routes to internal Langflow endpoint. - POST /v1/chat - - Request body: - { - "message": "What is RAG?", - "stream": false, // optional, default false - "chat_id": "...", // optional, to continue conversation - "filters": {...}, // optional - "limit": 10, // optional - "score_threshold": 0.5 // optional - } - - Non-streaming response: - { - "response": "RAG stands for...", - "chat_id": "chat_xyz789", - "sources": [...] - } - - Streaming response (SSE): - event: content - data: {"type": "content", "delta": "RAG stands for"} - - event: sources - data: {"type": "sources", "sources": [...]} - - event: done - data: {"type": "done", "chat_id": "chat_xyz789"} + POST /v1/chat - see internal /langflow endpoint for full documentation. + Transforms v1 format (message, chat_id, score_threshold) to internal format. """ try: data = await request.json() @@ -156,65 +48,20 @@ async def chat_create_endpoint(request: Request, chat_service, session_manager): status_code=400, ) - stream = data.get("stream", False) - chat_id = data.get("chat_id") # For conversation continuation - filters = data.get("filters") - limit = data.get("limit", 10) - score_threshold = data.get("score_threshold", 0) + # Transform v1 request to internal format + internal_data = _transform_v1_request_to_internal(data) - user = request.state.user - user_id = user.user_id + # Create a new request with transformed body for the internal endpoint + body = json.dumps(internal_data).encode() - # Note: API key auth doesn't have JWT, so we pass None - jwt_token = None + async def receive(): + return {"type": "http.request", "body": body} - # Set context variables for search tool - if filters: - set_search_filters(filters) - set_search_limit(limit) - set_score_threshold(score_threshold) - set_auth_context(user_id, jwt_token) + internal_request = Request(request.scope, receive) + internal_request.state = request.state # Copy state for auth - if stream: - # Streaming response - raw_stream = await chat_service.chat( - prompt=message, - user_id=user_id, - jwt_token=jwt_token, - previous_response_id=chat_id, - stream=True, - ) - - chat_id_container = {} - - return StreamingResponse( - _transform_stream_to_sse(raw_stream, chat_id_container), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "X-Accel-Buffering": "no", - }, - ) - else: - # Non-streaming response - result = await chat_service.chat( - prompt=message, - user_id=user_id, - jwt_token=jwt_token, - previous_response_id=chat_id, - stream=False, - ) - - # Transform response to public API format - # Internal format: {"response": "...", "response_id": "..."} - response_data = { - "response": result.get("response", ""), - "chat_id": result.get("response_id"), - "sources": result.get("sources", []), - } - - return JSONResponse(response_data) + # Call internal Langflow endpoint + return await langflow_endpoint(internal_request, chat_service, session_manager) async def chat_list_endpoint(request: Request, chat_service, session_manager): @@ -240,8 +87,8 @@ async def chat_list_endpoint(request: Request, chat_service, session_manager): user_id = user.user_id try: - # Get chat history - history = await chat_service.get_chat_history(user_id) + # Get Langflow chat history (since v1 routes through Langflow) + history = await chat_service.get_langflow_history(user_id) # Transform to public API format conversations = [] @@ -293,8 +140,8 @@ async def chat_get_endpoint(request: Request, chat_service, session_manager): ) try: - # Get chat history and find the specific conversation - history = await chat_service.get_chat_history(user_id) + # Get Langflow chat history and find the specific conversation + history = await chat_service.get_langflow_history(user_id) conversation = None for conv in history.get("conversations", []): From 242ac4c7f3515e2b9733f06a795cf6cdcc5778aa Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 22 Dec 2025 22:21:55 -0500 Subject: [PATCH 2/4] set state fix --- src/api/v1/chat.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/api/v1/chat.py b/src/api/v1/chat.py index 39d9b179..43f3ba25 100644 --- a/src/api/v1/chat.py +++ b/src/api/v1/chat.py @@ -58,7 +58,10 @@ async def chat_create_endpoint(request: Request, chat_service, session_manager): return {"type": "http.request", "body": body} internal_request = Request(request.scope, receive) - internal_request.state = request.state # Copy state for auth + + # Copy state attributes individually (state property has no setter) + internal_request.state.user = request.state.user + internal_request.state.jwt_token = getattr(request.state, "jwt_token", None) # Call internal Langflow endpoint return await langflow_endpoint(internal_request, chat_service, session_manager) From 0943ab5f2e5d8154cbba802aa0354b778d4c5843 Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 22 Dec 2025 22:47:18 -0500 Subject: [PATCH 3/4] streaming fix --- src/api/v1/chat.py | 157 +++++++++++++++++++++++++++++++++------------ 1 file changed, 115 insertions(+), 42 deletions(-) diff --git a/src/api/v1/chat.py b/src/api/v1/chat.py index 43f3ba25..418f08ee 100644 --- a/src/api/v1/chat.py +++ b/src/api/v1/chat.py @@ -2,69 +2,142 @@ Public API v1 Chat endpoint. Provides chat functionality with streaming support and conversation history. -Uses API key authentication. Routes through Langflow endpoint. +Uses API key authentication. Routes through Langflow (chat_service.langflow_chat). """ import json from starlette.requests import Request -from starlette.responses import JSONResponse +from starlette.responses import JSONResponse, StreamingResponse from utils.logging_config import get_logger -from api.chat import langflow_endpoint +from auth_context import set_search_filters, set_search_limit, set_score_threshold, set_auth_context logger = get_logger(__name__) -def _transform_v1_request_to_internal(data: dict) -> dict: - """Transform v1 API request format to internal Langflow format.""" - return { - "prompt": data.get("message", ""), # v1 uses "message", internal uses "prompt" - "previous_response_id": data.get("chat_id"), # v1 uses "chat_id" - "stream": data.get("stream", False), - "filters": data.get("filters"), - "limit": data.get("limit", 10), - "scoreThreshold": data.get("score_threshold", 0), # v1 uses snake_case - "filter_id": data.get("filter_id"), - } +async def _transform_stream_to_sse(raw_stream, chat_id_container: dict): + """ + Transform the raw Langflow streaming format to clean SSE events for v1 API. + + Yields SSE events in the format: + data: {"type": "content", "delta": "..."} + data: {"type": "sources", "sources": [...]} + data: {"type": "done", "chat_id": "..."} + """ + full_text = "" + sources = [] + chat_id = None + + async for chunk in raw_stream: + try: + if isinstance(chunk, bytes): + chunk_str = chunk.decode("utf-8").strip() + else: + chunk_str = str(chunk).strip() + + if not chunk_str: + continue + + chunk_data = json.loads(chunk_str) + + # Extract text delta + delta_text = None + if "delta" in chunk_data: + delta = chunk_data["delta"] + if isinstance(delta, dict): + delta_text = delta.get("content") or delta.get("text") or "" + elif isinstance(delta, str): + delta_text = delta + + if "output_text" in chunk_data and chunk_data["output_text"]: + delta_text = chunk_data["output_text"] + + if delta_text: + full_text += delta_text + yield f"data: {json.dumps({'type': 'content', 'delta': delta_text})}\n\n" + + # Extract chat_id/response_id + if "id" in chunk_data and chunk_data["id"]: + chat_id = chunk_data["id"] + elif "response_id" in chunk_data and chunk_data["response_id"]: + chat_id = chunk_data["response_id"] + + except json.JSONDecodeError: + if chunk_str and not chunk_str.startswith("{"): + yield f"data: {json.dumps({'type': 'content', 'delta': chunk_str})}\n\n" + full_text += chunk_str + except Exception as e: + logger.warning("Error processing stream chunk", error=str(e)) + continue + + if sources: + yield f"data: {json.dumps({'type': 'sources', 'sources': sources})}\n\n" + + yield f"data: {json.dumps({'type': 'done', 'chat_id': chat_id})}\n\n" + chat_id_container["chat_id"] = chat_id async def chat_create_endpoint(request: Request, chat_service, session_manager): """ - Send a chat message. Routes to internal Langflow endpoint. + Send a chat message via Langflow. - POST /v1/chat - see internal /langflow endpoint for full documentation. - Transforms v1 format (message, chat_id, score_threshold) to internal format. + POST /v1/chat """ try: data = await request.json() except Exception: - return JSONResponse( - {"error": "Invalid JSON in request body"}, - status_code=400, - ) + return JSONResponse({"error": "Invalid JSON in request body"}, status_code=400) message = data.get("message", "").strip() if not message: - return JSONResponse( - {"error": "Message is required"}, - status_code=400, + return JSONResponse({"error": "Message is required"}, status_code=400) + + stream = data.get("stream", False) + chat_id = data.get("chat_id") + filters = data.get("filters") + limit = data.get("limit", 10) + score_threshold = data.get("score_threshold", 0) + filter_id = data.get("filter_id") + + user = request.state.user + user_id = user.user_id + jwt_token = session_manager.get_effective_jwt_token(user_id, None) + + # Set context variables for search tool + if filters: + set_search_filters(filters) + set_search_limit(limit) + set_score_threshold(score_threshold) + set_auth_context(user_id, jwt_token) + + if stream: + raw_stream = await chat_service.langflow_chat( + prompt=message, + user_id=user_id, + jwt_token=jwt_token, + previous_response_id=chat_id, + stream=True, + filter_id=filter_id, ) - - # Transform v1 request to internal format - internal_data = _transform_v1_request_to_internal(data) - - # Create a new request with transformed body for the internal endpoint - body = json.dumps(internal_data).encode() - - async def receive(): - return {"type": "http.request", "body": body} - - internal_request = Request(request.scope, receive) - - # Copy state attributes individually (state property has no setter) - internal_request.state.user = request.state.user - internal_request.state.jwt_token = getattr(request.state, "jwt_token", None) - - # Call internal Langflow endpoint - return await langflow_endpoint(internal_request, chat_service, session_manager) + chat_id_container = {} + return StreamingResponse( + _transform_stream_to_sse(raw_stream, chat_id_container), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}, + ) + else: + result = await chat_service.langflow_chat( + prompt=message, + user_id=user_id, + jwt_token=jwt_token, + previous_response_id=chat_id, + stream=False, + filter_id=filter_id, + ) + # Transform response_id to chat_id for v1 API format + return JSONResponse({ + "response": result.get("response", ""), + "chat_id": result.get("response_id"), + "sources": result.get("sources", []), + }) async def chat_list_endpoint(request: Request, chat_service, session_manager): From 28bda02351378bb20a9ee8378c25eb6c046cd0a5 Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 22 Dec 2025 22:53:36 -0500 Subject: [PATCH 4/4] streaming text extract formats --- src/api/v1/chat.py | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/api/v1/chat.py b/src/api/v1/chat.py index 418f08ee..ecadba16 100644 --- a/src/api/v1/chat.py +++ b/src/api/v1/chat.py @@ -23,7 +23,6 @@ async def _transform_stream_to_sse(raw_stream, chat_id_container: dict): data: {"type": "done", "chat_id": "..."} """ full_text = "" - sources = [] chat_id = None async for chunk in raw_stream: @@ -38,38 +37,44 @@ async def _transform_stream_to_sse(raw_stream, chat_id_container: dict): chunk_data = json.loads(chunk_str) - # Extract text delta - delta_text = None + # Extract text from various possible formats + delta_text = "" + + # Format 1: delta.content (OpenAI-style) if "delta" in chunk_data: delta = chunk_data["delta"] if isinstance(delta, dict): - delta_text = delta.get("content") or delta.get("text") or "" + delta_text = delta.get("content", "") or delta.get("text", "") elif isinstance(delta, str): delta_text = delta - if "output_text" in chunk_data and chunk_data["output_text"]: + # Format 2: output_text (Langflow-style) + if not delta_text and chunk_data.get("output_text"): delta_text = chunk_data["output_text"] + # Format 3: text field directly + if not delta_text and chunk_data.get("text"): + delta_text = chunk_data["text"] + + # Format 4: content field directly + if not delta_text and chunk_data.get("content"): + delta_text = chunk_data["content"] + if delta_text: full_text += delta_text yield f"data: {json.dumps({'type': 'content', 'delta': delta_text})}\n\n" - # Extract chat_id/response_id - if "id" in chunk_data and chunk_data["id"]: - chat_id = chunk_data["id"] - elif "response_id" in chunk_data and chunk_data["response_id"]: - chat_id = chunk_data["response_id"] + # Extract chat_id/response_id from various fields + if not chat_id: + chat_id = chunk_data.get("id") or chunk_data.get("response_id") except json.JSONDecodeError: - if chunk_str and not chunk_str.startswith("{"): + # Raw text without JSON wrapper + if chunk_str: yield f"data: {json.dumps({'type': 'content', 'delta': chunk_str})}\n\n" full_text += chunk_str except Exception as e: - logger.warning("Error processing stream chunk", error=str(e)) - continue - - if sources: - yield f"data: {json.dumps({'type': 'sources', 'sources': sources})}\n\n" + logger.warning("Error processing stream chunk", error=str(e), chunk=chunk_str[:100] if chunk_str else "") yield f"data: {json.dumps({'type': 'done', 'chat_id': chat_id})}\n\n" chat_id_container["chat_id"] = chat_id