From f9700c16cd201872f9b0eba2f6f3360972f33a7f Mon Sep 17 00:00:00 2001 From: Jonah879 Date: Thu, 20 Nov 2025 15:48:46 +0000 Subject: [PATCH] fix: webdav connector restarting on DB error. Enhance error handling in SyncBase for document synchronization. --- rag/svr/sync_data_source.py | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 195b9c523..66df335fa 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -47,14 +47,8 @@ import faulthandler from common.constants import FileSource, TaskStatus from common import settings from common.config_utils import show_configs +from common.data_source import BlobStorageConnector, NotionConnector, DiscordConnector, GoogleDriveConnector, JiraConnector from common.constants import FileSource, TaskStatus -from common.data_source import ( - BlobStorageConnector, - DiscordConnector, - GoogleDriveConnector, - JiraConnector, - NotionConnector, -) from common.data_source.config import INDEX_BATCH_SIZE from common.data_source.confluence_connector import ConfluenceConnector from common.data_source.interfaces import CheckpointOutputWrapper @@ -83,6 +77,8 @@ class SyncBase: next_update = datetime(1970, 1, 1, tzinfo=timezone.utc) if task["poll_range_start"]: next_update = task["poll_range_start"] + + failed_docs = 0 for document_batch in document_batch_generator: if not document_batch: continue @@ -103,13 +99,30 @@ class SyncBase: for doc in document_batch ] - e, kb = KnowledgebaseService.get_by_id(task["kb_id"]) - err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"]) - SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err)) - doc_num += len(docs) + try: + e, kb = KnowledgebaseService.get_by_id(task["kb_id"]) + err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"]) + SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err)) + doc_num += len(docs) + except Exception as batch_ex: + error_msg = str(batch_ex) + error_code = getattr(batch_ex, 'args', (None,))[0] if hasattr(batch_ex, 'args') else None + + if error_code == 1267 or "collation" in error_msg.lower(): + logging.warning(f"Skipping {len(docs)} document(s) due to database collation conflict (error 1267)") + for doc in docs: + logging.debug(f"Skipped: {doc['semantic_identifier']}") + else: + logging.error(f"Error processing batch of {len(docs)} documents: {error_msg}") + + failed_docs += len(docs) + continue prefix = "[Jira] " if self.SOURCE_NAME == FileSource.JIRA else "" - logging.info(f"{prefix}{doc_num} docs synchronized till {next_update}") + if failed_docs > 0: + logging.info(f"{prefix}{doc_num} docs synchronized till {next_update} ({failed_docs} skipped)") + else: + logging.info(f"{prefix}{doc_num} docs synchronized till {next_update}") SyncLogsService.done(task["id"], task["connector_id"]) task["poll_range_start"] = next_update