diff --git a/common/data_source/notion_connector.py b/common/data_source/notion_connector.py
index 8c6a522ad..0be40ab7a 100644
--- a/common/data_source/notion_connector.py
+++ b/common/data_source/notion_connector.py
@@ -1,6 +1,10 @@
 import logging
+import html
 from collections.abc import Generator
+from datetime import datetime, timezone
+from pathlib import Path
 from typing import Any, Optional
+from urllib.parse import urlparse
 
 from retry import retry
 from common.data_source.config import (
@@ -65,7 +69,7 @@ class NotionConnector(LoadConnector, PollConnector):
         self, block_id: str, cursor: Optional[str] = None
     ) -> dict[str, Any] | None:
         """Fetch all child blocks via the Notion API."""
-        logging.debug(f"Fetching children of block with ID '{block_id}'")
+        logging.debug(f"[Notion]: Fetching children of block with ID {block_id}")
         block_url = f"https://api.notion.com/v1/blocks/{block_id}/children"
         query_params = {"start_cursor": cursor} if cursor else None
 
@@ -81,31 +85,31 @@ class NotionConnector(LoadConnector, PollConnector):
         except Exception as e:
             if hasattr(e, 'response') and e.response.status_code == 404:
                 logging.error(
-                    f"Unable to access block with ID '{block_id}'. "
+                    f"[Notion]: Unable to access block with ID {block_id}. "
                     f"This is likely due to the block not being shared with the integration."
                 )
                 return None
             else:
-                logging.exception(f"Error fetching blocks: {e}")
+                logging.exception(f"[Notion]: Error fetching blocks: {e}")
                 raise
 
     @retry(tries=3, delay=1, backoff=2)
     def _fetch_page(self, page_id: str) -> NotionPage:
         """Fetch a page from its ID via the Notion API."""
-        logging.debug(f"Fetching page for ID '{page_id}'")
+        logging.debug(f"[Notion]: Fetching page for ID {page_id}")
         page_url = f"https://api.notion.com/v1/pages/{page_id}"
 
         try:
             data = fetch_notion_data(page_url, self.headers, "GET")
             return NotionPage(**data)
         except Exception as e:
-            logging.warning(f"Failed to fetch page, trying database for ID '{page_id}': {e}")
+            logging.warning(f"[Notion]: Failed to fetch page, trying database for ID {page_id}: {e}")
             return self._fetch_database_as_page(page_id)
 
     @retry(tries=3, delay=1, backoff=2)
     def _fetch_database_as_page(self, database_id: str) -> NotionPage:
         """Attempt to fetch a database as a page."""
-        logging.debug(f"Fetching database for ID '{database_id}' as a page")
+        logging.debug(f"[Notion]: Fetching database for ID {database_id} as a page")
         database_url = f"https://api.notion.com/v1/databases/{database_id}"
         data = fetch_notion_data(database_url, self.headers, "GET")
 
@@ -121,7 +125,7 @@ class NotionConnector(LoadConnector, PollConnector):
         self, database_id: str, cursor: Optional[str] = None
     ) -> dict[str, Any]:
         """Fetch a database from its ID via the Notion API."""
-        logging.debug(f"Fetching database for ID '{database_id}'")
+        logging.debug(f"[Notion]: Fetching database for ID {database_id}")
         block_url = f"https://api.notion.com/v1/databases/{database_id}/query"
         body = {"start_cursor": cursor} if cursor else None
 
@@ -131,7 +135,7 @@ class NotionConnector(LoadConnector, PollConnector):
         except Exception as e:
             if hasattr(e, 'response') and e.response.status_code in [404, 400]:
                 logging.error(
-                    f"Unable to access database with ID '{database_id}'. "
+                    f"[Notion]: Unable to access database with ID {database_id}. "
                     f"This is likely due to the database not being shared with the integration."
                 )
                 return {"results": [], "next_cursor": None}
@@ -158,10 +162,10 @@ class NotionConnector(LoadConnector, PollConnector):
 
         if self.recursive_index_enabled:
             if obj_type == "page":
-                logging.debug(f"Found page with ID '{obj_id}' in database '{database_id}'")
+                logging.debug(f"[Notion]: Found page with ID {obj_id} in database {database_id}")
                 result_pages.append(result["id"])
             elif obj_type == "database":
-                logging.debug(f"Found database with ID '{obj_id}' in database '{database_id}'")
+                logging.debug(f"[Notion]: Found database with ID {obj_id} in database {database_id}")
                 _, child_pages = self._read_pages_from_database(obj_id)
                 result_pages.extend(child_pages)
 
@@ -172,44 +176,227 @@ class NotionConnector(LoadConnector, PollConnector):
 
         return result_blocks, result_pages
 
-    def _read_blocks(self, base_block_id: str) -> tuple[list[NotionBlock], list[str]]:
-        """Reads all child blocks for the specified block, returns blocks and child page ids."""
+    def _extract_rich_text(self, rich_text_array: list[dict[str, Any]]) -> str:
+        collected_text: list[str] = []
+        for rich_text in rich_text_array:
+            content = ""
+            r_type = rich_text.get("type")
+
+            if r_type == "equation":
+                expr = rich_text.get("equation", {}).get("expression")
+                if expr:
+                    content = expr
+            elif r_type == "mention":
+                mention = rich_text.get("mention", {}) or {}
+                mention_type = mention.get("type")
+                mention_value = mention.get(mention_type, {}) if mention_type else {}
+                if mention_type == "date":
+                    start = mention_value.get("start")
+                    end = mention_value.get("end")
+                    if start and end:
+                        content = f"{start} - {end}"
+                    elif start:
+                        content = start
+                elif mention_type in {"page", "database"}:
+                    content = mention_value.get("id", rich_text.get("plain_text", ""))
+                elif mention_type == "link_preview":
+                    content = mention_value.get("url", rich_text.get("plain_text", ""))
+                else:
+                    content = rich_text.get("plain_text", "") or str(mention_value)
+            else:
+                if rich_text.get("plain_text"):
+                    content = rich_text["plain_text"]
+                elif "text" in rich_text and rich_text["text"].get("content"):
+                    content = rich_text["text"]["content"]
+
+            href = rich_text.get("href")
+            if content and href:
+                content = f"{content} ({href})"
+
+            if content:
+                collected_text.append(content)
+
+        return "".join(collected_text).strip()
+
+    def _build_table_html(self, table_block_id: str) -> str | None:
+        rows: list[str] = []
+        cursor = None
+        while True:
+            data = self._fetch_child_blocks(table_block_id, cursor)
+            if data is None:
+                break
+
+            for result in data["results"]:
+                if result.get("type") != "table_row":
+                    continue
+                cells_html: list[str] = []
+                for cell in result["table_row"].get("cells", []):
+                    cell_text = self._extract_rich_text(cell)
+                    cell_html = html.escape(cell_text) if cell_text else ""
+                    cells_html.append(f"<td>{cell_html}</td>")
+                rows.append(f"<tr>{''.join(cells_html)}</tr>")
+
+            if data.get("next_cursor") is None:
+                break
+            cursor = data["next_cursor"]
+
+        if not rows:
+            return None
+        return "<table>\n" + "\n".join(rows) + "\n</table>"
+
+    def _download_file(self, url: str) -> bytes | None:
+        try:
+            response = rl_requests.get(url, timeout=60)
+            response.raise_for_status()
+            return response.content
+        except Exception as exc:
+            logging.warning(f"[Notion]: Failed to download Notion file from {url}: {exc}")
+            return None
+
+    def _extract_file_metadata(
+        self, result_obj: dict[str, Any], block_id: str
+    ) -> tuple[str | None, str, str | None]:
+        file_source_type = result_obj.get("type")
+        file_source = result_obj.get(file_source_type, {}) if file_source_type else {}
+        url = file_source.get("url")
+
+        name = result_obj.get("name") or file_source.get("name")
+        if url and not name:
+            parsed_name = Path(urlparse(url).path).name
+            name = parsed_name or f"notion_file_{block_id}"
+        elif not name:
+            name = f"notion_file_{block_id}"
+
+        caption = self._extract_rich_text(result_obj.get("caption", [])) if "caption" in result_obj else None
+
+        return url, name, caption
+
+    def _build_attachment_document(
+        self,
+        block_id: str,
+        url: str,
+        name: str,
+        caption: Optional[str],
+        page_last_edited_time: Optional[str],
+    ) -> Document | None:
+        file_bytes = self._download_file(url)
+        if file_bytes is None:
+            return None
+
+        extension = Path(name).suffix or Path(urlparse(url).path).suffix or ".bin"
+        if extension and not extension.startswith("."):
+            extension = f".{extension}"
+        if not extension:
+            extension = ".bin"
+
+        updated_at = (
+            datetime_from_string(page_last_edited_time)
+            if page_last_edited_time
+            else datetime.now(timezone.utc)
+        )
+        semantic_identifier = caption or name or f"Notion file {block_id}"
+
+        return Document(
+            id=block_id,
+            blob=file_bytes,
+            source=DocumentSource.NOTION,
+            semantic_identifier=semantic_identifier,
+            extension=extension,
+            size_bytes=len(file_bytes),
+            doc_updated_at=updated_at,
+        )
+
+    def _read_blocks(
+        self, base_block_id: str, page_last_edited_time: Optional[str] = None
+    ) -> tuple[list[NotionBlock], list[str], list[Document]]:
         result_blocks: list[NotionBlock] = []
         child_pages: list[str] = []
+        attachments: list[Document] = []
         cursor = None
         while True:
             data = self._fetch_child_blocks(base_block_id, cursor)
 
             if data is None:
-                return result_blocks, child_pages
+                return result_blocks, child_pages, attachments
 
             for result in data["results"]:
-                logging.debug(f"Found child block for block with ID '{base_block_id}': {result}")
+                logging.debug(f"[Notion]: Found child block for block with ID {base_block_id}: {result}")
                 result_block_id = result["id"]
                 result_type = result["type"]
                 result_obj = result[result_type]
 
                 if result_type in ["ai_block", "unsupported", "external_object_instance_page"]:
-                    logging.warning(f"Skipping unsupported block type '{result_type}'")
+                    logging.warning(f"[Notion]: Skipping unsupported block type {result_type}")
+                    continue
+
+                if result_type == "table":
+                    table_html = self._build_table_html(result_block_id)
+                    if table_html:
+                        result_blocks.append(
+                            NotionBlock(
+                                id=result_block_id,
+                                text=table_html,
+                                prefix="\n\n",
+                            )
+                        )
+                    continue
+
+                if result_type == "equation":
+                    expr = result_obj.get("expression")
+                    if expr:
+                        result_blocks.append(
+                            NotionBlock(
+                                id=result_block_id,
+                                text=expr,
+                                prefix="\n",
+                            )
+                        )
                     continue
 
                 cur_result_text_arr = []
                 if "rich_text" in result_obj:
-                    for rich_text in result_obj["rich_text"]:
-                        if "text" in rich_text:
-                            text = rich_text["text"]["content"]
-                            cur_result_text_arr.append(text)
+                    text = self._extract_rich_text(result_obj["rich_text"])
+                    if text:
+                        cur_result_text_arr.append(text)
+
+                if result_type == "to_do":
+                    checked = result_obj.get("checked")
+                    checkbox_prefix = "[x]" if checked else "[ ]"
+                    if cur_result_text_arr:
+                        cur_result_text_arr = [f"{checkbox_prefix} {cur_result_text_arr[0]}"] + cur_result_text_arr[1:]
+                    else:
+                        cur_result_text_arr = [checkbox_prefix]
+
+                if result_type in {"file", "image", "pdf", "video", "audio"}:
+                    file_url, file_name, caption = self._extract_file_metadata(result_obj, result_block_id)
+                    if file_url:
+                        attachment_doc = self._build_attachment_document(
+                            block_id=result_block_id,
+                            url=file_url,
+                            name=file_name,
+                            caption=caption,
+                            page_last_edited_time=page_last_edited_time,
+                        )
+                        if attachment_doc:
+                            attachments.append(attachment_doc)
+
+                    attachment_label = caption or file_name
+                    if attachment_label:
+                        cur_result_text_arr.append(f"{result_type.capitalize()}: {attachment_label}")
 
                 if result["has_children"]:
                     if result_type == "child_page":
                         child_pages.append(result_block_id)
                     else:
-                        logging.debug(f"Entering sub-block: {result_block_id}")
-                        subblocks, subblock_child_pages = self._read_blocks(result_block_id)
-                        logging.debug(f"Finished sub-block: {result_block_id}")
+                        logging.debug(f"[Notion]: Entering sub-block: {result_block_id}")
+                        subblocks, subblock_child_pages, subblock_attachments = self._read_blocks(
+                            result_block_id, page_last_edited_time
+                        )
+                        logging.debug(f"[Notion]: Finished sub-block: {result_block_id}")
                         result_blocks.extend(subblocks)
                         child_pages.extend(subblock_child_pages)
+                        attachments.extend(subblock_attachments)
 
                 if result_type == "child_database":
                     inner_blocks, inner_child_pages = self._read_pages_from_database(result_block_id)
@@ -231,7 +418,7 @@ class NotionConnector(LoadConnector, PollConnector):
 
             cursor = data["next_cursor"]
 
-        return result_blocks, child_pages
+        return result_blocks, child_pages, attachments
 
     def _read_page_title(self, page: NotionPage) -> Optional[str]:
         """Extracts the title from a Notion page."""
@@ -246,7 +433,7 @@ class NotionConnector(LoadConnector, PollConnector):
         return None
 
     def _read_pages(
-        self, pages: list[NotionPage]
+        self, pages: list[NotionPage], start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None
     ) -> Generator[Document, None, None]:
         """Reads pages for rich text content and generates Documents."""
         all_child_page_ids: list[str] = []
@@ -255,11 +442,19 @@ class NotionConnector(LoadConnector, PollConnector):
             if isinstance(page, dict):
                 page = NotionPage(**page)
             if page.id in self.indexed_pages:
