update to flow

This commit is contained in:
Edwin Jose 2025-09-25 15:39:42 -04:00
parent f710ae2137
commit 425b673e14
9 changed files with 1075 additions and 843 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@@ -278,4 +278,4 @@ async def delete_user_files(
status_code=status,
)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)
return JSONResponse({"error": str(e)}, status_code=500)

View file

@@ -140,6 +140,14 @@ async def langflow_upload_ingest_task(
)
# Create langflow upload task
print(f"tweaks: {tweaks}")
print(f"settings: {settings}")
print(f"jwt_token: {jwt_token}")
print(f"user_name: {user_name}")
print(f"user_email: {user_email}")
print(f"session_id: {session_id}")
print(f"delete_after_ingest: {delete_after_ingest}")
print(f"temp_file_paths: {temp_file_paths}")
task_id = await task_service.create_langflow_upload_task(
user_id=user_id,
file_paths=temp_file_paths,

View file

@@ -317,7 +317,7 @@ async def _ingest_default_documents_langflow(services, file_paths):
# Prepare tweaks for default documents with anonymous user metadata
default_tweaks = {
"OpenSearchVectorStoreComponent-YnJox": {
"OpenSearchHybrid-Ve6bS": {
"docs_metadata": [
{"key": "owner", "value": None},
{"key": "owner_name", "value": anonymous_user.name},
@@ -392,8 +392,6 @@ async def startup_tasks(services):
"""Startup tasks"""
logger.info("Starting startup tasks")
await init_index()
# Sample data ingestion is now handled by the onboarding endpoint when sample_data=True
logger.info("Sample data ingestion moved to onboarding endpoint")
async def initialize_services():
@@ -786,6 +784,18 @@ async def create_app():
),
methods=["GET"],
),
# Session deletion endpoint
Route(
"/sessions/{session_id}",
require_auth(services["session_manager"])(
partial(
chat.delete_session_endpoint,
chat_service=services["chat_service"],
session_manager=services["session_manager"],
)
),
methods=["DELETE"],
),
# Authentication endpoints
Route(
"/auth/init",
@@ -927,7 +937,8 @@ async def create_app():
"/settings",
require_auth(services["session_manager"])(
partial(
settings.update_settings, session_manager=services["session_manager"]
settings.update_settings,
session_manager=services["session_manager"],
)
),
methods=["POST"],
@@ -939,7 +950,7 @@ async def create_app():
partial(
models.get_openai_models,
models_service=services["models_service"],
session_manager=services["session_manager"]
session_manager=services["session_manager"],
)
),
methods=["GET"],
@@ -950,7 +961,7 @@ async def create_app():
partial(
models.get_ollama_models,
models_service=services["models_service"],
session_manager=services["session_manager"]
session_manager=services["session_manager"],
)
),
methods=["GET"],
@@ -961,7 +972,7 @@ async def create_app():
partial(
models.get_ibm_models,
models_service=services["models_service"],
session_manager=services["session_manager"]
session_manager=services["session_manager"],
)
),
methods=["GET", "POST"],
@@ -970,10 +981,7 @@ async def create_app():
Route(
"/onboarding",
require_auth(services["session_manager"])(
partial(
settings.onboarding,
flows_service=services["flows_service"]
)
partial(settings.onboarding, flows_service=services["flows_service"])
),
methods=["POST"],
),
@@ -983,7 +991,7 @@ async def create_app():
require_auth(services["session_manager"])(
partial(
settings.update_docling_preset,
session_manager=services["session_manager"]
session_manager=services["session_manager"],
)
),
methods=["PATCH"],
@@ -1134,4 +1142,4 @@ if __name__ == "__main__":
host="0.0.0.0",
port=8000,
reload=False, # Disable reload since we're running from main
)
)

View file

