Merge branch 'main' into lfx-openrag-update-flows

This commit is contained in:
Edwin Jose 2025-09-29 16:28:46 -04:00 committed by GitHub
commit 9a45d31481
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 620 additions and 529 deletions

.gitignore (vendored, 2 changes)

@@ -18,3 +18,5 @@ wheels/
 1001*.pdf
 *.json
 .DS_Store
+config.yaml

View file

@@ -1,31 +0,0 @@
-# OpenRAG Configuration File
-# This file allows you to configure OpenRAG settings.
-# Environment variables will override these settings unless edited is true.
-
-# Track if this config has been manually edited (prevents env var overrides)
-edited: false
-
-# Model provider configuration
-provider:
-  # Supported providers: "openai", "anthropic", "azure", etc.
-  model_provider: "openai"
-  # API key for the model provider (can also be set via OPENAI_API_KEY env var)
-  api_key: ""
-
-# Knowledge base and document processing configuration
-knowledge:
-  # Embedding model for vector search
-  embedding_model: "text-embedding-3-small"
-  # Text chunk size for document processing
-  chunk_size: 1000
-  # Overlap between chunks
-  chunk_overlap: 200
-  # Docling preset setting
-  doclingPresets: standard
-
-# AI agent configuration
-agent:
-  # Language model for the chat agent
-  llm_model: "gpt-4o-mini"
-  # System prompt for the agent
-  system_prompt: "You are a helpful AI assistant with access to a knowledge base. Answer questions based on the provided context."
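The deleted file's header comments describe the precedence rule the settings loader is expected to follow: environment variables override file values unless `edited` is true. A minimal sketch of that rule, using a hypothetical loader rather than the actual OpenRAG code:

    # Illustrative sketch of the override rule described above; the function
    # and key names are hypothetical, not the actual OpenRAG loader.
    import os
    import yaml

    def load_settings(path: str = "config.yaml") -> dict:
        with open(path) as f:
            cfg = yaml.safe_load(f) or {}
        # Env vars override file values only while the file is unedited.
        if not cfg.get("edited", False):
            env_key = os.environ.get("OPENAI_API_KEY")
            if env_key:
                cfg.setdefault("provider", {})["api_key"] = env_key
        return cfg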

View file

@@ -231,11 +231,8 @@ async def upload_and_ingest_user_file(
     except Exception:
         # Clean up temp file on error
-        try:
-            if os.path.exists(temp_path):
-                os.unlink(temp_path)
-        except Exception:
-            pass  # Ignore cleanup errors
+        from utils.file_utils import safe_unlink
+        safe_unlink(temp_path)
         raise
 except Exception as e:

View file

@@ -172,12 +172,9 @@ async def langflow_upload_ingest_task(
     except Exception:
         # Clean up temp files on error
+        from utils.file_utils import safe_unlink
         for temp_path in temp_file_paths:
-            try:
-                if os.path.exists(temp_path):
-                    os.unlink(temp_path)
-            except Exception:
-                pass  # Ignore cleanup errors
+            safe_unlink(temp_path)
         raise
 except Exception as e:

View file

@@ -26,7 +26,7 @@ async def cancel_task(request: Request, task_service, session_manager):
     task_id = request.path_params.get("task_id")
     user = request.state.user
-    success = task_service.cancel_task(user.user_id, task_id)
+    success = await task_service.cancel_task(user.user_id, task_id)
     if not success:
         return JSONResponse(
             {"error": "Task not found or cannot be cancelled"}, status_code=400

View file

@@ -514,6 +514,9 @@ class AppClients:
     ssl_assert_fingerprint=None,
     headers=headers,
     http_compress=True,
+    timeout=30,  # 30 second timeout
+    max_retries=3,
+    retry_on_timeout=True,
 )
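These options bound each OpenSearch request to 30 seconds and retry up to three times, treating timeouts as retryable. A minimal sketch of how the same transport options are typically passed when building an opensearch-py async client; the host, port, and other values here are placeholders:

    # Sketch only: placeholder connection details, mirroring the options added above.
    from opensearchpy import AsyncOpenSearch

    client = AsyncOpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],
        http_compress=True,
        timeout=30,            # per-request timeout in seconds
        max_retries=3,         # retry failed requests up to 3 times
        retry_on_timeout=True, # a timeout counts as a retryable failure
    )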

View file

@@ -53,25 +53,27 @@ class LangflowConnectorService:
             filename=document.filename,
         )
+        from utils.file_utils import auto_cleanup_tempfile
         suffix = self._get_file_extension(document.mimetype)

         # Create temporary file from document content
-        with tempfile.NamedTemporaryFile(
-            delete=False, suffix=suffix
-        ) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
+        with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+            # Write document content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)
+
+            # Step 1: Upload file to Langflow
+            logger.debug("Uploading file to Langflow", filename=document.filename)
+            content = document.content
+            file_tuple = (
+                document.filename.replace(" ", "_").replace("/", "_")+suffix,
+                content,
+                document.mimetype or "application/octet-stream",
+            )
+            langflow_file_id = None  # Initialize to track if upload succeeded

             try:
-                # Step 1: Upload file to Langflow
-                logger.debug("Uploading file to Langflow", filename=document.filename)
-                content = document.content
-                file_tuple = (
-                    document.filename.replace(" ", "_").replace("/", "_")+suffix,
-                    content,
-                    document.mimetype or "application/octet-stream",
-                )
                 upload_result = await self.langflow_service.upload_user_file(
                     file_tuple, jwt_token
                 )

@@ -125,7 +127,7 @@ class LangflowConnectorService:
                     error=str(e),
                 )
                 # Try to clean up Langflow file if upload succeeded but processing failed
-                if "langflow_file_id" in locals():
+                if langflow_file_id is not None:
                     try:
                         await self.langflow_service.delete_user_file(langflow_file_id)
                         logger.debug(

@@ -140,10 +142,6 @@ class LangflowConnectorService:
                 )
                 raise
-            finally:
-                # Clean up temporary file
-                os.unlink(tmp_file.name)

     def _get_file_extension(self, mimetype: str) -> str:
         """Get file extension based on MIME type"""
         mime_to_ext = {
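The `langflow_file_id = None` initialization above replaces the brittle `"langflow_file_id" in locals()` check: cleanup now runs only when the upload actually assigned an id. The same pattern in isolation, with hypothetical names:

    # Illustrative pattern, not OpenRAG code: track a remote resource id explicitly
    # so failure cleanup only runs when the resource was actually created.
    async def upload_then_process(service, payload):
        remote_id = None
        try:
            remote_id = await service.upload(payload)  # may raise before assignment
            return await service.process(remote_id)
        except Exception:
            if remote_id is not None:                  # created, but processing failed
                await service.delete(remote_id)
            raise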

View file

@@ -54,52 +54,50 @@ class ConnectorService:
         """Process a document from a connector using existing processing pipeline"""
         # Create temporary file from document content
-        with tempfile.NamedTemporaryFile(
-            delete=False, suffix=self._get_file_extension(document.mimetype)
-        ) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
-
-            try:
-                # Use existing process_file_common function with connector document metadata
-                # We'll use the document service's process_file_common method
-                from services.document_service import DocumentService
-
-                doc_service = DocumentService(session_manager=self.session_manager)
-
-                logger.debug("Processing connector document", document_id=document.id)
-
-                # Process using the existing pipeline but with connector document metadata
-                result = await doc_service.process_file_common(
-                    file_path=tmp_file.name,
-                    file_hash=document.id,  # Use connector document ID as hash
-                    owner_user_id=owner_user_id,
-                    original_filename=document.filename,  # Pass the original Google Doc title
-                    jwt_token=jwt_token,
-                    owner_name=owner_name,
-                    owner_email=owner_email,
-                    file_size=len(document.content) if document.content else 0,
-                    connector_type=connector_type,
-                )
-
-                logger.debug("Document processing result", result=result)
-
-                # If successfully indexed or already exists, update the indexed documents with connector metadata
-                if result["status"] in ["indexed", "unchanged"]:
-                    # Update all chunks with connector-specific metadata
-                    await self._update_connector_metadata(
-                        document, owner_user_id, connector_type, jwt_token
-                    )
-
-                return {
-                    **result,
-                    "filename": document.filename,
-                    "source_url": document.source_url,
-                }
-            finally:
-                # Clean up temporary file
-                os.unlink(tmp_file.name)
+        from utils.file_utils import auto_cleanup_tempfile
+
+        with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path:
+            # Write document content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)
+
+            # Use existing process_file_common function with connector document metadata
+            # We'll use the document service's process_file_common method
+            from services.document_service import DocumentService
+
+            doc_service = DocumentService(session_manager=self.session_manager)
+
+            logger.debug("Processing connector document", document_id=document.id)
+
+            # Process using consolidated processing pipeline
+            from models.processors import TaskProcessor
+            processor = TaskProcessor(document_service=doc_service)
+            result = await processor.process_document_standard(
+                file_path=tmp_path,
+                file_hash=document.id,  # Use connector document ID as hash
+                owner_user_id=owner_user_id,
+                original_filename=document.filename,  # Pass the original Google Doc title
+                jwt_token=jwt_token,
+                owner_name=owner_name,
+                owner_email=owner_email,
+                file_size=len(document.content) if document.content else 0,
+                connector_type=connector_type,
+            )
+
+            logger.debug("Document processing result", result=result)
+
+            # If successfully indexed or already exists, update the indexed documents with connector metadata
+            if result["status"] in ["indexed", "unchanged"]:
+                # Update all chunks with connector-specific metadata
+                await self._update_connector_metadata(
+                    document, owner_user_id, connector_type, jwt_token
+                )
+
+            return {
+                **result,
+                "filename": document.filename,
+                "source_url": document.source_url,
+            }

     async def _update_connector_metadata(
         self,

View file

@@ -1,4 +1,3 @@
-from abc import ABC, abstractmethod
 from typing import Any
 from .tasks import UploadTask, FileTask
 from utils.logging_config import get_logger
@@ -6,22 +5,160 @@ from utils.logging_config import get_logger

 logger = get_logger(__name__)

-class TaskProcessor(ABC):
-    """Abstract base class for task processors"""
+class TaskProcessor:
+    """Base class for task processors with shared processing logic"""
def __init__(self, document_service=None):
self.document_service = document_service
async def check_document_exists(
self,
file_hash: str,
opensearch_client,
) -> bool:
"""
Check if a document with the given hash already exists in OpenSearch.
Consolidated hash checking for all processors.
"""
from config.settings import INDEX_NAME
import asyncio
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
return exists
except (asyncio.TimeoutError, Exception) as e:
if attempt == max_retries - 1:
logger.error(
"OpenSearch exists check failed after retries",
file_hash=file_hash,
error=str(e),
attempt=attempt + 1
)
# On final failure, assume document doesn't exist (safer to reprocess than skip)
logger.warning(
"Assuming document doesn't exist due to connection issues",
file_hash=file_hash
)
return False
else:
logger.warning(
"OpenSearch exists check failed, retrying",
file_hash=file_hash,
error=str(e),
attempt=attempt + 1,
retry_in=retry_delay
)
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
async def process_document_standard(
self,
file_path: str,
file_hash: str,
owner_user_id: str = None,
original_filename: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
file_size: int = None,
connector_type: str = "local",
):
"""
Standard processing pipeline for non-Langflow processors:
docling conversion + embeddings + OpenSearch indexing.
"""
import datetime
from config.settings import INDEX_NAME, EMBED_MODEL, clients
from services.document_service import chunk_texts_for_embeddings
from utils.document_processing import extract_relevant
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
# Check if already exists
if await self.check_document_exists(file_hash, opensearch_client):
return {"status": "unchanged", "id": file_hash}
# Convert and extract
result = clients.converter.convert(file_path)
full_doc = result.document.export_to_dict()
slim_doc = extract_relevant(full_doc)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Index each chunk as a separate document
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": file_hash,
"filename": original_filename
if original_filename
else slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{file_hash}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
return {"status": "indexed", "id": file_hash}
-    @abstractmethod
     async def process_item(
         self, upload_task: UploadTask, item: Any, file_task: FileTask
     ) -> None:
         """
         Process a single item in the task.

+        This is a base implementation that should be overridden by subclasses.
+        When TaskProcessor is used directly (not via subclass), this method
+        is not called - only the utility methods like process_document_standard
+        are used.
+
         Args:
             upload_task: The overall upload task
             item: The item to process (could be file path, file info, etc.)
             file_task: The specific file task to update
         """
-        pass
+        raise NotImplementedError(
+            "process_item should be overridden by subclasses when used in task processing"
+        )
 class DocumentFileProcessor(TaskProcessor):

@@ -35,7 +172,7 @@ class DocumentFileProcessor(TaskProcessor):
         owner_name: str = None,
         owner_email: str = None,
     ):
-        self.document_service = document_service
+        super().__init__(document_service)
         self.owner_user_id = owner_user_id
         self.jwt_token = jwt_token
         self.owner_name = owner_name

@@ -44,16 +181,52 @@ class DocumentFileProcessor(TaskProcessor):
     async def process_item(
         self, upload_task: UploadTask, item: str, file_task: FileTask
     ) -> None:
-        """Process a regular file path using DocumentService"""
-        # This calls the existing logic with user context
-        await self.document_service.process_single_file_task(
-            upload_task,
-            item,
-            owner_user_id=self.owner_user_id,
-            jwt_token=self.jwt_token,
-            owner_name=self.owner_name,
-            owner_email=self.owner_email,
-        )
+        """Process a regular file path using consolidated methods"""
+        from models.tasks import TaskStatus
+        from utils.hash_utils import hash_id
+        import time
+        import os
+
+        file_task.status = TaskStatus.RUNNING
+        file_task.updated_at = time.time()
+
+        try:
# Compute hash
file_hash = hash_id(item)
# Get file size
try:
file_size = os.path.getsize(item)
except Exception:
file_size = 0
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=item,
file_hash=file_hash,
owner_user_id=self.owner_user_id,
original_filename=os.path.basename(item),
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=file_size,
connector_type="local",
)
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
finally:
upload_task.processed_files += 1
upload_task.updated_at = time.time()
 class ConnectorFileProcessor(TaskProcessor):

@@ -69,6 +242,7 @@ class ConnectorFileProcessor(TaskProcessor):
         owner_name: str = None,
         owner_email: str = None,
     ):
+        super().__init__()
         self.connector_service = connector_service
         self.connection_id = connection_id
         self.files_to_process = files_to_process

@@ -76,53 +250,79 @@ class ConnectorFileProcessor(TaskProcessor):
         self.jwt_token = jwt_token
         self.owner_name = owner_name
         self.owner_email = owner_email
-        # Create lookup map for file info - handle both file objects and file IDs
-        self.file_info_map = {}
-        for f in files_to_process:
-            if isinstance(f, dict):
-                # Full file info objects
-                self.file_info_map[f["id"]] = f
-            else:
-                # Just file IDs - will need to fetch metadata during processing
-                self.file_info_map[f] = None

     async def process_item(
         self, upload_task: UploadTask, item: str, file_task: FileTask
     ) -> None:
-        """Process a connector file using ConnectorService"""
+        """Process a connector file using consolidated methods"""
         from models.tasks import TaskStatus
+        from utils.hash_utils import hash_id
+        import tempfile
+        import time
+        import os

-        file_id = item  # item is the connector file ID
-        self.file_info_map.get(file_id)
-
-        # Get the connector and connection info
-        connector = await self.connector_service.get_connector(self.connection_id)
-        connection = await self.connector_service.connection_manager.get_connection(
-            self.connection_id
-        )
-        if not connector or not connection:
-            raise ValueError(f"Connection '{self.connection_id}' not found")
-
-        # Get file content from connector (the connector will fetch metadata if needed)
-        document = await connector.get_file_content(file_id)
-
-        # Use the user_id passed during initialization
-        if not self.user_id:
-            raise ValueError("user_id not provided to ConnectorFileProcessor")
-
-        # Process using existing pipeline
-        result = await self.connector_service.process_connector_document(
-            document,
-            self.user_id,
-            connection.connector_type,
-            jwt_token=self.jwt_token,
-            owner_name=self.owner_name,
-            owner_email=self.owner_email,
-        )
-
-        file_task.status = TaskStatus.COMPLETED
-        file_task.result = result
-        upload_task.successful_files += 1
+        file_task.status = TaskStatus.RUNNING
+        file_task.updated_at = time.time()
+
+        try:
+            file_id = item  # item is the connector file ID
+
+            # Get the connector and connection info
+            connector = await self.connector_service.get_connector(self.connection_id)
+            connection = await self.connector_service.connection_manager.get_connection(
+                self.connection_id
+            )
+            if not connector or not connection:
+                raise ValueError(f"Connection '{self.connection_id}' not found")
+
+            # Get file content from connector
+            document = await connector.get_file_content(file_id)
+
+            if not self.user_id:
+                raise ValueError("user_id not provided to ConnectorFileProcessor")
+
+            # Create temporary file from document content
+            from utils.file_utils import auto_cleanup_tempfile
suffix = self.connector_service._get_file_extension(document.mimetype)
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
# Compute hash
file_hash = hash_id(tmp_path)
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=self.user_id,
original_filename=document.filename,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=len(document.content),
connector_type=connection.connector_type,
)
# Add connector-specific metadata
result.update({
"source_url": document.source_url,
"document_id": document.id,
})
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
 class LangflowConnectorFileProcessor(TaskProcessor):

@@ -138,6 +338,7 @@ class LangflowConnectorFileProcessor(TaskProcessor):
         owner_name: str = None,
         owner_email: str = None,
     ):
+        super().__init__()
         self.langflow_connector_service = langflow_connector_service
         self.connection_id = connection_id
         self.files_to_process = files_to_process

@@ -145,57 +346,85 @@ class LangflowConnectorFileProcessor(TaskProcessor):
         self.jwt_token = jwt_token
         self.owner_name = owner_name
         self.owner_email = owner_email
-        # Create lookup map for file info - handle both file objects and file IDs
-        self.file_info_map = {}
-        for f in files_to_process:
-            if isinstance(f, dict):
-                # Full file info objects
-                self.file_info_map[f["id"]] = f
-            else:
-                # Just file IDs - will need to fetch metadata during processing
-                self.file_info_map[f] = None

     async def process_item(
         self, upload_task: UploadTask, item: str, file_task: FileTask
     ) -> None:
         """Process a connector file using LangflowConnectorService"""
         from models.tasks import TaskStatus
+        from utils.hash_utils import hash_id
+        import tempfile
+        import time
+        import os

-        file_id = item  # item is the connector file ID
-        self.file_info_map.get(file_id)
-
-        # Get the connector and connection info
-        connector = await self.langflow_connector_service.get_connector(
-            self.connection_id
-        )
-        connection = (
-            await self.langflow_connector_service.connection_manager.get_connection(
-                self.connection_id
-            )
-        )
-        if not connector or not connection:
-            raise ValueError(f"Connection '{self.connection_id}' not found")
-
-        # Get file content from connector (the connector will fetch metadata if needed)
-        document = await connector.get_file_content(file_id)
-
-        # Use the user_id passed during initialization
-        if not self.user_id:
-            raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
-
-        # Process using Langflow pipeline
-        result = await self.langflow_connector_service.process_connector_document(
-            document,
-            self.user_id,
-            connection.connector_type,
-            jwt_token=self.jwt_token,
-            owner_name=self.owner_name,
-            owner_email=self.owner_email,
-        )
-
-        file_task.status = TaskStatus.COMPLETED
-        file_task.result = result
-        upload_task.successful_files += 1
+        file_task.status = TaskStatus.RUNNING
+        file_task.updated_at = time.time()
+
+        try:
+            file_id = item  # item is the connector file ID
+
+            # Get the connector and connection info
+            connector = await self.langflow_connector_service.get_connector(
+                self.connection_id
+            )
+            connection = (
+                await self.langflow_connector_service.connection_manager.get_connection(
+                    self.connection_id
+                )
+            )
+            if not connector or not connection:
+                raise ValueError(f"Connection '{self.connection_id}' not found")
+
+            # Get file content from connector
+            document = await connector.get_file_content(file_id)
+
+            if not self.user_id:
+                raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
+
+            # Create temporary file and compute hash to check for duplicates
+            from utils.file_utils import auto_cleanup_tempfile
+
+            suffix = self.langflow_connector_service._get_file_extension(document.mimetype)
+            with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+                # Write content to temp file
+                with open(tmp_path, 'wb') as f:
+                    f.write(document.content)
# Compute hash and check if already exists
file_hash = hash_id(tmp_path)
# Check if document already exists
opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
self.user_id, self.jwt_token
)
if await self.check_document_exists(file_hash, opensearch_client):
file_task.status = TaskStatus.COMPLETED
file_task.result = {"status": "unchanged", "id": file_hash}
file_task.updated_at = time.time()
upload_task.successful_files += 1
return
# Process using Langflow pipeline
result = await self.langflow_connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
 class S3FileProcessor(TaskProcessor):

@@ -213,7 +442,7 @@ class S3FileProcessor(TaskProcessor):
     ):
         import boto3

-        self.document_service = document_service
+        super().__init__(document_service)
         self.bucket = bucket
         self.s3_client = s3_client or boto3.client("s3")
         self.owner_user_id = owner_user_id

@@ -238,34 +467,17 @@ class S3FileProcessor(TaskProcessor):
         file_task.status = TaskStatus.RUNNING
         file_task.updated_at = time.time()

-        tmp = tempfile.NamedTemporaryFile(delete=False)
+        from utils.file_utils import auto_cleanup_tempfile
+        from utils.hash_utils import hash_id

         try:
-            # Download object to temporary file
-            self.s3_client.download_fileobj(self.bucket, item, tmp)
-            tmp.flush()
-
-            loop = asyncio.get_event_loop()
-            slim_doc = await loop.run_in_executor(
-                self.document_service.process_pool, process_document_sync, tmp.name
-            )
-            opensearch_client = (
-                self.document_service.session_manager.get_user_opensearch_client(
-                    self.owner_user_id, self.jwt_token
-                )
-            )
-            exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
-            if exists:
-                result = {"status": "unchanged", "id": slim_doc["id"]}
-            else:
-                texts = [c["text"] for c in slim_doc["chunks"]]
-                text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
-                embeddings = []
-                for batch in text_batches:
-                    resp = await clients.patched_async_client.embeddings.create(
-                        model=EMBED_MODEL, input=batch
-                    )
-                    embeddings.extend([d.embedding for d in resp.data])
+            with auto_cleanup_tempfile() as tmp_path:
+                # Download object to temporary file
+                with open(tmp_path, 'wb') as tmp_file:
+                    self.s3_client.download_fileobj(self.bucket, item, tmp_file)
+
+                # Compute hash
+                file_hash = hash_id(tmp_path)

                 # Get object size
                 try:

@@ -274,54 +486,29 @@ class S3FileProcessor(TaskProcessor):
                 except Exception:
                     file_size = 0

-            for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
-                chunk_doc = {
-                    "document_id": slim_doc["id"],
-                    "filename": slim_doc["filename"],
-                    "mimetype": slim_doc["mimetype"],
-                    "page": chunk["page"],
-                    "text": chunk["text"],
-                    "chunk_embedding": vect,
-                    "file_size": file_size,
-                    "connector_type": "s3",  # S3 uploads
-                    "indexed_time": datetime.datetime.now().isoformat(),
-                }
-                # Only set owner fields if owner_user_id is provided (for no-auth mode support)
-                if self.owner_user_id is not None:
-                    chunk_doc["owner"] = self.owner_user_id
-                if self.owner_name is not None:
-                    chunk_doc["owner_name"] = self.owner_name
-                if self.owner_email is not None:
-                    chunk_doc["owner_email"] = self.owner_email
-
-                chunk_id = f"{slim_doc['id']}_{i}"
-                try:
-                    await opensearch_client.index(
-                        index=INDEX_NAME, id=chunk_id, body=chunk_doc
-                    )
-                except Exception as e:
-                    logger.error(
-                        "OpenSearch indexing failed for S3 chunk",
-                        chunk_id=chunk_id,
-                        error=str(e),
-                        chunk_doc=chunk_doc,
-                    )
-                    raise
-
-                result = {"status": "indexed", "id": slim_doc["id"]}
-
-            result["path"] = f"s3://{self.bucket}/{item}"
-            file_task.status = TaskStatus.COMPLETED
-            file_task.result = result
-            upload_task.successful_files += 1
+                # Use consolidated standard processing
+                result = await self.process_document_standard(
+                    file_path=tmp_path,
+                    file_hash=file_hash,
+                    owner_user_id=self.owner_user_id,
+                    original_filename=item,  # Use S3 key as filename
+                    jwt_token=self.jwt_token,
+                    owner_name=self.owner_name,
+                    owner_email=self.owner_email,
+                    file_size=file_size,
+                    connector_type="s3",
+                )
+
+                result["path"] = f"s3://{self.bucket}/{item}"
+                file_task.status = TaskStatus.COMPLETED
+                file_task.result = result
+                upload_task.successful_files += 1

         except Exception as e:
             file_task.status = TaskStatus.FAILED
             file_task.error = str(e)
             upload_task.failed_files += 1
         finally:
-            tmp.close()
-            os.remove(tmp.name)
             file_task.updated_at = time.time()
@@ -341,6 +528,7 @@ class LangflowFileProcessor(TaskProcessor):
         settings: dict = None,
         delete_after_ingest: bool = True,
     ):
+        super().__init__()
         self.langflow_file_service = langflow_file_service
         self.session_manager = session_manager
         self.owner_user_id = owner_user_id

@@ -366,7 +554,22 @@ class LangflowFileProcessor(TaskProcessor):
         file_task.updated_at = time.time()

         try:
-            # Read file content
+            # Compute hash and check if already exists
+            from utils.hash_utils import hash_id
+            file_hash = hash_id(item)
+
+            # Check if document already exists
+            opensearch_client = self.session_manager.get_user_opensearch_client(
+                self.owner_user_id, self.jwt_token
+            )
+            if await self.check_document_exists(file_hash, opensearch_client):
+                file_task.status = TaskStatus.COMPLETED
+                file_task.result = {"status": "unchanged", "id": file_hash}
+                file_task.updated_at = time.time()
+                upload_task.successful_files += 1
+                return
+
+            # Read file content for processing
             with open(item, 'rb') as f:
                 content = f.read()
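The new `check_document_exists` near the top of this file retries the OpenSearch `exists` call with exponential backoff and, after the final failure, reports the document as missing so it is reprocessed rather than silently skipped. A condensed standalone sketch of that pattern, with generic names:

    # Condensed sketch of the retry-with-backoff pattern used above; names are generic.
    import asyncio

    async def exists_with_retry(check, doc_id, max_retries=3, delay=1.0):
        for attempt in range(max_retries):
            try:
                return await check(doc_id)
            except Exception:
                if attempt == max_retries - 1:
                    return False      # assume missing: reprocessing is safer than skipping
                await asyncio.sleep(delay)
                delay *= 2            # exponential backoff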

View file

@@ -112,98 +112,6 @@ class DocumentService:
             return False
         return False
async def process_file_common(
self,
file_path: str,
file_hash: str = None,
owner_user_id: str = None,
original_filename: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
file_size: int = None,
connector_type: str = "local",
):
"""
Common processing logic for both upload and upload_path.
1. Optionally compute SHA256 hash if not provided.
2. Convert with docling and extract relevant content.
3. Add embeddings.
4. Index into OpenSearch.
"""
if file_hash is None:
sha256 = hashlib.sha256()
async with aiofiles.open(file_path, "rb") as f:
while True:
chunk = await f.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
file_hash = sha256.hexdigest()
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
if exists:
return {"status": "unchanged", "id": file_hash}
# convert and extract
result = clients.converter.convert(file_path)
full_doc = result.document.export_to_dict()
slim_doc = extract_relevant(full_doc)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Index each chunk as a separate document
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": file_hash,
"filename": original_filename
if original_filename
else slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{file_hash}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
return {"status": "indexed", "id": file_hash}
     async def process_upload_file(
         self,

@@ -214,20 +122,22 @@ class DocumentService:
         owner_email: str = None,
     ):
         """Process an uploaded file from form data"""
-        sha256 = hashlib.sha256()
-        tmp = tempfile.NamedTemporaryFile(delete=False)
-        file_size = 0
-        try:
-            while True:
-                chunk = await upload_file.read(1 << 20)
-                if not chunk:
-                    break
-                sha256.update(chunk)
-                tmp.write(chunk)
-                file_size += len(chunk)
-            tmp.flush()
-
-            file_hash = sha256.hexdigest()
+        from utils.hash_utils import hash_id
+        from utils.file_utils import auto_cleanup_tempfile
+        import os
+
+        with auto_cleanup_tempfile() as tmp_path:
+            # Stream upload file to temporary file
+            file_size = 0
+            with open(tmp_path, 'wb') as tmp_file:
+                while True:
+                    chunk = await upload_file.read(1 << 20)
+                    if not chunk:
+                        break
+                    tmp_file.write(chunk)
+                    file_size += len(chunk)
+
+            file_hash = hash_id(tmp_path)

             # Get user's OpenSearch client with JWT for OIDC auth
             opensearch_client = self.session_manager.get_user_opensearch_client(
                 owner_user_id, jwt_token

@@ -243,22 +153,22 @@ class DocumentService:
             if exists:
                 return {"status": "unchanged", "id": file_hash}

-            result = await self.process_file_common(
-                tmp.name,
-                file_hash,
+            # Use consolidated standard processing
+            from models.processors import TaskProcessor
+            processor = TaskProcessor(document_service=self)
+            result = await processor.process_document_standard(
+                file_path=tmp_path,
+                file_hash=file_hash,
                 owner_user_id=owner_user_id,
                 original_filename=upload_file.filename,
                 jwt_token=jwt_token,
                 owner_name=owner_name,
                 owner_email=owner_email,
                 file_size=file_size,
+                connector_type="local",
             )
             return result
-        finally:
-            tmp.close()
-            os.remove(tmp.name)

     async def process_upload_context(self, upload_file, filename: str = None):
         """Process uploaded file and return content for context"""
         import io

@@ -294,145 +204,3 @@ class DocumentService:
             "pages": len(slim_doc["chunks"]),
             "content_length": len(full_content),
         }
async def process_single_file_task(
self,
upload_task,
file_path: str,
owner_user_id: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
connector_type: str = "local",
):
"""Process a single file and update task tracking - used by task service"""
from models.tasks import TaskStatus
import time
import asyncio
file_task = upload_task.file_tasks[file_path]
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
try:
# Handle regular file processing
loop = asyncio.get_event_loop()
# Run CPU-intensive docling processing in separate process
slim_doc = await loop.run_in_executor(
self.process_pool, process_document_sync, file_path
)
# Check if already indexed
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
if exists:
result = {"status": "unchanged", "id": slim_doc["id"]}
else:
# Generate embeddings and index (I/O bound, keep in main process)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Get file size
file_size = 0
try:
file_size = os.path.getsize(file_path)
except OSError:
pass # Keep file_size as 0 if can't get size
# Index each chunk
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": slim_doc["id"],
"filename": slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{slim_doc['id']}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for batch chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
result = {"status": "indexed", "id": slim_doc["id"]}
result["path"] = file_path
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
except Exception as e:
import traceback
from concurrent.futures import BrokenExecutor
if isinstance(e, BrokenExecutor):
logger.error(
"Process pool broken while processing file", file_path=file_path
)
logger.info("Worker process likely crashed")
logger.info(
"You should see detailed crash logs above from the worker process"
)
# Mark pool as broken for potential recreation
self._process_pool_broken = True
# Attempt to recreate the pool for future operations
if self._recreate_process_pool():
logger.info("Process pool successfully recreated")
else:
logger.warning(
"Failed to recreate process pool - future operations may fail"
)
file_task.error = f"Worker process crashed: {str(e)}"
else:
logger.error(
"Failed to process file", file_path=file_path, error=str(e)
)
file_task.error = str(e)
logger.error("Full traceback available")
traceback.print_exc()
file_task.status = TaskStatus.FAILED
upload_task.failed_files += 1
finally:
file_task.updated_at = time.time()
upload_task.processed_files += 1
upload_task.updated_at = time.time()
if upload_task.processed_files >= upload_task.total_files:
upload_task.status = TaskStatus.COMPLETED

View file

@@ -130,10 +130,21 @@ class TaskService:
         async def process_with_semaphore(file_path: str):
             async with semaphore:
-                await self.document_service.process_single_file_task(
-                    upload_task, file_path
-                )
+                from models.processors import DocumentFileProcessor
+
+                file_task = upload_task.file_tasks[file_path]
+                # Create processor with user context (all None for background processing)
+                processor = DocumentFileProcessor(
+                    document_service=self.document_service,
+                    owner_user_id=None,
+                    jwt_token=None,
+                    owner_name=None,
+                    owner_email=None,
+                )
+                # Process the file
+                await processor.process_item(upload_task, file_path, file_task)

         tasks = [
             process_with_semaphore(file_path)
             for file_path in upload_task.file_tasks.keys()

@@ -141,6 +152,11 @@ class TaskService:
             await asyncio.gather(*tasks, return_exceptions=True)

+            # Check if task is complete
+            if upload_task.processed_files >= upload_task.total_files:
+                upload_task.status = TaskStatus.COMPLETED
+                upload_task.updated_at = time.time()
+
         except Exception as e:
             logger.error(
                 "Background upload processor failed", task_id=task_id, error=str(e)

@@ -336,7 +352,7 @@ class TaskService:
         tasks.sort(key=lambda x: x["created_at"], reverse=True)
         return tasks

-    def cancel_task(self, user_id: str, task_id: str) -> bool:
+    async def cancel_task(self, user_id: str, task_id: str) -> bool:
         """Cancel a task if it exists and is not already completed.

         Supports cancellation of shared default tasks stored under the anonymous user.

@@ -368,18 +384,28 @@ class TaskService:
             and not upload_task.background_task.done()
         ):
             upload_task.background_task.cancel()
+            # Wait for the background task to actually stop to avoid race conditions
+            try:
+                await upload_task.background_task
+            except asyncio.CancelledError:
+                pass  # Expected when we cancel the task
+            except Exception:
+                pass  # Ignore other errors during cancellation

         # Mark task as failed (cancelled)
         upload_task.status = TaskStatus.FAILED
         upload_task.updated_at = time.time()

-        # Mark all pending file tasks as failed
+        # Mark all pending and running file tasks as failed
         for file_task in upload_task.file_tasks.values():
-            if file_task.status == TaskStatus.PENDING:
+            if file_task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]:
+                # Increment failed_files counter for both pending and running
+                # (running files haven't been counted yet in either counter)
+                upload_task.failed_files += 1
                 file_task.status = TaskStatus.FAILED
                 file_task.error = "Task cancelled by user"
                 file_task.updated_at = time.time()
-                upload_task.failed_files += 1

         return True
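Awaiting the cancelled background task is what closes the race window: `Task.cancel()` only requests cancellation, and the coroutine keeps running until the `CancelledError` actually propagates. A minimal illustration of the idiom, independent of the task service:

    # Minimal illustration: cancel() only schedules cancellation; awaiting the task
    # guarantees it has actually stopped before any bookkeeping is updated.
    import asyncio

    async def cancel_and_wait(task: asyncio.Task) -> None:
        task.cancel()
        try:
            await task                     # returns only after the task has unwound
        except asyncio.CancelledError:
            pass                           # expected outcome of a successful cancel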

View file

@@ -229,15 +229,9 @@ def process_document_sync(file_path: str):
     # Compute file hash
     try:
+        from utils.hash_utils import hash_id
+
         logger.info("Computing file hash", worker_pid=os.getpid())
-        sha256 = hashlib.sha256()
-        with open(file_path, "rb") as f:
-            while True:
-                chunk = f.read(1 << 20)
-                if not chunk:
-                    break
-                sha256.update(chunk)
-        file_hash = sha256.hexdigest()
+        file_hash = hash_id(file_path)
         logger.info(
             "File hash computed",
             worker_pid=os.getpid(),

src/utils/file_utils.py (new file, 60 lines)

@@ -0,0 +1,60 @@
"""File handling utilities for OpenRAG"""

import os
import tempfile
from contextlib import contextmanager
from typing import Optional


@contextmanager
def auto_cleanup_tempfile(suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None):
    """
    Context manager for temporary files that automatically cleans up.

    Unlike tempfile.NamedTemporaryFile with delete=True, this keeps the file
    on disk for the duration of the context, making it safe for async operations.

    Usage:
        with auto_cleanup_tempfile(suffix=".pdf") as tmp_path:
            # Write to the file
            with open(tmp_path, 'wb') as f:
                f.write(content)
            # Use tmp_path for processing
            result = await process_file(tmp_path)
        # File is automatically deleted here

    Args:
        suffix: Optional file suffix/extension (e.g., ".pdf")
        prefix: Optional file prefix
        dir: Optional directory for temp file

    Yields:
        str: Path to the temporary file
    """
    fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
    try:
        os.close(fd)  # Close the file descriptor immediately
        yield path
    finally:
        # Always clean up, even if an exception occurred
        try:
            if os.path.exists(path):
                os.unlink(path)
        except Exception:
            # Silently ignore cleanup errors
            pass


def safe_unlink(path: str) -> None:
    """
    Safely delete a file, ignoring errors if it doesn't exist.

    Args:
        path: Path to the file to delete
    """
    try:
        if path and os.path.exists(path):
            os.unlink(path)
    except Exception:
        # Silently ignore errors
        pass
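`safe_unlink` is what the upload routes in this commit now call from their error paths; a typical call site looks roughly like the following, where the handler and the processing coroutine are placeholders:

    # Rough usage sketch; 'process' stands in for the real ingestion coroutine.
    from utils.file_utils import safe_unlink

    async def ingest(temp_path: str, process) -> None:
        try:
            await process(temp_path)
        except Exception:
            safe_unlink(temp_path)   # never raises, even if the file is already gone
            raise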

src/utils/hash_utils.py (new file, 76 lines)

@@ -0,0 +1,76 @@
import io
import os
import base64
import hashlib
from typing import BinaryIO, Optional, Union


def _b64url(data: bytes) -> str:
    """URL-safe base64 without padding"""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")


def stream_hash(
    source: Union[str, os.PathLike, BinaryIO],
    *,
    algo: str = "sha256",
    include_filename: Optional[str] = None,
    chunk_size: int = 1024 * 1024,  # 1 MiB
) -> bytes:
    """
    Memory-safe, incremental hash of a file path or binary stream.

    - source: path or file-like object with .read()
    - algo: hashlib algorithm name ('sha256', 'blake2b', 'sha3_256', etc.)
    - include_filename: if provided, the UTF-8 bytes of this string are prepended
    - chunk_size: read size per iteration

    Returns: raw digest bytes
    """
    try:
        h = hashlib.new(algo)
    except ValueError as e:
        raise ValueError(f"Unsupported hash algorithm: {algo}") from e

    def _update_from_file(f: BinaryIO):
        if include_filename:
            h.update(include_filename.encode("utf-8"))
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)

    if isinstance(source, (str, os.PathLike)):
        with open(source, "rb", buffering=io.DEFAULT_BUFFER_SIZE) as f:
            _update_from_file(f)
    else:
        f = source
        # Preserve position if seekable
        pos = None
        try:
            if f.seekable():
                pos = f.tell()
                f.seek(0)
        except Exception:
            pos = None
        try:
            _update_from_file(f)
        finally:
            if pos is not None:
                try:
                    f.seek(pos)
                except Exception:
                    pass

    return h.digest()


def hash_id(
    source: Union[str, os.PathLike, BinaryIO],
    *,
    algo: str = "sha256",
    include_filename: Optional[str] = None,
    length: int = 24,  # characters of base64url (set 0 or None for full)
) -> str:
    """
    Deterministic, URL-safe base64 digest (no prefix).
    """
    b = stream_hash(source, algo=algo, include_filename=include_filename)
    s = _b64url(b)
    return s[:length] if length else s
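`hash_id` replaces the inline SHA-256 loops elsewhere in this commit; it returns a short URL-safe string, so it can double as an OpenSearch document id. Example calls, with an illustrative path:

    # Usage sketch; the path and the shape of the outputs are illustrative.
    from utils.hash_utils import hash_id, stream_hash

    doc_id = hash_id("/tmp/report.pdf")                                  # 24-char base64url string
    full_id = hash_id("/tmp/report.pdf", length=0)                       # full-length digest
    named = hash_id("/tmp/report.pdf", include_filename="report.pdf")    # filename mixed into the hash
    raw = stream_hash("/tmp/report.pdf", algo="blake2b")                 # raw digest bytes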

uv.lock (generated, 4 changes)

@@ -1,5 +1,5 @@
 version = 1
-revision = 3
+revision = 2
 requires-python = ">=3.13"
 resolution-markers = [
     "sys_platform == 'darwin'",

@@ -2282,7 +2282,7 @@ wheels = [
 [[package]]
 name = "openrag"
-version = "0.1.13"
+version = "0.1.14.dev1"
 source = { editable = "." }
 dependencies = [
     { name = "agentd" },