From ff16f4b3ab3696d9f531dbb411ae9b4c693af8cb Mon Sep 17 00:00:00 2001 From: yongtenglei Date: Tue, 2 Dec 2025 13:20:50 +0800 Subject: [PATCH] document --- api/apps/document_app.py | 152 +++++++++++++++++++++------------------ 1 file changed, 81 insertions(+), 71 deletions(-) diff --git a/api/apps/document_app.py b/api/apps/document_app.py index a56f11317..ba52bd61c 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License # +import asyncio import json import os.path import pathlib @@ -72,7 +73,7 @@ async def upload(): if not check_kb_team_permission(kb, current_user.id): return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR) - err, files = FileService.upload_document(kb, file_objs, current_user.id) + err, files = await asyncio.to_thread(FileService.upload_document, kb, file_objs, current_user.id) if err: return get_json_result(data=files, message="\n".join(err), code=RetCode.SERVER_ERROR) @@ -390,7 +391,7 @@ async def rm(): if not DocumentService.accessible4deletion(doc_id, current_user.id): return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR) - errors = FileService.delete_docs(doc_ids, current_user.id) + errors = await asyncio.to_thread(FileService.delete_docs, doc_ids, current_user.id) if errors: return get_json_result(data=False, message=errors, code=RetCode.SERVER_ERROR) @@ -403,44 +404,48 @@ async def rm(): @validate_request("doc_ids", "run") async def run(): req = await get_request_json() - for doc_id in req["doc_ids"]: - if not DocumentService.accessible(doc_id, current_user.id): - return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR) try: - kb_table_num_map = {} - for id in req["doc_ids"]: - info = {"run": str(req["run"]), "progress": 0} - if str(req["run"]) == TaskStatus.RUNNING.value and req.get("delete", False): - info["progress_msg"] = "" - info["chunk_num"] = 0 - info["token_num"] = 0 + def _run_sync(): + for doc_id in req["doc_ids"]: + if not DocumentService.accessible(doc_id, current_user.id): + return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR) - tenant_id = DocumentService.get_tenant_id(id) - if not tenant_id: - return get_data_error_result(message="Tenant not found!") - e, doc = DocumentService.get_by_id(id) - if not e: - return get_data_error_result(message="Document not found!") + kb_table_num_map = {} + for id in req["doc_ids"]: + info = {"run": str(req["run"]), "progress": 0} + if str(req["run"]) == TaskStatus.RUNNING.value and req.get("delete", False): + info["progress_msg"] = "" + info["chunk_num"] = 0 + info["token_num"] = 0 - if str(req["run"]) == TaskStatus.CANCEL.value: - if str(doc.run) == TaskStatus.RUNNING.value: - cancel_all_task_of(id) - else: - return get_data_error_result(message="Cannot cancel a task that is not in RUNNING status") - if all([("delete" not in req or req["delete"]), str(req["run"]) == TaskStatus.RUNNING.value, str(doc.run) == TaskStatus.DONE.value]): - DocumentService.clear_chunk_num_when_rerun(doc.id) + tenant_id = DocumentService.get_tenant_id(id) + if not tenant_id: + return get_data_error_result(message="Tenant not found!") + e, doc = DocumentService.get_by_id(id) + if not e: + return get_data_error_result(message="Document not found!") - DocumentService.update_by_id(id, info) - if req.get("delete", False): - TaskService.filter_delete([Task.doc_id == id]) - if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id): - settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), doc.kb_id) + if str(req["run"]) == TaskStatus.CANCEL.value: + if str(doc.run) == TaskStatus.RUNNING.value: + cancel_all_task_of(id) + else: + return get_data_error_result(message="Cannot cancel a task that is not in RUNNING status") + if all([("delete" not in req or req["delete"]), str(req["run"]) == TaskStatus.RUNNING.value, str(doc.run) == TaskStatus.DONE.value]): + DocumentService.clear_chunk_num_when_rerun(doc.id) - if str(req["run"]) == TaskStatus.RUNNING.value: - doc = doc.to_dict() - DocumentService.run(tenant_id, doc, kb_table_num_map) + DocumentService.update_by_id(id, info) + if req.get("delete", False): + TaskService.filter_delete([Task.doc_id == id]) + if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id): + settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), doc.kb_id) - return get_json_result(data=True) + if str(req["run"]) == TaskStatus.RUNNING.value: + doc_dict = doc.to_dict() + DocumentService.run(tenant_id, doc_dict, kb_table_num_map) + + return get_json_result(data=True) + + return await asyncio.to_thread(_run_sync) except Exception as e: return server_error_response(e) @@ -450,45 +455,49 @@ async def run(): @validate_request("doc_id", "name") async def rename(): req = await get_request_json() - if not DocumentService.accessible(req["doc_id"], current_user.id): - return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR) try: - e, doc = DocumentService.get_by_id(req["doc_id"]) - if not e: - return get_data_error_result(message="Document not found!") - if pathlib.Path(req["name"].lower()).suffix != pathlib.Path(doc.name.lower()).suffix: - return get_json_result(data=False, message="The extension of file can't be changed", code=RetCode.ARGUMENT_ERROR) - if len(req["name"].encode("utf-8")) > FILE_NAME_LEN_LIMIT: - return get_json_result(data=False, message=f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less.", code=RetCode.ARGUMENT_ERROR) + def _rename_sync(): + if not DocumentService.accessible(req["doc_id"], current_user.id): + return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR) - for d in DocumentService.query(name=req["name"], kb_id=doc.kb_id): - if d.name == req["name"]: - return get_data_error_result(message="Duplicated document name in the same knowledgebase.") + e, doc = DocumentService.get_by_id(req["doc_id"]) + if not e: + return get_data_error_result(message="Document not found!") + if pathlib.Path(req["name"].lower()).suffix != pathlib.Path(doc.name.lower()).suffix: + return get_json_result(data=False, message="The extension of file can't be changed", code=RetCode.ARGUMENT_ERROR) + if len(req["name"].encode("utf-8")) > FILE_NAME_LEN_LIMIT: + return get_json_result(data=False, message=f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less.", code=RetCode.ARGUMENT_ERROR) - if not DocumentService.update_by_id(req["doc_id"], {"name": req["name"]}): - return get_data_error_result(message="Database error (Document rename)!") + for d in DocumentService.query(name=req["name"], kb_id=doc.kb_id): + if d.name == req["name"]: + return get_data_error_result(message="Duplicated document name in the same knowledgebase.") - informs = File2DocumentService.get_by_document_id(req["doc_id"]) - if informs: - e, file = FileService.get_by_id(informs[0].file_id) - FileService.update_by_id(file.id, {"name": req["name"]}) + if not DocumentService.update_by_id(req["doc_id"], {"name": req["name"]}): + return get_data_error_result(message="Database error (Document rename)!") - tenant_id = DocumentService.get_tenant_id(req["doc_id"]) - title_tks = rag_tokenizer.tokenize(req["name"]) - es_body = { - "docnm_kwd": req["name"], - "title_tks": title_tks, - "title_sm_tks": rag_tokenizer.fine_grained_tokenize(title_tks), - } - if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id): - settings.docStoreConn.update( - {"doc_id": req["doc_id"]}, - es_body, - search.index_name(tenant_id), - doc.kb_id, - ) + informs = File2DocumentService.get_by_document_id(req["doc_id"]) + if informs: + e, file = FileService.get_by_id(informs[0].file_id) + FileService.update_by_id(file.id, {"name": req["name"]}) + + tenant_id = DocumentService.get_tenant_id(req["doc_id"]) + title_tks = rag_tokenizer.tokenize(req["name"]) + es_body = { + "docnm_kwd": req["name"], + "title_tks": title_tks, + "title_sm_tks": rag_tokenizer.fine_grained_tokenize(title_tks), + } + if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id): + settings.docStoreConn.update( + {"doc_id": req["doc_id"]}, + es_body, + search.index_name(tenant_id), + doc.kb_id, + ) + return get_json_result(data=True) + + return await asyncio.to_thread(_rename_sync) - return get_json_result(data=True) except Exception as e: return server_error_response(e) @@ -502,7 +511,8 @@ async def get(doc_id): return get_data_error_result(message="Document not found!") b, n = File2DocumentService.get_storage_address(doc_id=doc_id) - response = await make_response(settings.STORAGE_IMPL.get(b, n)) + data = await asyncio.to_thread(settings.STORAGE_IMPL.get, b, n) + response = await make_response(data) ext = re.search(r"\.([^.]+)$", doc.name.lower()) ext = ext.group(1) if ext else None @@ -523,8 +533,7 @@ async def get(doc_id): async def download_attachment(attachment_id): try: ext = request.args.get("ext", "markdown") - data = settings.STORAGE_IMPL.get(current_user.id, attachment_id) - # data = settings.STORAGE_IMPL.get("eb500d50bb0411f0907561d2782adda5", attachment_id) + data = await asyncio.to_thread(settings.STORAGE_IMPL.get, current_user.id, attachment_id) response = await make_response(data) response.headers.set("Content-Type", CONTENT_TYPE_MAP.get(ext, f"application/{ext}")) @@ -596,7 +605,8 @@ async def get_image(image_id): if len(arr) != 2: return get_data_error_result(message="Image not found.") bkt, nm = image_id.split("-") - response = await make_response(settings.STORAGE_IMPL.get(bkt, nm)) + data = await asyncio.to_thread(settings.STORAGE_IMPL.get, bkt, nm) + response = await make_response(data) response.headers.set("Content-Type", "image/JPEG") return response except Exception as e: