From 7b424c70fc33cdc4564ffd72d6ef7e75a0c22cfd Mon Sep 17 00:00:00 2001
From: Stephen Hu <812791840@qq.com>
Date: Mon, 24 Nov 2025 09:11:37 +0800
Subject: [PATCH] Fix re-index: delete documents that no longer exist in the
 remote source

1. Add a method to DocumentService to query document ids by source
2. Add a method to SyncLogsService to record deletions
3. On re-index, delete documents that are missing from the remote source

---
 api/db/services/connector_service.py | 14 +++++++++++++-
 api/db/services/document_service.py  | 11 +++++++++++
 rag/svr/sync_data_source.py          | 19 +++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/api/db/services/connector_service.py b/api/db/services/connector_service.py
index db8c713e2..1a724042e 100644
--- a/api/db/services/connector_service.py
+++ b/api/db/services/connector_service.py
@@ -194,7 +194,19 @@ class SyncLogsService(CommonService):
             update_date=timestamp_to_date(current_timestamp())
         )\
             .where(cls.model.id == id).execute()
-
+
+    @classmethod
+    def increase_deleted_docs(cls, id, doc_num, err_msg="", error_count=0):
+        cls.model.update(
+            docs_removed_from_index=cls.model.docs_removed_from_index + doc_num,
+            total_docs_indexed=cls.model.total_docs_indexed - doc_num,
+            error_msg=cls.model.error_msg + err_msg,
+            error_count=cls.model.error_count + error_count,
+            update_time=current_timestamp(),
+            update_date=timestamp_to_date(current_timestamp())
+        )\
+            .where(cls.model.id == id).execute()
+
     @classmethod
     def duplicate_and_parse(cls, kb, docs, tenant_id, src, auto_parse=True):
         from api.db.services.file_service import FileService
diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py
index 514b3fd87..312b0233d 100644
--- a/api/db/services/document_service.py
+++ b/api/db/services/document_service.py
@@ -750,6 +750,17 @@ class DocumentService(CommonService):
         for row in rows:
             result[row.kb_id] = row.count
         return result
+
+    @classmethod
+    @DB.connection_context()
+    def list_document_ids_by_src(cls, tenant_id, kb_id, src):
+        # tenant_id is kept for call-site symmetry; documents are scoped by kb_id.
+        docs = cls.model.select(cls.model.id)\
+            .where(
+                (cls.model.kb_id == kb_id),
+                (cls.model.source_type == src)
+            )
+        return [doc.id for doc in docs]
 
     @classmethod
     @DB.connection_context()
diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py
index b29ad15de..41f1e196d 100644
--- a/rag/svr/sync_data_source.py
+++ b/rag/svr/sync_data_source.py
@@ -35,6 +35,8 @@
 import trio
 from api.db.services.connector_service import ConnectorService, SyncLogsService
+from api.db.services.document_service import DocumentService
+from api.db.services.file_service import FileService
 from api.db.services.knowledgebase_service import KnowledgebaseService
 from common import settings
 from common.config_utils import show_configs
 from common.data_source import BlobStorageConnector, NotionConnector, DiscordConnector, GoogleDriveConnector, MoodleConnector, JiraConnector
@@ -61,6 +63,12 @@ class SyncBase:
         SyncLogsService.start(task["id"], task["connector_id"])
         try:
             async with task_limiter:
+                # Snapshot the ids previously indexed from this source so that,
+                # after a full pass, we can tell which documents disappeared
+                # from the remote.
+                synced_doc_ids = set()
+                source_type = f"{self.SOURCE_NAME}/{task['connector_id']}"
+                existing_doc_ids = DocumentService.list_document_ids_by_src(task["tenant_id"], task["kb_id"], source_type)
                 with trio.fail_after(task["timeout_secs"]):
                     document_batch_generator = await self._generate(task)
                     doc_num = 0
@@ -91,5 +99,16 @@ class SyncBase:
                         err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"])
                         SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err))
                         doc_num += len(docs)
+                        synced_doc_ids.update(dids)
+
+                    # Delete documents that were indexed from this source before
+                    # but were not seen in this sync. Only do this on a full pass
+                    # (re-index or first sync), when every remote doc is visited.
+                    if existing_doc_ids and (task["reindex"] == "1" or not task["poll_range_start"]):
+                        to_delete_ids = [doc_id for doc_id in existing_doc_ids if doc_id not in synced_doc_ids]
+                        if to_delete_ids:
+                            FileService.delete_docs(to_delete_ids, task["tenant_id"])
+                            SyncLogsService.increase_deleted_docs(task["id"], len(to_delete_ids))
+                            logging.info(f"Deleted {len(to_delete_ids)} documents from knowledge base {task['kb_id']} for connector {task['connector_id']}")
                     prefix = "[Jira] " if self.SOURCE_NAME == FileSource.JIRA else ""
                     logging.info(f"{prefix}{doc_num} docs synchronized till {next_update}")
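
Notes:

The deletion pass is a plain set difference: snapshot the ids indexed from
this source before the sync, collect the ids seen during the sync, and drop
whatever was in the snapshot but never reappeared. A minimal sketch of that
behavior, with pure-Python stand-ins instead of the RAGFlow services
(select_stale is an illustrative helper name, not part of this patch):

    def select_stale(existing_doc_ids, synced_doc_ids, full_pass):
        """Ids that were indexed before but not seen in this sync.

        Deleting is only safe on a full pass (re-index or first sync),
        because an incremental sync does not visit every remote document.
        """
        if not full_pass:
            return []
        seen = set(synced_doc_ids)
        return [doc_id for doc_id in existing_doc_ids if doc_id not in seen]

    assert select_stale(["a", "b", "c"], ["a", "c"], full_pass=True) == ["b"]
    assert select_stale(["a", "b", "c"], ["a", "c"], full_pass=False) == []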
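increase_deleted_docs follows the same peewee pattern as the other counters
in SyncLogsService: build Model.update(...) with expression-based increments,
scope it with .where(), and finish with .execute(); without .execute() the
query is only constructed and the row is never written. A self-contained
sketch of that pattern (the toy SyncLog model and in-memory SQLite are
assumptions for the example; the real model has more fields):

    from peewee import CharField, IntegerField, Model, SqliteDatabase

    db = SqliteDatabase(":memory:")

    class SyncLog(Model):
        id = CharField(primary_key=True)
        docs_removed_from_index = IntegerField(default=0)
        total_docs_indexed = IntegerField(default=0)

        class Meta:
            database = db

    db.create_tables([SyncLog])
    SyncLog.create(id="t1", total_docs_indexed=10)

    # Expression-based increments run in SQL, so concurrent workers
    # cannot lose updates; .execute() is what actually issues the UPDATE.
    (SyncLog
     .update(docs_removed_from_index=SyncLog.docs_removed_from_index + 3,
             total_docs_indexed=SyncLog.total_docs_indexed - 3)
     .where(SyncLog.id == "t1")
     .execute())

    row = SyncLog.get_by_id("t1")
    assert (row.docs_removed_from_index, row.total_docs_indexed) == (3, 7)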