Compare commits
6 commits
main
...
fix-create
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
713ea45d56 | ||
|
|
28f9bfc820 | ||
|
|
bbf7247b49 | ||
|
|
9f3d651588 | ||
|
|
d59a9cc437 | ||
|
|
3ed0bcef3f |
3 changed files with 65 additions and 5 deletions
|
|
@ -2,13 +2,14 @@ from starlette.requests import Request
|
||||||
from starlette.responses import JSONResponse
|
from starlette.responses import JSONResponse
|
||||||
|
|
||||||
from services.langflow_file_service import LangflowFileService
|
from services.langflow_file_service import LangflowFileService
|
||||||
|
from session_manager import AnonymousUser
|
||||||
from utils.logging_config import get_logger
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def upload_user_file(
|
async def upload_user_file(
|
||||||
request: Request, langflow_file_service: LangflowFileService, session_manager
|
request: Request, langflow_file_service: LangflowFileService, task_service, session_manager
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
logger.debug("upload_user_file endpoint called")
|
logger.debug("upload_user_file endpoint called")
|
||||||
|
|
@ -33,10 +34,30 @@ async def upload_user_file(
|
||||||
jwt_token = getattr(request.state, "jwt_token", None)
|
jwt_token = getattr(request.state, "jwt_token", None)
|
||||||
logger.debug("JWT token status", jwt_present=jwt_token is not None)
|
logger.debug("JWT token status", jwt_present=jwt_token is not None)
|
||||||
|
|
||||||
logger.debug("Calling langflow_file_service.upload_user_file")
|
# Get user info for task management
|
||||||
result = await langflow_file_service.upload_user_file(file_tuple, jwt_token)
|
user = getattr(request.state, "user", None)
|
||||||
logger.debug("Upload successful", result=result)
|
user_id = getattr(user, "user_id", AnonymousUser().user_id)
|
||||||
return JSONResponse(result, status_code=201)
|
|
||||||
|
# Create processor for Langflow file upload
|
||||||
|
from models.processors import LangflowFileProcessor
|
||||||
|
processor = LangflowFileProcessor(
|
||||||
|
langflow_file_service=langflow_file_service,
|
||||||
|
jwt_token=jwt_token,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create task for file upload
|
||||||
|
logger.debug("Creating task for langflow file upload")
|
||||||
|
task_id = await task_service.create_custom_task(
|
||||||
|
user_id, [file_tuple], processor
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.debug("Task created successfully", task_id=task_id)
|
||||||
|
return JSONResponse({
|
||||||
|
"task_id": task_id,
|
||||||
|
"total_files": 1,
|
||||||
|
"status": "accepted"
|
||||||
|
}, status_code=201)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
"upload_user_file endpoint failed",
|
"upload_user_file endpoint failed",
|
||||||
|
|
|
||||||
|
|
@ -381,6 +381,7 @@ async def create_app():
|
||||||
partial(
|
partial(
|
||||||
langflow_files.upload_user_file,
|
langflow_files.upload_user_file,
|
||||||
langflow_file_service=services["langflow_file_service"],
|
langflow_file_service=services["langflow_file_service"],
|
||||||
|
task_service=services["task_service"],
|
||||||
session_manager=services["session_manager"],
|
session_manager=services["session_manager"],
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
|
||||||
|
|
@ -198,6 +198,44 @@ class LangflowConnectorFileProcessor(TaskProcessor):
|
||||||
upload_task.successful_files += 1
|
upload_task.successful_files += 1
|
||||||
|
|
||||||
|
|
||||||
|
class LangflowFileProcessor(TaskProcessor):
|
||||||
|
"""Processor for files uploaded via Langflow API"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
langflow_file_service,
|
||||||
|
jwt_token: str = None,
|
||||||
|
):
|
||||||
|
self.langflow_file_service = langflow_file_service
|
||||||
|
self.jwt_token = jwt_token
|
||||||
|
|
||||||
|
async def process_item(
|
||||||
|
self, upload_task: UploadTask, item: tuple, file_task: FileTask
|
||||||
|
) -> None:
|
||||||
|
"""Process a file upload via Langflow API"""
|
||||||
|
from models.tasks import TaskStatus
|
||||||
|
import time
|
||||||
|
|
||||||
|
file_task.status = TaskStatus.RUNNING
|
||||||
|
file_task.updated_at = time.time()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# item is the file_tuple: (filename, content, content_type)
|
||||||
|
result = await self.langflow_file_service.upload_user_file(item, self.jwt_token)
|
||||||
|
|
||||||
|
file_task.status = TaskStatus.COMPLETED
|
||||||
|
file_task.result = result
|
||||||
|
upload_task.successful_files += 1
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to process langflow file upload", error=str(e))
|
||||||
|
file_task.status = TaskStatus.FAILED
|
||||||
|
file_task.error = str(e)
|
||||||
|
upload_task.failed_files += 1
|
||||||
|
finally:
|
||||||
|
file_task.updated_at = time.time()
|
||||||
|
|
||||||
|
|
||||||
class S3FileProcessor(TaskProcessor):
|
class S3FileProcessor(TaskProcessor):
|
||||||
"""Processor for files stored in S3 buckets"""
|
"""Processor for files stored in S3 buckets"""
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue