From 052772b60476c1e6f5e920aea5e7339cc6bf3e97 Mon Sep 17 00:00:00 2001
From: Jonah879
Date: Mon, 8 Dec 2025 12:54:22 +0000
Subject: [PATCH] Feat: handle duplicates for confluence, dropbox and notion

---
 common/data_source/confluence_connector.py | 53 +++++++++++++--
 common/data_source/dropbox_connector.py    | 76 +++++++++++++++++-----
 common/data_source/notion_connector.py     | 11 ++++
 3 files changed, 116 insertions(+), 24 deletions(-)

diff --git a/common/data_source/confluence_connector.py b/common/data_source/confluence_connector.py
index a057d0694..256ce4a69 100644
--- a/common/data_source/confluence_connector.py
+++ b/common/data_source/confluence_connector.py
@@ -1494,7 +1494,7 @@ class ConfluenceConnector(
         return comment_string
 
     def _convert_page_to_document(
-        self, page: dict[str, Any]
+        self, page: dict[str, Any], title_counts: dict[str, int] | None = None
     ) -> Document | ConnectorFailure:
         """
         Converts a Confluence page to a Document object.
@@ -1552,11 +1552,18 @@ class ConfluenceConnector(
                     BasicExpertInfo(display_name=display_name, email=email)
                 )
 
+        # Build semantic identifier - use full path only if title appears multiple times
+        semantic_id = page_title
+        if title_counts and title_counts.get(page_title, 0) > 1:
+            space_name = page.get("space", {}).get("name", "")
+            if space_name:
+                semantic_id = f"{space_name} / {page_title}"
+
         # Create the document
         return Document(
             id=page_url,
             source=DocumentSource.CONFLUENCE,
-            semantic_identifier=page_title,
+            semantic_identifier=semantic_id,
             extension=".html",  # Confluence pages are HTML
             blob=page_content.encode("utf-8"),  # Encode page content as bytes
             size_bytes=len(page_content.encode("utf-8")),  # Calculate size in bytes
@@ -1593,10 +1600,21 @@ class ConfluenceConnector(
         attachment_docs: list[Document] = []
         page_url = ""
 
+        # Collect all attachments first to count filename occurrences
+        all_attachments = []
         for attachment in self.confluence_client.paginated_cql_retrieval(
             cql=attachment_query,
             expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
         ):
+            all_attachments.append(attachment)
+
+        # Count attachment title occurrences
+        attachment_title_counts: dict[str, int] = {}
+        for attachment in all_attachments:
+            attachment_title = attachment.get("title", "")
+            attachment_title_counts[attachment_title] = attachment_title_counts.get(attachment_title, 0) + 1
+
+        for attachment in all_attachments:
             media_type: str = attachment.get("metadata", {}).get("mediaType", "")
 
             # TODO(rkuo): this check is partially redundant with validate_attachment_filetype
@@ -1677,11 +1695,22 @@ class ConfluenceConnector(
             extension = Path(attachment.get("title", "")).suffix or ".unknown"
 
+            # Build semantic identifier - use full path only if title appears multiple times
+            attachment_title = attachment.get("title", object_url)
+            semantic_id = attachment_title
+            if attachment_title_counts.get(attachment_title, 0) > 1:
+                space_name = attachment.get("space", {}).get("name", "")
+                page_title = page.get("title", "")
+                if space_name and page_title:
+                    semantic_id = f"{space_name} / {page_title} / {attachment_title}"
+                elif page_title:
+                    semantic_id = f"{page_title} / {attachment_title}"
+
             attachment_doc = Document(
                 id=attachment_id,
                 # sections=sections,
                 source=DocumentSource.CONFLUENCE,
-                semantic_identifier=attachment.get("title", object_url),
+                semantic_identifier=semantic_id,
                 extension=extension,
                 blob=file_blob,
                 size_bytes=len(file_blob),
@@ -1739,7 +1768,9 @@ class ConfluenceConnector(
         )
         logging.debug(f"page_query_url: {page_query_url}")
 
-        # store the next page start for confluence server, cursor for confluence cloud
+        # Collect all pages first to count title occurrences
+        all_pages = []
+
         def store_next_page_url(next_page_url: str) -> None:
             checkpoint.next_page_url = next_page_url
 
@@ -1748,8 +1779,18 @@ class ConfluenceConnector(
         for page in self.confluence_client.paginated_page_retrieval(
             cql=page_query,
             limit=self.batch_size,
            next_page_callback=store_next_page_url,
         ):
-            # Build doc from page
-            doc_or_failure = self._convert_page_to_document(page)
+            all_pages.append(page)
+
+        # Count page title occurrences
+        title_counts: dict[str, int] = {}
+        for page in all_pages:
+            page_title = page.get("title", "")
+            title_counts[page_title] = title_counts.get(page_title, 0) + 1
+
+        # Process all pages
+        for page in all_pages:
+            # Build doc from page with conditional semantic_id
+            doc_or_failure = self._convert_page_to_document(page, title_counts)
 
             if isinstance(doc_or_failure, ConnectorFailure):
                 yield doc_or_failure
diff --git a/common/data_source/dropbox_connector.py b/common/data_source/dropbox_connector.py
index 0a0a3c2de..0e7131d8f 100644
--- a/common/data_source/dropbox_connector.py
+++ b/common/data_source/dropbox_connector.py
@@ -87,15 +87,69 @@ class DropboxConnector(LoadConnector, PollConnector):
         if self.dropbox_client is None:
             raise ConnectorMissingCredentialError("Dropbox")
 
+        # Collect all files first to count filename occurrences
+        all_files = []
+        self._collect_files_recursive(path, start, end, all_files)
+
+        # Count filename occurrences
+        filename_counts: dict[str, int] = {}
+        for entry, _ in all_files:
+            filename_counts[entry.name] = filename_counts.get(entry.name, 0) + 1
+
+        # Process files in batches
+        batch: list[Document] = []
+        for entry, downloaded_file in all_files:
+            modified_time = entry.client_modified
+            if modified_time.tzinfo is None:
+                modified_time = modified_time.replace(tzinfo=timezone.utc)
+            else:
+                modified_time = modified_time.astimezone(timezone.utc)
+
+            # Use full path only if filename appears multiple times
+            if filename_counts.get(entry.name, 0) > 1:
+                # Remove leading slash and replace slashes with ' / '
+                relative_path = entry.path_display.lstrip('/')
+                semantic_id = relative_path.replace('/', ' / ') if relative_path else entry.name
+            else:
+                semantic_id = entry.name
+
+            batch.append(
+                Document(
+                    id=f"dropbox:{entry.id}",
+                    blob=downloaded_file,
+                    source=DocumentSource.DROPBOX,
+                    semantic_identifier=semantic_id,
+                    extension=get_file_ext(entry.name),
+                    doc_updated_at=modified_time,
+                    size_bytes=entry.size if getattr(entry, "size", None) is not None else len(downloaded_file),
+                )
+            )
+
+            if len(batch) == self.batch_size:
+                yield batch
+                batch = []
+
+        if batch:
+            yield batch
+
+    def _collect_files_recursive(
+        self,
+        path: str,
+        start: SecondsSinceUnixEpoch | None,
+        end: SecondsSinceUnixEpoch | None,
+        all_files: list,
+    ) -> None:
+        """Recursively collect all files matching time criteria."""
+        if self.dropbox_client is None:
+            raise ConnectorMissingCredentialError("Dropbox")
+
         result = self.dropbox_client.files_list_folder(
             path,
-            limit=self.batch_size,
             recursive=False,
             include_non_downloadable_files=False,
         )
 
         while True:
-            batch: list[Document] = []
             for entry in result.entries:
                 if isinstance(entry, FileMetadata):
                     modified_time = entry.client_modified
@@ -112,27 +166,13 @@ class DropboxConnector(LoadConnector, PollConnector):
 
                     try:
                         downloaded_file = self._download_file(entry.path_display)
+                        all_files.append((entry, downloaded_file))
                     except Exception:
                         logger.exception(f"[Dropbox]: Error downloading file {entry.path_display}")
                         continue
 
-                    batch.append(
-                        Document(
-                            id=f"dropbox:{entry.id}",
-                            blob=downloaded_file,
-                            source=DocumentSource.DROPBOX,
-                            semantic_identifier=entry.name,
-                            extension=get_file_ext(entry.name),
-                            doc_updated_at=modified_time,
-                            size_bytes=entry.size if getattr(entry, "size", None) is not None else len(downloaded_file),
-                        )
-                    )
-
                 elif isinstance(entry, FolderMetadata):
-                    yield from self._yield_files_recursive(entry.path_lower, start, end)
+                    self._collect_files_recursive(entry.path_lower, start, end, all_files)
 
-            if batch:
-                yield batch
-
             if not result.has_more:
                 break
diff --git a/common/data_source/notion_connector.py b/common/data_source/notion_connector.py
index e29bbbe76..85e3a5258 100644
--- a/common/data_source/notion_connector.py
+++ b/common/data_source/notion_connector.py
@@ -447,6 +447,17 @@ class NotionConnector(LoadConnector, PollConnector):
 
         raw_page_title = self._read_page_title(page)
         page_title = raw_page_title or f"Untitled Page with ID {page.id}"
+
+        # Count attachment semantic_identifier occurrences within this page
+        attachment_name_counts: dict[str, int] = {}
+        for att_doc in attachment_docs:
+            name = att_doc.semantic_identifier
+            attachment_name_counts[name] = attachment_name_counts.get(name, 0) + 1
+
+        # Update semantic identifiers for duplicate attachments
+        for att_doc in attachment_docs:
+            if attachment_name_counts.get(att_doc.semantic_identifier, 0) > 1:
+                att_doc.semantic_identifier = f"{page_title} / {att_doc.semantic_identifier}"
 
         if not page_blocks:
             if not raw_page_title:
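
Note for reviewers: all three connectors apply the same two-pass pattern — first count every title or filename in scope, then qualify only the names that occur more than once (space / page for Confluence, full path for Dropbox, page title for Notion). The standalone sketch below distills that pattern; the `disambiguate` helper and its sample data are illustrative only and do not appear in this patch.

```python
from collections import Counter


def disambiguate(names: list[str], qualifiers: list[str]) -> list[str]:
    """Qualify only the names that occur more than once.

    First pass counts every name; second pass prefixes duplicates with
    their qualifier, mirroring the connector changes in this patch.
    """
    counts = Counter(names)
    return [
        f"{qualifier} / {name}" if counts[name] > 1 else name
        for name, qualifier in zip(names, qualifiers)
    ]


# Two files named "notes.txt" get qualified; the unique "todo.md" does not.
print(disambiguate(
    ["notes.txt", "notes.txt", "todo.md"],
    ["Projects", "Archive", "Projects"],
))
# -> ['Projects / notes.txt', 'Archive / notes.txt', 'todo.md']
```

The trade-off is that counting requires seeing every item before emitting any document, which is why each connector now buffers its full listing (pages, attachments, files) before yielding batches instead of streaming as it paginates.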