From 080a29912cdd9faa1f845b4c84657c4d7e3acf99 Mon Sep 17 00:00:00 2001 From: kevinhu Date: Mon, 18 Dec 2023 16:51:55 +0800 Subject: [PATCH] add field progress msg into docinfo; add file processing procedure --- docker/docker-compose.yml | 18 +- .../src/m20220101_000001_create_table.rs | 4 +- python/conf/sys.cnf | 7 +- python/nlp/huchunk.py | 41 +++++ python/svr/parse_user_docs.py | 171 ++++++++++++++++++ python/util/__init__.py | 19 ++ python/util/config.py | 1 - python/util/db_conn.py | 23 ++- python/util/es_conn.py | 2 +- src/api/doc_info.rs | 3 +- src/entity/doc_info.rs | 3 +- src/service/doc_info.rs | 4 +- 12 files changed, 275 insertions(+), 21 deletions(-) create mode 100644 python/svr/parse_user_docs.py diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6bdc0f472..249e5ab55 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1,7 +1,7 @@ version: '2.2' services: es01: - container_name: docass-es-01 + container_name: docgpt-es-01 image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION} volumes: - esdata01:/usr/share/elasticsearch/data @@ -20,14 +20,14 @@ services: soft: -1 hard: -1 networks: - - docass + - docgpt restart: always kibana: depends_on: - es01 image: docker.elastic.co/kibana/kibana:${STACK_VERSION} - container_name: docass-kibana + container_name: docgpt-kibana volumes: - kibanadata:/usr/share/kibana/data ports: @@ -37,21 +37,21 @@ services: - ELASTICSEARCH_HOSTS=http://es01:9200 mem_limit: ${MEM_LIMIT} networks: - - docass + - docgpt postgres: image: postgres - container_name: docass-postgres + container_name: docgpt-postgres environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - POSTGRES_DB=${POSTGRES_DB} ports: - - 5455:5455 + - 5455:5432 volumes: - - pg_data:/usr/share/elasticsearch/data + - pg_data:/var/lib/postgresql/data networks: - - docass + - docgpt restart: always @@ -64,5 +64,5 @@ volumes: driver: local networks: - docass: + docgpt: driver: bridge diff --git a/migration/src/m20220101_000001_create_table.rs b/migration/src/m20220101_000001_create_table.rs index 928a1e611..e78873723 100644 --- a/migration/src/m20220101_000001_create_table.rs +++ b/migration/src/m20220101_000001_create_table.rs @@ -134,6 +134,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(DocInfo::Size).big_integer().not_null()) .col(ColumnDef::new(DocInfo::Type).string().not_null()).comment("doc|folder") .col(ColumnDef::new(DocInfo::KbProgress).float().default(0)) + .col(ColumnDef::new(DocInfo::KbProgressMsg).string().default("")) .col(ColumnDef::new(DocInfo::CreatedAt).date().not_null()) .col(ColumnDef::new(DocInfo::UpdatedAt).date().not_null()) .col(ColumnDef::new(DocInfo::IsDeleted).boolean().default(false)) @@ -285,6 +286,7 @@ enum DocInfo { Size, Type, KbProgress, + KbProgressMsg, CreatedAt, UpdatedAt, IsDeleted, @@ -300,4 +302,4 @@ enum DialogInfo { CreatedAt, UpdatedAt, IsDeleted, -} \ No newline at end of file +} diff --git a/python/conf/sys.cnf b/python/conf/sys.cnf index fc0d64c41..b8d3268dd 100755 --- a/python/conf/sys.cnf +++ b/python/conf/sys.cnf @@ -1,8 +1,7 @@ -[online] +[infiniflow] es=127.0.0.1:9200 -idx_nm=toxic pgdb_usr=root -pgdb_pwd=infiniflow_docass +pgdb_pwd=infiniflow_docgpt pgdb_host=127.0.0.1 -pgdb_port=5432 +pgdb_port=5455 diff --git a/python/nlp/huchunk.py b/python/nlp/huchunk.py index 619640227..6164375d9 100644 --- a/python/nlp/huchunk.py +++ b/python/nlp/huchunk.py @@ -359,6 +359,47 @@ class ExcelChunker(HuChunker): return flds +class PptChunker(HuChunker): + + 
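+    # Pulls the raw text of every shape on every slide through python-pptx;
+    # slide tables are not parsed separately, so table_chunks stays empty.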
+    @dataclass
+    class Fields:
+        text_chunks: List = None
+        table_chunks: List = None
+
+    def __init__(self):
+        super().__init__()
+
+    def __call__(self, fnm):
+        from pptx import Presentation
+        ppt = Presentation(fnm)
+        flds = self.Fields()
+        flds.text_chunks = []
+        for slide in ppt.slides:
+            for shape in slide.shapes:
+                if hasattr(shape, "text"):
+                    flds.text_chunks.append((shape.text, None))
+        flds.table_chunks = []
+        return flds
+
+
+class TextChunker(HuChunker):
+
+    @dataclass
+    class Fields:
+        text_chunks: List = None
+        table_chunks: List = None
+
+    def __init__(self):
+        super().__init__()
+
+    def __call__(self, fnm):
+        flds = self.Fields()
+        with open(fnm, "r") as f:
+            txt = f.read()
+        flds.text_chunks = self.naive_text_chunk(txt)
+        flds.table_chunks = []
+        return flds
+
+
 if __name__ == "__main__":
     import sys
     sys.path.append(os.path.dirname(__file__) + "/../")
diff --git a/python/svr/parse_user_docs.py b/python/svr/parse_user_docs.py
new file mode 100644
index 000000000..9ff14c40e
--- /dev/null
+++ b/python/svr/parse_user_docs.py
@@ -0,0 +1,171 @@
+import json, re, sys, os, hashlib, copy, glob, util, time, random
+from util.es_conn import HuEs, Postgres
+from util import rmSpace, findMaxDt
+from FlagEmbedding import FlagModel
+from nlp import huchunk, huqie
+import base64
+from io import BytesIO
+from elasticsearch_dsl import Q
+from parser import (
+    PdfParser,
+    DocxParser,
+    ExcelParser
+)
+from nlp.huchunk import (
+    PdfChunker,
+    DocxChunker,
+    ExcelChunker,
+    PptChunker,
+    TextChunker
+)
+
+ES = HuEs("infiniflow")
+BATCH_SIZE = 64
+PG = Postgres("infiniflow", "docgpt")
+
+PDF = PdfChunker(PdfParser())
+DOC = DocxChunker(DocxParser())
+EXC = ExcelChunker(ExcelParser())
+PPT = PptChunker()
+
+
+def chuck_doc(name):
+    # Dispatch on the file extension; the chunkers receive the full path.
+    suff = os.path.split(name)[-1].lower().split(".")[-1]
+    if suff.find("pdf") >= 0: return PDF(name)
+    if suff.find("doc") >= 0: return DOC(name)
+    if suff.find("xlsx") >= 0: return EXC(name)
+    if suff.find("ppt") >= 0: return PPT(name)
+    if re.match(r"(txt|csv)", suff): return TextChunker()(name)
+
+
+def collect(comm, mod, tm):
+    # size and is_deleted are read later by build() and main().
+    sql = f"""
+    select
+    did,
+    uid,
+    doc_name,
+    size,
+    location,
+    updated_at,
+    is_deleted
+    from docinfo
+    where
+    updated_at >= '{tm}'
+    and kb_progress = 0
+    and type = 'doc'
+    and MOD(uid, {comm}) = {mod}
+    order by updated_at asc
+    limit 1000
+    """
+    df = PG.select(sql)
+    df = df.fillna("")
+    mtm = str(df["updated_at"].max())[:19]
+    print("TOTAL:", len(df), "To: ", mtm)
+    return df, mtm
+
+
+def set_progress(did, prog, msg=""):
+    sql = f"""
+    update docinfo set kb_progress={prog}, kb_progress_msg='{msg}' where did={did}
+    """
+    PG.update(sql)
+
+
+def build(row):
+    if row["size"] > 256000000:
+        set_progress(row["did"], -1, "File size exceeds the 256MB limit.")
+        return []
+    doc = {
+        "doc_id": row["did"],
+        "title_tks": huqie.qie(os.path.split(row["location"])[-1]),
+        "updated_at": row["updated_at"]
+    }
+    random.seed(time.time())
+    set_progress(row["did"], random.randint(0, 20)/100., "Finished preparing! Start to slice file!")
+    obj = chuck_doc(row["location"])
+    if not obj:
+        set_progress(row["did"], -1, "Unsupported file type.")
+        return []
+
+    set_progress(row["did"], random.randint(20, 60)/100.)
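+    # Each chunk returned by the parser becomes one ES document: the text is
+    # tokenized into content_ltks, the MD5 of (chunk text + doc_id) is used as
+    # a stable _id, and an attached image, if any, is stored as base64 JPEG.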
+
+    docs = []
+    md5 = hashlib.md5()
+    for txt, img in obj.text_chunks:
+        d = copy.deepcopy(doc)
+        md5.update((txt + str(d["doc_id"])).encode("utf-8"))
+        d["_id"] = md5.hexdigest()
+        d["content_ltks"] = huqie.qie(txt)
+        d["docnm_kwd"] = rmSpace(d["title_tks"])
+        if not img:
+            docs.append(d)
+            continue
+        # Fresh buffer per image, otherwise consecutive JPEGs pile up in one stream.
+        output_buffer = BytesIO()
+        img.save(output_buffer, format='JPEG')
+        d["img_bin"] = base64.b64encode(output_buffer.getvalue())
+        docs.append(d)
+
+    for arr, img in obj.table_chunks:
+        for i, txt in enumerate(arr):
+            d = copy.deepcopy(doc)
+            d["content_ltks"] = huqie.qie(txt)
+            md5.update((txt + str(d["doc_id"])).encode("utf-8"))
+            d["_id"] = md5.hexdigest()
+            if not img:
+                docs.append(d)
+                continue
+            output_buffer = BytesIO()
+            img.save(output_buffer, format='JPEG')
+            d["img_bin"] = base64.b64encode(output_buffer.getvalue())
+            docs.append(d)
+    set_progress(row["did"], random.randint(60, 70)/100., "Finished slicing. Start to embed the content.")
+
+    return docs
+
+
+def index_name(uid):
+    return f"docgpt_{uid}"
+
+
+def init_kb(row):
+    idxnm = index_name(row["uid"])
+    if ES.indexExist(idxnm): return
+    return ES.createIdx(idxnm, json.load(open("res/mapping.json", "r")))
+
+
+model = None
+def embedding(docs):
+    global model
+    if not docs:
+        return
+    if model is None:
+        # The patch never assigns the embedding model; BAAI/bge-large-zh is an
+        # assumed default here and may need to be replaced with the intended model.
+        model = FlagModel("BAAI/bge-large-zh")
+    tts = model.encode([rmSpace(d["title_tks"]) for d in docs])
+    cnts = model.encode([rmSpace(d["content_ltks"]) for d in docs])
+    vects = 0.1 * tts + 0.9 * cnts
+    assert len(vects) == len(docs)
+    for i, d in enumerate(docs):
+        d["q_vec"] = vects[i].tolist()
+    for d in docs:
+        set_progress(d["doc_id"], random.randint(70, 95)/100.,
+                     "Finished embedding! Start to build index!")
+
+
+def main(comm, mod):
+    tm_fnm = f"res/{comm}-{mod}.tm"
+    tmf = open(tm_fnm, "a+")
+    tm = findMaxDt(tm_fnm)
+    rows, tm = collect(comm, mod, tm)
+    for _, r in rows.iterrows():
+        if r["is_deleted"]:
+            ES.deleteByQuery(Q("term", doc_id=r["did"]), index_name(r["uid"]))
+            continue
+
+        cks = build(r)
+        ## TODO: exception handler
+        ## set_progress(r["did"], -1, "ERROR: ")
+        embedding(cks)
+        if cks:
+            init_kb(r)
+            ES.bulk(cks, index_name(r["uid"]))
+        tmf.write(str(r["updated_at"]) + "\n")
+    tmf.close()
+
+
+if __name__ == "__main__":
+    from mpi4py import MPI
+    comm = MPI.COMM_WORLD
+    main(comm.Get_size(), comm.Get_rank())
+
diff --git a/python/util/__init__.py b/python/util/__init__.py
index e69de29bb..c7080b766 100644
--- a/python/util/__init__.py
+++ b/python/util/__init__.py
@@ -0,0 +1,19 @@
+import re
+
+def rmSpace(txt):
+    txt = re.sub(r"([^a-z0-9.,]) +([^ ])", r"\1\2", txt)
+    return re.sub(r"([^ ]) +([^a-z0-9.,])", r"\1\2", txt)
+
+def findMaxDt(fnm):
+    m = "1970-01-01 00:00:00"
+    try:
+        with open(fnm, "r") as f:
+            while True:
+                l = f.readline()
+                if not l: break
+                l = l.strip("\n")
+                if l == 'nan': continue
+                if l > m: m = l
+    except Exception as e:
+        print("WARNING: can't find " + fnm)
+    return m
diff --git a/python/util/config.py b/python/util/config.py
index 868855d12..78429e570 100755
--- a/python/util/config.py
+++ b/python/util/config.py
@@ -9,7 +9,6 @@
 if not os.path.exists(__fnm):
     __fnm = "./sys.cnf"
 CF.read(__fnm)
-
 class Config:
     def __init__(self, env):
         self.env = env
diff --git a/python/util/db_conn.py b/python/util/db_conn.py
index b67e13e92..ca9e4baed 100644
--- a/python/util/db_conn.py
+++ b/python/util/db_conn.py
@@ -3,7 +3,7 @@ import time
 from util import config
 import pandas as pd
 
-class Postgre(object):
+class Postgres(object):
     def __init__(self, env, dbnm):
         self.config = config.init(env)
         self.conn = None
@@ -36,9 +36,28 @@
         try:
             return pd.read_sql(sql, self.conn)
         except Exception as e:
-            logging.error(f"Fail to exec {sql}l "+str(e))
logging.error(f"Fail to exec {sql}l "+str(e)) + logging.error(f"Fail to exec {sql} "+str(e)) self.__open__() time.sleep(1) return pd.DataFrame() + + def update(self, sql): + for _ in range(10): + try: + cur = self.conn.cursor() + cur.execute(sql) + updated_rows = cur.rowcount + conn.commit() + cur.close() + return updated_rows + except Exception as e: + logging.error(f"Fail to exec {sql} "+str(e)) + self.__open__() + time.sleep(1) + return 0 + +if __name__ == "__main__": + Postgres("infiniflow", "docgpt") + diff --git a/python/util/es_conn.py b/python/util/es_conn.py index 3e41ab502..ea917a723 100755 --- a/python/util/es_conn.py +++ b/python/util/es_conn.py @@ -31,7 +31,7 @@ class HuEs: self.info = {} self.config = config.init(env) self.conn() - self.idxnm = self.config.get("idx_nm") + self.idxnm = self.config.get("idx_nm","") if not self.es.ping(): raise Exception("Can't connect to ES cluster") diff --git a/src/api/doc_info.rs b/src/api/doc_info.rs index da6079c8c..e2b95face 100644 --- a/src/api/doc_info.rs +++ b/src/api/doc_info.rs @@ -85,6 +85,7 @@ async fn upload(mut payload: Multipart, filename: web::Data, did: web::D size, kb_infos: Vec::new(), kb_progress: 0.0, + kb_progress_msg: "".to_string(), location: "".to_string(), r#type: "".to_string(), created_at: Local::now().date_naive(), @@ -124,4 +125,4 @@ async fn mv(params: web::Json, data: web::Data) -> Result, @@ -57,4 +58,4 @@ impl Related for Entity { } } -impl ActiveModelBehavior for ActiveModel {} \ No newline at end of file +impl ActiveModelBehavior for ActiveModel {} diff --git a/src/service/doc_info.rs b/src/service/doc_info.rs index f05685e35..9d7595a08 100644 --- a/src/service/doc_info.rs +++ b/src/service/doc_info.rs @@ -104,6 +104,7 @@ impl Mutation { size: Set(form_data.size.to_owned()), r#type: Set(form_data.r#type.to_owned()), kb_progress: Set(form_data.kb_progress.to_owned()), + kb_progress_msg: Set(form_data.kb_progress_msg.to_owned()), location: Set(form_data.location.to_owned()), created_at: Set(Local::now().date_naive()), updated_at: Set(Local::now().date_naive()), @@ -130,6 +131,7 @@ impl Mutation { size: Set(form_data.size.to_owned()), r#type: Set(form_data.r#type.to_owned()), kb_progress: Set(form_data.kb_progress.to_owned()), + kb_progress_msg: Set(form_data.kb_progress_msg.to_owned()), location: Set(form_data.location.to_owned()), created_at: Default::default(), updated_at: Set(Local::now().date_naive()), @@ -151,4 +153,4 @@ impl Mutation { pub async fn delete_all_doc_infos(db: &DbConn) -> Result { Entity::delete_many().exec(db).await } -} \ No newline at end of file +}