fix confluence connector
This commit is contained in:
parent 052772b604
commit df4219d30e

2 changed files with 82 additions and 53 deletions
@@ -1494,7 +1494,7 @@ class ConfluenceConnector(
         return comment_string
 
     def _convert_page_to_document(
-        self, page: dict[str, Any], title_counts: dict[str, int] | None = None
+        self, page: dict[str, Any]
     ) -> Document | ConnectorFailure:
         """
         Converts a Confluence page to a Document object.
@@ -1510,6 +1510,27 @@ class ConfluenceConnector(
             self.wiki_base, page["_links"]["webui"], self.is_cloud
         )
 
+        # Build hierarchical path for semantic identifier
+        space_name = page.get("space", {}).get("name", "")
+
+        # Build path from ancestors
+        path_parts = []
+        if space_name:
+            path_parts.append(space_name)
+
+        # Add ancestor pages to path if available
+        if "ancestors" in page and page["ancestors"]:
+            for ancestor in page["ancestors"]:
+                ancestor_title = ancestor.get("title", "")
+                if ancestor_title:
+                    path_parts.append(ancestor_title)
+
+        # Add current page title
+        path_parts.append(page_title)
+
+        # Create full path identifier
+        semantic_identifier = " / ".join(path_parts) if len(path_parts) > 1 else page_title
+
         # Get the page content
         page_content = extract_text_from_confluence_html(
             self.confluence_client, page, self._fetched_titles
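For reference, a minimal standalone sketch (not part of the commit) of what the new path-building logic yields; the `page` payload shape here is a hypothetical example inferred from the accessors used in the hunk above:

```python
# Hypothetical page payload, shaped after the accessors in the hunk above.
page = {
    "title": "Q3 Budget",
    "space": {"name": "Finance"},
    "ancestors": [{"title": "Reports"}, {"title": "2024"}],
}

page_title = page.get("title", "")
space_name = page.get("space", {}).get("name", "")

path_parts = [space_name] if space_name else []
for ancestor in page.get("ancestors", []):
    if ancestor.get("title", ""):
        path_parts.append(ancestor["title"])
path_parts.append(page_title)

semantic_identifier = " / ".join(path_parts) if len(path_parts) > 1 else page_title
print(semantic_identifier)  # -> Finance / Reports / 2024 / Q3 Budget
```

Every page now carries its full space/ancestor path unconditionally, trading longer identifiers for guaranteed disambiguation of same-titled pages.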
@@ -1552,18 +1573,11 @@ class ConfluenceConnector(
                     BasicExpertInfo(display_name=display_name, email=email)
                 )
 
-        # Build semantic identifier - use full path only if title appears multiple times
-        semantic_id = page_title
-        if title_counts and title_counts.get(page_title, 0) > 1:
-            space_name = page.get("space", {}).get("name", "")
-            if space_name:
-                semantic_id = f"{space_name} / {page_title}"
-
         # Create the document
         return Document(
             id=page_url,
             source=DocumentSource.CONFLUENCE,
-            semantic_identifier=semantic_id,
+            semantic_identifier=semantic_identifier,
             extension=".html",  # Confluence pages are HTML
             blob=page_content.encode("utf-8"),  # Encode page content as bytes
             size_bytes=len(page_content.encode("utf-8")),  # Calculate size in bytes
@@ -1600,23 +1614,11 @@ class ConfluenceConnector(
         attachment_docs: list[Document] = []
         page_url = ""
 
-        # Collect all attachments first to count filename occurrences
-        all_attachments = []
         for attachment in self.confluence_client.paginated_cql_retrieval(
             cql=attachment_query,
             expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
         ):
-            all_attachments.append(attachment)
-
-        # Count attachment title occurrences
-        attachment_title_counts: dict[str, int] = {}
-        for attachment in all_attachments:
-            attachment_title = attachment.get("title", "")
-            attachment_title_counts[attachment_title] = attachment_title_counts.get(attachment_title, 0) + 1
-
-        for attachment in all_attachments:
             media_type: str = attachment.get("metadata", {}).get("mediaType", "")
 
             # TODO(rkuo): this check is partially redundant with validate_attachment_filetype
             # and checks in convert_attachment_to_content/process_attachment
             # but doing the check here avoids an unnecessary download. Due for refactoring.
@@ -1684,6 +1686,21 @@ class ConfluenceConnector(
                 self.wiki_base, attachment["_links"]["webui"], self.is_cloud
             )
 
+            # Build semantic identifier with space and page context
+            attachment_title = attachment.get("title", object_url)
+            space_name = page.get("space", {}).get("name", "")
+            page_title = page.get("title", "")
+
+            # Create hierarchical name: Space > Page > Attachment
+            attachment_path_parts = []
+            if space_name:
+                attachment_path_parts.append(space_name)
+            if page_title:
+                attachment_path_parts.append(page_title)
+            attachment_path_parts.append(attachment_title)
+
+            attachment_semantic_identifier = " / ".join(attachment_path_parts) if len(attachment_path_parts) > 1 else attachment_title
+
             primary_owners: list[BasicExpertInfo] | None = None
             if "version" in attachment and "by" in attachment["version"]:
                 author = attachment["version"]["by"]
@@ -1695,22 +1712,12 @@ class ConfluenceConnector(
 
             extension = Path(attachment.get("title", "")).suffix or ".unknown"
 
-            # Build semantic identifier - use full path only if title appears multiple times
-            attachment_title = attachment.get("title", object_url)
-            semantic_id = attachment_title
-            if attachment_title_counts.get(attachment_title, 0) > 1:
-                space_name = attachment.get("space", {}).get("name", "")
-                page_title = page.get("title", "")
-                if space_name and page_title:
-                    semantic_id = f"{space_name} / {page_title} / {attachment_title}"
-                elif page_title:
-                    semantic_id = f"{page_title} / {attachment_title}"
-
             attachment_doc = Document(
                 id=attachment_id,
                 # sections=sections,
                 source=DocumentSource.CONFLUENCE,
-                semantic_identifier=semantic_id,
+                semantic_identifier=attachment_semantic_identifier,
                 extension=extension,
                 blob=file_blob,
                 size_bytes=len(file_blob),
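Attachment naming follows the same pattern. Note two behavioral differences visible in the hunks above: the removed code consulted `attachment_title_counts` and only prefixed duplicates, and it read the space from the attachment (`attachment.get("space", ...)`) rather than the parent page; the new code always prefixes space and page when present, sourced from `page`. A hedged sketch with hypothetical values:

```python
# Hypothetical inputs, shaped after the accessors in the hunks above.
page = {"title": "Q3 Budget", "space": {"name": "Finance"}}
attachment_title = "forecast.xlsx"

attachment_path_parts = []
if page.get("space", {}).get("name", ""):
    attachment_path_parts.append(page["space"]["name"])
if page.get("title", ""):
    attachment_path_parts.append(page["title"])
attachment_path_parts.append(attachment_title)

attachment_semantic_identifier = (
    " / ".join(attachment_path_parts)
    if len(attachment_path_parts) > 1
    else attachment_title
)
print(attachment_semantic_identifier)  # -> Finance / Q3 Budget / forecast.xlsx
```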
@@ -1767,10 +1774,8 @@ class ConfluenceConnector(
             start_ts, end, self.batch_size
         )
         logging.debug(f"page_query_url: {page_query_url}")
 
-        # Collect all pages first to count title occurrences
-        all_pages = []
-
+        # store the next page start for confluence server, cursor for confluence cloud
         def store_next_page_url(next_page_url: str) -> None:
             checkpoint.next_page_url = next_page_url
 
@@ -1779,18 +1784,8 @@ class ConfluenceConnector(
             limit=self.batch_size,
             next_page_callback=store_next_page_url,
         ):
-            all_pages.append(page)
-
-        # Count page title occurrences
-        title_counts: dict[str, int] = {}
-        for page in all_pages:
-            page_title = page.get("title", "")
-            title_counts[page_title] = title_counts.get(page_title, 0) + 1
-
-        # Process all pages
-        for page in all_pages:
-            # Build doc from page with conditional semantic_id
-            doc_or_failure = self._convert_page_to_document(page, title_counts)
-
+            # Build doc from page
+            doc_or_failure = self._convert_page_to_document(page)
             if isinstance(doc_or_failure, ConnectorFailure):
                 yield doc_or_failure
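The two hunks above also change control flow: pages were previously buffered into `all_pages` so duplicate titles could be counted before conversion; with the unconditional path prefix that pre-pass is unnecessary, and each page is converted as it streams out of pagination. A simplified sketch of the shape change (names illustrative, not from the commit):

```python
from typing import Any, Callable, Iterable, Iterator

def stream_documents(
    pages: Iterable[dict[str, Any]],
    convert: Callable[[dict[str, Any]], Any],
) -> Iterator[Any]:
    # After: convert each page as it arrives from paginated retrieval;
    # no all_pages buffer, no second pass, memory stays flat.
    for page in pages:
        yield convert(page)
```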
@@ -196,14 +196,48 @@ class Confluence(SyncBase):
 
         end_time = datetime.now(timezone.utc).timestamp()
 
-        document_generator = load_all_docs_from_checkpoint_connector(
-            connector=self.connector,
-            start=start_time,
-            end=end_time,
-        )
+        raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
+        try:
+            batch_size = int(raw_batch_size)
+        except (TypeError, ValueError):
+            batch_size = INDEX_BATCH_SIZE
+        if batch_size <= 0:
+            batch_size = INDEX_BATCH_SIZE
+
+        def document_batches():
+            checkpoint = self.connector.build_dummy_checkpoint()
+            pending_docs = []
+            iterations = 0
+            iteration_limit = 100_000
+
+            while checkpoint.has_more:
+                wrapper = CheckpointOutputWrapper()
+                doc_generator = wrapper(self.connector.load_from_checkpoint(start_time, end_time, checkpoint))
+                for document, failure, next_checkpoint in doc_generator:
+                    if failure is not None:
+                        logging.warning("Confluence connector failure: %s", getattr(failure, "failure_message", failure))
+                        continue
+                    if document is not None:
+                        pending_docs.append(document)
+                        if len(pending_docs) >= batch_size:
+                            yield pending_docs
+                            pending_docs = []
+                    if next_checkpoint is not None:
+                        checkpoint = next_checkpoint
+
+                iterations += 1
+                if iterations > iteration_limit:
+                    raise RuntimeError("Too many iterations while loading Confluence documents.")
+
+            if pending_docs:
+                yield pending_docs
+
+        async def async_wrapper():
+            for batch in document_batches():
+                yield batch
+
         logging.info("Connect to Confluence: {} {}".format(self.conf["wiki_base"], begin_info))
-        return [document_generator]
+        return async_wrapper()
 
 
 class Notion(SyncBase):
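Finally, the sync entrypoint in the second file no longer returns a one-element list wrapping a pre-materialized generator; it returns an async generator of document batches, so callers iterate with `async for`. A hedged usage sketch (the entrypoint name and constructor arguments are assumptions, not shown in the diff):

```python
import asyncio

async def consume(confluence: "Confluence") -> None:
    # `sync()` stands in for whichever method contains the changed
    # `return async_wrapper()` above; the diff does not show its name.
    async for batch in confluence.sync():
        print(f"received batch of {len(batch)} documents")

# asyncio.run(consume(Confluence(conf=...)))  # construction details not shown
```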