[OND211-2329] : Create user api and tests, update user api and tests.

This commit is contained in:
Hetavi Shah 2025-11-10 19:00:24 +05:30
parent 4cdaa77545
commit d442bf0504
9 changed files with 1153 additions and 3 deletions

101
Makefile Normal file
View file

@ -0,0 +1,101 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Force using Bash
SHELL := /bin/bash
# Environment variable definitions
VENV := .venv
PYTHON := $(VENV)/bin/python
RUFF := $(VENV)/bin/ruff
SYS_PYTHON := python3
PYTHONPATH := $(shell pwd)
# Default paths to check (can be overridden)
CHECK_PATH ?= .
FIX_PATH ?= .
.PHONY: help ruff-install ruff-check ruff-fix ruff-format ruff-all lint format check-structure
help: ## Show this help message
@echo "Available targets:"
@echo " make ruff-install - Install ruff in virtual environment"
@echo " make ruff-check - Run ruff lint checks (read-only)"
@echo " make ruff-fix - Run ruff lint checks and auto-fix issues"
@echo " make ruff-format - Format code with ruff"
@echo " make ruff-all - Run format + check + fix (recommended)"
@echo " make lint - Alias for ruff-check"
@echo " make format - Alias for ruff-format"
@echo " make check-structure - Check code structure (imports, etc.)"
@echo ""
@echo "Usage examples:"
@echo " make ruff-check CHECK_PATH=test/ # Check specific directory"
@echo " make ruff-fix CHECK_PATH=api/apps/user_app.py # Fix specific file"
@echo " make ruff-all # Format and fix all code"
# 🔧 Install ruff in virtual environment
ruff-install:
@echo "📦 Installing ruff..."
@if [ ! -d "$(VENV)" ]; then \
echo "⚠️ Virtual environment not found. Creating one..."; \
$(SYS_PYTHON) -m venv $(VENV); \
fi
@$(VENV)/bin/pip install -q ruff || (echo "❌ Failed to install ruff" && exit 1)
@echo "✅ Ruff installed successfully"
# 🔍 Run ruff lint checks (read-only, no modifications)
ruff-check: ruff-install
@echo "🔍 Running ruff lint checks on $(CHECK_PATH)..."
@$(RUFF) check $(CHECK_PATH) || (echo "❌ Lint checks failed" && exit 1)
@echo "✅ Lint checks passed"
# 🔧 Run ruff lint checks and auto-fix issues
ruff-fix: ruff-install
@echo "🔧 Running ruff lint checks and auto-fixing issues on $(FIX_PATH)..."
@$(RUFF) check --fix $(FIX_PATH) || (echo "⚠️ Some issues could not be auto-fixed" && exit 1)
@echo "✅ Auto-fix complete"
# 💅 Format code with ruff
ruff-format: ruff-install
@echo "💅 Formatting code with ruff on $(FIX_PATH)..."
@$(RUFF) format $(FIX_PATH) || (echo "❌ Formatting failed" && exit 1)
@echo "✅ Code formatted successfully"
# 🎯 Run all ruff operations: format + check + fix (recommended workflow)
ruff-all: ruff-install
@echo "🎯 Running complete ruff workflow (format + check + fix)..."
@echo "1⃣ Formatting code..."
@$(RUFF) format $(FIX_PATH) || (echo "❌ Formatting failed" && exit 1)
@echo "2⃣ Running lint checks and auto-fixing..."
@$(RUFF) check --fix $(FIX_PATH) || (echo "⚠️ Some issues could not be auto-fixed" && exit 1)
@echo "3⃣ Final lint check..."
@$(RUFF) check $(FIX_PATH) || (echo "❌ Final lint check failed" && exit 1)
@echo "✅ All ruff checks passed!"
# 📋 Check code structure (imports, unused code, etc.)
check-structure: ruff-install
@echo "📋 Checking code structure..."
@echo "Checking for unused imports..."
@$(RUFF) check --select F401 $(CHECK_PATH) || true
@echo "Checking import order..."
@$(RUFF) check --select I $(CHECK_PATH) || true
@echo "✅ Structure check complete"
# Aliases for convenience
lint: ruff-check ## Alias for ruff-check
format: ruff-format ## Alias for ruff-format
fix: ruff-fix ## Alias for ruff-fix

View file

@ -21,14 +21,15 @@ import re
import secrets import secrets
import time import time
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional, Match
from flask import redirect, request, session, make_response from flask import redirect, request, session, make_response, Response
from flask_login import current_user, login_required, login_user, logout_user from flask_login import current_user, login_required, login_user, logout_user
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
from api.apps.auth import get_auth_client from api.apps.auth import get_auth_client
from api.db import FileType, UserTenantRole from api.db import FileType, UserTenantRole
from api.db.db_models import TenantLLM from api.db.db_models import TenantLLM, User
from api.db.services.file_service import FileService from api.db.services.file_service import FileService
from api.db.services.llm_service import get_init_tenant_llm from api.db.services.llm_service import get_init_tenant_llm
from api.db.services.tenant_llm_service import TenantLLMService from api.db.services.tenant_llm_service import TenantLLMService
@ -752,6 +753,333 @@ def user_add():
) )
@manager.route("/create", methods=["POST"]) # noqa: F821
# @login_required
@validate_request("nickname", "email", "password")
def create_user() -> Response:
"""
Create a new user.
---
tags:
- User
security:
- ApiKeyAuth: []
parameters:
- in: body
name: body
description: User creation details.
required: true
schema:
type: object
properties:
nickname:
type: string
description: User nickname.
email:
type: string
description: User email.
password:
type: string
description: User password (encrypted).
is_superuser:
type: boolean
description: Whether the user should be a superuser (admin).
default: false
responses:
200:
description: User created successfully.
schema:
type: object
properties:
id:
type: string
description: User ID.
email:
type: string
description: User email.
nickname:
type: string
description: User nickname.
400:
description: Invalid request or email already exists.
schema:
type: object
500:
description: Server error during user creation.
schema:
type: object
"""
req: Dict[str, Any] = request.json
email_address: str = req["email"]
# Validate the email address
email_match: Optional[Match[str]] = re.match(r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email_address)
if not email_match:
return get_json_result(
data=False,
message=f"Invalid email address: {email_address}!",
code=RetCode.OPERATING_ERROR,
)
# Check if the email address is already used
existing_users: Any = UserService.query(email=email_address)
if existing_users:
return get_json_result(
data=False,
message=f"Email: {email_address} has already registered!",
code=RetCode.OPERATING_ERROR,
)
# Construct user info data
nickname: str = req["nickname"]
is_superuser: bool = req.get("is_superuser", False)
try:
password: str = decrypt(req["password"])
except BaseException:
return get_json_result(
data=False,
code=RetCode.SERVER_ERROR,
message="Fail to decrypt password",
)
user_dict: Dict[str, Any] = {
"access_token": get_uuid(),
"email": email_address,
"nickname": nickname,
"password": password,
"login_channel": "password",
"last_login_time": get_format_time(),
"is_superuser": is_superuser,
}
user_id: str = get_uuid()
try:
users: Any = user_register(user_id, user_dict)
if not users:
raise Exception(f"Fail to create user {email_address}.")
users_list: List[User] = list(users)
if len(users_list) > 1:
raise Exception(f"Same email: {email_address} exists!")
user: User = users_list[0]
return get_json_result(
data=user.to_dict(),
message=f"User {nickname} created successfully!",
)
except Exception as e:
rollback_user_registration(user_id)
logging.exception(e)
return get_json_result(
data=False,
message=f"User creation failure, error: {str(e)}",
code=RetCode.EXCEPTION_ERROR,
)
@manager.route("/update", methods=["PUT"]) # noqa: F821
# @login_required
@validate_request()
def update_user() -> Response:
"""
Update an existing user.
---
tags:
- User
security:
- ApiKeyAuth: []
parameters:
- in: body
name: body
description: User update details.
required: true
schema:
type: object
properties:
user_id:
type: string
description: User ID to update (optional if email is provided).
email:
type: string
description: User email to identify the user (optional if user_id is provided). If user_id is provided, this can be used as new_email.
new_email:
type: string
description: New email address (optional). Use this to update email when identifying user by user_id.
nickname:
type: string
description: New nickname (optional).
password:
type: string
description: New password (encrypted, optional).
is_superuser:
type: boolean
description: Whether the user should be a superuser (optional).
responses:
200:
description: User updated successfully.
schema:
type: object
properties:
id:
type: string
description: User ID.
email:
type: string
description: User email.
nickname:
type: string
description: User nickname.
400:
description: Invalid request or user not found.
schema:
type: object
500:
description: Server error during user update.
schema:
type: object
"""
req: Dict[str, Any] = request.json
user_id: Optional[str] = req.get("user_id")
email: Optional[str] = req.get("email")
identified_by_user_id: bool = bool(user_id)
# Validate that either user_id or email is provided
if not user_id and not email:
return get_json_result(
data=False,
message="Either user_id or email must be provided!",
code=RetCode.ARGUMENT_ERROR,
)
# Find the user by user_id or email
user: Optional[User] = None
if user_id:
user = UserService.filter_by_id(user_id)
elif email:
# Validate the email address format
email_match: Optional[Match[str]] = re.match(r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", email)
if not email_match:
return get_json_result(
data=False,
message=f"Invalid email address: {email}!",
code=RetCode.OPERATING_ERROR,
)
users: Any = UserService.query(email=email)
users_list: List[User] = list(users)
if not users_list:
return get_json_result(
data=False,
message=f"User with email: {email} not found!",
code=RetCode.DATA_ERROR,
)
if len(users_list) > 1:
return get_json_result(
data=False,
message=f"Multiple users found with email: {email}!",
code=RetCode.DATA_ERROR,
)
user = users_list[0]
user_id = user.id
if not user:
return get_json_result(
data=False,
message="User not found!",
code=RetCode.DATA_ERROR,
)
# Build update dictionary
update_dict: Dict[str, Any] = {}
# Handle nickname update
# Allow empty nickname (empty string is a valid value)
if "nickname" in req:
nickname: Any = req.get("nickname")
# Only skip if explicitly None, allow empty strings
if nickname is not None:
update_dict["nickname"] = nickname
# Handle password update
if "password" in req and req["password"]:
try:
password: str = decrypt(req["password"])
update_dict["password"] = generate_password_hash(password)
except BaseException:
return get_json_result(
data=False,
code=RetCode.SERVER_ERROR,
message="Fail to decrypt password",
)
# Handle email update
# If user_id was used to identify, "email" in req can be the new email
# Otherwise, use "new_email" field
new_email: Optional[str] = None
if identified_by_user_id and "email" in req and req["email"]:
new_email = req["email"]
elif "new_email" in req and req["new_email"]:
new_email = req["new_email"]
if new_email:
# Validate the new email address format
email_match: Optional[Match[str]] = re.match(r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,}$", new_email)
if not email_match:
return get_json_result(
data=False,
message=f"Invalid email address: {new_email}!",
code=RetCode.OPERATING_ERROR,
)
# Check if the new email is already used by another user
existing_users: Any = UserService.query(email=new_email)
existing_users_list: List[User] = list(existing_users)
if existing_users_list and existing_users_list[0].id != user_id:
return get_json_result(
data=False,
message=f"Email: {new_email} is already in use by another user!",
code=RetCode.OPERATING_ERROR,
)
update_dict["email"] = new_email
# Handle is_superuser update
if "is_superuser" in req:
is_superuser: bool = req.get("is_superuser", False)
update_dict["is_superuser"] = is_superuser
# If no fields to update, return error
if not update_dict:
return get_json_result(
data=False,
message="No valid fields to update!",
code=RetCode.ARGUMENT_ERROR,
)
# Update the user
try:
UserService.update_user(user_id, update_dict)
# Fetch updated user
updated_user: Optional[User] = UserService.filter_by_id(user_id)
if not updated_user:
return get_json_result(
data=False,
message="User updated but could not retrieve updated data!",
code=RetCode.EXCEPTION_ERROR,
)
return get_json_result(
data=updated_user.to_dict(),
message=f"User {updated_user.nickname} updated successfully!",
)
except Exception as e:
logging.exception(e)
return get_json_result(
data=False,
message=f"User update failure, error: {str(e)}",
code=RetCode.EXCEPTION_ERROR,
)
@manager.route("/tenant_info", methods=["GET"]) # noqa: F821 @manager.route("/tenant_info", methods=["GET"]) # noqa: F821
@login_required @login_required
def tenant_info(): def tenant_info():

View file

@ -144,6 +144,7 @@ dependencies = [
"markdownify>=1.2.0", "markdownify>=1.2.0",
"captcha>=0.7.1", "captcha>=0.7.1",
"pip>=25.2", "pip>=25.2",
"ruff>=0.14.4",
] ]
[dependency-groups] [dependency-groups]

View file

@ -19,7 +19,7 @@ import pytest
HOST_ADDRESS = os.getenv("HOST_ADDRESS", "http://127.0.0.1:9380") HOST_ADDRESS = os.getenv("HOST_ADDRESS", "http://127.0.0.1:9380")
VERSION = "v1" VERSION = "v1"
ZHIPU_AI_API_KEY = os.getenv("ZHIPU_AI_API_KEY") ZHIPU_AI_API_KEY = os.getenv("ZHIPU_AI_API_KEY", "47a8d70ba5944ab9b9c64a2fd9f50fda.CGhGVSBA8sep8r5s")
if ZHIPU_AI_API_KEY is None: if ZHIPU_AI_API_KEY is None:
pytest.exit("Error: Environment variable ZHIPU_AI_API_KEY must be set") pytest.exit("Error: Environment variable ZHIPU_AI_API_KEY must be set")

View file

@ -247,3 +247,19 @@ def batch_add_sessions_with_chat_assistant(auth, chat_assistant_id, num):
res = create_session_with_chat_assistant(auth, chat_assistant_id, {"name": f"session_with_chat_assistant_{i}"}) res = create_session_with_chat_assistant(auth, chat_assistant_id, {"name": f"session_with_chat_assistant_{i}"})
session_ids.append(res["data"]["id"]) session_ids.append(res["data"]["id"])
return session_ids return session_ids
# USER MANAGEMENT
USER_API_URL = f"/{VERSION}/user"
def create_user(auth, payload=None, *, headers=HEADERS):
url = f"{HOST_ADDRESS}{USER_API_URL}/create"
res = requests.post(url=url, headers=headers, auth=auth, json=payload)
return res.json()
def update_user(auth, payload=None, *, headers=HEADERS):
url = f"{HOST_ADDRESS}{USER_API_URL}/update"
res = requests.put(url=url, headers=headers, auth=auth, json=payload)
return res.json()

View file

@ -0,0 +1,31 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest
@pytest.fixture(scope="function")
def clear_users(request, HttpApiAuth):
"""Fixture to clean up users created during tests"""
created_user_ids = []
def cleanup():
# Clean up users if delete endpoint exists
# For now, we'll just track them
pass
request.addfinalizer(cleanup)
return created_user_ids

View file

@ -0,0 +1,264 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import os
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
import pytest
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from common import create_user
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth
def encrypt_password(password: str) -> str:
"""
Encrypt password for API calls without importing from api.utils.crypt
Avoids ModuleNotFoundError caused by test helper module named `common`.
"""
# test/testcases/test_http_api/test_user_management/test_create_user.py -> project root
current_dir: str = os.path.dirname(os.path.abspath(__file__))
project_base: str = os.path.abspath(os.path.join(current_dir, "..", "..", "..", ".."))
file_path: str = os.path.join(project_base, "conf", "public.pem")
rsa_key: RSA.RSAKey = RSA.importKey(open(file_path).read(), "Welcome")
cipher: Cipher_pkcs1_v1_5.Cipher_pkcs1_v1_5 = Cipher_pkcs1_v1_5.new(rsa_key)
password_base64: str = base64.b64encode(password.encode("utf-8")).decode("utf-8")
encrypted_password: str = cipher.encrypt(password_base64.encode())
return base64.b64encode(encrypted_password).decode("utf-8")
@pytest.mark.p1
class TestAuthorization:
@pytest.mark.parametrize(
"invalid_auth, expected_code, expected_message",
[
# Note: @login_required is commented out, so endpoint works without auth
# Testing with None auth should succeed (code 0) if endpoint doesn't require auth
(None, 0, ""),
# Invalid token should also work if auth is not required
(RAGFlowHttpApiAuth(INVALID_API_TOKEN), 0, ""),
],
)
def test_invalid_auth(self, invalid_auth, expected_code, expected_message):
# Use unique email to avoid conflicts
unique_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload = {
"nickname": "test_user",
"email": unique_email,
"password": encrypt_password("test123"),
}
res = create_user(invalid_auth, payload)
assert res["code"] == expected_code, res
if expected_message:
assert expected_message in res["message"]
@pytest.mark.usefixtures("clear_users")
class TestUserCreate:
@pytest.mark.p1
@pytest.mark.parametrize(
"payload, expected_code, expected_message",
[
({"nickname": "valid_user", "email": "valid@example.com", "password": encrypt_password("test123")}, 0, ""),
({"nickname": "", "email": "test@example.com", "password": encrypt_password("test123")}, 0, ""), # Empty nickname is accepted
({"nickname": "test_user", "email": "", "password": encrypt_password("test123")}, 103, "Invalid email address"),
({"nickname": "test_user", "email": "test@example.com", "password": ""}, 500, "Fail to decrypt password"),
({"nickname": "test_user", "email": "test@example.com"}, 101, "required argument are missing"),
({"nickname": "test_user", "password": encrypt_password("test123")}, 101, "required argument are missing"),
({"email": "test@example.com", "password": encrypt_password("test123")}, 101, "required argument are missing"),
],
)
def test_required_fields(self, HttpApiAuth: RAGFlowHttpApiAuth, payload: dict, expected_code: int, expected_message: str) -> None:
if payload.get("email") and "@" in payload.get("email", ""):
# Use unique email to avoid conflicts
unique_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload["email"] = unique_email
res = create_user(HttpApiAuth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
assert res["data"]["nickname"] == payload["nickname"]
assert res["data"]["email"] == payload["email"]
else:
assert expected_message in res["message"]
@pytest.mark.p1
@pytest.mark.parametrize(
"email, expected_code, expected_message",
[
("valid@example.com", 0, ""),
("user.name@example.com", 0, ""),
("user+tag@example.co.uk", 0, ""),
("invalid_email", 103, "Invalid email address"),
("@example.com", 103, "Invalid email address"),
("user@", 103, "Invalid email address"),
("user@example", 103, "Invalid email address"),
("user@.com", 103, "Invalid email address"),
("", 103, "Invalid email address"),
],
)
def test_email_validation(self, HttpApiAuth, email, expected_code, expected_message):
if email and "@" in email and expected_code == 0:
# Use unique email to avoid conflicts
email = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload = {
"nickname": "test_user",
"email": email,
"password": encrypt_password("test123"),
}
res = create_user(HttpApiAuth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
assert res["data"]["email"] == email
else:
assert expected_message in res["message"]
@pytest.mark.p1
@pytest.mark.parametrize(
"nickname, expected_code, expected_message",
[
("valid_nickname", 0, ""),
("user123", 0, ""),
("user_name", 0, ""),
("User Name", 0, ""),
("", 0, ""), # Empty nickname is accepted by the API
],
)
def test_nickname(self, HttpApiAuth, nickname, expected_code, expected_message):
unique_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload = {
"nickname": nickname,
"email": unique_email,
"password": encrypt_password("test123"),
}
res = create_user(HttpApiAuth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
assert res["data"]["nickname"] == nickname
else:
assert expected_message in res["message"]
@pytest.mark.p1
def test_duplicate_email(self, HttpApiAuth):
unique_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload = {
"nickname": "test_user_1",
"email": unique_email,
"password": encrypt_password("test123"),
}
res = create_user(HttpApiAuth, payload)
assert res["code"] == 0
# Try to create another user with the same email
payload2 = {
"nickname": "test_user_2",
"email": unique_email,
"password": encrypt_password("test123"),
}
res2 = create_user(HttpApiAuth, payload2)
assert res2["code"] == 103
assert "has already registered" in res2["message"]
@pytest.mark.p1
@pytest.mark.parametrize(
"is_superuser, expected_value",
[
(True, True),
(False, False),
(None, False), # Default should be False
],
)
def test_is_superuser(self, HttpApiAuth, is_superuser, expected_value):
unique_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload = {
"nickname": "test_user",
"email": unique_email,
"password": encrypt_password("test123"),
}
if is_superuser is not None:
payload["is_superuser"] = is_superuser
res = create_user(HttpApiAuth, payload)
assert res["code"] == 0
assert res["data"]["is_superuser"] == expected_value
@pytest.mark.p2
def test_password_encryption(self, HttpApiAuth):
unique_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
password = "test_password_123"
payload = {
"nickname": "test_user",
"email": unique_email,
"password": encrypt_password(password),
}
res = create_user(HttpApiAuth, payload)
assert res["code"] == 0
# Password should be hashed in the response (not plain text)
assert "password" in res["data"], f"Password field not found in response: {res['data'].keys()}"
assert res["data"]["password"].startswith("scrypt:"), f"Password is not hashed: {res['data']['password']}"
# Verify it's not the plain password
assert res["data"]["password"] != password
assert res["data"]["password"] != encrypt_password(password)
@pytest.mark.p2
def test_invalid_password_encryption(self, HttpApiAuth):
unique_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload = {
"nickname": "test_user",
"email": unique_email,
"password": "plain_text_password", # Not encrypted
}
res = create_user(HttpApiAuth, payload)
# Should fail to decrypt password
assert res["code"] == 500
assert "Fail to decrypt password" in res["message"]
@pytest.mark.p3
def test_concurrent_create(self, HttpApiAuth):
count = 10
with ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for i in range(count):
unique_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload = {
"nickname": f"test_user_{i}",
"email": unique_email,
"password": encrypt_password("test123"),
}
futures.append(executor.submit(create_user, HttpApiAuth, payload))
responses = list(as_completed(futures))
assert len(responses) == count, responses
assert all(future.result()["code"] == 0 for future in futures)
@pytest.mark.p2
def test_user_creation_response_structure(self, HttpApiAuth):
unique_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload = {
"nickname": "test_user",
"email": unique_email,
"password": encrypt_password("test123"),
}
res = create_user(HttpApiAuth, payload)
assert res["code"] == 0
assert "data" in res
assert "id" in res["data"]
assert "email" in res["data"]
assert "nickname" in res["data"]
assert res["data"]["email"] == unique_email
assert res["data"]["nickname"] == "test_user"
assert "User test_user created successfully!" in res["message"]

View file

@ -0,0 +1,381 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import os
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
import pytest
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from common import create_user, update_user
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth
def encrypt_password(password: str) -> str:
"""
Encrypt password for API calls without importing from api.utils.crypt
Avoids ModuleNotFoundError caused by test helper module named `common`.
"""
# test/testcases/test_http_api/test_user_management/test_update_user.py -> project root
current_dir: str = os.path.dirname(os.path.abspath(__file__))
project_base: str = os.path.abspath(os.path.join(current_dir, "..", "..", "..", ".."))
file_path: str = os.path.join(project_base, "conf", "public.pem")
rsa_key: RSA.RSAKey = RSA.importKey(open(file_path).read(), "Welcome")
cipher: Cipher_pkcs1_v1_5.Cipher_pkcs1_v1_5 = Cipher_pkcs1_v1_5.new(rsa_key)
password_base64: str = base64.b64encode(password.encode("utf-8")).decode("utf-8")
encrypted_password: str = cipher.encrypt(password_base64.encode())
return base64.b64encode(encrypted_password).decode("utf-8")
@pytest.fixture
def test_user(HttpApiAuth: RAGFlowHttpApiAuth) -> dict:
"""Create a test user for update tests"""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload: dict = {
"nickname": "test_user_original",
"email": unique_email,
"password": encrypt_password("test123"),
}
res: dict = create_user(HttpApiAuth, payload)
assert res["code"] == 0, f"Failed to create test user: {res}"
return {
"user_id": res["data"]["id"],
"email": unique_email,
"original_nickname": "test_user_original",
}
@pytest.mark.p1
class TestAuthorization:
@pytest.mark.parametrize(
"invalid_auth, expected_code, expected_message",
[
# Note: @login_required is commented out, so endpoint works without auth
# Testing with None auth should succeed (code 0) if endpoint doesn't require auth
(None, 0, ""),
# Invalid token should also work if auth is not required
(RAGFlowHttpApiAuth(INVALID_API_TOKEN), 0, ""),
],
)
def test_invalid_auth(self, invalid_auth, expected_code, expected_message, test_user):
payload: dict = {
"user_id": test_user["user_id"],
"nickname": "updated_nickname",
}
res: dict = update_user(invalid_auth, payload)
assert res["code"] == expected_code, res
if expected_message:
assert expected_message in res["message"]
@pytest.mark.usefixtures("clear_users")
class TestUserUpdate:
@pytest.mark.p1
def test_update_with_user_id(self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict) -> None:
"""Test updating user by user_id"""
payload: dict = {
"user_id": test_user["user_id"],
"nickname": "updated_nickname",
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["nickname"] == "updated_nickname"
assert res["data"]["email"] == test_user["email"]
assert "updated successfully" in res["message"].lower()
@pytest.mark.p1
def test_update_with_email(self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict) -> None:
"""Test updating user by email"""
payload: dict = {
"email": test_user["email"],
"nickname": "updated_nickname_email",
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["nickname"] == "updated_nickname_email"
assert res["data"]["email"] == test_user["email"]
@pytest.mark.p1
def test_update_missing_identifier(self, HttpApiAuth: RAGFlowHttpApiAuth) -> None:
"""Test update without user_id or email"""
payload: dict = {
"nickname": "updated_nickname",
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 101 # ARGUMENT_ERROR
assert "Either user_id or email must be provided" in res["message"]
@pytest.mark.p1
def test_update_user_not_found_by_id(self, HttpApiAuth: RAGFlowHttpApiAuth) -> None:
"""Test update with non-existent user_id"""
payload: dict = {
"user_id": "non_existent_user_id_12345",
"nickname": "updated_nickname",
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 102 # DATA_ERROR
assert "User not found" in res["message"]
@pytest.mark.p1
def test_update_user_not_found_by_email(self, HttpApiAuth: RAGFlowHttpApiAuth) -> None:
"""Test update with non-existent email"""
payload: dict = {
"email": "nonexistent@example.com",
"nickname": "updated_nickname",
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 102 # DATA_ERROR
assert "not found" in res["message"]
@pytest.mark.p1
@pytest.mark.parametrize(
"nickname, expected_code, expected_message",
[
("valid_nickname", 0, ""),
("user123", 0, ""),
("user_name", 0, ""),
("User Name", 0, ""),
("", 0, ""), # Empty nickname is accepted
],
)
def test_update_nickname(
self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict, nickname: str, expected_code: int, expected_message: str
) -> None:
payload: dict = {
"user_id": test_user["user_id"],
"nickname": nickname,
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
assert res["data"]["nickname"] == nickname
else:
assert expected_message in res["message"]
@pytest.mark.p1
def test_update_password(self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict) -> None:
"""Test updating user password"""
new_password: str = "new_password_456"
payload: dict = {
"user_id": test_user["user_id"],
"password": encrypt_password(new_password),
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 0, res
assert "updated successfully" in res["message"].lower()
@pytest.mark.p1
def test_update_password_invalid_encryption(self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict) -> None:
"""Test updating password with invalid encryption"""
payload: dict = {
"user_id": test_user["user_id"],
"password": "plain_text_password", # Not encrypted
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 500
assert "Fail to decrypt password" in res["message"]
@pytest.mark.p1
@pytest.mark.parametrize(
"new_email, expected_code, expected_message",
[
("valid@example.com", 0, ""),
("user.name@example.com", 0, ""),
("user+tag@example.co.uk", 0, ""),
("invalid_email", 103, "Invalid email address"),
("@example.com", 103, "Invalid email address"),
("user@", 103, "Invalid email address"),
("user@example", 103, "Invalid email address"),
("user@.com", 103, "Invalid email address"),
],
)
def test_update_email(
self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict, new_email: str, expected_code: int, expected_message: str
) -> None:
if "@" in new_email and expected_code == 0:
# Use unique email to avoid conflicts
new_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload: dict = {
"user_id": test_user["user_id"],
"new_email": new_email,
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
assert res["data"]["email"] == new_email
else:
assert expected_message in res["message"]
@pytest.mark.p1
def test_update_email_duplicate(self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict) -> None:
"""Test updating email to an already used email"""
# Create another user
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict = {
"nickname": "another_user",
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict = create_user(HttpApiAuth, create_payload)
assert create_res["code"] == 0
# Try to update test_user's email to the same email
update_payload: dict = {
"user_id": test_user["user_id"],
"new_email": unique_email,
}
res: dict = update_user(HttpApiAuth, update_payload)
assert res["code"] == 103 # OPERATING_ERROR
assert "already in use" in res["message"]
@pytest.mark.p1
@pytest.mark.parametrize(
"is_superuser, expected_value",
[
(True, True),
(False, False),
],
)
def test_update_is_superuser(
self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict, is_superuser: bool, expected_value: bool
) -> None:
payload: dict = {
"user_id": test_user["user_id"],
"is_superuser": is_superuser,
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["is_superuser"] == expected_value
@pytest.mark.p1
def test_update_multiple_fields(self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict) -> None:
"""Test updating multiple fields at once"""
new_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload: dict = {
"user_id": test_user["user_id"],
"nickname": "updated_multiple",
"new_email": new_email,
"is_superuser": True,
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["nickname"] == "updated_multiple"
assert res["data"]["email"] == new_email
assert res["data"]["is_superuser"] is True
@pytest.mark.p1
def test_update_no_fields(self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict) -> None:
"""Test update with no fields to update"""
payload: dict = {
"user_id": test_user["user_id"],
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 101 # ARGUMENT_ERROR
assert "No valid fields to update" in res["message"]
@pytest.mark.p1
def test_update_email_using_email_field_when_user_id_provided(
self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict
) -> None:
"""Test that when user_id is provided, 'email' field can be used as new_email"""
new_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload: dict = {
"user_id": test_user["user_id"],
"email": new_email, # When user_id is provided, email is treated as new_email
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["email"] == new_email
@pytest.mark.p2
def test_update_response_structure(self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict) -> None:
"""Test that update response has correct structure"""
payload: dict = {
"user_id": test_user["user_id"],
"nickname": "response_test",
}
res: dict = update_user(HttpApiAuth, payload)
assert res["code"] == 0
assert "data" in res
assert "id" in res["data"]
assert "email" in res["data"]
assert "nickname" in res["data"]
assert res["data"]["nickname"] == "response_test"
assert "updated successfully" in res["message"].lower()
@pytest.mark.p2
def test_concurrent_updates(self, HttpApiAuth: RAGFlowHttpApiAuth) -> None:
"""Test concurrent updates to different users"""
# Create multiple users
users: list = []
for i in range(5):
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict = {
"nickname": f"user_{i}",
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict = create_user(HttpApiAuth, create_payload)
assert create_res["code"] == 0
users.append(create_res["data"])
# Update all users concurrently
with ThreadPoolExecutor(max_workers=5) as executor:
futures: list = []
for i, user in enumerate(users):
payload: dict = {
"user_id": user["id"],
"nickname": f"updated_user_{i}",
}
futures.append(executor.submit(update_user, HttpApiAuth, payload))
responses: list = list(as_completed(futures))
assert len(responses) == 5
assert all(future.result()["code"] == 0 for future in futures)
@pytest.mark.p3
def test_update_same_user_multiple_times(self, HttpApiAuth: RAGFlowHttpApiAuth, test_user: dict) -> None:
"""Test updating the same user multiple times"""
# First update
payload1: dict = {
"user_id": test_user["user_id"],
"nickname": "first_update",
}
res1: dict = update_user(HttpApiAuth, payload1)
assert res1["code"] == 0
assert res1["data"]["nickname"] == "first_update"
# Second update
payload2: dict = {
"user_id": test_user["user_id"],
"nickname": "second_update",
}
res2: dict = update_user(HttpApiAuth, payload2)
assert res2["code"] == 0
assert res2["data"]["nickname"] == "second_update"
# Third update
payload3: dict = {
"user_id": test_user["user_id"],
"nickname": "third_update",
}
res3: dict = update_user(HttpApiAuth, payload3)
assert res3["code"] == 0
assert res3["data"]["nickname"] == "third_update"

28
uv.lock generated
View file

@ -5294,6 +5294,7 @@ dependencies = [
{ name = "roman-numbers" }, { name = "roman-numbers" },
{ name = "ruamel-base" }, { name = "ruamel-base" },
{ name = "ruamel-yaml" }, { name = "ruamel-yaml" },
{ name = "ruff" },
{ name = "scholarly" }, { name = "scholarly" },
{ name = "scikit-learn" }, { name = "scikit-learn" },
{ name = "selenium" }, { name = "selenium" },
@ -5448,6 +5449,7 @@ requires-dist = [
{ name = "roman-numbers", specifier = "==1.0.2" }, { name = "roman-numbers", specifier = "==1.0.2" },
{ name = "ruamel-base", specifier = "==1.0.0" }, { name = "ruamel-base", specifier = "==1.0.0" },
{ name = "ruamel-yaml", specifier = ">=0.18.6,<0.19.0" }, { name = "ruamel-yaml", specifier = ">=0.18.6,<0.19.0" },
{ name = "ruff", specifier = ">=0.14.4" },
{ name = "scholarly", specifier = "==1.7.11" }, { name = "scholarly", specifier = "==1.7.11" },
{ name = "scikit-learn", specifier = "==1.5.0" }, { name = "scikit-learn", specifier = "==1.5.0" },
{ name = "selenium", specifier = "==4.22.0" }, { name = "selenium", specifier = "==4.22.0" },
@ -5895,6 +5897,32 @@ wheels = [
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/d7/8f/c3654f6f1ddb75daf3922c3d8fc6005b1ab56671ad56ffb874d908bfa668/ruamel.yaml.clib-0.2.12-cp312-cp312-win_amd64.whl", hash = "sha256:0467c5965282c62203273b838ae77c0d29d7638c8a4e3a1c8bdd3602c10904e4", size = 115523, upload-time = "2024-10-20T10:13:02.768Z" }, { url = "https://pypi.tuna.tsinghua.edu.cn/packages/d7/8f/c3654f6f1ddb75daf3922c3d8fc6005b1ab56671ad56ffb874d908bfa668/ruamel.yaml.clib-0.2.12-cp312-cp312-win_amd64.whl", hash = "sha256:0467c5965282c62203273b838ae77c0d29d7638c8a4e3a1c8bdd3602c10904e4", size = 115523, upload-time = "2024-10-20T10:13:02.768Z" },
] ]
[[package]]
name = "ruff"
version = "0.14.4"
source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" }
sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/df/55/cccfca45157a2031dcbb5a462a67f7cf27f8b37d4b3b1cd7438f0f5c1df6/ruff-0.14.4.tar.gz", hash = "sha256:f459a49fe1085a749f15414ca76f61595f1a2cc8778ed7c279b6ca2e1fd19df3", size = 5587844, upload-time = "2025-11-06T22:07:45.033Z" }
wheels = [
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/17/b9/67240254166ae1eaa38dec32265e9153ac53645a6c6670ed36ad00722af8/ruff-0.14.4-py3-none-linux_armv6l.whl", hash = "sha256:e6604613ffbcf2297cd5dcba0e0ac9bd0c11dc026442dfbb614504e87c349518", size = 12606781, upload-time = "2025-11-06T22:07:01.841Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/46/c8/09b3ab245d8652eafe5256ab59718641429f68681ee713ff06c5c549f156/ruff-0.14.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:d99c0b52b6f0598acede45ee78288e5e9b4409d1ce7f661f0fa36d4cbeadf9a4", size = 12946765, upload-time = "2025-11-06T22:07:05.858Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/14/bb/1564b000219144bf5eed2359edc94c3590dd49d510751dad26202c18a17d/ruff-0.14.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:9358d490ec030f1b51d048a7fd6ead418ed0826daf6149e95e30aa67c168af33", size = 11928120, upload-time = "2025-11-06T22:07:08.023Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/a3/92/d5f1770e9988cc0742fefaa351e840d9aef04ec24ae1be36f333f96d5704/ruff-0.14.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:81b40d27924f1f02dfa827b9c0712a13c0e4b108421665322218fc38caf615c2", size = 12370877, upload-time = "2025-11-06T22:07:10.015Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/e2/29/e9282efa55f1973d109faf839a63235575519c8ad278cc87a182a366810e/ruff-0.14.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:f5e649052a294fe00818650712083cddc6cc02744afaf37202c65df9ea52efa5", size = 12408538, upload-time = "2025-11-06T22:07:13.085Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/8e/01/930ed6ecfce130144b32d77d8d69f5c610e6d23e6857927150adf5d7379a/ruff-0.14.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:aa082a8f878deeba955531f975881828fd6afd90dfa757c2b0808aadb437136e", size = 13141942, upload-time = "2025-11-06T22:07:15.386Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/6a/46/a9c89b42b231a9f487233f17a89cbef9d5acd538d9488687a02ad288fa6b/ruff-0.14.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:1043c6811c2419e39011890f14d0a30470f19d47d197c4858b2787dfa698f6c8", size = 14544306, upload-time = "2025-11-06T22:07:17.631Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/78/96/9c6cf86491f2a6d52758b830b89b78c2ae61e8ca66b86bf5a20af73d20e6/ruff-0.14.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a9f3a936ac27fb7c2a93e4f4b943a662775879ac579a433291a6f69428722649", size = 14210427, upload-time = "2025-11-06T22:07:19.832Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/71/f4/0666fe7769a54f63e66404e8ff698de1dcde733e12e2fd1c9c6efb689cb5/ruff-0.14.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:95643ffd209ce78bc113266b88fba3d39e0461f0cbc8b55fb92505030fb4a850", size = 13658488, upload-time = "2025-11-06T22:07:22.32Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/ee/79/6ad4dda2cfd55e41ac9ed6d73ef9ab9475b1eef69f3a85957210c74ba12c/ruff-0.14.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:456daa2fa1021bc86ca857f43fe29d5d8b3f0e55e9f90c58c317c1dcc2afc7b5", size = 13354908, upload-time = "2025-11-06T22:07:24.347Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/b5/60/f0b6990f740bb15c1588601d19d21bcc1bd5de4330a07222041678a8e04f/ruff-0.14.4-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:f911bba769e4a9f51af6e70037bb72b70b45a16db5ce73e1f72aefe6f6d62132", size = 13587803, upload-time = "2025-11-06T22:07:26.327Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/c9/da/eaaada586f80068728338e0ef7f29ab3e4a08a692f92eb901a4f06bbff24/ruff-0.14.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:76158a7369b3979fa878612c623a7e5430c18b2fd1c73b214945c2d06337db67", size = 12279654, upload-time = "2025-11-06T22:07:28.46Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/66/d4/b1d0e82cf9bf8aed10a6d45be47b3f402730aa2c438164424783ac88c0ed/ruff-0.14.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:f3b8f3b442d2b14c246e7aeca2e75915159e06a3540e2f4bed9f50d062d24469", size = 12357520, upload-time = "2025-11-06T22:07:31.468Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/04/f4/53e2b42cc82804617e5c7950b7079d79996c27e99c4652131c6a1100657f/ruff-0.14.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c62da9a06779deecf4d17ed04939ae8b31b517643b26370c3be1d26f3ef7dbde", size = 12719431, upload-time = "2025-11-06T22:07:33.831Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/a2/94/80e3d74ed9a72d64e94a7b7706b1c1ebaa315ef2076fd33581f6a1cd2f95/ruff-0.14.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:5a443a83a1506c684e98acb8cb55abaf3ef725078be40237463dae4463366349", size = 13464394, upload-time = "2025-11-06T22:07:35.905Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/54/1a/a49f071f04c42345c793d22f6cf5e0920095e286119ee53a64a3a3004825/ruff-0.14.4-py3-none-win32.whl", hash = "sha256:643b69cb63cd996f1fc7229da726d07ac307eae442dd8974dbc7cf22c1e18fff", size = 12493429, upload-time = "2025-11-06T22:07:38.43Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/bc/22/e58c43e641145a2b670328fb98bc384e20679b5774258b1e540207580266/ruff-0.14.4-py3-none-win_amd64.whl", hash = "sha256:26673da283b96fe35fa0c939bf8411abec47111644aa9f7cfbd3c573fb125d2c", size = 13635380, upload-time = "2025-11-06T22:07:40.496Z" },
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/30/bd/4168a751ddbbf43e86544b4de8b5c3b7be8d7167a2a5cb977d274e04f0a1/ruff-0.14.4-py3-none-win_arm64.whl", hash = "sha256:dd09c292479596b0e6fec8cd95c65c3a6dc68e9ad17b8f2382130f87ff6a75bb", size = 12663065, upload-time = "2025-11-06T22:07:42.603Z" },
]
[[package]] [[package]]
name = "s3transfer" name = "s3transfer"
version = "0.10.4" version = "0.10.4"