From 3f1bc60bbe37885ef8efc4c3843341c91e5c9cbd Mon Sep 17 00:00:00 2001 From: Stephen Hu <812791840@qq.com> Date: Mon, 24 Nov 2025 12:59:12 +0800 Subject: [PATCH] Update sync_data_source.py --- rag/svr/sync_data_source.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 41f1e196d..1c8b332e7 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -100,11 +100,32 @@ class SyncBase: err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"]) SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err)) doc_num += len(docs) - for did in dids: - synced_doc_ids.add(did) + + task_copy = copy.deepcopy(task) + task_copy.pop("poll_range_start", None) + document_batch_generator = await self._generate(task) + for document_batch in document_batch_generator: + if not document_batch: + continue + docs = [ + { + "id": doc.id, + "connector_id": task["connector_id"], + "source": self.SOURCE_NAME, + "semantic_identifier": doc.semantic_identifier, + "extension": doc.extension, + "size_bytes": doc.size_bytes, + "doc_updated_at": doc.doc_updated_at, + "blob": doc.blob, + } + for doc in document_batch + ] + + for doc in docs: + synced_doc_ids.add(doc["id"]) # delete removed docs - if not existing_doc_ids and (task["reindex"] == "1" or not task["poll_range_start"]): + if not existing_doc_ids: to_delete_ids = [] for doc_id in existing_doc_ids: if doc_id not in synced_doc_ids: