From 057fa0057a73aaf39078b22c739232f1e6d648af Mon Sep 17 00:00:00 2001
From: yongtenglei
Date: Tue, 2 Dec 2025 14:33:40 +0800
Subject: [PATCH] chunk

---
Review notes (text between the "---" line above and the first "diff --git"
line is commentary and does not become part of the commit):

  * set(), switch(), rm() and create() each move the synchronous work that
    used to sit directly inside their try block into a nested function
    (_set_sync, _switch_sync, _rm_sync, _create_sync) and await it through
    asyncio.to_thread(); the added "import asyncio" supports that call.
  * The nested functions return the same get_data_error_result() /
    get_json_result() values as the inline code did, and the await stays
    inside the existing try/except, so server_error_response(e) remains the
    error path.
  * In _set_sync, "_d = d" binds a second name to the same dict (it is not a
    copy); _d is only rebound on the ParserType.QA branch, to the result of
    beAdoc(). Assigning to "d" itself inside the nested function would make
    "d" a local of that function.
  * NOTE(review): line breaks and indentation of this patch were
    reconstructed from a whitespace-collapsed copy. Hunk line counts match
    the hunk headers and the diffstat, but the exact leading whitespace of
    context/removed lines (including the spacing after "#" in the licence
    header) should be confirmed against the target tree, or the patch
    applied with --ignore-whitespace.

 api/apps/chunk_app.py | 152 +++++++++++++++++++++++-------------------
 1 file changed, 83 insertions(+), 69 deletions(-)

diff --git a/api/apps/chunk_app.py b/api/apps/chunk_app.py
index d5d928342..c2b384224 100644
--- a/api/apps/chunk_app.py
+++ b/api/apps/chunk_app.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import asyncio
 import datetime
 import json
 import re
@@ -147,31 +148,35 @@ async def set():
         d["available_int"] = req["available_int"]
 
     try:
-        tenant_id = DocumentService.get_tenant_id(req["doc_id"])
-        if not tenant_id:
-            return get_data_error_result(message="Tenant not found!")
+        def _set_sync():
+            tenant_id = DocumentService.get_tenant_id(req["doc_id"])
+            if not tenant_id:
+                return get_data_error_result(message="Tenant not found!")
 
-        embd_id = DocumentService.get_embd_id(req["doc_id"])
-        embd_mdl = LLMBundle(tenant_id, LLMType.EMBEDDING, embd_id)
+            embd_id = DocumentService.get_embd_id(req["doc_id"])
+            embd_mdl = LLMBundle(tenant_id, LLMType.EMBEDDING, embd_id)
 
-        e, doc = DocumentService.get_by_id(req["doc_id"])
-        if not e:
-            return get_data_error_result(message="Document not found!")
+            e, doc = DocumentService.get_by_id(req["doc_id"])
+            if not e:
+                return get_data_error_result(message="Document not found!")
 
-        if doc.parser_id == ParserType.QA:
-            arr = [
-                t for t in re.split(
-                    r"[\n\t]",
-                    req["content_with_weight"]) if len(t) > 1]
-            q, a = rmPrefix(arr[0]), rmPrefix("\n".join(arr[1:]))
-            d = beAdoc(d, q, a, not any(
-                [rag_tokenizer.is_chinese(t) for t in q + a]))
+            _d = d
+            if doc.parser_id == ParserType.QA:
+                arr = [
+                    t for t in re.split(
+                        r"[\n\t]",
+                        req["content_with_weight"]) if len(t) > 1]
+                q, a = rmPrefix(arr[0]), rmPrefix("\n".join(arr[1:]))
+                _d = beAdoc(d, q, a, not any(
+                    [rag_tokenizer.is_chinese(t) for t in q + a]))
 
-        v, c = embd_mdl.encode([doc.name, req["content_with_weight"] if not d.get("question_kwd") else "\n".join(d["question_kwd"])])
-        v = 0.1 * v[0] + 0.9 * v[1] if doc.parser_id != ParserType.QA else v[1]
-        d["q_%d_vec" % len(v)] = v.tolist()
-        settings.docStoreConn.update({"id": req["chunk_id"]}, d, search.index_name(tenant_id), doc.kb_id)
-        return get_json_result(data=True)
+            v, c = embd_mdl.encode([doc.name, req["content_with_weight"] if not _d.get("question_kwd") else "\n".join(_d["question_kwd"])])
+            v = 0.1 * v[0] + 0.9 * v[1] if doc.parser_id != ParserType.QA else v[1]
+            _d["q_%d_vec" % len(v)] = v.tolist()
+            settings.docStoreConn.update({"id": req["chunk_id"]}, _d, search.index_name(tenant_id), doc.kb_id)
+            return get_json_result(data=True)
+
+        return await asyncio.to_thread(_set_sync)
     except Exception as e:
         return server_error_response(e)
 
@@ -182,16 +187,19 @@ async def set():
 async def switch():
     req = await get_request_json()
     try:
-        e, doc = DocumentService.get_by_id(req["doc_id"])
-        if not e:
-            return get_data_error_result(message="Document not found!")
-        for cid in req["chunk_ids"]:
-            if not settings.docStoreConn.update({"id": cid},
-                                                {"available_int": int(req["available_int"])},
-                                                search.index_name(DocumentService.get_tenant_id(req["doc_id"])),
-                                                doc.kb_id):
-                return get_data_error_result(message="Index updating failure")
-        return get_json_result(data=True)
+        def _switch_sync():
+            e, doc = DocumentService.get_by_id(req["doc_id"])
+            if not e:
+                return get_data_error_result(message="Document not found!")
+            for cid in req["chunk_ids"]:
+                if not settings.docStoreConn.update({"id": cid},
+                                                    {"available_int": int(req["available_int"])},
+                                                    search.index_name(DocumentService.get_tenant_id(req["doc_id"])),
+                                                    doc.kb_id):
+                    return get_data_error_result(message="Index updating failure")
+            return get_json_result(data=True)
+
+        return await asyncio.to_thread(_switch_sync)
     except Exception as e:
         return server_error_response(e)
 
@@ -202,20 +210,23 @@ async def switch():
 async def rm():
     req = await get_request_json()
     try:
-        e, doc = DocumentService.get_by_id(req["doc_id"])
-        if not e:
-            return get_data_error_result(message="Document not found!")
-        if not settings.docStoreConn.delete({"id": req["chunk_ids"]},
-                                            search.index_name(DocumentService.get_tenant_id(req["doc_id"])),
-                                            doc.kb_id):
-            return get_data_error_result(message="Chunk deleting failure")
-        deleted_chunk_ids = req["chunk_ids"]
-        chunk_number = len(deleted_chunk_ids)
-        DocumentService.decrement_chunk_num(doc.id, doc.kb_id, 1, chunk_number, 0)
-        for cid in deleted_chunk_ids:
-            if settings.STORAGE_IMPL.obj_exist(doc.kb_id, cid):
-                settings.STORAGE_IMPL.rm(doc.kb_id, cid)
-        return get_json_result(data=True)
+        def _rm_sync():
+            e, doc = DocumentService.get_by_id(req["doc_id"])
+            if not e:
+                return get_data_error_result(message="Document not found!")
+            if not settings.docStoreConn.delete({"id": req["chunk_ids"]},
+                                                search.index_name(DocumentService.get_tenant_id(req["doc_id"])),
+                                                doc.kb_id):
+                return get_data_error_result(message="Chunk deleting failure")
+            deleted_chunk_ids = req["chunk_ids"]
+            chunk_number = len(deleted_chunk_ids)
+            DocumentService.decrement_chunk_num(doc.id, doc.kb_id, 1, chunk_number, 0)
+            for cid in deleted_chunk_ids:
+                if settings.STORAGE_IMPL.obj_exist(doc.kb_id, cid):
+                    settings.STORAGE_IMPL.rm(doc.kb_id, cid)
+            return get_json_result(data=True)
+
+        return await asyncio.to_thread(_rm_sync)
     except Exception as e:
         return server_error_response(e)
 
@@ -245,35 +256,38 @@ async def create():
         d["tag_feas"] = req["tag_feas"]
 
     try:
-        e, doc = DocumentService.get_by_id(req["doc_id"])
-        if not e:
-            return get_data_error_result(message="Document not found!")
-        d["kb_id"] = [doc.kb_id]
-        d["docnm_kwd"] = doc.name
-        d["title_tks"] = rag_tokenizer.tokenize(doc.name)
-        d["doc_id"] = doc.id
+        def _create_sync():
+            e, doc = DocumentService.get_by_id(req["doc_id"])
+            if not e:
+                return get_data_error_result(message="Document not found!")
+            d["kb_id"] = [doc.kb_id]
+            d["docnm_kwd"] = doc.name
+            d["title_tks"] = rag_tokenizer.tokenize(doc.name)
+            d["doc_id"] = doc.id
 
-        tenant_id = DocumentService.get_tenant_id(req["doc_id"])
-        if not tenant_id:
-            return get_data_error_result(message="Tenant not found!")
+            tenant_id = DocumentService.get_tenant_id(req["doc_id"])
+            if not tenant_id:
+                return get_data_error_result(message="Tenant not found!")
 
-        e, kb = KnowledgebaseService.get_by_id(doc.kb_id)
-        if not e:
-            return get_data_error_result(message="Knowledgebase not found!")
-        if kb.pagerank:
-            d[PAGERANK_FLD] = kb.pagerank
+            e, kb = KnowledgebaseService.get_by_id(doc.kb_id)
+            if not e:
+                return get_data_error_result(message="Knowledgebase not found!")
+            if kb.pagerank:
+                d[PAGERANK_FLD] = kb.pagerank
 
-        embd_id = DocumentService.get_embd_id(req["doc_id"])
-        embd_mdl = LLMBundle(tenant_id, LLMType.EMBEDDING.value, embd_id)
+            embd_id = DocumentService.get_embd_id(req["doc_id"])
+            embd_mdl = LLMBundle(tenant_id, LLMType.EMBEDDING.value, embd_id)
 
-        v, c = embd_mdl.encode([doc.name, req["content_with_weight"] if not d["question_kwd"] else "\n".join(d["question_kwd"])])
-        v = 0.1 * v[0] + 0.9 * v[1]
-        d["q_%d_vec" % len(v)] = v.tolist()
-        settings.docStoreConn.insert([d], search.index_name(tenant_id), doc.kb_id)
+            v, c = embd_mdl.encode([doc.name, req["content_with_weight"] if not d["question_kwd"] else "\n".join(d["question_kwd"])])
+            v = 0.1 * v[0] + 0.9 * v[1]
+            d["q_%d_vec" % len(v)] = v.tolist()
+            settings.docStoreConn.insert([d], search.index_name(tenant_id), doc.kb_id)
 
-        DocumentService.increment_chunk_num(
-            doc.id, doc.kb_id, c, 1, 0)
-        return get_json_result(data={"chunk_id": chunck_id})
+            DocumentService.increment_chunk_num(
+                doc.id, doc.kb_id, c, 1, 0)
+            return get_json_result(data={"chunk_id": chunck_id})
+
+        return await asyncio.to_thread(_create_sync)
     except Exception as e:
         return server_error_response(e)
 