From 27f4dc0faec24e5b18646f8d3fc7c22dc67d7b8d Mon Sep 17 00:00:00 2001 From: "pensarapp[bot]" <182705637+pensarapp[bot]@users.noreply.github.com> Date: Thu, 22 May 2025 08:49:57 +0000 Subject: [PATCH] Fix security issue: Unauthenticated Access to Privileged Permission Management Endpoints (CWE-306, CWE-862) --- .../routers/get_permissions_router.py | 51 ++++++++++++++++--- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/cognee/api/v1/permissions/routers/get_permissions_router.py b/cognee/api/v1/permissions/routers/get_permissions_router.py index 8a52c4a42..8185b6103 100644 --- a/cognee/api/v1/permissions/routers/get_permissions_router.py +++ b/cognee/api/v1/permissions/routers/get_permissions_router.py @@ -1,14 +1,30 @@ from uuid import UUID -from fastapi import APIRouter +from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import JSONResponse +# Import your authentication dependency and User type +from cognee.modules.users.auth.dependencies import get_current_user + def get_permissions_router() -> APIRouter: permissions_router = APIRouter() + # Authorization function (example: only superadmins allowed) + def check_superadmin_user(user): + if not hasattr(user, "is_superadmin") or not user.is_superadmin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Operation not permitted: insufficient privileges." + ) + @permissions_router.post("/roles/{role_id}/permissions") - async def give_default_permission_to_role(role_id: UUID, permission_name: str): + async def give_default_permission_to_role( + role_id: UUID, + permission_name: str, + current_user=Depends(get_current_user) + ): + check_superadmin_user(current_user) from cognee.modules.users.permissions.methods import ( give_default_permission_to_role as set_default_permission_to_role, ) @@ -18,7 +34,12 @@ def get_permissions_router() -> APIRouter: return JSONResponse(status_code=200, content={"message": "Permission assigned to role"}) @permissions_router.post("/tenants/{tenant_id}/permissions") - async def give_default_permission_to_tenant(tenant_id: UUID, permission_name: str): + async def give_default_permission_to_tenant( + tenant_id: UUID, + permission_name: str, + current_user=Depends(get_current_user) + ): + check_superadmin_user(current_user) from cognee.modules.users.permissions.methods import ( give_default_permission_to_tenant as set_tenant_default_permissions, ) @@ -28,7 +49,12 @@ def get_permissions_router() -> APIRouter: return JSONResponse(status_code=200, content={"message": "Permission assigned to tenant"}) @permissions_router.post("/users/{user_id}/permissions") - async def give_default_permission_to_user(user_id: UUID, permission_name: str): + async def give_default_permission_to_user( + user_id: UUID, + permission_name: str, + current_user=Depends(get_current_user) + ): + check_superadmin_user(current_user) from cognee.modules.users.permissions.methods import ( give_default_permission_to_user as set_default_permission_to_user, ) @@ -41,7 +67,9 @@ def get_permissions_router() -> APIRouter: async def create_role( role_name: str, tenant_id: UUID, + current_user=Depends(get_current_user) ): + check_superadmin_user(current_user) from cognee.modules.users.roles.methods import create_role as create_role_method await create_role_method(role_name=role_name, tenant_id=tenant_id) @@ -49,7 +77,12 @@ def get_permissions_router() -> APIRouter: return JSONResponse(status_code=200, content={"message": "Role created for tenant"}) @permissions_router.post("/users/{user_id}/roles") - async def add_user_to_role(user_id: UUID, role_id: UUID): + async def add_user_to_role( + user_id: UUID, + role_id: UUID, + current_user=Depends(get_current_user) + ): + check_superadmin_user(current_user) from cognee.modules.users.roles.methods import add_user_to_role as add_user_to_role_method await add_user_to_role_method(user_id=user_id, role_id=role_id) @@ -57,11 +90,15 @@ def get_permissions_router() -> APIRouter: return JSONResponse(status_code=200, content={"message": "User added to role"}) @permissions_router.post("/tenants") - async def create_tenant(tenant_name: str): + async def create_tenant( + tenant_name: str, + current_user=Depends(get_current_user) + ): + check_superadmin_user(current_user) from cognee.modules.users.tenants.methods import create_tenant as create_tenant_method await create_tenant_method(tenant_name=tenant_name) return JSONResponse(status_code=200, content={"message": "Tenant created."}) - return permissions_router + return permissions_router \ No newline at end of file