From d923ea021e38875103533e566a28000f70c515cd Mon Sep 17 00:00:00 2001 From: "David L. Carrascal" Date: Wed, 10 Dec 2025 11:18:05 +0100 Subject: [PATCH] fix: change the invocation so the coroutine si properly executed --- rag/svr/sync_data_source.py | 73 ++++++++++++------------------------- 1 file changed, 24 insertions(+), 49 deletions(-) diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 400f90370..5953a0512 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -70,15 +70,13 @@ class SyncBase: return except Exception as ex: - msg = "\n".join([ - "".join(traceback.format_exception_only(None, ex)).strip(), - "".join(traceback.format_exception(None, ex, ex.__traceback__)).strip(), - ]) - SyncLogsService.update_by_id(task["id"], { - "status": TaskStatus.FAIL, - "full_exception_trace": msg, - "error_msg": str(ex) - }) + msg = "\n".join( + [ + "".join(traceback.format_exception_only(None, ex)).strip(), + "".join(traceback.format_exception(None, ex, ex.__traceback__)).strip(), + ] + ) + SyncLogsService.update_by_id(task["id"], {"status": TaskStatus.FAIL, "full_exception_trace": msg, "error_msg": str(ex)}) return SyncLogsService.schedule(task["connector_id"], task["kb_id"], task["poll_range_start"]) @@ -93,7 +91,7 @@ class SyncBase: if task["poll_range_start"]: next_update = task["poll_range_start"] - async for document_batch in document_batch_generator: # 如果是 async generator + async for document_batch in document_batch_generator: # 如果是 async generator if not document_batch: continue @@ -119,15 +117,8 @@ class SyncBase: try: e, kb = KnowledgebaseService.get_by_id(task["kb_id"]) - err, dids = SyncLogsService.duplicate_and_parse( - kb, docs, task["tenant_id"], - f"{self.SOURCE_NAME}/{task['connector_id']}", - task["auto_parse"] - ) - SyncLogsService.increase_docs( - task["id"], min_update, max_update, - len(docs), "\n".join(err), len(err) - ) + err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"]) + SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err)) doc_num += len(docs) @@ -351,9 +342,7 @@ class Dropbox(SyncBase): begin_info = "totally" else: poll_start = task["poll_range_start"] - document_generator = self.connector.poll_source( - poll_start.timestamp(), datetime.now(timezone.utc).timestamp() - ) + document_generator = self.connector.poll_source(poll_start.timestamp(), datetime.now(timezone.utc).timestamp()) begin_info = f"from {poll_start}" logging.info(f"[Dropbox] Connect to Dropbox {begin_info}") @@ -511,9 +500,7 @@ class Jira(SyncBase): ) for document, failure, next_checkpoint in generator: if failure is not None: - logging.warning( - f"[Jira] Jira connector failure: {getattr(failure, 'failure_message', failure)}" - ) + logging.warning(f"[Jira] Jira connector failure: {getattr(failure, 'failure_message', failure)}") continue if document is not None: pending_docs.append(document) @@ -568,15 +555,12 @@ class WebDAV(SyncBase): SOURCE_NAME: str = FileSource.WEBDAV async def _generate(self, task: dict): - self.connector = WebDAVConnector( - base_url=self.conf["base_url"], - remote_path=self.conf.get("remote_path", "/") - ) + self.connector = WebDAVConnector(base_url=self.conf["base_url"], remote_path=self.conf.get("remote_path", "/")) self.connector.load_credentials(self.conf["credentials"]) - + logging.info(f"Task info: reindex={task['reindex']}, poll_range_start={task['poll_range_start']}") - - if task["reindex"]=="1" or not task["poll_range_start"]: + + if task["reindex"] == "1" or not task["poll_range_start"]: logging.info("Using load_from_state (full sync)") document_batch_generator = self.connector.load_from_state() begin_info = "totally" @@ -586,23 +570,17 @@ class WebDAV(SyncBase): logging.info(f"Polling WebDAV from {task['poll_range_start']} (ts: {start_ts}) to now (ts: {end_ts})") document_batch_generator = self.connector.poll_source(start_ts, end_ts) begin_info = "from {}".format(task["poll_range_start"]) - - logging.info("Connect to WebDAV: {}(path: {}) {}".format( - self.conf["base_url"], - self.conf.get("remote_path", "/"), - begin_info - )) + + logging.info("Connect to WebDAV: {}(path: {}) {}".format(self.conf["base_url"], self.conf.get("remote_path", "/"), begin_info)) return document_batch_generator - + + class Moodle(SyncBase): SOURCE_NAME: str = FileSource.MOODLE async def _generate(self, task: dict): - self.connector = MoodleConnector( - moodle_url=self.conf["moodle_url"], - batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE) - ) - + self.connector = MoodleConnector(moodle_url=self.conf["moodle_url"], batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE)) + self.connector.load_credentials(self.conf["credentials"]) # Determine the time range for synchronization based on reindex or poll_range_start @@ -615,10 +593,7 @@ class Moodle(SyncBase): document_generator = self.connector.load_from_state() begin_info = "totally" else: - document_generator = self.connector.poll_source( - poll_start.timestamp(), - datetime.now(timezone.utc).timestamp() - ) + document_generator = self.connector.poll_source(poll_start.timestamp(), datetime.now(timezone.utc).timestamp()) begin_info = "from {}".format(poll_start) logging.info("Connect to Moodle: {} {}".format(self.conf["moodle_url"], begin_info)) @@ -714,4 +689,4 @@ async def main(): if __name__ == "__main__": faulthandler.enable() init_root_logger(CONSUMER_NAME) - asyncio.run(main) + asyncio.run(main())