refactor: Add filtering of non current tenant results when authorizing dataset

This commit is contained in:
Igor Ilic 2025-11-04 17:56:01 +01:00
parent f4117c42e9
commit cd32b492a4
2 changed files with 16 additions and 13 deletions

View file

@ -24,18 +24,14 @@ async def get_all_user_permission_datasets(user: User, permission_type: str) ->
# Get all tenants user is a part of
tenants = await user.awaitable_attrs.tenants
for tenant in tenants:
# If tenant is the user's selected tenant add datasets that users roles in the tenant and the tenant itself
# have access for
if tenant.id == user.tenant_id:
# Get all datasets all tenant members have access to
datasets.extend(await get_principal_datasets(tenant, permission_type))
# Get all datasets all tenant members have access to
datasets.extend(await get_principal_datasets(tenant, permission_type))
# Get all datasets accessible by roles user is a part of
roles = await user.awaitable_attrs.roles
for role in roles:
datasets.extend(await get_principal_datasets(role, permission_type))
# Get all datasets accessible by roles user is a part of
roles = await user.awaitable_attrs.roles
for role in roles:
datasets.extend(await get_principal_datasets(role, permission_type))
# Deduplicate datasets with same ID
unique = {}
@ -43,5 +39,10 @@ async def get_all_user_permission_datasets(user: User, permission_type: str) ->
# If the dataset id key already exists, leave the dictionary unchanged.
unique.setdefault(dataset.id, dataset)
# TODO: Add filtering out of datasets that aren't currently selected tenant of user (currently selected tenant is the tenant_id value in the User model)
return list(unique.values())
# Filter out dataset that aren't part of the current user's tenant
filtered_datasets = []
for dataset in list(unique.values()):
if dataset.tenant_id == user.tenant_id:
filtered_datasets.append(dataset)
return filtered_datasets

View file

@ -42,11 +42,13 @@ async def add_user_to_role(user_id: UUID, role_id: UUID, owner_id: UUID):
.first()
)
user_tenants = await user.awaitable_attrs.tenants
if not user:
raise UserNotFoundError
elif not role:
raise RoleNotFoundError
elif user.tenant_id != role.tenant_id:
elif role.tenant_id not in [tenant.id for tenant in user_tenants]: # TESTME
raise TenantNotFoundError(
message="User tenant does not match role tenant. User cannot be added to role."
)