Fix security issue: Unauthenticated Access to Privileged Permission Management Endpoints (CWE-306, CWE-862)

This commit is contained in:
pensarapp[bot] 2025-05-22 08:49:57 +00:00 committed by GitHub
parent b1b4ae3d5f
commit 27f4dc0fae
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,14 +1,30 @@
from uuid import UUID
from fastapi import APIRouter
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import JSONResponse
# Import your authentication dependency and User type
from cognee.modules.users.auth.dependencies import get_current_user
def get_permissions_router() -> APIRouter:
permissions_router = APIRouter()
# Authorization function (example: only superadmins allowed)
def check_superadmin_user(user):
if not hasattr(user, "is_superadmin") or not user.is_superadmin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Operation not permitted: insufficient privileges."
)
@permissions_router.post("/roles/{role_id}/permissions")
async def give_default_permission_to_role(role_id: UUID, permission_name: str):
async def give_default_permission_to_role(
role_id: UUID,
permission_name: str,
current_user=Depends(get_current_user)
):
check_superadmin_user(current_user)
from cognee.modules.users.permissions.methods import (
give_default_permission_to_role as set_default_permission_to_role,
)
@ -18,7 +34,12 @@ def get_permissions_router() -> APIRouter:
return JSONResponse(status_code=200, content={"message": "Permission assigned to role"})
@permissions_router.post("/tenants/{tenant_id}/permissions")
async def give_default_permission_to_tenant(tenant_id: UUID, permission_name: str):
async def give_default_permission_to_tenant(
tenant_id: UUID,
permission_name: str,
current_user=Depends(get_current_user)
):
check_superadmin_user(current_user)
from cognee.modules.users.permissions.methods import (
give_default_permission_to_tenant as set_tenant_default_permissions,
)
@ -28,7 +49,12 @@ def get_permissions_router() -> APIRouter:
return JSONResponse(status_code=200, content={"message": "Permission assigned to tenant"})
@permissions_router.post("/users/{user_id}/permissions")
async def give_default_permission_to_user(user_id: UUID, permission_name: str):
async def give_default_permission_to_user(
user_id: UUID,
permission_name: str,
current_user=Depends(get_current_user)
):
check_superadmin_user(current_user)
from cognee.modules.users.permissions.methods import (
give_default_permission_to_user as set_default_permission_to_user,
)
@ -41,7 +67,9 @@ def get_permissions_router() -> APIRouter:
async def create_role(
role_name: str,
tenant_id: UUID,
current_user=Depends(get_current_user)
):
check_superadmin_user(current_user)
from cognee.modules.users.roles.methods import create_role as create_role_method
await create_role_method(role_name=role_name, tenant_id=tenant_id)
@ -49,7 +77,12 @@ def get_permissions_router() -> APIRouter:
return JSONResponse(status_code=200, content={"message": "Role created for tenant"})
@permissions_router.post("/users/{user_id}/roles")
async def add_user_to_role(user_id: UUID, role_id: UUID):
async def add_user_to_role(
user_id: UUID,
role_id: UUID,
current_user=Depends(get_current_user)
):
check_superadmin_user(current_user)
from cognee.modules.users.roles.methods import add_user_to_role as add_user_to_role_method
await add_user_to_role_method(user_id=user_id, role_id=role_id)
@ -57,11 +90,15 @@ def get_permissions_router() -> APIRouter:
return JSONResponse(status_code=200, content={"message": "User added to role"})
@permissions_router.post("/tenants")
async def create_tenant(tenant_name: str):
async def create_tenant(
tenant_name: str,
current_user=Depends(get_current_user)
):
check_superadmin_user(current_user)
from cognee.modules.users.tenants.methods import create_tenant as create_tenant_method
await create_tenant_method(tenant_name=tenant_name)
return JSONResponse(status_code=200, content={"message": "Tenant created."})
return permissions_router
return permissions_router