cognee/cognee/modules/users/methods/get_authenticated_user.py
Igor Ilic 88ed411f03
feat: user authorization [COG-1189] (#593)
<!-- .github/pull_request_template.md -->

## Description
Added user authorization through JWT header, reworked user and relevant
RBAC models to accompany future User Permission system.

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Introduced an automated workflow to validate server startup.
  - Added secure JWT token generation for improved session handling.
- Enabled a new structure for permission management with role and
tenant-based controls, including endpoints for creating roles, tenants,
and assigning permissions.
- Added methods for assigning default permissions to roles, tenants, and
users.
- Introduced new classes for managing default permissions for roles,
tenants, and users.

- **Refactor**
- Streamlined authentication and user management flows with enhanced
error handling.

- **Tests**
- Upgraded integration tests with improved database initialization and
data pruning for a more stable environment.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
2025-03-13 13:33:42 +01:00

31 lines
1.1 KiB
Python

from types import SimpleNamespace
from ..get_fastapi_users import get_fastapi_users
from fastapi import HTTPException, Header
import os
import jwt
fastapi_users = get_fastapi_users()
async def get_authenticated_user(authorization: str = Header(...)) -> SimpleNamespace:
"""Extract and validate JWT from Authorization header."""
try:
scheme, token = authorization.split()
if scheme.lower() != "bearer":
raise HTTPException(status_code=401, detail="Invalid authentication scheme")
payload = jwt.decode(
token, os.getenv("FASTAPI_USERS_JWT_SECRET", "super_secret"), algorithms=["HS256"]
)
# SimpleNamespace lets us access dictionary elements like attributes
auth_data = SimpleNamespace(
id=payload["user_id"], tenant_id=payload["tenant_id"], roles=payload["roles"]
)
return auth_data
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token has expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")