From 0fb2925c6ad5bb40c7206ec96bd70311a15b82fd Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 19 Nov 2025 21:38:17 +0800 Subject: [PATCH 1/2] Remove ascii_colors dependency and fix stream handling errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Remove ascii_colors.trace_exception calls • Add SafeStreamHandler for closed streams • Patch ascii_colors console handler • Prevent ValueError on stream close • Improve logging error handling --- lightrag/api/routers/ollama_api.py | 409 +++++++++++++-------------- lightrag/api/routers/query_routes.py | 5 - lightrag/utils.py | 65 ++++- 3 files changed, 258 insertions(+), 221 deletions(-) diff --git a/lightrag/api/routers/ollama_api.py b/lightrag/api/routers/ollama_api.py index f9353dda..259e7781 100644 --- a/lightrag/api/routers/ollama_api.py +++ b/lightrag/api/routers/ollama_api.py @@ -8,7 +8,6 @@ import re from enum import Enum from fastapi.responses import StreamingResponse import asyncio -from ascii_colors import trace_exception from lightrag import LightRAG, QueryParam from lightrag.utils import TiktokenTokenizer from lightrag.api.utils_api import get_combined_auth_dependency @@ -309,118 +308,113 @@ class OllamaAPI: ) async def stream_generator(): - try: - first_chunk_time = None + first_chunk_time = None + last_chunk_time = time.time_ns() + total_response = "" + + # Ensure response is an async generator + if isinstance(response, str): + # If it's a string, send in two parts + first_chunk_time = start_time last_chunk_time = time.time_ns() - total_response = "" + total_response = response - # Ensure response is an async generator - if isinstance(response, str): - # If it's a string, send in two parts - first_chunk_time = start_time - last_chunk_time = time.time_ns() - total_response = response + data = { + "model": self.ollama_server_infos.LIGHTRAG_MODEL, + "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, + "response": response, + "done": False, + } + yield f"{json.dumps(data, ensure_ascii=False)}\n" - data = { + completion_tokens = estimate_tokens(total_response) + total_time = last_chunk_time - start_time + prompt_eval_time = first_chunk_time - start_time + eval_time = last_chunk_time - first_chunk_time + + data = { + "model": self.ollama_server_infos.LIGHTRAG_MODEL, + "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, + "response": "", + "done": True, + "done_reason": "stop", + "context": [], + "total_duration": total_time, + "load_duration": 0, + "prompt_eval_count": prompt_tokens, + "prompt_eval_duration": prompt_eval_time, + "eval_count": completion_tokens, + "eval_duration": eval_time, + } + yield f"{json.dumps(data, ensure_ascii=False)}\n" + else: + try: + async for chunk in response: + if chunk: + if first_chunk_time is None: + first_chunk_time = time.time_ns() + + last_chunk_time = time.time_ns() + + total_response += chunk + data = { + "model": self.ollama_server_infos.LIGHTRAG_MODEL, + "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, + "response": chunk, + "done": False, + } + yield f"{json.dumps(data, ensure_ascii=False)}\n" + except (asyncio.CancelledError, Exception) as e: + error_msg = str(e) + if isinstance(e, asyncio.CancelledError): + error_msg = "Stream was cancelled by server" + else: + error_msg = f"Provider error: {error_msg}" + + logger.error(f"Stream error: {error_msg}") + + # Send error message to client + error_data = { "model": self.ollama_server_infos.LIGHTRAG_MODEL, "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, - 
"response": response, + "response": f"\n\nError: {error_msg}", + "error": f"\n\nError: {error_msg}", "done": False, } - yield f"{json.dumps(data, ensure_ascii=False)}\n" + yield f"{json.dumps(error_data, ensure_ascii=False)}\n" - completion_tokens = estimate_tokens(total_response) - total_time = last_chunk_time - start_time - prompt_eval_time = first_chunk_time - start_time - eval_time = last_chunk_time - first_chunk_time - - data = { + # Send final message to close the stream + final_data = { "model": self.ollama_server_infos.LIGHTRAG_MODEL, "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, "response": "", "done": True, - "done_reason": "stop", - "context": [], - "total_duration": total_time, - "load_duration": 0, - "prompt_eval_count": prompt_tokens, - "prompt_eval_duration": prompt_eval_time, - "eval_count": completion_tokens, - "eval_duration": eval_time, } - yield f"{json.dumps(data, ensure_ascii=False)}\n" - else: - try: - async for chunk in response: - if chunk: - if first_chunk_time is None: - first_chunk_time = time.time_ns() - - last_chunk_time = time.time_ns() - - total_response += chunk - data = { - "model": self.ollama_server_infos.LIGHTRAG_MODEL, - "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, - "response": chunk, - "done": False, - } - yield f"{json.dumps(data, ensure_ascii=False)}\n" - except (asyncio.CancelledError, Exception) as e: - error_msg = str(e) - if isinstance(e, asyncio.CancelledError): - error_msg = "Stream was cancelled by server" - else: - error_msg = f"Provider error: {error_msg}" - - logger.error(f"Stream error: {error_msg}") - - # Send error message to client - error_data = { - "model": self.ollama_server_infos.LIGHTRAG_MODEL, - "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, - "response": f"\n\nError: {error_msg}", - "error": f"\n\nError: {error_msg}", - "done": False, - } - yield f"{json.dumps(error_data, ensure_ascii=False)}\n" - - # Send final message to close the stream - final_data = { - "model": self.ollama_server_infos.LIGHTRAG_MODEL, - "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, - "response": "", - "done": True, - } - yield f"{json.dumps(final_data, ensure_ascii=False)}\n" - return - if first_chunk_time is None: - first_chunk_time = start_time - completion_tokens = estimate_tokens(total_response) - total_time = last_chunk_time - start_time - prompt_eval_time = first_chunk_time - start_time - eval_time = last_chunk_time - first_chunk_time - - data = { - "model": self.ollama_server_infos.LIGHTRAG_MODEL, - "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, - "response": "", - "done": True, - "done_reason": "stop", - "context": [], - "total_duration": total_time, - "load_duration": 0, - "prompt_eval_count": prompt_tokens, - "prompt_eval_duration": prompt_eval_time, - "eval_count": completion_tokens, - "eval_duration": eval_time, - } - yield f"{json.dumps(data, ensure_ascii=False)}\n" + yield f"{json.dumps(final_data, ensure_ascii=False)}\n" return + if first_chunk_time is None: + first_chunk_time = start_time + completion_tokens = estimate_tokens(total_response) + total_time = last_chunk_time - start_time + prompt_eval_time = first_chunk_time - start_time + eval_time = last_chunk_time - first_chunk_time - except Exception as e: - trace_exception(e) - raise + data = { + "model": self.ollama_server_infos.LIGHTRAG_MODEL, + "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, + "response": "", + "done": True, + "done_reason": "stop", + "context": [], + "total_duration": total_time, + 
"load_duration": 0, + "prompt_eval_count": prompt_tokens, + "prompt_eval_duration": prompt_eval_time, + "eval_count": completion_tokens, + "eval_duration": eval_time, + } + yield f"{json.dumps(data, ensure_ascii=False)}\n" + return return StreamingResponse( stream_generator(), @@ -462,7 +456,6 @@ class OllamaAPI: "eval_duration": eval_time, } except Exception as e: - trace_exception(e) raise HTTPException(status_code=500, detail=str(e)) @self.router.post( @@ -535,36 +528,98 @@ class OllamaAPI: ) async def stream_generator(): - try: - first_chunk_time = None + first_chunk_time = None + last_chunk_time = time.time_ns() + total_response = "" + + # Ensure response is an async generator + if isinstance(response, str): + # If it's a string, send in two parts + first_chunk_time = start_time last_chunk_time = time.time_ns() - total_response = "" + total_response = response - # Ensure response is an async generator - if isinstance(response, str): - # If it's a string, send in two parts - first_chunk_time = start_time - last_chunk_time = time.time_ns() - total_response = response + data = { + "model": self.ollama_server_infos.LIGHTRAG_MODEL, + "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, + "message": { + "role": "assistant", + "content": response, + "images": None, + }, + "done": False, + } + yield f"{json.dumps(data, ensure_ascii=False)}\n" - data = { + completion_tokens = estimate_tokens(total_response) + total_time = last_chunk_time - start_time + prompt_eval_time = first_chunk_time - start_time + eval_time = last_chunk_time - first_chunk_time + + data = { + "model": self.ollama_server_infos.LIGHTRAG_MODEL, + "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, + "message": { + "role": "assistant", + "content": "", + "images": None, + }, + "done_reason": "stop", + "done": True, + "total_duration": total_time, + "load_duration": 0, + "prompt_eval_count": prompt_tokens, + "prompt_eval_duration": prompt_eval_time, + "eval_count": completion_tokens, + "eval_duration": eval_time, + } + yield f"{json.dumps(data, ensure_ascii=False)}\n" + else: + try: + async for chunk in response: + if chunk: + if first_chunk_time is None: + first_chunk_time = time.time_ns() + + last_chunk_time = time.time_ns() + + total_response += chunk + data = { + "model": self.ollama_server_infos.LIGHTRAG_MODEL, + "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, + "message": { + "role": "assistant", + "content": chunk, + "images": None, + }, + "done": False, + } + yield f"{json.dumps(data, ensure_ascii=False)}\n" + except (asyncio.CancelledError, Exception) as e: + error_msg = str(e) + if isinstance(e, asyncio.CancelledError): + error_msg = "Stream was cancelled by server" + else: + error_msg = f"Provider error: {error_msg}" + + logger.error(f"Stream error: {error_msg}") + + # Send error message to client + error_data = { "model": self.ollama_server_infos.LIGHTRAG_MODEL, "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, "message": { "role": "assistant", - "content": response, + "content": f"\n\nError: {error_msg}", "images": None, }, + "error": f"\n\nError: {error_msg}", "done": False, } - yield f"{json.dumps(data, ensure_ascii=False)}\n" + yield f"{json.dumps(error_data, ensure_ascii=False)}\n" - completion_tokens = estimate_tokens(total_response) - total_time = last_chunk_time - start_time - prompt_eval_time = first_chunk_time - start_time - eval_time = last_chunk_time - first_chunk_time - - data = { + # Send final message to close the stream + final_data = { "model": 
self.ollama_server_infos.LIGHTRAG_MODEL, "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, "message": { @@ -572,103 +627,36 @@ class OllamaAPI: "content": "", "images": None, }, - "done_reason": "stop", "done": True, - "total_duration": total_time, - "load_duration": 0, - "prompt_eval_count": prompt_tokens, - "prompt_eval_duration": prompt_eval_time, - "eval_count": completion_tokens, - "eval_duration": eval_time, } - yield f"{json.dumps(data, ensure_ascii=False)}\n" - else: - try: - async for chunk in response: - if chunk: - if first_chunk_time is None: - first_chunk_time = time.time_ns() + yield f"{json.dumps(final_data, ensure_ascii=False)}\n" + return - last_chunk_time = time.time_ns() + if first_chunk_time is None: + first_chunk_time = start_time + completion_tokens = estimate_tokens(total_response) + total_time = last_chunk_time - start_time + prompt_eval_time = first_chunk_time - start_time + eval_time = last_chunk_time - first_chunk_time - total_response += chunk - data = { - "model": self.ollama_server_infos.LIGHTRAG_MODEL, - "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, - "message": { - "role": "assistant", - "content": chunk, - "images": None, - }, - "done": False, - } - yield f"{json.dumps(data, ensure_ascii=False)}\n" - except (asyncio.CancelledError, Exception) as e: - error_msg = str(e) - if isinstance(e, asyncio.CancelledError): - error_msg = "Stream was cancelled by server" - else: - error_msg = f"Provider error: {error_msg}" - - logger.error(f"Stream error: {error_msg}") - - # Send error message to client - error_data = { - "model": self.ollama_server_infos.LIGHTRAG_MODEL, - "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, - "message": { - "role": "assistant", - "content": f"\n\nError: {error_msg}", - "images": None, - }, - "error": f"\n\nError: {error_msg}", - "done": False, - } - yield f"{json.dumps(error_data, ensure_ascii=False)}\n" - - # Send final message to close the stream - final_data = { - "model": self.ollama_server_infos.LIGHTRAG_MODEL, - "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, - "message": { - "role": "assistant", - "content": "", - "images": None, - }, - "done": True, - } - yield f"{json.dumps(final_data, ensure_ascii=False)}\n" - return - - if first_chunk_time is None: - first_chunk_time = start_time - completion_tokens = estimate_tokens(total_response) - total_time = last_chunk_time - start_time - prompt_eval_time = first_chunk_time - start_time - eval_time = last_chunk_time - first_chunk_time - - data = { - "model": self.ollama_server_infos.LIGHTRAG_MODEL, - "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, - "message": { - "role": "assistant", - "content": "", - "images": None, - }, - "done_reason": "stop", - "done": True, - "total_duration": total_time, - "load_duration": 0, - "prompt_eval_count": prompt_tokens, - "prompt_eval_duration": prompt_eval_time, - "eval_count": completion_tokens, - "eval_duration": eval_time, - } - yield f"{json.dumps(data, ensure_ascii=False)}\n" - - except Exception as e: - trace_exception(e) - raise + data = { + "model": self.ollama_server_infos.LIGHTRAG_MODEL, + "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT, + "message": { + "role": "assistant", + "content": "", + "images": None, + }, + "done_reason": "stop", + "done": True, + "total_duration": total_time, + "load_duration": 0, + "prompt_eval_count": prompt_tokens, + "prompt_eval_duration": prompt_eval_time, + "eval_count": completion_tokens, + "eval_duration": eval_time, + } + yield 
f"{json.dumps(data, ensure_ascii=False)}\n" return StreamingResponse( stream_generator(), @@ -730,5 +718,4 @@ class OllamaAPI: "eval_duration": eval_time, } except Exception as e: - trace_exception(e) raise HTTPException(status_code=500, detail=str(e)) diff --git a/lightrag/api/routers/query_routes.py b/lightrag/api/routers/query_routes.py index d163ca5a..5dc9f084 100644 --- a/lightrag/api/routers/query_routes.py +++ b/lightrag/api/routers/query_routes.py @@ -11,8 +11,6 @@ from lightrag.base import QueryParam from lightrag.api.utils_api import get_combined_auth_dependency from pydantic import BaseModel, Field, field_validator -from ascii_colors import trace_exception - router = APIRouter(tags=["query"]) @@ -453,7 +451,6 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60): else: return QueryResponse(response=response_content, references=None) except Exception as e: - trace_exception(e) raise HTTPException(status_code=500, detail=str(e)) @router.post( @@ -739,7 +736,6 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60): }, ) except Exception as e: - trace_exception(e) raise HTTPException(status_code=500, detail=str(e)) @router.post( @@ -1156,7 +1152,6 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60): data={}, ) except Exception as e: - trace_exception(e) raise HTTPException(status_code=500, detail=str(e)) return router diff --git a/lightrag/utils.py b/lightrag/utils.py index 8c9b7776..6a7237c0 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -1,6 +1,8 @@ from __future__ import annotations import weakref +import sys + import asyncio import html import csv @@ -40,6 +42,35 @@ from lightrag.constants import ( SOURCE_IDS_LIMIT_METHOD_FIFO, ) +# Precompile regex pattern for JSON sanitization (module-level, compiled once) +_SURROGATE_PATTERN = re.compile(r"[\uD800-\uDFFF\uFFFE\uFFFF]") + + +class SafeStreamHandler(logging.StreamHandler): + """StreamHandler that gracefully handles closed streams during shutdown. + + This handler prevents "ValueError: I/O operation on closed file" errors + that can occur when pytest or other test frameworks close stdout/stderr + before Python's logging cleanup runs. 
+ """ + + def flush(self): + """Flush the stream, ignoring errors if the stream is closed.""" + try: + super().flush() + except (ValueError, OSError): + # Stream is closed or otherwise unavailable, silently ignore + pass + + def close(self): + """Close the handler, ignoring errors if the stream is already closed.""" + try: + super().close() + except (ValueError, OSError): + # Stream is closed or otherwise unavailable, silently ignore + pass + + # Initialize logger with basic configuration logger = logging.getLogger("lightrag") logger.propagate = False # prevent log message send to root logger @@ -47,7 +78,7 @@ logger.setLevel(logging.INFO) # Add console handler if no handlers exist if not logger.handlers: - console_handler = logging.StreamHandler() + console_handler = SafeStreamHandler() console_handler.setLevel(logging.INFO) formatter = logging.Formatter("%(levelname)s: %(message)s") console_handler.setFormatter(formatter) @@ -56,8 +87,32 @@ if not logger.handlers: # Set httpx logging level to WARNING logging.getLogger("httpx").setLevel(logging.WARNING) -# Precompile regex pattern for JSON sanitization (module-level, compiled once) -_SURROGATE_PATTERN = re.compile(r"[\uD800-\uDFFF\uFFFE\uFFFF]") + +def _patch_ascii_colors_console_handler() -> None: + """Prevent ascii_colors from printing flush errors during interpreter exit.""" + + try: + from ascii_colors import ConsoleHandler + except ImportError: + return + + if getattr(ConsoleHandler, "_lightrag_patched", False): + return + + original_handle_error = ConsoleHandler.handle_error + + def _safe_handle_error(self, message: str) -> None: # type: ignore[override] + exc_type, _, _ = sys.exc_info() + if exc_type in (ValueError, OSError) and "close" in message.lower(): + return + original_handle_error(self, message) + + ConsoleHandler.handle_error = _safe_handle_error # type: ignore[assignment] + ConsoleHandler._lightrag_patched = True # type: ignore[attr-defined] + + +_patch_ascii_colors_console_handler() + # Global import for pypinyin with startup-time logging try: @@ -286,8 +341,8 @@ def setup_logger( logger_instance.handlers = [] # Clear existing handlers logger_instance.propagate = False - # Add console handler - console_handler = logging.StreamHandler() + # Add console handler with safe stream handling + console_handler = SafeStreamHandler() console_handler.setFormatter(simple_formatter) console_handler.setLevel(level) logger_instance.addHandler(console_handler) From b7de694f483341db868e59073ac88988fc776ea0 Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 19 Nov 2025 22:50:06 +0800 Subject: [PATCH 2/2] Add comprehensive error logging across API routes - Add error logs to Ollama API endpoints - Replace logging with unified logger - Log streaming query errors - Add data query error logging - Include stack traces for debugging --- lightrag/api/routers/ollama_api.py | 2 ++ lightrag/api/routers/query_routes.py | 8 +++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/lightrag/api/routers/ollama_api.py b/lightrag/api/routers/ollama_api.py index 259e7781..15c695ce 100644 --- a/lightrag/api/routers/ollama_api.py +++ b/lightrag/api/routers/ollama_api.py @@ -456,6 +456,7 @@ class OllamaAPI: "eval_duration": eval_time, } except Exception as e: + logger.error(f"Ollama generate error: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @self.router.post( @@ -718,4 +719,5 @@ class OllamaAPI: "eval_duration": eval_time, } except Exception as e: + logger.error(f"Ollama chat error: {str(e)}", exc_info=True) raise 
HTTPException(status_code=500, detail=str(e)) diff --git a/lightrag/api/routers/query_routes.py b/lightrag/api/routers/query_routes.py index 5dc9f084..99a799c1 100644 --- a/lightrag/api/routers/query_routes.py +++ b/lightrag/api/routers/query_routes.py @@ -3,12 +3,11 @@ This module contains all query-related routes for the LightRAG API. """ import json -import logging from typing import Any, Dict, List, Literal, Optional - from fastapi import APIRouter, Depends, HTTPException from lightrag.base import QueryParam from lightrag.api.utils_api import get_combined_auth_dependency +from lightrag.utils import logger from pydantic import BaseModel, Field, field_validator router = APIRouter(tags=["query"]) @@ -451,6 +450,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60): else: return QueryResponse(response=response_content, references=None) except Exception as e: + logger.error(f"Error processing query: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @router.post( @@ -710,7 +710,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60): if chunk: # Only send non-empty content yield f"{json.dumps({'response': chunk})}\n" except Exception as e: - logging.error(f"Streaming error: {str(e)}") + logger.error(f"Streaming error: {str(e)}") yield f"{json.dumps({'error': str(e)})}\n" else: # Non-streaming mode: send complete response in one message @@ -736,6 +736,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60): }, ) except Exception as e: + logger.error(f"Error processing streaming query: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @router.post( @@ -1152,6 +1153,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60): data={}, ) except Exception as e: + logger.error(f"Error processing data query: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) return router
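
A minimal usage sketch of the SafeStreamHandler introduced above (reviewer-side illustration, not part of the patch). It assumes lightrag is importable after this change is applied; the io.StringIO buffer stands in for stdout/stderr and the logger name is illustrative only.

    import io
    import logging

    from lightrag.utils import SafeStreamHandler  # added by this patch

    buf = io.StringIO()                 # stand-in for stdout/stderr
    handler = SafeStreamHandler(buf)
    handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))

    demo_logger = logging.getLogger("safe_stream_demo")  # illustrative name
    demo_logger.addHandler(handler)
    demo_logger.setLevel(logging.INFO)

    demo_logger.info("written while the stream is still open")

    # Simulate pytest (or another harness) closing the stream before
    # Python's logging cleanup runs.
    buf.close()

    # A plain logging.StreamHandler would raise
    # "ValueError: I/O operation on closed file" from flush() here;
    # SafeStreamHandler catches ValueError/OSError and returns silently.
    handler.flush()
    handler.close()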