Feat: handle duplicates for confluence, dropbox and notion
parent 26debf0927
commit 052772b604
3 changed files with 116 additions and 24 deletions
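All three connectors apply the same two-pass pattern: first count how often each bare title occurs in the result set, then qualify only the duplicated titles with surrounding context (space, page title, or folder path). A minimal standalone sketch of that pattern, assuming nothing beyond the diff below; the disambiguate helper and its (title, path_parts) input shape are illustrative, not part of the connector code:

    # Hypothetical helper demonstrating the two-pass disambiguation pattern.
    def disambiguate(items: list[tuple[str, list[str]]]) -> list[str]:
        # First pass: count how often each bare title occurs.
        counts: dict[str, int] = {}
        for title, _ in items:
            counts[title] = counts.get(title, 0) + 1

        # Second pass: prefix only the duplicated titles with their path context.
        result = []
        for title, path_parts in items:
            if counts[title] > 1 and path_parts:
                result.append(" / ".join(path_parts + [title]))
            else:
                result.append(title)
        return result

    print(disambiguate([("notes.txt", ["Team A"]), ("notes.txt", ["Team B"]), ("readme.md", [])]))
    # ['Team A / notes.txt', 'Team B / notes.txt', 'readme.md']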
@@ -1494,7 +1494,7 @@ class ConfluenceConnector(
         return comment_string
 
     def _convert_page_to_document(
-        self, page: dict[str, Any]
+        self, page: dict[str, Any], title_counts: dict[str, int] | None = None
     ) -> Document | ConnectorFailure:
        """
        Converts a Confluence page to a Document object.
@@ -1552,11 +1552,18 @@ class ConfluenceConnector(
                 BasicExpertInfo(display_name=display_name, email=email)
             )
 
+        # Build semantic identifier - use full path only if title appears multiple times
+        semantic_id = page_title
+        if title_counts and title_counts.get(page_title, 0) > 1:
+            space_name = page.get("space", {}).get("name", "")
+            if space_name:
+                semantic_id = f"{space_name} / {page_title}"
+
         # Create the document
         return Document(
             id=page_url,
             source=DocumentSource.CONFLUENCE,
-            semantic_identifier=page_title,
+            semantic_identifier=semantic_id,
             extension=".html",  # Confluence pages are HTML
             blob=page_content.encode("utf-8"),  # Encode page content as bytes
             size_bytes=len(page_content.encode("utf-8")),  # Calculate size in bytes
@@ -1593,10 +1600,21 @@ class ConfluenceConnector(
         attachment_docs: list[Document] = []
         page_url = ""
 
+        # Collect all attachments first to count filename occurrences
+        all_attachments = []
         for attachment in self.confluence_client.paginated_cql_retrieval(
             cql=attachment_query,
             expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
         ):
+            all_attachments.append(attachment)
+
+        # Count attachment title occurrences
+        attachment_title_counts: dict[str, int] = {}
+        for attachment in all_attachments:
+            attachment_title = attachment.get("title", "")
+            attachment_title_counts[attachment_title] = attachment_title_counts.get(attachment_title, 0) + 1
+
+        for attachment in all_attachments:
             media_type: str = attachment.get("metadata", {}).get("mediaType", "")
 
             # TODO(rkuo): this check is partially redundant with validate_attachment_filetype
@@ -1677,11 +1695,22 @@ class ConfluenceConnector(
 
             extension = Path(attachment.get("title", "")).suffix or ".unknown"
 
+            # Build semantic identifier - use full path only if title appears multiple times
+            attachment_title = attachment.get("title", object_url)
+            semantic_id = attachment_title
+            if attachment_title_counts.get(attachment_title, 0) > 1:
+                space_name = attachment.get("space", {}).get("name", "")
+                page_title = page.get("title", "")
+                if space_name and page_title:
+                    semantic_id = f"{space_name} / {page_title} / {attachment_title}"
+                elif page_title:
+                    semantic_id = f"{page_title} / {attachment_title}"
+
             attachment_doc = Document(
                 id=attachment_id,
                 # sections=sections,
                 source=DocumentSource.CONFLUENCE,
-                semantic_identifier=attachment.get("title", object_url),
+                semantic_identifier=semantic_id,
                 extension=extension,
                 blob=file_blob,
                 size_bytes=len(file_blob),
@@ -1739,7 +1768,9 @@ class ConfluenceConnector(
         )
         logging.debug(f"page_query_url: {page_query_url}")
 
         # store the next page start for confluence server, cursor for confluence cloud
+        # Collect all pages first to count title occurrences
+        all_pages = []
 
         def store_next_page_url(next_page_url: str) -> None:
             checkpoint.next_page_url = next_page_url
@@ -1748,8 +1779,18 @@ class ConfluenceConnector(
             limit=self.batch_size,
             next_page_callback=store_next_page_url,
         ):
-            # Build doc from page
-            doc_or_failure = self._convert_page_to_document(page)
+            all_pages.append(page)
+
+        # Count page title occurrences
+        title_counts: dict[str, int] = {}
+        for page in all_pages:
+            page_title = page.get("title", "")
+            title_counts[page_title] = title_counts.get(page_title, 0) + 1
+
+        # Process all pages
+        for page in all_pages:
+            # Build doc from page with conditional semantic_id
+            doc_or_failure = self._convert_page_to_document(page, title_counts)
 
             if isinstance(doc_or_failure, ConnectorFailure):
                 yield doc_or_failure
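For Confluence pages, the effect of title_counts on _convert_page_to_document boils down to the following standalone sketch. The page dicts are made up for illustration; the real code pulls pages via paginated_cql_retrieval and builds full Document objects:

    # Hypothetical page dicts mirroring the fields the connector reads.
    pages = [
        {"title": "Roadmap", "space": {"name": "Engineering"}},
        {"title": "Roadmap", "space": {"name": "Marketing"}},
        {"title": "Onboarding", "space": {"name": "HR"}},
    ]

    # First pass: count title occurrences across the result set.
    title_counts: dict[str, int] = {}
    for page in pages:
        title = page.get("title", "")
        title_counts[title] = title_counts.get(title, 0) + 1

    # Second pass: prefix only duplicated titles with the space name.
    for page in pages:
        title = page.get("title", "")
        semantic_id = title
        if title_counts.get(title, 0) > 1:
            space_name = page.get("space", {}).get("name", "")
            if space_name:
                semantic_id = f"{space_name} / {title}"
        print(semantic_id)
    # Engineering / Roadmap
    # Marketing / Roadmap
    # Onboarding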
@@ -87,15 +87,69 @@ class DropboxConnector(LoadConnector, PollConnector):
         if self.dropbox_client is None:
             raise ConnectorMissingCredentialError("Dropbox")
 
+        # Collect all files first to count filename occurrences
+        all_files = []
+        self._collect_files_recursive(path, start, end, all_files)
+
+        # Count filename occurrences
+        filename_counts: dict[str, int] = {}
+        for entry, _ in all_files:
+            filename_counts[entry.name] = filename_counts.get(entry.name, 0) + 1
+
+        # Process files in batches
+        batch: list[Document] = []
+        for entry, downloaded_file in all_files:
+            modified_time = entry.client_modified
+            if modified_time.tzinfo is None:
+                modified_time = modified_time.replace(tzinfo=timezone.utc)
+            else:
+                modified_time = modified_time.astimezone(timezone.utc)
+
+            # Use full path only if filename appears multiple times
+            if filename_counts.get(entry.name, 0) > 1:
+                # Remove leading slash and replace slashes with ' / '
+                relative_path = entry.path_display.lstrip('/')
+                semantic_id = relative_path.replace('/', ' / ') if relative_path else entry.name
+            else:
+                semantic_id = entry.name
+
+            batch.append(
+                Document(
+                    id=f"dropbox:{entry.id}",
+                    blob=downloaded_file,
+                    source=DocumentSource.DROPBOX,
+                    semantic_identifier=semantic_id,
+                    extension=get_file_ext(entry.name),
+                    doc_updated_at=modified_time,
+                    size_bytes=entry.size if getattr(entry, "size", None) is not None else len(downloaded_file),
+                )
+            )
+
+            if len(batch) == self.batch_size:
+                yield batch
+                batch = []
+
+        if batch:
+            yield batch
+
+    def _collect_files_recursive(
+        self,
+        path: str,
+        start: SecondsSinceUnixEpoch | None,
+        end: SecondsSinceUnixEpoch | None,
+        all_files: list,
+    ) -> None:
+        """Recursively collect all files matching time criteria."""
+        if self.dropbox_client is None:
+            raise ConnectorMissingCredentialError("Dropbox")
+
         result = self.dropbox_client.files_list_folder(
             path,
             limit=self.batch_size,
             recursive=False,
             include_non_downloadable_files=False,
         )
 
         while True:
             batch: list[Document] = []
             for entry in result.entries:
                 if isinstance(entry, FileMetadata):
                     modified_time = entry.client_modified
@@ -112,27 +166,13 @@ class DropboxConnector(LoadConnector, PollConnector):
 
                     try:
                         downloaded_file = self._download_file(entry.path_display)
+                        all_files.append((entry, downloaded_file))
                     except Exception:
                         logger.exception(f"[Dropbox]: Error downloading file {entry.path_display}")
                         continue
 
-                    batch.append(
-                        Document(
-                            id=f"dropbox:{entry.id}",
-                            blob=downloaded_file,
-                            source=DocumentSource.DROPBOX,
-                            semantic_identifier=entry.name,
-                            extension=get_file_ext(entry.name),
-                            doc_updated_at=modified_time,
-                            size_bytes=entry.size if getattr(entry, "size", None) is not None else len(downloaded_file),
-                        )
-                    )
-
                 elif isinstance(entry, FolderMetadata):
-                    yield from self._yield_files_recursive(entry.path_lower, start, end)
-
-            if batch:
-                yield batch
+                    self._collect_files_recursive(entry.path_lower, start, end, all_files)
 
             if not result.has_more:
                 break
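The Dropbox variant keys off entry.name and falls back to the display path for duplicates. A sketch using plain path strings in place of FileMetadata entries (the strings are placeholders for entry.path_display values):

    # Placeholder paths standing in for FileMetadata.path_display.
    paths = ["/team-a/notes.txt", "/team-b/notes.txt", "/readme.md"]

    # First pass: count bare filenames.
    name_counts: dict[str, int] = {}
    for path in paths:
        name = path.rsplit("/", 1)[-1]
        name_counts[name] = name_counts.get(name, 0) + 1

    # Second pass: duplicates get their full path, slashes spaced out.
    for path in paths:
        name = path.rsplit("/", 1)[-1]
        if name_counts[name] > 1:
            relative_path = path.lstrip("/")
            semantic_id = relative_path.replace("/", " / ") if relative_path else name
        else:
            semantic_id = name
        print(semantic_id)
    # team-a / notes.txt
    # team-b / notes.txt
    # readme.md

Note the design trade-off visible in the diff: every file is now downloaded and held in memory before the first batch is yielded, since the duplicate counts are only known once the full recursive listing has been collected.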
@@ -447,6 +447,17 @@ class NotionConnector(LoadConnector, PollConnector):
 
         raw_page_title = self._read_page_title(page)
         page_title = raw_page_title or f"Untitled Page with ID {page.id}"
 
+        # Count attachment semantic_identifier occurrences within this page
+        attachment_name_counts: dict[str, int] = {}
+        for att_doc in attachment_docs:
+            name = att_doc.semantic_identifier
+            attachment_name_counts[name] = attachment_name_counts.get(name, 0) + 1
+
+        # Update semantic identifiers for duplicate attachments
+        for att_doc in attachment_docs:
+            if attachment_name_counts.get(att_doc.semantic_identifier, 0) > 1:
+                att_doc.semantic_identifier = f"{page_title} / {att_doc.semantic_identifier}"
+
         if not page_blocks:
             if not raw_page_title:
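The Notion change works on already-built attachment documents and mutates their identifiers in place. A sketch with a stand-in dataclass; the connector's real Document model has many more fields:

    from dataclasses import dataclass

    @dataclass
    class Doc:  # stand-in for the connector's Document model
        semantic_identifier: str

    page_title = "Project Plan"
    attachment_docs = [Doc("spec.pdf"), Doc("spec.pdf"), Doc("budget.xlsx")]

    # First pass: count names; second pass: prefix the duplicated ones in place.
    counts: dict[str, int] = {}
    for att_doc in attachment_docs:
        counts[att_doc.semantic_identifier] = counts.get(att_doc.semantic_identifier, 0) + 1

    for att_doc in attachment_docs:
        if counts.get(att_doc.semantic_identifier, 0) > 1:
            att_doc.semantic_identifier = f"{page_title} / {att_doc.semantic_identifier}"

    print([d.semantic_identifier for d in attachment_docs])
    # ['Project Plan / spec.pdf', 'Project Plan / spec.pdf', 'budget.xlsx']

The counts here are scoped to a single page, so two same-named attachments on the same page both receive the identical prefixed identifier, as the output above shows.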