From 60777d535b719631680bcf5d0969bdef79ca4eaf Mon Sep 17 00:00:00 2001
From: yangdx
Date: Fri, 27 Jun 2025 02:33:05 +0800
Subject: [PATCH] fix: prevent Path Traversal vulnerability in upload endpoint

- Add sanitize_filename() function to validate and clean uploaded filenames
- Remove path separators, traversal sequences, and control characters
- Verify final paths stay within input directory using Path.resolve()
- Return HTTP 400 errors for unsafe filenames
- Prevents directory traversal attacks like ../../../etc/passwd
---
 lightrag/api/routers/document_routes.py | 54 +++++++++++++++++++++++--
 1 file changed, 50 insertions(+), 4 deletions(-)

diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index fba5c3e8..429820a5 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -62,6 +62,49 @@ router = APIRouter(
 temp_prefix = "__tmp__"
 
 
+def sanitize_filename(filename: str, input_dir: Path) -> str:
+    """
+    Sanitize uploaded filename to prevent Path Traversal attacks.
+
+    Args:
+        filename: The original filename from the upload
+        input_dir: The target input directory
+
+    Returns:
+        str: Sanitized filename that is safe to use
+
+    Raises:
+        HTTPException: If the filename is unsafe or invalid
+    """
+    # Basic validation
+    if not filename or not filename.strip():
+        raise HTTPException(status_code=400, detail="Filename cannot be empty")
+
+    # Remove path separators and traversal sequences
+    clean_name = filename.replace('/', '').replace('\\', '')
+    clean_name = clean_name.replace('..', '')
+
+    # Remove ASCII control characters (including NUL) and DEL
+    clean_name = ''.join(c for c in clean_name if ord(c) >= 32 and c != '\x7f')
+
+    # Remove leading/trailing whitespace and dots
+    clean_name = clean_name.strip().strip('.')
+
+    # Check if anything is left after sanitization
+    if not clean_name:
+        raise HTTPException(status_code=400, detail="Invalid filename after sanitization")
+
+    # Defense in depth: confirm the resolved target is still inside input_dir
+    try:
+        final_path = (input_dir / clean_name).resolve()
+        if not final_path.is_relative_to(input_dir.resolve()):
+            raise HTTPException(status_code=400, detail="Unsafe filename detected")
+    except (OSError, ValueError) as err:
+        raise HTTPException(status_code=400, detail="Invalid filename") from err
+
+    return clean_name
+
+
 class ScanResponse(BaseModel):
     """Response model for document scanning operation
 
@@ -986,18 +1029,21 @@ def create_document_routes(
             HTTPException: If the file type is not supported (400) or other errors occur (500).
         """
         try:
-            if not doc_manager.is_supported_file(file.filename):
+            # Sanitize filename to prevent Path Traversal attacks
+            safe_filename = sanitize_filename(file.filename, doc_manager.input_dir)
+
+            if not doc_manager.is_supported_file(safe_filename):
                 raise HTTPException(
                     status_code=400,
                     detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
                 )
 
-            file_path = doc_manager.input_dir / file.filename
+            file_path = doc_manager.input_dir / safe_filename
             # Check if file already exists
             if file_path.exists():
                 return InsertResponse(
                     status="duplicated",
-                    message=f"File '{file.filename}' already exists in the input directory.",
+                    message=f"File '{safe_filename}' already exists in the input directory.",
                 )
 
             with open(file_path, "wb") as buffer:
@@ -1008,7 +1054,7 @@ def create_document_routes(
 
             return InsertResponse(
                 status="success",
-                message=f"File '{file.filename}' uploaded successfully. Processing will continue in background.",
+                message=f"File '{safe_filename}' uploaded successfully. Processing will continue in background.",
             )
         except Exception as e:
             logger.error(f"Error /documents/upload: {file.filename}: {str(e)}")