Merge pull request #138 from langflow-ai/shared-process-document-common-refactor

Shared process document common refactor
Sebastián Estévez 2025-09-29 15:23:48 -04:00 committed by GitHub
commit 19bb003b01
15 changed files with 620 additions and 529 deletions

.gitignore (vendored, 2 changes)

@@ -18,3 +18,5 @@ wheels/
1001*.pdf
*.json
.DS_Store
config.yaml


@@ -1,31 +0,0 @@
# OpenRAG Configuration File
# This file allows you to configure OpenRAG settings.
# Environment variables will override these settings unless edited is true.
# Track if this config has been manually edited (prevents env var overrides)
edited: false
# Model provider configuration
provider:
# Supported providers: "openai", "anthropic", "azure", etc.
model_provider: "openai"
# API key for the model provider (can also be set via OPENAI_API_KEY env var)
api_key: ""
# Knowledge base and document processing configuration
knowledge:
# Embedding model for vector search
embedding_model: "text-embedding-3-small"
# Text chunk size for document processing
chunk_size: 1000
# Overlap between chunks
chunk_overlap: 200
# Docling preset setting
doclingPresets: standard
# AI agent configuration
agent:
# Language model for the chat agent
llm_model: "gpt-4o-mini"
# System prompt for the agent
system_prompt: "You are a helpful AI assistant with access to a knowledge base. Answer questions based on the provided context."


@@ -231,11 +231,8 @@ async def upload_and_ingest_user_file(
except Exception:
# Clean up temp file on error
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
from utils.file_utils import safe_unlink
safe_unlink(temp_path)
raise
except Exception as e:


@@ -164,12 +164,9 @@ async def langflow_upload_ingest_task(
except Exception:
# Clean up temp files on error
from utils.file_utils import safe_unlink
for temp_path in temp_file_paths:
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
safe_unlink(temp_path)
raise
except Exception as e:


@@ -26,7 +26,7 @@ async def cancel_task(request: Request, task_service, session_manager):
task_id = request.path_params.get("task_id")
user = request.state.user
success = task_service.cancel_task(user.user_id, task_id)
success = await task_service.cancel_task(user.user_id, task_id)
if not success:
return JSONResponse(
{"error": "Task not found or cannot be cancelled"}, status_code=400


@@ -514,6 +514,9 @@ class AppClients:
ssl_assert_fingerprint=None,
headers=headers,
http_compress=True,
timeout=30, # 30 second timeout
max_retries=3,
retry_on_timeout=True,
)
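
For reference, the three new keyword arguments are standard opensearch-py transport settings. A minimal sketch of a client built with them, assuming the async opensearch-py client; the host below is a placeholder, not the project's real AppClients wiring:

# Illustration only: placeholder host, real construction lives in AppClients.
from opensearchpy import AsyncOpenSearch

def build_client(headers: dict) -> AsyncOpenSearch:
    return AsyncOpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],  # placeholder endpoint
        headers=headers,
        http_compress=True,
        timeout=30,             # per-request timeout in seconds
        max_retries=3,          # retry transient transport errors
        retry_on_timeout=True,  # treat timeouts as retryable as well
    )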


@@ -53,25 +53,27 @@ class LangflowConnectorService:
filename=document.filename,
)
from utils.file_utils import auto_cleanup_tempfile
suffix = self._get_file_extension(document.mimetype)
# Create temporary file from document content
with tempfile.NamedTemporaryFile(
delete=False, suffix=suffix
) as tmp_file:
tmp_file.write(document.content)
tmp_file.flush()
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write document content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
# Step 1: Upload file to Langflow
logger.debug("Uploading file to Langflow", filename=document.filename)
content = document.content
file_tuple = (
document.filename.replace(" ", "_").replace("/", "_")+suffix,
content,
document.mimetype or "application/octet-stream",
)
langflow_file_id = None # Initialize to track if upload succeeded
try:
# Step 1: Upload file to Langflow
logger.debug("Uploading file to Langflow", filename=document.filename)
content = document.content
file_tuple = (
document.filename.replace(" ", "_").replace("/", "_")+suffix,
content,
document.mimetype or "application/octet-stream",
)
upload_result = await self.langflow_service.upload_user_file(
file_tuple, jwt_token
)
@@ -125,7 +127,7 @@ class LangflowConnectorService:
error=str(e),
)
# Try to clean up Langflow file if upload succeeded but processing failed
if "langflow_file_id" in locals():
if langflow_file_id is not None:
try:
await self.langflow_service.delete_user_file(langflow_file_id)
logger.debug(
@@ -140,10 +142,6 @@ class LangflowConnectorService:
)
raise
finally:
# Clean up temporary file
os.unlink(tmp_file.name)
def _get_file_extension(self, mimetype: str) -> str:
"""Get file extension based on MIME type"""
mime_to_ext = {
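
The temp-file handling above is the pattern this PR standardizes on: instead of tempfile.NamedTemporaryFile(delete=False) followed by a finally: os.unlink(...), content is written to a path yielded by auto_cleanup_tempfile (added in src/utils/file_utils.py below) and the context manager removes it. A minimal sketch of that shape, where process_path() is a hypothetical stand-in for the real ingest call:

# Sketch of the new temp-file pattern; process_path() is a hypothetical placeholder.
from utils.file_utils import auto_cleanup_tempfile

async def process_path(path: str) -> dict:
    return {"status": "indexed", "path": path}  # stand-in for the real pipeline

async def ingest_bytes(content: bytes, suffix: str) -> dict:
    with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
        with open(tmp_path, "wb") as f:       # persist the document bytes
            f.write(content)
        return await process_path(tmp_path)   # temp file still exists here
    # on exit (success or exception) the temp file has been deleted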


@@ -54,52 +54,50 @@ class ConnectorService:
"""Process a document from a connector using existing processing pipeline"""
# Create temporary file from document content
with tempfile.NamedTemporaryFile(
delete=False, suffix=self._get_file_extension(document.mimetype)
) as tmp_file:
tmp_file.write(document.content)
tmp_file.flush()
from utils.file_utils import auto_cleanup_tempfile
try:
# Use existing process_file_common function with connector document metadata
# We'll use the document service's process_file_common method
from services.document_service import DocumentService
with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path:
# Write document content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
doc_service = DocumentService(session_manager=self.session_manager)
# Use existing process_file_common function with connector document metadata
# We'll use the document service's process_file_common method
from services.document_service import DocumentService
logger.debug("Processing connector document", document_id=document.id)
doc_service = DocumentService(session_manager=self.session_manager)
# Process using the existing pipeline but with connector document metadata
result = await doc_service.process_file_common(
file_path=tmp_file.name,
file_hash=document.id, # Use connector document ID as hash
owner_user_id=owner_user_id,
original_filename=document.filename, # Pass the original Google Doc title
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
file_size=len(document.content) if document.content else 0,
connector_type=connector_type,
logger.debug("Processing connector document", document_id=document.id)
# Process using consolidated processing pipeline
from models.processors import TaskProcessor
processor = TaskProcessor(document_service=doc_service)
result = await processor.process_document_standard(
file_path=tmp_path,
file_hash=document.id, # Use connector document ID as hash
owner_user_id=owner_user_id,
original_filename=document.filename, # Pass the original Google Doc title
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
file_size=len(document.content) if document.content else 0,
connector_type=connector_type,
)
logger.debug("Document processing result", result=result)
# If successfully indexed or already exists, update the indexed documents with connector metadata
if result["status"] in ["indexed", "unchanged"]:
# Update all chunks with connector-specific metadata
await self._update_connector_metadata(
document, owner_user_id, connector_type, jwt_token
)
logger.debug("Document processing result", result=result)
# If successfully indexed or already exists, update the indexed documents with connector metadata
if result["status"] in ["indexed", "unchanged"]:
# Update all chunks with connector-specific metadata
await self._update_connector_metadata(
document, owner_user_id, connector_type, jwt_token
)
return {
**result,
"filename": document.filename,
"source_url": document.source_url,
}
finally:
# Clean up temporary file
os.unlink(tmp_file.name)
return {
**result,
"filename": document.filename,
"source_url": document.source_url,
}
async def _update_connector_metadata(
self,


@@ -1,4 +1,3 @@
from abc import ABC, abstractmethod
from typing import Any
from .tasks import UploadTask, FileTask
from utils.logging_config import get_logger
@@ -6,22 +5,160 @@ from utils.logging_config import get_logger
logger = get_logger(__name__)
class TaskProcessor(ABC):
"""Abstract base class for task processors"""
class TaskProcessor:
"""Base class for task processors with shared processing logic"""
def __init__(self, document_service=None):
self.document_service = document_service
async def check_document_exists(
self,
file_hash: str,
opensearch_client,
) -> bool:
"""
Check if a document with the given hash already exists in OpenSearch.
Consolidated hash checking for all processors.
"""
from config.settings import INDEX_NAME
import asyncio
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
return exists
except (asyncio.TimeoutError, Exception) as e:
if attempt == max_retries - 1:
logger.error(
"OpenSearch exists check failed after retries",
file_hash=file_hash,
error=str(e),
attempt=attempt + 1
)
# On final failure, assume document doesn't exist (safer to reprocess than skip)
logger.warning(
"Assuming document doesn't exist due to connection issues",
file_hash=file_hash
)
return False
else:
logger.warning(
"OpenSearch exists check failed, retrying",
file_hash=file_hash,
error=str(e),
attempt=attempt + 1,
retry_in=retry_delay
)
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
async def process_document_standard(
self,
file_path: str,
file_hash: str,
owner_user_id: str = None,
original_filename: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
file_size: int = None,
connector_type: str = "local",
):
"""
Standard processing pipeline for non-Langflow processors:
docling conversion + embeddings + OpenSearch indexing.
"""
import datetime
from config.settings import INDEX_NAME, EMBED_MODEL, clients
from services.document_service import chunk_texts_for_embeddings
from utils.document_processing import extract_relevant
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
# Check if already exists
if await self.check_document_exists(file_hash, opensearch_client):
return {"status": "unchanged", "id": file_hash}
# Convert and extract
result = clients.converter.convert(file_path)
full_doc = result.document.export_to_dict()
slim_doc = extract_relevant(full_doc)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Index each chunk as a separate document
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": file_hash,
"filename": original_filename
if original_filename
else slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{file_hash}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
return {"status": "indexed", "id": file_hash}
@abstractmethod
async def process_item(
self, upload_task: UploadTask, item: Any, file_task: FileTask
) -> None:
"""
Process a single item in the task.
This is a base implementation that should be overridden by subclasses.
When TaskProcessor is used directly (not via subclass), this method
is not called - only the utility methods like process_document_standard
are used.
Args:
upload_task: The overall upload task
item: The item to process (could be file path, file info, etc.)
file_task: The specific file task to update
"""
pass
raise NotImplementedError(
"process_item should be overridden by subclasses when used in task processing"
)
class DocumentFileProcessor(TaskProcessor):
@@ -35,7 +172,7 @@ class DocumentFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
self.document_service = document_service
super().__init__(document_service)
self.owner_user_id = owner_user_id
self.jwt_token = jwt_token
self.owner_name = owner_name
@@ -44,16 +181,52 @@ class DocumentFileProcessor(TaskProcessor):
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a regular file path using DocumentService"""
# This calls the existing logic with user context
await self.document_service.process_single_file_task(
upload_task,
item,
owner_user_id=self.owner_user_id,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
"""Process a regular file path using consolidated methods"""
from models.tasks import TaskStatus
from utils.hash_utils import hash_id
import time
import os
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
try:
# Compute hash
file_hash = hash_id(item)
# Get file size
try:
file_size = os.path.getsize(item)
except Exception:
file_size = 0
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=item,
file_hash=file_hash,
owner_user_id=self.owner_user_id,
original_filename=os.path.basename(item),
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=file_size,
connector_type="local",
)
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
finally:
upload_task.processed_files += 1
upload_task.updated_at = time.time()
class ConnectorFileProcessor(TaskProcessor):
@@ -69,6 +242,7 @@ class ConnectorFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
super().__init__()
self.connector_service = connector_service
self.connection_id = connection_id
self.files_to_process = files_to_process
@@ -76,53 +250,79 @@ class ConnectorFileProcessor(TaskProcessor):
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
# Create lookup map for file info - handle both file objects and file IDs
self.file_info_map = {}
for f in files_to_process:
if isinstance(f, dict):
# Full file info objects
self.file_info_map[f["id"]] = f
else:
# Just file IDs - will need to fetch metadata during processing
self.file_info_map[f] = None
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a connector file using ConnectorService"""
"""Process a connector file using consolidated methods"""
from models.tasks import TaskStatus
from utils.hash_utils import hash_id
import tempfile
import time
import os
file_id = item # item is the connector file ID
self.file_info_map.get(file_id)
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
# Get the connector and connection info
connector = await self.connector_service.get_connector(self.connection_id)
connection = await self.connector_service.connection_manager.get_connection(
self.connection_id
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
try:
file_id = item # item is the connector file ID
# Get file content from connector (the connector will fetch metadata if needed)
document = await connector.get_file_content(file_id)
# Get the connector and connection info
connector = await self.connector_service.get_connector(self.connection_id)
connection = await self.connector_service.connection_manager.get_connection(
self.connection_id
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
# Use the user_id passed during initialization
if not self.user_id:
raise ValueError("user_id not provided to ConnectorFileProcessor")
# Get file content from connector
document = await connector.get_file_content(file_id)
# Process using existing pipeline
result = await self.connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
if not self.user_id:
raise ValueError("user_id not provided to ConnectorFileProcessor")
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
# Create temporary file from document content
from utils.file_utils import auto_cleanup_tempfile
suffix = self.connector_service._get_file_extension(document.mimetype)
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
# Compute hash
file_hash = hash_id(tmp_path)
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=self.user_id,
original_filename=document.filename,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=len(document.content),
connector_type=connection.connector_type,
)
# Add connector-specific metadata
result.update({
"source_url": document.source_url,
"document_id": document.id,
})
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
class LangflowConnectorFileProcessor(TaskProcessor):
@@ -138,6 +338,7 @@ class LangflowConnectorFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
super().__init__()
self.langflow_connector_service = langflow_connector_service
self.connection_id = connection_id
self.files_to_process = files_to_process
@@ -145,57 +346,85 @@ class LangflowConnectorFileProcessor(TaskProcessor):
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
# Create lookup map for file info - handle both file objects and file IDs
self.file_info_map = {}
for f in files_to_process:
if isinstance(f, dict):
# Full file info objects
self.file_info_map[f["id"]] = f
else:
# Just file IDs - will need to fetch metadata during processing
self.file_info_map[f] = None
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a connector file using LangflowConnectorService"""
from models.tasks import TaskStatus
from utils.hash_utils import hash_id
import tempfile
import time
import os
file_id = item # item is the connector file ID
self.file_info_map.get(file_id)
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
# Get the connector and connection info
connector = await self.langflow_connector_service.get_connector(
self.connection_id
)
connection = (
await self.langflow_connector_service.connection_manager.get_connection(
try:
file_id = item # item is the connector file ID
# Get the connector and connection info
connector = await self.langflow_connector_service.get_connector(
self.connection_id
)
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
connection = (
await self.langflow_connector_service.connection_manager.get_connection(
self.connection_id
)
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
# Get file content from connector (the connector will fetch metadata if needed)
document = await connector.get_file_content(file_id)
# Get file content from connector
document = await connector.get_file_content(file_id)
# Use the user_id passed during initialization
if not self.user_id:
raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
if not self.user_id:
raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
# Process using Langflow pipeline
result = await self.langflow_connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
# Create temporary file and compute hash to check for duplicates
from utils.file_utils import auto_cleanup_tempfile
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
suffix = self.langflow_connector_service._get_file_extension(document.mimetype)
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
# Compute hash and check if already exists
file_hash = hash_id(tmp_path)
# Check if document already exists
opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
self.user_id, self.jwt_token
)
if await self.check_document_exists(file_hash, opensearch_client):
file_task.status = TaskStatus.COMPLETED
file_task.result = {"status": "unchanged", "id": file_hash}
file_task.updated_at = time.time()
upload_task.successful_files += 1
return
# Process using Langflow pipeline
result = await self.langflow_connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
class S3FileProcessor(TaskProcessor):
@@ -213,7 +442,7 @@
):
import boto3
self.document_service = document_service
super().__init__(document_service)
self.bucket = bucket
self.s3_client = s3_client or boto3.client("s3")
self.owner_user_id = owner_user_id
@@ -238,34 +467,17 @@
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
tmp = tempfile.NamedTemporaryFile(delete=False)
from utils.file_utils import auto_cleanup_tempfile
from utils.hash_utils import hash_id
try:
# Download object to temporary file
self.s3_client.download_fileobj(self.bucket, item, tmp)
tmp.flush()
with auto_cleanup_tempfile() as tmp_path:
# Download object to temporary file
with open(tmp_path, 'wb') as tmp_file:
self.s3_client.download_fileobj(self.bucket, item, tmp_file)
loop = asyncio.get_event_loop()
slim_doc = await loop.run_in_executor(
self.document_service.process_pool, process_document_sync, tmp.name
)
opensearch_client = (
self.document_service.session_manager.get_user_opensearch_client(
self.owner_user_id, self.jwt_token
)
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
if exists:
result = {"status": "unchanged", "id": slim_doc["id"]}
else:
texts = [c["text"] for c in slim_doc["chunks"]]
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Compute hash
file_hash = hash_id(tmp_path)
# Get object size
try:
@@ -274,54 +486,29 @@
except Exception:
file_size = 0
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": slim_doc["id"],
"filename": slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": "s3", # S3 uploads
"indexed_time": datetime.datetime.now().isoformat(),
}
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=self.owner_user_id,
original_filename=item, # Use S3 key as filename
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=file_size,
connector_type="s3",
)
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if self.owner_user_id is not None:
chunk_doc["owner"] = self.owner_user_id
if self.owner_name is not None:
chunk_doc["owner_name"] = self.owner_name
if self.owner_email is not None:
chunk_doc["owner_email"] = self.owner_email
chunk_id = f"{slim_doc['id']}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for S3 chunk",
chunk_id=chunk_id,
error=str(e),
chunk_doc=chunk_doc,
)
raise
result = {"status": "indexed", "id": slim_doc["id"]}
result["path"] = f"s3://{self.bucket}/{item}"
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
result["path"] = f"s3://{self.bucket}/{item}"
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
upload_task.failed_files += 1
finally:
tmp.close()
os.remove(tmp.name)
file_task.updated_at = time.time()
@@ -341,6 +528,7 @@ class LangflowFileProcessor(TaskProcessor):
settings: dict = None,
delete_after_ingest: bool = True,
):
super().__init__()
self.langflow_file_service = langflow_file_service
self.session_manager = session_manager
self.owner_user_id = owner_user_id
@@ -366,7 +554,22 @@
file_task.updated_at = time.time()
try:
# Read file content
# Compute hash and check if already exists
from utils.hash_utils import hash_id
file_hash = hash_id(item)
# Check if document already exists
opensearch_client = self.session_manager.get_user_opensearch_client(
self.owner_user_id, self.jwt_token
)
if await self.check_document_exists(file_hash, opensearch_client):
file_task.status = TaskStatus.COMPLETED
file_task.result = {"status": "unchanged", "id": file_hash}
file_task.updated_at = time.time()
upload_task.successful_files += 1
return
# Read file content for processing
with open(item, 'rb') as f:
content = f.read()
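
Taken together, the new base TaskProcessor gives every processor the same flow: hash the file with hash_id, check OpenSearch via check_document_exists, then run docling conversion, embeddings, and indexing through process_document_standard. A hedged sketch of driving it directly; session_manager, user_id, and jwt are placeholders for values the caller already has:

# Illustration only: reusing the consolidated pipeline outside a task.
import os
from models.processors import TaskProcessor
from services.document_service import DocumentService
from utils.hash_utils import hash_id

async def index_local_file(path: str, session_manager, user_id: str, jwt: str) -> dict:
    doc_service = DocumentService(session_manager=session_manager)
    processor = TaskProcessor(document_service=doc_service)
    return await processor.process_document_standard(
        file_path=path,
        file_hash=hash_id(path),                 # deterministic, URL-safe id
        owner_user_id=user_id,
        original_filename=os.path.basename(path),
        jwt_token=jwt,
        file_size=os.path.getsize(path),
        connector_type="local",
    )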


@@ -112,98 +112,6 @@ class DocumentService:
return False
return False
async def process_file_common(
self,
file_path: str,
file_hash: str = None,
owner_user_id: str = None,
original_filename: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
file_size: int = None,
connector_type: str = "local",
):
"""
Common processing logic for both upload and upload_path.
1. Optionally compute SHA256 hash if not provided.
2. Convert with docling and extract relevant content.
3. Add embeddings.
4. Index into OpenSearch.
"""
if file_hash is None:
sha256 = hashlib.sha256()
async with aiofiles.open(file_path, "rb") as f:
while True:
chunk = await f.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
file_hash = sha256.hexdigest()
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
if exists:
return {"status": "unchanged", "id": file_hash}
# convert and extract
result = clients.converter.convert(file_path)
full_doc = result.document.export_to_dict()
slim_doc = extract_relevant(full_doc)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Index each chunk as a separate document
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": file_hash,
"filename": original_filename
if original_filename
else slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{file_hash}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
return {"status": "indexed", "id": file_hash}
async def process_upload_file(
self,
@@ -214,20 +122,22 @@ class DocumentService:
owner_email: str = None,
):
"""Process an uploaded file from form data"""
sha256 = hashlib.sha256()
tmp = tempfile.NamedTemporaryFile(delete=False)
file_size = 0
try:
while True:
chunk = await upload_file.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
tmp.write(chunk)
file_size += len(chunk)
tmp.flush()
from utils.hash_utils import hash_id
from utils.file_utils import auto_cleanup_tempfile
import os
file_hash = sha256.hexdigest()
with auto_cleanup_tempfile() as tmp_path:
# Stream upload file to temporary file
file_size = 0
with open(tmp_path, 'wb') as tmp_file:
while True:
chunk = await upload_file.read(1 << 20)
if not chunk:
break
tmp_file.write(chunk)
file_size += len(chunk)
file_hash = hash_id(tmp_path)
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
@@ -243,22 +153,22 @@
if exists:
return {"status": "unchanged", "id": file_hash}
result = await self.process_file_common(
tmp.name,
file_hash,
# Use consolidated standard processing
from models.processors import TaskProcessor
processor = TaskProcessor(document_service=self)
result = await processor.process_document_standard(
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=owner_user_id,
original_filename=upload_file.filename,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
file_size=file_size,
connector_type="local",
)
return result
finally:
tmp.close()
os.remove(tmp.name)
async def process_upload_context(self, upload_file, filename: str = None):
"""Process uploaded file and return content for context"""
import io
@@ -294,145 +204,3 @@
"pages": len(slim_doc["chunks"]),
"content_length": len(full_content),
}
async def process_single_file_task(
self,
upload_task,
file_path: str,
owner_user_id: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
connector_type: str = "local",
):
"""Process a single file and update task tracking - used by task service"""
from models.tasks import TaskStatus
import time
import asyncio
file_task = upload_task.file_tasks[file_path]
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
try:
# Handle regular file processing
loop = asyncio.get_event_loop()
# Run CPU-intensive docling processing in separate process
slim_doc = await loop.run_in_executor(
self.process_pool, process_document_sync, file_path
)
# Check if already indexed
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
if exists:
result = {"status": "unchanged", "id": slim_doc["id"]}
else:
# Generate embeddings and index (I/O bound, keep in main process)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Get file size
file_size = 0
try:
file_size = os.path.getsize(file_path)
except OSError:
pass # Keep file_size as 0 if can't get size
# Index each chunk
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": slim_doc["id"],
"filename": slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{slim_doc['id']}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for batch chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
result = {"status": "indexed", "id": slim_doc["id"]}
result["path"] = file_path
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
except Exception as e:
import traceback
from concurrent.futures import BrokenExecutor
if isinstance(e, BrokenExecutor):
logger.error(
"Process pool broken while processing file", file_path=file_path
)
logger.info("Worker process likely crashed")
logger.info(
"You should see detailed crash logs above from the worker process"
)
# Mark pool as broken for potential recreation
self._process_pool_broken = True
# Attempt to recreate the pool for future operations
if self._recreate_process_pool():
logger.info("Process pool successfully recreated")
else:
logger.warning(
"Failed to recreate process pool - future operations may fail"
)
file_task.error = f"Worker process crashed: {str(e)}"
else:
logger.error(
"Failed to process file", file_path=file_path, error=str(e)
)
file_task.error = str(e)
logger.error("Full traceback available")
traceback.print_exc()
file_task.status = TaskStatus.FAILED
upload_task.failed_files += 1
finally:
file_task.updated_at = time.time()
upload_task.processed_files += 1
upload_task.updated_at = time.time()
if upload_task.processed_files >= upload_task.total_files:
upload_task.status = TaskStatus.COMPLETED


@@ -130,10 +130,21 @@ class TaskService:
async def process_with_semaphore(file_path: str):
async with semaphore:
await self.document_service.process_single_file_task(
upload_task, file_path
from models.processors import DocumentFileProcessor
file_task = upload_task.file_tasks[file_path]
# Create processor with user context (all None for background processing)
processor = DocumentFileProcessor(
document_service=self.document_service,
owner_user_id=None,
jwt_token=None,
owner_name=None,
owner_email=None,
)
# Process the file
await processor.process_item(upload_task, file_path, file_task)
tasks = [
process_with_semaphore(file_path)
for file_path in upload_task.file_tasks.keys()
@@ -141,6 +152,11 @@ class TaskService:
await asyncio.gather(*tasks, return_exceptions=True)
# Check if task is complete
if upload_task.processed_files >= upload_task.total_files:
upload_task.status = TaskStatus.COMPLETED
upload_task.updated_at = time.time()
except Exception as e:
logger.error(
"Background upload processor failed", task_id=task_id, error=str(e)
@@ -336,7 +352,7 @@ class TaskService:
tasks.sort(key=lambda x: x["created_at"], reverse=True)
return tasks
def cancel_task(self, user_id: str, task_id: str) -> bool:
async def cancel_task(self, user_id: str, task_id: str) -> bool:
"""Cancel a task if it exists and is not already completed.
Supports cancellation of shared default tasks stored under the anonymous user.
@@ -368,18 +384,28 @@
and not upload_task.background_task.done()
):
upload_task.background_task.cancel()
# Wait for the background task to actually stop to avoid race conditions
try:
await upload_task.background_task
except asyncio.CancelledError:
pass # Expected when we cancel the task
except Exception:
pass # Ignore other errors during cancellation
# Mark task as failed (cancelled)
upload_task.status = TaskStatus.FAILED
upload_task.updated_at = time.time()
# Mark all pending file tasks as failed
# Mark all pending and running file tasks as failed
for file_task in upload_task.file_tasks.values():
if file_task.status == TaskStatus.PENDING:
if file_task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]:
# Increment failed_files counter for both pending and running
# (running files haven't been counted yet in either counter)
upload_task.failed_files += 1
file_task.status = TaskStatus.FAILED
file_task.error = "Task cancelled by user"
file_task.updated_at = time.time()
upload_task.failed_files += 1
return True
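
The key behavioural change here is that cancel_task is now a coroutine: after background_task.cancel() it awaits the task, so cancellation has actually taken effect before file tasks are marked failed. A standalone sketch of that asyncio pattern; worker() is a hypothetical long-running job, not project code:

# Cancel-then-await, in isolation.
import asyncio

async def worker() -> None:
    await asyncio.sleep(3600)        # stands in for a long-running ingestion task

async def cancel_and_wait(task: asyncio.Task) -> bool:
    if task.done():
        return False                 # nothing left to cancel
    task.cancel()
    try:
        await task                   # wait until the coroutine has really stopped
    except asyncio.CancelledError:
        pass                         # the expected outcome of cancellation
    return True

async def main() -> None:
    t = asyncio.create_task(worker())
    await asyncio.sleep(0)           # let the worker start
    print(await cancel_and_wait(t))  # True

asyncio.run(main())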


@@ -229,15 +229,9 @@ def process_document_sync(file_path: str):
# Compute file hash
try:
from utils.hash_utils import hash_id
logger.info("Computing file hash", worker_pid=os.getpid())
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
while True:
chunk = f.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
file_hash = sha256.hexdigest()
file_hash = hash_id(file_path)
logger.info(
"File hash computed",
worker_pid=os.getpid(),

src/utils/file_utils.py (new file, 60 lines)

@@ -0,0 +1,60 @@
"""File handling utilities for OpenRAG"""
import os
import tempfile
from contextlib import contextmanager
from typing import Optional
@contextmanager
def auto_cleanup_tempfile(suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None):
"""
Context manager for temporary files that automatically cleans up.
Unlike tempfile.NamedTemporaryFile with delete=True, this keeps the file
on disk for the duration of the context, making it safe for async operations.
Usage:
with auto_cleanup_tempfile(suffix=".pdf") as tmp_path:
# Write to the file
with open(tmp_path, 'wb') as f:
f.write(content)
# Use tmp_path for processing
result = await process_file(tmp_path)
# File is automatically deleted here
Args:
suffix: Optional file suffix/extension (e.g., ".pdf")
prefix: Optional file prefix
dir: Optional directory for temp file
Yields:
str: Path to the temporary file
"""
fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
try:
os.close(fd) # Close the file descriptor immediately
yield path
finally:
# Always clean up, even if an exception occurred
try:
if os.path.exists(path):
os.unlink(path)
except Exception:
# Silently ignore cleanup errors
pass
def safe_unlink(path: str) -> None:
"""
Safely delete a file, ignoring errors if it doesn't exist.
Args:
path: Path to the file to delete
"""
try:
if path and os.path.exists(path):
os.unlink(path)
except Exception:
# Silently ignore errors
pass
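
The upload routes at the top of this commit call safe_unlink from their error paths. A short sketch of that cleanup-on-error shape; ingest() is a hypothetical placeholder for the real processing call:

# Cleanup-on-error with safe_unlink; ingest() is a hypothetical placeholder.
from utils.file_utils import safe_unlink

async def ingest(path: str) -> None:
    raise RuntimeError("processing failed")   # stand-in failure

async def handle_upload(temp_path: str) -> None:
    try:
        await ingest(temp_path)
    except Exception:
        safe_unlink(temp_path)   # never raises, even if the file is already gone
        raise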

src/utils/hash_utils.py (new file, 76 lines)

@@ -0,0 +1,76 @@
import io
import os
import base64
import hashlib
from typing import BinaryIO, Optional, Union
def _b64url(data: bytes) -> str:
"""URL-safe base64 without padding"""
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")
def stream_hash(
source: Union[str, os.PathLike, BinaryIO],
*,
algo: str = "sha256",
include_filename: Optional[str] = None,
chunk_size: int = 1024 * 1024, # 1 MiB
) -> bytes:
"""
Memory-safe, incremental hash of a file path or binary stream.
- source: path or file-like object with .read()
- algo: hashlib algorithm name ('sha256', 'blake2b', 'sha3_256', etc.)
- include_filename: if provided, the UTF-8 bytes of this string are prepended
- chunk_size: read size per iteration
Returns: raw digest bytes
"""
try:
h = hashlib.new(algo)
except ValueError as e:
raise ValueError(f"Unsupported hash algorithm: {algo}") from e
def _update_from_file(f: BinaryIO):
if include_filename:
h.update(include_filename.encode("utf-8"))
for chunk in iter(lambda: f.read(chunk_size), b""):
h.update(chunk)
if isinstance(source, (str, os.PathLike)):
with open(source, "rb", buffering=io.DEFAULT_BUFFER_SIZE) as f:
_update_from_file(f)
else:
f = source
# Preserve position if seekable
pos = None
try:
if f.seekable():
pos = f.tell()
f.seek(0)
except Exception:
pos = None
try:
_update_from_file(f)
finally:
if pos is not None:
try:
f.seek(pos)
except Exception:
pass
return h.digest()
def hash_id(
source: Union[str, os.PathLike, BinaryIO],
*,
algo: str = "sha256",
include_filename: Optional[str] = None,
length: int = 24, # characters of base64url (set 0 or None for full)
) -> str:
"""
Deterministic, URL-safe base64 digest (no prefix).
"""
b = stream_hash(source, algo=algo, include_filename=include_filename)
s = _b64url(b)
return s[:length] if length else s
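
hash_id replaces the hand-rolled sha256 loops removed elsewhere in this commit; it takes a path or any readable binary stream and returns a short URL-safe digest, while stream_hash returns the raw bytes. A small usage sketch; the file path is a placeholder:

# Usage sketch for the new hashing helpers.
import io
from utils.hash_utils import hash_id, stream_hash

doc_id = hash_id("/tmp/report.pdf")                                 # 24-char base64url id
full_id = hash_id("/tmp/report.pdf", length=0)                      # full-length digest
named = hash_id("/tmp/report.pdf", include_filename="report.pdf")   # filename mixed in

raw = stream_hash(io.BytesIO(b"hello world"), algo="blake2b")       # raw digest bytes
assert hash_id(io.BytesIO(b"x")) == hash_id(io.BytesIO(b"x"))       # deterministic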

uv.lock (generated, 4 changes)

@@ -1,5 +1,5 @@
version = 1
revision = 3
revision = 2
requires-python = ">=3.13"
resolution-markers = [
"sys_platform == 'darwin'",
@@ -2282,7 +2282,7 @@ wheels = [
[[package]]
name = "openrag"
version = "0.1.13"
version = "0.1.14.dev1"
source = { editable = "." }
dependencies = [
{ name = "agentd" },