diff --git a/common/data_source/notion_connector.py b/common/data_source/notion_connector.py index 0be40ab7a..c98eafcaf 100644 --- a/common/data_source/notion_connector.py +++ b/common/data_source/notion_connector.py @@ -1,42 +1,45 @@ -import logging import html +import logging from collections.abc import Generator from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional from urllib.parse import urlparse + from retry import retry from common.data_source.config import ( INDEX_BATCH_SIZE, - DocumentSource, NOTION_CONNECTOR_DISABLE_RECURSIVE_PAGE_LOOKUP + NOTION_CONNECTOR_DISABLE_RECURSIVE_PAGE_LOOKUP, + DocumentSource, +) +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + ConnectorValidationError, + CredentialExpiredError, + InsufficientPermissionsError, + UnexpectedValidationError, ) from common.data_source.interfaces import ( LoadConnector, PollConnector, - SecondsSinceUnixEpoch + SecondsSinceUnixEpoch, ) from common.data_source.models import ( Document, - TextSection, GenerateDocumentsOutput -) -from common.data_source.exceptions import ( - ConnectorValidationError, - CredentialExpiredError, - InsufficientPermissionsError, - UnexpectedValidationError, ConnectorMissingCredentialError -) -from common.data_source.models import ( - NotionPage, + GenerateDocumentsOutput, NotionBlock, - NotionSearchResponse + NotionPage, + NotionSearchResponse, + TextSection, ) from common.data_source.utils import ( - rl_requests, batch_generator, + datetime_from_string, fetch_notion_data, + filter_pages_by_time, properties_to_str, - filter_pages_by_time, datetime_from_string + rl_requests, ) @@ -65,9 +68,7 @@ class NotionConnector(LoadConnector, PollConnector): self.recursive_index_enabled = recursive_index_enabled or bool(root_page_id) @retry(tries=3, delay=1, backoff=2) - def _fetch_child_blocks( - self, block_id: str, cursor: Optional[str] = None - ) -> dict[str, Any] | None: + def _fetch_child_blocks(self, block_id: str, cursor: Optional[str] = None) -> dict[str, Any] | None: """Fetch all child blocks via the Notion API.""" logging.debug(f"[Notion]: Fetching children of block with ID {block_id}") block_url = f"https://api.notion.com/v1/blocks/{block_id}/children" @@ -83,11 +84,8 @@ class NotionConnector(LoadConnector, PollConnector): response.raise_for_status() return response.json() except Exception as e: - if hasattr(e, 'response') and e.response.status_code == 404: - logging.error( - f"[Notion]: Unable to access block with ID {block_id}. " - f"This is likely due to the block not being shared with the integration." - ) + if hasattr(e, "response") and e.response.status_code == 404: + logging.error(f"[Notion]: Unable to access block with ID {block_id}. This is likely due to the block not being shared with the integration.") return None else: logging.exception(f"[Notion]: Error fetching blocks: {e}") @@ -114,16 +112,12 @@ class NotionConnector(LoadConnector, PollConnector): data = fetch_notion_data(database_url, self.headers, "GET") database_name = data.get("title") - database_name = ( - database_name[0].get("text", {}).get("content") if database_name else None - ) + database_name = database_name[0].get("text", {}).get("content") if database_name else None return NotionPage(**data, database_name=database_name) @retry(tries=3, delay=1, backoff=2) - def _fetch_database( - self, database_id: str, cursor: Optional[str] = None - ) -> dict[str, Any]: + def _fetch_database(self, database_id: str, cursor: Optional[str] = None) -> dict[str, Any]: """Fetch a database from its ID via the Notion API.""" logging.debug(f"[Notion]: Fetching database for ID {database_id}") block_url = f"https://api.notion.com/v1/databases/{database_id}/query" @@ -133,17 +127,12 @@ class NotionConnector(LoadConnector, PollConnector): data = fetch_notion_data(block_url, self.headers, "POST", body) return data except Exception as e: - if hasattr(e, 'response') and e.response.status_code in [404, 400]: - logging.error( - f"[Notion]: Unable to access database with ID {database_id}. " - f"This is likely due to the database not being shared with the integration." - ) + if hasattr(e, "response") and e.response.status_code in [404, 400]: + logging.error(f"[Notion]: Unable to access database with ID {database_id}. This is likely due to the database not being shared with the integration.") return {"results": [], "next_cursor": None} raise - def _read_pages_from_database( - self, database_id: str - ) -> tuple[list[NotionBlock], list[str]]: + def _read_pages_from_database(self, database_id: str) -> tuple[list[NotionBlock], list[str]]: """Returns a list of top level blocks and all page IDs in the database.""" result_blocks: list[NotionBlock] = [] result_pages: list[str] = [] @@ -253,9 +242,7 @@ class NotionConnector(LoadConnector, PollConnector): logging.warning(f"[Notion]: Failed to download Notion file from {url}: {exc}") return None - def _extract_file_metadata( - self, result_obj: dict[str, Any], block_id: str - ) -> tuple[str | None, str, str | None]: + def _extract_file_metadata(self, result_obj: dict[str, Any], block_id: str) -> tuple[str | None, str, str | None]: file_source_type = result_obj.get("type") file_source = result_obj.get(file_source_type, {}) if file_source_type else {} url = file_source.get("url") @@ -289,11 +276,7 @@ class NotionConnector(LoadConnector, PollConnector): if not extension: extension = ".bin" - updated_at = ( - datetime_from_string(page_last_edited_time) - if page_last_edited_time - else datetime.now(timezone.utc) - ) + updated_at = datetime_from_string(page_last_edited_time) if page_last_edited_time else datetime.now(timezone.utc) semantic_identifier = caption or name or f"Notion file {block_id}" return Document( @@ -306,9 +289,7 @@ class NotionConnector(LoadConnector, PollConnector): doc_updated_at=updated_at, ) - def _read_blocks( - self, base_block_id: str, page_last_edited_time: Optional[str] = None - ) -> tuple[list[NotionBlock], list[str], list[Document]]: + def _read_blocks(self, base_block_id: str, page_last_edited_time: Optional[str] = None) -> tuple[list[NotionBlock], list[str], list[Document]]: result_blocks: list[NotionBlock] = [] child_pages: list[str] = [] attachments: list[Document] = [] @@ -390,9 +371,7 @@ class NotionConnector(LoadConnector, PollConnector): child_pages.append(result_block_id) else: logging.debug(f"[Notion]: Entering sub-block: {result_block_id}") - subblocks, subblock_child_pages, subblock_attachments = self._read_blocks( - result_block_id, page_last_edited_time - ) + subblocks, subblock_child_pages, subblock_attachments = self._read_blocks(result_block_id, page_last_edited_time) logging.debug(f"[Notion]: Finished sub-block: {result_block_id}") result_blocks.extend(subblocks) child_pages.extend(subblock_child_pages) @@ -432,9 +411,7 @@ class NotionConnector(LoadConnector, PollConnector): return None - def _read_pages( - self, pages: list[NotionPage], start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None - ) -> Generator[Document, None, None]: + def _read_pages(self, pages: list[NotionPage], start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None) -> Generator[Document, None, None]: """Reads pages for rich text content and generates Documents.""" all_child_page_ids: list[str] = [] @@ -452,9 +429,7 @@ class NotionConnector(LoadConnector, PollConnector): continue logging.info(f"[Notion]: Reading page with ID {page.id}, with url {page.url}") - page_blocks, child_page_ids, attachment_docs = self._read_blocks( - page.id, page.last_edited_time - ) + page_blocks, child_page_ids, attachment_docs = self._read_blocks(page.id, page.last_edited_time) all_child_page_ids.extend(child_page_ids) self.indexed_pages.add(page.id) @@ -468,9 +443,7 @@ class NotionConnector(LoadConnector, PollConnector): text = page_title if page.properties: - text += "\n\n" + "\n".join( - [f"{key}: {value}" for key, value in page.properties.items()] - ) + text += "\n\n" + "\n".join([f"{key}: {value}" for key, value in page.properties.items()]) sections = [TextSection(link=page.url, text=text)] else: sections = [ @@ -484,13 +457,7 @@ class NotionConnector(LoadConnector, PollConnector): joined_text = "\n".join(sec.text for sec in sections) blob = joined_text.encode("utf-8") yield Document( - id=page.id, - blob=blob, - source=DocumentSource.NOTION, - semantic_identifier=page_title, - extension=".txt", - size_bytes=len(blob), - doc_updated_at=datetime_from_string(page.last_edited_time) + id=page.id, blob=blob, source=DocumentSource.NOTION, semantic_identifier=page_title, extension=".txt", size_bytes=len(blob), doc_updated_at=datetime_from_string(page.last_edited_time) ) for attachment_doc in attachment_docs: @@ -498,11 +465,7 @@ class NotionConnector(LoadConnector, PollConnector): if self.recursive_index_enabled and all_child_page_ids: for child_page_batch_ids in batch_generator(all_child_page_ids, INDEX_BATCH_SIZE): - child_page_batch = [ - self._fetch_page(page_id) - for page_id in child_page_batch_ids - if page_id not in self.indexed_pages - ] + child_page_batch = [self._fetch_page(page_id) for page_id in child_page_batch_ids if page_id not in self.indexed_pages] yield from self._read_pages(child_page_batch, start, end) @retry(tries=3, delay=1, backoff=2) @@ -512,9 +475,7 @@ class NotionConnector(LoadConnector, PollConnector): data = fetch_notion_data("https://api.notion.com/v1/search", self.headers, "POST", query_dict) return NotionSearchResponse(**data) - def _recursive_load( - self, start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None - ) -> Generator[list[Document], None, None]: + def _recursive_load(self, start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None) -> Generator[list[Document], None, None]: """Recursively load pages starting from root page ID.""" if self.root_page_id is None or not self.recursive_index_enabled: raise RuntimeError("Recursive page lookup is not enabled") @@ -525,7 +486,7 @@ class NotionConnector(LoadConnector, PollConnector): def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: """Applies integration token to headers.""" - self.headers["Authorization"] = f'Bearer {credentials["notion_integration_token"]}' + self.headers["Authorization"] = f"Bearer {credentials['notion_integration_token']}" return None def load_from_state(self) -> GenerateDocumentsOutput: @@ -549,9 +510,7 @@ class NotionConnector(LoadConnector, PollConnector): else: break - def poll_source( - self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch - ) -> GenerateDocumentsOutput: + def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput: """Poll Notion for updated pages within a time period.""" if self.recursive_index_enabled and self.root_page_id: yield from self._recursive_load(start, end)