diff --git a/.gitignore b/.gitignore
index 4f22035a..8bf471e7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,3 +18,5 @@ wheels/
 1001*.pdf
 *.json
 .DS_Store
+
+config.yaml
diff --git a/config.yaml b/config.yaml
deleted file mode 100644
index 3bafb8bd..00000000
--- a/config.yaml
+++ /dev/null
@@ -1,31 +0,0 @@
-# OpenRAG Configuration File
-# This file allows you to configure OpenRAG settings.
-# Environment variables will override these settings unless edited is true.
-
-# Track if this config has been manually edited (prevents env var overrides)
-edited: false
-
-# Model provider configuration
-provider:
-  # Supported providers: "openai", "anthropic", "azure", etc.
-  model_provider: "openai"
-  # API key for the model provider (can also be set via OPENAI_API_KEY env var)
-  api_key: ""
-
-# Knowledge base and document processing configuration
-knowledge:
-  # Embedding model for vector search
-  embedding_model: "text-embedding-3-small"
-  # Text chunk size for document processing
-  chunk_size: 1000
-  # Overlap between chunks
-  chunk_overlap: 200
-  # Docling preset setting
-  doclingPresets: standard
-
-# AI agent configuration
-agent:
-  # Language model for the chat agent
-  llm_model: "gpt-4o-mini"
-  # System prompt for the agent
-  system_prompt: "You are a helpful AI assistant with access to a knowledge base. Answer questions based on the provided context."
diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py
index a5595813..1c83724d 100644
--- a/src/api/langflow_files.py
+++ b/src/api/langflow_files.py
@@ -231,11 +231,8 @@ async def upload_and_ingest_user_file(
 
     except Exception:
         # Clean up temp file on error
-        try:
-            if os.path.exists(temp_path):
-                os.unlink(temp_path)
-        except Exception:
-            pass  # Ignore cleanup errors
+        from utils.file_utils import safe_unlink
+        safe_unlink(temp_path)
         raise
 
     except Exception as e:
diff --git a/src/api/router.py b/src/api/router.py
index 154757a5..56789d41 100644
--- a/src/api/router.py
+++ b/src/api/router.py
@@ -164,12 +164,9 @@ async def langflow_upload_ingest_task(
 
     except Exception:
         # Clean up temp files on error
+        from utils.file_utils import safe_unlink
         for temp_path in temp_file_paths:
-            try:
-                if os.path.exists(temp_path):
-                    os.unlink(temp_path)
-            except Exception:
-                pass  # Ignore cleanup errors
+            safe_unlink(temp_path)
         raise
 
     except Exception as e:
diff --git a/src/api/tasks.py b/src/api/tasks.py
index de4bf505..92779d09 100644
--- a/src/api/tasks.py
+++ b/src/api/tasks.py
@@ -26,7 +26,7 @@ async def cancel_task(request: Request, task_service, session_manager):
     task_id = request.path_params.get("task_id")
     user = request.state.user
 
-    success = task_service.cancel_task(user.user_id, task_id)
+    success = await task_service.cancel_task(user.user_id, task_id)
     if not success:
         return JSONResponse(
             {"error": "Task not found or cannot be cancelled"}, status_code=400
diff --git a/src/config/settings.py b/src/config/settings.py
index 3bf1e6cf..517bf2df 100644
--- a/src/config/settings.py
+++ b/src/config/settings.py
@@ -514,6 +514,9 @@ class AppClients:
             ssl_assert_fingerprint=None,
             headers=headers,
             http_compress=True,
+            timeout=30,  # 30 second timeout
+            max_retries=3,
+            retry_on_timeout=True,
         )
 
diff --git a/src/connectors/langflow_connector_service.py b/src/connectors/langflow_connector_service.py
index ef68816d..545c6190 100644
--- a/src/connectors/langflow_connector_service.py
+++ b/src/connectors/langflow_connector_service.py
@@ -53,25 +53,27 @@ class LangflowConnectorService:
             filename=document.filename,
         )
 
+        from utils.file_utils import auto_cleanup_tempfile
+
         suffix = self._get_file_extension(document.mimetype)
         # Create temporary file from document content
-        with tempfile.NamedTemporaryFile(
-            delete=False, suffix=suffix
-        ) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
+        with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+            # Write document content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)
 
+            # Step 1: Upload file to Langflow
+            logger.debug("Uploading file to Langflow", filename=document.filename)
+            content = document.content
+            file_tuple = (
+                document.filename.replace(" ", "_").replace("/", "_")+suffix,
+                content,
+                document.mimetype or "application/octet-stream",
+            )
+
+            langflow_file_id = None  # Initialize to track if upload succeeded
             try:
-                # Step 1: Upload file to Langflow
-                logger.debug("Uploading file to Langflow", filename=document.filename)
-                content = document.content
-                file_tuple = (
-                    document.filename.replace(" ", "_").replace("/", "_")+suffix,
-                    content,
-                    document.mimetype or "application/octet-stream",
-                )
-
                 upload_result = await self.langflow_service.upload_user_file(
                     file_tuple, jwt_token
                 )
@@ -125,7 +127,7 @@ class LangflowConnectorService:
                     error=str(e),
                 )
                 # Try to clean up Langflow file if upload succeeded but processing failed
-                if "langflow_file_id" in locals():
+                if langflow_file_id is not None:
                     try:
                         await self.langflow_service.delete_user_file(langflow_file_id)
                         logger.debug(
@@ -140,10 +142,6 @@ class LangflowConnectorService:
                         )
                 raise
 
-            finally:
-                # Clean up temporary file
-                os.unlink(tmp_file.name)
-
     def _get_file_extension(self, mimetype: str) -> str:
         """Get file extension based on MIME type"""
         mime_to_ext = {
diff --git a/src/connectors/service.py b/src/connectors/service.py
index 01a41519..792d8d1f 100644
--- a/src/connectors/service.py
+++ b/src/connectors/service.py
@@ -54,52 +54,50 @@ class ConnectorService:
         """Process a document from a connector using existing processing pipeline"""
 
         # Create temporary file from document content
-        with tempfile.NamedTemporaryFile(
-            delete=False, suffix=self._get_file_extension(document.mimetype)
-        ) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
+        from utils.file_utils import auto_cleanup_tempfile
 
-            try:
-                # Use existing process_file_common function with connector document metadata
-                # We'll use the document service's process_file_common method
-                from services.document_service import DocumentService
+        with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path:
+            # Write document content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)
 
-                doc_service = DocumentService(session_manager=self.session_manager)
+            # Use existing process_file_common function with connector document metadata
+            # We'll use the document service's process_file_common method
+            from services.document_service import DocumentService
 
-                logger.debug("Processing connector document", document_id=document.id)
+            doc_service = DocumentService(session_manager=self.session_manager)
 
-                # Process using the existing pipeline but with connector document metadata
-                result = await doc_service.process_file_common(
-                    file_path=tmp_file.name,
-                    file_hash=document.id,  # Use connector document ID as hash
-                    owner_user_id=owner_user_id,
-                    original_filename=document.filename,  # Pass the original Google Doc title
-                    jwt_token=jwt_token,
-                    owner_name=owner_name,
-                    owner_email=owner_email,
-                    file_size=len(document.content) if document.content else 0,
-                    connector_type=connector_type,
+            logger.debug("Processing connector document", document_id=document.id)
+
+            # Process using consolidated processing pipeline
+            from models.processors import TaskProcessor
+            processor = TaskProcessor(document_service=doc_service)
+            result = await processor.process_document_standard(
+                file_path=tmp_path,
+                file_hash=document.id,  # Use connector document ID as hash
+                owner_user_id=owner_user_id,
+                original_filename=document.filename,  # Pass the original Google Doc title
+                jwt_token=jwt_token,
+                owner_name=owner_name,
+                owner_email=owner_email,
+                file_size=len(document.content) if document.content else 0,
+                connector_type=connector_type,
+            )
+
+            logger.debug("Document processing result", result=result)
+
+            # If successfully indexed or already exists, update the indexed documents with connector metadata
+            if result["status"] in ["indexed", "unchanged"]:
+                # Update all chunks with connector-specific metadata
+                await self._update_connector_metadata(
+                    document, owner_user_id, connector_type, jwt_token
                 )
 
-                logger.debug("Document processing result", result=result)
-
-                # If successfully indexed or already exists, update the indexed documents with connector metadata
-                if result["status"] in ["indexed", "unchanged"]:
-                    # Update all chunks with connector-specific metadata
-                    await self._update_connector_metadata(
-                        document, owner_user_id, connector_type, jwt_token
-                    )
-
-                return {
-                    **result,
-                    "filename": document.filename,
-                    "source_url": document.source_url,
-                }
-
-            finally:
-                # Clean up temporary file
-                os.unlink(tmp_file.name)
+            return {
+                **result,
+                "filename": document.filename,
+                "source_url": document.source_url,
+            }
 
     async def _update_connector_metadata(
         self,
diff --git a/src/models/processors.py b/src/models/processors.py
index a817f8d4..ecec9c49 100644
--- a/src/models/processors.py
+++ b/src/models/processors.py
@@ -1,4 +1,3 @@
-from abc import ABC, abstractmethod
 from typing import Any
 from .tasks import UploadTask, FileTask
 from utils.logging_config import get_logger
@@ -6,22 +5,160 @@ from utils.logging_config import get_logger
 logger = get_logger(__name__)
 
 
-class TaskProcessor(ABC):
-    """Abstract base class for task processors"""
+class TaskProcessor:
+    """Base class for task processors with shared processing logic"""
+
+    def __init__(self, document_service=None):
+        self.document_service = document_service
+
+    async def check_document_exists(
+        self,
+        file_hash: str,
+        opensearch_client,
+    ) -> bool:
+        """
+        Check if a document with the given hash already exists in OpenSearch.
+        Consolidated hash checking for all processors.
+        """
+        from config.settings import INDEX_NAME
+        import asyncio
+
+        max_retries = 3
+        retry_delay = 1.0
+
+        for attempt in range(max_retries):
+            try:
+                exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
+                return exists
+            except (asyncio.TimeoutError, Exception) as e:
+                if attempt == max_retries - 1:
+                    logger.error(
+                        "OpenSearch exists check failed after retries",
+                        file_hash=file_hash,
+                        error=str(e),
+                        attempt=attempt + 1
+                    )
+                    # On final failure, assume document doesn't exist (safer to reprocess than skip)
+                    logger.warning(
+                        "Assuming document doesn't exist due to connection issues",
+                        file_hash=file_hash
+                    )
+                    return False
+                else:
+                    logger.warning(
+                        "OpenSearch exists check failed, retrying",
+                        file_hash=file_hash,
+                        error=str(e),
+                        attempt=attempt + 1,
+                        retry_in=retry_delay
+                    )
+                    await asyncio.sleep(retry_delay)
+                    retry_delay *= 2  # Exponential backoff
+
+    async def process_document_standard(
+        self,
+        file_path: str,
+        file_hash: str,
+        owner_user_id: str = None,
+        original_filename: str = None,
+        jwt_token: str = None,
+        owner_name: str = None,
+        owner_email: str = None,
+        file_size: int = None,
+        connector_type: str = "local",
+    ):
+        """
+        Standard processing pipeline for non-Langflow processors:
+        docling conversion + embeddings + OpenSearch indexing.
+        """
+        import datetime
+        from config.settings import INDEX_NAME, EMBED_MODEL, clients
+        from services.document_service import chunk_texts_for_embeddings
+        from utils.document_processing import extract_relevant
+
+        # Get user's OpenSearch client with JWT for OIDC auth
+        opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
+            owner_user_id, jwt_token
+        )
+
+        # Check if already exists
+        if await self.check_document_exists(file_hash, opensearch_client):
+            return {"status": "unchanged", "id": file_hash}
+
+        # Convert and extract
+        result = clients.converter.convert(file_path)
+        full_doc = result.document.export_to_dict()
+        slim_doc = extract_relevant(full_doc)
+
+        texts = [c["text"] for c in slim_doc["chunks"]]
+
+        # Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
+        text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
+        embeddings = []
+
+        for batch in text_batches:
+            resp = await clients.patched_async_client.embeddings.create(
+                model=EMBED_MODEL, input=batch
+            )
+            embeddings.extend([d.embedding for d in resp.data])
+
+        # Index each chunk as a separate document
+        for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
+            chunk_doc = {
+                "document_id": file_hash,
+                "filename": original_filename
+                if original_filename
+                else slim_doc["filename"],
+                "mimetype": slim_doc["mimetype"],
+                "page": chunk["page"],
+                "text": chunk["text"],
+                "chunk_embedding": vect,
+                "file_size": file_size,
+                "connector_type": connector_type,
+                "indexed_time": datetime.datetime.now().isoformat(),
+            }
+
+            # Only set owner fields if owner_user_id is provided (for no-auth mode support)
+            if owner_user_id is not None:
+                chunk_doc["owner"] = owner_user_id
+            if owner_name is not None:
+                chunk_doc["owner_name"] = owner_name
+            if owner_email is not None:
+                chunk_doc["owner_email"] = owner_email
+            chunk_id = f"{file_hash}_{i}"
+            try:
+                await opensearch_client.index(
+                    index=INDEX_NAME, id=chunk_id, body=chunk_doc
+                )
+            except Exception as e:
+                logger.error(
+                    "OpenSearch indexing failed for chunk",
+                    chunk_id=chunk_id,
+                    error=str(e),
+                )
+                logger.error("Chunk document details", chunk_doc=chunk_doc)
+                raise
+        return {"status": "indexed", "id": file_hash}
 
-    @abstractmethod
     async def process_item(
         self, upload_task: UploadTask, item: Any, file_task: FileTask
     ) -> None:
         """
         Process a single item in the task.
 
+        This is a base implementation that should be overridden by subclasses.
+        When TaskProcessor is used directly (not via subclass), this method
+        is not called - only the utility methods like process_document_standard
+        are used.
+
         Args:
             upload_task: The overall upload task
             item: The item to process (could be file path, file info, etc.)
             file_task: The specific file task to update
         """
-        pass
+        raise NotImplementedError(
+            "process_item should be overridden by subclasses when used in task processing"
+        )
 
 
 class DocumentFileProcessor(TaskProcessor):
@@ -35,7 +172,7 @@ class DocumentFileProcessor(TaskProcessor):
         owner_name: str = None,
         owner_email: str = None,
     ):
-        self.document_service = document_service
+        super().__init__(document_service)
         self.owner_user_id = owner_user_id
         self.jwt_token = jwt_token
         self.owner_name = owner_name
@@ -44,16 +181,52 @@
     async def process_item(
         self, upload_task: UploadTask, item: str, file_task: FileTask
     ) -> None:
-        """Process a regular file path using DocumentService"""
-        # This calls the existing logic with user context
-        await self.document_service.process_single_file_task(
-            upload_task,
-            item,
-            owner_user_id=self.owner_user_id,
-            jwt_token=self.jwt_token,
-            owner_name=self.owner_name,
-            owner_email=self.owner_email,
-        )
+        """Process a regular file path using consolidated methods"""
+        from models.tasks import TaskStatus
+        from utils.hash_utils import hash_id
+        import time
+        import os
+
+        file_task.status = TaskStatus.RUNNING
+        file_task.updated_at = time.time()
+
+        try:
+            # Compute hash
+            file_hash = hash_id(item)
+
+            # Get file size
+            try:
+                file_size = os.path.getsize(item)
+            except Exception:
+                file_size = 0
+
+            # Use consolidated standard processing
+            result = await self.process_document_standard(
+                file_path=item,
+                file_hash=file_hash,
+                owner_user_id=self.owner_user_id,
+                original_filename=os.path.basename(item),
+                jwt_token=self.jwt_token,
+                owner_name=self.owner_name,
+                owner_email=self.owner_email,
+                file_size=file_size,
+                connector_type="local",
+            )
+
+            file_task.status = TaskStatus.COMPLETED
+            file_task.result = result
+            file_task.updated_at = time.time()
+            upload_task.successful_files += 1
+
+        except Exception as e:
+            file_task.status = TaskStatus.FAILED
+            file_task.error = str(e)
+            file_task.updated_at = time.time()
+            upload_task.failed_files += 1
+            raise
+        finally:
+            upload_task.processed_files += 1
+            upload_task.updated_at = time.time()
 
 
 class ConnectorFileProcessor(TaskProcessor):
@@ -69,6 +242,7 @@
         owner_name: str = None,
         owner_email: str = None,
     ):
