[OND211-2329]: updated the create and update user API's to handle edge cases.

This commit is contained in:
Hetavi Shah 2025-11-18 12:16:55 +05:30
parent 5413628aaf
commit feb6a3a0df

View file

@ -701,8 +701,8 @@ def user_add():
req = request.json
email_address = req["email"]
# Validate the email address
if not re.match(r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email_address):
# Validate the email address (allow + in local part)
if not re.match(r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", email_address):
return get_json_result(
data=False,
message=f"Invalid email address: {email_address}!",
@ -820,9 +820,38 @@ def create_user() -> Response:
req: Dict[str, Any] = request.json
email_address: str = req["email"]
# Validate the email address
# Sanitize null bytes from email
if "\x00" in email_address:
email_address = email_address.replace("\x00", "")
# Validate email length (RFC 5321: local part max 64 chars, total max 254 chars)
if len(email_address) > 254:
return get_json_result(
data=False,
message=f"Invalid email address: email is too long (max 254 characters)!",
code=RetCode.OPERATING_ERROR,
)
# Split email to check local part length
email_parts = email_address.split("@")
if len(email_parts) != 2:
return get_json_result(
data=False,
message=f"Invalid email address: {email_address}!",
code=RetCode.OPERATING_ERROR,
)
local_part = email_parts[0]
if len(local_part) > 64:
return get_json_result(
data=False,
message=f"Invalid email address: local part is too long (max 64 characters)!",
code=RetCode.OPERATING_ERROR,
)
# Validate the email address format (allow + in local part)
email_match: Optional[Match[str]] = re.match(
r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email_address
r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", email_address
)
if not email_match:
return get_json_result(
@ -843,6 +872,10 @@ def create_user() -> Response:
# Construct user info data
nickname: str = req["nickname"]
# Sanitize null bytes from nickname
if "\x00" in nickname:
nickname = nickname.replace("\x00", "")
is_superuser: bool = req.get("is_superuser", False)
# Accept plain text password (no encryption required)
password: str = req["password"]
@ -979,9 +1012,9 @@ def update_user() -> Response:
if user_id:
user = UserService.filter_by_id(user_id)
elif email:
# Validate the email address format
# Validate the email address format (allow + in local part)
email_match: Optional[Match[str]] = re.match(
r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email
r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", email
)
if not email_match:
return get_json_result(
@ -1055,9 +1088,9 @@ def update_user() -> Response:
new_email = req["new_email"]
if new_email:
# Validate the new email address format
# Validate the new email address format (allow + in local part)
email_match: Optional[Match[str]] = re.match(
r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", new_email
r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", new_email
)
if not email_match:
return get_json_result(
@ -1219,9 +1252,9 @@ def list_users() -> Response:
# Query users
if email_filter:
# Validate email format if provided
# Validate email format if provided (allow + in local part)
email_match: Optional[Match[str]] = re.match(
r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email_filter
r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", email_filter
)
if not email_match:
return get_json_result(
@ -1361,9 +1394,9 @@ def delete_user() -> Response:
if user_id:
user = UserService.filter_by_id(user_id)
elif email:
# Validate the email address format
# Validate the email address format (allow + in local part)
email_match: Optional[Match[str]] = re.match(
r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email
r"^[\w\._\+-]+@([\w_-]+\.)+[\w-]{2,}$", email
)
if not email_match:
return get_json_result(