-                logging.debug(f"Already indexed page with ID '{page.id}'. Skipping.")
+                logging.debug(f"[Notion]: Already indexed page with ID {page.id}. Skipping.")
                 continue
 
-            logging.info(f"Reading page with ID '{page.id}', with url {page.url}")
-            page_blocks, child_page_ids = self._read_blocks(page.id)
+            if start is not None and end is not None:
+                page_ts = datetime_from_string(page.last_edited_time).timestamp()
+                if not (page_ts > start and page_ts <= end):
+                    logging.debug(f"[Notion]: Skipping page {page.id} outside polling window.")
+                    continue
+
+            logging.info(f"[Notion]: Reading page with ID {page.id}, with url {page.url}")
+            page_blocks, child_page_ids, attachment_docs = self._read_blocks(
+                page.id, page.last_edited_time
+            )
             all_child_page_ids.extend(child_page_ids)
             self.indexed_pages.add(page.id)
 
@@ -268,7 +463,7 @@ class NotionConnector(LoadConnector, PollConnector):
 
             if not page_blocks:
                 if not raw_page_title:
-                    logging.warning(f"No blocks OR title found for page with ID '{page.id}'. Skipping.")
+                    logging.warning(f"[Notion]: No blocks OR title found for page with ID {page.id}. Skipping.")
                     continue
 
                 text = page_title
@@ -286,7 +481,8 @@ class NotionConnector(LoadConnector, PollConnector):
                 for block in page_blocks
             ]
 
-            blob = ("\n".join([sec.text for sec in sections])).encode("utf-8")
+            joined_text = "\n".join(sec.text for sec in sections)
+            blob = joined_text.encode("utf-8")
             yield Document(
                 id=page.id,
                 blob=blob,
@@ -297,6 +493,9 @@ class NotionConnector(LoadConnector, PollConnector):
                 doc_updated_at=datetime_from_string(page.last_edited_time)
             )
 
+            for attachment_doc in attachment_docs:
+                yield attachment_doc
+
         if self.recursive_index_enabled and all_child_page_ids:
             for child_page_batch_ids in batch_generator(all_child_page_ids, INDEX_BATCH_SIZE):
                 child_page_batch = [
@@ -304,23 +503,25 @@ class NotionConnector(LoadConnector, PollConnector):
                     for page_id in child_page_batch_ids
                     if page_id not in self.indexed_pages
                 ]
-                yield from self._read_pages(child_page_batch)
+                yield from self._read_pages(child_page_batch, start, end)
 
     @retry(tries=3, delay=1, backoff=2)
     def _search_notion(self, query_dict: dict[str, Any]) -> NotionSearchResponse:
         """Search for pages from a Notion database."""
-        logging.debug(f"Searching for pages in Notion with query_dict: {query_dict}")
+        logging.debug(f"[Notion]: Searching for pages in Notion with query_dict: {query_dict}")
         data = fetch_notion_data("https://api.notion.com/v1/search", self.headers, "POST", query_dict)
         return NotionSearchResponse(**data)
 
-    def _recursive_load(self) -> Generator[list[Document], None, None]:
+    def _recursive_load(
+        self, start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None
+    ) -> Generator[list[Document], None, None]:
         """Recursively load pages starting from root page ID."""
         if self.root_page_id is None or not self.recursive_index_enabled:
            raise RuntimeError("Recursive page lookup is not enabled")
 
-        logging.info(f"Recursively loading pages from Notion based on root page with ID: {self.root_page_id}")
+        logging.info(f"[Notion]: Recursively loading pages from Notion based on root page with ID: {self.root_page_id}")
         pages = [self._fetch_page(page_id=self.root_page_id)]
-        yield from batch_generator(self._read_pages(pages), self.batch_size)
+        yield from batch_generator(self._read_pages(pages, start, end), self.batch_size)
 
     def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
         """Applies integration token to headers."""
@@ -353,7 +554,7 @@ class NotionConnector(LoadConnector, PollConnector):
     ) -> GenerateDocumentsOutput:
         """Poll Notion for updated pages within a time period."""
         if self.recursive_index_enabled and self.root_page_id:
-            yield from self._recursive_load()
+            yield from self._recursive_load(start, end)
             return
 
         query_dict = {
@@ -367,7 +568,7 @@ class NotionConnector(LoadConnector, PollConnector):
             pages = filter_pages_by_time(db_res.results, start, end, "last_edited_time")
 
             if pages:
-                yield from batch_generator(self._read_pages(pages), self.batch_size)
+                yield from batch_generator(self._read_pages(pages, start, end), self.batch_size)
             if db_res.has_more:
                 query_dict["start_cursor"] = db_res.next_cursor
             else: