diff --git a/.github/workflows/build-langflow-responses.yml b/.github/workflows/build-langflow-responses.yml new file mode 100644 index 00000000..0f9d3d08 --- /dev/null +++ b/.github/workflows/build-langflow-responses.yml @@ -0,0 +1,59 @@ +name: Build Langflow Responses Multi-Arch + +on: + workflow_dispatch: + +jobs: + build: + strategy: + fail-fast: false + matrix: + include: + - platform: linux/amd64 + arch: amd64 + runs-on: ubuntu-latest + - platform: linux/arm64 + arch: arm64 + runs-on: [self-hosted, linux, ARM64, langflow-ai-arm64-2] + + runs-on: ${{ matrix.runs-on }} + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Build and push langflow (${{ matrix.arch }}) + uses: docker/build-push-action@v5 + with: + context: . + file: ./Dockerfile.langflow + platforms: ${{ matrix.platform }} + push: true + tags: phact/langflow:responses-${{ matrix.arch }} + cache-from: type=gha,scope=langflow-responses-${{ matrix.arch }} + cache-to: type=gha,mode=max,scope=langflow-responses-${{ matrix.arch }} + + manifest: + needs: build + runs-on: ubuntu-latest + steps: + - name: Login to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Create and push multi-arch manifest + run: | + docker buildx imagetools create -t phact/langflow:responses \ + phact/langflow:responses-amd64 \ + phact/langflow:responses-arm64 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 4f22035a..8bf471e7 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ wheels/ 1001*.pdf *.json .DS_Store + +config.yaml diff --git a/config.yaml b/config.yaml deleted file mode 100644 index 3bafb8bd..00000000 --- a/config.yaml +++ /dev/null @@ -1,31 +0,0 @@ -# OpenRAG Configuration File -# This file allows you to configure OpenRAG settings. -# Environment variables will override these settings unless edited is true. - -# Track if this config has been manually edited (prevents env var overrides) -edited: false - -# Model provider configuration -provider: - # Supported providers: "openai", "anthropic", "azure", etc. - model_provider: "openai" - # API key for the model provider (can also be set via OPENAI_API_KEY env var) - api_key: "" - -# Knowledge base and document processing configuration -knowledge: - # Embedding model for vector search - embedding_model: "text-embedding-3-small" - # Text chunk size for document processing - chunk_size: 1000 - # Overlap between chunks - chunk_overlap: 200 - # Docling preset setting - doclingPresets: standard - -# AI agent configuration -agent: - # Language model for the chat agent - llm_model: "gpt-4o-mini" - # System prompt for the agent - system_prompt: "You are a helpful AI assistant with access to a knowledge base. Answer questions based on the provided context." diff --git a/docs/docs/get-started/docker.mdx b/docs/docs/get-started/docker.mdx index a394bc69..84f0fca6 100644 --- a/docs/docs/get-started/docker.mdx +++ b/docs/docs/get-started/docker.mdx @@ -1,40 +1,88 @@ --- -title: Docker Deployment +title: Docker deployment slug: /get-started/docker --- -# Docker Deployment +There are two different Docker Compose files. +They deploy the same applications and containers, but to different environments. 
-## Standard Deployment +- [`docker-compose.yml`](https://github.com/langflow-ai/openrag/blob/main/docker-compose.yml) is an OpenRAG deployment with GPU support for accelerated AI processing. -```bash -# Build and start all services -docker compose build -docker compose up -d -``` +- [`docker-compose-cpu.yml`](https://github.com/langflow-ai/openrag/blob/main/docker-compose-cpu.yml) is a CPU-only version of OpenRAG for systems without GPU support. Use this Docker compose file for environments where GPU drivers aren't available. -## CPU-Only Deployment +To install OpenRAG with Docker Compose: -For environments without GPU support: +1. Clone the OpenRAG repository. + ```bash + git clone https://github.com/langflow-ai/openrag.git + cd openrag + ``` -```bash -docker compose -f docker-compose-cpu.yml up -d -``` +2. Copy the example `.env` file that is included in the repository root. + The example file includes all environment variables with comments to guide you in finding and setting their values. + ```bash + cp .env.example .env + ``` -## Force Rebuild + Alternatively, create a new `.env` file in the repository root. + ``` + touch .env + ``` -If you need to reset state or rebuild everything: +3. Set environment variables. The Docker Compose files are populated with values from your `.env`, so the following values are **required** to be set: + + ```bash + OPENSEARCH_PASSWORD=your_secure_password + OPENAI_API_KEY=your_openai_api_key + + LANGFLOW_SUPERUSER=admin + LANGFLOW_SUPERUSER_PASSWORD=your_langflow_password + LANGFLOW_SECRET_KEY=your_secret_key + ``` + For more information on configuring OpenRAG with environment variables, see [Environment variables](/configure/configuration). + For additional configuration values, including `config.yaml`, see [Configuration](/configure/configuration). + +4. Deploy OpenRAG with Docker Compose based on your deployment type. + + For GPU-enabled systems, run the following command: + ```bash + docker compose up -d + ``` + + For CPU-only systems, run the following command: + ```bash + docker compose -f docker-compose-cpu.yml up -d + ``` + + The OpenRAG Docker Compose file starts five containers: + | Container Name | Default Address | Purpose | + |---|---|---| + | OpenRAG Backend | http://localhost:8000 | FastAPI server and core functionality. | + | OpenRAG Frontend | http://localhost:3000 | React web interface for users. | + | Langflow | http://localhost:7860 | AI workflow engine and flow management. | + | OpenSearch | http://localhost:9200 | Vector database for document storage. | + | OpenSearch Dashboards | http://localhost:5601 | Database administration interface. | + +5. Verify installation by confirming all services are running. + + ```bash + docker compose ps + ``` + + You can now access the application at: + + - **Frontend**: http://localhost:3000 + - **Backend API**: http://localhost:8000 + - **Langflow**: http://localhost:7860 + +Continue with the [Quickstart](/quickstart). + +## Rebuild all Docker containers + +If you need to reset state and rebuild all of your containers, run the following command. +Your OpenSearch and Langflow databases will be lost. +Documents stored in the `./documents` directory will persist, since the directory is mounted as a volume in the OpenRAG backend container. 
```bash docker compose up --build --force-recreate --remove-orphans ``` - -## Service URLs - -After deployment, services are available at: - -- Frontend: http://localhost:3000 -- Backend API: http://localhost:8000 -- Langflow: http://localhost:7860 -- OpenSearch: http://localhost:9200 -- OpenSearch Dashboards: http://localhost:5601 diff --git a/docs/docs/get-started/install.mdx b/docs/docs/get-started/install.mdx index dcb5c5f1..27cafb44 100644 --- a/docs/docs/get-started/install.mdx +++ b/docs/docs/get-started/install.mdx @@ -10,7 +10,7 @@ OpenRAG can be installed in multiple ways: * [**Python wheel**](#install-python-wheel): Install the OpenRAG Python wheel and use the [OpenRAG Terminal User Interface (TUI)](/get-started/tui) to install, run, and configure your OpenRAG deployment without running Docker commands. -* [**Docker Compose**](#install-and-run-docker): Clone the OpenRAG repository and deploy OpenRAG with Docker Compose, including all services and dependencies. +* [**Docker Compose**](get-started/docker): Clone the OpenRAG repository and deploy OpenRAG with Docker Compose, including all services and dependencies. ## Prerequisites @@ -138,80 +138,4 @@ The `LANGFLOW_PUBLIC_URL` controls where the Langflow web interface can be acces The `WEBHOOK_BASE_URL` controls where the endpoint for `/connectors/CONNECTOR_TYPE/webhook` will be available. This connection enables real-time document synchronization with external services. -For example, for Google Drive file synchronization the webhook URL is `/connectors/google_drive/webhook`. - -## Docker {#install-and-run-docker} - -There are two different Docker Compose files. -They deploy the same applications and containers, but to different environments. - -- [`docker-compose.yml`](https://github.com/langflow-ai/openrag/blob/main/docker-compose.yml) is an OpenRAG deployment with GPU support for accelerated AI processing. - -- [`docker-compose-cpu.yml`](https://github.com/langflow-ai/openrag/blob/main/docker-compose-cpu.yml) is a CPU-only version of OpenRAG for systems without GPU support. Use this Docker compose file for environments where GPU drivers aren't available. - -To install OpenRAG with Docker Compose: - -1. Clone the OpenRAG repository. - ```bash - git clone https://github.com/langflow-ai/openrag.git - cd openrag - ``` - -2. Copy the example `.env` file that is included in the repository root. - The example file includes all environment variables with comments to guide you in finding and setting their values. - ```bash - cp .env.example .env - ``` - - Alternatively, create a new `.env` file in the repository root. - ``` - touch .env - ``` - -3. Set environment variables. The Docker Compose files are populated with values from your `.env`, so the following values are **required** to be set: - - ```bash - OPENSEARCH_PASSWORD=your_secure_password - OPENAI_API_KEY=your_openai_api_key - - LANGFLOW_SUPERUSER=admin - LANGFLOW_SUPERUSER_PASSWORD=your_langflow_password - LANGFLOW_SECRET_KEY=your_secret_key - ``` - For more information on configuring OpenRAG with environment variables, see [Environment variables](/configure/configuration). - For additional configuration values, including `config.yaml`, see [Configuration](/configure/configuration). - -4. Deploy OpenRAG with Docker Compose based on your deployment type. 
- - For GPU-enabled systems, run the following command: - ```bash - docker compose up -d - ``` - - For CPU-only systems, run the following command: - ```bash - docker compose -f docker-compose-cpu.yml up -d - ``` - - The OpenRAG Docker Compose file starts five containers: - | Container Name | Default Address | Purpose | - |---|---|---| - | OpenRAG Backend | http://localhost:8000 | FastAPI server and core functionality. | - | OpenRAG Frontend | http://localhost:3000 | React web interface for users. | - | Langflow | http://localhost:7860 | AI workflow engine and flow management. | - | OpenSearch | http://localhost:9200 | Vector database for document storage. | - | OpenSearch Dashboards | http://localhost:5601 | Database administration interface. | - -5. Verify installation by confirming all services are running. - - ```bash - docker compose ps - ``` - - You can now access the application at: - - - **Frontend**: http://localhost:3000 - - **Backend API**: http://localhost:8000 - - **Langflow**: http://localhost:7860 - -Continue with the Quickstart. \ No newline at end of file +For example, for Google Drive file synchronization the webhook URL is `/connectors/google_drive/webhook`. \ No newline at end of file diff --git a/docs/docs/get-started/tui.mdx b/docs/docs/get-started/tui.mdx index 2f0a048d..5ca4e934 100644 --- a/docs/docs/get-started/tui.mdx +++ b/docs/docs/get-started/tui.mdx @@ -1,66 +1,94 @@ --- -title: Terminal Interface (TUI) +title: Terminal User Interface (TUI) commands slug: /get-started/tui --- # OpenRAG TUI Guide -The OpenRAG Terminal User Interface (TUI) provides a streamlined way to set up, configure, and monitor your OpenRAG deployment directly from the terminal. +The OpenRAG Terminal User Interface (TUI) provides a streamlined way to set up, configure, and monitor your OpenRAG deployment directly from the terminal, on any operating system. ![OpenRAG TUI Interface](@site/static/img/OpenRAG_TUI_2025-09-10T13_04_11_757637.svg) -## Launch +The TUI offers an easier way to use OpenRAG without sacrificing control. +Instead of starting OpenRAG using Docker commands and manually editing values in the `.env` file, the TUI walks you through the setup. It prompts for variables where required, creates a `.env` file for you, and then starts OpenRAG. + +Once OpenRAG is running, use the TUI to monitor your application, control your containers, and retrieve logs. + +## Start the TUI + +To start the TUI, run the following commands from the directory where you installed OpenRAG. +For more information, see [Install OpenRAG](/install). 
```bash uv sync uv run openrag ``` -## Features - -### Welcome Screen -- Quick setup options: basic (no auth) or advanced (OAuth) -- Service monitoring: container status at a glance -- Quick actions: diagnostics, logs, configuration - -### Configuration Screen -- Environment variables: guided forms for required settings -- API keys: secure input with validation -- OAuth setup: Google and Microsoft -- Document paths: configure ingestion directories -- Auto-save: generates and updates `.env` - -### Service Monitor -- Container status: real-time state of services -- Resource usage: CPU, memory, network -- Service control: start/stop/restart -- Health checks: health indicators for all components - -### Log Viewer -- Live logs: stream logs across services -- Filtering: by service (backend, frontend, Langflow, OpenSearch) -- Levels: DEBUG/INFO/WARNING/ERROR -- Export: save logs for later analysis - -### Diagnostics -- System checks: Docker/Podman availability and configuration -- Environment validation: verify required variables -- Network tests: connectivity between services -- Performance metrics: system capacity and recommendations +The TUI Welcome Screen offers basic and advanced setup options. +For more information on setup values during installation, see [Install OpenRAG](/install). ## Navigation -- Arrow keys: move between options -- Tab/Shift+Tab: switch fields and buttons -- Enter: select/confirm -- Escape: back -- Q: quit -- Number keys (1-4): quick access to main screens -## Benefits -1. Simplified setup without manual file edits -2. Clear visual feedback and error messages -3. Integrated monitoring and control -4. Cross-platform: Linux, macOS, Windows -5. Fully terminal-based; no browser required +The TUI accepts mouse input or keyboard commands. +- Arrow keys: move between options +- Tab/Shift+Tab: switch fields and buttons +- Enter: select/confirm +- Escape: back +- Q: quit +- Number keys (1-4): quick access to main screens +## Container management + +The TUI can deploy, manage, and upgrade your OpenRAG containers. + +### Start container services + +Click **Start Container Services** to start the OpenRAG containers. +The TUI automatically detects your container runtime, and then checks if your machine has compatible GPU support by checking for `CUDA`, `NVIDIA_SMI`, and Docker/Podman runtime support. This check determines which Docker Compose file OpenRAG uses. +The TUI then pulls the images and deploys the containers with the following command. +```bash +docker compose up -d +``` +If images are missing, the TUI runs `docker compose pull`, then runs `docker compose up -d`. + +### Start native services + +A "native" service in OpenRAG refers to a service run natively on your machine, and not within a container. +The `docling-serve` process is a native service in OpenRAG, because it's a document processing service that is run on your local machine, and controlled separately from the containers. + +To start or stop `docling-serve` or any other native services, in the TUI main menu, click **Start Native Services** or **Stop Native Services**. + +To view the status, port, or PID of a native service, in the TUI main menu, click [Status](#status). + +### Status + +The **Status** menu displays information on your container deployment. +Here you can check container health, find your service ports, view logs, and upgrade your containers. + +To view streaming logs, select the container you want to view, and press l. +To copy your logs, click **Copy to Clipboard**. 
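+
+If you prefer to follow logs outside the TUI, roughly the same stream is available directly from Docker Compose. The service name below is only an example; run `docker compose ps` to list the names used in your deployment.
+
+```bash
+# Follow logs for a single service, starting with the last 100 lines
+docker compose logs -f --tail=100 openrag-backend
+```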
+ +To **upgrade** your containers, click **Upgrade**. +**Upgrade** runs `docker compose pull` and then `docker compose up -d --force-recreate`. +The first command pulls the latest images of OpenRAG. +The second command recreates the containers with your data persisted. + +To **reset** your containers, click **Reset**. +Reset gives you a completely fresh start. +Reset deletes all of your data, including OpenSearch data, uploaded documents, and authentication. +**Reset** runs two commands. +It first stops and removes all containers, volumes, and local images. +``` +docker compose down --volumes --remove-orphans --rmi local +``` + +When the first command is complete, OpenRAG removes any additional Docker objects with `prune`. + +``` +docker system prune -f +``` + +## Diagnostics + +The **Diagnostics** menu provides health monitoring for your container runtimes and monitoring of your OpenSearch security. \ No newline at end of file diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index a5595813..1c83724d 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -231,11 +231,8 @@ async def upload_and_ingest_user_file( except Exception: # Clean up temp file on error - try: - if os.path.exists(temp_path): - os.unlink(temp_path) - except Exception: - pass # Ignore cleanup errors + from utils.file_utils import safe_unlink + safe_unlink(temp_path) raise except Exception as e: diff --git a/src/api/router.py b/src/api/router.py index 154757a5..56789d41 100644 --- a/src/api/router.py +++ b/src/api/router.py @@ -164,12 +164,9 @@ async def langflow_upload_ingest_task( except Exception: # Clean up temp files on error + from utils.file_utils import safe_unlink for temp_path in temp_file_paths: - try: - if os.path.exists(temp_path): - os.unlink(temp_path) - except Exception: - pass # Ignore cleanup errors + safe_unlink(temp_path) raise except Exception as e: diff --git a/src/api/tasks.py b/src/api/tasks.py index de4bf505..92779d09 100644 --- a/src/api/tasks.py +++ b/src/api/tasks.py @@ -26,7 +26,7 @@ async def cancel_task(request: Request, task_service, session_manager): task_id = request.path_params.get("task_id") user = request.state.user - success = task_service.cancel_task(user.user_id, task_id) + success = await task_service.cancel_task(user.user_id, task_id) if not success: return JSONResponse( {"error": "Task not found or cannot be cancelled"}, status_code=400 diff --git a/src/config/settings.py b/src/config/settings.py index 3bf1e6cf..517bf2df 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -514,6 +514,9 @@ class AppClients: ssl_assert_fingerprint=None, headers=headers, http_compress=True, + timeout=30, # 30 second timeout + max_retries=3, + retry_on_timeout=True, ) diff --git a/src/connectors/langflow_connector_service.py b/src/connectors/langflow_connector_service.py index ef68816d..545c6190 100644 --- a/src/connectors/langflow_connector_service.py +++ b/src/connectors/langflow_connector_service.py @@ -53,25 +53,27 @@ class LangflowConnectorService: filename=document.filename, ) + from utils.file_utils import auto_cleanup_tempfile + suffix = self._get_file_extension(document.mimetype) # Create temporary file from document content - with tempfile.NamedTemporaryFile( - delete=False, suffix=suffix - ) as tmp_file: - tmp_file.write(document.content) - tmp_file.flush() + with auto_cleanup_tempfile(suffix=suffix) as tmp_path: + # Write document content to temp file + with open(tmp_path, 'wb') as f: + f.write(document.content) + # Step 1: Upload file 
to Langflow + logger.debug("Uploading file to Langflow", filename=document.filename) + content = document.content + file_tuple = ( + document.filename.replace(" ", "_").replace("/", "_")+suffix, + content, + document.mimetype or "application/octet-stream", + ) + + langflow_file_id = None # Initialize to track if upload succeeded try: - # Step 1: Upload file to Langflow - logger.debug("Uploading file to Langflow", filename=document.filename) - content = document.content - file_tuple = ( - document.filename.replace(" ", "_").replace("/", "_")+suffix, - content, - document.mimetype or "application/octet-stream", - ) - upload_result = await self.langflow_service.upload_user_file( file_tuple, jwt_token ) @@ -125,7 +127,7 @@ class LangflowConnectorService: error=str(e), ) # Try to clean up Langflow file if upload succeeded but processing failed - if "langflow_file_id" in locals(): + if langflow_file_id is not None: try: await self.langflow_service.delete_user_file(langflow_file_id) logger.debug( @@ -140,10 +142,6 @@ class LangflowConnectorService: ) raise - finally: - # Clean up temporary file - os.unlink(tmp_file.name) - def _get_file_extension(self, mimetype: str) -> str: """Get file extension based on MIME type""" mime_to_ext = { diff --git a/src/connectors/service.py b/src/connectors/service.py index 01a41519..792d8d1f 100644 --- a/src/connectors/service.py +++ b/src/connectors/service.py @@ -54,52 +54,50 @@ class ConnectorService: """Process a document from a connector using existing processing pipeline""" # Create temporary file from document content - with tempfile.NamedTemporaryFile( - delete=False, suffix=self._get_file_extension(document.mimetype) - ) as tmp_file: - tmp_file.write(document.content) - tmp_file.flush() + from utils.file_utils import auto_cleanup_tempfile - try: - # Use existing process_file_common function with connector document metadata - # We'll use the document service's process_file_common method - from services.document_service import DocumentService + with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path: + # Write document content to temp file + with open(tmp_path, 'wb') as f: + f.write(document.content) - doc_service = DocumentService(session_manager=self.session_manager) + # Use existing process_file_common function with connector document metadata + # We'll use the document service's process_file_common method + from services.document_service import DocumentService - logger.debug("Processing connector document", document_id=document.id) + doc_service = DocumentService(session_manager=self.session_manager) - # Process using the existing pipeline but with connector document metadata - result = await doc_service.process_file_common( - file_path=tmp_file.name, - file_hash=document.id, # Use connector document ID as hash - owner_user_id=owner_user_id, - original_filename=document.filename, # Pass the original Google Doc title - jwt_token=jwt_token, - owner_name=owner_name, - owner_email=owner_email, - file_size=len(document.content) if document.content else 0, - connector_type=connector_type, + logger.debug("Processing connector document", document_id=document.id) + + # Process using consolidated processing pipeline + from models.processors import TaskProcessor + processor = TaskProcessor(document_service=doc_service) + result = await processor.process_document_standard( + file_path=tmp_path, + file_hash=document.id, # Use connector document ID as hash + owner_user_id=owner_user_id, + original_filename=document.filename, # Pass 
the original Google Doc title + jwt_token=jwt_token, + owner_name=owner_name, + owner_email=owner_email, + file_size=len(document.content) if document.content else 0, + connector_type=connector_type, + ) + + logger.debug("Document processing result", result=result) + + # If successfully indexed or already exists, update the indexed documents with connector metadata + if result["status"] in ["indexed", "unchanged"]: + # Update all chunks with connector-specific metadata + await self._update_connector_metadata( + document, owner_user_id, connector_type, jwt_token ) - logger.debug("Document processing result", result=result) - - # If successfully indexed or already exists, update the indexed documents with connector metadata - if result["status"] in ["indexed", "unchanged"]: - # Update all chunks with connector-specific metadata - await self._update_connector_metadata( - document, owner_user_id, connector_type, jwt_token - ) - - return { - **result, - "filename": document.filename, - "source_url": document.source_url, - } - - finally: - # Clean up temporary file - os.unlink(tmp_file.name) + return { + **result, + "filename": document.filename, + "source_url": document.source_url, + } async def _update_connector_metadata( self, diff --git a/src/models/processors.py b/src/models/processors.py index a817f8d4..ecec9c49 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -1,4 +1,3 @@ -from abc import ABC, abstractmethod from typing import Any from .tasks import UploadTask, FileTask from utils.logging_config import get_logger @@ -6,22 +5,160 @@ from utils.logging_config import get_logger logger = get_logger(__name__) -class TaskProcessor(ABC): - """Abstract base class for task processors""" +class TaskProcessor: + """Base class for task processors with shared processing logic""" + + def __init__(self, document_service=None): + self.document_service = document_service + + async def check_document_exists( + self, + file_hash: str, + opensearch_client, + ) -> bool: + """ + Check if a document with the given hash already exists in OpenSearch. + Consolidated hash checking for all processors. + """ + from config.settings import INDEX_NAME + import asyncio + + max_retries = 3 + retry_delay = 1.0 + + for attempt in range(max_retries): + try: + exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash) + return exists + except (asyncio.TimeoutError, Exception) as e: + if attempt == max_retries - 1: + logger.error( + "OpenSearch exists check failed after retries", + file_hash=file_hash, + error=str(e), + attempt=attempt + 1 + ) + # On final failure, assume document doesn't exist (safer to reprocess than skip) + logger.warning( + "Assuming document doesn't exist due to connection issues", + file_hash=file_hash + ) + return False + else: + logger.warning( + "OpenSearch exists check failed, retrying", + file_hash=file_hash, + error=str(e), + attempt=attempt + 1, + retry_in=retry_delay + ) + await asyncio.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + + async def process_document_standard( + self, + file_path: str, + file_hash: str, + owner_user_id: str = None, + original_filename: str = None, + jwt_token: str = None, + owner_name: str = None, + owner_email: str = None, + file_size: int = None, + connector_type: str = "local", + ): + """ + Standard processing pipeline for non-Langflow processors: + docling conversion + embeddings + OpenSearch indexing. 
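+
+        Returns a dict of the form {"status": "indexed" | "unchanged", "id": file_hash}.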
+ """ + import datetime + from config.settings import INDEX_NAME, EMBED_MODEL, clients + from services.document_service import chunk_texts_for_embeddings + from utils.document_processing import extract_relevant + + # Get user's OpenSearch client with JWT for OIDC auth + opensearch_client = self.document_service.session_manager.get_user_opensearch_client( + owner_user_id, jwt_token + ) + + # Check if already exists + if await self.check_document_exists(file_hash, opensearch_client): + return {"status": "unchanged", "id": file_hash} + + # Convert and extract + result = clients.converter.convert(file_path) + full_doc = result.document.export_to_dict() + slim_doc = extract_relevant(full_doc) + + texts = [c["text"] for c in slim_doc["chunks"]] + + # Split into batches to avoid token limits (8191 limit, use 8000 with buffer) + text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000) + embeddings = [] + + for batch in text_batches: + resp = await clients.patched_async_client.embeddings.create( + model=EMBED_MODEL, input=batch + ) + embeddings.extend([d.embedding for d in resp.data]) + + # Index each chunk as a separate document + for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)): + chunk_doc = { + "document_id": file_hash, + "filename": original_filename + if original_filename + else slim_doc["filename"], + "mimetype": slim_doc["mimetype"], + "page": chunk["page"], + "text": chunk["text"], + "chunk_embedding": vect, + "file_size": file_size, + "connector_type": connector_type, + "indexed_time": datetime.datetime.now().isoformat(), + } + + # Only set owner fields if owner_user_id is provided (for no-auth mode support) + if owner_user_id is not None: + chunk_doc["owner"] = owner_user_id + if owner_name is not None: + chunk_doc["owner_name"] = owner_name + if owner_email is not None: + chunk_doc["owner_email"] = owner_email + chunk_id = f"{file_hash}_{i}" + try: + await opensearch_client.index( + index=INDEX_NAME, id=chunk_id, body=chunk_doc + ) + except Exception as e: + logger.error( + "OpenSearch indexing failed for chunk", + chunk_id=chunk_id, + error=str(e), + ) + logger.error("Chunk document details", chunk_doc=chunk_doc) + raise + return {"status": "indexed", "id": file_hash} - @abstractmethod async def process_item( self, upload_task: UploadTask, item: Any, file_task: FileTask ) -> None: """ Process a single item in the task. + This is a base implementation that should be overridden by subclasses. + When TaskProcessor is used directly (not via subclass), this method + is not called - only the utility methods like process_document_standard + are used. + Args: upload_task: The overall upload task item: The item to process (could be file path, file info, etc.) 
file_task: The specific file task to update """ - pass + raise NotImplementedError( + "process_item should be overridden by subclasses when used in task processing" + ) class DocumentFileProcessor(TaskProcessor): @@ -35,7 +172,7 @@ class DocumentFileProcessor(TaskProcessor): owner_name: str = None, owner_email: str = None, ): - self.document_service = document_service + super().__init__(document_service) self.owner_user_id = owner_user_id self.jwt_token = jwt_token self.owner_name = owner_name @@ -44,16 +181,52 @@ class DocumentFileProcessor(TaskProcessor): async def process_item( self, upload_task: UploadTask, item: str, file_task: FileTask ) -> None: - """Process a regular file path using DocumentService""" - # This calls the existing logic with user context - await self.document_service.process_single_file_task( - upload_task, - item, - owner_user_id=self.owner_user_id, - jwt_token=self.jwt_token, - owner_name=self.owner_name, - owner_email=self.owner_email, - ) + """Process a regular file path using consolidated methods""" + from models.tasks import TaskStatus + from utils.hash_utils import hash_id + import time + import os + + file_task.status = TaskStatus.RUNNING + file_task.updated_at = time.time() + + try: + # Compute hash + file_hash = hash_id(item) + + # Get file size + try: + file_size = os.path.getsize(item) + except Exception: + file_size = 0 + + # Use consolidated standard processing + result = await self.process_document_standard( + file_path=item, + file_hash=file_hash, + owner_user_id=self.owner_user_id, + original_filename=os.path.basename(item), + jwt_token=self.jwt_token, + owner_name=self.owner_name, + owner_email=self.owner_email, + file_size=file_size, + connector_type="local", + ) + + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 + + except Exception as e: + file_task.status = TaskStatus.FAILED + file_task.error = str(e) + file_task.updated_at = time.time() + upload_task.failed_files += 1 + raise + finally: + upload_task.processed_files += 1 + upload_task.updated_at = time.time() class ConnectorFileProcessor(TaskProcessor): @@ -69,6 +242,7 @@ class ConnectorFileProcessor(TaskProcessor): owner_name: str = None, owner_email: str = None, ): + super().__init__() self.connector_service = connector_service self.connection_id = connection_id self.files_to_process = files_to_process @@ -76,53 +250,79 @@ class ConnectorFileProcessor(TaskProcessor): self.jwt_token = jwt_token self.owner_name = owner_name self.owner_email = owner_email - # Create lookup map for file info - handle both file objects and file IDs - self.file_info_map = {} - for f in files_to_process: - if isinstance(f, dict): - # Full file info objects - self.file_info_map[f["id"]] = f - else: - # Just file IDs - will need to fetch metadata during processing - self.file_info_map[f] = None async def process_item( self, upload_task: UploadTask, item: str, file_task: FileTask ) -> None: - """Process a connector file using ConnectorService""" + """Process a connector file using consolidated methods""" from models.tasks import TaskStatus + from utils.hash_utils import hash_id + import tempfile + import time + import os - file_id = item # item is the connector file ID - self.file_info_map.get(file_id) + file_task.status = TaskStatus.RUNNING + file_task.updated_at = time.time() - # Get the connector and connection info - connector = await self.connector_service.get_connector(self.connection_id) - connection = await 
self.connector_service.connection_manager.get_connection( - self.connection_id - ) - if not connector or not connection: - raise ValueError(f"Connection '{self.connection_id}' not found") + try: + file_id = item # item is the connector file ID - # Get file content from connector (the connector will fetch metadata if needed) - document = await connector.get_file_content(file_id) + # Get the connector and connection info + connector = await self.connector_service.get_connector(self.connection_id) + connection = await self.connector_service.connection_manager.get_connection( + self.connection_id + ) + if not connector or not connection: + raise ValueError(f"Connection '{self.connection_id}' not found") - # Use the user_id passed during initialization - if not self.user_id: - raise ValueError("user_id not provided to ConnectorFileProcessor") + # Get file content from connector + document = await connector.get_file_content(file_id) - # Process using existing pipeline - result = await self.connector_service.process_connector_document( - document, - self.user_id, - connection.connector_type, - jwt_token=self.jwt_token, - owner_name=self.owner_name, - owner_email=self.owner_email, - ) + if not self.user_id: + raise ValueError("user_id not provided to ConnectorFileProcessor") - file_task.status = TaskStatus.COMPLETED - file_task.result = result - upload_task.successful_files += 1 + # Create temporary file from document content + from utils.file_utils import auto_cleanup_tempfile + + suffix = self.connector_service._get_file_extension(document.mimetype) + with auto_cleanup_tempfile(suffix=suffix) as tmp_path: + # Write content to temp file + with open(tmp_path, 'wb') as f: + f.write(document.content) + + # Compute hash + file_hash = hash_id(tmp_path) + + # Use consolidated standard processing + result = await self.process_document_standard( + file_path=tmp_path, + file_hash=file_hash, + owner_user_id=self.user_id, + original_filename=document.filename, + jwt_token=self.jwt_token, + owner_name=self.owner_name, + owner_email=self.owner_email, + file_size=len(document.content), + connector_type=connection.connector_type, + ) + + # Add connector-specific metadata + result.update({ + "source_url": document.source_url, + "document_id": document.id, + }) + + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 + + except Exception as e: + file_task.status = TaskStatus.FAILED + file_task.error = str(e) + file_task.updated_at = time.time() + upload_task.failed_files += 1 + raise class LangflowConnectorFileProcessor(TaskProcessor): @@ -138,6 +338,7 @@ class LangflowConnectorFileProcessor(TaskProcessor): owner_name: str = None, owner_email: str = None, ): + super().__init__() self.langflow_connector_service = langflow_connector_service self.connection_id = connection_id self.files_to_process = files_to_process @@ -145,57 +346,85 @@ class LangflowConnectorFileProcessor(TaskProcessor): self.jwt_token = jwt_token self.owner_name = owner_name self.owner_email = owner_email - # Create lookup map for file info - handle both file objects and file IDs - self.file_info_map = {} - for f in files_to_process: - if isinstance(f, dict): - # Full file info objects - self.file_info_map[f["id"]] = f - else: - # Just file IDs - will need to fetch metadata during processing - self.file_info_map[f] = None async def process_item( self, upload_task: UploadTask, item: str, file_task: FileTask ) -> None: """Process a connector file using 
LangflowConnectorService""" from models.tasks import TaskStatus + from utils.hash_utils import hash_id + import tempfile + import time + import os - file_id = item # item is the connector file ID - self.file_info_map.get(file_id) + file_task.status = TaskStatus.RUNNING + file_task.updated_at = time.time() - # Get the connector and connection info - connector = await self.langflow_connector_service.get_connector( - self.connection_id - ) - connection = ( - await self.langflow_connector_service.connection_manager.get_connection( + try: + file_id = item # item is the connector file ID + + # Get the connector and connection info + connector = await self.langflow_connector_service.get_connector( self.connection_id ) - ) - if not connector or not connection: - raise ValueError(f"Connection '{self.connection_id}' not found") + connection = ( + await self.langflow_connector_service.connection_manager.get_connection( + self.connection_id + ) + ) + if not connector or not connection: + raise ValueError(f"Connection '{self.connection_id}' not found") - # Get file content from connector (the connector will fetch metadata if needed) - document = await connector.get_file_content(file_id) + # Get file content from connector + document = await connector.get_file_content(file_id) - # Use the user_id passed during initialization - if not self.user_id: - raise ValueError("user_id not provided to LangflowConnectorFileProcessor") + if not self.user_id: + raise ValueError("user_id not provided to LangflowConnectorFileProcessor") - # Process using Langflow pipeline - result = await self.langflow_connector_service.process_connector_document( - document, - self.user_id, - connection.connector_type, - jwt_token=self.jwt_token, - owner_name=self.owner_name, - owner_email=self.owner_email, - ) + # Create temporary file and compute hash to check for duplicates + from utils.file_utils import auto_cleanup_tempfile - file_task.status = TaskStatus.COMPLETED - file_task.result = result - upload_task.successful_files += 1 + suffix = self.langflow_connector_service._get_file_extension(document.mimetype) + with auto_cleanup_tempfile(suffix=suffix) as tmp_path: + # Write content to temp file + with open(tmp_path, 'wb') as f: + f.write(document.content) + + # Compute hash and check if already exists + file_hash = hash_id(tmp_path) + + # Check if document already exists + opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client( + self.user_id, self.jwt_token + ) + if await self.check_document_exists(file_hash, opensearch_client): + file_task.status = TaskStatus.COMPLETED + file_task.result = {"status": "unchanged", "id": file_hash} + file_task.updated_at = time.time() + upload_task.successful_files += 1 + return + + # Process using Langflow pipeline + result = await self.langflow_connector_service.process_connector_document( + document, + self.user_id, + connection.connector_type, + jwt_token=self.jwt_token, + owner_name=self.owner_name, + owner_email=self.owner_email, + ) + + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 + + except Exception as e: + file_task.status = TaskStatus.FAILED + file_task.error = str(e) + file_task.updated_at = time.time() + upload_task.failed_files += 1 + raise class S3FileProcessor(TaskProcessor): @@ -213,7 +442,7 @@ class S3FileProcessor(TaskProcessor): ): import boto3 - self.document_service = document_service + super().__init__(document_service) self.bucket = bucket 
self.s3_client = s3_client or boto3.client("s3") self.owner_user_id = owner_user_id @@ -238,34 +467,17 @@ class S3FileProcessor(TaskProcessor): file_task.status = TaskStatus.RUNNING file_task.updated_at = time.time() - tmp = tempfile.NamedTemporaryFile(delete=False) + from utils.file_utils import auto_cleanup_tempfile + from utils.hash_utils import hash_id + try: - # Download object to temporary file - self.s3_client.download_fileobj(self.bucket, item, tmp) - tmp.flush() + with auto_cleanup_tempfile() as tmp_path: + # Download object to temporary file + with open(tmp_path, 'wb') as tmp_file: + self.s3_client.download_fileobj(self.bucket, item, tmp_file) - loop = asyncio.get_event_loop() - slim_doc = await loop.run_in_executor( - self.document_service.process_pool, process_document_sync, tmp.name - ) - - opensearch_client = ( - self.document_service.session_manager.get_user_opensearch_client( - self.owner_user_id, self.jwt_token - ) - ) - exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"]) - if exists: - result = {"status": "unchanged", "id": slim_doc["id"]} - else: - texts = [c["text"] for c in slim_doc["chunks"]] - text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000) - embeddings = [] - for batch in text_batches: - resp = await clients.patched_async_client.embeddings.create( - model=EMBED_MODEL, input=batch - ) - embeddings.extend([d.embedding for d in resp.data]) + # Compute hash + file_hash = hash_id(tmp_path) # Get object size try: @@ -274,54 +486,29 @@ class S3FileProcessor(TaskProcessor): except Exception: file_size = 0 - for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)): - chunk_doc = { - "document_id": slim_doc["id"], - "filename": slim_doc["filename"], - "mimetype": slim_doc["mimetype"], - "page": chunk["page"], - "text": chunk["text"], - "chunk_embedding": vect, - "file_size": file_size, - "connector_type": "s3", # S3 uploads - "indexed_time": datetime.datetime.now().isoformat(), - } + # Use consolidated standard processing + result = await self.process_document_standard( + file_path=tmp_path, + file_hash=file_hash, + owner_user_id=self.owner_user_id, + original_filename=item, # Use S3 key as filename + jwt_token=self.jwt_token, + owner_name=self.owner_name, + owner_email=self.owner_email, + file_size=file_size, + connector_type="s3", + ) - # Only set owner fields if owner_user_id is provided (for no-auth mode support) - if self.owner_user_id is not None: - chunk_doc["owner"] = self.owner_user_id - if self.owner_name is not None: - chunk_doc["owner_name"] = self.owner_name - if self.owner_email is not None: - chunk_doc["owner_email"] = self.owner_email - chunk_id = f"{slim_doc['id']}_{i}" - try: - await opensearch_client.index( - index=INDEX_NAME, id=chunk_id, body=chunk_doc - ) - except Exception as e: - logger.error( - "OpenSearch indexing failed for S3 chunk", - chunk_id=chunk_id, - error=str(e), - chunk_doc=chunk_doc, - ) - raise - - result = {"status": "indexed", "id": slim_doc["id"]} - - result["path"] = f"s3://{self.bucket}/{item}" - file_task.status = TaskStatus.COMPLETED - file_task.result = result - upload_task.successful_files += 1 + result["path"] = f"s3://{self.bucket}/{item}" + file_task.status = TaskStatus.COMPLETED + file_task.result = result + upload_task.successful_files += 1 except Exception as e: file_task.status = TaskStatus.FAILED file_task.error = str(e) upload_task.failed_files += 1 finally: - tmp.close() - os.remove(tmp.name) file_task.updated_at = time.time() @@ -341,6 +528,7 @@ class 
LangflowFileProcessor(TaskProcessor): settings: dict = None, delete_after_ingest: bool = True, ): + super().__init__() self.langflow_file_service = langflow_file_service self.session_manager = session_manager self.owner_user_id = owner_user_id @@ -366,7 +554,22 @@ class LangflowFileProcessor(TaskProcessor): file_task.updated_at = time.time() try: - # Read file content + # Compute hash and check if already exists + from utils.hash_utils import hash_id + file_hash = hash_id(item) + + # Check if document already exists + opensearch_client = self.session_manager.get_user_opensearch_client( + self.owner_user_id, self.jwt_token + ) + if await self.check_document_exists(file_hash, opensearch_client): + file_task.status = TaskStatus.COMPLETED + file_task.result = {"status": "unchanged", "id": file_hash} + file_task.updated_at = time.time() + upload_task.successful_files += 1 + return + + # Read file content for processing with open(item, 'rb') as f: content = f.read() diff --git a/src/services/document_service.py b/src/services/document_service.py index 949515e3..5204ea0e 100644 --- a/src/services/document_service.py +++ b/src/services/document_service.py @@ -112,98 +112,6 @@ class DocumentService: return False return False - async def process_file_common( - self, - file_path: str, - file_hash: str = None, - owner_user_id: str = None, - original_filename: str = None, - jwt_token: str = None, - owner_name: str = None, - owner_email: str = None, - file_size: int = None, - connector_type: str = "local", - ): - """ - Common processing logic for both upload and upload_path. - 1. Optionally compute SHA256 hash if not provided. - 2. Convert with docling and extract relevant content. - 3. Add embeddings. - 4. Index into OpenSearch. - """ - if file_hash is None: - sha256 = hashlib.sha256() - async with aiofiles.open(file_path, "rb") as f: - while True: - chunk = await f.read(1 << 20) - if not chunk: - break - sha256.update(chunk) - file_hash = sha256.hexdigest() - - # Get user's OpenSearch client with JWT for OIDC auth - opensearch_client = self.session_manager.get_user_opensearch_client( - owner_user_id, jwt_token - ) - - exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash) - if exists: - return {"status": "unchanged", "id": file_hash} - - # convert and extract - result = clients.converter.convert(file_path) - full_doc = result.document.export_to_dict() - slim_doc = extract_relevant(full_doc) - - texts = [c["text"] for c in slim_doc["chunks"]] - - # Split into batches to avoid token limits (8191 limit, use 8000 with buffer) - text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000) - embeddings = [] - - for batch in text_batches: - resp = await clients.patched_async_client.embeddings.create( - model=EMBED_MODEL, input=batch - ) - embeddings.extend([d.embedding for d in resp.data]) - - # Index each chunk as a separate document - for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)): - chunk_doc = { - "document_id": file_hash, - "filename": original_filename - if original_filename - else slim_doc["filename"], - "mimetype": slim_doc["mimetype"], - "page": chunk["page"], - "text": chunk["text"], - "chunk_embedding": vect, - "file_size": file_size, - "connector_type": connector_type, - "indexed_time": datetime.datetime.now().isoformat(), - } - - # Only set owner fields if owner_user_id is provided (for no-auth mode support) - if owner_user_id is not None: - chunk_doc["owner"] = owner_user_id - if owner_name is not None: - chunk_doc["owner_name"] = owner_name - if 
owner_email is not None: - chunk_doc["owner_email"] = owner_email - chunk_id = f"{file_hash}_{i}" - try: - await opensearch_client.index( - index=INDEX_NAME, id=chunk_id, body=chunk_doc - ) - except Exception as e: - logger.error( - "OpenSearch indexing failed for chunk", - chunk_id=chunk_id, - error=str(e), - ) - logger.error("Chunk document details", chunk_doc=chunk_doc) - raise - return {"status": "indexed", "id": file_hash} async def process_upload_file( self, @@ -214,20 +122,22 @@ class DocumentService: owner_email: str = None, ): """Process an uploaded file from form data""" - sha256 = hashlib.sha256() - tmp = tempfile.NamedTemporaryFile(delete=False) - file_size = 0 - try: - while True: - chunk = await upload_file.read(1 << 20) - if not chunk: - break - sha256.update(chunk) - tmp.write(chunk) - file_size += len(chunk) - tmp.flush() + from utils.hash_utils import hash_id + from utils.file_utils import auto_cleanup_tempfile + import os - file_hash = sha256.hexdigest() + with auto_cleanup_tempfile() as tmp_path: + # Stream upload file to temporary file + file_size = 0 + with open(tmp_path, 'wb') as tmp_file: + while True: + chunk = await upload_file.read(1 << 20) + if not chunk: + break + tmp_file.write(chunk) + file_size += len(chunk) + + file_hash = hash_id(tmp_path) # Get user's OpenSearch client with JWT for OIDC auth opensearch_client = self.session_manager.get_user_opensearch_client( owner_user_id, jwt_token @@ -243,22 +153,22 @@ class DocumentService: if exists: return {"status": "unchanged", "id": file_hash} - result = await self.process_file_common( - tmp.name, - file_hash, + # Use consolidated standard processing + from models.processors import TaskProcessor + processor = TaskProcessor(document_service=self) + result = await processor.process_document_standard( + file_path=tmp_path, + file_hash=file_hash, owner_user_id=owner_user_id, original_filename=upload_file.filename, jwt_token=jwt_token, owner_name=owner_name, owner_email=owner_email, file_size=file_size, + connector_type="local", ) return result - finally: - tmp.close() - os.remove(tmp.name) - async def process_upload_context(self, upload_file, filename: str = None): """Process uploaded file and return content for context""" import io @@ -294,145 +204,3 @@ class DocumentService: "pages": len(slim_doc["chunks"]), "content_length": len(full_content), } - - async def process_single_file_task( - self, - upload_task, - file_path: str, - owner_user_id: str = None, - jwt_token: str = None, - owner_name: str = None, - owner_email: str = None, - connector_type: str = "local", - ): - """Process a single file and update task tracking - used by task service""" - from models.tasks import TaskStatus - import time - import asyncio - - file_task = upload_task.file_tasks[file_path] - file_task.status = TaskStatus.RUNNING - file_task.updated_at = time.time() - - try: - # Handle regular file processing - loop = asyncio.get_event_loop() - - # Run CPU-intensive docling processing in separate process - slim_doc = await loop.run_in_executor( - self.process_pool, process_document_sync, file_path - ) - - # Check if already indexed - opensearch_client = self.session_manager.get_user_opensearch_client( - owner_user_id, jwt_token - ) - exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"]) - if exists: - result = {"status": "unchanged", "id": slim_doc["id"]} - else: - # Generate embeddings and index (I/O bound, keep in main process) - texts = [c["text"] for c in slim_doc["chunks"]] - - # Split into batches to avoid token 
limits (8191 limit, use 8000 with buffer) - text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000) - embeddings = [] - - for batch in text_batches: - resp = await clients.patched_async_client.embeddings.create( - model=EMBED_MODEL, input=batch - ) - embeddings.extend([d.embedding for d in resp.data]) - - # Get file size - file_size = 0 - try: - file_size = os.path.getsize(file_path) - except OSError: - pass # Keep file_size as 0 if can't get size - - # Index each chunk - for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)): - chunk_doc = { - "document_id": slim_doc["id"], - "filename": slim_doc["filename"], - "mimetype": slim_doc["mimetype"], - "page": chunk["page"], - "text": chunk["text"], - "chunk_embedding": vect, - "file_size": file_size, - "connector_type": connector_type, - "indexed_time": datetime.datetime.now().isoformat(), - } - - # Only set owner fields if owner_user_id is provided (for no-auth mode support) - if owner_user_id is not None: - chunk_doc["owner"] = owner_user_id - if owner_name is not None: - chunk_doc["owner_name"] = owner_name - if owner_email is not None: - chunk_doc["owner_email"] = owner_email - chunk_id = f"{slim_doc['id']}_{i}" - try: - await opensearch_client.index( - index=INDEX_NAME, id=chunk_id, body=chunk_doc - ) - except Exception as e: - logger.error( - "OpenSearch indexing failed for batch chunk", - chunk_id=chunk_id, - error=str(e), - ) - logger.error("Chunk document details", chunk_doc=chunk_doc) - raise - - result = {"status": "indexed", "id": slim_doc["id"]} - - result["path"] = file_path - file_task.status = TaskStatus.COMPLETED - file_task.result = result - upload_task.successful_files += 1 - - except Exception as e: - import traceback - from concurrent.futures import BrokenExecutor - - if isinstance(e, BrokenExecutor): - logger.error( - "Process pool broken while processing file", file_path=file_path - ) - logger.info("Worker process likely crashed") - logger.info( - "You should see detailed crash logs above from the worker process" - ) - - # Mark pool as broken for potential recreation - self._process_pool_broken = True - - # Attempt to recreate the pool for future operations - if self._recreate_process_pool(): - logger.info("Process pool successfully recreated") - else: - logger.warning( - "Failed to recreate process pool - future operations may fail" - ) - - file_task.error = f"Worker process crashed: {str(e)}" - else: - logger.error( - "Failed to process file", file_path=file_path, error=str(e) - ) - file_task.error = str(e) - - logger.error("Full traceback available") - traceback.print_exc() - file_task.status = TaskStatus.FAILED - upload_task.failed_files += 1 - finally: - file_task.updated_at = time.time() - upload_task.processed_files += 1 - upload_task.updated_at = time.time() - - if upload_task.processed_files >= upload_task.total_files: - upload_task.status = TaskStatus.COMPLETED - diff --git a/src/services/task_service.py b/src/services/task_service.py index de297dff..be5312a0 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -130,10 +130,21 @@ class TaskService: async def process_with_semaphore(file_path: str): async with semaphore: - await self.document_service.process_single_file_task( - upload_task, file_path + from models.processors import DocumentFileProcessor + file_task = upload_task.file_tasks[file_path] + + # Create processor with user context (all None for background processing) + processor = DocumentFileProcessor( + document_service=self.document_service, + 
owner_user_id=None, + jwt_token=None, + owner_name=None, + owner_email=None, ) + # Process the file + await processor.process_item(upload_task, file_path, file_task) + tasks = [ process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys() @@ -141,6 +152,11 @@ class TaskService: await asyncio.gather(*tasks, return_exceptions=True) + # Check if task is complete + if upload_task.processed_files >= upload_task.total_files: + upload_task.status = TaskStatus.COMPLETED + upload_task.updated_at = time.time() + except Exception as e: logger.error( "Background upload processor failed", task_id=task_id, error=str(e) @@ -336,7 +352,7 @@ class TaskService: tasks.sort(key=lambda x: x["created_at"], reverse=True) return tasks - def cancel_task(self, user_id: str, task_id: str) -> bool: + async def cancel_task(self, user_id: str, task_id: str) -> bool: """Cancel a task if it exists and is not already completed. Supports cancellation of shared default tasks stored under the anonymous user. @@ -368,18 +384,28 @@ class TaskService: and not upload_task.background_task.done() ): upload_task.background_task.cancel() + # Wait for the background task to actually stop to avoid race conditions + try: + await upload_task.background_task + except asyncio.CancelledError: + pass # Expected when we cancel the task + except Exception: + pass # Ignore other errors during cancellation # Mark task as failed (cancelled) upload_task.status = TaskStatus.FAILED upload_task.updated_at = time.time() - # Mark all pending file tasks as failed + # Mark all pending and running file tasks as failed for file_task in upload_task.file_tasks.values(): - if file_task.status == TaskStatus.PENDING: + if file_task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]: + # Increment failed_files counter for both pending and running + # (running files haven't been counted yet in either counter) + upload_task.failed_files += 1 + file_task.status = TaskStatus.FAILED file_task.error = "Task cancelled by user" file_task.updated_at = time.time() - upload_task.failed_files += 1 return True diff --git a/src/utils/document_processing.py b/src/utils/document_processing.py index a8792e46..fcb458fb 100644 --- a/src/utils/document_processing.py +++ b/src/utils/document_processing.py @@ -229,15 +229,9 @@ def process_document_sync(file_path: str): # Compute file hash try: + from utils.hash_utils import hash_id logger.info("Computing file hash", worker_pid=os.getpid()) - sha256 = hashlib.sha256() - with open(file_path, "rb") as f: - while True: - chunk = f.read(1 << 20) - if not chunk: - break - sha256.update(chunk) - file_hash = sha256.hexdigest() + file_hash = hash_id(file_path) logger.info( "File hash computed", worker_pid=os.getpid(), diff --git a/src/utils/file_utils.py b/src/utils/file_utils.py new file mode 100644 index 00000000..2afc4024 --- /dev/null +++ b/src/utils/file_utils.py @@ -0,0 +1,60 @@ +"""File handling utilities for OpenRAG""" + +import os +import tempfile +from contextlib import contextmanager +from typing import Optional + + +@contextmanager +def auto_cleanup_tempfile(suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None): + """ + Context manager for temporary files that automatically cleans up. + + Unlike tempfile.NamedTemporaryFile with delete=True, this keeps the file + on disk for the duration of the context, making it safe for async operations. 
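+    Note that the file descriptor returned by mkstemp is closed immediately and only
+    the path is yielded, so callers open the file themselves. On Windows, a
+    NamedTemporaryFile's name cannot be reopened while its handle is still open; that
+    limitation does not apply here because the descriptor is already closed.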
+ + Usage: + with auto_cleanup_tempfile(suffix=".pdf") as tmp_path: + # Write to the file + with open(tmp_path, 'wb') as f: + f.write(content) + # Use tmp_path for processing + result = await process_file(tmp_path) + # File is automatically deleted here + + Args: + suffix: Optional file suffix/extension (e.g., ".pdf") + prefix: Optional file prefix + dir: Optional directory for temp file + + Yields: + str: Path to the temporary file + """ + fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir) + try: + os.close(fd) # Close the file descriptor immediately + yield path + finally: + # Always clean up, even if an exception occurred + try: + if os.path.exists(path): + os.unlink(path) + except Exception: + # Silently ignore cleanup errors + pass + + +def safe_unlink(path: str) -> None: + """ + Safely delete a file, ignoring errors if it doesn't exist. + + Args: + path: Path to the file to delete + """ + try: + if path and os.path.exists(path): + os.unlink(path) + except Exception: + # Silently ignore errors + pass \ No newline at end of file diff --git a/src/utils/hash_utils.py b/src/utils/hash_utils.py new file mode 100644 index 00000000..c25c8856 --- /dev/null +++ b/src/utils/hash_utils.py @@ -0,0 +1,76 @@ +import io +import os +import base64 +import hashlib +from typing import BinaryIO, Optional, Union + + +def _b64url(data: bytes) -> str: + """URL-safe base64 without padding""" + return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8") + + +def stream_hash( + source: Union[str, os.PathLike, BinaryIO], + *, + algo: str = "sha256", + include_filename: Optional[str] = None, + chunk_size: int = 1024 * 1024, # 1 MiB +) -> bytes: + """ + Memory-safe, incremental hash of a file path or binary stream. + - source: path or file-like object with .read() + - algo: hashlib algorithm name ('sha256', 'blake2b', 'sha3_256', etc.) + - include_filename: if provided, the UTF-8 bytes of this string are prepended + - chunk_size: read size per iteration + Returns: raw digest bytes + """ + try: + h = hashlib.new(algo) + except ValueError as e: + raise ValueError(f"Unsupported hash algorithm: {algo}") from e + + def _update_from_file(f: BinaryIO): + if include_filename: + h.update(include_filename.encode("utf-8")) + for chunk in iter(lambda: f.read(chunk_size), b""): + h.update(chunk) + + if isinstance(source, (str, os.PathLike)): + with open(source, "rb", buffering=io.DEFAULT_BUFFER_SIZE) as f: + _update_from_file(f) + else: + f = source + # Preserve position if seekable + pos = None + try: + if f.seekable(): + pos = f.tell() + f.seek(0) + except Exception: + pos = None + try: + _update_from_file(f) + finally: + if pos is not None: + try: + f.seek(pos) + except Exception: + pass + + return h.digest() + + +def hash_id( + source: Union[str, os.PathLike, BinaryIO], + *, + algo: str = "sha256", + include_filename: Optional[str] = None, + length: int = 24, # characters of base64url (set 0 or None for full) +) -> str: + """ + Deterministic, URL-safe base64 digest (no prefix). + """ + b = stream_hash(source, algo=algo, include_filename=include_filename) + s = _b64url(b) + return s[:length] if length else s \ No newline at end of file diff --git a/uv.lock b/uv.lock index c64e6db4..30f7727a 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.13" resolution-markers = [ "sys_platform == 'darwin'", @@ -2282,7 +2282,7 @@ wheels = [ [[package]] name = "openrag" -version = "0.1.13" +version = "0.1.14.dev1" source = { editable = "." 
} dependencies = [ { name = "agentd" },