Support a dev setting to control whether deletions are synced

This commit is contained in:
“hu 2025-11-26 08:35:31 +08:00
parent f19e64f4c8
commit 1c8dcf7ac6
2 changed files with 35 additions and 32 deletions

View file

@ -83,6 +83,8 @@ kg_retriever = None
# user registration switch # user registration switch
REGISTER_ENABLED = 1 REGISTER_ENABLED = 1
# Opt-in switch read by the connector sync code (settings.ENABLE_SYNC_DELETED_CHANGE).
# os.getenv returns a string whenever the variable is set, so parse it explicitly:
# otherwise values such as "0" or "false" would be truthy and turn the feature on.
ENABLE_SYNC_DELETED_CHANGE = os.getenv('ENABLE_SYNC_DELETED_CHANGE', 'false').strip().lower() in ('1', 'true', 'yes', 'on')
# sandbox-executor-manager # sandbox-executor-manager
SANDBOX_HOST = None SANDBOX_HOST = None

View file

@ -101,40 +101,41 @@ class SyncBase:
SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err)) SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err))
doc_num += len(docs) doc_num += len(docs)
task_copy = copy.deepcopy(task) if settings.ENABLE_SYNC_DELETED_CHANGE:
task_copy.pop("poll_range_start", None) task_copy = copy.deepcopy(task)
document_batch_generator = await self._generate(task) task_copy.pop("poll_range_start", None)
for document_batch in document_batch_generator: document_batch_generator = await self._generate(task)
if not document_batch: for document_batch in document_batch_generator:
continue if not document_batch:
docs = [ continue
{ docs = [
"id": doc.id, {
"connector_id": task["connector_id"], "id": doc.id,
"source": self.SOURCE_NAME, "connector_id": task["connector_id"],
"semantic_identifier": doc.semantic_identifier, "source": self.SOURCE_NAME,
"extension": doc.extension, "semantic_identifier": doc.semantic_identifier,
"size_bytes": doc.size_bytes, "extension": doc.extension,
"doc_updated_at": doc.doc_updated_at, "size_bytes": doc.size_bytes,
"blob": doc.blob, "doc_updated_at": doc.doc_updated_at,
} "blob": doc.blob,
for doc in document_batch }
] for doc in document_batch
]
for doc in docs: for doc in docs:
synced_doc_ids.add(doc["id"]) synced_doc_ids.add(doc["id"])
# delete removed docs # delete removed docs
if not existing_doc_ids: if not existing_doc_ids:
to_delete_ids = [] to_delete_ids = []
for doc_id in existing_doc_ids: for doc_id in existing_doc_ids:
if doc_id not in synced_doc_ids: if doc_id not in synced_doc_ids:
to_delete_ids.append(doc_id) to_delete_ids.append(doc_id)
if to_delete_ids: if to_delete_ids:
FileService.delete_docs(to_delete_ids, task["tenant_id"]) FileService.delete_docs(to_delete_ids, task["tenant_id"])
SyncLogsService.increase_deleted_docs(task["id"], len(to_delete_ids)) SyncLogsService.increase_deleted_docs(task["id"], len(to_delete_ids))
logging.info(f"Deleted {len(to_delete_ids)} documents from knowledge base {task['kb_id']} for connector {task['connector_id']}") logging.info(f"Deleted {len(to_delete_ids)} documents from knowledge base {task['kb_id']} for connector {task['connector_id']}")
prefix = "[Jira] " if self.SOURCE_NAME == FileSource.JIRA else "" prefix = "[Jira] " if self.SOURCE_NAME == FileSource.JIRA else ""
logging.info(f"{prefix}{doc_num} docs synchronized till {next_update}") logging.info(f"{prefix}{doc_num} docs synchronized till {next_update}")