Merge pull request #2113 from danielaskdd/fix-file-delete

Add path traversal security validation for file deletion operations
This commit is contained in:
Daniel.y 2025-09-17 02:39:45 +08:00 committed by GitHub
commit 2e6596e9b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -745,6 +745,60 @@ class DocumentManager:
return any(filename.lower().endswith(ext) for ext in self.supported_extensions)
def validate_file_path_security(file_path_str: str, base_dir: Path) -> Optional[Path]:
"""
Validate file path security to prevent Path Traversal attacks.
Args:
file_path_str: The file path string to validate
base_dir: The base directory that the file must be within
Returns:
Path: Safe file path if valid, None if unsafe or invalid
"""
if not file_path_str or not file_path_str.strip():
return None
try:
# Clean the file path string
clean_path_str = file_path_str.strip()
# Check for obvious path traversal patterns before processing
# This catches both Unix (..) and Windows (..\) style traversals
if ".." in clean_path_str:
# Additional check for Windows-style backslash traversal
if (
"\\..\\" in clean_path_str
or clean_path_str.startswith("..\\")
or clean_path_str.endswith("\\..")
):
# logger.warning(
# f"Security violation: Windows path traversal attempt detected - {file_path_str}"
# )
return None
# Normalize path separators (convert backslashes to forward slashes)
# This helps handle Windows-style paths on Unix systems
normalized_path = clean_path_str.replace("\\", "/")
# Create path object and resolve it (handles symlinks and relative paths)
candidate_path = (base_dir / normalized_path).resolve()
base_dir_resolved = base_dir.resolve()
# Check if the resolved path is within the base directory
if not candidate_path.is_relative_to(base_dir_resolved):
# logger.warning(
# f"Security violation: Path traversal attempt detected - {file_path_str}"
# )
return None
return candidate_path
except (OSError, ValueError, Exception) as e:
logger.warning(f"Invalid file path detected: {file_path_str} - {str(e)}")
return None
def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str:
"""Generate a unique filename in the target directory by adding numeric suffixes if needed
@ -1429,51 +1483,37 @@ async def background_delete_documents(
):
try:
deleted_files = []
# check and delete files from input_dir directory
file_path = doc_manager.input_dir / result.file_path
if file_path.exists():
try:
file_path.unlink()
deleted_files.append(file_path.name)
file_delete_msg = f"Successfully deleted input_dir file: {result.file_path}"
logger.info(file_delete_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = (
file_delete_msg
)
pipeline_status["history_messages"].append(
file_delete_msg
)
except Exception as file_error:
file_error_msg = f"Failed to delete input_dir file {result.file_path}: {str(file_error)}"
logger.debug(file_error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = (
file_error_msg
)
pipeline_status["history_messages"].append(
file_error_msg
)
# SECURITY FIX: Use secure path validation to prevent arbitrary file deletion
safe_file_path = validate_file_path_security(
result.file_path, doc_manager.input_dir
)
# Also check and delete files from __enqueued__ directory
enqueued_dir = doc_manager.input_dir / "__enqueued__"
if enqueued_dir.exists():
# Look for files with the same name or similar names (with numeric suffixes)
base_name = Path(result.file_path).stem
extension = Path(result.file_path).suffix
# Search for exact match and files with numeric suffixes
for enqueued_file in enqueued_dir.glob(
f"{base_name}*{extension}"
):
if safe_file_path is None:
# Security violation detected - log and skip file deletion
security_msg = f"Security violation: Unsafe file path detected for deletion - {result.file_path}"
logger.warning(security_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = security_msg
pipeline_status["history_messages"].append(
security_msg
)
else:
# check and delete files from input_dir directory
if safe_file_path.exists():
try:
enqueued_file.unlink()
deleted_files.append(enqueued_file.name)
logger.info(
f"Successfully deleted enqueued file: {enqueued_file.name}"
)
except Exception as enqueued_error:
file_error_msg = f"Failed to delete enqueued file {enqueued_file.name}: {str(enqueued_error)}"
safe_file_path.unlink()
deleted_files.append(safe_file_path.name)
file_delete_msg = f"Successfully deleted input_dir file: {result.file_path}"
logger.info(file_delete_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = (
file_delete_msg
)
pipeline_status["history_messages"].append(
file_delete_msg
)
except Exception as file_error:
file_error_msg = f"Failed to delete input_dir file {result.file_path}: {str(file_error)}"
logger.debug(file_error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = (
@ -1483,8 +1523,47 @@ async def background_delete_documents(
file_error_msg
)
# Also check and delete files from __enqueued__ directory
enqueued_dir = doc_manager.input_dir / "__enqueued__"
if enqueued_dir.exists():
# SECURITY FIX: Validate that the file path is safe before processing
# Only proceed if the original path validation passed
base_name = Path(result.file_path).stem
extension = Path(result.file_path).suffix
# Search for exact match and files with numeric suffixes
for enqueued_file in enqueued_dir.glob(
f"{base_name}*{extension}"
):
# Additional security check: ensure enqueued file is within enqueued directory
safe_enqueued_path = (
validate_file_path_security(
enqueued_file.name, enqueued_dir
)
)
if safe_enqueued_path is not None:
try:
enqueued_file.unlink()
deleted_files.append(enqueued_file.name)
logger.info(
f"Successfully deleted enqueued file: {enqueued_file.name}"
)
except Exception as enqueued_error:
file_error_msg = f"Failed to delete enqueued file {enqueued_file.name}: {str(enqueued_error)}"
logger.debug(file_error_msg)
async with pipeline_status_lock:
pipeline_status[
"latest_message"
] = file_error_msg
pipeline_status[
"history_messages"
].append(file_error_msg)
else:
security_msg = f"Security violation: Unsafe enqueued file path detected - {enqueued_file.name}"
logger.warning(security_msg)
if deleted_files == []:
file_error_msg = f"File deletion skipped, missing file: {result.file_path}"
file_error_msg = f"File deletion skipped, missing or unsafe file: {result.file_path}"
logger.warning(file_error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = file_error_msg