diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py
index a5595813..1c83724d 100644
--- a/src/api/langflow_files.py
+++ b/src/api/langflow_files.py
@@ -231,11 +231,8 @@ async def upload_and_ingest_user_file(
         except Exception:
             # Clean up temp file on error
-            try:
-                if os.path.exists(temp_path):
-                    os.unlink(temp_path)
-            except Exception:
-                pass  # Ignore cleanup errors
+            from utils.file_utils import safe_unlink
+            safe_unlink(temp_path)
             raise

     except Exception as e:
diff --git a/src/api/router.py b/src/api/router.py
index 154757a5..56789d41 100644
--- a/src/api/router.py
+++ b/src/api/router.py
@@ -164,12 +164,9 @@ async def langflow_upload_ingest_task(
         except Exception:
             # Clean up temp files on error
+            from utils.file_utils import safe_unlink
             for temp_path in temp_file_paths:
-                try:
-                    if os.path.exists(temp_path):
-                        os.unlink(temp_path)
-                except Exception:
-                    pass  # Ignore cleanup errors
+                safe_unlink(temp_path)
             raise

     except Exception as e:
diff --git a/src/connectors/langflow_connector_service.py b/src/connectors/langflow_connector_service.py
index ef68816d..858843ea 100644
--- a/src/connectors/langflow_connector_service.py
+++ b/src/connectors/langflow_connector_service.py
@@ -53,45 +53,46 @@ class LangflowConnectorService:
             filename=document.filename,
         )

+        from utils.file_utils import auto_cleanup_tempfile
+
         suffix = self._get_file_extension(document.mimetype)

         # Create temporary file from document content
-        with tempfile.NamedTemporaryFile(
-            delete=False, suffix=suffix
-        ) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
+        with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+            # Write document content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)
+
+            # Step 1: Upload file to Langflow
+            logger.debug("Uploading file to Langflow", filename=document.filename)
+            content = document.content
+            file_tuple = (
+                document.filename.replace(" ", "_").replace("/", "_")+suffix,
+                content,
+                document.mimetype or "application/octet-stream",
+            )
+
+            upload_result = await self.langflow_service.upload_user_file(
+                file_tuple, jwt_token
+            )
+            langflow_file_id = upload_result["id"]
+            langflow_file_path = upload_result["path"]
+
+            logger.debug(
+                "File uploaded to Langflow",
+                file_id=langflow_file_id,
+                path=langflow_file_path,
+            )
+
+            # Step 2: Run ingestion flow with the uploaded file
+            logger.debug(
+                "Running Langflow ingestion flow", file_path=langflow_file_path
+            )
+
+            # Use the same tweaks pattern as LangflowFileService
+            tweaks = {}  # Let Langflow handle the ingestion with default settings

             try:
-                # Step 1: Upload file to Langflow
-                logger.debug("Uploading file to Langflow", filename=document.filename)
-                content = document.content
-                file_tuple = (
-                    document.filename.replace(" ", "_").replace("/", "_")+suffix,
-                    content,
-                    document.mimetype or "application/octet-stream",
-                )
-
-                upload_result = await self.langflow_service.upload_user_file(
-                    file_tuple, jwt_token
-                )
-                langflow_file_id = upload_result["id"]
-                langflow_file_path = upload_result["path"]
-
-                logger.debug(
-                    "File uploaded to Langflow",
-                    file_id=langflow_file_id,
-                    path=langflow_file_path,
-                )
-
-                # Step 2: Run ingestion flow with the uploaded file
-                logger.debug(
-                    "Running Langflow ingestion flow", file_path=langflow_file_path
-                )
-
-                # Use the same tweaks pattern as LangflowFileService
-                tweaks = {}  # Let Langflow handle the ingestion with default settings
-
                 ingestion_result = await self.langflow_service.run_ingestion_flow(
                     file_paths=[langflow_file_path],
                     jwt_token=jwt_token,
@@ -125,25 +126,20 @@ class LangflowConnectorService:
                     error=str(e),
                 )
                 # Try to clean up Langflow file if upload succeeded but processing failed
-                if "langflow_file_id" in locals():
-                    try:
-                        await self.langflow_service.delete_user_file(langflow_file_id)
-                        logger.debug(
-                            "Cleaned up Langflow file after error",
-                            file_id=langflow_file_id,
-                        )
-                    except Exception as cleanup_error:
-                        logger.warning(
-                            "Failed to cleanup Langflow file",
-                            file_id=langflow_file_id,
-                            error=str(cleanup_error),
-                        )
+                try:
+                    await self.langflow_service.delete_user_file(langflow_file_id)
+                    logger.debug(
+                        "Cleaned up Langflow file after error",
+                        file_id=langflow_file_id,
+                    )
+                except Exception as cleanup_error:
+                    logger.warning(
+                        "Failed to cleanup Langflow file",
+                        file_id=langflow_file_id,
+                        error=str(cleanup_error),
+                    )
                 raise
-            finally:
-                # Clean up temporary file
-                os.unlink(tmp_file.name)
-
     def _get_file_extension(self, mimetype: str) -> str:
         """Get file extension based on MIME type"""
         mime_to_ext = {
diff --git a/src/connectors/service.py b/src/connectors/service.py
index 171da7a1..792d8d1f 100644
--- a/src/connectors/service.py
+++ b/src/connectors/service.py
@@ -54,54 +54,50 @@ class ConnectorService:
         """Process a document from a connector using existing processing pipeline"""

         # Create temporary file from document content
-        with tempfile.NamedTemporaryFile(
-            delete=False, suffix=self._get_file_extension(document.mimetype)
-        ) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
+        from utils.file_utils import auto_cleanup_tempfile

-            try:
-                # Use existing process_file_common function with connector document metadata
-                # We'll use the document service's process_file_common method
-                from services.document_service import DocumentService
+        with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path:
+            # Write document content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)

-                doc_service = DocumentService(session_manager=self.session_manager)
+            # Use existing process_file_common function with connector document metadata
+            # We'll use the document service's process_file_common method
+            from services.document_service import DocumentService

-                logger.debug("Processing connector document", document_id=document.id)
+            doc_service = DocumentService(session_manager=self.session_manager)

-                # Process using consolidated processing pipeline
-                from models.processors import TaskProcessor
-                processor = TaskProcessor(document_service=doc_service)
-                result = await processor.process_document_standard(
-                    file_path=tmp_file.name,
-                    file_hash=document.id,  # Use connector document ID as hash
-                    owner_user_id=owner_user_id,
-                    original_filename=document.filename,  # Pass the original Google Doc title
-                    jwt_token=jwt_token,
-                    owner_name=owner_name,
-                    owner_email=owner_email,
-                    file_size=len(document.content) if document.content else 0,
-                    connector_type=connector_type,
+            logger.debug("Processing connector document", document_id=document.id)
+
+            # Process using consolidated processing pipeline
+            from models.processors import TaskProcessor
+            processor = TaskProcessor(document_service=doc_service)
+            result = await processor.process_document_standard(
+                file_path=tmp_path,
+                file_hash=document.id,  # Use connector document ID as hash
+                owner_user_id=owner_user_id,
+                original_filename=document.filename,  # Pass the original Google Doc title
+                jwt_token=jwt_token,
+                owner_name=owner_name,
+                owner_email=owner_email,
+                file_size=len(document.content) if document.content else 0,
+                connector_type=connector_type,
+            )
+
+            logger.debug("Document processing result", result=result)
+
+            # If successfully indexed or already exists, update the indexed documents with connector metadata
+            if result["status"] in ["indexed", "unchanged"]:
+                # Update all chunks with connector-specific metadata
+                await self._update_connector_metadata(
+                    document, owner_user_id, connector_type, jwt_token
                 )
-                logger.debug("Document processing result", result=result)
-
-                # If successfully indexed or already exists, update the indexed documents with connector metadata
-                if result["status"] in ["indexed", "unchanged"]:
-                    # Update all chunks with connector-specific metadata
-                    await self._update_connector_metadata(
-                        document, owner_user_id, connector_type, jwt_token
-                    )
-
-                return {
-                    **result,
-                    "filename": document.filename,
-                    "source_url": document.source_url,
-                }
-
-            finally:
-                # Clean up temporary file
-                os.unlink(tmp_file.name)
+            return {
+                **result,
+                "filename": document.filename,
+                "source_url": document.source_url,
+            }

     async def _update_connector_metadata(
         self,
diff --git a/src/models/processors.py b/src/models/processors.py
index 8d399e86..9cf5506f 100644
--- a/src/models/processors.py
+++ b/src/models/processors.py
@@ -277,37 +277,35 @@ class ConnectorFileProcessor(TaskProcessor):
             raise ValueError("user_id not provided to ConnectorFileProcessor")

         # Create temporary file from document content
+        from utils.file_utils import auto_cleanup_tempfile
+
         suffix = self.connector_service._get_file_extension(document.mimetype)
-        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
+        with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+            # Write content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)

-            try:
-                # Compute hash
-                file_hash = hash_id(tmp_file.name)
+            # Compute hash
+            file_hash = hash_id(tmp_path)

-                # Use consolidated standard processing
-                result = await self.process_document_standard(
-                    file_path=tmp_file.name,
-                    file_hash=file_hash,
-                    owner_user_id=self.user_id,
-                    original_filename=document.filename,
-                    jwt_token=self.jwt_token,
-                    owner_name=self.owner_name,
-                    owner_email=self.owner_email,
-                    file_size=len(document.content),
-                    connector_type=connection.connector_type,
-                )
+            # Use consolidated standard processing
+            result = await self.process_document_standard(
+                file_path=tmp_path,
+                file_hash=file_hash,
+                owner_user_id=self.user_id,
+                original_filename=document.filename,
+                jwt_token=self.jwt_token,
+                owner_name=self.owner_name,
+                owner_email=self.owner_email,
+                file_size=len(document.content),
+                connector_type=connection.connector_type,
+            )

-                # Add connector-specific metadata
-                result.update({
-                    "source_url": document.source_url,
-                    "document_id": document.id,
-                })
-
-            finally:
-                if os.path.exists(tmp_file.name):
-                    os.unlink(tmp_file.name)
+            # Add connector-specific metadata
+            result.update({
+                "source_url": document.source_url,
+                "document_id": document.id,
+            })

         file_task.status = TaskStatus.COMPLETED
         file_task.result = result
@@ -379,39 +377,35 @@ class LangflowConnectorFileProcessor(TaskProcessor):
             raise ValueError("user_id not provided to LangflowConnectorFileProcessor")

         # Create temporary file and compute hash to check for duplicates
+        from utils.file_utils import auto_cleanup_tempfile
+
         suffix = self.langflow_connector_service._get_file_extension(document.mimetype)
-        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
+        with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+            # Write content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)

-            try:
-                # Compute hash and check if already exists
-                file_hash = hash_id(tmp_file.name)
+            # Compute hash and check if already exists
+            file_hash = hash_id(tmp_path)

-                # Check if document already exists
-                opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
-                    self.user_id, self.jwt_token
-                )
-                if await self.check_document_exists(file_hash, opensearch_client):
-                    file_task.status = TaskStatus.COMPLETED
-                    file_task.result = {"status": "unchanged", "id": file_hash}
-                    file_task.updated_at = time.time()
-                    upload_task.successful_files += 1
-                    return
+            # Check if document already exists
+            opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
+                self.user_id, self.jwt_token
+            )
+            if await self.check_document_exists(file_hash, opensearch_client):
+                file_task.status = TaskStatus.COMPLETED
+                file_task.result = {"status": "unchanged", "id": file_hash}
+                file_task.updated_at = time.time()
+                upload_task.successful_files += 1
+                return

-                # Process using Langflow pipeline
-                result = await self.langflow_connector_service.process_connector_document(
-                    document,
-                    self.user_id,
-                    connection.connector_type,
-                    jwt_token=self.jwt_token,
-                    owner_name=self.owner_name,
-                    owner_email=self.owner_email,
-                )
-
-            finally:
-                if os.path.exists(tmp_file.name):
-                    os.unlink(tmp_file.name)
+            # Process using Langflow pipeline
+            result = await self.langflow_connector_service.process_connector_document(
+                document,
+                self.user_id,
+                connection.connector_type,
+                jwt_token=self.jwt_token,
+                owner_name=self.owner_name,
+                owner_email=self.owner_email,
+            )

         file_task.status = TaskStatus.COMPLETED
         file_task.result = result
@@ -466,48 +462,48 @@ class S3FileProcessor(TaskProcessor):
             file_task.status = TaskStatus.RUNNING
             file_task.updated_at = time.time()

-            tmp = tempfile.NamedTemporaryFile(delete=False)
+            from utils.file_utils import auto_cleanup_tempfile
+            from utils.hash_utils import hash_id
+
             try:
-                # Download object to temporary file
-                self.s3_client.download_fileobj(self.bucket, item, tmp)
-                tmp.flush()
+                with auto_cleanup_tempfile() as tmp_path:
+                    # Download object to temporary file
+                    with open(tmp_path, 'wb') as tmp_file:
+                        self.s3_client.download_fileobj(self.bucket, item, tmp_file)

-                # Compute hash
-                from utils.hash_utils import hash_id
-                file_hash = hash_id(tmp.name)
+                    # Compute hash
+                    file_hash = hash_id(tmp_path)

-                # Get object size
-                try:
-                    obj_info = self.s3_client.head_object(Bucket=self.bucket, Key=item)
-                    file_size = obj_info.get("ContentLength", 0)
-                except Exception:
-                    file_size = 0
+                    # Get object size
+                    try:
+                        obj_info = self.s3_client.head_object(Bucket=self.bucket, Key=item)
+                        file_size = obj_info.get("ContentLength", 0)
+                    except Exception:
+                        file_size = 0

-                # Use consolidated standard processing
-                result = await self.process_document_standard(
-                    file_path=tmp.name,
-                    file_hash=file_hash,
-                    owner_user_id=self.owner_user_id,
-                    original_filename=item,  # Use S3 key as filename
-                    jwt_token=self.jwt_token,
-                    owner_name=self.owner_name,
-                    owner_email=self.owner_email,
-                    file_size=file_size,
-                    connector_type="s3",
-                )
+                    # Use consolidated standard processing
+                    result = await self.process_document_standard(
+                        file_path=tmp_path,
+                        file_hash=file_hash,
+                        owner_user_id=self.owner_user_id,
+                        original_filename=item,  # Use S3 key as filename
+                        jwt_token=self.jwt_token,
+                        owner_name=self.owner_name,
+                        owner_email=self.owner_email,
+                        file_size=file_size,
+                        connector_type="s3",
+                    )

-                result["path"] = f"s3://{self.bucket}/{item}"
-                file_task.status = TaskStatus.COMPLETED
-                file_task.result = result
-                upload_task.successful_files += 1
+                    result["path"] = f"s3://{self.bucket}/{item}"
+                    file_task.status = TaskStatus.COMPLETED
+                    file_task.result = result
+                    upload_task.successful_files += 1
             except Exception as e:
                 file_task.status = TaskStatus.FAILED
                 file_task.error = str(e)
                 upload_task.failed_files += 1
             finally:
-                if os.path.exists(tmp.name):
-                    os.unlink(tmp.name)
                 file_task.updated_at = time.time()
diff --git a/src/services/document_service.py b/src/services/document_service.py
index fb52a89c..5204ea0e 100644
--- a/src/services/document_service.py
+++ b/src/services/document_service.py
@@ -123,19 +123,21 @@ class DocumentService:
     ):
         """Process an uploaded file from form data"""
        from utils.hash_utils import hash_id
+        from utils.file_utils import auto_cleanup_tempfile
        import os

-        tmp = tempfile.NamedTemporaryFile(delete=False)
-        file_size = 0
-        try:
-            while True:
-                chunk = await upload_file.read(1 << 20)
-                if not chunk:
-                    break
-                tmp.write(chunk)
-                file_size += len(chunk)
-            tmp.flush()
-            file_hash = hash_id(tmp.name)
+        with auto_cleanup_tempfile() as tmp_path:
+            # Stream upload file to temporary file
+            file_size = 0
+            with open(tmp_path, 'wb') as tmp_file:
+                while True:
+                    chunk = await upload_file.read(1 << 20)
+                    if not chunk:
+                        break
+                    tmp_file.write(chunk)
+                    file_size += len(chunk)
+
+            file_hash = hash_id(tmp_path)

             # Get user's OpenSearch client with JWT for OIDC auth
             opensearch_client = self.session_manager.get_user_opensearch_client(
                 owner_user_id, jwt_token
@@ -149,14 +151,13 @@ class DocumentService:
                 )
                 raise

             if exists:
-                os.unlink(tmp.name)  # Delete temp file since we don't need it
                 return {"status": "unchanged", "id": file_hash}

             # Use consolidated standard processing
             from models.processors import TaskProcessor
             processor = TaskProcessor(document_service=self)
             result = await processor.process_document_standard(
-                file_path=tmp.name,
+                file_path=tmp_path,
                 file_hash=file_hash,
                 owner_user_id=owner_user_id,
                 original_filename=upload_file.filename,
@@ -168,10 +169,6 @@ class DocumentService:
             )

             return result
-        finally:
-            tmp.close()
-            os.remove(tmp.name)
-
     async def process_upload_context(self, upload_file, filename: str = None):
         """Process uploaded file and return content for context"""
         import io
diff --git a/src/utils/file_utils.py b/src/utils/file_utils.py
new file mode 100644
index 00000000..2afc4024
--- /dev/null
+++ b/src/utils/file_utils.py
@@ -0,0 +1,60 @@
+"""File handling utilities for OpenRAG"""
+
+import os
+import tempfile
+from contextlib import contextmanager
+from typing import Optional
+
+
+@contextmanager
+def auto_cleanup_tempfile(suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None):
+    """
+    Context manager for temporary files that automatically cleans up.
+
+    Unlike tempfile.NamedTemporaryFile with delete=True, this keeps the file
+    on disk for the duration of the context, making it safe for async operations.
+
+    Usage:
+        with auto_cleanup_tempfile(suffix=".pdf") as tmp_path:
+            # Write to the file
+            with open(tmp_path, 'wb') as f:
+                f.write(content)
+            # Use tmp_path for processing
+            result = await process_file(tmp_path)
+        # File is automatically deleted here
+
+    Args:
+        suffix: Optional file suffix/extension (e.g., ".pdf")
+        prefix: Optional file prefix
+        dir: Optional directory for temp file
+
+    Yields:
+        str: Path to the temporary file
+    """
+    fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
+    try:
+        os.close(fd)  # Close the file descriptor immediately
+        yield path
+    finally:
+        # Always clean up, even if an exception occurred
+        try:
+            if os.path.exists(path):
+                os.unlink(path)
+        except Exception:
+            # Silently ignore cleanup errors
+            pass
+
+
+def safe_unlink(path: str) -> None:
+    """
+    Safely delete a file, ignoring errors if it doesn't exist.
+
+    Args:
+        path: Path to the file to delete
+    """
+    try:
+        if path and os.path.exists(path):
+            os.unlink(path)
+    except Exception:
+        # Silently ignore errors
+        pass
\ No newline at end of file
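
For reference, a minimal usage sketch of the new helpers outside the patch. `process_file` here is a hypothetical stand-in for whatever consumes the temp path (e.g. a `process_document_standard` call); the point is that the path stays valid across `await` boundaries and is removed on exit even when processing raises:

```python
import asyncio

from utils.file_utils import auto_cleanup_tempfile


async def process_file(path: str) -> dict:
    # Hypothetical downstream step standing in for the real pipeline.
    await asyncio.sleep(0)
    return {"status": "indexed", "path": path}


async def ingest_bytes(content: bytes) -> dict:
    with auto_cleanup_tempfile(suffix=".pdf") as tmp_path:
        # mkstemp already created the file; reopen it to write the payload,
        # mirroring the pattern used throughout this patch.
        with open(tmp_path, "wb") as f:
            f.write(content)
        # Safe across the await: cleanup happens in the context manager's
        # finally block, not when a file object is closed.
        return await process_file(tmp_path)


if __name__ == "__main__":
    print(asyncio.run(ingest_bytes(b"%PDF-1.4 example")))
```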
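A quick pytest sketch (test names and setup are illustrative, not part of the patch) pinning down the two behaviors the refactor relies on: the temp file is gone after the context exits even on error, and `safe_unlink` never raises on a missing path:

```python
import os

import pytest

from utils.file_utils import auto_cleanup_tempfile, safe_unlink


def test_tempfile_removed_on_exception():
    seen = {}
    with pytest.raises(RuntimeError):
        with auto_cleanup_tempfile(suffix=".bin") as tmp_path:
            seen["path"] = tmp_path
            assert os.path.exists(tmp_path)  # mkstemp created it eagerly
            raise RuntimeError("simulated processing failure")
    assert not os.path.exists(seen["path"])  # cleaned up despite the error


def test_safe_unlink_is_idempotent(tmp_path):
    target = tmp_path / "doc.txt"
    target.write_text("data")
    safe_unlink(str(target))
    safe_unlink(str(target))  # second call is a no-op, not an error
    assert not target.exists()
```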