From 163d31384922bf8d0ac291d215d9526346445e4e Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 16 Oct 2025 20:52:44 -0400 Subject: [PATCH] ingest should use task tracker --- src/main.py | 123 +++++++++++++++++++--------------------------------- 1 file changed, 45 insertions(+), 78 deletions(-) diff --git a/src/main.py b/src/main.py index 45cd05f9..0fed1ee8 100644 --- a/src/main.py +++ b/src/main.py @@ -335,97 +335,64 @@ async def _ingest_default_documents_langflow(services, file_paths): """Ingest default documents using Langflow upload-ingest-delete pipeline.""" langflow_file_service = services["langflow_file_service"] session_manager = services["session_manager"] + task_service = services["task_service"] logger.info( "Using Langflow ingestion pipeline for default documents", file_count=len(file_paths), ) - success_count = 0 - error_count = 0 + # Use AnonymousUser details for default documents + from session_manager import AnonymousUser - for file_path in file_paths: - try: - logger.debug("Processing file with Langflow pipeline", file_path=file_path) + anonymous_user = AnonymousUser() - # Read file content - with open(file_path, "rb") as f: - content = f.read() + # Get JWT token using same logic as DocumentFileProcessor + # This will handle anonymous JWT creation if needed for anonymous user + effective_jwt = None - # Create file tuple for upload - filename = os.path.basename(file_path) - # Determine content type based on file extension - content_type, _ = mimetypes.guess_type(filename) - if not content_type: - content_type = "application/octet-stream" + # Let session manager handle anonymous JWT creation if needed + if session_manager: + # This call will create anonymous JWT if needed (same as DocumentFileProcessor) + session_manager.get_user_opensearch_client( + anonymous_user.user_id, effective_jwt + ) + # Get the JWT that was created by session manager + if hasattr(session_manager, "_anonymous_jwt"): + effective_jwt = session_manager._anonymous_jwt - file_tuple = (filename, content, content_type) + # Prepare tweaks for default documents with anonymous user metadata + default_tweaks = { + "OpenSearchHybrid-Ve6bS": { + "docs_metadata": [ + {"key": "owner", "value": None}, + {"key": "owner_name", "value": anonymous_user.name}, + {"key": "owner_email", "value": anonymous_user.email}, + {"key": "connector_type", "value": "system_default"}, + ] + } + } - # Use AnonymousUser details for default documents - from session_manager import AnonymousUser - - anonymous_user = AnonymousUser() - - # Get JWT token using same logic as DocumentFileProcessor - # This will handle anonymous JWT creation if needed for anonymous user - effective_jwt = None - - # Let session manager handle anonymous JWT creation if needed - if session_manager: - # This call will create anonymous JWT if needed (same as DocumentFileProcessor) - session_manager.get_user_opensearch_client( - anonymous_user.user_id, effective_jwt - ) - # Get the JWT that was created by session manager - if hasattr(session_manager, "_anonymous_jwt"): - effective_jwt = session_manager._anonymous_jwt - - # Prepare tweaks for default documents with anonymous user metadata - default_tweaks = { - "OpenSearchHybrid-Ve6bS": { - "docs_metadata": [ - {"key": "owner", "value": None}, - {"key": "owner_name", "value": anonymous_user.name}, - {"key": "owner_email", "value": anonymous_user.email}, - {"key": "connector_type", "value": "system_default"}, - ] - } - } - - # Use langflow upload_and_ingest_file method with JWT token - result = await langflow_file_service.upload_and_ingest_file( - file_tuple=file_tuple, - session_id=None, # No session for default documents - tweaks=default_tweaks, # Add anonymous user metadata - settings=None, # Use default ingestion settings - jwt_token=effective_jwt, # Use JWT token (anonymous if needed) - delete_after_ingest=True, # Clean up after ingestion - owner=None, - owner_name=anonymous_user.name, - owner_email=anonymous_user.email, - connector_type="system_default", - ) - - logger.info( - "Successfully ingested file via Langflow", - file_path=file_path, - result_status=result.get("status"), - ) - success_count += 1 - - except Exception as e: - logger.error( - "Failed to ingest file via Langflow", - file_path=file_path, - error=str(e), - ) - error_count += 1 + # Create a langflow upload task for trackable progress + task_id = await task_service.create_langflow_upload_task( + user_id=None, # Anonymous user + file_paths=file_paths, + langflow_file_service=langflow_file_service, + session_manager=session_manager, + jwt_token=effective_jwt, + owner_name=anonymous_user.name, + owner_email=anonymous_user.email, + session_id=None, # No session for default documents + tweaks=default_tweaks, + settings=None, # Use default ingestion settings + delete_after_ingest=True, # Clean up after ingestion + replace_duplicates=False, + ) logger.info( - "Langflow ingestion completed", - success_count=success_count, - error_count=error_count, - total_files=len(file_paths), + "Started Langflow ingestion task for default documents", + task_id=task_id, + file_count=len(file_paths), )