@@ -411,9 +411,9 @@ class LangflowFileProcessor(TaskProcessor):
if metadata_tweaks:
# Initialize the OpenSearch component tweaks if not already present
if "OpenSearchVectorStoreComponent-YnJox" not in final_tweaks:
final_tweaks["OpenSearchVectorStoreComponent-YnJox"] = {}
final_tweaks["OpenSearchVectorStoreComponent-YnJox"]["docs_metadata"] = metadata_tweaks
if "OpenSearchHybrid-Ve6bS" not in final_tweaks:
final_tweaks["OpenSearchHybrid-Ve6bS"] = {}
final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
# Process file using langflow service
result = await self.langflow_file_service.upload_and_ingest_file(
@@ -437,4 +437,4 @@ class LangflowFileProcessor(TaskProcessor):
file_task.error_message = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
raise

View file

@@ -84,16 +84,14 @@ class LangflowFileService:
if not tweaks:
tweaks = {}
# Pass files via tweaks to File component (File-5NMSr from the flow)
# Pass files via tweaks to File component (File-PSU37 from the flow)
if file_paths:
tweaks["File-5NMSr"] = {"path": file_paths}
tweaks["File-PSU37"] = {"path": file_paths}
# Pass JWT token via tweaks using the x-langflow-global-var- pattern
if jwt_token:
# Using the global variable pattern that Langflow expects for OpenSearch components
tweaks["OpenSearchVectorStoreComponent-YnJox"] = {
"jwt_token": {"value": jwt_token, "load_from_db": False},
}
tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token}
logger.debug("[LF] Added JWT token to tweaks for OpenSearch components")
else:
logger.warning("[LF] No JWT token provided")
@@ -111,16 +109,14 @@ class LangflowFileService:
if metadata_tweaks:
# Initialize the OpenSearch component tweaks if not already present
if "OpenSearchVectorStoreComponent-YnJox" not in tweaks:
tweaks["OpenSearchVectorStoreComponent-YnJox"] = {}
tweaks["OpenSearchVectorStoreComponent-YnJox"]["docs_metadata"] = (
metadata_tweaks
)
if "OpenSearchHybrid-Ve6bS" not in tweaks:
tweaks["OpenSearchHybrid-Ve6bS"] = {}
tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
logger.debug(
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
)
if tweaks:
payload["tweaks"] = tweaks
# if tweaks:
# payload["tweaks"] = tweaks
if session_id:
payload["session_id"] = session_id
@@ -136,7 +132,16 @@ class LangflowFileService:
# Avoid logging full payload to prevent leaking sensitive data (e.g., JWT)
resp = await clients.langflow_request(
"POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload
"POST",
f"/api/v1/run/{self.flow_id_ingest}",
json=payload,
headers={
"X-Langflow-Global-Var-JWT": jwt_token,
"X-Langflow-Global-Var-Owner": owner,
"X-Langflow-Global-Var-Owner-Name": owner_name,
"X-Langflow-Global-Var-Owner-Email": owner_email,
"X-Langflow-Global-Var-Connector-Type": connector_type,
},
)
logger.debug(
"[LF] Run response", status_code=resp.status_code, reason=resp.reason_phrase
@@ -215,24 +220,24 @@ class LangflowFileService:
"[LF] Applying ingestion settings", extra={"settings": settings}
)
# Split Text component tweaks (SplitText-PC36h)
# Split Text component tweaks (SplitText-QIKhg)
if (
settings.get("chunkSize")
or settings.get("chunkOverlap")
or settings.get("separator")
):
if "SplitText-PC36h" not in final_tweaks:
final_tweaks["SplitText-PC36h"] = {}
if "SplitText-QIKhg" not in final_tweaks:
final_tweaks["SplitText-QIKhg"] = {}
if settings.get("chunkSize"):
final_tweaks["SplitText-PC36h"]["chunk_size"] = settings[
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings[
"chunkSize"
]
if settings.get("chunkOverlap"):
final_tweaks["SplitText-PC36h"]["chunk_overlap"] = settings[
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
"chunkOverlap"
]
if settings.get("separator"):
final_tweaks["SplitText-PC36h"]["separator"] = settings["separator"]
final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
if settings.get("embeddingModel"):