diff --git a/api/apps/user_app.py b/api/apps/user_app.py
index 13937b94f..1d7b16edc 100644
--- a/api/apps/user_app.py
+++ b/api/apps/user_app.py
@@ -61,6 +61,42 @@ from api.utils.web_utils import (
from common import settings
+def sanitize_nickname(nickname: str) -> str:
+ """Sanitize nickname to prevent XSS and control character injection.
+
+ Args:
+ nickname: Raw nickname string to sanitize.
+
+ Returns:
+ Sanitized nickname with XSS payloads and control characters removed.
+ """
+ if not nickname:
+ return nickname
+
+ # Remove control characters (null bytes, carriage returns, line feeds)
+ nickname = nickname.replace("\x00", "") # Null byte
+ nickname = nickname.replace("\r", "") # Carriage return
+ nickname = nickname.replace("\n", "") # Line feed
+
+ # Remove XSS payloads - script tags (case insensitive, handles nested/escaped tags)
+ # First remove complete script tags with content
+ nickname = re.sub(r"", "", nickname, flags=re.IGNORECASE | re.DOTALL)
+ nickname = re.sub(r"", "", nickname, flags=re.IGNORECASE | re.DOTALL)
+
+ # Remove javascript: protocol (case insensitive)
+ nickname = re.sub(r"javascript:", "", nickname, flags=re.IGNORECASE)
+
+ # Remove img tags with event handlers (onerror, onclick, etc.)
+ nickname = re.sub(r"
]*on\w+\s*=[^>]*>", "", nickname, flags=re.IGNORECASE)
+
+ # Remove any remaining script/iframe opening and closing tags (handles cases like < (case insensitive, handles variations)
+ nickname = re.sub(r"<+/?\s*script[^>]*>", "", nickname, flags=re.IGNORECASE)
+ nickname = re.sub(r"<+/?\s*iframe[^>]*>", "", nickname, flags=re.IGNORECASE)
+
+ return nickname
+
+
@manager.route("/login", methods=["POST", "GET"]) # noqa: F821
def login():
"""
@@ -818,11 +854,20 @@ def create_user() -> Response:
)
req: Dict[str, Any] = request.json
- email_address: str = req["email"]
+ email_address: str = str(req.get("email", ""))
+
+ # Validate email is provided
+ if not email_address:
+ return get_json_result(
+ data=False,
+ message="Email is required!",
+ code=RetCode.ARGUMENT_ERROR,
+ )
- # Sanitize null bytes from email
- if "\x00" in email_address:
- email_address = email_address.replace("\x00", "")
+ # Sanitize control characters from email (null bytes, carriage returns, line feeds)
+ email_address = email_address.replace("\x00", "") # Null byte
+ email_address = email_address.replace("\r", "") # Carriage return
+ email_address = email_address.replace("\n", "") # Line feed
# Validate email length (RFC 5321: local part max 64 chars, total max 254 chars)
if len(email_address) > 254:
@@ -833,7 +878,7 @@ def create_user() -> Response:
)
# Split email to check local part length
- email_parts = email_address.split("@")
+ email_parts: List[str] = email_address.split("@")
if len(email_parts) != 2:
return get_json_result(
data=False,
@@ -841,7 +886,7 @@ def create_user() -> Response:
code=RetCode.OPERATING_ERROR,
)
- local_part = email_parts[0]
+ local_part: str = email_parts[0]
if len(local_part) > 64:
return get_json_result(
data=False,
@@ -871,14 +916,13 @@ def create_user() -> Response:
)
# Construct user info data
- nickname: str = req["nickname"]
- # Sanitize null bytes from nickname
- if "\x00" in nickname:
- nickname = nickname.replace("\x00", "")
+ nickname: str = str(req.get("nickname", ""))
+ # Sanitize nickname to prevent XSS and control character injection
+ nickname = sanitize_nickname(nickname)
- is_superuser: bool = req.get("is_superuser", False)
+ is_superuser: bool = bool(req.get("is_superuser", False))
# Accept plain text password (no encryption required)
- password: str = req["password"]
+ password: str = str(req.get("password", ""))
# Validate password is not empty
if not password or not password.strip():