From dd51513f78afa0a4f27c6eff243963ef8ad756e3 Mon Sep 17 00:00:00 2001 From: Billy Bao Date: Fri, 21 Nov 2025 15:37:12 +0800 Subject: [PATCH 1/3] fix teams_connector load and valid vredentaials --- common/data_source/teams_connector.py | 132 +++++++++++++++++++++----- common/data_source/utils.py | 19 ++++ 2 files changed, 129 insertions(+), 22 deletions(-) diff --git a/common/data_source/teams_connector.py b/common/data_source/teams_connector.py index 0b4cd5642..b5aff3b3c 100644 --- a/common/data_source/teams_connector.py +++ b/common/data_source/teams_connector.py @@ -1,27 +1,32 @@ """Microsoft Teams connector""" from typing import Any - +import logging import msal from office365.graph_client import GraphClient from office365.runtime.client_request_exception import ClientRequestException +from common.data_source.utils import run_with_timeout from common.data_source.exceptions import ( ConnectorValidationError, InsufficientPermissionsError, - UnexpectedValidationError, ConnectorMissingCredentialError + UnexpectedValidationError, + ConnectorMissingCredentialError ) from common.data_source.interfaces import ( SecondsSinceUnixEpoch, - SlimConnectorWithPermSync, CheckpointedConnectorWithPermSync + SlimConnectorWithPermSync, + CheckpointedConnectorWithPermSync ) from common.data_source.models import ( ConnectorCheckpoint ) -_SLIM_DOC_BATCH_SIZE = 5000 +_SLIM_DOC_BATCH_SIZE = 5000 +_MAX_WORKERS = 10 + class TeamsCheckpoint(ConnectorCheckpoint): """Teams-specific checkpoint""" todo_team_ids: list[str] | None = None @@ -30,9 +35,12 @@ class TeamsCheckpoint(ConnectorCheckpoint): class TeamsConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync): """Microsoft Teams connector for accessing Teams messages and channels""" - def __init__(self, batch_size: int = _SLIM_DOC_BATCH_SIZE) -> None: - self.batch_size = batch_size - self.teams_client = None + def __init__(self, teams_lst: list[str] = None, max_workers: int = _MAX_WORKERS) -> None: + self.teams_lst = teams_lst + self.max_workers = max_workers + self.teams_client: GraphClient | None = None + self.msal_app: msal.ConfidentialClientApplication | None = None + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: """Load Microsoft Teams credentials""" @@ -45,20 +53,25 @@ class TeamsConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSyn raise ConnectorMissingCredentialError("Microsoft Teams credentials are incomplete") # Create MSAL confidential client - app = msal.ConfidentialClientApplication( + self.msal_app = msal.ConfidentialClientApplication( client_id=client_id, client_credential=client_secret, authority=f"https://login.microsoftonline.com/{tenant_id}" ) - # Get access token - result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"]) + def _acquire_token_callback() -> dict[str, Any]: + if self.msal_app is None: + raise RuntimeError("Failed to create MSAL ConfidentialClientApplication") - if "access_token" not in result: - raise ConnectorMissingCredentialError("Failed to acquire Microsoft Teams access token") + # Get access token + token = self.msal_app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"]) + if not isinstance(token, dict) or "access_token" not in token: + raise RuntimeError("Failed to acquire token for Microsoft Graph API") + return token + # Create Graph client for Teams - self.teams_client = GraphClient(result["access_token"]) + self.teams_client = GraphClient(token_callback=_acquire_token_callback) return None except Exception as e: @@ -69,16 +82,67 @@ class TeamsConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSyn if not self.teams_client: raise ConnectorMissingCredentialError("Microsoft Teams") + # Check for special characters in team names + has_special_chars = self._has_odata_incompatible_chars(self.teams_lst) + if has_special_chars: + logging.info( + "Some requested team names contain special characters (&, (, )) that require " + "client-side filtering during data retrieval." + ) + + timeout = 10 try: - # Test connection by getting teams - teams = self.teams_client.teams.get().execute_query() - if not teams: - raise ConnectorValidationError("Failed to access Microsoft Teams") + logging.info( + f"Requested team count: {len(self.teams_lst) if self.teams_lst else 0}, " + f"Has special chars: {has_special_chars}" + ) + + validation_query = self.teams_client.teams.get().top(1) + run_with_timeout( + timeout=timeout, + func=lambda: validation_query.execute_query() + ) + + logging.info("Microsoft Teams connector settings validated successfully.") + + except TimeoutError as e: + raise ConnectorValidationError( + f"Timeout while validating Teams access (waited {timeout}s). " + f"This may indicate network issues or authentication problems. " + f"Error: {e}" + ) except ClientRequestException as e: - if "401" in str(e) or "403" in str(e): - raise InsufficientPermissionsError("Invalid credentials or insufficient permissions") - else: - raise UnexpectedValidationError(f"Microsoft Teams validation error: {e}") + if not e.response: + raise RuntimeError(f"No response provided in {e=}") + status_code = e.response.status_code + if status_code == 401: + raise ConnectorValidationError( + "Invalid or expired Microsoft Teams credentials. (401 Unauthorized)" + ) + elif status_code == 403: + raise InsufficientPermissionsError( + "Microsoft Teams connector lacks necessary permissions. (403 Forbidden)" + ) + raise UnexpectedValidationError( + f"Unexpected error during Teams validation: {e} (Status code: {status_code})" + ) + except Exception as e: + error_str = str(e).lower() + if ( + "unauthorized" in error_str + or "401" in error_str + or "invalid_grant" in error_str + ): + raise ConnectorValidationError( + "Invalid or expired Microsoft Teams credentials." + ) + elif "forbidden" in error_str or "403" in error_str: + raise InsufficientPermissionsError( + "App lacks required permissions to read from Microsoft Teams." + ) + raise ConnectorValidationError( + f"Unexpected error during Teams validation: {e}" + ) def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> Any: """Poll Microsoft Teams for recent messages""" @@ -112,4 +176,28 @@ class TeamsConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSyn ) -> Any: """Retrieve all simplified documents with permission sync""" # Simplified implementation - return [] \ No newline at end of file + return [] + + def load_from_checkpoint_with_perm_sync(self, start, end, checkpoint): + return super().load_from_checkpoint_with_perm_sync(start, end, checkpoint) + + + def _has_odata_incompatible_chars(self, team_names: list[str] | None) -> bool: + """Check if any team name contains characters that break Microsoft Graph OData filters. + + The Microsoft Graph Teams API has limited OData support. Characters like + &, (, and ) cause parsing errors and require client-side filtering instead. + """ + if not team_names: + return False + return any(char in name for name in team_names for char in ["&", "(", ")"]) + +if __name__ == "__main__": + connector = TeamsConnector() + creds = { + "tenant_id": "", + "client_id": "", + "client_secret": "" + } + connector.load_credentials(creds) + connector.validate_connector_settings() \ No newline at end of file diff --git a/common/data_source/utils.py b/common/data_source/utils.py index c079b2b9a..727b5a7c1 100644 --- a/common/data_source/utils.py +++ b/common/data_source/utils.py @@ -1134,3 +1134,22 @@ def parallel_yield(gens: list[Iterator[R]], max_workers: int = 10) -> Iterator[R future_to_index[executor.submit(_next_or_none, ind, gens[ind])] = next_ind next_ind += 1 del future_to_index[future] + +def run_with_timeout( + timeout: float, func: Callable[..., R], *args: Any, **kwargs: Any +) -> R: + """ + Executes a function with a timeout. If the function doesn't complete within the specified + timeout, raises TimeoutError. + """ + context = contextvars.copy_context() + task = TimeoutThread(timeout, context.run, func, *args, **kwargs) + task.start() + task.join(timeout) + + if task.exception is not None: + raise task.exception + if task.is_alive(): + task.end() + + return task.result # type: ignore \ No newline at end of file From 57bc68ca78d6cda4ffb42dd836c8d47ec4ca6062 Mon Sep 17 00:00:00 2001 From: Billy Bao Date: Fri, 21 Nov 2025 17:36:20 +0800 Subject: [PATCH 2/3] fix --- api/apps/document_app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/apps/document_app.py b/api/apps/document_app.py index 7ec8c1587..6e5689625 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -607,7 +607,7 @@ async def get_image(image_id): @login_required @validate_request("conversation_id") async def upload_and_parse(): - files = await request.file + files = await request.files if "file" not in files: return get_json_result(data=False, message="No file part!", code=RetCode.ARGUMENT_ERROR) From 8b0b552da8c8835bdb3ea5e6681f4981a77c1113 Mon Sep 17 00:00:00 2001 From: Billy Bao Date: Mon, 24 Nov 2025 11:06:33 +0800 Subject: [PATCH 3/3] rmove dulicate func --- common/data_source/utils.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/common/data_source/utils.py b/common/data_source/utils.py index 727b5a7c1..ec4940c53 100644 --- a/common/data_source/utils.py +++ b/common/data_source/utils.py @@ -1135,21 +1135,3 @@ def parallel_yield(gens: list[Iterator[R]], max_workers: int = 10) -> Iterator[R next_ind += 1 del future_to_index[future] -def run_with_timeout( - timeout: float, func: Callable[..., R], *args: Any, **kwargs: Any -) -> R: - """ - Executes a function with a timeout. If the function doesn't complete within the specified - timeout, raises TimeoutError. - """ - context = contextvars.copy_context() - task = TimeoutThread(timeout, context.run, func, *args, **kwargs) - task.start() - task.join(timeout) - - if task.exception is not None: - raise task.exception - if task.is_alive(): - task.end() - - return task.result # type: ignore \ No newline at end of file