create a task service
This commit is contained in:
parent
422d243638
commit
3ed0bcef3f
3 changed files with 64 additions and 5 deletions
|
|
@ -8,7 +8,7 @@ logger = get_logger(__name__)
|
|||
|
||||
|
||||
async def upload_user_file(
|
||||
request: Request, langflow_file_service: LangflowFileService, session_manager
|
||||
request: Request, langflow_file_service: LangflowFileService, task_service, session_manager
|
||||
):
|
||||
try:
|
||||
logger.debug("upload_user_file endpoint called")
|
||||
|
|
@ -33,10 +33,30 @@ async def upload_user_file(
|
|||
jwt_token = getattr(request.state, "jwt_token", None)
|
||||
logger.debug("JWT token status", jwt_present=jwt_token is not None)
|
||||
|
||||
logger.debug("Calling langflow_file_service.upload_user_file")
|
||||
result = await langflow_file_service.upload_user_file(file_tuple, jwt_token)
|
||||
logger.debug("Upload successful", result=result)
|
||||
return JSONResponse(result, status_code=201)
|
||||
# Get user info for task management
|
||||
user = getattr(request.state, "user", None)
|
||||
user_id = user.user_id if user else "anonymous"
|
||||
|
||||
# Create processor for Langflow file upload
|
||||
from models.processors import LangflowFileProcessor
|
||||
processor = LangflowFileProcessor(
|
||||
langflow_file_service=langflow_file_service,
|
||||
jwt_token=jwt_token,
|
||||
)
|
||||
|
||||
# Create task for file upload
|
||||
logger.debug("Creating task for langflow file upload")
|
||||
task_id = await task_service.create_custom_task(
|
||||
user_id, [file_tuple], processor
|
||||
)
|
||||
|
||||
logger.debug("Task created successfully", task_id=task_id)
|
||||
return JSONResponse({
|
||||
"task_id": task_id,
|
||||
"total_files": 1,
|
||||
"status": "accepted"
|
||||
}, status_code=201)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"upload_user_file endpoint failed",
|
||||
|
|
|
|||
|
|
@ -381,6 +381,7 @@ async def create_app():
|
|||
partial(
|
||||
langflow_files.upload_user_file,
|
||||
langflow_file_service=services["langflow_file_service"],
|
||||
task_service=services["task_service"],
|
||||
session_manager=services["session_manager"],
|
||||
)
|
||||
),
|
||||
|
|
|
|||
|
|
@ -198,6 +198,44 @@ class LangflowConnectorFileProcessor(TaskProcessor):
|
|||
upload_task.successful_files += 1
|
||||
|
||||
|
||||
class LangflowFileProcessor(TaskProcessor):
|
||||
"""Processor for files uploaded via Langflow API"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
langflow_file_service,
|
||||
jwt_token: str = None,
|
||||
):
|
||||
self.langflow_file_service = langflow_file_service
|
||||
self.jwt_token = jwt_token
|
||||
|
||||
async def process_item(
|
||||
self, upload_task: UploadTask, item: tuple, file_task: FileTask
|
||||
) -> None:
|
||||
"""Process a file upload via Langflow API"""
|
||||
from models.tasks import TaskStatus
|
||||
import time
|
||||
|
||||
file_task.status = TaskStatus.RUNNING
|
||||
file_task.updated_at = time.time()
|
||||
|
||||
try:
|
||||
# item is the file_tuple: (filename, content, content_type)
|
||||
result = await self.langflow_file_service.upload_user_file(item, self.jwt_token)
|
||||
|
||||
file_task.status = TaskStatus.COMPLETED
|
||||
file_task.result = result
|
||||
upload_task.successful_files += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to process langflow file upload", error=str(e))
|
||||
file_task.status = TaskStatus.FAILED
|
||||
file_task.error = str(e)
|
||||
upload_task.failed_files += 1
|
||||
finally:
|
||||
file_task.updated_at = time.time()
|
||||
|
||||
|
||||
class S3FileProcessor(TaskProcessor):
|
||||
"""Processor for files stored in S3 buckets"""
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue