Compare commits
6 commits
main
...
fix-create
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
713ea45d56 | ||
|
|
28f9bfc820 | ||
|
|
bbf7247b49 | ||
|
|
9f3d651588 | ||
|
|
d59a9cc437 | ||
|
|
3ed0bcef3f |
3 changed files with 65 additions and 5 deletions
|
|
@ -2,13 +2,14 @@ from starlette.requests import Request
|
|||
from starlette.responses import JSONResponse
|
||||
|
||||
from services.langflow_file_service import LangflowFileService
|
||||
from session_manager import AnonymousUser
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
async def upload_user_file(
|
||||
request: Request, langflow_file_service: LangflowFileService, session_manager
|
||||
request: Request, langflow_file_service: LangflowFileService, task_service, session_manager
|
||||
):
|
||||
try:
|
||||
logger.debug("upload_user_file endpoint called")
|
||||
|
|
@ -33,10 +34,30 @@ async def upload_user_file(
|
|||
jwt_token = getattr(request.state, "jwt_token", None)
|
||||
logger.debug("JWT token status", jwt_present=jwt_token is not None)
|
||||
|
||||
logger.debug("Calling langflow_file_service.upload_user_file")
|
||||
result = await langflow_file_service.upload_user_file(file_tuple, jwt_token)
|
||||
logger.debug("Upload successful", result=result)
|
||||
return JSONResponse(result, status_code=201)
|
||||
# Get user info for task management
|
||||
user = getattr(request.state, "user", None)
|
||||
user_id = getattr(user, "user_id", AnonymousUser().user_id)
|
||||
|
||||
# Create processor for Langflow file upload
|
||||
from models.processors import LangflowFileProcessor
|
||||
processor = LangflowFileProcessor(
|
||||
langflow_file_service=langflow_file_service,
|
||||
jwt_token=jwt_token,
|
||||
)
|
||||
|
||||
# Create task for file upload
|
||||
logger.debug("Creating task for langflow file upload")
|
||||
task_id = await task_service.create_custom_task(
|
||||
user_id, [file_tuple], processor
|
||||
)
|
||||
|
||||
logger.debug("Task created successfully", task_id=task_id)
|
||||
return JSONResponse({
|
||||
"task_id": task_id,
|
||||
"total_files": 1,
|
||||
"status": "accepted"
|
||||
}, status_code=201)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"upload_user_file endpoint failed",
|
||||
|
|
|
|||
|
|
@ -381,6 +381,7 @@ async def create_app():
|
|||
partial(
|
||||
langflow_files.upload_user_file,
|
||||
langflow_file_service=services["langflow_file_service"],
|
||||
task_service=services["task_service"],
|
||||
session_manager=services["session_manager"],
|
||||
)
|
||||
),
|
||||
|
|
|
|||
|
|
@ -198,6 +198,44 @@ class LangflowConnectorFileProcessor(TaskProcessor):
|
|||
upload_task.successful_files += 1
|
||||
|
||||
|
||||
class LangflowFileProcessor(TaskProcessor):
|
||||
"""Processor for files uploaded via Langflow API"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
langflow_file_service,
|
||||
jwt_token: str = None,
|
||||
):
|
||||
self.langflow_file_service = langflow_file_service
|
||||
self.jwt_token = jwt_token
|
||||
|
||||
async def process_item(
|
||||
self, upload_task: UploadTask, item: tuple, file_task: FileTask
|
||||
) -> None:
|
||||
"""Process a file upload via Langflow API"""
|
||||
from models.tasks import TaskStatus
|
||||
import time
|
||||
|
||||
file_task.status = TaskStatus.RUNNING
|
||||
file_task.updated_at = time.time()
|
||||
|
||||
try:
|
||||
# item is the file_tuple: (filename, content, content_type)
|
||||
result = await self.langflow_file_service.upload_user_file(item, self.jwt_token)
|
||||
|
||||
file_task.status = TaskStatus.COMPLETED
|
||||
file_task.result = result
|
||||
upload_task.successful_files += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to process langflow file upload", error=str(e))
|
||||
file_task.status = TaskStatus.FAILED
|
||||
file_task.error = str(e)
|
||||
upload_task.failed_files += 1
|
||||
finally:
|
||||
file_task.updated_at = time.time()
|
||||
|
||||
|
||||
class S3FileProcessor(TaskProcessor):
|
||||
"""Processor for files stored in S3 buckets"""
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue