streaming text extract formats
This commit is contained in:
parent
0943ab5f2e
commit
28bda02351
1 changed file with 21 additions and 16 deletions
|
|
@@ -23,7 +23,6 @@ async def _transform_stream_to_sse(raw_stream, chat_id_container: dict):
|
||||||
data: {"type": "done", "chat_id": "..."}
|
data: {"type": "done", "chat_id": "..."}
|
||||||
"""
|
"""
|
||||||
full_text = ""
|
full_text = ""
|
||||||
sources = []
|
|
||||||
chat_id = None
|
chat_id = None
|
||||||
|
|
||||||
async for chunk in raw_stream:
|
async for chunk in raw_stream:
|
||||||
|
|
@@ -38,38 +37,44 @@ async def _transform_stream_to_sse(raw_stream, chat_id_container: dict):
|
||||||
|
|
||||||
chunk_data = json.loads(chunk_str)
|
chunk_data = json.loads(chunk_str)
|
||||||
|
|
||||||
# Extract text delta
|
# Extract text from various possible formats
|
||||||
delta_text = None
|
delta_text = ""
|
||||||
|
|
||||||
|
# Format 1: delta.content (OpenAI-style)
|
||||||
if "delta" in chunk_data:
|
if "delta" in chunk_data:
|
||||||
delta = chunk_data["delta"]
|
delta = chunk_data["delta"]
|
||||||
if isinstance(delta, dict):
|
if isinstance(delta, dict):
|
||||||
delta_text = delta.get("content") or delta.get("text") or ""
|
delta_text = delta.get("content", "") or delta.get("text", "")
|
||||||
elif isinstance(delta, str):
|
elif isinstance(delta, str):
|
||||||
delta_text = delta
|
delta_text = delta
|
||||||
|
|
||||||
if "output_text" in chunk_data and chunk_data["output_text"]:
|
# Format 2: output_text (Langflow-style)
|
||||||
|
if not delta_text and chunk_data.get("output_text"):
|
||||||
delta_text = chunk_data["output_text"]
|
delta_text = chunk_data["output_text"]
|
||||||
|
|
||||||
|
# Format 3: text field directly
|
||||||
|
if not delta_text and chunk_data.get("text"):
|
||||||
|
delta_text = chunk_data["text"]
|
||||||
|
|
||||||
|
# Format 4: content field directly
|
||||||
|
if not delta_text and chunk_data.get("content"):
|
||||||
|
delta_text = chunk_data["content"]
|
||||||
|
|
||||||
if delta_text:
|
if delta_text:
|
||||||
full_text += delta_text
|
full_text += delta_text
|
||||||
yield f"data: {json.dumps({'type': 'content', 'delta': delta_text})}\n\n"
|
yield f"data: {json.dumps({'type': 'content', 'delta': delta_text})}\n\n"
|
||||||
|
|
||||||
# Extract chat_id/response_id
|
# Extract chat_id/response_id from various fields
|
||||||
if "id" in chunk_data and chunk_data["id"]:
|
if not chat_id:
|
||||||
chat_id = chunk_data["id"]
|
chat_id = chunk_data.get("id") or chunk_data.get("response_id")
|
||||||
elif "response_id" in chunk_data and chunk_data["response_id"]:
|
|
||||||
chat_id = chunk_data["response_id"]
|
|
||||||
|
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
if chunk_str and not chunk_str.startswith("{"):
|
# Raw text without JSON wrapper
|
||||||
|
if chunk_str:
|
||||||
yield f"data: {json.dumps({'type': 'content', 'delta': chunk_str})}\n\n"
|
yield f"data: {json.dumps({'type': 'content', 'delta': chunk_str})}\n\n"
|
||||||
full_text += chunk_str
|
full_text += chunk_str
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Error processing stream chunk", error=str(e))
|
logger.warning("Error processing stream chunk", error=str(e), chunk=chunk_str[:100] if chunk_str else "")
|
||||||
continue
|
|
||||||
|
|
||||||
if sources:
|
|
||||||
yield f"data: {json.dumps({'type': 'sources', 'sources': sources})}\n\n"
|
|
||||||
|
|
||||||
yield f"data: {json.dumps({'type': 'done', 'chat_id': chat_id})}\n\n"
|
yield f"data: {json.dumps({'type': 'done', 'chat_id': chat_id})}\n\n"
|
||||||
chat_id_container["chat_id"] = chat_id
|
chat_id_container["chat_id"] = chat_id
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue