diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index 23bd33ec..36deafbd 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -99,11 +99,17 @@ async def run_ingestion( logger.debug("Final tweaks with settings applied", tweaks=tweaks) # Include user JWT if available jwt_token = getattr(request.state, "jwt_token", None) + + # Extract user info from User object + user = getattr(request.state, "user", None) + user_id = user.user_id if user else None + user_name = user.name if user else None + user_email = user.email if user else None + if jwt_token: # Set auth context for downstream services from auth_context import set_auth_context - user_id = getattr(request.state, "user_id", None) set_auth_context(user_id, jwt_token) result = await langflow_file_service.run_ingestion_flow( @@ -111,6 +117,10 @@ async def run_ingestion( jwt_token=jwt_token, session_id=session_id, tweaks=tweaks, + owner=user_id, + owner_name=user_name, + owner_email=user_email, + connector_type="local", ) return JSONResponse(result) except Exception as e: diff --git a/src/connectors/langflow_connector_service.py b/src/connectors/langflow_connector_service.py index 52f0a6b8..eda9d1fd 100644 --- a/src/connectors/langflow_connector_service.py +++ b/src/connectors/langflow_connector_service.py @@ -91,7 +91,13 @@ class LangflowConnectorService: tweaks = {} # Let Langflow handle the ingestion with default settings ingestion_result = await self.langflow_service.run_ingestion_flow( - file_paths=[langflow_file_path], jwt_token=jwt_token, tweaks=tweaks + file_paths=[langflow_file_path], + jwt_token=jwt_token, + tweaks=tweaks, + owner=owner_user_id, + owner_name=owner_name, + owner_email=owner_email, + connector_type=connector_type, ) logger.debug("Ingestion flow completed", result=ingestion_result) diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 9d582989..aab128bb 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -63,6 +63,10 @@ class LangflowFileService: jwt_token: str, session_id: Optional[str] = None, tweaks: Optional[Dict[str, Any]] = None, + owner: Optional[str] = None, + owner_name: Optional[str] = None, + owner_email: Optional[str] = None, + connector_type: Optional[str] = None, ) -> Dict[str, Any]: """ Trigger the ingestion flow with provided file paths. @@ -91,6 +95,26 @@ class LangflowFileService: logger.debug("[LF] Added JWT token to tweaks for OpenSearch components") else: logger.warning("[LF] No JWT token provided") + + # Pass metadata via tweaks to OpenSearch component + metadata_tweaks = [] + if owner: + metadata_tweaks.append({"key": "owner", "value": owner}) + if owner_name: + metadata_tweaks.append({"key": "owner_name", "value": owner_name}) + if owner_email: + metadata_tweaks.append({"key": "owner_email", "value": owner_email}) + if connector_type: + metadata_tweaks.append({"key": "connector_type", "value": connector_type}) + + if metadata_tweaks: + # Initialize the OpenSearch component tweaks if not already present + if "OpenSearchHybrid-Ve6bS" not in tweaks: + tweaks["OpenSearchHybrid-Ve6bS"] = {} + tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks + logger.debug( + "[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks) + ) if tweaks: payload["tweaks"] = tweaks if session_id: