diff --git a/Dockerfile.langflow b/Dockerfile.langflow index 99e6e155..f8f57f5c 100644 --- a/Dockerfile.langflow +++ b/Dockerfile.langflow @@ -7,7 +7,7 @@ ENV RUSTFLAGS="--cfg reqwest_unstable" # Accept build arguments for git repository and branch ARG GIT_REPO=https://github.com/langflow-ai/langflow.git -ARG GIT_BRANCH=load_flows_autologin_false +ARG GIT_BRANCH=main WORKDIR /app diff --git a/frontend/components/knowledge-actions-dropdown.tsx b/frontend/components/knowledge-actions-dropdown.tsx index f73ffe25..a7cf1337 100644 --- a/frontend/components/knowledge-actions-dropdown.tsx +++ b/frontend/components/knowledge-actions-dropdown.tsx @@ -38,7 +38,7 @@ export const KnowledgeActionsDropdown = ({ return ( <> - + diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index f9176311..ee49fc3a 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -11,6 +11,7 @@ import { } from "lucide-react"; import { useRouter } from "next/navigation"; import { useEffect, useRef, useState } from "react"; +import { toast } from "sonner"; import { Button } from "@/components/ui/button"; import { Dialog, @@ -313,6 +314,11 @@ export function KnowledgeDropdown({ window.dispatchEvent(new CustomEvent("knowledgeUpdated")); } else { console.error("Folder upload failed:", result.error); + if (response.status === 400) { + toast.error("Upload failed", { + description: result.error || "Bad request", + }); + } } } catch (error) { console.error("Folder upload error:", error); @@ -353,6 +359,11 @@ export function KnowledgeDropdown({ window.dispatchEvent(new CustomEvent("knowledgeUpdated")); } else { console.error("S3 upload failed:", result.error); + if (response.status === 400) { + toast.error("Upload failed", { + description: result.error || "Bad request", + }); + } } } catch (error) { console.error("S3 upload error:", error); diff --git a/frontend/src/app/api/queries/useGetSearchQuery.ts 
b/frontend/src/app/api/queries/useGetSearchQuery.ts index de1daeb6..9928af3d 100644 --- a/frontend/src/app/api/queries/useGetSearchQuery.ts +++ b/frontend/src/app/api/queries/useGetSearchQuery.ts @@ -128,7 +128,7 @@ export const useGetSearchQuery = ( } >(); - data.results.forEach((chunk: ChunkResult) => { + (data.results || []).forEach((chunk: ChunkResult) => { const existing = fileMap.get(chunk.filename); if (existing) { existing.chunks.push(chunk); diff --git a/frontend/src/components/task-notification-menu.tsx b/frontend/src/components/task-notification-menu.tsx index c6f94959..e17f9579 100644 --- a/frontend/src/components/task-notification-menu.tsx +++ b/frontend/src/components/task-notification-menu.tsx @@ -60,7 +60,9 @@ export function TaskNotificationMenu() { const processed = task.processed_files || 0 const successful = task.successful_files || 0 const failed = task.failed_files || 0 - + const running = task.running_files || 0 + const pending = task.pending_files || 0 + if (total > 0) { return { basic: `${processed}/${total} files`, @@ -69,6 +71,8 @@ export function TaskNotificationMenu() { processed, successful, failed, + running, + pending, remaining: total - processed } } @@ -196,10 +200,16 @@ export function TaskNotificationMenu() { {formatTaskProgress(task)?.detailed.failed} failed +
+
+ + {formatTaskProgress(task)?.detailed.running} running + +
- {formatTaskProgress(task)?.detailed.remaining} pending + {formatTaskProgress(task)?.detailed.pending} pending
@@ -288,6 +298,9 @@ export function TaskNotificationMenu() {
{formatTaskProgress(task)?.detailed.successful} success, {' '} {formatTaskProgress(task)?.detailed.failed} failed + {(formatTaskProgress(task)?.detailed.running || 0) > 0 && ( + , {formatTaskProgress(task)?.detailed.running} running + )}
)} {task.status === 'failed' && task.error && ( diff --git a/frontend/src/contexts/task-context.tsx b/frontend/src/contexts/task-context.tsx index c58e1d19..f15e9cc1 100644 --- a/frontend/src/contexts/task-context.tsx +++ b/frontend/src/contexts/task-context.tsx @@ -25,6 +25,8 @@ export interface Task { processed_files?: number; successful_files?: number; failed_files?: number; + running_files?: number; + pending_files?: number; created_at: string; updated_at: string; duration_seconds?: number; diff --git a/pyproject.toml b/pyproject.toml index a2a0e41f..ea096d62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "openrag" -version = "0.1.8" +version = "0.1.12" description = "Add your description here" readme = "README.md" requires-python = ">=3.13" diff --git a/securityconfig/roles.yml b/securityconfig/roles.yml index 555c069a..07532bfb 100644 --- a/securityconfig/roles.yml +++ b/securityconfig/roles.yml @@ -7,6 +7,8 @@ openrag_user_role: cluster_permissions: - "indices:data/write/bulk" - "indices:data/write/index" + - "indices:data/read/scroll" + - "indices:data/read/scroll/clear" - "cluster:admin/opensearch/notifications/configs/create" - "cluster:admin/opensearch/notifications/configs/list" - "cluster:admin/opensearch/notifications/configs/get" diff --git a/src/api/documents.py b/src/api/documents.py index 57a4abb3..82afb349 100644 --- a/src/api/documents.py +++ b/src/api/documents.py @@ -15,8 +15,8 @@ async def delete_documents_by_filename(request: Request, document_service, sessi return JSONResponse({"error": "filename is required"}, status_code=400) user = request.state.user - jwt_token = request.state.jwt_token - + jwt_token = session_manager.get_effective_jwt_token(user.user_id, request.state.jwt_token) + try: # Get user's OpenSearch client opensearch_client = session_manager.get_user_opensearch_client( diff --git a/src/config/settings.py b/src/config/settings.py index c4e36b51..17000f4b 100644 --- a/src/config/settings.py 
+++ b/src/config/settings.py @@ -4,12 +4,12 @@ import time import httpx import requests from agentd.patch import patch_openai_with_mcp -from docling.document_converter import DocumentConverter from dotenv import load_dotenv from openai import AsyncOpenAI from opensearchpy import AsyncOpenSearch from opensearchpy._async.http_aiohttp import AIOHttpConnection +from utils.document_processing import create_document_converter from utils.logging_config import get_logger load_dotenv() @@ -45,6 +45,7 @@ LANGFLOW_KEY = os.getenv("LANGFLOW_KEY") SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-production") GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID") GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET") +DOCLING_OCR_ENGINE = os.getenv("DOCLING_OCR_ENGINE") # Ingestion configuration DISABLE_INGEST_WITH_LANGFLOW = os.getenv("DISABLE_INGEST_WITH_LANGFLOW", "false").lower() in ("true", "1", "yes") @@ -287,7 +288,7 @@ class AppClients: self.patched_async_client = patch_openai_with_mcp(AsyncOpenAI()) # Initialize document converter - self.converter = DocumentConverter() + self.converter = create_document_converter(ocr_engine=DOCLING_OCR_ENGINE) # Initialize Langflow HTTP client self.langflow_http_client = httpx.AsyncClient( diff --git a/src/services/task_service.py b/src/services/task_service.py index 0341aadf..c9328b90 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -220,6 +220,9 @@ class TaskService: return None file_statuses = {} + running_files_count = 0 + pending_files_count = 0 + for file_path, file_task in upload_task.file_tasks.items(): file_statuses[file_path] = { "status": file_task.status.value, @@ -231,6 +234,12 @@ class TaskService: "duration_seconds": file_task.duration_seconds, } + # Count running and pending files + if file_task.status.value == "running": + running_files_count += 1 + elif file_task.status.value == "pending": + pending_files_count += 1 + return { "task_id": 
upload_task.task_id, "status": upload_task.status.value, @@ -238,6 +247,8 @@ class TaskService: "processed_files": upload_task.processed_files, "successful_files": upload_task.successful_files, "failed_files": upload_task.failed_files, + "running_files": running_files_count, + "pending_files": pending_files_count, "created_at": upload_task.created_at, "updated_at": upload_task.updated_at, "duration_seconds": upload_task.duration_seconds, @@ -259,6 +270,16 @@ class TaskService: for task_id, upload_task in self.task_store[store_user_id].items(): if task_id in tasks_by_id: continue + + # Calculate running and pending counts + running_files_count = 0 + pending_files_count = 0 + for file_task in upload_task.file_tasks.values(): + if file_task.status.value == "running": + running_files_count += 1 + elif file_task.status.value == "pending": + pending_files_count += 1 + tasks_by_id[task_id] = { "task_id": upload_task.task_id, "status": upload_task.status.value, @@ -266,6 +287,8 @@ class TaskService: "processed_files": upload_task.processed_files, "successful_files": upload_task.successful_files, "failed_files": upload_task.failed_files, + "running_files": running_files_count, + "pending_files": pending_files_count, "created_at": upload_task.created_at, "updated_at": upload_task.updated_at, "duration_seconds": upload_task.duration_seconds, diff --git a/src/utils/document_processing.py b/src/utils/document_processing.py index e46d8f16..a8792e46 100644 --- a/src/utils/document_processing.py +++ b/src/utils/document_processing.py @@ -12,12 +12,80 @@ logger = get_logger(__name__) _worker_converter = None +def create_document_converter(ocr_engine: str | None = None): + """Create a Docling DocumentConverter with OCR disabled unless requested.""" + if ocr_engine is None: + ocr_engine = os.getenv("DOCLING_OCR_ENGINE") + + try: + from docling.document_converter import ( + DocumentConverter, + InputFormat, + PdfFormatOption, + ) + from docling.datamodel.pipeline_options import 
PdfPipelineOptions + except Exception as exc: # pragma: no cover - fallback path + logger.debug( + "Falling back to default DocumentConverter import", + error=str(exc), + ) + from docling.document_converter import DocumentConverter # type: ignore + + return DocumentConverter() + + pipeline_options = PdfPipelineOptions() + pipeline_options.do_ocr = False + + if ocr_engine: + try: + from docling.models.factories import get_ocr_factory + + factory = get_ocr_factory(allow_external_plugins=False) + pipeline_options.do_ocr = True + pipeline_options.ocr_options = factory.create_options(kind=ocr_engine) + except Exception as exc: # pragma: no cover - optional path + pipeline_options.do_ocr = False + logger.warning( + "Unable to enable requested Docling OCR engine, using OCR-off", + ocr_engine=ocr_engine, + error=str(exc), + ) + + format_options = {} + if hasattr(InputFormat, "PDF"): + format_options[getattr(InputFormat, "PDF")] = PdfFormatOption( + pipeline_options=pipeline_options + ) + if hasattr(InputFormat, "IMAGE"): + format_options[getattr(InputFormat, "IMAGE")] = PdfFormatOption( + pipeline_options=pipeline_options + ) + + try: + converter = DocumentConverter( + format_options=format_options if format_options else None + ) + except Exception as exc: # pragma: no cover - fallback path + logger.warning( + "Docling converter initialization failed, falling back to defaults", + error=str(exc), + ) + converter = DocumentConverter() + + logger.info( + "Docling converter initialized", + ocr_engine=ocr_engine if pipeline_options.do_ocr else None, + ocr_enabled=pipeline_options.do_ocr, + ) + + return converter + + def get_worker_converter(): """Get or create a DocumentConverter instance for this worker process""" global _worker_converter if _worker_converter is None: - from docling.document_converter import DocumentConverter - + # Configure GPU settings for this worker has_gpu_devices, _ = detect_gpu_devices() if not has_gpu_devices: @@ -45,7 +113,7 @@ def 
get_worker_converter(): logger.info( "Initializing DocumentConverter in worker process", worker_pid=os.getpid() ) - _worker_converter = DocumentConverter() + _worker_converter = create_document_converter() logger.info("DocumentConverter ready in worker process", worker_pid=os.getpid()) return _worker_converter diff --git a/uv.lock b/uv.lock index 0a60fd52..7a6a6fbc 100644 --- a/uv.lock +++ b/uv.lock @@ -2282,7 +2282,7 @@ wheels = [ [[package]] name = "openrag" -version = "0.1.8" +version = "0.1.12" source = { editable = "." } dependencies = [ { name = "agentd" }, diff --git a/warm_up_docling.py b/warm_up_docling.py index c605bef5..3a834e2f 100644 --- a/warm_up_docling.py +++ b/warm_up_docling.py @@ -1,6 +1,13 @@ import logging +import os +import sys -from docling.document_converter import DocumentConverter +repo_root = os.path.dirname(__file__) +src_path = os.path.join(repo_root, "src") +if src_path not in sys.path: + sys.path.insert(0, src_path) + +from utils.document_processing import create_document_converter logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -11,7 +18,8 @@ try: # Use the sample document to warm up docling test_file = "/app/warmup_ocr.pdf" logger.info(f"Using test file to warm up docling: {test_file}") - DocumentConverter().convert(test_file) + converter = create_document_converter() + converter.convert(test_file) logger.info("Docling models warmed up successfully") except Exception as e: logger.info(f"Docling warm-up completed with exception: {str(e)}")