+        super().__init__()
        self.connector_service = connector_service
         self.connection_id = connection_id
         self.files_to_process = files_to_process
@@ -76,53 +250,79 @@
         self.jwt_token = jwt_token
         self.owner_name = owner_name
         self.owner_email = owner_email
-        # Create lookup map for file info - handle both file objects and file IDs
-        self.file_info_map = {}
-        for f in files_to_process:
-            if isinstance(f, dict):
-                # Full file info objects
-                self.file_info_map[f["id"]] = f
-            else:
-                # Just file IDs - will need to fetch metadata during processing
-                self.file_info_map[f] = None
 
     async def process_item(
         self, upload_task: UploadTask, item: str, file_task: FileTask
     ) -> None:
-        """Process a connector file using ConnectorService"""
+        """Process a connector file using consolidated methods"""
         from models.tasks import TaskStatus
+        from utils.hash_utils import hash_id
+        import tempfile
+        import time
+        import os
 
-        file_id = item  # item is the connector file ID
-        self.file_info_map.get(file_id)
+        file_task.status = TaskStatus.RUNNING
+        file_task.updated_at = time.time()
 
-        # Get the connector and connection info
-        connector = await self.connector_service.get_connector(self.connection_id)
-        connection = await self.connector_service.connection_manager.get_connection(
-            self.connection_id
-        )
-        if not connector or not connection:
-            raise ValueError(f"Connection '{self.connection_id}' not found")
+        try:
+            file_id = item  # item is the connector file ID
 
-        # Get file content from connector (the connector will fetch metadata if needed)
-        document = await connector.get_file_content(file_id)
+            # Get the connector and connection info
+            connector = await self.connector_service.get_connector(self.connection_id)
+            connection = await self.connector_service.connection_manager.get_connection(
+                self.connection_id
+            )
+            if not connector or not connection:
+                raise ValueError(f"Connection '{self.connection_id}' not found")
 
-        # Use the user_id passed during initialization
-        if not self.user_id:
-            raise ValueError("user_id not provided to ConnectorFileProcessor")
+            # Get file content from connector
+            document = await connector.get_file_content(file_id)
 
-        # Process using existing pipeline
-        result = await self.connector_service.process_connector_document(
-            document,
-            self.user_id,
-            connection.connector_type,
-            jwt_token=self.jwt_token,
-            owner_name=self.owner_name,
-            owner_email=self.owner_email,
-        )
+            if not self.user_id:
+                raise ValueError("user_id not provided to ConnectorFileProcessor")
 
-        file_task.status = TaskStatus.COMPLETED
-        file_task.result = result
-        upload_task.successful_files += 1
+            # Create temporary file from document content
+            from utils.file_utils import auto_cleanup_tempfile
+
+            suffix = self.connector_service._get_file_extension(document.mimetype)
+            with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+                # Write content to temp file
+                with open(tmp_path, 'wb') as f:
+                    f.write(document.content)
+
+                # Compute hash
+                file_hash = hash_id(tmp_path)
+
+                # Use consolidated standard processing
+                result = await self.process_document_standard(
+                    file_path=tmp_path,
+                    file_hash=file_hash,
+                    owner_user_id=self.user_id,
+                    original_filename=document.filename,
+                    jwt_token=self.jwt_token,
+                    owner_name=self.owner_name,
+                    owner_email=self.owner_email,
+                    file_size=len(document.content),
+                    connector_type=connection.connector_type,
+                )
+
+                # Add connector-specific metadata
+                result.update({
+                    "source_url": document.source_url,
+                    "document_id": document.id,
+                })
+
+                file_task.status = TaskStatus.COMPLETED
+                file_task.result = result
+                file_task.updated_at = time.time()
+                upload_task.successful_files += 1
+
+        except Exception as e:
+            file_task.status = TaskStatus.FAILED
+            file_task.error = str(e)
+            file_task.updated_at = time.time()
+            upload_task.failed_files += 1
+            raise
 
 
 class LangflowConnectorFileProcessor(TaskProcessor):
@@ -138,6 +338,7 @@
         owner_name: str = None,
         owner_email: str = None,
     ):
+        super().__init__()
         self.langflow_connector_service = langflow_connector_service
         self.connection_id = connection_id
         self.files_to_process = files_to_process
@@ -145,57 +346,85 @@
         self.jwt_token = jwt_token
         self.owner_name = owner_name
         self.owner_email = owner_email
-        # Create lookup map for file info - handle both file objects and file IDs
-        self.file_info_map = {}
-        for f in files_to_process:
-            if isinstance(f, dict):
-                # Full file info objects
-                self.file_info_map[f["id"]] = f
-            else:
-                # Just file IDs - will need to fetch metadata during processing
-                self.file_info_map[f] = None
 
     async def process_item(
        self, upload_task: UploadTask, item: str, file_task: FileTask
     ) -> None:
         """Process a connector file using LangflowConnectorService"""
         from models.tasks import TaskStatus
+        from utils.hash_utils import hash_id
+        import tempfile
+        import time
+        import os
 
-        file_id = item  # item is the connector file ID
-        self.file_info_map.get(file_id)
+        file_task.status = TaskStatus.RUNNING
+        file_task.updated_at = time.time()
 
-        # Get the connector and connection info
-        connector = await self.langflow_connector_service.get_connector(
-            self.connection_id
-        )
-        connection = (
-            await self.langflow_connector_service.connection_manager.get_connection(
+        try:
+            file_id = item  # item is the connector file ID
+
+            # Get the connector and connection info
+            connector = await self.langflow_connector_service.get_connector(
                 self.connection_id
             )
-        )
-        if not connector or not connection:
-            raise ValueError(f"Connection '{self.connection_id}' not found")
+            connection = (
+                await self.langflow_connector_service.connection_manager.get_connection(
+                    self.connection_id
+                )
+            )
+            if not connector or not connection:
+                raise ValueError(f"Connection '{self.connection_id}' not found")
 
-        # Get file content from connector (the connector will fetch metadata if needed)
-        document = await connector.get_file_content(file_id)
+            # Get file content from connector
+            document = await connector.get_file_content(file_id)
 
-        # Use the user_id passed during initialization
-        if not self.user_id:
-            raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
+            if not self.user_id:
+                raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
 
-        # Process using Langflow pipeline
-        result = await self.langflow_connector_service.process_connector_document(
-            document,
-            self.user_id,
-            connection.connector_type,
-            jwt_token=self.jwt_token,
-            owner_name=self.owner_name,
-            owner_email=self.owner_email,
-        )
+            # Create temporary file and compute hash to check for duplicates
+            from utils.file_utils import auto_cleanup_tempfile
 
-        file_task.status = TaskStatus.COMPLETED
-        file_task.result = result
-        upload_task.successful_files += 1
+            suffix = self.langflow_connector_service._get_file_extension(document.mimetype)
+            with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+                # Write content to temp file
+                with open(tmp_path, 'wb') as f:
+                    f.write(document.content)
+
+                # Compute hash and check if already exists
+                file_hash = hash_id(tmp_path)
+
+                # Check if document already exists
+                opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
+                    self.user_id, self.jwt_token
+                )
+                if await self.check_document_exists(file_hash, opensearch_client):
+                    file_task.status = TaskStatus.COMPLETED
+                    file_task.result = {"status": "unchanged", "id": file_hash}
+                    file_task.updated_at = time.time()
+                    upload_task.successful_files += 1
+                    return
+
+                # Process using Langflow pipeline
+                result = await self.langflow_connector_service.process_connector_document(
+                    document,
+                    self.user_id,
+                    connection.connector_type,
+                    jwt_token=self.jwt_token,
+                    owner_name=self.owner_name,
+                    owner_email=self.owner_email,
+                )
+
+                file_task.status = TaskStatus.COMPLETED
+                file_task.result = result
+                file_task.updated_at = time.time()
+                upload_task.successful_files += 1
+
+        except Exception as e:
+            file_task.status = TaskStatus.FAILED
+            file_task.error = str(e)
+            file_task.updated_at = time.time()
+            upload_task.failed_files += 1
+            raise
 
 
 class S3FileProcessor(TaskProcessor):
@@ -213,7 +442,7 @@
     ):
         import boto3
 
-        self.document_service = document_service
+        super().__init__(document_service)
         self.bucket = bucket
         self.s3_client = s3_client or boto3.client("s3")
         self.owner_user_id = owner_user_id
@@ -238,34 +467,17 @@
         file_task.status = TaskStatus.RUNNING
         file_task.updated_at = time.time()
 
-        tmp = tempfile.NamedTemporaryFile(delete=False)
+        from utils.file_utils import auto_cleanup_tempfile
+        from utils.hash_utils import hash_id
+
         try:
-            # Download object to temporary file
-            self.s3_client.download_fileobj(self.bucket, item, tmp)
-            tmp.flush()
+            with auto_cleanup_tempfile() as tmp_path:
+                # Download object to temporary file
+                with open(tmp_path, 'wb') as tmp_file:
+                    self.s3_client.download_fileobj(self.bucket, item, tmp_file)
 
-            loop = asyncio.get_event_loop()
-            slim_doc = await loop.run_in_executor(
-                self.document_service.process_pool, process_document_sync, tmp.name
-            )
-
-            opensearch_client = (
-                self.document_service.session_manager.get_user_opensearch_client(
-                    self.owner_user_id, self.jwt_token
-                )
-            )
-            exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
-            if exists:
-                result = {"status": "unchanged", "id": slim_doc["id"]}
-            else:
-                texts = [c["text"] for c in slim_doc["chunks"]]
-                text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
-                embeddings = []
-                for batch in text_batches:
-                    resp = await clients.patched_async_client.embeddings.create(
-                        model=EMBED_MODEL, input=batch
-                    )
-                    embeddings.extend([d.embedding for d in resp.data])
+                # Compute hash
+                file_hash = hash_id(tmp_path)
 
                 # Get object size
                 try:
@@ -274,54 +486,29 @@
                 except Exception:
                     file_size = 0
 
-                for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
-                    chunk_doc = {
-                        "document_id": slim_doc["id"],
-                        "filename": slim_doc["filename"],
-                        "mimetype": slim_doc["mimetype"],
-                        "page": chunk["page"],
-                        "text": chunk["text"],
-                        "chunk_embedding": vect,
-                        "file_size": file_size,
-                        "connector_type": "s3",  # S3 uploads
-                        "indexed_time": datetime.datetime.now().isoformat(),
-                    }
+                # Use consolidated standard processing
+                result = await self.process_document_standard(
+                    file_path=tmp_path,
+                    file_hash=file_hash,
+                    owner_user_id=self.owner_user_id,
+                    original_filename=item,  # Use S3 key as filename
+                    jwt_token=self.jwt_token,
+                    owner_name=self.owner_name,
+                    owner_email=self.owner_email,
+                    file_size=file_size,
+                    connector_type="s3",
+                )
 
-                    # Only set owner fields if owner_user_id is provided (for no-auth mode support)
-                    if self.owner_user_id is not None:
-                        chunk_doc["owner"] = self.owner_user_id
-                    if self.owner_name is not None:
-                        chunk_doc["owner_name"] = self.owner_name
-                    if self.owner_email is not None:
-                        chunk_doc["owner_email"] = self.owner_email
-                    chunk_id = f"{slim_doc['id']}_{i}"
-                    try:
-                        await opensearch_client.index(
-                            index=INDEX_NAME, id=chunk_id, body=chunk_doc
-                        )
-                    except Exception as e:
-                        logger.error(
-                            "OpenSearch indexing failed for S3 chunk",
-                            chunk_id=chunk_id,
-                            error=str(e),
-                            chunk_doc=chunk_doc,
-                        )
-                        raise
-
-                result = {"status": "indexed", "id": slim_doc["id"]}
-
-            result["path"] = f"s3://{self.bucket}/{item}"
-            file_task.status = TaskStatus.COMPLETED
-            file_task.result = result
-            upload_task.successful_files += 1
+                result["path"] = f"s3://{self.bucket}/{item}"
+                file_task.status = TaskStatus.COMPLETED
+                file_task.result = result
+                upload_task.successful_files += 1
 
         except Exception as e:
             file_task.status = TaskStatus.FAILED
             file_task.error = str(e)
             upload_task.failed_files += 1
         finally:
-            tmp.close()
-            os.remove(tmp.name)
             file_task.updated_at = time.time()
@@ -341,6 +528,7 @@
         settings: dict = None,
         delete_after_ingest: bool = True,
     ):
+        super().__init__()
         self.langflow_file_service = langflow_file_service
         self.session_manager = session_manager
         self.owner_user_id = owner_user_id
@@ -366,7 +554,22 @@
         file_task.updated_at = time.time()
 
         try:
-            # Read file content
+            # Compute hash and check if already exists
+            from utils.hash_utils import hash_id
+            file_hash = hash_id(item)
+
+            # Check if document already exists
+            opensearch_client = self.session_manager.get_user_opensearch_client(
+                self.owner_user_id, self.jwt_token
+            )
+            if await self.check_document_exists(file_hash, opensearch_client):
+                file_task.status = TaskStatus.COMPLETED
+                file_task.result = {"status": "unchanged", "id": file_hash}
+                file_task.updated_at = time.time()
+                upload_task.successful_files += 1
+                return
+
+            # Read file content for processing
             with open(item, 'rb') as f:
                 content = f.read()
diff --git a/src/services/document_service.py b/src/services/document_service.py
index 949515e3..5204ea0e 100644
--- a/src/services/document_service.py
+++ b/src/services/document_service.py
@@ -112,98 +112,6 @@ class DocumentService:
                 return False
         return False
 
-    async def process_file_common(
-        self,
-        file_path: str,
-        file_hash: str = None,
-        owner_user_id: str = None,
-        original_filename: str = None,
-        jwt_token: str = None,
-        owner_name: str = None,
-        owner_email: str = None,
-        file_size: int = None,
-        connector_type: str = "local",
-    ):
-        """
-        Common processing logic for both upload and upload_path.
-        1. Optionally compute SHA256 hash if not provided.
-        2. Convert with docling and extract relevant content.
-        3. Add embeddings.
-        4. Index into OpenSearch.
-        """
-        if file_hash is None:
-            sha256 = hashlib.sha256()
-            async with aiofiles.open(file_path, "rb") as f:
-                while True:
-                    chunk = await f.read(1 << 20)
-                    if not chunk:
-                        break
-                    sha256.update(chunk)
-            file_hash = sha256.hexdigest()
-
-        # Get user's OpenSearch client with JWT for OIDC auth
-        opensearch_client = self.session_manager.get_user_opensearch_client(
-            owner_user_id, jwt_token
-        )
-
-        exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
-        if exists:
-            return {"status": "unchanged", "id": file_hash}
-
-        # convert and extract
-        result = clients.converter.convert(file_path)
-        full_doc = result.document.export_to_dict()
-        slim_doc = extract_relevant(full_doc)
-
-        texts = [c["text"] for c in slim_doc["chunks"]]
-
-        # Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
-        text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
-        embeddings = []
-
-        for batch in text_batches:
-            resp = await clients.patched_async_client.embeddings.create(
-                model=EMBED_MODEL, input=batch
-            )
-            embeddings.extend([d.embedding for d in resp.data])
-
-        # Index each chunk as a separate document
-        for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
-            chunk_doc = {
-                "document_id": file_hash,
-                "filename": original_filename
-                if original_filename
-                else slim_doc["filename"],
-                "mimetype": slim_doc["mimetype"],
-                "page": chunk["page"],
-                "text": chunk["text"],
-                "chunk_embedding": vect,
-                "file_size": file_size,
-                "connector_type": connector_type,
-                "indexed_time": datetime.datetime.now().isoformat(),
-            }
-
-            # Only set owner fields if owner_user_id is provided (for no-auth mode support)
-            if owner_user_id is not None:
-                chunk_doc["owner"] = owner_user_id
-            if owner_name is not None:
-                chunk_doc["owner_name"] = owner_name
-            if owner_email is not None:
-                chunk_doc["owner_email"] = owner_email
-            chunk_id = f"{file_hash}_{i}"
-            try:
-                await opensearch_client.index(
-                    index=INDEX_NAME, id=chunk_id, body=chunk_doc
-                )
-            except Exception as e:
-                logger.error(
-                    "OpenSearch indexing failed for chunk",
-                    chunk_id=chunk_id,
-                    error=str(e),
-                )
-                logger.error("Chunk document details", chunk_doc=chunk_doc)
-                raise
-        return {"status": "indexed", "id": file_hash}
 
     async def process_upload_file(
         self,
@@ -214,20 +122,22 @@
         owner_email: str = None,
     ):
         """Process an uploaded file from form data"""
-        sha256 = hashlib.sha256()
-        tmp = tempfile.NamedTemporaryFile(delete=False)
-        file_size = 0
-        try:
-            while True:
-                chunk = await upload_file.read(1 << 20)
-                if not chunk:
-                    break
-                sha256.update(chunk)
-                tmp.write(chunk)
-                file_size += len(chunk)
-            tmp.flush()
+        from utils.hash_utils import hash_id
+        from utils.file_utils import auto_cleanup_tempfile
+        import os
 
-            file_hash = sha256.hexdigest()
+        with auto_cleanup_tempfile() as tmp_path:
+            # Stream upload file to temporary file
+            file_size = 0
+            with open(tmp_path, 'wb') as tmp_file:
+                while True:
+                    chunk = await upload_file.read(1 << 20)
+                    if not chunk:
+                        break
+                    tmp_file.write(chunk)
+                    file_size += len(chunk)
+
+            file_hash = hash_id(tmp_path)
             # Get user's OpenSearch client with JWT for OIDC auth
             opensearch_client = self.session_manager.get_user_opensearch_client(
                 owner_user_id, jwt_token
             )
@@ -243,22 +153,22 @@
             if exists:
                 return {"status": "unchanged", "id": file_hash}
 
-            result = await self.process_file_common(
-                tmp.name,
-                file_hash,
+            # Use consolidated standard processing
+            from models.processors import TaskProcessor
+            processor = TaskProcessor(document_service=self)
+            result = await processor.process_document_standard(
+                file_path=tmp_path,
+                file_hash=file_hash,
                 owner_user_id=owner_user_id,
                 original_filename=upload_file.filename,
                 jwt_token=jwt_token,
                 owner_name=owner_name,
                 owner_email=owner_email,
                 file_size=file_size,
+                connector_type="local",
             )
             return result
 
-        finally:
-            tmp.close()
-            os.remove(tmp.name)
-
     async def process_upload_context(self, upload_file, filename: str = None):
         """Process uploaded file and return content for context"""
         import io
@@ -294,145 +204,3 @@
             "pages": len(slim_doc["chunks"]),
             "content_length": len(full_content),
         }
-
-    async def process_single_file_task(
-        self,
-        upload_task,
-        file_path: str,
-        owner_user_id: str = None,
-        jwt_token: str = None,
-        owner_name: str = None,
-        owner_email: str = None,
-        connector_type: str = "local",
-    ):
-        """Process a single file and update task tracking - used by task service"""
-        from models.tasks import TaskStatus
-        import time
-        import asyncio
-
-        file_task = upload_task.file_tasks[file_path]
-        file_task.status = TaskStatus.RUNNING
-        file_task.updated_at = time.time()
-
-        try:
-            # Handle regular file processing
-            loop = asyncio.get_event_loop()
-
-            # Run CPU-intensive docling processing in separate process
-            slim_doc = await loop.run_in_executor(
-                self.process_pool, process_document_sync, file_path
-            )
-
-            # Check if already indexed
-            opensearch_client = self.session_manager.get_user_opensearch_client(
-                owner_user_id, jwt_token
-            )
-            exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
-            if exists:
-                result = {"status": "unchanged", "id": slim_doc["id"]}
-            else:
-                # Generate embeddings and index (I/O bound, keep in main process)
-                texts = [c["text"] for c in slim_doc["chunks"]]
-
-                # Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
-                text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
-                embeddings = []
-
-                for batch in text_batches:
-                    resp = await clients.patched_async_client.embeddings.create(
-                        model=EMBED_MODEL, input=batch
-                    )
-                    embeddings.extend([d.embedding for d in resp.data])
-
-                # Get file size
-                file_size = 0
-                try:
-                    file_size = os.path.getsize(file_path)
-                except OSError:
-                    pass  # Keep file_size as 0 if can't get size
-
-                # Index each chunk
-                for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
-                    chunk_doc = {
-                        "document_id": slim_doc["id"],
-                        "filename": slim_doc["filename"],
-                        "mimetype": slim_doc["mimetype"],
-                        "page": chunk["page"],
-                        "text": chunk["text"],
-                        "chunk_embedding": vect,
-                        "file_size": file_size,
-                        "connector_type": connector_type,
-                        "indexed_time": datetime.datetime.now().isoformat(),
-                    }
-
-                    # Only set owner fields if owner_user_id is provided (for no-auth mode support)
-                    if owner_user_id is not None:
-                        chunk_doc["owner"] = owner_user_id
-                    if owner_name is not None:
-                        chunk_doc["owner_name"] = owner_name
-                    if owner_email is not None:
-                        chunk_doc["owner_email"] = owner_email
-                    chunk_id = f"{slim_doc['id']}_{i}"
-                    try:
-                        await opensearch_client.index(
-                            index=INDEX_NAME, id=chunk_id, body=chunk_doc
-                        )
-                    except Exception as e:
-                        logger.error(
-                            "OpenSearch indexing failed for batch chunk",
-                            chunk_id=chunk_id,
-                            error=str(e),
-                        )
-                        logger.error("Chunk document details", chunk_doc=chunk_doc)
-                        raise
-
-                result = {"status": "indexed", "id": slim_doc["id"]}
-
-            result["path"] = file_path
-            file_task.status = TaskStatus.COMPLETED
-            file_task.result = result
-            upload_task.successful_files += 1
-
-        except Exception as e:
-            import traceback
-            from concurrent.futures import BrokenExecutor
-
-            if isinstance(e, BrokenExecutor):
-                logger.error(
-                    "Process pool broken while processing file", file_path=file_path
-                )
-                logger.info("Worker process likely crashed")
-                logger.info(
-                    "You should see detailed crash logs above from the worker process"
-                )
-
-                # Mark pool as broken for potential recreation
-                self._process_pool_broken = True
-
-                # Attempt to recreate the pool for future operations
-                if self._recreate_process_pool():
-                    logger.info("Process pool successfully recreated")
-                else:
-                    logger.warning(
-                        "Failed to recreate process pool - future operations may fail"
-                    )
-
-                file_task.error = f"Worker process crashed: {str(e)}"
-            else:
-                logger.error(
-                    "Failed to process file", file_path=file_path, error=str(e)
-                )
-                file_task.error = str(e)
-
-            logger.error("Full traceback available")
-            traceback.print_exc()
-            file_task.status = TaskStatus.FAILED
-            upload_task.failed_files += 1
-        finally:
-            file_task.updated_at = time.time()
-            upload_task.processed_files += 1
-            upload_task.updated_at = time.time()
-
-            if upload_task.processed_files >= upload_task.total_files:
-                upload_task.status = TaskStatus.COMPLETED
-
diff --git a/src/services/task_service.py b/src/services/task_service.py
index de297dff..be5312a0 100644
--- a/src/services/task_service.py
+++ b/src/services/task_service.py
@@ -130,10 +130,21 @@ class TaskService:
             async def process_with_semaphore(file_path: str):
                 async with semaphore:
