diff --git a/api/apps/document_app.py b/api/apps/document_app.py index ba52bd61c..1a987cd39 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -27,6 +27,7 @@ from api.db import VALID_FILE_TYPES, FileType from api.db.db_models import Task from api.db.services import duplicate_name from api.db.services.document_service import DocumentService, doc_upload_and_parse +from api.db.services.dialog_service import meta_filter, convert_conditions from api.db.services.file2document_service import File2DocumentService from api.db.services.file_service import FileService from api.db.services.knowledgebase_service import KnowledgebaseService @@ -246,9 +247,19 @@ async def list_docs(): return get_data_error_result(message=f"Invalid filter conditions: {', '.join(invalid_types)} type{'s' if len(invalid_types) > 1 else ''}") suffix = req.get("suffix", []) + metadata_condition = req.get("metadata_condition", {}) or {} + if metadata_condition and not isinstance(metadata_condition, dict): + return get_data_error_result(message="metadata_condition must be an object.") + + doc_ids_filter = None + if metadata_condition: + metas = DocumentService.get_flatted_meta_by_kbs([kb_id]) + doc_ids_filter = meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and")) + if metadata_condition.get("conditions") and not doc_ids_filter: + return get_json_result(data={"total": 0, "docs": []}) try: - docs, tol = DocumentService.get_by_kb_id(kb_id, page_number, items_per_page, orderby, desc, keywords, run_status, types, suffix) + docs, tol = DocumentService.get_by_kb_id(kb_id, page_number, items_per_page, orderby, desc, keywords, run_status, types, suffix, doc_ids_filter) if create_time_from or create_time_to: filtered_docs = [] @@ -319,6 +330,87 @@ async def doc_infos(): return get_json_result(data=list(docs.dicts())) +@manager.route("/metadata/summary", methods=["POST"]) # noqa: F821 +@login_required +async def metadata_summary(): + req = await 
get_request_json() + kb_id = req.get("kb_id") + if not kb_id: + return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR) + + tenants = UserTenantService.query(user_id=current_user.id) + for tenant in tenants: + if KnowledgebaseService.query(tenant_id=tenant.tenant_id, id=kb_id): + break + else: + return get_json_result(data=False, message="Only owner of knowledgebase authorized for this operation.", code=RetCode.OPERATING_ERROR) + + try: + summary = DocumentService.get_metadata_summary(kb_id) + return get_json_result(data={"summary": summary}) + except Exception as e: + return server_error_response(e) + + +@manager.route("/metadata/update", methods=["POST"]) # noqa: F821 +@login_required +async def metadata_update(): + req = await get_request_json() + kb_id = req.get("kb_id") + if not kb_id: + return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR) + + tenants = UserTenantService.query(user_id=current_user.id) + for tenant in tenants: + if KnowledgebaseService.query(tenant_id=tenant.tenant_id, id=kb_id): + break + else: + return get_json_result(data=False, message="Only owner of knowledgebase authorized for this operation.", code=RetCode.OPERATING_ERROR) + + selector = req.get("selector", {}) or {} + updates = req.get("updates", []) or [] + deletes = req.get("deletes", []) or [] + + if not isinstance(selector, dict): + return get_json_result(data=False, message="selector must be an object.", code=RetCode.ARGUMENT_ERROR) + if not isinstance(updates, list) or not isinstance(deletes, list): + return get_json_result(data=False, message="updates and deletes must be lists.", code=RetCode.ARGUMENT_ERROR) + + metadata_condition = selector.get("metadata_condition", {}) or {} + if metadata_condition and not isinstance(metadata_condition, dict): + return get_json_result(data=False, message="metadata_condition must be an object.", code=RetCode.ARGUMENT_ERROR) + + document_ids = selector.get("document_ids", 
[]) or [] + if document_ids and not isinstance(document_ids, list): + return get_json_result(data=False, message="document_ids must be a list.", code=RetCode.ARGUMENT_ERROR) + + for upd in updates: + if not isinstance(upd, dict) or not upd.get("key") or "value" not in upd: + return get_json_result(data=False, message="Each update requires key and value.", code=RetCode.ARGUMENT_ERROR) + for d in deletes: + if not isinstance(d, dict) or not d.get("key"): + return get_json_result(data=False, message="Each delete requires key.", code=RetCode.ARGUMENT_ERROR) + + kb_doc_ids = KnowledgebaseService.list_documents_by_ids([kb_id]) + target_doc_ids = set(kb_doc_ids) + if document_ids: + invalid_ids = set(document_ids) - set(kb_doc_ids) + if invalid_ids: + return get_json_result(data=False, message=f"These documents do not belong to dataset {kb_id}: {', '.join(invalid_ids)}", code=RetCode.ARGUMENT_ERROR) + target_doc_ids = set(document_ids) + + if metadata_condition: + metas = DocumentService.get_flatted_meta_by_kbs([kb_id]) + filtered_ids = set(meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and"))) + target_doc_ids = target_doc_ids & filtered_ids + if metadata_condition.get("conditions") and not target_doc_ids: + return get_json_result(data={"updated": 0, "matched_docs": 0}) + + target_doc_ids = list(target_doc_ids) + updated = DocumentService.batch_update_metadata(kb_id, target_doc_ids, updates, deletes) + return get_json_result(data={"updated": updated, "matched_docs": len(target_doc_ids)}) + + @manager.route("/thumbnails", methods=["GET"]) # noqa: F821 # @login_required def thumbnails(): @@ -698,7 +790,10 @@ async def set_meta(): if not isinstance(meta, dict): return get_json_result(data=False, message="Only dictionary type supported.", code=RetCode.ARGUMENT_ERROR) for k, v in meta.items(): - if not isinstance(v, str) and not isinstance(v, int) and not isinstance(v, float): + if isinstance(v, list): + if not all(isinstance(i, 
(str, int, float)) for i in v): + return get_json_result(data=False, message=f"The type is not supported in list: {v}", code=RetCode.ARGUMENT_ERROR) + elif not isinstance(v, (str, int, float)): return get_json_result(data=False, message=f"The type is not supported: {v}", code=RetCode.ARGUMENT_ERROR) except Exception as e: return get_json_result(data=False, message=f"Json syntax error: {e}", code=RetCode.ARGUMENT_ERROR) diff --git a/api/apps/sdk/doc.py b/api/apps/sdk/doc.py index 0019e2a42..b65a20133 100644 --- a/api/apps/sdk/doc.py +++ b/api/apps/sdk/doc.py @@ -14,6 +14,7 @@ # limitations under the License. # import datetime +import json import logging import pathlib import re @@ -551,13 +552,29 @@ def list_docs(dataset_id, tenant_id): run_status = q.getlist("run") create_time_from = int(q.get("create_time_from", 0)) create_time_to = int(q.get("create_time_to", 0)) + metadata_condition_raw = q.get("metadata_condition") + metadata_condition = {} + if metadata_condition_raw: + try: + metadata_condition = json.loads(metadata_condition_raw) + except Exception: + return get_error_data_result(message="metadata_condition must be valid JSON.") + if metadata_condition and not isinstance(metadata_condition, dict): + return get_error_data_result(message="metadata_condition must be an object.") # map run status (text or numeric) - align with API parameter run_status_text_to_numeric = {"UNSTART": "0", "RUNNING": "1", "CANCEL": "2", "DONE": "3", "FAIL": "4"} run_status_converted = [run_status_text_to_numeric.get(v, v) for v in run_status] + doc_ids_filter = None + if metadata_condition: + metas = DocumentService.get_flatted_meta_by_kbs([dataset_id]) + doc_ids_filter = meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and")) + if metadata_condition.get("conditions") and not doc_ids_filter: + return get_result(data={"total": 0, "docs": []}) + docs, total = DocumentService.get_list( - dataset_id, page, page_size, orderby, desc, keywords, 
document_id, name, suffix, run_status_converted + dataset_id, page, page_size, orderby, desc, keywords, document_id, name, suffix, run_status_converted, doc_ids_filter ) # time range filter (0 means no bound) @@ -586,6 +603,70 @@ def list_docs(dataset_id, tenant_id): return get_result(data={"total": total, "docs": output_docs}) + +@manager.route("/datasets/<dataset_id>/metadata/summary", methods=["GET"]) # noqa: F821 +@token_required +def metadata_summary(dataset_id, tenant_id): + if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): + return get_error_data_result(message=f"You don't own the dataset {dataset_id}. ") + + try: + summary = DocumentService.get_metadata_summary(dataset_id) + return get_result(data={"summary": summary}) + except Exception as e: + return server_error_response(e) + + +@manager.route("/datasets/<dataset_id>/metadata/update", methods=["POST"]) # noqa: F821 +@token_required +async def metadata_batch_update(dataset_id, tenant_id): + if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): + return get_error_data_result(message=f"You don't own the dataset {dataset_id}. 
") + + req = await get_request_json() + selector = req.get("selector", {}) or {} + updates = req.get("updates", []) or [] + deletes = req.get("deletes", []) or [] + + if not isinstance(selector, dict): + return get_error_data_result(message="selector must be an object.") + if not isinstance(updates, list) or not isinstance(deletes, list): + return get_error_data_result(message="updates and deletes must be lists.") + + metadata_condition = selector.get("metadata_condition", {}) or {} + if metadata_condition and not isinstance(metadata_condition, dict): + return get_error_data_result(message="metadata_condition must be an object.") + + document_ids = selector.get("document_ids", []) or [] + if document_ids and not isinstance(document_ids, list): + return get_error_data_result(message="document_ids must be a list.") + + for upd in updates: + if not isinstance(upd, dict) or not upd.get("key") or "value" not in upd: + return get_error_data_result(message="Each update requires key and value.") + for d in deletes: + if not isinstance(d, dict) or not d.get("key"): + return get_error_data_result(message="Each delete requires key.") + + kb_doc_ids = KnowledgebaseService.list_documents_by_ids([dataset_id]) + target_doc_ids = set(kb_doc_ids) + if document_ids: + invalid_ids = set(document_ids) - set(kb_doc_ids) + if invalid_ids: + return get_error_data_result(message=f"These documents do not belong to dataset {dataset_id}: {', '.join(invalid_ids)}") + target_doc_ids = set(document_ids) + + if metadata_condition: + metas = DocumentService.get_flatted_meta_by_kbs([dataset_id]) + filtered_ids = set(meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and"))) + target_doc_ids = target_doc_ids & filtered_ids + if metadata_condition.get("conditions") and not target_doc_ids: + return get_result(data={"updated": 0, "matched_docs": 0}) + + target_doc_ids = list(target_doc_ids) + updated = DocumentService.batch_update_metadata(dataset_id, 
target_doc_ids, updates, deletes) + return get_result(data={"updated": updated, "matched_docs": len(target_doc_ids)}) + + @manager.route("/datasets/<dataset_id>/documents", methods=["DELETE"]) # noqa: F821 @token_required async def delete(tenant_id, dataset_id): diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 43adf5d8e..6326ef5de 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -79,7 +79,7 @@ class DocumentService(CommonService): @classmethod @DB.connection_context() def get_list(cls, kb_id, page_number, items_per_page, - orderby, desc, keywords, id, name, suffix=None, run = None): + orderby, desc, keywords, id, name, suffix=None, run = None, doc_ids=None): fields = cls.get_cls_model_fields() docs = cls.model.select(*[*fields, UserCanvas.title]).join(File2Document, on = (File2Document.document_id == cls.model.id))\ .join(File, on = (File.id == File2Document.file_id))\ @@ -96,6 +96,8 @@ class DocumentService(CommonService): docs = docs.where( fn.LOWER(cls.model.name).contains(keywords.lower()) ) + if doc_ids: + docs = docs.where(cls.model.id.in_(doc_ids)) if suffix: docs = docs.where(cls.model.suffix.in_(suffix)) if run: @@ -123,7 +125,7 @@ class DocumentService(CommonService): @classmethod @DB.connection_context() def get_by_kb_id(cls, kb_id, page_number, items_per_page, - orderby, desc, keywords, run_status, types, suffix): + orderby, desc, keywords, run_status, types, suffix, doc_ids=None): fields = cls.get_cls_model_fields() if keywords: docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name"), User.nickname])\ @@ -143,6 +145,8 @@ class DocumentService(CommonService): .join(User, on=(cls.model.created_by == User.id), join_type=JOIN.LEFT_OUTER)\ .where(cls.model.kb_id == kb_id) + if doc_ids: + docs = docs.where(cls.model.id.in_(doc_ids)) if run_status: docs = docs.where(cls.model.run.in_(run_status)) if types: docs = docs.where(cls.model.type.in_(types)) @@ -644,6 +648,13 @@ class DocumentService(CommonService): 
@classmethod @DB.connection_context() def get_meta_by_kbs(cls, kb_ids): + """ + Legacy metadata aggregator (backward-compatible). + - Does NOT expand list values and a list is kept as one string key. + Example: {"tags": ["foo","bar"]} -> meta["tags"]["['foo', 'bar']"] = [doc_id] + - Expects meta_fields is a dict. + Use when existing callers rely on the old list-as-string semantics. + """ fields = [ cls.model.id, cls.model.meta_fields, @@ -660,6 +671,162 @@ class DocumentService(CommonService): meta[k][v].append(doc_id) return meta + @classmethod + @DB.connection_context() + def get_flatted_meta_by_kbs(cls, kb_ids): + """ + - Parses stringified JSON meta_fields when possible and skips non-dict or unparsable values. + - Expands list values into individual entries. + Example: {"tags": ["foo","bar"], "author": "alice"} -> + meta["tags"]["foo"] = [doc_id], meta["tags"]["bar"] = [doc_id], meta["author"]["alice"] = [doc_id] + Prefer for metadata_condition filtering and scenarios that must respect list semantics. 
+ """ + fields = [ + cls.model.id, + cls.model.meta_fields, + ] + meta = {} + for r in cls.model.select(*fields).where(cls.model.kb_id.in_(kb_ids)): + doc_id = r.id + meta_fields = r.meta_fields or {} + if isinstance(meta_fields, str): + try: + meta_fields = json.loads(meta_fields) + except Exception: + continue + if not isinstance(meta_fields, dict): + continue + for k, v in meta_fields.items(): + if k not in meta: + meta[k] = {} + values = v if isinstance(v, list) else [v] + for vv in values: + if vv is None: + continue + sv = str(vv) + if sv not in meta[k]: + meta[k][sv] = [] + meta[k][sv].append(doc_id) + return meta + + @classmethod + @DB.connection_context() + def get_metadata_summary(cls, kb_id): + fields = [cls.model.id, cls.model.meta_fields] + summary = {} + for r in cls.model.select(*fields).where(cls.model.kb_id == kb_id): + meta_fields = r.meta_fields or {} + if isinstance(meta_fields, str): + try: + meta_fields = json.loads(meta_fields) + except Exception: + continue + if not isinstance(meta_fields, dict): + continue + for k, v in meta_fields.items(): + values = v if isinstance(v, list) else [v] + for vv in values: + if not vv: + continue + sv = str(vv) + if k not in summary: + summary[k] = {} + summary[k][sv] = summary[k].get(sv, 0) + 1 + return {k: sorted([(val, cnt) for val, cnt in v.items()], key=lambda x: x[1], reverse=True) for k, v in summary.items()} + + @classmethod + @DB.connection_context() + def batch_update_metadata(cls, kb_id, doc_ids, updates=None, deletes=None): + updates = updates or [] + deletes = deletes or [] + if not doc_ids: + return 0 + + def _normalize_meta(meta): + if isinstance(meta, str): + try: + meta = json.loads(meta) + except Exception: + return {} + if not isinstance(meta, dict): + return {} + return deepcopy(meta) + + def _str_equal(a, b): + return str(a) == str(b) + + def _apply_updates(meta): + changed = False + for upd in updates: + key = upd.get("key") + if not key or key not in meta: + continue + new_value = 
upd.get("value") + match_value = upd.get("match", new_value) + if isinstance(meta[key], list): + replaced = False + new_list = [] + for item in meta[key]: + if match_value and _str_equal(item, match_value): + new_list.append(new_value) + replaced = True + else: + new_list.append(item) + if replaced: + meta[key] = new_list + changed = True + else: + if not match_value: + continue + if _str_equal(meta[key], match_value): + meta[key] = new_value + changed = True + return changed + + def _apply_deletes(meta): + changed = False + for d in deletes: + key = d.get("key") + if not key or key not in meta: + continue + value = d.get("value", None) + if isinstance(meta[key], list): + if value is None: + del meta[key] + changed = True + continue + new_list = [item for item in meta[key] if not _str_equal(item, value)] + if len(new_list) != len(meta[key]): + if new_list: + meta[key] = new_list + else: + del meta[key] + changed = True + else: + if value is None or _str_equal(meta[key], value): + del meta[key] + changed = True + return changed + + updated_docs = 0 + with DB.atomic(): + rows = cls.model.select(cls.model.id, cls.model.meta_fields).where( + (cls.model.id.in_(doc_ids)) & (cls.model.kb_id == kb_id) + ) + for r in rows: + meta = _normalize_meta(r.meta_fields or {}) + original_meta = deepcopy(meta) + changed = _apply_updates(meta) + changed = _apply_deletes(meta) or changed + if changed and meta != original_meta: + cls.model.update( + meta_fields=meta, + update_time=current_timestamp(), + update_date=get_format_time() + ).where(cls.model.id == r.id).execute() + updated_docs += 1 + return updated_docs + @classmethod @DB.connection_context() def update_progress(cls):