From 774ab0fa7463035a67fea5399d87984a98195fc5 Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 18 Sep 2025 15:41:07 -0400 Subject: [PATCH] get_effective_jwt_token --- src/session_manager.py | 47 ++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/src/session_manager.py b/src/session_manager.py index 1008d1fd..6eef5c70 100644 --- a/src/session_manager.py +++ b/src/session_manager.py @@ -191,26 +191,8 @@ class SessionManager: def get_user_opensearch_client(self, user_id: str, jwt_token: str): """Get or create OpenSearch client for user with their JWT""" - from config.settings import is_no_auth_mode - - logger.debug( - "get_user_opensearch_client", - user_id=user_id, - jwt_token_present=(jwt_token is not None), - no_auth_mode=is_no_auth_mode(), - ) - - # In no-auth mode, create anonymous JWT for OpenSearch DLS - if jwt_token is None and (is_no_auth_mode() or user_id in (None, AnonymousUser().user_id)): - if not hasattr(self, "_anonymous_jwt"): - # Create anonymous JWT token for OpenSearch OIDC - logger.debug("Creating anonymous JWT") - self._anonymous_jwt = self._create_anonymous_jwt() - logger.debug( - "Anonymous JWT created", jwt_prefix=self._anonymous_jwt[:50] - ) - jwt_token = self._anonymous_jwt - logger.debug("Using anonymous JWT for OpenSearch") + # Get the effective JWT token (handles anonymous JWT creation) + jwt_token = self.get_effective_jwt_token(user_id, jwt_token) # Check if we have a cached client for this user if user_id not in self.user_opensearch_clients: @@ -222,6 +204,31 @@ class SessionManager: return self.user_opensearch_clients[user_id] + def get_effective_jwt_token(self, user_id: str, jwt_token: str) -> str: + """Get the effective JWT token, creating anonymous JWT if needed in no-auth mode""" + from config.settings import is_no_auth_mode + + logger.debug( + "get_effective_jwt_token", + user_id=user_id, + jwt_token_present=(jwt_token is not None), + no_auth_mode=is_no_auth_mode(), + ) + + # In no-auth mode, create anonymous JWT if needed + if jwt_token is None and (is_no_auth_mode() or user_id in (None, AnonymousUser().user_id)): + if not hasattr(self, "_anonymous_jwt"): + # Create anonymous JWT token for OpenSearch OIDC + logger.debug("Creating anonymous JWT") + self._anonymous_jwt = self._create_anonymous_jwt() + logger.debug( + "Anonymous JWT created", jwt_prefix=self._anonymous_jwt[:50] + ) + jwt_token = self._anonymous_jwt + logger.debug("Using anonymous JWT") + + return jwt_token + def _create_anonymous_jwt(self) -> str: """Create JWT token for anonymous user in no-auth mode""" anonymous_user = AnonymousUser()