Enhance ingestion flow by adding user metadata and improving context handling
This commit updates the ingestion flow to include user metadata such as owner ID, name, and email, enhancing the context for downstream services. It also refines the handling of tweaks in the LangflowFileService to incorporate this metadata, ensuring better tracking and clarity in the ingestion process. These changes align with best practices for robust async development and improve the overall functionality of the ingestion flow.
This commit is contained in:
parent
a6beb4a4aa
commit
0539f7751a
3 changed files with 42 additions and 2 deletions
|
|
@ -99,11 +99,17 @@ async def run_ingestion(
|
|||
logger.debug("Final tweaks with settings applied", tweaks=tweaks)
|
||||
# Include user JWT if available
|
||||
jwt_token = getattr(request.state, "jwt_token", None)
|
||||
|
||||
# Extract user info from User object
|
||||
user = getattr(request.state, "user", None)
|
||||
user_id = user.user_id if user else None
|
||||
user_name = user.name if user else None
|
||||
user_email = user.email if user else None
|
||||
|
||||
if jwt_token:
|
||||
# Set auth context for downstream services
|
||||
from auth_context import set_auth_context
|
||||
|
||||
user_id = getattr(request.state, "user_id", None)
|
||||
set_auth_context(user_id, jwt_token)
|
||||
|
||||
result = await langflow_file_service.run_ingestion_flow(
|
||||
|
|
@ -111,6 +117,10 @@ async def run_ingestion(
|
|||
jwt_token=jwt_token,
|
||||
session_id=session_id,
|
||||
tweaks=tweaks,
|
||||
owner=user_id,
|
||||
owner_name=user_name,
|
||||
owner_email=user_email,
|
||||
connector_type="local",
|
||||
)
|
||||
return JSONResponse(result)
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -91,7 +91,13 @@ class LangflowConnectorService:
|
|||
tweaks = {} # Let Langflow handle the ingestion with default settings
|
||||
|
||||
ingestion_result = await self.langflow_service.run_ingestion_flow(
|
||||
file_paths=[langflow_file_path], jwt_token=jwt_token, tweaks=tweaks
|
||||
file_paths=[langflow_file_path],
|
||||
jwt_token=jwt_token,
|
||||
tweaks=tweaks,
|
||||
owner=owner_user_id,
|
||||
owner_name=owner_name,
|
||||
owner_email=owner_email,
|
||||
connector_type=connector_type,
|
||||
)
|
||||
|
||||
logger.debug("Ingestion flow completed", result=ingestion_result)
|
||||
|
|
|
|||
|
|
@ -63,6 +63,10 @@ class LangflowFileService:
|
|||
jwt_token: str,
|
||||
session_id: Optional[str] = None,
|
||||
tweaks: Optional[Dict[str, Any]] = None,
|
||||
owner: Optional[str] = None,
|
||||
owner_name: Optional[str] = None,
|
||||
owner_email: Optional[str] = None,
|
||||
connector_type: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Trigger the ingestion flow with provided file paths.
|
||||
|
|
@ -91,6 +95,26 @@ class LangflowFileService:
|
|||
logger.debug("[LF] Added JWT token to tweaks for OpenSearch components")
|
||||
else:
|
||||
logger.warning("[LF] No JWT token provided")
|
||||
|
||||
# Pass metadata via tweaks to OpenSearch component
|
||||
metadata_tweaks = []
|
||||
if owner:
|
||||
metadata_tweaks.append({"key": "owner", "value": owner})
|
||||
if owner_name:
|
||||
metadata_tweaks.append({"key": "owner_name", "value": owner_name})
|
||||
if owner_email:
|
||||
metadata_tweaks.append({"key": "owner_email", "value": owner_email})
|
||||
if connector_type:
|
||||
metadata_tweaks.append({"key": "connector_type", "value": connector_type})
|
||||
|
||||
if metadata_tweaks:
|
||||
# Initialize the OpenSearch component tweaks if not already present
|
||||
if "OpenSearchHybrid-Ve6bS" not in tweaks:
|
||||
tweaks["OpenSearchHybrid-Ve6bS"] = {}
|
||||
tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
|
||||
logger.debug(
|
||||
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
|
||||
)
|
||||
if tweaks:
|
||||
payload["tweaks"] = tweaks
|
||||
if session_id:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue