fix: prevent Path Traversal vulnerability in upload endpoint

- Add sanitize_filename() function to validate and clean uploaded filenames
- Remove path separators, traversal sequences, and control characters
- Verify final paths stay within input directory using Path.resolve()
- Return HTTP 400 errors for unsafe filenames
- Prevent directory traversal attacks such as `../../../etc/passwd`
This commit is contained in:
yangdx 2025-06-27 02:33:05 +08:00
parent 44daf51501
commit 60777d535b

View file

@ -62,6 +62,49 @@ router = APIRouter(
temp_prefix = "__tmp__"
def sanitize_filename(filename: str, input_dir: Path) -> str:
    """
    Sanitize an uploaded filename to prevent path traversal attacks.

    The name is reduced to a single path component: path separators,
    ``..`` sequences and ASCII control characters are removed, and
    surrounding whitespace and dots are trimmed. The resulting path is then
    resolved and checked to still lie inside ``input_dir``.

    Args:
        filename: The original filename from the upload
        input_dir: The target input directory

    Returns:
        str: Sanitized filename that is safe to use

    Raises:
        HTTPException: (400) if the filename is empty, is empty after
            sanitization, cannot be resolved, or resolves outside
            ``input_dir``
    """
    # Basic validation
    if not filename or not filename.strip():
        raise HTTPException(status_code=400, detail="Filename cannot be empty")

    # Remove path separators and traversal sequences
    clean_name = filename.replace("/", "").replace("\\", "")
    clean_name = clean_name.replace("..", "")

    # Remove control characters and null bytes
    clean_name = "".join(c for c in clean_name if ord(c) >= 32 and c != "\x7f")

    # Remove leading/trailing whitespace and dots. A single
    # ``strip().strip('.')`` pass is not enough: stripping dots can expose
    # new leading/trailing whitespace ("foo ." -> "foo ", ". ." -> " "), so
    # repeat until the name no longer changes.
    while True:
        trimmed = clean_name.strip().strip(".")
        if trimmed == clean_name:
            break
        clean_name = trimmed

    # Check if anything is left after sanitization
    if not clean_name:
        raise HTTPException(
            status_code=400, detail="Invalid filename after sanitization"
        )

    # Verify the final path stays within the input directory. Only the
    # filesystem/path operations live inside the try so that the
    # containment verdict below is not mixed up with resolution failures.
    try:
        base_dir = input_dir.resolve()
        final_path = (input_dir / clean_name).resolve()
        is_inside = final_path.is_relative_to(base_dir)
    except (OSError, ValueError) as exc:
        raise HTTPException(status_code=400, detail="Invalid filename") from exc

    if not is_inside:
        raise HTTPException(status_code=400, detail="Unsafe filename detected")

    return clean_name
class ScanResponse(BaseModel):
"""Response model for document scanning operation
@ -986,18 +1029,21 @@ def create_document_routes(
HTTPException: If the file type is not supported (400) or other errors occur (500).
"""
try:
if not doc_manager.is_supported_file(file.filename):
# Sanitize filename to prevent Path Traversal attacks
safe_filename = sanitize_filename(file.filename, doc_manager.input_dir)
if not doc_manager.is_supported_file(safe_filename):
raise HTTPException(
status_code=400,
detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
)
file_path = doc_manager.input_dir / file.filename
file_path = doc_manager.input_dir / safe_filename
# Check if file already exists
if file_path.exists():
return InsertResponse(
status="duplicated",
message=f"File '{file.filename}' already exists in the input directory.",
message=f"File '{safe_filename}' already exists in the input directory.",
)
with open(file_path, "wb") as buffer:
@ -1008,7 +1054,7 @@ def create_document_routes(
return InsertResponse(
status="success",
message=f"File '{file.filename}' uploaded successfully. Processing will continue in background.",
message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
)
except Exception as e:
logger.error(f"Error /documents/upload: {file.filename}: {str(e)}")