From 1c8dcf7ac6001a162f49dcace05e8f0cf37174ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Chu?= <812791840@qq.com>
Date: Wed, 26 Nov 2025 08:35:31 +0800
Subject: [PATCH] support dev setting to control if sync deleted

---
 common/settings.py          |  2 ++
 rag/svr/sync_data_source.py | 65 +++++++++++++++++++------------------
 2 files changed, 35 insertions(+), 32 deletions(-)

diff --git a/common/settings.py b/common/settings.py
index 9df0c0cd2..3921f4aa9 100644
--- a/common/settings.py
+++ b/common/settings.py
@@ -83,6 +83,8 @@ kg_retriever = None
 
 # user registration switch
 REGISTER_ENABLED = 1
 
+ENABLE_SYNC_DELETED_CHANGE = os.getenv('ENABLE_SYNC_DELETED_CHANGE', 'false').lower() in ('1', 'true', 'yes')
+
 # sandbox-executor-manager
 SANDBOX_HOST = None
diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py
index 1c8b332e7..4ea0b4fe5 100644
--- a/rag/svr/sync_data_source.py
+++ b/rag/svr/sync_data_source.py
@@ -101,40 +101,41 @@ class SyncBase:
         SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err))
         doc_num += len(docs)
 
-        task_copy = copy.deepcopy(task)
-        task_copy.pop("poll_range_start", None)
-        document_batch_generator = await self._generate(task)
-        for document_batch in document_batch_generator:
-            if not document_batch:
-                continue
-            docs = [
-                {
-                    "id": doc.id,
-                    "connector_id": task["connector_id"],
-                    "source": self.SOURCE_NAME,
-                    "semantic_identifier": doc.semantic_identifier,
-                    "extension": doc.extension,
-                    "size_bytes": doc.size_bytes,
-                    "doc_updated_at": doc.doc_updated_at,
-                    "blob": doc.blob,
-                }
-                for doc in document_batch
-            ]
+        if settings.ENABLE_SYNC_DELETED_CHANGE:
+            task_copy = copy.deepcopy(task)
+            task_copy.pop("poll_range_start", None)
+            document_batch_generator = await self._generate(task_copy)
+            for document_batch in document_batch_generator:
+                if not document_batch:
+                    continue
+                docs = [
+                    {
+                        "id": doc.id,
+                        "connector_id": task["connector_id"],
+                        "source": self.SOURCE_NAME,
+                        "semantic_identifier": doc.semantic_identifier,
+                        "extension": doc.extension,
+                        "size_bytes": doc.size_bytes,
+                        "doc_updated_at": doc.doc_updated_at,
+                        "blob": doc.blob,
+                    }
+                    for doc in document_batch
+                ]
 
-            for doc in docs:
-                synced_doc_ids.add(doc["id"])
+                for doc in docs:
+                    synced_doc_ids.add(doc["id"])
 
-        # delete removed docs
-        if not existing_doc_ids:
-            to_delete_ids = []
-            for doc_id in existing_doc_ids:
-                if doc_id not in synced_doc_ids:
-                    to_delete_ids.append(doc_id)
-
-        if to_delete_ids:
-            FileService.delete_docs(to_delete_ids, task["tenant_id"])
-            SyncLogsService.increase_deleted_docs(task["id"], len(to_delete_ids))
-            logging.info(f"Deleted {len(to_delete_ids)} documents from knowledge base {task['kb_id']} for connector {task['connector_id']}")
+            # delete removed docs
+            if existing_doc_ids:
+                to_delete_ids = []
+                for doc_id in existing_doc_ids:
+                    if doc_id not in synced_doc_ids:
+                        to_delete_ids.append(doc_id)
+
+                if to_delete_ids:
+                    FileService.delete_docs(to_delete_ids, task["tenant_id"])
+                    SyncLogsService.increase_deleted_docs(task["id"], len(to_delete_ids))
+                    logging.info(f"Deleted {len(to_delete_ids)} documents from knowledge base {task['kb_id']} for connector {task['connector_id']}")
 
         prefix = "[Jira] " if self.SOURCE_NAME == FileSource.JIRA else ""
         logging.info(f"{prefix}{doc_num} docs synchronized till {next_update}")