-                    await self.document_service.process_single_file_task(
-                        upload_task, file_path
+                    from models.processors import DocumentFileProcessor
+                    file_task = upload_task.file_tasks[file_path]
+
+                    # Create processor with user context (all None for background processing)
+                    processor = DocumentFileProcessor(
+                        document_service=self.document_service,
+                        owner_user_id=None,
+                        jwt_token=None,
+                        owner_name=None,
+                        owner_email=None,
                     )
+                    # Process the file
+                    await processor.process_item(upload_task, file_path, file_task)
+
             tasks = [
                 process_with_semaphore(file_path)
                 for file_path in upload_task.file_tasks.keys()
             ]
 
             await asyncio.gather(*tasks, return_exceptions=True)
 
+            # Check if task is complete
+            if upload_task.processed_files >= upload_task.total_files:
+                upload_task.status = TaskStatus.COMPLETED
+                upload_task.updated_at = time.time()
+
         except Exception as e:
             logger.error(
                 "Background upload processor failed", task_id=task_id, error=str(e)
             )
@@ -336,7 +352,7 @@ class TaskService:
         tasks.sort(key=lambda x: x["created_at"], reverse=True)
         return tasks
 
-    def cancel_task(self, user_id: str, task_id: str) -> bool:
+    async def cancel_task(self, user_id: str, task_id: str) -> bool:
         """Cancel a task if it exists and is not already completed.
 
         Supports cancellation of shared default tasks stored under the anonymous user.
@@ -368,18 +384,28 @@ class TaskService:
             and not upload_task.background_task.done()
         ):
             upload_task.background_task.cancel()
+            # Wait for the background task to actually stop to avoid race conditions
+            try:
+                await upload_task.background_task
+            except asyncio.CancelledError:
+                pass  # Expected when we cancel the task
+            except Exception:
+                pass  # Ignore other errors during cancellation
 
         # Mark task as failed (cancelled)
         upload_task.status = TaskStatus.FAILED
         upload_task.updated_at = time.time()
 
-        # Mark all pending file tasks as failed
+        # Mark all pending and running file tasks as failed
         for file_task in upload_task.file_tasks.values():
-            if file_task.status == TaskStatus.PENDING:
+            if file_task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]:
+                # Increment failed_files counter for both pending and running
+                # (running files haven't been counted yet in either counter)
+                upload_task.failed_files += 1
+
                 file_task.status = TaskStatus.FAILED
                 file_task.error = "Task cancelled by user"
                 file_task.updated_at = time.time()
-                upload_task.failed_files += 1
 
         return True
diff --git a/src/utils/document_processing.py b/src/utils/document_processing.py
index a8792e46..fcb458fb 100644
--- a/src/utils/document_processing.py
+++ b/src/utils/document_processing.py
@@ -229,15 +229,9 @@ def process_document_sync(file_path: str):
 
     # Compute file hash
     try:
+        from utils.hash_utils import hash_id
         logger.info("Computing file hash", worker_pid=os.getpid())
-        sha256 = hashlib.sha256()
-        with open(file_path, "rb") as f:
-            while True:
-                chunk = f.read(1 << 20)
-                if not chunk:
-                    break
-                sha256.update(chunk)
-        file_hash = sha256.hexdigest()
+        file_hash = hash_id(file_path)
         logger.info(
             "File hash computed",
             worker_pid=os.getpid(),
diff --git a/src/utils/file_utils.py b/src/utils/file_utils.py
new file mode 100644
index 00000000..2afc4024
--- /dev/null
+++ b/src/utils/file_utils.py
@@ -0,0 +1,60 @@
+"""File handling utilities for OpenRAG"""
+
+import os
+import tempfile
+from contextlib import contextmanager
+from typing import Optional
+
+
+@contextmanager
+def auto_cleanup_tempfile(suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None):
+    """
+    Context manager for temporary files that automatically cleans up.
+
+    Unlike tempfile.NamedTemporaryFile with delete=True, this keeps the file
+    on disk for the duration of the context, making it safe for async operations.
+
+    Usage:
+        with auto_cleanup_tempfile(suffix=".pdf") as tmp_path:
+            # Write to the file
+            with open(tmp_path, 'wb') as f:
+                f.write(content)
+            # Use tmp_path for processing
+            result = await process_file(tmp_path)
+        # File is automatically deleted here
+
+    Args:
+        suffix: Optional file suffix/extension (e.g., ".pdf")
+        prefix: Optional file prefix
+        dir: Optional directory for temp file
+
+    Yields:
+        str: Path to the temporary file
+    """
+    fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
+    try:
+        os.close(fd)  # Close the file descriptor immediately
+        yield path
+    finally:
+        # Always clean up, even if an exception occurred
+        try:
+            if os.path.exists(path):
+                os.unlink(path)
+        except Exception:
+            # Silently ignore cleanup errors
+            pass
+
+
+def safe_unlink(path: str) -> None:
+    """
+    Safely delete a file, ignoring errors if it doesn't exist.
+
+    Args:
+        path: Path to the file to delete
+    """
+    try:
+        if path and os.path.exists(path):
+            os.unlink(path)
+    except Exception:
+        # Silently ignore errors
+        pass
\ No newline at end of file
diff --git a/src/utils/hash_utils.py b/src/utils/hash_utils.py
new file mode 100644
index 00000000..c25c8856
--- /dev/null
+++ b/src/utils/hash_utils.py
@@ -0,0 +1,76 @@
+import io
+import os
+import base64
+import hashlib
+from typing import BinaryIO, Optional, Union
+
+
+def _b64url(data: bytes) -> str:
+    """URL-safe base64 without padding"""
+    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")
+
+
+def stream_hash(
+    source: Union[str, os.PathLike, BinaryIO],
+    *,
+    algo: str = "sha256",
+    include_filename: Optional[str] = None,
+    chunk_size: int = 1024 * 1024,  # 1 MiB
+) -> bytes:
+    """
+    Memory-safe, incremental hash of a file path or binary stream.
+    - source: path or file-like object with .read()
+    - algo: hashlib algorithm name ('sha256', 'blake2b', 'sha3_256', etc.)
+    - include_filename: if provided, the UTF-8 bytes of this string are prepended
+    - chunk_size: read size per iteration
+    Returns: raw digest bytes
+    """
+    try:
+        h = hashlib.new(algo)
+    except ValueError as e:
+        raise ValueError(f"Unsupported hash algorithm: {algo}") from e
+
+    def _update_from_file(f: BinaryIO):
+        if include_filename:
+            h.update(include_filename.encode("utf-8"))
+        for chunk in iter(lambda: f.read(chunk_size), b""):
+            h.update(chunk)
+
+    if isinstance(source, (str, os.PathLike)):
+        with open(source, "rb", buffering=io.DEFAULT_BUFFER_SIZE) as f:
+            _update_from_file(f)
+    else:
+        f = source
+        # Preserve position if seekable
+        pos = None
+        try:
+            if f.seekable():
+                pos = f.tell()
+                f.seek(0)
+        except Exception:
+            pos = None
+        try:
+            _update_from_file(f)
+        finally:
+            if pos is not None:
+                try:
+                    f.seek(pos)
+                except Exception:
+                    pass
+
+    return h.digest()
+
+
+def hash_id(
+    source: Union[str, os.PathLike, BinaryIO],
+    *,
+    algo: str = "sha256",
+    include_filename: Optional[str] = None,
+    length: int = 24,  # characters of base64url (set 0 or None for full)
+) -> str:
+    """
+    Deterministic, URL-safe base64 digest (no prefix).
+    """
+    b = stream_hash(source, algo=algo, include_filename=include_filename)
+    s = _b64url(b)
+    return s[:length] if length else s
\ No newline at end of file
diff --git a/uv.lock b/uv.lock
index c64e6db4..30f7727a 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1,5 +1,5 @@
 version = 1
-revision = 3
+revision = 2
 requires-python = ">=3.13"
 resolution-markers = [
     "sys_platform == 'darwin'",
@@ -2282,7 +2282,7 @@ wheels = [
 
 [[package]]
 name = "openrag"
-version = "0.1.13"
+version = "0.1.14.dev1"
 source = { editable = "." }
 dependencies = [
     { name = "agentd" },