Merge branch 'main' into feat/filters-design-sweep

Cole Goldsmith · 2025-09-30 11:22:57 -05:00 · commit 33cd353564
19 changed files with 828 additions and 678 deletions


@@ -0,0 +1,59 @@
name: Build Langflow Responses Multi-Arch
on:
  workflow_dispatch:
jobs:
  build:
    strategy:
      fail-fast: false
      matrix:
        include:
          - platform: linux/amd64
            arch: amd64
            runs-on: ubuntu-latest
          - platform: linux/arm64
            arch: arm64
            runs-on: [self-hosted, linux, ARM64, langflow-ai-arm64-2]
    runs-on: ${{ matrix.runs-on }}
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Login to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}
      - name: Build and push langflow (${{ matrix.arch }})
        uses: docker/build-push-action@v5
        with:
          context: .
          file: ./Dockerfile.langflow
          platforms: ${{ matrix.platform }}
          push: true
          tags: phact/langflow:responses-${{ matrix.arch }}
          cache-from: type=gha,scope=langflow-responses-${{ matrix.arch }}
          cache-to: type=gha,mode=max,scope=langflow-responses-${{ matrix.arch }}
  manifest:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Login to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}
      - name: Create and push multi-arch manifest
        run: |
          docker buildx imagetools create -t phact/langflow:responses \
            phact/langflow:responses-amd64 \
            phact/langflow:responses-arm64
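Once the manifest job has run, a quick way to confirm the combined tag (a hedged example using the standard Buildx tooling) is:

```bash
# Should list both the amd64 and arm64 image entries
docker buildx imagetools inspect phact/langflow:responses
```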

.gitignore vendored

@@ -18,3 +18,5 @@ wheels/
1001*.pdf
*.json
.DS_Store
config.yaml


@@ -1,31 +0,0 @@
# OpenRAG Configuration File
# This file allows you to configure OpenRAG settings.
# Environment variables will override these settings unless edited is true.

# Track if this config has been manually edited (prevents env var overrides)
edited: false

# Model provider configuration
provider:
  # Supported providers: "openai", "anthropic", "azure", etc.
  model_provider: "openai"
  # API key for the model provider (can also be set via OPENAI_API_KEY env var)
  api_key: ""

# Knowledge base and document processing configuration
knowledge:
  # Embedding model for vector search
  embedding_model: "text-embedding-3-small"
  # Text chunk size for document processing
  chunk_size: 1000
  # Overlap between chunks
  chunk_overlap: 200
  # Docling preset setting
  doclingPresets: standard

# AI agent configuration
agent:
  # Language model for the chat agent
  llm_model: "gpt-4o-mini"
  # System prompt for the agent
  system_prompt: "You are a helpful AI assistant with access to a knowledge base. Answer questions based on the provided context."


@@ -1,40 +1,88 @@
---
title: Docker Deployment
title: Docker deployment
slug: /get-started/docker
---
# Docker Deployment
There are two different Docker Compose files.
They deploy the same applications and containers, but to different environments.
## Standard Deployment
- [`docker-compose.yml`](https://github.com/langflow-ai/openrag/blob/main/docker-compose.yml) is an OpenRAG deployment with GPU support for accelerated AI processing.
```bash
# Build and start all services
docker compose build
docker compose up -d
```
- [`docker-compose-cpu.yml`](https://github.com/langflow-ai/openrag/blob/main/docker-compose-cpu.yml) is a CPU-only version of OpenRAG for systems without GPU support. Use this Docker Compose file for environments where GPU drivers aren't available.
## CPU-Only Deployment
To install OpenRAG with Docker Compose:
For environments without GPU support:
1. Clone the OpenRAG repository.
```bash
git clone https://github.com/langflow-ai/openrag.git
cd openrag
```
```bash
docker compose -f docker-compose-cpu.yml up -d
```
2. Copy the example `.env` file that is included in the repository root.
The example file includes all environment variables with comments to guide you in finding and setting their values.
```bash
cp .env.example .env
```
## Force Rebuild
Alternatively, create a new `.env` file in the repository root.
```bash
touch .env
```
If you need to reset state or rebuild everything:
3. Set environment variables. The Docker Compose files are populated with values from your `.env`, so the following values **must** be set:
```bash
OPENSEARCH_PASSWORD=your_secure_password
OPENAI_API_KEY=your_openai_api_key
LANGFLOW_SUPERUSER=admin
LANGFLOW_SUPERUSER_PASSWORD=your_langflow_password
LANGFLOW_SECRET_KEY=your_secret_key
```
For more information on configuring OpenRAG with environment variables, see [Environment variables](/configure/configuration).
For additional configuration values, including `config.yaml`, see [Configuration](/configure/configuration).
4. Deploy OpenRAG with Docker Compose based on your deployment type.
For GPU-enabled systems, run the following command:
```bash
docker compose up -d
```
For CPU-only systems, run the following command:
```bash
docker compose -f docker-compose-cpu.yml up -d
```
The OpenRAG Docker Compose file starts five containers:
| Container Name | Default Address | Purpose |
|---|---|---|
| OpenRAG Backend | http://localhost:8000 | FastAPI server and core functionality. |
| OpenRAG Frontend | http://localhost:3000 | React web interface for users. |
| Langflow | http://localhost:7860 | AI workflow engine and flow management. |
| OpenSearch | http://localhost:9200 | Vector database for document storage. |
| OpenSearch Dashboards | http://localhost:5601 | Database administration interface. |
5. Verify installation by confirming all services are running.
```bash
docker compose ps
```
You can now access the application at:
- **Frontend**: http://localhost:3000
- **Backend API**: http://localhost:8000
- **Langflow**: http://localhost:7860
Continue with the [Quickstart](/quickstart).
## Rebuild all Docker containers
If you need to reset state and rebuild all of your containers, run the following command.
Your OpenSearch and Langflow databases will be lost.
Documents stored in the `./documents` directory will persist, since the directory is mounted as a volume in the OpenRAG backend container.
```bash
docker compose up --build --force-recreate --remove-orphans
```
## Service URLs
After deployment, services are available at:
- Frontend: http://localhost:3000
- Backend API: http://localhost:8000
- Langflow: http://localhost:7860
- OpenSearch: http://localhost:9200
- OpenSearch Dashboards: http://localhost:5601


@@ -10,7 +10,7 @@ OpenRAG can be installed in multiple ways:
* [**Python wheel**](#install-python-wheel): Install the OpenRAG Python wheel and use the [OpenRAG Terminal User Interface (TUI)](/get-started/tui) to install, run, and configure your OpenRAG deployment without running Docker commands.
* [**Docker Compose**](#install-and-run-docker): Clone the OpenRAG repository and deploy OpenRAG with Docker Compose, including all services and dependencies.
* [**Docker Compose**](get-started/docker): Clone the OpenRAG repository and deploy OpenRAG with Docker Compose, including all services and dependencies.
## Prerequisites
@@ -138,80 +138,4 @@ The `LANGFLOW_PUBLIC_URL` controls where the Langflow web interface can be accessed.
The `WEBHOOK_BASE_URL` controls where the endpoint for `/connectors/CONNECTOR_TYPE/webhook` will be available.
This connection enables real-time document synchronization with external services.
For example, for Google Drive file synchronization the webhook URL is `/connectors/google_drive/webhook`.
## Docker {#install-and-run-docker}
There are two different Docker Compose files.
They deploy the same applications and containers, but to different environments.
- [`docker-compose.yml`](https://github.com/langflow-ai/openrag/blob/main/docker-compose.yml) is an OpenRAG deployment with GPU support for accelerated AI processing.
- [`docker-compose-cpu.yml`](https://github.com/langflow-ai/openrag/blob/main/docker-compose-cpu.yml) is a CPU-only version of OpenRAG for systems without GPU support. Use this Docker Compose file for environments where GPU drivers aren't available.
To install OpenRAG with Docker Compose:
1. Clone the OpenRAG repository.
```bash
git clone https://github.com/langflow-ai/openrag.git
cd openrag
```
2. Copy the example `.env` file that is included in the repository root.
The example file includes all environment variables with comments to guide you in finding and setting their values.
```bash
cp .env.example .env
```
Alternatively, create a new `.env` file in the repository root.
```bash
touch .env
```
3. Set environment variables. The Docker Compose files are populated with values from your `.env`, so the following values **must** be set:
```bash
OPENSEARCH_PASSWORD=your_secure_password
OPENAI_API_KEY=your_openai_api_key
LANGFLOW_SUPERUSER=admin
LANGFLOW_SUPERUSER_PASSWORD=your_langflow_password
LANGFLOW_SECRET_KEY=your_secret_key
```
For more information on configuring OpenRAG with environment variables, see [Environment variables](/configure/configuration).
For additional configuration values, including `config.yaml`, see [Configuration](/configure/configuration).
4. Deploy OpenRAG with Docker Compose based on your deployment type.
For GPU-enabled systems, run the following command:
```bash
docker compose up -d
```
For CPU-only systems, run the following command:
```bash
docker compose -f docker-compose-cpu.yml up -d
```
The OpenRAG Docker Compose file starts five containers:
| Container Name | Default Address | Purpose |
|---|---|---|
| OpenRAG Backend | http://localhost:8000 | FastAPI server and core functionality. |
| OpenRAG Frontend | http://localhost:3000 | React web interface for users. |
| Langflow | http://localhost:7860 | AI workflow engine and flow management. |
| OpenSearch | http://localhost:9200 | Vector database for document storage. |
| OpenSearch Dashboards | http://localhost:5601 | Database administration interface. |
5. Verify installation by confirming all services are running.
```bash
docker compose ps
```
You can now access the application at:
- **Frontend**: http://localhost:3000
- **Backend API**: http://localhost:8000
- **Langflow**: http://localhost:7860
Continue with the Quickstart.
For example, for Google Drive file synchronization the webhook URL is `/connectors/google_drive/webhook`.


@@ -1,66 +1,94 @@
---
title: Terminal Interface (TUI)
title: Terminal User Interface (TUI) commands
slug: /get-started/tui
---
# OpenRAG TUI Guide
The OpenRAG Terminal User Interface (TUI) provides a streamlined way to set up, configure, and monitor your OpenRAG deployment directly from the terminal.
The OpenRAG Terminal User Interface (TUI) provides a streamlined way to set up, configure, and monitor your OpenRAG deployment directly from the terminal, on any operating system.
![OpenRAG TUI Interface](@site/static/img/OpenRAG_TUI_2025-09-10T13_04_11_757637.svg)
## Launch
The TUI offers an easier way to use OpenRAG without sacrificing control.
Instead of starting OpenRAG using Docker commands and manually editing values in the `.env` file, the TUI walks you through the setup. It prompts for variables where required, creates a `.env` file for you, and then starts OpenRAG.
Once OpenRAG is running, use the TUI to monitor your application, control your containers, and retrieve logs.
## Start the TUI
To start the TUI, run the following commands from the directory where you installed OpenRAG.
For more information, see [Install OpenRAG](/install).
```bash
uv sync
uv run openrag
```
## Features
### Welcome Screen
- Quick setup options: basic (no auth) or advanced (OAuth)
- Service monitoring: container status at a glance
- Quick actions: diagnostics, logs, configuration
### Configuration Screen
- Environment variables: guided forms for required settings
- API keys: secure input with validation
- OAuth setup: Google and Microsoft
- Document paths: configure ingestion directories
- Auto-save: generates and updates `.env`
### Service Monitor
- Container status: real-time state of services
- Resource usage: CPU, memory, network
- Service control: start/stop/restart
- Health checks: health indicators for all components
### Log Viewer
- Live logs: stream logs across services
- Filtering: by service (backend, frontend, Langflow, OpenSearch)
- Levels: DEBUG/INFO/WARNING/ERROR
- Export: save logs for later analysis
### Diagnostics
- System checks: Docker/Podman availability and configuration
- Environment validation: verify required variables
- Network tests: connectivity between services
- Performance metrics: system capacity and recommendations
The TUI Welcome Screen offers basic and advanced setup options.
For more information on setup values during installation, see [Install OpenRAG](/install).
## Navigation
- Arrow keys: move between options
- Tab/Shift+Tab: switch fields and buttons
- Enter: select/confirm
- Escape: back
- Q: quit
- Number keys (1-4): quick access to main screens
## Benefits
1. Simplified setup without manual file edits
2. Clear visual feedback and error messages
3. Integrated monitoring and control
4. Cross-platform: Linux, macOS, Windows
5. Fully terminal-based; no browser required
The TUI accepts mouse input or keyboard commands.
- <kbd>Arrow keys</kbd>: move between options
- <kbd>Tab</kbd>/<kbd>Shift+Tab</kbd>: switch fields and buttons
- <kbd>Enter</kbd>: select/confirm
- <kbd>Escape</kbd>: back
- <kbd>Q</kbd>: quit
- <kbd>Number keys (1-4)</kbd>: quick access to main screens
## Container management
The TUI can deploy, manage, and upgrade your OpenRAG containers.
### Start container services
Click **Start Container Services** to start the OpenRAG containers.
The TUI automatically detects your container runtime, then checks whether your machine has compatible GPU support by looking for CUDA, `nvidia-smi`, and Docker/Podman runtime support. This check determines which Docker Compose file OpenRAG uses.
The TUI then pulls the images and deploys the containers with the following command.
```bash
docker compose up -d
```
If images are missing, the TUI runs `docker compose pull`, then runs `docker compose up -d`.
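As a rough sketch, the detection amounts to something like the following (an illustrative assumption, not the TUI's exact logic):

```bash
# Hypothetical approximation of the TUI's GPU/runtime check
if command -v nvidia-smi >/dev/null 2>&1 && nvidia-smi >/dev/null 2>&1; then
  docker compose up -d                            # GPU-enabled compose file
else
  docker compose -f docker-compose-cpu.yml up -d  # CPU-only fallback
fi
```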
### Start native services
A "native" service in OpenRAG refers to a service run natively on your machine, and not within a container.
The `docling-serve` process is a native service in OpenRAG, because it's a document processing service that is run on your local machine, and controlled separately from the containers.
To start or stop `docling-serve` or any other native services, in the TUI main menu, click **Start Native Services** or **Stop Native Services**.
To view the status, port, or PID of a native service, in the TUI main menu, click [Status](#status).
### Status
The **Status** menu displays information on your container deployment.
Here you can check container health, find your service ports, view logs, and upgrade your containers.
To view streaming logs, select the container you want to view, and press <kbd>l</kbd>.
To copy your logs, click **Copy to Clipboard**.
To **upgrade** your containers, click **Upgrade**.
**Upgrade** runs `docker compose pull` and then `docker compose up -d --force-recreate`.
The first command pulls the latest images of OpenRAG.
The second command recreates the containers with your data persisted.
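Run together, the upgrade sequence is:

```bash
docker compose pull
docker compose up -d --force-recreate
```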
To **reset** your containers, click **Reset**.
Reset gives you a completely fresh start: it deletes all of your data, including OpenSearch data, uploaded documents, and authentication.
**Reset** runs two commands.
It first stops and removes all containers, volumes, and local images.
```bash
docker compose down --volumes --remove-orphans --rmi local
```
When the first command is complete, OpenRAG removes any additional Docker objects with `prune`.
```bash
docker system prune -f
```
## Diagnostics
The **Diagnostics** menu provides health monitoring for your container runtimes and monitoring of your OpenSearch security.


@@ -231,11 +231,8 @@ async def upload_and_ingest_user_file(
except Exception:
# Clean up temp file on error
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
from utils.file_utils import safe_unlink
safe_unlink(temp_path)
raise
except Exception as e:


@@ -164,12 +164,9 @@ async def langflow_upload_ingest_task(
except Exception:
# Clean up temp files on error
from utils.file_utils import safe_unlink
for temp_path in temp_file_paths:
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
safe_unlink(temp_path)
raise
except Exception as e:


@@ -26,7 +26,7 @@ async def cancel_task(request: Request, task_service, session_manager):
task_id = request.path_params.get("task_id")
user = request.state.user
success = task_service.cancel_task(user.user_id, task_id)
success = await task_service.cancel_task(user.user_id, task_id)
if not success:
return JSONResponse(
{"error": "Task not found or cannot be cancelled"}, status_code=400


@@ -514,6 +514,9 @@ class AppClients:
ssl_assert_fingerprint=None,
headers=headers,
http_compress=True,
timeout=30, # 30 second timeout
max_retries=3,
retry_on_timeout=True,
)


@@ -53,25 +53,27 @@ class LangflowConnectorService:
filename=document.filename,
)
from utils.file_utils import auto_cleanup_tempfile
suffix = self._get_file_extension(document.mimetype)
# Create temporary file from document content
with tempfile.NamedTemporaryFile(
delete=False, suffix=suffix
) as tmp_file:
tmp_file.write(document.content)
tmp_file.flush()
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write document content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
# Step 1: Upload file to Langflow
logger.debug("Uploading file to Langflow", filename=document.filename)
content = document.content
file_tuple = (
document.filename.replace(" ", "_").replace("/", "_")+suffix,
content,
document.mimetype or "application/octet-stream",
)
langflow_file_id = None # Initialize to track if upload succeeded
try:
# Step 1: Upload file to Langflow
logger.debug("Uploading file to Langflow", filename=document.filename)
content = document.content
file_tuple = (
document.filename.replace(" ", "_").replace("/", "_")+suffix,
content,
document.mimetype or "application/octet-stream",
)
upload_result = await self.langflow_service.upload_user_file(
file_tuple, jwt_token
)
@@ -125,7 +127,7 @@ class LangflowConnectorService:
error=str(e),
)
# Try to clean up Langflow file if upload succeeded but processing failed
if "langflow_file_id" in locals():
if langflow_file_id is not None:
try:
await self.langflow_service.delete_user_file(langflow_file_id)
logger.debug(
@@ -140,10 +142,6 @@ class LangflowConnectorService:
)
raise
finally:
# Clean up temporary file
os.unlink(tmp_file.name)
def _get_file_extension(self, mimetype: str) -> str:
"""Get file extension based on MIME type"""
mime_to_ext = {


@@ -54,52 +54,50 @@ class ConnectorService:
"""Process a document from a connector using existing processing pipeline"""
# Create temporary file from document content
with tempfile.NamedTemporaryFile(
delete=False, suffix=self._get_file_extension(document.mimetype)
) as tmp_file:
tmp_file.write(document.content)
tmp_file.flush()
from utils.file_utils import auto_cleanup_tempfile
try:
# Use existing process_file_common function with connector document metadata
# We'll use the document service's process_file_common method
from services.document_service import DocumentService
with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path:
# Write document content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
doc_service = DocumentService(session_manager=self.session_manager)
# Use existing process_file_common function with connector document metadata
# We'll use the document service's process_file_common method
from services.document_service import DocumentService
logger.debug("Processing connector document", document_id=document.id)
doc_service = DocumentService(session_manager=self.session_manager)
# Process using the existing pipeline but with connector document metadata
result = await doc_service.process_file_common(
file_path=tmp_file.name,
file_hash=document.id, # Use connector document ID as hash
owner_user_id=owner_user_id,
original_filename=document.filename, # Pass the original Google Doc title
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
file_size=len(document.content) if document.content else 0,
connector_type=connector_type,
logger.debug("Processing connector document", document_id=document.id)
# Process using consolidated processing pipeline
from models.processors import TaskProcessor
processor = TaskProcessor(document_service=doc_service)
result = await processor.process_document_standard(
file_path=tmp_path,
file_hash=document.id, # Use connector document ID as hash
owner_user_id=owner_user_id,
original_filename=document.filename, # Pass the original Google Doc title
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
file_size=len(document.content) if document.content else 0,
connector_type=connector_type,
)
logger.debug("Document processing result", result=result)
# If successfully indexed or already exists, update the indexed documents with connector metadata
if result["status"] in ["indexed", "unchanged"]:
# Update all chunks with connector-specific metadata
await self._update_connector_metadata(
document, owner_user_id, connector_type, jwt_token
)
logger.debug("Document processing result", result=result)
# If successfully indexed or already exists, update the indexed documents with connector metadata
if result["status"] in ["indexed", "unchanged"]:
# Update all chunks with connector-specific metadata
await self._update_connector_metadata(
document, owner_user_id, connector_type, jwt_token
)
return {
**result,
"filename": document.filename,
"source_url": document.source_url,
}
finally:
# Clean up temporary file
os.unlink(tmp_file.name)
return {
**result,
"filename": document.filename,
"source_url": document.source_url,
}
async def _update_connector_metadata(
self,


@@ -1,4 +1,3 @@
from abc import ABC, abstractmethod
from typing import Any
from .tasks import UploadTask, FileTask
from utils.logging_config import get_logger
@@ -6,22 +5,160 @@ from utils.logging_config import get_logger
logger = get_logger(__name__)
class TaskProcessor(ABC):
"""Abstract base class for task processors"""
class TaskProcessor:
"""Base class for task processors with shared processing logic"""
def __init__(self, document_service=None):
self.document_service = document_service
async def check_document_exists(
self,
file_hash: str,
opensearch_client,
) -> bool:
"""
Check if a document with the given hash already exists in OpenSearch.
Consolidated hash checking for all processors.
"""
from config.settings import INDEX_NAME
import asyncio
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
return exists
except Exception as e:  # Exception already covers asyncio.TimeoutError
if attempt == max_retries - 1:
logger.error(
"OpenSearch exists check failed after retries",
file_hash=file_hash,
error=str(e),
attempt=attempt + 1
)
# On final failure, assume document doesn't exist (safer to reprocess than skip)
logger.warning(
"Assuming document doesn't exist due to connection issues",
file_hash=file_hash
)
return False
else:
logger.warning(
"OpenSearch exists check failed, retrying",
file_hash=file_hash,
error=str(e),
attempt=attempt + 1,
retry_in=retry_delay
)
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
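# Note: with max_retries=3 and a doubling retry_delay, the waits between
# attempts are 1s and 2s, so a failing exists check adds at most ~3s of latency.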
async def process_document_standard(
self,
file_path: str,
file_hash: str,
owner_user_id: str = None,
original_filename: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
file_size: int = None,
connector_type: str = "local",
):
"""
Standard processing pipeline for non-Langflow processors:
docling conversion + embeddings + OpenSearch indexing.
"""
import datetime
from config.settings import INDEX_NAME, EMBED_MODEL, clients
from services.document_service import chunk_texts_for_embeddings
from utils.document_processing import extract_relevant
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
# Check if already exists
if await self.check_document_exists(file_hash, opensearch_client):
return {"status": "unchanged", "id": file_hash}
# Convert and extract
result = clients.converter.convert(file_path)
full_doc = result.document.export_to_dict()
slim_doc = extract_relevant(full_doc)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Index each chunk as a separate document
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": file_hash,
"filename": original_filename
if original_filename
else slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{file_hash}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
return {"status": "indexed", "id": file_hash}
@abstractmethod
async def process_item(
self, upload_task: UploadTask, item: Any, file_task: FileTask
) -> None:
"""
Process a single item in the task.
This is a base implementation that should be overridden by subclasses.
When TaskProcessor is used directly (not via subclass), this method
is not called - only the utility methods like process_document_standard
are used.
Args:
upload_task: The overall upload task
item: The item to process (could be file path, file info, etc.)
file_task: The specific file task to update
"""
pass
raise NotImplementedError(
"process_item should be overridden by subclasses when used in task processing"
)
class DocumentFileProcessor(TaskProcessor):
@@ -35,7 +172,7 @@ class DocumentFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
self.document_service = document_service
super().__init__(document_service)
self.owner_user_id = owner_user_id
self.jwt_token = jwt_token
self.owner_name = owner_name
@@ -44,16 +181,52 @@ class DocumentFileProcessor(TaskProcessor):
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a regular file path using DocumentService"""
# This calls the existing logic with user context
await self.document_service.process_single_file_task(
upload_task,
item,
owner_user_id=self.owner_user_id,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
"""Process a regular file path using consolidated methods"""
from models.tasks import TaskStatus
from utils.hash_utils import hash_id
import time
import os
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
try:
# Compute hash
file_hash = hash_id(item)
# Get file size
try:
file_size = os.path.getsize(item)
except Exception:
file_size = 0
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=item,
file_hash=file_hash,
owner_user_id=self.owner_user_id,
original_filename=os.path.basename(item),
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=file_size,
connector_type="local",
)
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
finally:
upload_task.processed_files += 1
upload_task.updated_at = time.time()
class ConnectorFileProcessor(TaskProcessor):
@@ -69,6 +242,7 @@ class ConnectorFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
super().__init__()
self.connector_service = connector_service
self.connection_id = connection_id
self.files_to_process = files_to_process
@@ -76,53 +250,79 @@ class ConnectorFileProcessor(TaskProcessor):
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
# Create lookup map for file info - handle both file objects and file IDs
self.file_info_map = {}
for f in files_to_process:
if isinstance(f, dict):
# Full file info objects
self.file_info_map[f["id"]] = f
else:
# Just file IDs - will need to fetch metadata during processing
self.file_info_map[f] = None
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a connector file using ConnectorService"""
"""Process a connector file using consolidated methods"""
from models.tasks import TaskStatus
from utils.hash_utils import hash_id
import tempfile
import time
import os
file_id = item # item is the connector file ID
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
# Get the connector and connection info
connector = await self.connector_service.get_connector(self.connection_id)
connection = await self.connector_service.connection_manager.get_connection(
self.connection_id
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
try:
file_id = item # item is the connector file ID
# Get file content from connector (the connector will fetch metadata if needed)
document = await connector.get_file_content(file_id)
# Get the connector and connection info
connector = await self.connector_service.get_connector(self.connection_id)
connection = await self.connector_service.connection_manager.get_connection(
self.connection_id
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
# Use the user_id passed during initialization
if not self.user_id:
raise ValueError("user_id not provided to ConnectorFileProcessor")
# Get file content from connector
document = await connector.get_file_content(file_id)
# Process using existing pipeline
result = await self.connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
if not self.user_id:
raise ValueError("user_id not provided to ConnectorFileProcessor")
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
# Create temporary file from document content
from utils.file_utils import auto_cleanup_tempfile
suffix = self.connector_service._get_file_extension(document.mimetype)
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
# Compute hash
file_hash = hash_id(tmp_path)
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=self.user_id,
original_filename=document.filename,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=len(document.content),
connector_type=connection.connector_type,
)
# Add connector-specific metadata
result.update({
"source_url": document.source_url,
"document_id": document.id,
})
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
class LangflowConnectorFileProcessor(TaskProcessor):
@@ -138,6 +338,7 @@ class LangflowConnectorFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
super().__init__()
self.langflow_connector_service = langflow_connector_service
self.connection_id = connection_id
self.files_to_process = files_to_process
@@ -145,57 +346,85 @@ class LangflowConnectorFileProcessor(TaskProcessor):
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
# Create lookup map for file info - handle both file objects and file IDs
self.file_info_map = {}
for f in files_to_process:
if isinstance(f, dict):
# Full file info objects
self.file_info_map[f["id"]] = f
else:
# Just file IDs - will need to fetch metadata during processing
self.file_info_map[f] = None
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a connector file using LangflowConnectorService"""
from models.tasks import TaskStatus
from utils.hash_utils import hash_id
import tempfile
import time
import os
file_id = item # item is the connector file ID
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
# Get the connector and connection info
connector = await self.langflow_connector_service.get_connector(
self.connection_id
)
connection = (
await self.langflow_connector_service.connection_manager.get_connection(
try:
file_id = item # item is the connector file ID
# Get the connector and connection info
connector = await self.langflow_connector_service.get_connector(
self.connection_id
)
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
connection = (
await self.langflow_connector_service.connection_manager.get_connection(
self.connection_id
)
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
# Get file content from connector (the connector will fetch metadata if needed)
document = await connector.get_file_content(file_id)
# Get file content from connector
document = await connector.get_file_content(file_id)
# Use the user_id passed during initialization
if not self.user_id:
raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
if not self.user_id:
raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
# Process using Langflow pipeline
result = await self.langflow_connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
# Create temporary file and compute hash to check for duplicates
from utils.file_utils import auto_cleanup_tempfile
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
suffix = self.langflow_connector_service._get_file_extension(document.mimetype)
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
# Compute hash and check if already exists
file_hash = hash_id(tmp_path)
# Check if document already exists
opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
self.user_id, self.jwt_token
)
if await self.check_document_exists(file_hash, opensearch_client):
file_task.status = TaskStatus.COMPLETED
file_task.result = {"status": "unchanged", "id": file_hash}
file_task.updated_at = time.time()
upload_task.successful_files += 1
return
# Process using Langflow pipeline
result = await self.langflow_connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
class S3FileProcessor(TaskProcessor):
@@ -213,7 +442,7 @@ class S3FileProcessor(TaskProcessor):
):
import boto3
self.document_service = document_service
super().__init__(document_service)
self.bucket = bucket
self.s3_client = s3_client or boto3.client("s3")
self.owner_user_id = owner_user_id
@@ -238,34 +467,17 @@ class S3FileProcessor(TaskProcessor):
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
tmp = tempfile.NamedTemporaryFile(delete=False)
from utils.file_utils import auto_cleanup_tempfile
from utils.hash_utils import hash_id
try:
# Download object to temporary file
self.s3_client.download_fileobj(self.bucket, item, tmp)
tmp.flush()
with auto_cleanup_tempfile() as tmp_path:
# Download object to temporary file
with open(tmp_path, 'wb') as tmp_file:
self.s3_client.download_fileobj(self.bucket, item, tmp_file)
loop = asyncio.get_event_loop()
slim_doc = await loop.run_in_executor(
self.document_service.process_pool, process_document_sync, tmp.name
)
opensearch_client = (
self.document_service.session_manager.get_user_opensearch_client(
self.owner_user_id, self.jwt_token
)
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
if exists:
result = {"status": "unchanged", "id": slim_doc["id"]}
else:
texts = [c["text"] for c in slim_doc["chunks"]]
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Compute hash
file_hash = hash_id(tmp_path)
# Get object size
try:
@@ -274,54 +486,29 @@ class S3FileProcessor(TaskProcessor):
except Exception:
file_size = 0
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": slim_doc["id"],
"filename": slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": "s3", # S3 uploads
"indexed_time": datetime.datetime.now().isoformat(),
}
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=self.owner_user_id,
original_filename=item, # Use S3 key as filename
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=file_size,
connector_type="s3",
)
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if self.owner_user_id is not None:
chunk_doc["owner"] = self.owner_user_id
if self.owner_name is not None:
chunk_doc["owner_name"] = self.owner_name
if self.owner_email is not None:
chunk_doc["owner_email"] = self.owner_email
chunk_id = f"{slim_doc['id']}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for S3 chunk",
chunk_id=chunk_id,
error=str(e),
chunk_doc=chunk_doc,
)
raise
result = {"status": "indexed", "id": slim_doc["id"]}
result["path"] = f"s3://{self.bucket}/{item}"
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
result["path"] = f"s3://{self.bucket}/{item}"
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
upload_task.failed_files += 1
finally:
tmp.close()
os.remove(tmp.name)
file_task.updated_at = time.time()
@@ -341,6 +528,7 @@ class LangflowFileProcessor(TaskProcessor):
settings: dict = None,
delete_after_ingest: bool = True,
):
super().__init__()
self.langflow_file_service = langflow_file_service
self.session_manager = session_manager
self.owner_user_id = owner_user_id
@@ -366,7 +554,22 @@
file_task.updated_at = time.time()
try:
# Read file content
# Compute hash and check if already exists
from utils.hash_utils import hash_id
file_hash = hash_id(item)
# Check if document already exists
opensearch_client = self.session_manager.get_user_opensearch_client(
self.owner_user_id, self.jwt_token
)
if await self.check_document_exists(file_hash, opensearch_client):
file_task.status = TaskStatus.COMPLETED
file_task.result = {"status": "unchanged", "id": file_hash}
file_task.updated_at = time.time()
upload_task.successful_files += 1
return
# Read file content for processing
with open(item, 'rb') as f:
content = f.read()


@@ -112,98 +112,6 @@ class DocumentService:
return False
return False
async def process_file_common(
self,
file_path: str,
file_hash: str = None,
owner_user_id: str = None,
original_filename: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
file_size: int = None,
connector_type: str = "local",
):
"""
Common processing logic for both upload and upload_path.
1. Optionally compute SHA256 hash if not provided.
2. Convert with docling and extract relevant content.
3. Add embeddings.
4. Index into OpenSearch.
"""
if file_hash is None:
sha256 = hashlib.sha256()
async with aiofiles.open(file_path, "rb") as f:
while True:
chunk = await f.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
file_hash = sha256.hexdigest()
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
if exists:
return {"status": "unchanged", "id": file_hash}
# convert and extract
result = clients.converter.convert(file_path)
full_doc = result.document.export_to_dict()
slim_doc = extract_relevant(full_doc)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Index each chunk as a separate document
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": file_hash,
"filename": original_filename
if original_filename
else slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{file_hash}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
return {"status": "indexed", "id": file_hash}
async def process_upload_file(
self,
@@ -214,20 +122,22 @@ class DocumentService:
owner_email: str = None,
):
"""Process an uploaded file from form data"""
sha256 = hashlib.sha256()
tmp = tempfile.NamedTemporaryFile(delete=False)
file_size = 0
try:
while True:
chunk = await upload_file.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
tmp.write(chunk)
file_size += len(chunk)
tmp.flush()
from utils.hash_utils import hash_id
from utils.file_utils import auto_cleanup_tempfile
import os
file_hash = sha256.hexdigest()
with auto_cleanup_tempfile() as tmp_path:
# Stream upload file to temporary file
file_size = 0
with open(tmp_path, 'wb') as tmp_file:
while True:
chunk = await upload_file.read(1 << 20)
if not chunk:
break
tmp_file.write(chunk)
file_size += len(chunk)
file_hash = hash_id(tmp_path)
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
@@ -243,22 +153,22 @@
if exists:
return {"status": "unchanged", "id": file_hash}
result = await self.process_file_common(
tmp.name,
file_hash,
# Use consolidated standard processing
from models.processors import TaskProcessor
processor = TaskProcessor(document_service=self)
result = await processor.process_document_standard(
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=owner_user_id,
original_filename=upload_file.filename,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
file_size=file_size,
connector_type="local",
)
return result
finally:
tmp.close()
os.remove(tmp.name)
async def process_upload_context(self, upload_file, filename: str = None):
"""Process uploaded file and return content for context"""
import io
@@ -294,145 +204,3 @@
"pages": len(slim_doc["chunks"]),
"content_length": len(full_content),
}
async def process_single_file_task(
self,
upload_task,
file_path: str,
owner_user_id: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
connector_type: str = "local",
):
"""Process a single file and update task tracking - used by task service"""
from models.tasks import TaskStatus
import time
import asyncio
file_task = upload_task.file_tasks[file_path]
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
try:
# Handle regular file processing
loop = asyncio.get_event_loop()
# Run CPU-intensive docling processing in separate process
slim_doc = await loop.run_in_executor(
self.process_pool, process_document_sync, file_path
)
# Check if already indexed
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
if exists:
result = {"status": "unchanged", "id": slim_doc["id"]}
else:
# Generate embeddings and index (I/O bound, keep in main process)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Get file size
file_size = 0
try:
file_size = os.path.getsize(file_path)
except OSError:
pass # Keep file_size as 0 if can't get size
# Index each chunk
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": slim_doc["id"],
"filename": slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{slim_doc['id']}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for batch chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
result = {"status": "indexed", "id": slim_doc["id"]}
result["path"] = file_path
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
except Exception as e:
import traceback
from concurrent.futures import BrokenExecutor
if isinstance(e, BrokenExecutor):
logger.error(
"Process pool broken while processing file", file_path=file_path
)
logger.info("Worker process likely crashed")
logger.info(
"You should see detailed crash logs above from the worker process"
)
# Mark pool as broken for potential recreation
self._process_pool_broken = True
# Attempt to recreate the pool for future operations
if self._recreate_process_pool():
logger.info("Process pool successfully recreated")
else:
logger.warning(
"Failed to recreate process pool - future operations may fail"
)
file_task.error = f"Worker process crashed: {str(e)}"
else:
logger.error(
"Failed to process file", file_path=file_path, error=str(e)
)
file_task.error = str(e)
logger.error("Full traceback available")
traceback.print_exc()
file_task.status = TaskStatus.FAILED
upload_task.failed_files += 1
finally:
file_task.updated_at = time.time()
upload_task.processed_files += 1
upload_task.updated_at = time.time()
if upload_task.processed_files >= upload_task.total_files:
upload_task.status = TaskStatus.COMPLETED


@@ -130,10 +130,21 @@ class TaskService:
async def process_with_semaphore(file_path: str):
async with semaphore:
await self.document_service.process_single_file_task(
upload_task, file_path
from models.processors import DocumentFileProcessor
file_task = upload_task.file_tasks[file_path]
# Create processor with user context (all None for background processing)
processor = DocumentFileProcessor(
document_service=self.document_service,
owner_user_id=None,
jwt_token=None,
owner_name=None,
owner_email=None,
)
# Process the file
await processor.process_item(upload_task, file_path, file_task)
tasks = [
process_with_semaphore(file_path)
for file_path in upload_task.file_tasks.keys()
@@ -141,6 +152,11 @@ class TaskService:
await asyncio.gather(*tasks, return_exceptions=True)
# Check if task is complete
if upload_task.processed_files >= upload_task.total_files:
upload_task.status = TaskStatus.COMPLETED
upload_task.updated_at = time.time()
except Exception as e:
logger.error(
"Background upload processor failed", task_id=task_id, error=str(e)
@@ -336,7 +352,7 @@ class TaskService:
tasks.sort(key=lambda x: x["created_at"], reverse=True)
return tasks
def cancel_task(self, user_id: str, task_id: str) -> bool:
async def cancel_task(self, user_id: str, task_id: str) -> bool:
"""Cancel a task if it exists and is not already completed.
Supports cancellation of shared default tasks stored under the anonymous user.
@@ -368,18 +384,28 @@
and not upload_task.background_task.done()
):
upload_task.background_task.cancel()
# Wait for the background task to actually stop to avoid race conditions
try:
await upload_task.background_task
except asyncio.CancelledError:
pass # Expected when we cancel the task
except Exception:
pass # Ignore other errors during cancellation
# Mark task as failed (cancelled)
upload_task.status = TaskStatus.FAILED
upload_task.updated_at = time.time()
# Mark all pending file tasks as failed
# Mark all pending and running file tasks as failed
for file_task in upload_task.file_tasks.values():
if file_task.status == TaskStatus.PENDING:
if file_task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]:
# Increment failed_files counter for both pending and running
# (running files haven't been counted yet in either counter)
upload_task.failed_files += 1
file_task.status = TaskStatus.FAILED
file_task.error = "Task cancelled by user"
file_task.updated_at = time.time()
upload_task.failed_files += 1
return True


@@ -229,15 +229,9 @@ def process_document_sync(file_path: str):
# Compute file hash
try:
from utils.hash_utils import hash_id
logger.info("Computing file hash", worker_pid=os.getpid())
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
while True:
chunk = f.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
file_hash = sha256.hexdigest()
file_hash = hash_id(file_path)
logger.info(
"File hash computed",
worker_pid=os.getpid(),

src/utils/file_utils.py Normal file

@@ -0,0 +1,60 @@
"""File handling utilities for OpenRAG"""
import os
import tempfile
from contextlib import contextmanager
from typing import Optional


@contextmanager
def auto_cleanup_tempfile(suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None):
    """
    Context manager for temporary files that automatically cleans up.

    Unlike tempfile.NamedTemporaryFile with delete=True, this keeps the file
    on disk for the duration of the context, making it safe for async operations.

    Usage:
        with auto_cleanup_tempfile(suffix=".pdf") as tmp_path:
            # Write to the file
            with open(tmp_path, 'wb') as f:
                f.write(content)
            # Use tmp_path for processing
            result = await process_file(tmp_path)
        # File is automatically deleted here

    Args:
        suffix: Optional file suffix/extension (e.g., ".pdf")
        prefix: Optional file prefix
        dir: Optional directory for temp file

    Yields:
        str: Path to the temporary file
    """
    fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
    try:
        os.close(fd)  # Close the file descriptor immediately
        yield path
    finally:
        # Always clean up, even if an exception occurred
        try:
            if os.path.exists(path):
                os.unlink(path)
        except Exception:
            # Silently ignore cleanup errors
            pass


def safe_unlink(path: str) -> None:
    """
    Safely delete a file, ignoring errors if it doesn't exist.

    Args:
        path: Path to the file to delete
    """
    try:
        if path and os.path.exists(path):
            os.unlink(path)
    except Exception:
        # Silently ignore errors
        pass
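A minimal usage sketch for these helpers (hypothetical paths):

```python
from utils.file_utils import auto_cleanup_tempfile, safe_unlink

with auto_cleanup_tempfile(suffix=".bin") as tmp_path:
    with open(tmp_path, "wb") as f:
        f.write(b"example content")
    # tmp_path remains on disk here, so it can be handed to async code
# the temp file is deleted once the with-block exits

safe_unlink("/tmp/openrag-stale.tmp")  # hypothetical path; a no-op if missing
```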

src/utils/hash_utils.py Normal file

@@ -0,0 +1,76 @@
import io
import os
import base64
import hashlib
from typing import BinaryIO, Optional, Union


def _b64url(data: bytes) -> str:
    """URL-safe base64 without padding"""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")


def stream_hash(
    source: Union[str, os.PathLike, BinaryIO],
    *,
    algo: str = "sha256",
    include_filename: Optional[str] = None,
    chunk_size: int = 1024 * 1024,  # 1 MiB
) -> bytes:
    """
    Memory-safe, incremental hash of a file path or binary stream.

    - source: path or file-like object with .read()
    - algo: hashlib algorithm name ('sha256', 'blake2b', 'sha3_256', etc.)
    - include_filename: if provided, the UTF-8 bytes of this string are prepended
    - chunk_size: read size per iteration

    Returns: raw digest bytes
    """
    try:
        h = hashlib.new(algo)
    except ValueError as e:
        raise ValueError(f"Unsupported hash algorithm: {algo}") from e

    def _update_from_file(f: BinaryIO):
        if include_filename:
            h.update(include_filename.encode("utf-8"))
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)

    if isinstance(source, (str, os.PathLike)):
        with open(source, "rb", buffering=io.DEFAULT_BUFFER_SIZE) as f:
            _update_from_file(f)
    else:
        f = source
        # Preserve position if seekable
        pos = None
        try:
            if f.seekable():
                pos = f.tell()
                f.seek(0)
        except Exception:
            pos = None
        try:
            _update_from_file(f)
        finally:
            if pos is not None:
                try:
                    f.seek(pos)
                except Exception:
                    pass
    return h.digest()


def hash_id(
    source: Union[str, os.PathLike, BinaryIO],
    *,
    algo: str = "sha256",
    include_filename: Optional[str] = None,
    length: int = 24,  # characters of base64url (set 0 or None for full)
) -> str:
    """
    Deterministic, URL-safe base64 digest (no prefix).
    """
    b = stream_hash(source, algo=algo, include_filename=include_filename)
    s = _b64url(b)
    return s[:length] if length else s
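A short usage sketch (assuming a local `report.pdf` exists; parameters as defined above):

```python
from utils.hash_utils import hash_id, stream_hash

# Default: 24-character URL-safe SHA-256 digest of the file contents
doc_id = hash_id("report.pdf")

# Mixing the filename into the digest gives renamed copies distinct IDs
named_id = hash_id("report.pdf", include_filename="report.pdf")

# The lower-level helper returns raw digest bytes and accepts other algorithms
raw = stream_hash("report.pdf", algo="blake2b")
```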

uv.lock generated

@@ -1,5 +1,5 @@
version = 1
revision = 3
revision = 2
requires-python = ">=3.13"
resolution-markers = [
"sys_platform == 'darwin'",
@@ -2282,7 +2282,7 @@ wheels = [
[[package]]
name = "openrag"
version = "0.1.13"
version = "0.1.14.dev1"
source = { editable = "." }
dependencies = [
{ name = "agentd" },