diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 4947ff26..d906aa5c 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -3,6 +3,7 @@ This module contains all document-related routes for the LightRAG API. """ import asyncio +from functools import lru_cache from lightrag.utils import logger, get_pinyin_sort_key import aiofiles import shutil @@ -23,23 +24,31 @@ from pydantic import BaseModel, Field, field_validator from lightrag import LightRAG from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus -from lightrag.utils import generate_track_id +from lightrag.utils import ( + generate_track_id, + compute_mdhash_id, + sanitize_text_for_encoding, +) from lightrag.api.utils_api import get_combined_auth_dependency from ..config import global_args -# Check docling availability at module load time -DOCLING_AVAILABLE = False -try: - import docling # noqa: F401 # type: ignore[import-not-found] - DOCLING_AVAILABLE = True -except ImportError: - if global_args.document_loading_engine == "DOCLING": - logger.warning( - "DOCLING engine requested but 'docling' package not installed. " - "Falling back to standard document processing. " - "To use DOCLING, install with: pip install lightrag-hku[api,docling]" - ) +@lru_cache(maxsize=1) +def _is_docling_available() -> bool: + """Check if docling is available (cached check). + + This function uses lru_cache to avoid repeated import attempts. + The result is cached after the first call. + + Returns: + bool: True if docling is available, False otherwise + """ + try: + import docling # noqa: F401 # type: ignore[import-not-found] + + return True + except ImportError: + return False # Function to format datetime to ISO format string with timezone information @@ -154,7 +163,7 @@ class ReprocessResponse(BaseModel): Attributes: status: Status of the reprocessing operation message: Message describing the operation result - track_id: Tracking ID for monitoring reprocessing progress + track_id: Always empty string. Reprocessed documents retain their original track_id. """ status: Literal["reprocessing_started"] = Field( @@ -162,7 +171,8 @@ class ReprocessResponse(BaseModel): ) message: str = Field(description="Human-readable message describing the operation") track_id: str = Field( - description="Tracking ID for monitoring reprocessing progress" + default="", + description="Always empty string. Reprocessed documents retain their original track_id from initial upload.", ) class Config: @@ -170,7 +180,29 @@ class ReprocessResponse(BaseModel): "example": { "status": "reprocessing_started", "message": "Reprocessing of failed documents has been initiated in background", - "track_id": "retry_20250729_170612_def456", + "track_id": "", + } + } + + +class CancelPipelineResponse(BaseModel): + """Response model for pipeline cancellation operation + + Attributes: + status: Status of the cancellation request + message: Message describing the operation result + """ + + status: Literal["cancellation_requested", "not_busy"] = Field( + description="Status of the cancellation request" + ) + message: str = Field(description="Human-readable message describing the operation") + + class Config: + json_schema_extra = { + "example": { + "status": "cancellation_requested", + "message": "Pipeline cancellation has been requested. 
Documents will be marked as FAILED.", } } @@ -350,6 +382,10 @@ class DeleteDocRequest(BaseModel): default=False, description="Whether to delete the corresponding file in the upload directory.", ) + delete_llm_cache: bool = Field( + default=False, + description="Whether to delete cached LLM extraction results for the documents.", + ) @field_validator("doc_ids", mode="after") @classmethod @@ -945,19 +981,82 @@ def _extract_pdf_pypdf(file_bytes: bytes, password: str = None) -> str: def _extract_docx(file_bytes: bytes) -> str: - """Extract DOCX content (synchronous). + """Extract DOCX content including tables in document order (synchronous). Args: file_bytes: DOCX file content as bytes Returns: - str: Extracted text content + str: Extracted text content with tables in their original positions. + Tables are separated from paragraphs with blank lines for clarity. """ from docx import Document # type: ignore + from docx.table import Table # type: ignore + from docx.text.paragraph import Paragraph # type: ignore docx_file = BytesIO(file_bytes) doc = Document(docx_file) - return "\n".join([paragraph.text for paragraph in doc.paragraphs]) + + def escape_cell(cell_value: str | None) -> str: + """Escape characters that would break tab-delimited layout. + + Escape order is critical: backslashes first, then tabs/newlines. + This prevents double-escaping issues. + + Args: + cell_value: The cell value to escape (can be None or str) + + Returns: + str: Escaped cell value safe for tab-delimited format + """ + if cell_value is None: + return "" + text = str(cell_value) + # CRITICAL: Escape backslash first to avoid double-escaping + return ( + text.replace("\\", "\\\\") # Must be first: \ -> \\ + .replace("\t", "\\t") # Tab -> \t (visible) + .replace("\r\n", "\\n") # Windows newline -> \n + .replace("\r", "\\n") # Mac newline -> \n + .replace("\n", "\\n") # Unix newline -> \n + ) + + content_parts = [] + in_table = False # Track if we're currently processing a table + + # Iterate through all body elements in document order + for element in doc.element.body: + # Check if element is a paragraph + if element.tag.endswith("p"): + # If coming out of a table, add blank line after table + if in_table: + content_parts.append("") # Blank line after table + in_table = False + + paragraph = Paragraph(element, doc) + text = paragraph.text + # Always append to preserve document spacing (including blank paragraphs) + content_parts.append(text) + + # Check if element is a table + elif element.tag.endswith("tbl"): + # Add blank line before table (if content exists) + if content_parts and not in_table: + content_parts.append("") # Blank line before table + + in_table = True + table = Table(element, doc) + for row in table.rows: + row_text = [] + for cell in row.cells: + cell_text = cell.text + # Escape special characters to preserve tab-delimited structure + row_text.append(escape_cell(cell_text)) + # Only add row if at least one cell has content + if any(cell for cell in row_text): + content_parts.append("\t".join(row_text)) + + return "\n".join(content_parts) def _extract_pptx(file_bytes: bytes) -> str: @@ -982,27 +1081,112 @@ def _extract_pptx(file_bytes: bytes) -> str: def _extract_xlsx(file_bytes: bytes) -> str: - """Extract XLSX content (synchronous). + """Extract XLSX content in tab-delimited format with clear sheet separation. + + This function processes Excel workbooks and converts them to a structured text format + suitable for LLM prompts and RAG systems. 
Each sheet is clearly delimited with + separator lines, and special characters are escaped to preserve the tab-delimited structure. + + Features: + - Each sheet is wrapped with '====================' separators for visual distinction + - Special characters (tabs, newlines, backslashes) are escaped to prevent structure corruption + - Column alignment is preserved across all rows to maintain tabular structure + - Empty rows are preserved as blank lines to maintain row structure + - Uses sheet.max_column to determine column width efficiently Args: file_bytes: XLSX file content as bytes Returns: - str: Extracted text content + str: Extracted text content with all sheets in tab-delimited format. + Format: Sheet separators, sheet name, then tab-delimited rows. + + Example output: + ==================== Sheet: Data ==================== + Name\tAge\tCity + Alice\t30\tNew York + Bob\t25\tLondon + + ==================== Sheet: Summary ==================== + Total\t2 + ==================== """ from openpyxl import load_workbook # type: ignore xlsx_file = BytesIO(file_bytes) wb = load_workbook(xlsx_file) - content = "" - for sheet in wb: - content += f"Sheet: {sheet.title}\n" + + def escape_cell(cell_value: str | int | float | None) -> str: + """Escape characters that would break tab-delimited layout. + + Escape order is critical: backslashes first, then tabs/newlines. + This prevents double-escaping issues. + + Args: + cell_value: The cell value to escape (can be None, str, int, or float) + + Returns: + str: Escaped cell value safe for tab-delimited format + """ + if cell_value is None: + return "" + text = str(cell_value) + # CRITICAL: Escape backslash first to avoid double-escaping + return ( + text.replace("\\", "\\\\") # Must be first: \ -> \\ + .replace("\t", "\\t") # Tab -> \t (visible) + .replace("\r\n", "\\n") # Windows newline -> \n + .replace("\r", "\\n") # Mac newline -> \n + .replace("\n", "\\n") # Unix newline -> \n + ) + + def escape_sheet_title(title: str) -> str: + """Escape sheet title to prevent formatting issues in separators. 
+ + Args: + title: Original sheet title + + Returns: + str: Sanitized sheet title with tabs/newlines replaced + """ + return str(title).replace("\n", " ").replace("\t", " ").replace("\r", " ") + + content_parts: list[str] = [] + sheet_separator = "=" * 20 + + for idx, sheet in enumerate(wb): + if idx > 0: + content_parts.append("") # Blank line between sheets for readability + + # Escape sheet title to handle edge cases with special characters + safe_title = escape_sheet_title(sheet.title) + content_parts.append(f"{sheet_separator} Sheet: {safe_title} {sheet_separator}") + + # Use sheet.max_column to get the maximum column width directly + max_columns = sheet.max_column if sheet.max_column else 0 + + # Extract rows with consistent width to preserve column alignment for row in sheet.iter_rows(values_only=True): - content += ( - "\t".join(str(cell) if cell is not None else "" for cell in row) + "\n" - ) - content += "\n" - return content + row_parts = [] + + # Build row up to max_columns width + for idx in range(max_columns): + if idx < len(row): + row_parts.append(escape_cell(row[idx])) + else: + row_parts.append("") # Pad short rows + + # Check if row is completely empty + if all(part == "" for part in row_parts): + # Preserve empty rows as blank lines (maintains row structure) + content_parts.append("") + else: + # Join all columns to maintain consistent column count + content_parts.append("\t".join(row_parts)) + + # Final separator for symmetry (makes parsing easier) + content_parts.append(sheet_separator) + return "\n".join(content_parts) async def pipeline_enqueue_file( @@ -1178,36 +1362,25 @@ async def pipeline_enqueue_file( # Try DOCLING first if configured and available if ( global_args.document_loading_engine == "DOCLING" - and DOCLING_AVAILABLE + and _is_docling_available() ): content = await asyncio.to_thread( _convert_with_docling, file_path ) else: -<<<<<<< HEAD -<<<<<<< HEAD - if not pm.is_installed("pypdf2"): # type: ignore - pm.install("pypdf2") - from PyPDF2 import PdfReader # type: ignore - from io import BytesIO - - pdf_file = BytesIO(file) - reader = PdfReader(pdf_file) - for page in reader.pages: - content += page.extract_text() + "\n" -======= -======= ->>>>>>> 69a0b74c (refactor: move document deps to api group, remove dynamic imports) + if ( + global_args.document_loading_engine == "DOCLING" + and not _is_docling_available() + ): + logger.warning( + f"DOCLING engine configured but not available for {file_path.name}. Falling back to pypdf." + ) # Use pypdf (non-blocking via to_thread) content = await asyncio.to_thread( _extract_pdf_pypdf, file, global_args.pdf_decrypt_password, ) -<<<<<<< HEAD ->>>>>>> 4b31942e (refactor: move document deps to api group, remove dynamic imports) -======= ->>>>>>> 69a0b74c (refactor: move document deps to api group, remove dynamic imports) except Exception as e: error_files = [ { @@ -1230,12 +1403,19 @@ async def pipeline_enqueue_file( # Try DOCLING first if configured and available if ( global_args.document_loading_engine == "DOCLING" - and DOCLING_AVAILABLE + and _is_docling_available() ): content = await asyncio.to_thread( _convert_with_docling, file_path ) else: + if ( + global_args.document_loading_engine == "DOCLING" + and not _is_docling_available() + ): + logger.warning( + f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-docx." 
+ ) # Use python-docx (non-blocking via to_thread) content = await asyncio.to_thread(_extract_docx, file) except Exception as e: @@ -1260,12 +1440,19 @@ async def pipeline_enqueue_file( # Try DOCLING first if configured and available if ( global_args.document_loading_engine == "DOCLING" - and DOCLING_AVAILABLE + and _is_docling_available() ): content = await asyncio.to_thread( _convert_with_docling, file_path ) else: + if ( + global_args.document_loading_engine == "DOCLING" + and not _is_docling_available() + ): + logger.warning( + f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-pptx." + ) # Use python-pptx (non-blocking via to_thread) content = await asyncio.to_thread(_extract_pptx, file) except Exception as e: @@ -1290,12 +1477,19 @@ async def pipeline_enqueue_file( # Try DOCLING first if configured and available if ( global_args.document_loading_engine == "DOCLING" - and DOCLING_AVAILABLE + and _is_docling_available() ): content = await asyncio.to_thread( _convert_with_docling, file_path ) else: + if ( + global_args.document_loading_engine == "DOCLING" + and not _is_docling_available() + ): + logger.warning( + f"DOCLING engine configured but not available for {file_path.name}. Falling back to openpyxl." + ) # Use openpyxl (non-blocking via to_thread) content = await asyncio.to_thread(_extract_xlsx, file) except Exception as e: @@ -1595,15 +1789,20 @@ async def background_delete_documents( doc_manager: DocumentManager, doc_ids: List[str], delete_file: bool = False, + delete_llm_cache: bool = False, ): """Background task to delete multiple documents""" from lightrag.kg.shared_storage import ( get_namespace_data, - get_pipeline_status_lock, + get_namespace_lock, ) - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_pipeline_status_lock() + pipeline_status = await get_namespace_data( + "pipeline_status", workspace=rag.workspace + ) + pipeline_status_lock = get_namespace_lock( + "pipeline_status", workspace=rag.workspace + ) total_docs = len(doc_ids) successful_deletions = [] @@ -1630,6 +1829,10 @@ async def background_delete_documents( ) # Use slice assignment to clear the list in place pipeline_status["history_messages"][:] = ["Starting document deletion process"] + if delete_llm_cache: + pipeline_status["history_messages"].append( + "LLM cache cleanup requested for this deletion job" + ) try: # Loop through each document ID and delete them one by one @@ -1655,7 +1858,9 @@ async def background_delete_documents( file_path = "#" try: - result = await rag.adelete_by_doc_id(doc_id) + result = await rag.adelete_by_doc_id( + doc_id, delete_llm_cache=delete_llm_cache + ) file_path = ( getattr(result, "file_path", "-") if "result" in locals() else "-" ) @@ -1897,12 +2102,14 @@ def create_document_routes( # Check if filename already exists in doc_status storage existing_doc_data = await rag.doc_status.get_doc_by_file_path(safe_filename) if existing_doc_data: - # Get document status information for error message + # Get document status and track_id from existing document status = existing_doc_data.get("status", "unknown") + # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id) + existing_track_id = existing_doc_data.get("track_id") or "" return InsertResponse( status="duplicated", message=f"File '{safe_filename}' already exists in document storage (Status: {status}).", - track_id="", + track_id=existing_track_id, ) file_path = doc_manager.input_dir / safe_filename @@ -1966,14 
+2173,30 @@ def create_document_routes( request.file_source ) if existing_doc_data: - # Get document status information for error message + # Get document status and track_id from existing document status = existing_doc_data.get("status", "unknown") + # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id) + existing_track_id = existing_doc_data.get("track_id") or "" return InsertResponse( status="duplicated", message=f"File source '{request.file_source}' already exists in document storage (Status: {status}).", - track_id="", + track_id=existing_track_id, ) + # Check if content already exists by computing content hash (doc_id) + sanitized_text = sanitize_text_for_encoding(request.text) + content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-") + existing_doc = await rag.doc_status.get_by_id(content_doc_id) + if existing_doc: + # Content already exists, return duplicated with existing track_id + status = existing_doc.get("status", "unknown") + existing_track_id = existing_doc.get("track_id") or "" + return InsertResponse( + status="duplicated", + message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).", + track_id=existing_track_id, + ) + # Generate track_id for text insertion track_id = generate_track_id("insert") @@ -2032,14 +2255,31 @@ def create_document_routes( file_source ) if existing_doc_data: - # Get document status information for error message + # Get document status and track_id from existing document status = existing_doc_data.get("status", "unknown") + # Use `or ""` to handle both missing key and None value (e.g., legacy rows without track_id) + existing_track_id = existing_doc_data.get("track_id") or "" return InsertResponse( status="duplicated", message=f"File source '{file_source}' already exists in document storage (Status: {status}).", - track_id="", + track_id=existing_track_id, ) + # Check if any content already exists by computing content hash (doc_id) + for text in request.texts: + sanitized_text = sanitize_text_for_encoding(text) + content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-") + existing_doc = await rag.doc_status.get_by_id(content_doc_id) + if existing_doc: + # Content already exists, return duplicated with existing track_id + status = existing_doc.get("status", "unknown") + existing_track_id = existing_doc.get("track_id") or "" + return InsertResponse( + status="duplicated", + message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).", + track_id=existing_track_id, + ) + # Generate track_id for texts insertion track_id = generate_track_id("insert") @@ -2087,12 +2327,16 @@ def create_document_routes( """ from lightrag.kg.shared_storage import ( get_namespace_data, - get_pipeline_status_lock, + get_namespace_lock, ) # Get pipeline status and lock - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_pipeline_status_lock() + pipeline_status = await get_namespace_data( + "pipeline_status", workspace=rag.workspace + ) + pipeline_status_lock = get_namespace_lock( + "pipeline_status", workspace=rag.workspace + ) # Check and set status with lock async with pipeline_status_lock: @@ -2128,6 +2372,8 @@ def create_document_routes( rag.full_docs, rag.full_entities, rag.full_relations, + rag.entity_chunks, + rag.relation_chunks, rag.entities_vdb, rag.relationships_vdb, rag.chunks_vdb, @@ -2281,13 +2527,19 @@ def create_document_routes( try: from lightrag.kg.shared_storage 
import ( get_namespace_data, + get_namespace_lock, get_all_update_flags_status, ) - pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status = await get_namespace_data( + "pipeline_status", workspace=rag.workspace + ) + pipeline_status_lock = get_namespace_lock( + "pipeline_status", workspace=rag.workspace + ) # Get update flags status for all namespaces - update_status = await get_all_update_flags_status() + update_status = await get_all_update_flags_status(workspace=rag.workspace) # Convert MutableBoolean objects to regular boolean values processed_update_status = {} @@ -2301,8 +2553,9 @@ def create_document_routes( processed_flags.append(bool(flag)) processed_update_status[namespace] = processed_flags - # Convert to regular dict if it's a Manager.dict - status_dict = dict(pipeline_status) + async with pipeline_status_lock: + # Convert to regular dict if it's a Manager.dict + status_dict = dict(pipeline_status) # Add processed update_status to the status dictionary status_dict["update_status"] = processed_update_status @@ -2467,21 +2720,20 @@ def create_document_routes( Delete documents and all their associated data by their IDs using background processing. Deletes specific documents and all their associated data, including their status, - text chunks, vector embeddings, and any related graph data. + text chunks, vector embeddings, and any related graph data. When requested, + cached LLM extraction responses are removed after graph deletion/rebuild completes. The deletion process runs in the background to avoid blocking the client connection. - It is disabled when llm cache for entity extraction is disabled. This operation is irreversible and will interact with the pipeline status. Args: - delete_request (DeleteDocRequest): The request containing the document IDs and delete_file options. + delete_request (DeleteDocRequest): The request containing the document IDs and deletion options. background_tasks: FastAPI BackgroundTasks for async processing Returns: DeleteDocByIdResponse: The result of the deletion operation. - status="deletion_started": The document deletion has been initiated in the background. - status="busy": The pipeline is busy with another operation. - - status="not_allowed": Operation not allowed when LLM cache for entity extraction is disabled. Raises: HTTPException: @@ -2489,27 +2741,27 @@ def create_document_routes( """ doc_ids = delete_request.doc_ids - # The rag object is initialized from the server startup args, - # so we can access its properties here. 
-        if not rag.enable_llm_cache_for_entity_extract:
-            return DeleteDocByIdResponse(
-                status="not_allowed",
-                message="Operation not allowed when LLM cache for entity extraction is disabled.",
-                doc_id=", ".join(delete_request.doc_ids),
+        try:
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_namespace_lock,
             )
 
-        try:
-            from lightrag.kg.shared_storage import get_namespace_data
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            pipeline_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )
 
-            pipeline_status = await get_namespace_data("pipeline_status")
-
-            # Check if pipeline is busy
-            if pipeline_status.get("busy", False):
-                return DeleteDocByIdResponse(
-                    status="busy",
-                    message="Cannot delete documents while pipeline is busy",
-                    doc_id=", ".join(doc_ids),
-                )
+            # Check if pipeline is busy with proper lock
+            async with pipeline_status_lock:
+                if pipeline_status.get("busy", False):
+                    return DeleteDocByIdResponse(
+                        status="busy",
+                        message="Cannot delete documents while pipeline is busy",
+                        doc_id=", ".join(doc_ids),
+                    )
 
             # Add deletion task to background tasks
             background_tasks.add_task(
@@ -2518,6 +2770,7 @@ def create_document_routes(
                 doc_manager,
                 doc_ids,
                 delete_request.delete_file,
+                delete_request.delete_llm_cache,
             )
 
             return DeleteDocByIdResponse(
@@ -2845,29 +3098,27 @@ def create_document_routes(
         This is useful for recovering from server crashes, network errors, LLM service
         outages, or other temporary failures that caused document processing to fail.
 
-        The processing happens in the background and can be monitored using the
-        returned track_id or by checking the pipeline status.
+        The processing happens in the background and can be monitored by checking the
+        pipeline status. The reprocessed documents retain their original track_id from
+        initial upload, so use their original track_id to monitor progress.
 
         Returns:
-            ReprocessResponse: Response with status, message, and track_id
+            ReprocessResponse: Response with status and message.
+                track_id is always empty string because reprocessed documents retain
+                their original track_id from initial upload.
 
         Raises:
            HTTPException: If an error occurs while initiating reprocessing (500).
        """
        try:
-            # Generate track_id with "retry" prefix for retry operation
-            track_id = generate_track_id("retry")
-            # Start the reprocessing in the background
+            # Note: Reprocessed documents retain their original track_id from initial upload
            background_tasks.add_task(rag.apipeline_process_enqueue_documents)
 
-            logger.info(
-                f"Reprocessing of failed documents initiated with track_id: {track_id}"
-            )
+            logger.info("Reprocessing of failed documents initiated")
 
            return ReprocessResponse(
                status="reprocessing_started",
-                message="Reprocessing of failed documents has been initiated in background",
-                track_id=track_id,
+                message="Reprocessing of failed documents has been initiated in background. Documents retain their original track_id.",
            )
 
        except Exception as e:
@@ -2875,4 +3126,67 @@ def create_document_routes(
            logger.error(traceback.format_exc())
            raise HTTPException(status_code=500, detail=str(e))
 
+    @router.post(
+        "/cancel_pipeline",
+        response_model=CancelPipelineResponse,
+        dependencies=[Depends(combined_auth)],
+    )
+    async def cancel_pipeline():
+        """
+        Request cancellation of the currently running pipeline.
+
+        This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
+        1. Check this flag at key processing points
+        2. Stop processing new documents
+        3. Cancel all running document processing tasks
+        4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
+
+        The cancellation is graceful and ensures data consistency. Documents that have
+        completed processing will remain in PROCESSED status.
+
+        Returns:
+            CancelPipelineResponse: Response with status and message
+                - status="cancellation_requested": Cancellation flag has been set
+                - status="not_busy": Pipeline is not currently running
+
+        Raises:
+            HTTPException: If an error occurs while setting cancellation flag (500).
+        """
+        try:
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_namespace_lock,
+            )
+
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            pipeline_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )
+
+            async with pipeline_status_lock:
+                if not pipeline_status.get("busy", False):
+                    return CancelPipelineResponse(
+                        status="not_busy",
+                        message="Pipeline is not currently running. No cancellation needed.",
+                    )
+
+                # Set cancellation flag
+                pipeline_status["cancellation_requested"] = True
+                cancel_msg = "Pipeline cancellation requested by user"
+                logger.info(cancel_msg)
+                pipeline_status["latest_message"] = cancel_msg
+                pipeline_status["history_messages"].append(cancel_msg)
+
+                return CancelPipelineResponse(
+                    status="cancellation_requested",
+                    message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
+                )
+
+        except Exception as e:
+            logger.error(f"Error requesting pipeline cancellation: {str(e)}")
+            logger.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
     return router
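
For reference, a minimal standalone sketch of the tab-delimited escaping used by the new _extract_docx/_extract_xlsx helpers; escape_cell mirrors the helper added in this patch, while the demo at the bottom is illustrative only.

# Illustrative sketch (not part of the patch): shows why escape_cell must escape
# backslashes before tabs/newlines when flattening cells into tab-delimited rows.
def escape_cell(cell_value) -> str:
    if cell_value is None:
        return ""
    text = str(cell_value)
    # Backslash first, so the \t / \n sequences produced below are not re-escaped.
    return (
        text.replace("\\", "\\\\")
        .replace("\t", "\\t")
        .replace("\r\n", "\\n")
        .replace("\r", "\\n")
        .replace("\n", "\\n")
    )

if __name__ == "__main__":
    row = ["Name\tAlias", "Line1\nLine2", None, r"C:\temp"]
    # Real tabs separate the four cells; embedded tabs/newlines stay visible as \t / \n.
    print("\t".join(escape_cell(c) for c in row))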
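
A rough client-side sketch for the new cancel endpoint; the "/documents" prefix, the port, and the X-API-Key header are assumptions about a default LightRAG server setup, not something defined by this patch.

# Hypothetical client call to the new POST /documents/cancel_pipeline route.
import requests

BASE_URL = "http://localhost:9621"       # assumed default server address
HEADERS = {"X-API-Key": "your-api-key"}  # assumed header; only needed if auth is enabled

resp = requests.post(f"{BASE_URL}/documents/cancel_pipeline", headers=HEADERS, timeout=30)
resp.raise_for_status()
data = resp.json()
if data["status"] == "cancellation_requested":
    print("Cancellation flag set:", data["message"])
else:  # "not_busy"
    print("Nothing to cancel:", data["message"])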
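
A small sketch of the content-hash check that backs the new duplicate detection in the text-insert routes; it assumes the lightrag package is installed and reuses the same helpers this patch imports (compute_mdhash_id, sanitize_text_for_encoding).

# Sketch only: compute the doc_id the server would use for duplicate detection.
from lightrag.utils import compute_mdhash_id, sanitize_text_for_encoding

text = "LightRAG is a simple and fast retrieval-augmented generation framework."
doc_id = compute_mdhash_id(sanitize_text_for_encoding(text), prefix="doc-")
# Resubmitting byte-identical text yields the same id, so the insert routes can
# return status="duplicated" together with the existing document's track_id.
print(doc_id)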