From d876ba893ea3c5b2507f440cef15bdd2fef26c1e Mon Sep 17 00:00:00 2001
From: buua436
Date: Wed, 12 Nov 2025 19:22:57 +0800
Subject: [PATCH] Feat:support API for generating knowledge graph and raptor

---
Reviewer notes (this section is ignored when the patch is applied):

* This patch was reconstructed from a copy whose newlines and indentation
  had been collapsed. Indentation of added lines follows from Python
  syntax; indentation of unchanged context lines is inferred, so applying
  may need `git apply --ignore-whitespace`. TODO: confirm against the
  target tree.
* The author e-mail address appears to have been stripped from the From:
  header; restore it before using `git am`.
* The four new route rules had an empty path segment
  ("/datasets//run_graphrag", ...). Every handler takes a `dataset_id`
  argument, so the `<dataset_id>` URL variable is restored in each rule.
* Endpoints added (all check KnowledgebaseService.accessible first):
  - POST run_graphrag / run_raptor: refuse to start when the task id stored
    on the dataset has progress outside [-1, 1]; otherwise queue one task
    over every document of the dataset via queue_raptor_o_graphrag_tasks
    and store the returned id in graphrag_task_id / raptor_task_id.
  - GET trace_graphrag / trace_raptor: return the stored task as a dict,
    or empty data when the dataset has no stored task id.
* NOTE(review): when the stored task id no longer resolves, trace_graphrag
  returns empty data while trace_raptor returns an error result. Left as
  committed; confirm whether the difference is intended.

 api/apps/sdk/dataset.py | 159 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 156 insertions(+), 3 deletions(-)

diff --git a/api/apps/sdk/dataset.py b/api/apps/sdk/dataset.py
index 8a315ce69..de5434de7 100644
--- a/api/apps/sdk/dataset.py
+++ b/api/apps/sdk/dataset.py
@@ -21,10 +21,11 @@ import json
 from flask import request
 from peewee import OperationalError
 from api.db.db_models import File
-from api.db.services.document_service import DocumentService
+from api.db.services.document_service import DocumentService, queue_raptor_o_graphrag_tasks
 from api.db.services.file2document_service import File2DocumentService
 from api.db.services.file_service import FileService
 from api.db.services.knowledgebase_service import KnowledgebaseService
+from api.db.services.task_service import GRAPH_RAPTOR_FAKE_DOC_ID, TaskService
 from api.db.services.user_service import TenantService
 from common.constants import RetCode, FileSource, StatusEnum
 from api.utils.api_utils import (
@@ -118,7 +119,6 @@ def create(tenant_id):
     req, err = validate_and_parse_json_request(request, CreateDatasetReq)
     if err is not None:
         return get_error_argument_result(err)
-
     req = KnowledgebaseService.create_with_name(
         name = req.pop("name", None),
         tenant_id = tenant_id,
@@ -144,7 +144,6 @@ def create(tenant_id):
         ok, k = KnowledgebaseService.get_by_id(req["id"])
         if not ok:
             return get_error_data_result(message="Dataset created failed")
-
         response_data = remap_dictionary_keys(k.to_dict())
         return get_result(data=response_data)
     except Exception as e:
@@ -532,3 +531,157 @@ def delete_knowledge_graph(tenant_id, dataset_id):
                                  search.index_name(kb.tenant_id), dataset_id)
 
     return get_result(data=True)
+
+
+@manager.route("/datasets/<dataset_id>/run_graphrag", methods=["POST"])  # noqa: F821
+@token_required
+def run_graphrag(tenant_id,dataset_id):
+    if not dataset_id:
+        return get_error_data_result(message='Lack of "Dataset ID"')
+    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
+        return get_result(
+            data=False,
+            message='No authorization.',
+            code=RetCode.AUTHENTICATION_ERROR
+        )
+
+    ok, kb = KnowledgebaseService.get_by_id(dataset_id)
+    if not ok:
+        return get_error_data_result(message="Invalid Dataset ID")
+
+    task_id = kb.graphrag_task_id
+    if task_id:
+        ok, task = TaskService.get_by_id(task_id)
+        if not ok:
+            logging.warning(f"A valid GraphRAG task id is expected for Dataset {dataset_id}")
+
+        if task and task.progress not in [-1, 1]:
+            return get_error_data_result(message=f"Task {task_id} in progress with status {task.progress}. A Graph Task is already running.")
+
+    documents, _ = DocumentService.get_by_kb_id(
+        kb_id=dataset_id,
+        page_number=0,
+        items_per_page=0,
+        orderby="create_time",
+        desc=False,
+        keywords="",
+        run_status=[],
+        types=[],
+        suffix=[],
+    )
+    if not documents:
+        return get_error_data_result(message=f"No documents in Dataset {dataset_id}")
+
+    sample_document = documents[0]
+    document_ids = [document["id"] for document in documents]
+
+    task_id = queue_raptor_o_graphrag_tasks(sample_doc_id=sample_document, ty="graphrag", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
+
+    if not KnowledgebaseService.update_by_id(kb.id, {"graphrag_task_id": task_id}):
+        logging.warning(f"Cannot save graphrag_task_id for Dataset {dataset_id}")
+
+    return get_result(data={"graphrag_task_id": task_id})
+
+
+@manager.route("/datasets/<dataset_id>/trace_graphrag", methods=["GET"])  # noqa: F821
+@token_required
+def trace_graphrag(tenant_id,dataset_id):
+    if not dataset_id:
+        return get_error_data_result(message='Lack of "Dataset ID"')
+    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
+        return get_result(
+            data=False,
+            message='No authorization.',
+            code=RetCode.AUTHENTICATION_ERROR
+        )
+
+    ok, kb = KnowledgebaseService.get_by_id(dataset_id)
+    if not ok:
+        return get_error_data_result(message="Invalid Dataset ID")
+
+    task_id = kb.graphrag_task_id
+    if not task_id:
+        return get_result(data={})
+
+    ok, task = TaskService.get_by_id(task_id)
+    if not ok:
+        return get_result(data={})
+
+    return get_result(data=task.to_dict())
+
+
+@manager.route("/datasets/<dataset_id>/run_raptor", methods=["POST"])  # noqa: F821
+@token_required
+def run_raptor(tenant_id,dataset_id):
+    if not dataset_id:
+        return get_error_data_result(message='Lack of "Dataset ID"')
+    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
+        return get_result(
+            data=False,
+            message='No authorization.',
+            code=RetCode.AUTHENTICATION_ERROR
+        )
+
+    ok, kb = KnowledgebaseService.get_by_id(dataset_id)
+    if not ok:
+        return get_error_data_result(message="Invalid Dataset ID")
+
+    task_id = kb.raptor_task_id
+    if task_id:
+        ok, task = TaskService.get_by_id(task_id)
+        if not ok:
+            logging.warning(f"A valid RAPTOR task id is expected for Dataset {dataset_id}")
+
+        if task and task.progress not in [-1, 1]:
+            return get_error_data_result(message=f"Task {task_id} in progress with status {task.progress}. A RAPTOR Task is already running.")
+
+    documents, _ = DocumentService.get_by_kb_id(
+        kb_id=dataset_id,
+        page_number=0,
+        items_per_page=0,
+        orderby="create_time",
+        desc=False,
+        keywords="",
+        run_status=[],
+        types=[],
+        suffix=[],
+    )
+    if not documents:
+        return get_error_data_result(message=f"No documents in Dataset {dataset_id}")
+
+    sample_document = documents[0]
+    document_ids = [document["id"] for document in documents]
+
+    task_id = queue_raptor_o_graphrag_tasks(sample_doc_id=sample_document, ty="raptor", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
+
+    if not KnowledgebaseService.update_by_id(kb.id, {"raptor_task_id": task_id}):
+        logging.warning(f"Cannot save raptor_task_id for Dataset {dataset_id}")
+
+    return get_result(data={"raptor_task_id": task_id})
+
+
+@manager.route("/datasets/<dataset_id>/trace_raptor", methods=["GET"])  # noqa: F821
+@token_required
+def trace_raptor(tenant_id,dataset_id):
+    if not dataset_id:
+        return get_error_data_result(message='Lack of "Dataset ID"')
+
+    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
+        return get_result(
+            data=False,
+            message='No authorization.',
+            code=RetCode.AUTHENTICATION_ERROR
+        )
+    ok, kb = KnowledgebaseService.get_by_id(dataset_id)
+    if not ok:
+        return get_error_data_result(message="Invalid Dataset ID")
+
+    task_id = kb.raptor_task_id
+    if not task_id:
+        return get_result(data={})
+
+    ok, task = TaskService.get_by_id(task_id)
+    if not ok:
+        return get_error_data_result(message="RAPTOR Task Not Found or Error Occurred")
+
+    return get_result(data=task.to_dict())
\ No newline at end of file