get_effective_jwt_token

This commit is contained in:
phact 2025-09-18 15:41:07 -04:00
parent 435c740cf0
commit 774ab0fa74

View file

@ -191,26 +191,8 @@ class SessionManager:
def get_user_opensearch_client(self, user_id: str, jwt_token: str):
"""Get or create OpenSearch client for user with their JWT"""
from config.settings import is_no_auth_mode
logger.debug(
"get_user_opensearch_client",
user_id=user_id,
jwt_token_present=(jwt_token is not None),
no_auth_mode=is_no_auth_mode(),
)
# In no-auth mode, create anonymous JWT for OpenSearch DLS
if jwt_token is None and (is_no_auth_mode() or user_id in (None, AnonymousUser().user_id)):
if not hasattr(self, "_anonymous_jwt"):
# Create anonymous JWT token for OpenSearch OIDC
logger.debug("Creating anonymous JWT")
self._anonymous_jwt = self._create_anonymous_jwt()
logger.debug(
"Anonymous JWT created", jwt_prefix=self._anonymous_jwt[:50]
)
jwt_token = self._anonymous_jwt
logger.debug("Using anonymous JWT for OpenSearch")
# Get the effective JWT token (handles anonymous JWT creation)
jwt_token = self.get_effective_jwt_token(user_id, jwt_token)
# Check if we have a cached client for this user
if user_id not in self.user_opensearch_clients:
@ -222,6 +204,31 @@ class SessionManager:
return self.user_opensearch_clients[user_id]
def get_effective_jwt_token(self, user_id: str, jwt_token: str) -> str:
"""Get the effective JWT token, creating anonymous JWT if needed in no-auth mode"""
from config.settings import is_no_auth_mode
logger.debug(
"get_effective_jwt_token",
user_id=user_id,
jwt_token_present=(jwt_token is not None),
no_auth_mode=is_no_auth_mode(),
)
# In no-auth mode, create anonymous JWT if needed
if jwt_token is None and (is_no_auth_mode() or user_id in (None, AnonymousUser().user_id)):
if not hasattr(self, "_anonymous_jwt"):
# Create anonymous JWT token for OpenSearch OIDC
logger.debug("Creating anonymous JWT")
self._anonymous_jwt = self._create_anonymous_jwt()
logger.debug(
"Anonymous JWT created", jwt_prefix=self._anonymous_jwt[:50]
)
jwt_token = self._anonymous_jwt
logger.debug("Using anonymous JWT")
return jwt_token
def _create_anonymous_jwt(self) -> str:
"""Create JWT token for anonymous user in no-auth mode"""
anonymous_user = AnonymousUser()