auto cleanup temp files
parent cb58b2165e
commit 6612388586
7 changed files with 246 additions and 207 deletions
@@ -231,11 +231,8 @@ async def upload_and_ingest_user_file(
         except Exception:
-            # Clean up temp file on error
-            try:
-                if os.path.exists(temp_path):
-                    os.unlink(temp_path)
-            except Exception:
-                pass  # Ignore cleanup errors
+            from utils.file_utils import safe_unlink
+            safe_unlink(temp_path)
             raise

     except Exception as e:

@@ -164,12 +164,9 @@ async def langflow_upload_ingest_task(
         except Exception:
-            # Clean up temp files on error
+            from utils.file_utils import safe_unlink
             for temp_path in temp_file_paths:
-                try:
-                    if os.path.exists(temp_path):
-                        os.unlink(temp_path)
-                except Exception:
-                    pass  # Ignore cleanup errors
+                safe_unlink(temp_path)
             raise

     except Exception as e:

@@ -53,45 +53,46 @@ class LangflowConnectorService:
             filename=document.filename,
         )

+        from utils.file_utils import auto_cleanup_tempfile
+
         suffix = self._get_file_extension(document.mimetype)

-        # Create temporary file from document content
-        with tempfile.NamedTemporaryFile(
-            delete=False, suffix=suffix
-        ) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
-
-        # Step 1: Upload file to Langflow
-        logger.debug("Uploading file to Langflow", filename=document.filename)
-        content = document.content
-        file_tuple = (
-            document.filename.replace(" ", "_").replace("/", "_")+suffix,
-            content,
-            document.mimetype or "application/octet-stream",
-        )
-
-        upload_result = await self.langflow_service.upload_user_file(
-            file_tuple, jwt_token
-        )
-        langflow_file_id = upload_result["id"]
-        langflow_file_path = upload_result["path"]
-
-        logger.debug(
-            "File uploaded to Langflow",
-            file_id=langflow_file_id,
-            path=langflow_file_path,
-        )
-
-        # Step 2: Run ingestion flow with the uploaded file
-        logger.debug(
-            "Running Langflow ingestion flow", file_path=langflow_file_path
-        )
-
-        # Use the same tweaks pattern as LangflowFileService
-        tweaks = {}  # Let Langflow handle the ingestion with default settings
+        with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+            # Write document content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)
+
+            try:
+                # Step 1: Upload file to Langflow
+                logger.debug("Uploading file to Langflow", filename=document.filename)
+                content = document.content
+                file_tuple = (
+                    document.filename.replace(" ", "_").replace("/", "_")+suffix,
+                    content,
+                    document.mimetype or "application/octet-stream",
+                )
+
+                upload_result = await self.langflow_service.upload_user_file(
+                    file_tuple, jwt_token
+                )
+                langflow_file_id = upload_result["id"]
+                langflow_file_path = upload_result["path"]
+
+                logger.debug(
+                    "File uploaded to Langflow",
+                    file_id=langflow_file_id,
+                    path=langflow_file_path,
+                )
+
+                # Step 2: Run ingestion flow with the uploaded file
+                logger.debug(
+                    "Running Langflow ingestion flow", file_path=langflow_file_path
+                )
+
+                # Use the same tweaks pattern as LangflowFileService
+                tweaks = {}  # Let Langflow handle the ingestion with default settings
+
+                ingestion_result = await self.langflow_service.run_ingestion_flow(
+                    file_paths=[langflow_file_path],
+                    jwt_token=jwt_token,

@@ -125,25 +126,20 @@ class LangflowConnectorService:
                     error=str(e),
                 )
-            # Try to clean up Langflow file if upload succeeded but processing failed
-            if "langflow_file_id" in locals():
-                try:
-                    await self.langflow_service.delete_user_file(langflow_file_id)
-                    logger.debug(
-                        "Cleaned up Langflow file after error",
-                        file_id=langflow_file_id,
-                    )
-                except Exception as cleanup_error:
-                    logger.warning(
-                        "Failed to cleanup Langflow file",
-                        file_id=langflow_file_id,
-                        error=str(cleanup_error),
-                    )
+                try:
+                    await self.langflow_service.delete_user_file(langflow_file_id)
+                    logger.debug(
+                        "Cleaned up Langflow file after error",
+                        file_id=langflow_file_id,
+                    )
+                except Exception as cleanup_error:
+                    logger.warning(
+                        "Failed to cleanup Langflow file",
+                        file_id=langflow_file_id,
+                        error=str(cleanup_error),
+                    )
                 raise

-        finally:
-            # Clean up temporary file
-            os.unlink(tmp_file.name)
-
     def _get_file_extension(self, mimetype: str) -> str:
         """Get file extension based on MIME type"""
         mime_to_ext = {

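Note: the pattern replaced in the hunks above and below is the same everywhere. As a minimal sketch of the before/after, assuming a hypothetical process() step standing in for whatever each call site does with the file:

import os
import tempfile

from utils.file_utils import auto_cleanup_tempfile


def process(path: str) -> None:
    # Hypothetical stand-in for the real processing step
    print(f"processing {path}")


def old_style(content: bytes) -> None:
    # Before: delete=False means cleanup is the caller's job,
    # and every error path needs its own unlink
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
        tmp_file.write(content)
        tmp_file.flush()
    try:
        process(tmp_file.name)
    finally:
        os.unlink(tmp_file.name)  # easy to miss on early returns or new error paths


def new_style(content: bytes) -> None:
    # After: the context manager owns the file's lifetime
    with auto_cleanup_tempfile(suffix=".pdf") as tmp_path:
        with open(tmp_path, 'wb') as f:
            f.write(content)
        process(tmp_path)  # file is deleted when the with-block exits, even on error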
@@ -54,54 +54,50 @@ class ConnectorService:
         """Process a document from a connector using existing processing pipeline"""

-        # Create temporary file from document content
-        with tempfile.NamedTemporaryFile(
-            delete=False, suffix=self._get_file_extension(document.mimetype)
-        ) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
+        from utils.file_utils import auto_cleanup_tempfile

-        try:
-            # Use existing process_file_common function with connector document metadata
-            # We'll use the document service's process_file_common method
-            from services.document_service import DocumentService
+        with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path:
+            # Write document content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)

-            doc_service = DocumentService(session_manager=self.session_manager)
+            # Use existing process_file_common function with connector document metadata
+            # We'll use the document service's process_file_common method
+            from services.document_service import DocumentService

-            logger.debug("Processing connector document", document_id=document.id)
+            doc_service = DocumentService(session_manager=self.session_manager)

-            # Process using consolidated processing pipeline
-            from models.processors import TaskProcessor
-            processor = TaskProcessor(document_service=doc_service)
-            result = await processor.process_document_standard(
-                file_path=tmp_file.name,
-                file_hash=document.id,  # Use connector document ID as hash
-                owner_user_id=owner_user_id,
-                original_filename=document.filename,  # Pass the original Google Doc title
-                jwt_token=jwt_token,
-                owner_name=owner_name,
-                owner_email=owner_email,
-                file_size=len(document.content) if document.content else 0,
-                connector_type=connector_type,
-            )
+            logger.debug("Processing connector document", document_id=document.id)

-            logger.debug("Document processing result", result=result)
+            # Process using consolidated processing pipeline
+            from models.processors import TaskProcessor
+            processor = TaskProcessor(document_service=doc_service)
+            result = await processor.process_document_standard(
+                file_path=tmp_path,
+                file_hash=document.id,  # Use connector document ID as hash
+                owner_user_id=owner_user_id,
+                original_filename=document.filename,  # Pass the original Google Doc title
+                jwt_token=jwt_token,
+                owner_name=owner_name,
+                owner_email=owner_email,
+                file_size=len(document.content) if document.content else 0,
+                connector_type=connector_type,
+            )

-            # If successfully indexed or already exists, update the indexed documents with connector metadata
-            if result["status"] in ["indexed", "unchanged"]:
-                # Update all chunks with connector-specific metadata
-                await self._update_connector_metadata(
-                    document, owner_user_id, connector_type, jwt_token
-                )
+            logger.debug("Document processing result", result=result)

-            return {
-                **result,
-                "filename": document.filename,
-                "source_url": document.source_url,
-            }
+            # If successfully indexed or already exists, update the indexed documents with connector metadata
+            if result["status"] in ["indexed", "unchanged"]:
+                # Update all chunks with connector-specific metadata
+                await self._update_connector_metadata(
+                    document, owner_user_id, connector_type, jwt_token
+                )

-        finally:
-            # Clean up temporary file
-            os.unlink(tmp_file.name)
+            return {
+                **result,
+                "filename": document.filename,
+                "source_url": document.source_url,
+            }

     async def _update_connector_metadata(
         self,

@@ -277,37 +277,35 @@ class ConnectorFileProcessor(TaskProcessor):
             raise ValueError("user_id not provided to ConnectorFileProcessor")

-        # Create temporary file from document content
+        from utils.file_utils import auto_cleanup_tempfile
+
         suffix = self.connector_service._get_file_extension(document.mimetype)
-        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
+        with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+            # Write content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)

-        try:
-            # Compute hash
-            file_hash = hash_id(tmp_file.name)
+            # Compute hash
+            file_hash = hash_id(tmp_path)

-            # Use consolidated standard processing
-            result = await self.process_document_standard(
-                file_path=tmp_file.name,
-                file_hash=file_hash,
-                owner_user_id=self.user_id,
-                original_filename=document.filename,
-                jwt_token=self.jwt_token,
-                owner_name=self.owner_name,
-                owner_email=self.owner_email,
-                file_size=len(document.content),
-                connector_type=connection.connector_type,
-            )
+            # Use consolidated standard processing
+            result = await self.process_document_standard(
+                file_path=tmp_path,
+                file_hash=file_hash,
+                owner_user_id=self.user_id,
+                original_filename=document.filename,
+                jwt_token=self.jwt_token,
+                owner_name=self.owner_name,
+                owner_email=self.owner_email,
+                file_size=len(document.content),
+                connector_type=connection.connector_type,
+            )

-            # Add connector-specific metadata
-            result.update({
-                "source_url": document.source_url,
-                "document_id": document.id,
-            })
-
-        finally:
-            if os.path.exists(tmp_file.name):
-                os.unlink(tmp_file.name)
+            # Add connector-specific metadata
+            result.update({
+                "source_url": document.source_url,
+                "document_id": document.id,
+            })

         file_task.status = TaskStatus.COMPLETED
         file_task.result = result

@@ -379,39 +377,37 @@ class LangflowConnectorFileProcessor(TaskProcessor):
             raise ValueError("user_id not provided to LangflowConnectorFileProcessor")

-        # Create temporary file and compute hash to check for duplicates
+        from utils.file_utils import auto_cleanup_tempfile
+
         suffix = self.langflow_connector_service._get_file_extension(document.mimetype)
-        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
-            tmp_file.write(document.content)
-            tmp_file.flush()
+        with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+            # Write content to temp file
+            with open(tmp_path, 'wb') as f:
+                f.write(document.content)

-        try:
-            # Compute hash and check if already exists
-            file_hash = hash_id(tmp_file.name)
+            # Compute hash and check if already exists
+            file_hash = hash_id(tmp_path)

-            # Check if document already exists
-            opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
-                self.user_id, self.jwt_token
-            )
-            if await self.check_document_exists(file_hash, opensearch_client):
-                file_task.status = TaskStatus.COMPLETED
-                file_task.result = {"status": "unchanged", "id": file_hash}
-                file_task.updated_at = time.time()
-                upload_task.successful_files += 1
-                return
+            # Check if document already exists
+            opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
+                self.user_id, self.jwt_token
+            )
+            if await self.check_document_exists(file_hash, opensearch_client):
+                file_task.status = TaskStatus.COMPLETED
+                file_task.result = {"status": "unchanged", "id": file_hash}
+                file_task.updated_at = time.time()
+                upload_task.successful_files += 1
+                return

-            # Process using Langflow pipeline
-            result = await self.langflow_connector_service.process_connector_document(
-                document,
-                self.user_id,
-                connection.connector_type,
-                jwt_token=self.jwt_token,
-                owner_name=self.owner_name,
-                owner_email=self.owner_email,
-            )
-
-        finally:
-            if os.path.exists(tmp_file.name):
-                os.unlink(tmp_file.name)
+            # Process using Langflow pipeline
+            result = await self.langflow_connector_service.process_connector_document(
+                document,
+                self.user_id,
+                connection.connector_type,
+                jwt_token=self.jwt_token,
+                owner_name=self.owner_name,
+                owner_email=self.owner_email,
+            )

         file_task.status = TaskStatus.COMPLETED
         file_task.result = result

@@ -466,48 +462,48 @@ class S3FileProcessor(TaskProcessor):
         file_task.status = TaskStatus.RUNNING
         file_task.updated_at = time.time()

-        tmp = tempfile.NamedTemporaryFile(delete=False)
+        from utils.file_utils import auto_cleanup_tempfile
+        from utils.hash_utils import hash_id

         try:
-            # Download object to temporary file
-            self.s3_client.download_fileobj(self.bucket, item, tmp)
-            tmp.flush()
+            with auto_cleanup_tempfile() as tmp_path:
+                # Download object to temporary file
+                with open(tmp_path, 'wb') as tmp_file:
+                    self.s3_client.download_fileobj(self.bucket, item, tmp_file)

-            # Compute hash
-            from utils.hash_utils import hash_id
-            file_hash = hash_id(tmp.name)
+                # Compute hash
+                file_hash = hash_id(tmp_path)

-            # Get object size
-            try:
-                obj_info = self.s3_client.head_object(Bucket=self.bucket, Key=item)
-                file_size = obj_info.get("ContentLength", 0)
-            except Exception:
-                file_size = 0
+                # Get object size
+                try:
+                    obj_info = self.s3_client.head_object(Bucket=self.bucket, Key=item)
+                    file_size = obj_info.get("ContentLength", 0)
+                except Exception:
+                    file_size = 0

-            # Use consolidated standard processing
-            result = await self.process_document_standard(
-                file_path=tmp.name,
-                file_hash=file_hash,
-                owner_user_id=self.owner_user_id,
-                original_filename=item,  # Use S3 key as filename
-                jwt_token=self.jwt_token,
-                owner_name=self.owner_name,
-                owner_email=self.owner_email,
-                file_size=file_size,
-                connector_type="s3",
-            )
+                # Use consolidated standard processing
+                result = await self.process_document_standard(
+                    file_path=tmp_path,
+                    file_hash=file_hash,
+                    owner_user_id=self.owner_user_id,
+                    original_filename=item,  # Use S3 key as filename
+                    jwt_token=self.jwt_token,
+                    owner_name=self.owner_name,
+                    owner_email=self.owner_email,
+                    file_size=file_size,
+                    connector_type="s3",
+                )

-            result["path"] = f"s3://{self.bucket}/{item}"
-            file_task.status = TaskStatus.COMPLETED
-            file_task.result = result
-            upload_task.successful_files += 1
+                result["path"] = f"s3://{self.bucket}/{item}"
+                file_task.status = TaskStatus.COMPLETED
+                file_task.result = result
+                upload_task.successful_files += 1

         except Exception as e:
             file_task.status = TaskStatus.FAILED
             file_task.error = str(e)
             upload_task.failed_files += 1
         finally:
-            if os.path.exists(tmp.name):
-                os.unlink(tmp.name)
             file_task.updated_at = time.time()

@@ -123,19 +123,21 @@ class DocumentService:
     ):
         """Process an uploaded file from form data"""
         from utils.hash_utils import hash_id
+        from utils.file_utils import auto_cleanup_tempfile
         import os
-        tmp = tempfile.NamedTemporaryFile(delete=False)
-        file_size = 0
-        try:
-            while True:
-                chunk = await upload_file.read(1 << 20)
-                if not chunk:
-                    break
-                tmp.write(chunk)
-                file_size += len(chunk)
-            tmp.flush()
-
-            file_hash = hash_id(tmp.name)
+        with auto_cleanup_tempfile() as tmp_path:
+            # Stream upload file to temporary file
+            file_size = 0
+            with open(tmp_path, 'wb') as tmp_file:
+                while True:
+                    chunk = await upload_file.read(1 << 20)
+                    if not chunk:
+                        break
+                    tmp_file.write(chunk)
+                    file_size += len(chunk)
+
+            file_hash = hash_id(tmp_path)
             # Get user's OpenSearch client with JWT for OIDC auth
             opensearch_client = self.session_manager.get_user_opensearch_client(
                 owner_user_id, jwt_token

@@ -149,14 +151,13 @@ class DocumentService:
                 )
                 raise
             if exists:
-                os.unlink(tmp.name)  # Delete temp file since we don't need it
                 return {"status": "unchanged", "id": file_hash}

             # Use consolidated standard processing
             from models.processors import TaskProcessor
             processor = TaskProcessor(document_service=self)
             result = await processor.process_document_standard(
-                file_path=tmp.name,
+                file_path=tmp_path,
                 file_hash=file_hash,
                 owner_user_id=owner_user_id,
                 original_filename=upload_file.filename,

@@ -168,10 +169,6 @@ class DocumentService:
             )
             return result

-        finally:
-            tmp.close()
-            os.remove(tmp.name)
-
     async def process_upload_context(self, upload_file, filename: str = None):
         """Process uploaded file and return content for context"""
         import io

src/utils/file_utils.py (new file, +60)

@@ -0,0 +1,60 @@
"""File handling utilities for OpenRAG"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from contextlib import contextmanager
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@contextmanager
|
||||
def auto_cleanup_tempfile(suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None):
|
||||
"""
|
||||
Context manager for temporary files that automatically cleans up.
|
||||
|
||||
Unlike tempfile.NamedTemporaryFile with delete=True, this keeps the file
|
||||
on disk for the duration of the context, making it safe for async operations.
|
||||
|
||||
Usage:
|
||||
with auto_cleanup_tempfile(suffix=".pdf") as tmp_path:
|
||||
# Write to the file
|
||||
with open(tmp_path, 'wb') as f:
|
||||
f.write(content)
|
||||
# Use tmp_path for processing
|
||||
result = await process_file(tmp_path)
|
||||
# File is automatically deleted here
|
||||
|
||||
Args:
|
||||
suffix: Optional file suffix/extension (e.g., ".pdf")
|
||||
prefix: Optional file prefix
|
||||
dir: Optional directory for temp file
|
||||
|
||||
Yields:
|
||||
str: Path to the temporary file
|
||||
"""
|
||||
fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
|
||||
try:
|
||||
os.close(fd) # Close the file descriptor immediately
|
||||
yield path
|
||||
finally:
|
||||
# Always clean up, even if an exception occurred
|
||||
try:
|
||||
if os.path.exists(path):
|
||||
os.unlink(path)
|
||||
except Exception:
|
||||
# Silently ignore cleanup errors
|
||||
pass
|
||||
|
||||
|
||||
def safe_unlink(path: str) -> None:
|
||||
"""
|
||||
Safely delete a file, ignoring errors if it doesn't exist.
|
||||
|
||||
Args:
|
||||
path: Path to the file to delete
|
||||
"""
|
||||
try:
|
||||
if path and os.path.exists(path):
|
||||
os.unlink(path)
|
||||
except Exception:
|
||||
# Silently ignore errors
|
||||
pass
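
For reference, a minimal sketch of how the two helpers compose in an async caller; the fetch_bytes coroutine and the ".bin" suffix here are hypothetical:

import asyncio

from utils.file_utils import auto_cleanup_tempfile, safe_unlink


async def fetch_bytes() -> bytes:
    # Hypothetical stand-in for a download or an upload stream
    return b"example payload"


async def main() -> None:
    with auto_cleanup_tempfile(suffix=".bin") as tmp_path:
        data = await fetch_bytes()
        with open(tmp_path, "wb") as f:
            f.write(data)
        # tmp_path is a plain filesystem path, so it survives awaits and
        # can be reopened by name or handed to another process
        print(tmp_path)
    # the file is already deleted here; safe_unlink tolerates that
    safe_unlink(tmp_path)


asyncio.run(main())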