Merge branch 'ingestion-flow' into fix-create-task-service
This commit is contained in:
commit
bbf7247b49
6 changed files with 117 additions and 21 deletions
|
|
@ -54,6 +54,7 @@ services:
|
|||
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
|
||||
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
|
||||
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
|
||||
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
|
||||
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
|
||||
- OPENSEARCH_PORT=9200
|
||||
- OPENSEARCH_USERNAME=admin
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@ services:
|
|||
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
|
||||
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
|
||||
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
|
||||
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
|
||||
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
|
||||
- OPENSEARCH_PORT=9200
|
||||
- OPENSEARCH_USERNAME=admin
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
|
|
@ -120,11 +120,17 @@ async def run_ingestion(
|
|||
logger.debug("Final tweaks with settings applied", tweaks=tweaks)
|
||||
# Include user JWT if available
|
||||
jwt_token = getattr(request.state, "jwt_token", None)
|
||||
|
||||
# Extract user info from User object
|
||||
user = getattr(request.state, "user", None)
|
||||
user_id = user.user_id if user else None
|
||||
user_name = user.name if user else None
|
||||
user_email = user.email if user else None
|
||||
|
||||
if jwt_token:
|
||||
# Set auth context for downstream services
|
||||
from auth_context import set_auth_context
|
||||
|
||||
user_id = getattr(request.state, "user_id", None)
|
||||
set_auth_context(user_id, jwt_token)
|
||||
|
||||
result = await langflow_file_service.run_ingestion_flow(
|
||||
|
|
@ -132,6 +138,10 @@ async def run_ingestion(
|
|||
jwt_token=jwt_token,
|
||||
session_id=session_id,
|
||||
tweaks=tweaks,
|
||||
owner=user_id,
|
||||
owner_name=user_name,
|
||||
owner_email=user_email,
|
||||
connector_type="local",
|
||||
)
|
||||
return JSONResponse(result)
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -91,7 +91,13 @@ class LangflowConnectorService:
|
|||
tweaks = {} # Let Langflow handle the ingestion with default settings
|
||||
|
||||
ingestion_result = await self.langflow_service.run_ingestion_flow(
|
||||
file_paths=[langflow_file_path], jwt_token=jwt_token, tweaks=tweaks
|
||||
file_paths=[langflow_file_path],
|
||||
jwt_token=jwt_token,
|
||||
tweaks=tweaks,
|
||||
owner=owner_user_id,
|
||||
owner_name=owner_name,
|
||||
owner_email=owner_email,
|
||||
connector_type=connector_type,
|
||||
)
|
||||
|
||||
logger.debug("Ingestion flow completed", result=ingestion_result)
|
||||
|
|
|
|||
|
|
@ -63,6 +63,10 @@ class LangflowFileService:
|
|||
jwt_token: str,
|
||||
session_id: Optional[str] = None,
|
||||
tweaks: Optional[Dict[str, Any]] = None,
|
||||
owner: Optional[str] = None,
|
||||
owner_name: Optional[str] = None,
|
||||
owner_email: Optional[str] = None,
|
||||
connector_type: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Trigger the ingestion flow with provided file paths.
|
||||
|
|
@ -91,6 +95,26 @@ class LangflowFileService:
|
|||
logger.debug("[LF] Added JWT token to tweaks for OpenSearch components")
|
||||
else:
|
||||
logger.warning("[LF] No JWT token provided")
|
||||
|
||||
# Pass metadata via tweaks to OpenSearch component
|
||||
metadata_tweaks = []
|
||||
if owner:
|
||||
metadata_tweaks.append({"key": "owner", "value": owner})
|
||||
if owner_name:
|
||||
metadata_tweaks.append({"key": "owner_name", "value": owner_name})
|
||||
if owner_email:
|
||||
metadata_tweaks.append({"key": "owner_email", "value": owner_email})
|
||||
if connector_type:
|
||||
metadata_tweaks.append({"key": "connector_type", "value": connector_type})
|
||||
|
||||
if metadata_tweaks:
|
||||
# Initialize the OpenSearch component tweaks if not already present
|
||||
if "OpenSearchHybrid-Ve6bS" not in tweaks:
|
||||
tweaks["OpenSearchHybrid-Ve6bS"] = {}
|
||||
tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
|
||||
logger.debug(
|
||||
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
|
||||
)
|
||||
if tweaks:
|
||||
payload["tweaks"] = tweaks
|
||||
if session_id:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue