This commit is contained in:
yongtenglei 2025-11-20 16:54:36 +08:00
parent 4d0bff2d5a
commit 7a07a23d84

View file

@ -1,42 +1,45 @@
import logging
import html
import logging
from collections.abc import Generator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse
from retry import retry
from common.data_source.config import (
INDEX_BATCH_SIZE,
DocumentSource, NOTION_CONNECTOR_DISABLE_RECURSIVE_PAGE_LOOKUP
NOTION_CONNECTOR_DISABLE_RECURSIVE_PAGE_LOOKUP,
DocumentSource,
)
from common.data_source.exceptions import (
ConnectorMissingCredentialError,
ConnectorValidationError,
CredentialExpiredError,
InsufficientPermissionsError,
UnexpectedValidationError,
)
from common.data_source.interfaces import (
LoadConnector,
PollConnector,
SecondsSinceUnixEpoch
SecondsSinceUnixEpoch,
)
from common.data_source.models import (
Document,
TextSection, GenerateDocumentsOutput
)
from common.data_source.exceptions import (
ConnectorValidationError,
CredentialExpiredError,
InsufficientPermissionsError,
UnexpectedValidationError, ConnectorMissingCredentialError
)
from common.data_source.models import (
NotionPage,
GenerateDocumentsOutput,
NotionBlock,
NotionSearchResponse
NotionPage,
NotionSearchResponse,
TextSection,
)
from common.data_source.utils import (
rl_requests,
batch_generator,
datetime_from_string,
fetch_notion_data,
filter_pages_by_time,
properties_to_str,
filter_pages_by_time, datetime_from_string
rl_requests,
)
@ -65,9 +68,7 @@ class NotionConnector(LoadConnector, PollConnector):
self.recursive_index_enabled = recursive_index_enabled or bool(root_page_id)
@retry(tries=3, delay=1, backoff=2)
def _fetch_child_blocks(
self, block_id: str, cursor: Optional[str] = None
) -> dict[str, Any] | None:
def _fetch_child_blocks(self, block_id: str, cursor: Optional[str] = None) -> dict[str, Any] | None:
"""Fetch all child blocks via the Notion API."""
logging.debug(f"[Notion]: Fetching children of block with ID {block_id}")
block_url = f"https://api.notion.com/v1/blocks/{block_id}/children"
@ -83,11 +84,8 @@ class NotionConnector(LoadConnector, PollConnector):
response.raise_for_status()
return response.json()
except Exception as e:
if hasattr(e, 'response') and e.response.status_code == 404:
logging.error(
f"[Notion]: Unable to access block with ID {block_id}. "
f"This is likely due to the block not being shared with the integration."
)
if hasattr(e, "response") and e.response.status_code == 404:
logging.error(f"[Notion]: Unable to access block with ID {block_id}. This is likely due to the block not being shared with the integration.")
return None
else:
logging.exception(f"[Notion]: Error fetching blocks: {e}")
@ -114,16 +112,12 @@ class NotionConnector(LoadConnector, PollConnector):
data = fetch_notion_data(database_url, self.headers, "GET")
database_name = data.get("title")
database_name = (
database_name[0].get("text", {}).get("content") if database_name else None
)
database_name = database_name[0].get("text", {}).get("content") if database_name else None
return NotionPage(**data, database_name=database_name)
@retry(tries=3, delay=1, backoff=2)
def _fetch_database(
self, database_id: str, cursor: Optional[str] = None
) -> dict[str, Any]:
def _fetch_database(self, database_id: str, cursor: Optional[str] = None) -> dict[str, Any]:
"""Fetch a database from its ID via the Notion API."""
logging.debug(f"[Notion]: Fetching database for ID {database_id}")
block_url = f"https://api.notion.com/v1/databases/{database_id}/query"
@ -133,17 +127,12 @@ class NotionConnector(LoadConnector, PollConnector):
data = fetch_notion_data(block_url, self.headers, "POST", body)
return data
except Exception as e:
if hasattr(e, 'response') and e.response.status_code in [404, 400]:
logging.error(
f"[Notion]: Unable to access database with ID {database_id}. "
f"This is likely due to the database not being shared with the integration."
)
if hasattr(e, "response") and e.response.status_code in [404, 400]:
logging.error(f"[Notion]: Unable to access database with ID {database_id}. This is likely due to the database not being shared with the integration.")
return {"results": [], "next_cursor": None}
raise
def _read_pages_from_database(
self, database_id: str
) -> tuple[list[NotionBlock], list[str]]:
def _read_pages_from_database(self, database_id: str) -> tuple[list[NotionBlock], list[str]]:
"""Returns a list of top level blocks and all page IDs in the database."""
result_blocks: list[NotionBlock] = []
result_pages: list[str] = []
@ -253,9 +242,7 @@ class NotionConnector(LoadConnector, PollConnector):
logging.warning(f"[Notion]: Failed to download Notion file from {url}: {exc}")
return None
def _extract_file_metadata(
self, result_obj: dict[str, Any], block_id: str
) -> tuple[str | None, str, str | None]:
def _extract_file_metadata(self, result_obj: dict[str, Any], block_id: str) -> tuple[str | None, str, str | None]:
file_source_type = result_obj.get("type")
file_source = result_obj.get(file_source_type, {}) if file_source_type else {}
url = file_source.get("url")
@ -289,11 +276,7 @@ class NotionConnector(LoadConnector, PollConnector):
if not extension:
extension = ".bin"
updated_at = (
datetime_from_string(page_last_edited_time)
if page_last_edited_time
else datetime.now(timezone.utc)
)
updated_at = datetime_from_string(page_last_edited_time) if page_last_edited_time else datetime.now(timezone.utc)
semantic_identifier = caption or name or f"Notion file {block_id}"
return Document(
@ -306,9 +289,7 @@ class NotionConnector(LoadConnector, PollConnector):
doc_updated_at=updated_at,
)
def _read_blocks(
self, base_block_id: str, page_last_edited_time: Optional[str] = None
) -> tuple[list[NotionBlock], list[str], list[Document]]:
def _read_blocks(self, base_block_id: str, page_last_edited_time: Optional[str] = None) -> tuple[list[NotionBlock], list[str], list[Document]]:
result_blocks: list[NotionBlock] = []
child_pages: list[str] = []
attachments: list[Document] = []
@ -390,9 +371,7 @@ class NotionConnector(LoadConnector, PollConnector):
child_pages.append(result_block_id)
else:
logging.debug(f"[Notion]: Entering sub-block: {result_block_id}")
subblocks, subblock_child_pages, subblock_attachments = self._read_blocks(
result_block_id, page_last_edited_time
)
subblocks, subblock_child_pages, subblock_attachments = self._read_blocks(result_block_id, page_last_edited_time)
logging.debug(f"[Notion]: Finished sub-block: {result_block_id}")
result_blocks.extend(subblocks)
child_pages.extend(subblock_child_pages)
@ -432,9 +411,7 @@ class NotionConnector(LoadConnector, PollConnector):
return None
def _read_pages(
self, pages: list[NotionPage], start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None
) -> Generator[Document, None, None]:
def _read_pages(self, pages: list[NotionPage], start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None) -> Generator[Document, None, None]:
"""Reads pages for rich text content and generates Documents."""
all_child_page_ids: list[str] = []
@ -452,9 +429,7 @@ class NotionConnector(LoadConnector, PollConnector):
continue
logging.info(f"[Notion]: Reading page with ID {page.id}, with url {page.url}")
page_blocks, child_page_ids, attachment_docs = self._read_blocks(
page.id, page.last_edited_time
)
page_blocks, child_page_ids, attachment_docs = self._read_blocks(page.id, page.last_edited_time)
all_child_page_ids.extend(child_page_ids)
self.indexed_pages.add(page.id)
@ -468,9 +443,7 @@ class NotionConnector(LoadConnector, PollConnector):
text = page_title
if page.properties:
text += "\n\n" + "\n".join(
[f"{key}: {value}" for key, value in page.properties.items()]
)
text += "\n\n" + "\n".join([f"{key}: {value}" for key, value in page.properties.items()])
sections = [TextSection(link=page.url, text=text)]
else:
sections = [
@ -484,13 +457,7 @@ class NotionConnector(LoadConnector, PollConnector):
joined_text = "\n".join(sec.text for sec in sections)
blob = joined_text.encode("utf-8")
yield Document(
id=page.id,
blob=blob,
source=DocumentSource.NOTION,
semantic_identifier=page_title,
extension=".txt",
size_bytes=len(blob),
doc_updated_at=datetime_from_string(page.last_edited_time)
id=page.id, blob=blob, source=DocumentSource.NOTION, semantic_identifier=page_title, extension=".txt", size_bytes=len(blob), doc_updated_at=datetime_from_string(page.last_edited_time)
)
for attachment_doc in attachment_docs:
@ -498,11 +465,7 @@ class NotionConnector(LoadConnector, PollConnector):
if self.recursive_index_enabled and all_child_page_ids:
for child_page_batch_ids in batch_generator(all_child_page_ids, INDEX_BATCH_SIZE):
child_page_batch = [
self._fetch_page(page_id)
for page_id in child_page_batch_ids
if page_id not in self.indexed_pages
]
child_page_batch = [self._fetch_page(page_id) for page_id in child_page_batch_ids if page_id not in self.indexed_pages]
yield from self._read_pages(child_page_batch, start, end)
@retry(tries=3, delay=1, backoff=2)
@ -512,9 +475,7 @@ class NotionConnector(LoadConnector, PollConnector):
data = fetch_notion_data("https://api.notion.com/v1/search", self.headers, "POST", query_dict)
return NotionSearchResponse(**data)
def _recursive_load(
self, start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None
) -> Generator[list[Document], None, None]:
def _recursive_load(self, start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None) -> Generator[list[Document], None, None]:
"""Recursively load pages starting from root page ID."""
if self.root_page_id is None or not self.recursive_index_enabled:
raise RuntimeError("Recursive page lookup is not enabled")
@ -525,7 +486,7 @@ class NotionConnector(LoadConnector, PollConnector):
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
"""Applies integration token to headers."""
self.headers["Authorization"] = f'Bearer {credentials["notion_integration_token"]}'
self.headers["Authorization"] = f"Bearer {credentials['notion_integration_token']}"
return None
def load_from_state(self) -> GenerateDocumentsOutput:
@ -549,9 +510,7 @@ class NotionConnector(LoadConnector, PollConnector):
else:
break
def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput:
"""Poll Notion for updated pages within a time period."""
if self.recursive_index_enabled and self.root_page_id:
yield from self._recursive_load(start, end)