fix: change the invocation so the coroutine is properly executed
This commit is contained in:
parent
ab4b62031f
commit
d923ea021e
1 changed file with 24 additions and 49 deletions
|
|
@ -70,15 +70,13 @@ class SyncBase:
|
||||||
return
|
return
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
msg = "\n".join([
|
msg = "\n".join(
|
||||||
"".join(traceback.format_exception_only(None, ex)).strip(),
|
[
|
||||||
"".join(traceback.format_exception(None, ex, ex.__traceback__)).strip(),
|
"".join(traceback.format_exception_only(None, ex)).strip(),
|
||||||
])
|
"".join(traceback.format_exception(None, ex, ex.__traceback__)).strip(),
|
||||||
SyncLogsService.update_by_id(task["id"], {
|
]
|
||||||
"status": TaskStatus.FAIL,
|
)
|
||||||
"full_exception_trace": msg,
|
SyncLogsService.update_by_id(task["id"], {"status": TaskStatus.FAIL, "full_exception_trace": msg, "error_msg": str(ex)})
|
||||||
"error_msg": str(ex)
|
|
||||||
})
|
|
||||||
return
|
return
|
||||||
|
|
||||||
SyncLogsService.schedule(task["connector_id"], task["kb_id"], task["poll_range_start"])
|
SyncLogsService.schedule(task["connector_id"], task["kb_id"], task["poll_range_start"])
|
||||||
|
|
@ -93,7 +91,7 @@ class SyncBase:
|
||||||
if task["poll_range_start"]:
|
if task["poll_range_start"]:
|
||||||
next_update = task["poll_range_start"]
|
next_update = task["poll_range_start"]
|
||||||
|
|
||||||
async for document_batch in document_batch_generator: # 如果是 async generator
|
async for document_batch in document_batch_generator: # 如果是 async generator
|
||||||
if not document_batch:
|
if not document_batch:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
@ -119,15 +117,8 @@ class SyncBase:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
|
e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
|
||||||
err, dids = SyncLogsService.duplicate_and_parse(
|
err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"])
|
||||||
kb, docs, task["tenant_id"],
|
SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err))
|
||||||
f"{self.SOURCE_NAME}/{task['connector_id']}",
|
|
||||||
task["auto_parse"]
|
|
||||||
)
|
|
||||||
SyncLogsService.increase_docs(
|
|
||||||
task["id"], min_update, max_update,
|
|
||||||
len(docs), "\n".join(err), len(err)
|
|
||||||
)
|
|
||||||
|
|
||||||
doc_num += len(docs)
|
doc_num += len(docs)
|
||||||
|
|
||||||
|
|
@ -351,9 +342,7 @@ class Dropbox(SyncBase):
|
||||||
begin_info = "totally"
|
begin_info = "totally"
|
||||||
else:
|
else:
|
||||||
poll_start = task["poll_range_start"]
|
poll_start = task["poll_range_start"]
|
||||||
document_generator = self.connector.poll_source(
|
document_generator = self.connector.poll_source(poll_start.timestamp(), datetime.now(timezone.utc).timestamp())
|
||||||
poll_start.timestamp(), datetime.now(timezone.utc).timestamp()
|
|
||||||
)
|
|
||||||
begin_info = f"from {poll_start}"
|
begin_info = f"from {poll_start}"
|
||||||
|
|
||||||
logging.info(f"[Dropbox] Connect to Dropbox {begin_info}")
|
logging.info(f"[Dropbox] Connect to Dropbox {begin_info}")
|
||||||
|
|
@ -511,9 +500,7 @@ class Jira(SyncBase):
|
||||||
)
|
)
|
||||||
for document, failure, next_checkpoint in generator:
|
for document, failure, next_checkpoint in generator:
|
||||||
if failure is not None:
|
if failure is not None:
|
||||||
logging.warning(
|
logging.warning(f"[Jira] Jira connector failure: {getattr(failure, 'failure_message', failure)}")
|
||||||
f"[Jira] Jira connector failure: {getattr(failure, 'failure_message', failure)}"
|
|
||||||
)
|
|
||||||
continue
|
continue
|
||||||
if document is not None:
|
if document is not None:
|
||||||
pending_docs.append(document)
|
pending_docs.append(document)
|
||||||
|
|
@ -568,15 +555,12 @@ class WebDAV(SyncBase):
|
||||||
SOURCE_NAME: str = FileSource.WEBDAV
|
SOURCE_NAME: str = FileSource.WEBDAV
|
||||||
|
|
||||||
async def _generate(self, task: dict):
|
async def _generate(self, task: dict):
|
||||||
self.connector = WebDAVConnector(
|
self.connector = WebDAVConnector(base_url=self.conf["base_url"], remote_path=self.conf.get("remote_path", "/"))
|
||||||
base_url=self.conf["base_url"],
|
|
||||||
remote_path=self.conf.get("remote_path", "/")
|
|
||||||
)
|
|
||||||
self.connector.load_credentials(self.conf["credentials"])
|
self.connector.load_credentials(self.conf["credentials"])
|
||||||
|
|
||||||
logging.info(f"Task info: reindex={task['reindex']}, poll_range_start={task['poll_range_start']}")
|
logging.info(f"Task info: reindex={task['reindex']}, poll_range_start={task['poll_range_start']}")
|
||||||
|
|
||||||
if task["reindex"]=="1" or not task["poll_range_start"]:
|
if task["reindex"] == "1" or not task["poll_range_start"]:
|
||||||
logging.info("Using load_from_state (full sync)")
|
logging.info("Using load_from_state (full sync)")
|
||||||
document_batch_generator = self.connector.load_from_state()
|
document_batch_generator = self.connector.load_from_state()
|
||||||
begin_info = "totally"
|
begin_info = "totally"
|
||||||
|
|
@ -586,23 +570,17 @@ class WebDAV(SyncBase):
|
||||||
logging.info(f"Polling WebDAV from {task['poll_range_start']} (ts: {start_ts}) to now (ts: {end_ts})")
|
logging.info(f"Polling WebDAV from {task['poll_range_start']} (ts: {start_ts}) to now (ts: {end_ts})")
|
||||||
document_batch_generator = self.connector.poll_source(start_ts, end_ts)
|
document_batch_generator = self.connector.poll_source(start_ts, end_ts)
|
||||||
begin_info = "from {}".format(task["poll_range_start"])
|
begin_info = "from {}".format(task["poll_range_start"])
|
||||||
|
|
||||||
logging.info("Connect to WebDAV: {}(path: {}) {}".format(
|
logging.info("Connect to WebDAV: {}(path: {}) {}".format(self.conf["base_url"], self.conf.get("remote_path", "/"), begin_info))
|
||||||
self.conf["base_url"],
|
|
||||||
self.conf.get("remote_path", "/"),
|
|
||||||
begin_info
|
|
||||||
))
|
|
||||||
return document_batch_generator
|
return document_batch_generator
|
||||||
|
|
||||||
|
|
||||||
class Moodle(SyncBase):
|
class Moodle(SyncBase):
|
||||||
SOURCE_NAME: str = FileSource.MOODLE
|
SOURCE_NAME: str = FileSource.MOODLE
|
||||||
|
|
||||||
async def _generate(self, task: dict):
|
async def _generate(self, task: dict):
|
||||||
self.connector = MoodleConnector(
|
self.connector = MoodleConnector(moodle_url=self.conf["moodle_url"], batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE))
|
||||||
moodle_url=self.conf["moodle_url"],
|
|
||||||
batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE)
|
|
||||||
)
|
|
||||||
|
|
||||||
self.connector.load_credentials(self.conf["credentials"])
|
self.connector.load_credentials(self.conf["credentials"])
|
||||||
|
|
||||||
# Determine the time range for synchronization based on reindex or poll_range_start
|
# Determine the time range for synchronization based on reindex or poll_range_start
|
||||||
|
|
@ -615,10 +593,7 @@ class Moodle(SyncBase):
|
||||||
document_generator = self.connector.load_from_state()
|
document_generator = self.connector.load_from_state()
|
||||||
begin_info = "totally"
|
begin_info = "totally"
|
||||||
else:
|
else:
|
||||||
document_generator = self.connector.poll_source(
|
document_generator = self.connector.poll_source(poll_start.timestamp(), datetime.now(timezone.utc).timestamp())
|
||||||
poll_start.timestamp(),
|
|
||||||
datetime.now(timezone.utc).timestamp()
|
|
||||||
)
|
|
||||||
begin_info = "from {}".format(poll_start)
|
begin_info = "from {}".format(poll_start)
|
||||||
|
|
||||||
logging.info("Connect to Moodle: {} {}".format(self.conf["moodle_url"], begin_info))
|
logging.info("Connect to Moodle: {} {}".format(self.conf["moodle_url"], begin_info))
|
||||||
|
|
@ -714,4 +689,4 @@ async def main():
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
faulthandler.enable()
|
faulthandler.enable()
|
||||||
init_root_logger(CONSUMER_NAME)
|
init_root_logger(CONSUMER_NAME)
|
||||||
asyncio.run(main)
|
asyncio.run(main())
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue