diff --git a/api/apps/user_app.py b/api/apps/user_app.py index b3232afba..13937b94f 100644 --- a/api/apps/user_app.py +++ b/api/apps/user_app.py @@ -701,8 +701,8 @@ def user_add(): req = request.json email_address = req["email"] - # Validate the email address - if not re.match(r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email_address): + # Validate the email address (allow + in local part) + if not re.match(r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", email_address): return get_json_result( data=False, message=f"Invalid email address: {email_address}!", @@ -820,9 +820,38 @@ def create_user() -> Response: req: Dict[str, Any] = request.json email_address: str = req["email"] - # Validate the email address + # Sanitize null bytes from email + if "\x00" in email_address: + email_address = email_address.replace("\x00", "") + + # Validate email length (RFC 5321: local part max 64 chars, total max 254 chars) + if len(email_address) > 254: + return get_json_result( + data=False, + message=f"Invalid email address: email is too long (max 254 characters)!", + code=RetCode.OPERATING_ERROR, + ) + + # Split email to check local part length + email_parts = email_address.split("@") + if len(email_parts) != 2: + return get_json_result( + data=False, + message=f"Invalid email address: {email_address}!", + code=RetCode.OPERATING_ERROR, + ) + + local_part = email_parts[0] + if len(local_part) > 64: + return get_json_result( + data=False, + message=f"Invalid email address: local part is too long (max 64 characters)!", + code=RetCode.OPERATING_ERROR, + ) + + # Validate the email address format (allow + in local part) email_match: Optional[Match[str]] = re.match( - r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email_address + r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", email_address ) if not email_match: return get_json_result( @@ -843,6 +872,10 @@ def create_user() -> Response: # Construct user info data nickname: str = req["nickname"] + # Sanitize null bytes from nickname + if "\x00" in nickname: + nickname = nickname.replace("\x00", "") + is_superuser: bool = req.get("is_superuser", False) # Accept plain text password (no encryption required) password: str = req["password"] @@ -979,9 +1012,9 @@ def update_user() -> Response: if user_id: user = UserService.filter_by_id(user_id) elif email: - # Validate the email address format + # Validate the email address format (allow + in local part) email_match: Optional[Match[str]] = re.match( - r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email + r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", email ) if not email_match: return get_json_result( @@ -1055,9 +1088,9 @@ def update_user() -> Response: new_email = req["new_email"] if new_email: - # Validate the new email address format + # Validate the new email address format (allow + in local part) email_match: Optional[Match[str]] = re.match( - r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", new_email + r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", new_email ) if not email_match: return get_json_result( @@ -1219,9 +1252,9 @@ def list_users() -> Response: # Query users if email_filter: - # Validate email format if provided + # Validate email format if provided (allow + in local part) email_match: Optional[Match[str]] = re.match( - r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email_filter + r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", email_filter ) if not email_match: return get_json_result( @@ -1361,9 +1394,9 @@ def delete_user() -> Response: if user_id: user = UserService.filter_by_id(user_id) elif email: - # Validate the email address format + # Validate the email address format (allow + in local part) email_match: Optional[Match[str]] = re.match( - r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email + r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", email ) if not email_match: return get_json_result(