[OND211-2329]: updated the create and update user API's to handle security.
This commit is contained in:
parent
feb6a3a0df
commit
f3dd4bd8ed
1 changed files with 56 additions and 12 deletions
|
|
@ -61,6 +61,42 @@ from api.utils.web_utils import (
|
||||||
from common import settings
|
from common import settings
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_nickname(nickname: str) -> str:
|
||||||
|
"""Sanitize nickname to prevent XSS and control character injection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
nickname: Raw nickname string to sanitize.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Sanitized nickname with XSS payloads and control characters removed.
|
||||||
|
"""
|
||||||
|
if not nickname:
|
||||||
|
return nickname
|
||||||
|
|
||||||
|
# Remove control characters (null bytes, carriage returns, line feeds)
|
||||||
|
nickname = nickname.replace("\x00", "") # Null byte
|
||||||
|
nickname = nickname.replace("\r", "") # Carriage return
|
||||||
|
nickname = nickname.replace("\n", "") # Line feed
|
||||||
|
|
||||||
|
# Remove XSS payloads - script tags (case insensitive, handles nested/escaped tags)
|
||||||
|
# First remove complete script tags with content
|
||||||
|
nickname = re.sub(r"<script[^>]*>.*?</script>", "", nickname, flags=re.IGNORECASE | re.DOTALL)
|
||||||
|
nickname = re.sub(r"<iframe[^>]*>.*?</iframe>", "", nickname, flags=re.IGNORECASE | re.DOTALL)
|
||||||
|
|
||||||
|
# Remove javascript: protocol (case insensitive)
|
||||||
|
nickname = re.sub(r"javascript:", "", nickname, flags=re.IGNORECASE)
|
||||||
|
|
||||||
|
# Remove img tags with event handlers (onerror, onclick, etc.)
|
||||||
|
nickname = re.sub(r"<img[^>]*on\w+\s*=[^>]*>", "", nickname, flags=re.IGNORECASE)
|
||||||
|
|
||||||
|
# Remove any remaining script/iframe opening and closing tags (handles cases like <<SCRIPT>)
|
||||||
|
# Match <script> or </script> (case insensitive, handles variations)
|
||||||
|
nickname = re.sub(r"<+/?\s*script[^>]*>", "", nickname, flags=re.IGNORECASE)
|
||||||
|
nickname = re.sub(r"<+/?\s*iframe[^>]*>", "", nickname, flags=re.IGNORECASE)
|
||||||
|
|
||||||
|
return nickname
|
||||||
|
|
||||||
|
|
||||||
@manager.route("/login", methods=["POST", "GET"]) # noqa: F821
|
@manager.route("/login", methods=["POST", "GET"]) # noqa: F821
|
||||||
def login():
|
def login():
|
||||||
"""
|
"""
|
||||||
|
|
@ -818,11 +854,20 @@ def create_user() -> Response:
|
||||||
)
|
)
|
||||||
|
|
||||||
req: Dict[str, Any] = request.json
|
req: Dict[str, Any] = request.json
|
||||||
email_address: str = req["email"]
|
email_address: str = str(req.get("email", ""))
|
||||||
|
|
||||||
# Sanitize null bytes from email
|
# Validate email is provided
|
||||||
if "\x00" in email_address:
|
if not email_address:
|
||||||
email_address = email_address.replace("\x00", "")
|
return get_json_result(
|
||||||
|
data=False,
|
||||||
|
message="Email is required!",
|
||||||
|
code=RetCode.ARGUMENT_ERROR,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Sanitize control characters from email (null bytes, carriage returns, line feeds)
|
||||||
|
email_address = email_address.replace("\x00", "") # Null byte
|
||||||
|
email_address = email_address.replace("\r", "") # Carriage return
|
||||||
|
email_address = email_address.replace("\n", "") # Line feed
|
||||||
|
|
||||||
# Validate email length (RFC 5321: local part max 64 chars, total max 254 chars)
|
# Validate email length (RFC 5321: local part max 64 chars, total max 254 chars)
|
||||||
if len(email_address) > 254:
|
if len(email_address) > 254:
|
||||||
|
|
@ -833,7 +878,7 @@ def create_user() -> Response:
|
||||||
)
|
)
|
||||||
|
|
||||||
# Split email to check local part length
|
# Split email to check local part length
|
||||||
email_parts = email_address.split("@")
|
email_parts: List[str] = email_address.split("@")
|
||||||
if len(email_parts) != 2:
|
if len(email_parts) != 2:
|
||||||
return get_json_result(
|
return get_json_result(
|
||||||
data=False,
|
data=False,
|
||||||
|
|
@ -841,7 +886,7 @@ def create_user() -> Response:
|
||||||
code=RetCode.OPERATING_ERROR,
|
code=RetCode.OPERATING_ERROR,
|
||||||
)
|
)
|
||||||
|
|
||||||
local_part = email_parts[0]
|
local_part: str = email_parts[0]
|
||||||
if len(local_part) > 64:
|
if len(local_part) > 64:
|
||||||
return get_json_result(
|
return get_json_result(
|
||||||
data=False,
|
data=False,
|
||||||
|
|
@ -871,14 +916,13 @@ def create_user() -> Response:
|
||||||
)
|
)
|
||||||
|
|
||||||
# Construct user info data
|
# Construct user info data
|
||||||
nickname: str = req["nickname"]
|
nickname: str = str(req.get("nickname", ""))
|
||||||
# Sanitize null bytes from nickname
|
# Sanitize nickname to prevent XSS and control character injection
|
||||||
if "\x00" in nickname:
|
nickname = sanitize_nickname(nickname)
|
||||||
nickname = nickname.replace("\x00", "")
|
|
||||||
|
|
||||||
is_superuser: bool = req.get("is_superuser", False)
|
is_superuser: bool = bool(req.get("is_superuser", False))
|
||||||
# Accept plain text password (no encryption required)
|
# Accept plain text password (no encryption required)
|
||||||
password: str = req["password"]
|
password: str = str(req.get("password", ""))
|
||||||
|
|
||||||
# Validate password is not empty
|
# Validate password is not empty
|
||||||
if not password or not password.strip():
|
if not password or not password.strip():
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue