From 0539f7751aa40e4b7a6b7f1d65edd0188c8498c4 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 8 Sep 2025 23:49:46 -0300 Subject: [PATCH] Enhance ingestion flow by adding user metadata and improving context handling This commit updates the ingestion flow to include user metadata such as owner ID, name, and email, enhancing the context for downstream services. It also refines the handling of tweaks in the LangflowFileService to incorporate this metadata, ensuring better tracking and clarity in the ingestion process. These changes align with best practices for robust async development and improve the overall functionality of the ingestion flow. --- src/api/langflow_files.py | 12 +++++++++- src/connectors/langflow_connector_service.py | 8 ++++++- src/services/langflow_file_service.py | 24 ++++++++++++++++++++ 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index 23bd33ec..36deafbd 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -99,11 +99,17 @@ async def run_ingestion( logger.debug("Final tweaks with settings applied", tweaks=tweaks) # Include user JWT if available jwt_token = getattr(request.state, "jwt_token", None) + + # Extract user info from User object + user = getattr(request.state, "user", None) + user_id = user.user_id if user else None + user_name = user.name if user else None + user_email = user.email if user else None + if jwt_token: # Set auth context for downstream services from auth_context import set_auth_context - user_id = getattr(request.state, "user_id", None) set_auth_context(user_id, jwt_token) result = await langflow_file_service.run_ingestion_flow( @@ -111,6 +117,10 @@ async def run_ingestion( jwt_token=jwt_token, session_id=session_id, tweaks=tweaks, + owner=user_id, + owner_name=user_name, + owner_email=user_email, + connector_type="local", ) return JSONResponse(result) except Exception as e: diff --git a/src/connectors/langflow_connector_service.py b/src/connectors/langflow_connector_service.py index 52f0a6b8..eda9d1fd 100644 --- a/src/connectors/langflow_connector_service.py +++ b/src/connectors/langflow_connector_service.py @@ -91,7 +91,13 @@ class LangflowConnectorService: tweaks = {} # Let Langflow handle the ingestion with default settings ingestion_result = await self.langflow_service.run_ingestion_flow( - file_paths=[langflow_file_path], jwt_token=jwt_token, tweaks=tweaks + file_paths=[langflow_file_path], + jwt_token=jwt_token, + tweaks=tweaks, + owner=owner_user_id, + owner_name=owner_name, + owner_email=owner_email, + connector_type=connector_type, ) logger.debug("Ingestion flow completed", result=ingestion_result) diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 9d582989..aab128bb 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -63,6 +63,10 @@ class LangflowFileService: jwt_token: str, session_id: Optional[str] = None, tweaks: Optional[Dict[str, Any]] = None, + owner: Optional[str] = None, + owner_name: Optional[str] = None, + owner_email: Optional[str] = None, + connector_type: Optional[str] = None, ) -> Dict[str, Any]: """ Trigger the ingestion flow with provided file paths. @@ -91,6 +95,26 @@ class LangflowFileService: logger.debug("[LF] Added JWT token to tweaks for OpenSearch components") else: logger.warning("[LF] No JWT token provided") + + # Pass metadata via tweaks to OpenSearch component + metadata_tweaks = [] + if owner: + metadata_tweaks.append({"key": "owner", "value": owner}) + if owner_name: + metadata_tweaks.append({"key": "owner_name", "value": owner_name}) + if owner_email: + metadata_tweaks.append({"key": "owner_email", "value": owner_email}) + if connector_type: + metadata_tweaks.append({"key": "connector_type", "value": connector_type}) + + if metadata_tweaks: + # Initialize the OpenSearch component tweaks if not already present + if "OpenSearchHybrid-Ve6bS" not in tweaks: + tweaks["OpenSearchHybrid-Ve6bS"] = {} + tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks + logger.debug( + "[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks) + ) if tweaks: payload["tweaks"] = tweaks if session_id: