diff --git a/common/data_source/confluence_connector.py b/common/data_source/confluence_connector.py
index 256ce4a69..43dfcaa43 100644
--- a/common/data_source/confluence_connector.py
+++ b/common/data_source/confluence_connector.py
@@ -1494,7 +1494,7 @@ class ConfluenceConnector(
         return comment_string

     def _convert_page_to_document(
-        self, page: dict[str, Any], title_counts: dict[str, int] | None = None
+        self, page: dict[str, Any]
     ) -> Document | ConnectorFailure:
         """
         Converts a Confluence page to a Document object.
@@ -1510,6 +1510,27 @@ class ConfluenceConnector(
             self.wiki_base, page["_links"]["webui"], self.is_cloud
         )

+        # Build hierarchical path for semantic identifier
+        space_name = page.get("space", {}).get("name", "")
+
+        # Build path from ancestors
+        path_parts = []
+        if space_name:
+            path_parts.append(space_name)
+
+        # Add ancestor pages to path if available
+        if "ancestors" in page and page["ancestors"]:
+            for ancestor in page["ancestors"]:
+                ancestor_title = ancestor.get("title", "")
+                if ancestor_title:
+                    path_parts.append(ancestor_title)
+
+        # Add current page title
+        path_parts.append(page_title)
+
+        # Create full path identifier
+        semantic_identifier = " / ".join(path_parts) if len(path_parts) > 1 else page_title
+
         # Get the page content
         page_content = extract_text_from_confluence_html(
             self.confluence_client, page, self._fetched_titles
@@ -1552,18 +1573,11 @@ class ConfluenceConnector(
                     BasicExpertInfo(display_name=display_name, email=email)
                 )

-        # Build semantic identifier - use full path only if title appears multiple times
-        semantic_id = page_title
-        if title_counts and title_counts.get(page_title, 0) > 1:
-            space_name = page.get("space", {}).get("name", "")
-            if space_name:
-                semantic_id = f"{space_name} / {page_title}"
-
         # Create the document
         return Document(
             id=page_url,
             source=DocumentSource.CONFLUENCE,
-            semantic_identifier=semantic_id,
+            semantic_identifier=semantic_identifier,
             extension=".html",  # Confluence pages are HTML
             blob=page_content.encode("utf-8"),  # Encode page content as bytes
             size_bytes=len(page_content.encode("utf-8")),  # Calculate size in bytes
@@ -1600,23 +1614,11 @@ class ConfluenceConnector(
         attachment_docs: list[Document] = []
         page_url = ""

-        # Collect all attachments first to count filename occurrences
-        all_attachments = []
         for attachment in self.confluence_client.paginated_cql_retrieval(
             cql=attachment_query,
             expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
         ):
-            all_attachments.append(attachment)
-
-        # Count attachment title occurrences
-        attachment_title_counts: dict[str, int] = {}
-        for attachment in all_attachments:
-            attachment_title = attachment.get("title", "")
-            attachment_title_counts[attachment_title] = attachment_title_counts.get(attachment_title, 0) + 1
-
-        for attachment in all_attachments:
             media_type: str = attachment.get("metadata", {}).get("mediaType", "")
-
             # TODO(rkuo): this check is partially redundant with validate_attachment_filetype
             # and checks in convert_attachment_to_content/process_attachment
             # but doing the check here avoids an unnecessary download. Due for refactoring.
@@ -1684,6 +1686,21 @@ class ConfluenceConnector(
                 self.wiki_base, attachment["_links"]["webui"], self.is_cloud
             )

+            # Build semantic identifier with space and page context
+            attachment_title = attachment.get("title", object_url)
+            space_name = page.get("space", {}).get("name", "")
+            page_title = page.get("title", "")
+
+            # Create hierarchical name: Space > Page > Attachment
+            attachment_path_parts = []
+            if space_name:
+                attachment_path_parts.append(space_name)
+            if page_title:
+                attachment_path_parts.append(page_title)
+            attachment_path_parts.append(attachment_title)
+
+            attachment_semantic_identifier = " / ".join(attachment_path_parts) if len(attachment_path_parts) > 1 else attachment_title
+
             primary_owners: list[BasicExpertInfo] | None = None
             if "version" in attachment and "by" in attachment["version"]:
                 author = attachment["version"]["by"]
@@ -1695,22 +1712,12 @@ class ConfluenceConnector(
             extension = Path(attachment.get("title", "")).suffix or ".unknown"

-            # Build semantic identifier - use full path only if title appears multiple times
-            attachment_title = attachment.get("title", object_url)
-            semantic_id = attachment_title
-            if attachment_title_counts.get(attachment_title, 0) > 1:
-                space_name = attachment.get("space", {}).get("name", "")
-                page_title = page.get("title", "")
-                if space_name and page_title:
-                    semantic_id = f"{space_name} / {page_title} / {attachment_title}"
-                elif page_title:
-                    semantic_id = f"{page_title} / {attachment_title}"
-
             attachment_doc = Document(
                 id=attachment_id,
                 # sections=sections,
                 source=DocumentSource.CONFLUENCE,
-                semantic_identifier=semantic_id,
+                semantic_identifier=attachment_semantic_identifier,
                 extension=extension,
                 blob=file_blob,
                 size_bytes=len(file_blob),
@@ -1767,10 +1774,8 @@ class ConfluenceConnector(
             start_ts, end, self.batch_size
         )
         logging.debug(f"page_query_url: {page_query_url}")
-
-        # Collect all pages first to count title occurrences
-        all_pages = []
+
         # store the next page start for confluence server, cursor for confluence cloud
         def store_next_page_url(next_page_url: str) -> None:
             checkpoint.next_page_url = next_page_url
@@ -1779,18 +1784,8 @@ class ConfluenceConnector(
             limit=self.batch_size,
             next_page_callback=store_next_page_url,
         ):
-            all_pages.append(page)
-
-        # Count page title occurrences
-        title_counts: dict[str, int] = {}
-        for page in all_pages:
-            page_title = page.get("title", "")
-            title_counts[page_title] = title_counts.get(page_title, 0) + 1
-
-        # Process all pages
-        for page in all_pages:
-            # Build doc from page with conditional semantic_id
-            doc_or_failure = self._convert_page_to_document(page, title_counts)
+            # Build doc from page
+            doc_or_failure = self._convert_page_to_document(page)

             if isinstance(doc_or_failure, ConnectorFailure):
                 yield doc_or_failure
diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py
index 4349b6f55..991ad7467 100644
--- a/rag/svr/sync_data_source.py
+++ b/rag/svr/sync_data_source.py
@@ -196,14 +196,48 @@ class Confluence(SyncBase):
         end_time = datetime.now(timezone.utc).timestamp()

-        document_generator = load_all_docs_from_checkpoint_connector(
-            connector=self.connector,
-            start=start_time,
-            end=end_time,
-        )
+        raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
+        try:
+            batch_size = int(raw_batch_size)
+        except (TypeError, ValueError):
+            batch_size = INDEX_BATCH_SIZE
+        if batch_size <= 0:
+            batch_size = INDEX_BATCH_SIZE
+
+        def document_batches():
+            checkpoint = self.connector.build_dummy_checkpoint()
+            pending_docs = []
+            iterations = 0
+            iteration_limit = 100_000
+
+            while checkpoint.has_more:
+                wrapper = CheckpointOutputWrapper()
+                doc_generator = wrapper(self.connector.load_from_checkpoint(start_time, end_time, checkpoint))
+                for document, failure, next_checkpoint in doc_generator:
+                    if failure is not None:
+                        logging.warning("Confluence connector failure: %s", getattr(failure, "failure_message", failure))
+                        continue
+                    if document is not None:
+                        pending_docs.append(document)
+                        if len(pending_docs) >= batch_size:
+                            yield pending_docs
+                            pending_docs = []
+                    if next_checkpoint is not None:
+                        checkpoint = next_checkpoint
+
+                iterations += 1
+                if iterations > iteration_limit:
+                    raise RuntimeError("Too many iterations while loading Confluence documents.")
+
+            if pending_docs:
+                yield pending_docs
+
+        async def async_wrapper():
+            for batch in document_batches():
+                yield batch
+
         logging.info("Connect to Confluence: {} {}".format(self.conf["wiki_base"], begin_info))
-        return [document_generator]
+        return async_wrapper()


 class Notion(SyncBase):