Merge pull request #2390 from danielaskdd/fix-pytest-logging-error

Fix: Remove redundant exception logging to eliminate pytest shutdown errors
Daniel.y committed via GitHub on 2025-11-19 23:09:30 +08:00
commit d52adb64d7
3 changed files with 265 additions and 224 deletions
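
Across all three files the substitution is the same: drop the `ascii_colors.trace_exception` helper and report failures through the standard `lightrag` logger, so nothing writes to the console outside the logging framework. A minimal sketch of the pattern (the wrapper function is illustrative, not part of the diff):

import logging

logger = logging.getLogger("lightrag")


def handle_failure(e: Exception) -> None:
    # Before this PR: trace_exception(e) printed the traceback directly via ascii_colors.
    # After: the stdlib logger records the same traceback (exc_info=True) and respects
    # whatever handlers and levels the application has configured.
    logger.error(f"Error while handling request: {str(e)}", exc_info=True)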

lightrag/api/routers/ollama_api.py

@@ -8,7 +8,6 @@ import re
 from enum import Enum
 from fastapi.responses import StreamingResponse
 import asyncio
-from ascii_colors import trace_exception
 from lightrag import LightRAG, QueryParam
 from lightrag.utils import TiktokenTokenizer
 from lightrag.api.utils_api import get_combined_auth_dependency
@@ -309,118 +308,113 @@ class OllamaAPI:
             )
 
             async def stream_generator():
-                try:
-                    first_chunk_time = None
-                    last_chunk_time = time.time_ns()
-                    total_response = ""
-
-                    # Ensure response is an async generator
-                    if isinstance(response, str):
-                        # If it's a string, send in two parts
-                        first_chunk_time = start_time
-                        last_chunk_time = time.time_ns()
-                        total_response = response
-
-                        data = {
-                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                            "response": response,
-                            "done": False,
-                        }
-                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
-
-                        completion_tokens = estimate_tokens(total_response)
-                        total_time = last_chunk_time - start_time
-                        prompt_eval_time = first_chunk_time - start_time
-                        eval_time = last_chunk_time - first_chunk_time
-
-                        data = {
-                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                            "response": "",
-                            "done": True,
-                            "done_reason": "stop",
-                            "context": [],
-                            "total_duration": total_time,
-                            "load_duration": 0,
-                            "prompt_eval_count": prompt_tokens,
-                            "prompt_eval_duration": prompt_eval_time,
-                            "eval_count": completion_tokens,
-                            "eval_duration": eval_time,
-                        }
-                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                    else:
-                        try:
-                            async for chunk in response:
-                                if chunk:
-                                    if first_chunk_time is None:
-                                        first_chunk_time = time.time_ns()
-                                    last_chunk_time = time.time_ns()
-                                    total_response += chunk
-                                    data = {
-                                        "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                                        "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                        "response": chunk,
-                                        "done": False,
-                                    }
-                                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                        except (asyncio.CancelledError, Exception) as e:
-                            error_msg = str(e)
-                            if isinstance(e, asyncio.CancelledError):
-                                error_msg = "Stream was cancelled by server"
-                            else:
-                                error_msg = f"Provider error: {error_msg}"
-                            logger.error(f"Stream error: {error_msg}")
-
-                            # Send error message to client
-                            error_data = {
-                                "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                                "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                "response": f"\n\nError: {error_msg}",
-                                "error": f"\n\nError: {error_msg}",
-                                "done": False,
-                            }
-                            yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
-
-                            # Send final message to close the stream
-                            final_data = {
-                                "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                                "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                "response": "",
-                                "done": True,
-                            }
-                            yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
-                            return
-
-                        if first_chunk_time is None:
-                            first_chunk_time = start_time
-                        completion_tokens = estimate_tokens(total_response)
-                        total_time = last_chunk_time - start_time
-                        prompt_eval_time = first_chunk_time - start_time
-                        eval_time = last_chunk_time - first_chunk_time
-
-                        data = {
-                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                            "response": "",
-                            "done": True,
-                            "done_reason": "stop",
-                            "context": [],
-                            "total_duration": total_time,
-                            "load_duration": 0,
-                            "prompt_eval_count": prompt_tokens,
-                            "prompt_eval_duration": prompt_eval_time,
-                            "eval_count": completion_tokens,
-                            "eval_duration": eval_time,
-                        }
-                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                        return
-                except Exception as e:
-                    trace_exception(e)
-                    raise
+                first_chunk_time = None
+                last_chunk_time = time.time_ns()
+                total_response = ""
+
+                # Ensure response is an async generator
+                if isinstance(response, str):
+                    # If it's a string, send in two parts
+                    first_chunk_time = start_time
+                    last_chunk_time = time.time_ns()
+                    total_response = response
+
+                    data = {
+                        "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "response": response,
+                        "done": False,
+                    }
+                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
+
+                    completion_tokens = estimate_tokens(total_response)
+                    total_time = last_chunk_time - start_time
+                    prompt_eval_time = first_chunk_time - start_time
+                    eval_time = last_chunk_time - first_chunk_time
+
+                    data = {
+                        "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "response": "",
+                        "done": True,
+                        "done_reason": "stop",
+                        "context": [],
+                        "total_duration": total_time,
+                        "load_duration": 0,
+                        "prompt_eval_count": prompt_tokens,
+                        "prompt_eval_duration": prompt_eval_time,
+                        "eval_count": completion_tokens,
+                        "eval_duration": eval_time,
+                    }
+                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                else:
+                    try:
+                        async for chunk in response:
+                            if chunk:
+                                if first_chunk_time is None:
+                                    first_chunk_time = time.time_ns()
+                                last_chunk_time = time.time_ns()
+                                total_response += chunk
+                                data = {
+                                    "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                                    "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                    "response": chunk,
+                                    "done": False,
+                                }
+                                yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    except (asyncio.CancelledError, Exception) as e:
+                        error_msg = str(e)
+                        if isinstance(e, asyncio.CancelledError):
+                            error_msg = "Stream was cancelled by server"
+                        else:
+                            error_msg = f"Provider error: {error_msg}"
+                        logger.error(f"Stream error: {error_msg}")
+
+                        # Send error message to client
+                        error_data = {
+                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "response": f"\n\nError: {error_msg}",
+                            "error": f"\n\nError: {error_msg}",
+                            "done": False,
+                        }
+                        yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                        # Send final message to close the stream
+                        final_data = {
+                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "response": "",
+                            "done": True,
+                        }
+                        yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                        return
+
+                    if first_chunk_time is None:
+                        first_chunk_time = start_time
+                    completion_tokens = estimate_tokens(total_response)
+                    total_time = last_chunk_time - start_time
+                    prompt_eval_time = first_chunk_time - start_time
+                    eval_time = last_chunk_time - first_chunk_time
+
+                    data = {
+                        "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "response": "",
+                        "done": True,
+                        "done_reason": "stop",
+                        "context": [],
+                        "total_duration": total_time,
+                        "load_duration": 0,
+                        "prompt_eval_count": prompt_tokens,
+                        "prompt_eval_duration": prompt_eval_time,
+                        "eval_count": completion_tokens,
+                        "eval_duration": eval_time,
+                    }
+                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    return
 
             return StreamingResponse(
                 stream_generator(),
@@ -462,7 +456,7 @@ class OllamaAPI:
                     "eval_duration": eval_time,
                 }
             except Exception as e:
-                trace_exception(e)
+                logger.error(f"Ollama generate error: {str(e)}", exc_info=True)
                 raise HTTPException(status_code=500, detail=str(e))
 
         @self.router.post(
@@ -535,139 +529,134 @@ class OllamaAPI:
             )
 
             async def stream_generator():
-                try:
-                    first_chunk_time = None
-                    last_chunk_time = time.time_ns()
-                    total_response = ""
-
-                    # Ensure response is an async generator
-                    if isinstance(response, str):
-                        # If it's a string, send in two parts
-                        first_chunk_time = start_time
-                        last_chunk_time = time.time_ns()
-                        total_response = response
-
-                        data = {
-                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                            "message": {
-                                "role": "assistant",
-                                "content": response,
-                                "images": None,
-                            },
-                            "done": False,
-                        }
-                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
-
-                        completion_tokens = estimate_tokens(total_response)
-                        total_time = last_chunk_time - start_time
-                        prompt_eval_time = first_chunk_time - start_time
-                        eval_time = last_chunk_time - first_chunk_time
-
-                        data = {
-                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                            "message": {
-                                "role": "assistant",
-                                "content": "",
-                                "images": None,
-                            },
-                            "done_reason": "stop",
-                            "done": True,
-                            "total_duration": total_time,
-                            "load_duration": 0,
-                            "prompt_eval_count": prompt_tokens,
-                            "prompt_eval_duration": prompt_eval_time,
-                            "eval_count": completion_tokens,
-                            "eval_duration": eval_time,
-                        }
-                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                    else:
-                        try:
-                            async for chunk in response:
-                                if chunk:
-                                    if first_chunk_time is None:
-                                        first_chunk_time = time.time_ns()
-                                    last_chunk_time = time.time_ns()
-                                    total_response += chunk
-                                    data = {
-                                        "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                                        "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                        "message": {
-                                            "role": "assistant",
-                                            "content": chunk,
-                                            "images": None,
-                                        },
-                                        "done": False,
-                                    }
-                                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                        except (asyncio.CancelledError, Exception) as e:
-                            error_msg = str(e)
-                            if isinstance(e, asyncio.CancelledError):
-                                error_msg = "Stream was cancelled by server"
-                            else:
-                                error_msg = f"Provider error: {error_msg}"
-                            logger.error(f"Stream error: {error_msg}")
-
-                            # Send error message to client
-                            error_data = {
-                                "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                                "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                "message": {
-                                    "role": "assistant",
-                                    "content": f"\n\nError: {error_msg}",
-                                    "images": None,
-                                },
-                                "error": f"\n\nError: {error_msg}",
-                                "done": False,
-                            }
-                            yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
-
-                            # Send final message to close the stream
-                            final_data = {
-                                "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                                "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                "message": {
-                                    "role": "assistant",
-                                    "content": "",
-                                    "images": None,
-                                },
-                                "done": True,
-                            }
-                            yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
-                            return
-
-                        if first_chunk_time is None:
-                            first_chunk_time = start_time
-                        completion_tokens = estimate_tokens(total_response)
-                        total_time = last_chunk_time - start_time
-                        prompt_eval_time = first_chunk_time - start_time
-                        eval_time = last_chunk_time - first_chunk_time
-
-                        data = {
-                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                            "message": {
-                                "role": "assistant",
-                                "content": "",
-                                "images": None,
-                            },
-                            "done_reason": "stop",
-                            "done": True,
-                            "total_duration": total_time,
-                            "load_duration": 0,
-                            "prompt_eval_count": prompt_tokens,
-                            "prompt_eval_duration": prompt_eval_time,
-                            "eval_count": completion_tokens,
-                            "eval_duration": eval_time,
-                        }
-                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                except Exception as e:
-                    trace_exception(e)
-                    raise
+                first_chunk_time = None
+                last_chunk_time = time.time_ns()
+                total_response = ""
+
+                # Ensure response is an async generator
+                if isinstance(response, str):
+                    # If it's a string, send in two parts
+                    first_chunk_time = start_time
+                    last_chunk_time = time.time_ns()
+                    total_response = response
+
+                    data = {
+                        "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "message": {
+                            "role": "assistant",
+                            "content": response,
+                            "images": None,
+                        },
+                        "done": False,
+                    }
+                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
+
+                    completion_tokens = estimate_tokens(total_response)
+                    total_time = last_chunk_time - start_time
+                    prompt_eval_time = first_chunk_time - start_time
+                    eval_time = last_chunk_time - first_chunk_time
+
+                    data = {
+                        "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "message": {
+                            "role": "assistant",
+                            "content": "",
+                            "images": None,
+                        },
+                        "done_reason": "stop",
+                        "done": True,
+                        "total_duration": total_time,
+                        "load_duration": 0,
+                        "prompt_eval_count": prompt_tokens,
+                        "prompt_eval_duration": prompt_eval_time,
+                        "eval_count": completion_tokens,
+                        "eval_duration": eval_time,
+                    }
+                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                else:
+                    try:
+                        async for chunk in response:
+                            if chunk:
+                                if first_chunk_time is None:
+                                    first_chunk_time = time.time_ns()
+                                last_chunk_time = time.time_ns()
+                                total_response += chunk
+                                data = {
+                                    "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                                    "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                    "message": {
+                                        "role": "assistant",
+                                        "content": chunk,
+                                        "images": None,
+                                    },
+                                    "done": False,
+                                }
+                                yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    except (asyncio.CancelledError, Exception) as e:
+                        error_msg = str(e)
+                        if isinstance(e, asyncio.CancelledError):
+                            error_msg = "Stream was cancelled by server"
+                        else:
+                            error_msg = f"Provider error: {error_msg}"
+                        logger.error(f"Stream error: {error_msg}")
+
+                        # Send error message to client
+                        error_data = {
+                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "message": {
+                                "role": "assistant",
+                                "content": f"\n\nError: {error_msg}",
+                                "images": None,
+                            },
+                            "error": f"\n\nError: {error_msg}",
+                            "done": False,
+                        }
+                        yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                        # Send final message to close the stream
+                        final_data = {
+                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "message": {
+                                "role": "assistant",
+                                "content": "",
+                                "images": None,
+                            },
+                            "done": True,
+                        }
+                        yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                        return
+
+                    if first_chunk_time is None:
+                        first_chunk_time = start_time
+                    completion_tokens = estimate_tokens(total_response)
+                    total_time = last_chunk_time - start_time
+                    prompt_eval_time = first_chunk_time - start_time
+                    eval_time = last_chunk_time - first_chunk_time
+
+                    data = {
+                        "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "message": {
+                            "role": "assistant",
+                            "content": "",
+                            "images": None,
+                        },
+                        "done_reason": "stop",
+                        "done": True,
+                        "total_duration": total_time,
+                        "load_duration": 0,
+                        "prompt_eval_count": prompt_tokens,
+                        "prompt_eval_duration": prompt_eval_time,
+                        "eval_count": completion_tokens,
+                        "eval_duration": eval_time,
+                    }
+                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
 
             return StreamingResponse(
                 stream_generator(),
@@ -730,5 +719,5 @@ class OllamaAPI:
                     "eval_duration": eval_time,
                 }
             except Exception as e:
-                trace_exception(e)
+                logger.error(f"Ollama chat error: {str(e)}", exc_info=True)
                 raise HTTPException(status_code=500, detail=str(e))

lightrag/api/routers/query_routes.py

@@ -3,16 +3,13 @@ This module contains all query-related routes for the LightRAG API.
 """
 
 import json
-import logging
 from typing import Any, Dict, List, Literal, Optional
 
 from fastapi import APIRouter, Depends, HTTPException
 from lightrag.base import QueryParam
 from lightrag.api.utils_api import get_combined_auth_dependency
+from lightrag.utils import logger
 from pydantic import BaseModel, Field, field_validator
-from ascii_colors import trace_exception
 
 router = APIRouter(tags=["query"])
@@ -453,7 +450,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
             else:
                 return QueryResponse(response=response_content, references=None)
         except Exception as e:
-            trace_exception(e)
+            logger.error(f"Error processing query: {str(e)}", exc_info=True)
             raise HTTPException(status_code=500, detail=str(e))
 
     @router.post(
@@ -713,7 +710,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
                         if chunk:  # Only send non-empty content
                             yield f"{json.dumps({'response': chunk})}\n"
                 except Exception as e:
-                    logging.error(f"Streaming error: {str(e)}")
+                    logger.error(f"Streaming error: {str(e)}")
                     yield f"{json.dumps({'error': str(e)})}\n"
             else:
                 # Non-streaming mode: send complete response in one message
@@ -739,7 +736,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
                 },
             )
         except Exception as e:
-            trace_exception(e)
+            logger.error(f"Error processing streaming query: {str(e)}", exc_info=True)
            raise HTTPException(status_code=500, detail=str(e))
 
     @router.post(
@@ -1156,7 +1153,7 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
                 data={},
             )
         except Exception as e:
-            trace_exception(e)
+            logger.error(f"Error processing data query: {str(e)}", exc_info=True)
             raise HTTPException(status_code=500, detail=str(e))
 
     return router
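
For the non-streaming query routes the result is plain FastAPI error handling: log once with a traceback, then translate to an HTTP 500. A minimal sketch (the route path and the failing call are hypothetical):

from fastapi import APIRouter, HTTPException
from lightrag.utils import logger

router = APIRouter(tags=["query"])


@router.post("/query/example")  # hypothetical route, for illustration only
async def example_query():
    try:
        raise RuntimeError("backend unavailable")  # stand-in for the real rag.aquery(...) call
    except Exception as e:
        logger.error(f"Error processing query: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))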

lightrag/utils.py

@@ -1,6 +1,8 @@
 from __future__ import annotations
 
 import weakref
+import sys
 import asyncio
 import html
 import csv
@@ -40,6 +42,35 @@ from lightrag.constants import (
     SOURCE_IDS_LIMIT_METHOD_FIFO,
 )
 
+# Precompile regex pattern for JSON sanitization (module-level, compiled once)
+_SURROGATE_PATTERN = re.compile(r"[\uD800-\uDFFF\uFFFE\uFFFF]")
+
+
+class SafeStreamHandler(logging.StreamHandler):
+    """StreamHandler that gracefully handles closed streams during shutdown.
+
+    This handler prevents "ValueError: I/O operation on closed file" errors
+    that can occur when pytest or other test frameworks close stdout/stderr
+    before Python's logging cleanup runs.
+    """
+
+    def flush(self):
+        """Flush the stream, ignoring errors if the stream is closed."""
+        try:
+            super().flush()
+        except (ValueError, OSError):
+            # Stream is closed or otherwise unavailable, silently ignore
+            pass
+
+    def close(self):
+        """Close the handler, ignoring errors if the stream is already closed."""
+        try:
+            super().close()
+        except (ValueError, OSError):
+            # Stream is closed or otherwise unavailable, silently ignore
+            pass
+
+
 # Initialize logger with basic configuration
 logger = logging.getLogger("lightrag")
 logger.propagate = False  # prevent log message send to root logger
@@ -47,7 +78,7 @@ logger.setLevel(logging.INFO)
 
 # Add console handler if no handlers exist
 if not logger.handlers:
-    console_handler = logging.StreamHandler()
+    console_handler = SafeStreamHandler()
     console_handler.setLevel(logging.INFO)
     formatter = logging.Formatter("%(levelname)s: %(message)s")
     console_handler.setFormatter(formatter)
@@ -56,8 +87,32 @@ if not logger.handlers:
 # Set httpx logging level to WARNING
 logging.getLogger("httpx").setLevel(logging.WARNING)
 
-# Precompile regex pattern for JSON sanitization (module-level, compiled once)
-_SURROGATE_PATTERN = re.compile(r"[\uD800-\uDFFF\uFFFE\uFFFF]")
+
+def _patch_ascii_colors_console_handler() -> None:
+    """Prevent ascii_colors from printing flush errors during interpreter exit."""
+    try:
+        from ascii_colors import ConsoleHandler
+    except ImportError:
+        return
+
+    if getattr(ConsoleHandler, "_lightrag_patched", False):
+        return
+
+    original_handle_error = ConsoleHandler.handle_error
+
+    def _safe_handle_error(self, message: str) -> None:  # type: ignore[override]
+        exc_type, _, _ = sys.exc_info()
+        if exc_type in (ValueError, OSError) and "close" in message.lower():
+            return
+        original_handle_error(self, message)
+
+    ConsoleHandler.handle_error = _safe_handle_error  # type: ignore[assignment]
+    ConsoleHandler._lightrag_patched = True  # type: ignore[attr-defined]
+
+
+_patch_ascii_colors_console_handler()
+
 
 # Global import for pypinyin with startup-time logging
 try:
@@ -286,8 +341,8 @@ def setup_logger(
     logger_instance.handlers = []  # Clear existing handlers
     logger_instance.propagate = False
 
-    # Add console handler
-    console_handler = logging.StreamHandler()
+    # Add console handler with safe stream handling
+    console_handler = SafeStreamHandler()
     console_handler.setFormatter(simple_formatter)
     console_handler.setLevel(level)
     logger_instance.addHandler(console_handler)
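
The effect of SafeStreamHandler can be exercised in isolation: flushing or closing a handler whose stream was already closed (as pytest's capture teardown can do before logging cleanup) no longer raises. A small usage sketch, assuming the class is importable from lightrag.utils as added above:

import io
import logging

from lightrag.utils import SafeStreamHandler  # added by this PR

stream = io.StringIO()
handler = SafeStreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))

demo_logger = logging.getLogger("safe-handler-demo")
demo_logger.addHandler(handler)
demo_logger.error("written while the stream is still open")

stream.close()   # simulate pytest closing the captured stream before shutdown
handler.flush()  # a plain StreamHandler would raise "ValueError: I/O operation on closed file"
handler.close()  # likewise ignored instead of surfacing during interpreter shutdown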