LightRAG/lightrag/security.py
2025-12-05 14:31:13 +08:00

189 lines
5.9 KiB
Python

"""Security utilities for multi-tenant validation and sanitization."""
from pathlib import Path
from typing import Tuple
import uuid
from fastapi import HTTPException, status
import logging
logger = logging.getLogger(__name__)
def validate_uuid(value: str, param_name: str = "id") -> str:
"""Validate that a string is a valid UUID format.
Args:
value: String to validate
param_name: Name of parameter (for error messages)
Returns:
The validated UUID string
Raises:
HTTPException: If value is not a valid UUID
"""
if not value or not value.strip():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid {param_name}: empty value",
)
try:
# Validate UUID format
uuid.UUID(value.strip())
return value.strip()
except ValueError:
logger.warning(f"Invalid UUID format for {param_name}: {value}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid {param_name} format",
)
def validate_identifier(value: str, param_name: str = "id") -> str:
"""Validate that a string is a safe identifier (UUID or slug).
Accepts both UUID format and string slugs (alphanumeric with hyphens/underscores).
Used for tenant_id and kb_id which can be either UUIDs or human-readable strings.
Args:
value: String to validate
param_name: Name of parameter (for error messages)
Returns:
The validated identifier string
Raises:
HTTPException: If value is empty or contains unsafe characters
"""
if not value or not value.strip():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid {param_name}: empty value",
)
value = value.strip()
# Try UUID first
try:
uuid.UUID(value)
return value
except ValueError:
pass # Not a UUID, check if it's a valid slug
# Validate as slug: alphanumeric, hyphens, underscores only
# This prevents path traversal (no slashes) and injection attacks
import re
if not re.match(r"^[a-zA-Z0-9_-]+$", value):
logger.warning(f"Invalid identifier format for {param_name}: {value}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid {param_name} format. Must be UUID or alphanumeric with hyphens/underscores only.",
)
# Limit length to prevent abuse
if len(value) > 255:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid {param_name}: too long (max 255 characters)",
)
return value
def validate_working_directory(
base_dir: str, tenant_id: str, kb_id: str
) -> Tuple[Path, str]:
"""Validate and create safe working directory for tenant/KB.
This function does the following:
1. Validates tenant_id and kb_id are safe identifiers (UUID or slug)
2. Creates the directory path safely
3. Verifies the resolved path stays within base_dir (prevents path traversal)
4. Returns both the path and composite workspace identifier
Args:
base_dir: Base working directory for all tenants
tenant_id: Tenant identifier (UUID or slug like 'acme-corp')
kb_id: Knowledge base identifier (UUID or slug like 'kb-main')
Returns:
Tuple of (validated_path, composite_workspace_id)
Raises:
HTTPException: If validation fails or path traversal detected
"""
# Validate identifiers (accepts both UUIDs and slugs)
tenant_id = validate_identifier(tenant_id, "tenant_id")
kb_id = validate_identifier(kb_id, "kb_id")
try:
# Create and resolve paths
base_path = Path(base_dir).resolve()
tenant_path = (base_path / tenant_id / kb_id).resolve()
# Critical security check: verify path stays within base_dir
if not tenant_path.is_relative_to(base_path):
logger.error(
f"Path traversal attempt detected: base={base_path}, "
f"requested={tenant_path}, tenant={tenant_id}, kb={kb_id}"
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid path detected"
)
# Create composite workspace identifier
composite_workspace = f"{tenant_id}:{kb_id}"
logger.debug(
f"Validated working directory: path={tenant_path}, workspace={composite_workspace}"
)
return tenant_path, composite_workspace
except (OSError, RuntimeError) as e:
logger.error(f"Error validating working directory: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to validate working directory",
)
def sanitize_identifier(value: str, max_length: int = 255) -> str:
"""Sanitize an identifier string for safe use.
Removes or replaces potentially dangerous characters while preserving
readability. Used for names, descriptions, etc. - not UUIDs.
Args:
value: String to sanitize
max_length: Maximum allowed length
Returns:
Sanitized string
Raises:
HTTPException: If value is empty or too long after sanitization
"""
if not value or not value.strip():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Value cannot be empty"
)
# Remove null bytes and control characters
sanitized = "".join(c for c in value if ord(c) >= 32 and c != "\x7f")
# Remove path separators
sanitized = sanitized.replace("/", "").replace("\\", "")
# Strip and limit length
sanitized = sanitized.strip()[:max_length]
if not sanitized:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid value after sanitization",
)
return sanitized