Compare commits
6 commits
main
...
ingest-flo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dfb09f3b07 | ||
|
|
edb8e9d1a9 | ||
|
|
73bd0631c7 | ||
|
|
e3d3ae95f0 | ||
|
|
40e625cf96 | ||
|
|
0e8bc3090c |
6 changed files with 782 additions and 161 deletions
|
|
@ -52,11 +52,11 @@ interface Connector {
|
|||
}
|
||||
|
||||
interface SyncResult {
|
||||
processed?: number;
|
||||
added?: number;
|
||||
errors?: number;
|
||||
skipped?: number;
|
||||
total?: number;
|
||||
processed?: number;
|
||||
added?: number;
|
||||
errors?: number;
|
||||
skipped?: number;
|
||||
total?: number;
|
||||
}
|
||||
|
||||
interface Connection {
|
||||
|
|
@ -90,12 +90,24 @@ function KnowledgeSourcesPage() {
|
|||
"5488df7c-b93f-4f87-a446-b67028bc0813",
|
||||
);
|
||||
const [langflowEditUrl, setLangflowEditUrl] = useState<string>("");
|
||||
const [langflowIngestEditUrl, setLangflowIngestEditUrl] = useState<string>("");
|
||||
const [langflowIngestEditUrl, setLangflowIngestEditUrl] =
|
||||
useState<string>("");
|
||||
const [publicLangflowUrl, setPublicLangflowUrl] = useState<string>("");
|
||||
|
||||
// Flow validation and ingestion settings state
|
||||
const [flowValidation, setFlowValidation] = useState<any>(null);
|
||||
const [settingsLoaded, setSettingsLoaded] = useState(false);
|
||||
const [ingestionSettings, setIngestionSettings] = useState({
|
||||
chunkSize: 1000,
|
||||
chunkOverlap: 200,
|
||||
separator: "\\n",
|
||||
embeddingModel: "text-embedding-3-small",
|
||||
});
|
||||
|
||||
// Fetch settings from backend
|
||||
const fetchSettings = useCallback(async () => {
|
||||
if (settingsLoaded) return; // Prevent multiple calls
|
||||
|
||||
try {
|
||||
const response = await fetch("/api/settings");
|
||||
if (response.ok) {
|
||||
|
|
@ -115,11 +127,56 @@ function KnowledgeSourcesPage() {
|
|||
if (settings.langflow_public_url) {
|
||||
setPublicLangflowUrl(settings.langflow_public_url);
|
||||
}
|
||||
|
||||
// Handle flow validation data
|
||||
if (settings.flow_validation) {
|
||||
setFlowValidation(settings.flow_validation);
|
||||
|
||||
// Update ingestion settings with flow defaults
|
||||
const availableSettings =
|
||||
settings.flow_validation.available_ui_settings || {};
|
||||
|
||||
const newSettings = { ...ingestionSettings };
|
||||
|
||||
if (
|
||||
availableSettings.chunkSize?.available &&
|
||||
availableSettings.chunkSize.param_info?.value
|
||||
) {
|
||||
newSettings.chunkSize =
|
||||
availableSettings.chunkSize.param_info.value;
|
||||
}
|
||||
if (
|
||||
availableSettings.chunkOverlap?.available &&
|
||||
availableSettings.chunkOverlap.param_info?.value
|
||||
) {
|
||||
newSettings.chunkOverlap =
|
||||
availableSettings.chunkOverlap.param_info.value;
|
||||
}
|
||||
if (
|
||||
availableSettings.separator?.available &&
|
||||
availableSettings.separator.param_info?.value
|
||||
) {
|
||||
newSettings.separator =
|
||||
availableSettings.separator.param_info.value;
|
||||
}
|
||||
if (
|
||||
availableSettings.embeddingModel?.available &&
|
||||
availableSettings.embeddingModel.param_info?.value
|
||||
) {
|
||||
newSettings.embeddingModel =
|
||||
availableSettings.embeddingModel.param_info.value;
|
||||
}
|
||||
|
||||
setIngestionSettings(newSettings);
|
||||
}
|
||||
|
||||
setSettingsLoaded(true);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Failed to fetch settings:", error);
|
||||
setSettingsLoaded(true);
|
||||
}
|
||||
}, []);
|
||||
}, [settingsLoaded]);
|
||||
|
||||
// Helper function to get connector icon
|
||||
const getConnectorIcon = (iconName: string) => {
|
||||
|
|
@ -393,11 +450,15 @@ function KnowledgeSourcesPage() {
|
|||
}
|
||||
}, [tasks, prevTasks]);
|
||||
|
||||
const handleEditInLangflow = (flowType: "chat" | "ingest", closeDialog: () => void) => {
|
||||
const handleEditInLangflow = (
|
||||
flowType: "chat" | "ingest",
|
||||
closeDialog: () => void,
|
||||
) => {
|
||||
// Select the appropriate flow ID and edit URL based on flow type
|
||||
const targetFlowId = flowType === "ingest" ? ingestFlowId : chatFlowId;
|
||||
const editUrl = flowType === "ingest" ? langflowIngestEditUrl : langflowEditUrl;
|
||||
|
||||
const editUrl =
|
||||
flowType === "ingest" ? langflowIngestEditUrl : langflowEditUrl;
|
||||
|
||||
const derivedFromWindow =
|
||||
typeof window !== "undefined"
|
||||
? `${window.location.protocol}//${window.location.hostname}:7860`
|
||||
|
|
@ -408,9 +469,9 @@ function KnowledgeSourcesPage() {
|
|||
"http://localhost:7860"
|
||||
).replace(/\/$/, "");
|
||||
const computed = targetFlowId ? `${base}/flow/${targetFlowId}` : base;
|
||||
|
||||
|
||||
const url = editUrl || computed;
|
||||
|
||||
|
||||
window.open(url, "_blank");
|
||||
closeDialog(); // Close immediately after opening Langflow
|
||||
};
|
||||
|
|
@ -421,6 +482,10 @@ function KnowledgeSourcesPage() {
|
|||
})
|
||||
.then((response) => response.json())
|
||||
.then(() => {
|
||||
// Reload settings after successful restore
|
||||
setSettingsLoaded(false);
|
||||
setFlowValidation(null);
|
||||
fetchSettings();
|
||||
closeDialog(); // Close after successful completion
|
||||
})
|
||||
.catch((error) => {
|
||||
|
|
@ -435,6 +500,10 @@ function KnowledgeSourcesPage() {
|
|||
})
|
||||
.then((response) => response.json())
|
||||
.then(() => {
|
||||
// Reload settings after successful restore
|
||||
setSettingsLoaded(false);
|
||||
setFlowValidation(null);
|
||||
fetchSettings();
|
||||
closeDialog(); // Close after successful completion
|
||||
})
|
||||
.catch((error) => {
|
||||
|
|
@ -493,7 +562,9 @@ function KnowledgeSourcesPage() {
|
|||
title="Edit Ingest flow in Langflow"
|
||||
description="You're entering Langflow. You can edit the Ingest flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience."
|
||||
confirmText="Proceed"
|
||||
onConfirm={(closeDialog) => handleEditInLangflow("ingest", closeDialog)}
|
||||
onConfirm={(closeDialog) =>
|
||||
handleEditInLangflow("ingest", closeDialog)
|
||||
}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
|
|
@ -539,6 +610,144 @@ function KnowledgeSourcesPage() {
|
|||
</div>
|
||||
</div>
|
||||
</CardContent> */}
|
||||
{flowValidation &&
|
||||
Object.keys(flowValidation.available_ui_settings || {}).some(
|
||||
(key) => flowValidation.available_ui_settings[key]?.available,
|
||||
) && (
|
||||
<CardContent>
|
||||
<div className="space-y-6">
|
||||
{flowValidation.available_ui_settings?.chunkSize?.available && (
|
||||
<div className="space-y-2">
|
||||
<Label
|
||||
htmlFor="chunkSize"
|
||||
className="text-base font-medium"
|
||||
>
|
||||
{flowValidation.available_ui_settings.chunkSize.param_info
|
||||
?.display_name || "Chunk Size"}
|
||||
</Label>
|
||||
<div className="text-sm text-muted-foreground mb-3">
|
||||
{flowValidation.available_ui_settings.chunkSize.param_info
|
||||
?.info || "Maximum length of each text chunk"}
|
||||
</div>
|
||||
<Input
|
||||
id="chunkSize"
|
||||
type="number"
|
||||
value={ingestionSettings.chunkSize}
|
||||
onChange={(e) =>
|
||||
setIngestionSettings((prev) => ({
|
||||
...prev,
|
||||
chunkSize: parseInt(e.target.value) || 1000,
|
||||
}))
|
||||
}
|
||||
className="max-w-32"
|
||||
min="100"
|
||||
max="8000"
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{flowValidation.available_ui_settings?.chunkOverlap
|
||||
?.available && (
|
||||
<div className="space-y-2">
|
||||
<Label
|
||||
htmlFor="chunkOverlap"
|
||||
className="text-base font-medium"
|
||||
>
|
||||
{flowValidation.available_ui_settings.chunkOverlap
|
||||
.param_info?.display_name || "Chunk Overlap"}
|
||||
</Label>
|
||||
<div className="text-sm text-muted-foreground mb-3">
|
||||
{flowValidation.available_ui_settings.chunkOverlap
|
||||
.param_info?.info ||
|
||||
"Number of characters to overlap between chunks"}
|
||||
</div>
|
||||
<Input
|
||||
id="chunkOverlap"
|
||||
type="number"
|
||||
value={ingestionSettings.chunkOverlap}
|
||||
onChange={(e) =>
|
||||
setIngestionSettings((prev) => ({
|
||||
...prev,
|
||||
chunkOverlap: parseInt(e.target.value) || 200,
|
||||
}))
|
||||
}
|
||||
className="max-w-32"
|
||||
min="0"
|
||||
max="1000"
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{flowValidation.available_ui_settings?.separator?.available && (
|
||||
<div className="space-y-2">
|
||||
<Label
|
||||
htmlFor="separator"
|
||||
className="text-base font-medium"
|
||||
>
|
||||
{flowValidation.available_ui_settings.separator.param_info
|
||||
?.display_name || "Text Separator"}
|
||||
</Label>
|
||||
<div className="text-sm text-muted-foreground mb-3">
|
||||
{flowValidation.available_ui_settings.separator.param_info
|
||||
?.info || "Character(s) to split text on"}
|
||||
</div>
|
||||
<Input
|
||||
id="separator"
|
||||
value={ingestionSettings.separator}
|
||||
onChange={(e) =>
|
||||
setIngestionSettings((prev) => ({
|
||||
...prev,
|
||||
separator: e.target.value,
|
||||
}))
|
||||
}
|
||||
className="max-w-48"
|
||||
placeholder="\\n"
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{flowValidation.available_ui_settings?.embeddingModel
|
||||
?.available && (
|
||||
<div className="space-y-2">
|
||||
<Label
|
||||
htmlFor="embeddingModel"
|
||||
className="text-base font-medium"
|
||||
>
|
||||
{flowValidation.available_ui_settings.embeddingModel
|
||||
.param_info?.display_name || "Embedding Model"}
|
||||
</Label>
|
||||
<div className="text-sm text-muted-foreground mb-3">
|
||||
{flowValidation.available_ui_settings.embeddingModel
|
||||
.param_info?.info || "OpenAI embedding model to use"}
|
||||
</div>
|
||||
<select
|
||||
id="embeddingModel"
|
||||
value={ingestionSettings.embeddingModel}
|
||||
onChange={(e) =>
|
||||
setIngestionSettings((prev) => ({
|
||||
...prev,
|
||||
embeddingModel: e.target.value,
|
||||
}))
|
||||
}
|
||||
className="flex h-10 w-full max-w-64 rounded-md border border-input bg-background px-3 py-2 text-sm ring-offset-background file:border-0 file:bg-transparent file:text-sm file:font-medium placeholder:text-muted-foreground focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 disabled:cursor-not-allowed disabled:opacity-50"
|
||||
>
|
||||
{flowValidation.available_ui_settings.embeddingModel.param_info?.options?.map(
|
||||
(option: string) => (
|
||||
<option key={option} value={option}>
|
||||
{option}
|
||||
</option>
|
||||
),
|
||||
) || (
|
||||
<option value="text-embedding-3-small">
|
||||
text-embedding-3-small
|
||||
</option>
|
||||
)}
|
||||
</select>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</CardContent>
|
||||
)}
|
||||
</Card>
|
||||
|
||||
{/* Agent Behavior Section */}
|
||||
|
|
@ -589,7 +798,9 @@ function KnowledgeSourcesPage() {
|
|||
title="Edit Agent flow in Langflow"
|
||||
description="You're entering Langflow. You can edit the Agent flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience."
|
||||
confirmText="Proceed"
|
||||
onConfirm={(closeDialog) => handleEditInLangflow("chat", closeDialog)}
|
||||
onConfirm={(closeDialog) =>
|
||||
handleEditInLangflow("chat", closeDialog)
|
||||
}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
|
|
|
|||
|
|
@ -70,35 +70,33 @@ async def run_ingestion(
|
|||
if settings:
|
||||
logger.debug("Applying ingestion settings", settings=settings)
|
||||
|
||||
# Split Text component tweaks (SplitText-QIKhg)
|
||||
# Split Text component tweaks
|
||||
if (
|
||||
settings.get("chunkSize")
|
||||
or settings.get("chunkOverlap")
|
||||
or settings.get("separator")
|
||||
):
|
||||
if "SplitText-QIKhg" not in tweaks:
|
||||
tweaks["SplitText-QIKhg"] = {}
|
||||
if "Split Text" not in tweaks:
|
||||
tweaks["Split Text"] = {}
|
||||
if settings.get("chunkSize"):
|
||||
tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
|
||||
tweaks["Split Text"]["chunk_size"] = settings["chunkSize"]
|
||||
if settings.get("chunkOverlap"):
|
||||
tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
|
||||
"chunkOverlap"
|
||||
]
|
||||
tweaks["Split Text"]["chunk_overlap"] = settings["chunkOverlap"]
|
||||
if settings.get("separator"):
|
||||
tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
|
||||
tweaks["Split Text"]["separator"] = settings["separator"]
|
||||
|
||||
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
|
||||
# OpenAI Embeddings component tweaks
|
||||
if settings.get("embeddingModel"):
|
||||
if "OpenAIEmbeddings-joRJ6" not in tweaks:
|
||||
tweaks["OpenAIEmbeddings-joRJ6"] = {}
|
||||
tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
|
||||
if "OpenAI Embeddings" not in tweaks:
|
||||
tweaks["OpenAI Embeddings"] = {}
|
||||
tweaks["OpenAI Embeddings"]["model"] = settings["embeddingModel"]
|
||||
|
||||
# Note: OpenSearch component tweaks not needed for ingestion
|
||||
# (search parameters are for retrieval, not document processing)
|
||||
|
||||
logger.debug("Final tweaks with settings applied", tweaks=tweaks)
|
||||
# Include user JWT if available
|
||||
jwt_token = getattr(request.state, "jwt_token", None)
|
||||
jwt_token: str | None = getattr(request.state, "jwt_token", None)
|
||||
|
||||
# Extract user info from User object
|
||||
user = getattr(request.state, "user", None)
|
||||
|
|
@ -128,7 +126,10 @@ async def run_ingestion(
|
|||
|
||||
|
||||
async def upload_and_ingest_user_file(
|
||||
request: Request, langflow_file_service: LangflowFileService, session_manager, task_service
|
||||
request: Request,
|
||||
langflow_file_service: LangflowFileService,
|
||||
session_manager,
|
||||
task_service,
|
||||
):
|
||||
"""Combined upload and ingest endpoint - uses task service for tracking and cancellation"""
|
||||
try:
|
||||
|
|
@ -148,10 +149,11 @@ async def upload_and_ingest_user_file(
|
|||
# Parse JSON fields if provided
|
||||
settings = None
|
||||
tweaks = None
|
||||
|
||||
|
||||
if settings_json:
|
||||
try:
|
||||
import json
|
||||
|
||||
settings = json.loads(settings_json)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error("Invalid settings JSON", error=str(e))
|
||||
|
|
@ -160,6 +162,7 @@ async def upload_and_ingest_user_file(
|
|||
if tweaks_json:
|
||||
try:
|
||||
import json
|
||||
|
||||
tweaks = json.loads(tweaks_json)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error("Invalid tweaks JSON", error=str(e))
|
||||
|
|
@ -173,7 +176,9 @@ async def upload_and_ingest_user_file(
|
|||
jwt_token = getattr(request.state, "jwt_token", None)
|
||||
|
||||
if not user_id:
|
||||
return JSONResponse({"error": "User authentication required"}, status_code=401)
|
||||
return JSONResponse(
|
||||
{"error": "User authentication required"}, status_code=401
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"Processing file for task-based upload and ingest",
|
||||
|
|
@ -183,28 +188,28 @@ async def upload_and_ingest_user_file(
|
|||
has_settings=bool(settings),
|
||||
has_tweaks=bool(tweaks),
|
||||
delete_after_ingest=delete_after_ingest,
|
||||
user_id=user_id
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
# Create temporary file for task processing
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
import tempfile
|
||||
|
||||
# Read file content
|
||||
content = await upload_file.read()
|
||||
|
||||
|
||||
# Create temporary file
|
||||
safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
|
||||
temp_fd, temp_path = tempfile.mkstemp(
|
||||
suffix=f"_{safe_filename}"
|
||||
)
|
||||
|
||||
temp_fd, temp_path = tempfile.mkstemp(suffix=f"_{safe_filename}")
|
||||
|
||||
try:
|
||||
# Write content to temp file
|
||||
with os.fdopen(temp_fd, 'wb') as temp_file:
|
||||
with os.fdopen(temp_fd, "wb") as temp_file:
|
||||
temp_file.write(content)
|
||||
|
||||
logger.debug("Created temporary file for task processing", temp_path=temp_path)
|
||||
logger.debug(
|
||||
"Created temporary file for task processing", temp_path=temp_path
|
||||
)
|
||||
|
||||
# Create langflow upload task for single file
|
||||
task_id = await task_service.create_langflow_upload_task(
|
||||
|
|
@ -222,12 +227,15 @@ async def upload_and_ingest_user_file(
|
|||
)
|
||||
|
||||
logger.debug("Langflow upload task created successfully", task_id=task_id)
|
||||
|
||||
return JSONResponse({
|
||||
"task_id": task_id,
|
||||
"message": f"Langflow upload task created for file '{upload_file.filename}'",
|
||||
"filename": upload_file.filename
|
||||
}, status_code=202) # 202 Accepted for async processing
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
"task_id": task_id,
|
||||
"message": f"Langflow upload task created for file '{upload_file.filename}'",
|
||||
"filename": upload_file.filename,
|
||||
},
|
||||
status_code=202,
|
||||
) # 202 Accepted for async processing
|
||||
|
||||
except Exception:
|
||||
# Clean up temp file on error
|
||||
|
|
@ -237,7 +245,7 @@ async def upload_and_ingest_user_file(
|
|||
except Exception:
|
||||
pass # Ignore cleanup errors
|
||||
raise
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"upload_and_ingest_user_file endpoint failed",
|
||||
|
|
@ -245,6 +253,7 @@ async def upload_and_ingest_user_file(
|
|||
error=str(e),
|
||||
)
|
||||
import traceback
|
||||
|
||||
logger.error("Full traceback", traceback=traceback.format_exc())
|
||||
return JSONResponse({"error": str(e)}, status_code=500)
|
||||
|
||||
|
|
|
|||
|
|
@ -29,75 +29,33 @@ async def get_settings(request, session_manager):
|
|||
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_INGEST_FLOW_ID}"
|
||||
)
|
||||
|
||||
# Fetch ingestion flow configuration to get actual component defaults
|
||||
# Fetch ingestion flow validation and available settings
|
||||
if LANGFLOW_INGEST_FLOW_ID:
|
||||
try:
|
||||
from config.settings import generate_langflow_api_key
|
||||
import httpx
|
||||
from services.langflow_file_service import LangflowFileService
|
||||
from services.flow_validation_context import set_flow_components
|
||||
|
||||
api_key = await generate_langflow_api_key()
|
||||
if api_key:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
response = await client.get(
|
||||
f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}",
|
||||
headers={"x-api-key": api_key},
|
||||
)
|
||||
if response.status_code == 200:
|
||||
flow_data = response.json()
|
||||
langflow_service = LangflowFileService()
|
||||
|
||||
# Extract component defaults (ingestion-specific settings only)
|
||||
ingestion_defaults = {
|
||||
"chunkSize": 1000,
|
||||
"chunkOverlap": 200,
|
||||
"separator": "\\n",
|
||||
"embeddingModel": "text-embedding-3-small",
|
||||
}
|
||||
# Validate the flow and get component information
|
||||
component_info = await langflow_service.validate_ingestion_flow()
|
||||
|
||||
if flow_data.get("data", {}).get("nodes"):
|
||||
for node in flow_data["data"]["nodes"]:
|
||||
node_template = (
|
||||
node.get("data", {})
|
||||
.get("node", {})
|
||||
.get("template", {})
|
||||
)
|
||||
# Set in context for other endpoints to use
|
||||
user = getattr(request.state, "user", None)
|
||||
user_id = user.user_id if user else "anonymous"
|
||||
await set_flow_components(user_id, component_info)
|
||||
|
||||
# Split Text component (SplitText-QIKhg)
|
||||
if node.get("id") == "SplitText-QIKhg":
|
||||
if node_template.get("chunk_size", {}).get(
|
||||
"value"
|
||||
):
|
||||
ingestion_defaults["chunkSize"] = (
|
||||
node_template["chunk_size"]["value"]
|
||||
)
|
||||
if node_template.get("chunk_overlap", {}).get(
|
||||
"value"
|
||||
):
|
||||
ingestion_defaults["chunkOverlap"] = (
|
||||
node_template["chunk_overlap"]["value"]
|
||||
)
|
||||
if node_template.get("separator", {}).get(
|
||||
"value"
|
||||
):
|
||||
ingestion_defaults["separator"] = (
|
||||
node_template["separator"]["value"]
|
||||
)
|
||||
|
||||
# OpenAI Embeddings component (OpenAIEmbeddings-joRJ6)
|
||||
elif node.get("id") == "OpenAIEmbeddings-joRJ6":
|
||||
if node_template.get("model", {}).get("value"):
|
||||
ingestion_defaults["embeddingModel"] = (
|
||||
node_template["model"]["value"]
|
||||
)
|
||||
|
||||
# Note: OpenSearch component settings are not exposed for ingestion
|
||||
# (search-related parameters like number_of_results, score_threshold
|
||||
# are for retrieval, not ingestion)
|
||||
|
||||
settings["ingestion_defaults"] = ingestion_defaults
|
||||
# Add flow validation results to settings
|
||||
settings["flow_validation"] = component_info.to_dict()
|
||||
|
||||
except Exception as e:
|
||||
print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}")
|
||||
# Continue without ingestion defaults
|
||||
print(f"[WARNING] Failed to validate ingestion flow: {e}")
|
||||
# Continue without flow validation data
|
||||
settings["flow_validation"] = {
|
||||
"components": {},
|
||||
"validation": {"is_valid": False, "error": str(e)},
|
||||
"available_ui_settings": {},
|
||||
}
|
||||
|
||||
return JSONResponse(settings)
|
||||
|
||||
|
|
|
|||
|
|
@ -367,20 +367,22 @@ class LangflowFileProcessor(TaskProcessor):
|
|||
|
||||
try:
|
||||
# Read file content
|
||||
with open(item, 'rb') as f:
|
||||
with open(item, "rb") as f:
|
||||
content = f.read()
|
||||
|
||||
# Create file tuple for upload
|
||||
temp_filename = os.path.basename(item)
|
||||
# Extract original filename from temp file suffix (remove tmp prefix)
|
||||
if "_" in temp_filename:
|
||||
filename = temp_filename.split("_", 1)[1] # Get everything after first _
|
||||
filename = temp_filename.split("_", 1)[
|
||||
1
|
||||
] # Get everything after first _
|
||||
else:
|
||||
filename = temp_filename
|
||||
content_type, _ = mimetypes.guess_type(filename)
|
||||
if not content_type:
|
||||
content_type = 'application/octet-stream'
|
||||
|
||||
content_type = "application/octet-stream"
|
||||
|
||||
file_tuple = (filename, content, content_type)
|
||||
|
||||
# Get JWT token using same logic as DocumentFileProcessor
|
||||
|
|
@ -393,27 +395,29 @@ class LangflowFileProcessor(TaskProcessor):
|
|||
)
|
||||
# The session manager would have created anonymous JWT if needed
|
||||
# Get it from the session manager's internal state
|
||||
if hasattr(self.session_manager, '_anonymous_jwt'):
|
||||
if hasattr(self.session_manager, "_anonymous_jwt"):
|
||||
effective_jwt = self.session_manager._anonymous_jwt
|
||||
|
||||
# Prepare metadata tweaks similar to API endpoint
|
||||
final_tweaks = self.tweaks.copy() if self.tweaks else {}
|
||||
|
||||
|
||||
metadata_tweaks = []
|
||||
if self.owner_user_id:
|
||||
metadata_tweaks.append({"key": "owner", "value": self.owner_user_id})
|
||||
if self.owner_name:
|
||||
metadata_tweaks.append({"key": "owner_name", "value": self.owner_name})
|
||||
if self.owner_email:
|
||||
metadata_tweaks.append({"key": "owner_email", "value": self.owner_email})
|
||||
metadata_tweaks.append(
|
||||
{"key": "owner_email", "value": self.owner_email}
|
||||
)
|
||||
# Mark as local upload for connector_type
|
||||
metadata_tweaks.append({"key": "connector_type", "value": "local"})
|
||||
|
||||
if metadata_tweaks:
|
||||
# Initialize the OpenSearch component tweaks if not already present
|
||||
if "OpenSearchHybrid-Ve6bS" not in final_tweaks:
|
||||
final_tweaks["OpenSearchHybrid-Ve6bS"] = {}
|
||||
final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
|
||||
if "OpenSearch (Hybrid)" not in final_tweaks:
|
||||
final_tweaks["OpenSearch (Hybrid)"] = {}
|
||||
final_tweaks["OpenSearch (Hybrid)"]["docs_metadata"] = metadata_tweaks
|
||||
|
||||
# Process file using langflow service
|
||||
result = await self.langflow_file_service.upload_and_ingest_file(
|
||||
|
|
@ -422,7 +426,7 @@ class LangflowFileProcessor(TaskProcessor):
|
|||
tweaks=final_tweaks,
|
||||
settings=self.settings,
|
||||
jwt_token=effective_jwt,
|
||||
delete_after_ingest=self.delete_after_ingest
|
||||
delete_after_ingest=self.delete_after_ingest,
|
||||
)
|
||||
|
||||
# Update task with success
|
||||
|
|
|
|||
241
src/services/flow_validation_context.py
Normal file
241
src/services/flow_validation_context.py
Normal file
|
|
@ -0,0 +1,241 @@
|
|||
"""Context variable system for storing flow validation results per user."""
|
||||
|
||||
import asyncio
|
||||
from contextlib import contextmanager
|
||||
from contextvars import ContextVar
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ComponentParameter:
|
||||
"""Information about a component parameter."""
|
||||
|
||||
name: str
|
||||
display_name: str
|
||||
param_type: str
|
||||
value: Any = None
|
||||
options: Optional[List] = None
|
||||
advanced: bool = False
|
||||
required: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class ComponentInfo:
|
||||
"""Information about a single component."""
|
||||
|
||||
display_name: str
|
||||
component_type: str
|
||||
node_id: str
|
||||
parameters: Dict[str, ComponentParameter] = field(default_factory=dict)
|
||||
|
||||
|
||||
def get_datetime_now() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
@dataclass
|
||||
class FlowComponentInfo:
|
||||
"""Information about all components available in a flow."""
|
||||
|
||||
components: Dict[str, List[ComponentInfo]] = field(default_factory=dict)
|
||||
validated_at: Optional[str] = field(default_factory=get_datetime_now)
|
||||
flow_id: Optional[str] = None
|
||||
|
||||
@property
|
||||
def has_file(self) -> bool:
|
||||
return len(self.components.get("File", [])) > 0
|
||||
|
||||
@property
|
||||
def has_split_text(self) -> bool:
|
||||
return len(self.components.get("Split Text", [])) > 0
|
||||
|
||||
@property
|
||||
def has_openai_embeddings(self) -> bool:
|
||||
return len(self.components.get("OpenAI Embeddings", [])) > 0
|
||||
|
||||
@property
|
||||
def has_opensearch_hybrid(self) -> bool:
|
||||
return len(self.components.get("OpenSearch (Hybrid)", [])) > 0
|
||||
|
||||
@property
|
||||
def is_valid_for_ingestion(self) -> bool:
|
||||
"""Check if flow has minimum required components for ingestion."""
|
||||
|
||||
return (
|
||||
self.has_file and self.has_opensearch_hybrid and self.has_openai_embeddings
|
||||
)
|
||||
|
||||
def get_available_parameters(
|
||||
self, component_type: str
|
||||
) -> Dict[str, ComponentParameter]:
|
||||
"""Get available parameters for a specific component type."""
|
||||
components_of_type = self.components.get(component_type, [])
|
||||
if not components_of_type:
|
||||
return {}
|
||||
|
||||
# Return parameters from first component of this type
|
||||
return components_of_type[0].parameters
|
||||
|
||||
# Configuration for UI settings mapping (display names come from flow data)
|
||||
UI_SETTINGS_CONFIG = [
|
||||
{
|
||||
"component_display_name": "Split Text",
|
||||
"settings": [
|
||||
{"ui_setting": "chunkSize", "parameter_name": "chunk_size"},
|
||||
{"ui_setting": "chunkOverlap", "parameter_name": "chunk_overlap"},
|
||||
{"ui_setting": "separator", "parameter_name": "separator"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"component_display_name": "OpenAI Embeddings",
|
||||
"settings": [{"ui_setting": "embeddingModel", "parameter_name": "model"}],
|
||||
},
|
||||
]
|
||||
|
||||
@property
|
||||
def available_ui_settings(self) -> Dict[str, Any]:
|
||||
"""Return which UI settings should be available with their parameter info."""
|
||||
settings = {}
|
||||
|
||||
for config in self.UI_SETTINGS_CONFIG:
|
||||
component_name = config["component_display_name"]
|
||||
has_component = (
|
||||
component_name in self.components
|
||||
and len(self.components[component_name]) > 0
|
||||
)
|
||||
|
||||
if has_component:
|
||||
component_params = self.get_available_parameters(component_name)
|
||||
|
||||
for setting_config in config["settings"]:
|
||||
ui_setting = setting_config["ui_setting"]
|
||||
param_name = setting_config["parameter_name"]
|
||||
|
||||
param_available = param_name in component_params
|
||||
param_info = (
|
||||
component_params.get(param_name) if param_available else None
|
||||
)
|
||||
|
||||
settings[ui_setting] = {
|
||||
"available": param_available,
|
||||
"component": component_name,
|
||||
"parameter_name": param_name,
|
||||
"param_info": {
|
||||
"display_name": param_info.display_name,
|
||||
"type": param_info.param_type,
|
||||
"value": param_info.value,
|
||||
"options": param_info.options,
|
||||
"advanced": param_info.advanced,
|
||||
"required": param_info.required,
|
||||
}
|
||||
if param_info
|
||||
else None,
|
||||
}
|
||||
else:
|
||||
# Component not present - mark all its settings as unavailable
|
||||
for setting_config in config["settings"]:
|
||||
ui_setting = setting_config["ui_setting"]
|
||||
settings[ui_setting] = {
|
||||
"available": False,
|
||||
"component": component_name,
|
||||
"parameter_name": setting_config["parameter_name"],
|
||||
"param_info": None,
|
||||
"reason": f"Component '{component_name}' not found in flow",
|
||||
}
|
||||
|
||||
return settings
|
||||
|
||||
def to_dict(self) -> Dict:
|
||||
"""Convert to dictionary for API responses."""
|
||||
return {
|
||||
"components": {
|
||||
component_type: [
|
||||
{
|
||||
"display_name": comp.display_name,
|
||||
"type": comp.component_type,
|
||||
"node_id": comp.node_id,
|
||||
"parameters": {
|
||||
param_name: {
|
||||
"display_name": param.display_name,
|
||||
"type": param.param_type,
|
||||
"value": param.value,
|
||||
"options": param.options,
|
||||
"advanced": param.advanced,
|
||||
"required": param.required,
|
||||
}
|
||||
for param_name, param in comp.parameters.items()
|
||||
},
|
||||
}
|
||||
for comp in component_list
|
||||
]
|
||||
for component_type, component_list in self.components.items()
|
||||
},
|
||||
"validation": {
|
||||
"is_valid": self.is_valid_for_ingestion,
|
||||
"validated_at": self.validated_at,
|
||||
"flow_id": self.flow_id,
|
||||
},
|
||||
"available_ui_settings": self.available_ui_settings,
|
||||
}
|
||||
|
||||
|
||||
# Context variable to store flow component info per request/user
|
||||
_flow_context: ContextVar[Optional[FlowComponentInfo]] = ContextVar(
|
||||
"flow_validation_context", default=None
|
||||
)
|
||||
|
||||
# Cache to store validation results per flow_id to avoid repeated API calls
|
||||
_validation_cache: Dict[str, FlowComponentInfo] = {}
|
||||
_cache_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def get_flow_components(user_id: str) -> Optional[FlowComponentInfo]:
|
||||
"""Get current flow component info from context."""
|
||||
return _flow_context.get()
|
||||
|
||||
|
||||
async def set_flow_components(user_id: str, component_info: FlowComponentInfo) -> None:
|
||||
"""Set flow component info in context."""
|
||||
_flow_context.set(component_info)
|
||||
logger.debug(f"[FC] Set flow context for user {user_id}")
|
||||
|
||||
|
||||
async def cache_flow_validation(
|
||||
flow_id: str, component_info: FlowComponentInfo
|
||||
) -> None:
|
||||
"""Cache flow validation results."""
|
||||
async with _cache_lock:
|
||||
_validation_cache[flow_id] = component_info
|
||||
logger.debug(f"[FC] Cached validation for flow {flow_id}")
|
||||
|
||||
|
||||
async def get_cached_flow_validation(flow_id: str) -> Optional[FlowComponentInfo]:
|
||||
"""Get cached flow validation results."""
|
||||
async with _cache_lock:
|
||||
cached = _validation_cache.get(flow_id)
|
||||
if cached:
|
||||
logger.debug(f"[FC] Using cached validation for flow {flow_id}")
|
||||
return cached
|
||||
|
||||
|
||||
def clear_validation_cache():
|
||||
"""Clear the validation cache (useful for testing or cache invalidation)."""
|
||||
global _validation_cache
|
||||
_validation_cache.clear()
|
||||
logger.debug("[FC] Cleared validation cache")
|
||||
|
||||
|
||||
@contextmanager
def flow_context(component_info: FlowComponentInfo):
    """Context manager for setting flow component info.

    Installs ``component_info`` as the current value of _flow_context for the
    duration of the with-block, then restores the previous value via the
    reset token — even if the block raises.
    """
    token = _flow_context.set(component_info)
    try:
        yield
    finally:
        _flow_context.reset(token)
|
||||
|
|
@ -1,4 +1,7 @@
|
|||
from typing import Any, Dict, List, Optional
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from services.flow_validation_context import FlowComponentInfo
|
||||
|
||||
from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
|
||||
from utils.logging_config import get_logger
|
||||
|
|
@ -57,6 +60,187 @@ class LangflowFileService:
|
|||
)
|
||||
resp.raise_for_status()
|
||||
|
||||
async def _fetch_flow_definition(self, flow_id: str) -> Dict[str, Any]:
    """Fetch flow definition from Langflow API.

    Raises:
        ValueError: When the flow does not exist (404) or the API returns
            any other non-200 status.
    """
    response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")

    # Happy path first: a 200 carries the flow definition as JSON.
    if response.status_code == 200:
        return response.json()

    if response.status_code == 404:
        raise ValueError(f"Flow '{flow_id}' not found")

    raise ValueError(
        f"Failed to fetch flow definition: HTTP {response.status_code}"
    )
|
||||
|
||||
def _extract_components_from_flow(
    self, flow_data: Dict[str, Any]
) -> "FlowComponentInfo":
    """Extract components and their parameters from flow data.

    Walks every node in ``flow_data["data"]["nodes"]`` and, for the component
    display names the ingestion pipeline cares about, records each matching
    node's template parameters.

    Args:
        flow_data: Raw flow definition as returned by the Langflow API.

    Returns:
        FlowComponentInfo grouping the matched components by display name
        (a display name may map to several nodes).

    Raises:
        ValueError: If the flow contains no nodes at all.
    """
    # Imported locally; presumably avoids a circular import with this
    # module — confirm before hoisting to the top of the file.
    from services.flow_validation_context import (
        ComponentInfo,
        ComponentParameter,
        FlowComponentInfo,
    )

    nodes = flow_data.get("data", {}).get("nodes", [])
    if not nodes:
        raise ValueError("Flow contains no components")

    # Only these display names are relevant to ingestion; all other
    # components in the flow are ignored.
    target_components = [
        "File",
        "Split Text",
        "OpenAI Embeddings",
        "OpenSearch (Hybrid)",
    ]

    components: Dict[str, List[ComponentInfo]] = {}

    for node in nodes:
        node_data = node.get("data", {}).get("node", {})
        display_name = node_data.get("display_name", "")
        if display_name not in target_components:
            continue

        # Parameter metadata lives in the node's template mapping; entries
        # that are not dicts carry no parameter info and are skipped.
        parameters = {}
        for param_name, param_data in node_data.get("template", {}).items():
            if isinstance(param_data, dict):
                parameters[param_name] = ComponentParameter(
                    name=param_name,
                    display_name=param_data.get("display_name", param_name),
                    param_type=param_data.get("type", "unknown"),
                    value=param_data.get("value"),
                    options=param_data.get("options", []),
                    advanced=param_data.get("advanced", False),
                    required=param_data.get("required", False),
                )

        component_info = ComponentInfo(
            display_name=display_name,
            component_type=node_data.get("type", ""),
            node_id=node.get("id", ""),
            parameters=parameters,
        )
        # Multiple nodes may share a display name; keep every occurrence.
        components.setdefault(display_name, []).append(component_info)

    return FlowComponentInfo(
        components=components,
        flow_id=self.flow_id_ingest,
    )
|
||||
|
||||
def _validate_component_requirements(
|
||||
self, component_info: "FlowComponentInfo"
|
||||
) -> None:
|
||||
"""Validate that required components are present in correct quantities."""
|
||||
# File component validation
|
||||
file_count = len(component_info.components.get("File", []))
|
||||
if file_count == 0:
|
||||
raise ValueError(
|
||||
"Flow validation failed: No 'File' component found. "
|
||||
"The ingestion flow must contain exactly one File component."
|
||||
)
|
||||
elif file_count > 1:
|
||||
raise ValueError(
|
||||
f"Flow validation failed: Found {file_count} 'File' components. "
|
||||
f"The ingestion flow must contain exactly one File component."
|
||||
)
|
||||
|
||||
# OpenSearch component validation
|
||||
opensearch_count = len(component_info.components.get("OpenSearch (Hybrid)", []))
|
||||
if opensearch_count == 0:
|
||||
raise ValueError(
|
||||
"Flow validation failed: No 'OpenSearch (Hybrid)' component found. "
|
||||
"The ingestion flow must contain at least one OpenSearch (Hybrid) component."
|
||||
)
|
||||
elif opensearch_count > 1:
|
||||
logger.warning(
|
||||
f"[LF] Flow contains {opensearch_count} OpenSearch (Hybrid) components. "
|
||||
f"Tweaks will be applied to all components with this display name."
|
||||
)
|
||||
|
||||
# Optional component warnings
|
||||
if not component_info.has_split_text:
|
||||
logger.warning(
|
||||
"[LF] No 'Split Text' component found. Text chunking may not work as expected."
|
||||
)
|
||||
|
||||
if not component_info.has_openai_embeddings:
|
||||
logger.warning(
|
||||
"[LF] No 'OpenAI Embeddings' component found. Embedding generation may not work as expected."
|
||||
)
|
||||
|
||||
async def validate_ingestion_flow(
    self, use_cache: bool = True
) -> "FlowComponentInfo":
    """
    Validate the ingestion flow structure to ensure it has required components.

    Args:
        use_cache: Whether to use cached validation results (default: True)

    Returns:
        FlowComponentInfo: Component information if validation passes

    Raises:
        ValueError: If flow is not configured, not found, or doesn't meet requirements
    """
    if not self.flow_id_ingest:
        raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")

    # Check cache first to avoid a round-trip to the Langflow API.
    if use_cache:
        # Imported locally; presumably avoids a circular import — confirm.
        from services.flow_validation_context import get_cached_flow_validation

        cached_info = await get_cached_flow_validation(self.flow_id_ingest)
        if cached_info:
            logger.debug(
                f"[LF] Using cached validation for flow: {self.flow_id_ingest}"
            )
            return cached_info

    logger.debug(f"[LF] Validating ingestion flow: {self.flow_id_ingest}")

    try:
        # Fetch flow definition from the Langflow API (raises ValueError on
        # 404 or any other non-200 status).
        flow_data = await self._fetch_flow_definition(self.flow_id_ingest)

        # Extract and analyze components relevant to ingestion.
        component_info = self._extract_components_from_flow(flow_data)

        # Validate requirements (raises ValueError on hard violations).
        self._validate_component_requirements(component_info)

        # Cache the results so subsequent calls with use_cache=True are cheap.
        from services.flow_validation_context import cache_flow_validation

        await cache_flow_validation(self.flow_id_ingest, component_info)

        # Log successful validation with per-component counts.
        logger.info(
            f"[LF] Flow validation passed for '{self.flow_id_ingest}': "
            f"File={len(component_info.components.get('File', []))}, "
            f"OpenSearch={len(component_info.components.get('OpenSearch (Hybrid)', []))}, "
            f"SplitText={len(component_info.components.get('Split Text', []))}, "
            f"Embeddings={len(component_info.components.get('OpenAI Embeddings', []))}, "
            f"Other={len([c for comp_list in component_info.components.values() for c in comp_list if c.display_name not in ['File', 'Split Text', 'OpenAI Embeddings', 'OpenSearch (Hybrid)']])}, "
            f"Available settings: {list(component_info.available_ui_settings.keys())}"
        )

        return component_info

    except Exception as e:
        # Log with the flow id for diagnosis, then re-raise unchanged so the
        # caller sees the original exception type.
        logger.error(
            f"[LF] Flow validation failed for '{self.flow_id_ingest}': {e}"
        )
        raise
|
||||
|
||||
async def run_ingestion_flow(
|
||||
self,
|
||||
file_paths: List[str],
|
||||
|
|
@ -76,6 +260,13 @@ class LangflowFileService:
|
|||
logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured")
|
||||
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
||||
|
||||
# Validate flow structure before proceeding
|
||||
try:
|
||||
await self.validate_ingestion_flow()
|
||||
except Exception as e:
|
||||
logger.error(f"[LF] Flow validation failed: {e}")
|
||||
raise ValueError(f"Ingestion flow validation failed: {e}")
|
||||
|
||||
payload: Dict[str, Any] = {
|
||||
"input_value": "Ingest files",
|
||||
"input_type": "chat",
|
||||
|
|
@ -84,14 +275,13 @@ class LangflowFileService:
|
|||
if not tweaks:
|
||||
tweaks = {}
|
||||
|
||||
# Pass files via tweaks to File component (File-PSU37 from the flow)
|
||||
# Pass files via tweaks to File component
|
||||
if file_paths:
|
||||
tweaks["File-PSU37"] = {"path": file_paths}
|
||||
tweaks["File"] = {"path": file_paths}
|
||||
|
||||
# Pass JWT token via tweaks using the x-langflow-global-var- pattern
|
||||
# Pass JWT token via tweaks to OpenSearch component
|
||||
if jwt_token:
|
||||
# Using the global variable pattern that Langflow expects for OpenSearch components
|
||||
tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token}
|
||||
tweaks["OpenSearch (Hybrid)"] = {"jwt_token": jwt_token}
|
||||
logger.debug("[LF] Added JWT token to tweaks for OpenSearch components")
|
||||
else:
|
||||
logger.warning("[LF] No JWT token provided")
|
||||
|
|
@ -109,9 +299,9 @@ class LangflowFileService:
|
|||
|
||||
if metadata_tweaks:
|
||||
# Initialize the OpenSearch component tweaks if not already present
|
||||
if "OpenSearchHybrid-Ve6bS" not in tweaks:
|
||||
tweaks["OpenSearchHybrid-Ve6bS"] = {}
|
||||
tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
|
||||
if "OpenSearch (Hybrid)" not in tweaks:
|
||||
tweaks["OpenSearch (Hybrid)"] = {}
|
||||
tweaks["OpenSearch (Hybrid)"]["docs_metadata"] = metadata_tweaks
|
||||
logger.debug(
|
||||
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
|
||||
)
|
||||
|
|
@ -168,7 +358,7 @@ class LangflowFileService:
|
|||
"""
|
||||
Combined upload, ingest, and delete operation.
|
||||
First uploads the file, then runs ingestion on it, then optionally deletes the file.
|
||||
|
||||
|
||||
Args:
|
||||
file_tuple: File tuple (filename, content, content_type)
|
||||
session_id: Optional session ID for the ingestion flow
|
||||
|
|
@ -176,12 +366,12 @@ class LangflowFileService:
|
|||
settings: Optional UI settings to convert to component tweaks
|
||||
jwt_token: Optional JWT token for authentication
|
||||
delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)
|
||||
|
||||
|
||||
Returns:
|
||||
Combined result with upload info, ingestion result, and deletion status
|
||||
"""
|
||||
logger.debug("[LF] Starting combined upload and ingest operation")
|
||||
|
||||
|
||||
# Step 1: Upload the file
|
||||
try:
|
||||
upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
|
||||
|
|
@ -190,10 +380,12 @@ class LangflowFileService:
|
|||
extra={
|
||||
"file_id": upload_result.get("id"),
|
||||
"file_path": upload_result.get("path"),
|
||||
}
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)})
|
||||
logger.error(
|
||||
"[LF] Upload failed during combined operation", extra={"error": str(e)}
|
||||
)
|
||||
raise Exception(f"Upload failed: {str(e)}")
|
||||
|
||||
# Step 2: Prepare for ingestion
|
||||
|
|
@ -203,34 +395,39 @@ class LangflowFileService:
|
|||
|
||||
# Convert UI settings to component tweaks if provided
|
||||
final_tweaks = tweaks.copy() if tweaks else {}
|
||||
|
||||
if settings:
|
||||
logger.debug("[LF] Applying ingestion settings", extra={"settings": settings})
|
||||
|
||||
# Split Text component tweaks (SplitText-QIKhg)
|
||||
if settings:
|
||||
logger.debug(
|
||||
"[LF] Applying ingestion settings", extra={"settings": settings}
|
||||
)
|
||||
|
||||
# Split Text component tweaks
|
||||
if (
|
||||
settings.get("chunkSize")
|
||||
or settings.get("chunkOverlap")
|
||||
or settings.get("separator")
|
||||
):
|
||||
if "SplitText-QIKhg" not in final_tweaks:
|
||||
final_tweaks["SplitText-QIKhg"] = {}
|
||||
if "Split Text" not in final_tweaks:
|
||||
final_tweaks["Split Text"] = {}
|
||||
if settings.get("chunkSize"):
|
||||
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
|
||||
final_tweaks["Split Text"]["chunk_size"] = settings["chunkSize"]
|
||||
if settings.get("chunkOverlap"):
|
||||
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
|
||||
final_tweaks["Split Text"]["chunk_overlap"] = settings[
|
||||
"chunkOverlap"
|
||||
]
|
||||
if settings.get("separator"):
|
||||
final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
|
||||
final_tweaks["Split Text"]["separator"] = settings["separator"]
|
||||
|
||||
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
|
||||
# OpenAI Embeddings component tweaks
|
||||
if settings.get("embeddingModel"):
|
||||
if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
|
||||
final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
|
||||
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
|
||||
if "OpenAI Embeddings" not in final_tweaks:
|
||||
final_tweaks["OpenAI Embeddings"] = {}
|
||||
final_tweaks["OpenAI Embeddings"]["model"] = settings["embeddingModel"]
|
||||
|
||||
logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks})
|
||||
logger.debug(
|
||||
"[LF] Final tweaks with settings applied",
|
||||
extra={"tweaks": final_tweaks},
|
||||
)
|
||||
|
||||
# Step 3: Run ingestion
|
||||
try:
|
||||
|
|
@ -244,10 +441,7 @@ class LangflowFileService:
|
|||
except Exception as e:
|
||||
logger.error(
|
||||
"[LF] Ingestion failed during combined operation",
|
||||
extra={
|
||||
"error": str(e),
|
||||
"file_path": file_path
|
||||
}
|
||||
extra={"error": str(e), "file_path": file_path},
|
||||
)
|
||||
# Note: We could optionally delete the uploaded file here if ingestion fails
|
||||
raise Exception(f"Ingestion failed: {str(e)}")
|
||||
|
|
@ -256,10 +450,13 @@ class LangflowFileService:
|
|||
file_id = upload_result.get("id")
|
||||
delete_result = None
|
||||
delete_error = None
|
||||
|
||||
|
||||
if delete_after_ingest and file_id:
|
||||
try:
|
||||
logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id})
|
||||
logger.debug(
|
||||
"[LF] Deleting file after successful ingestion",
|
||||
extra={"file_id": file_id},
|
||||
)
|
||||
await self.delete_user_file(file_id)
|
||||
delete_result = {"status": "deleted", "file_id": file_id}
|
||||
logger.debug("[LF] File deleted successfully")
|
||||
|
|
@ -267,26 +464,27 @@ class LangflowFileService:
|
|||
delete_error = str(e)
|
||||
logger.warning(
|
||||
"[LF] Failed to delete file after ingestion",
|
||||
extra={
|
||||
"error": delete_error,
|
||||
"file_id": file_id
|
||||
}
|
||||
extra={"error": delete_error, "file_id": file_id},
|
||||
)
|
||||
delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error}
|
||||
delete_result = {
|
||||
"status": "delete_failed",
|
||||
"file_id": file_id,
|
||||
"error": delete_error,
|
||||
}
|
||||
|
||||
# Return combined result
|
||||
result = {
|
||||
"status": "success",
|
||||
"upload": upload_result,
|
||||
"ingestion": ingest_result,
|
||||
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully"
|
||||
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully",
|
||||
}
|
||||
|
||||
|
||||
if delete_after_ingest:
|
||||
result["deletion"] = delete_result
|
||||
if delete_result and delete_result.get("status") == "deleted":
|
||||
result["message"] += " and cleaned up"
|
||||
elif delete_error:
|
||||
result["message"] += f" (cleanup warning: {delete_error})"
|
||||
|
||||
|
||||
return result
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue