Merge pull request #59 from langflow-ai/docling-presets
Add Docling ingest flow and presets controls
This commit is contained in:
commit
5fe45b3b66
8 changed files with 2340 additions and 44 deletions
|
|
@ -8,6 +8,8 @@ LANGFLOW_SECRET_KEY=
|
|||
# flow ids for chat and ingestion flows
|
||||
LANGFLOW_CHAT_FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0
|
||||
LANGFLOW_INGEST_FLOW_ID=5488df7c-b93f-4f87-a446-b67028bc0813
|
||||
# Ingest flow using docling
|
||||
LANGFLOW_INGEST_FLOW_ID=1402618b-e6d1-4ff2-9a11-d6ce71186915
|
||||
NUDGES_FLOW_ID=ebc01d31-1976-46ce-a385-b0240327226c
|
||||
|
||||
# Set a strong admin password for OpenSearch; a bcrypt hash is generated at
|
||||
|
|
|
|||
2220
flows/openrag_ingest_docling.json
Normal file
2220
flows/openrag_ingest_docling.json
Normal file
File diff suppressed because one or more lines are too long
|
|
@ -152,6 +152,7 @@ function KnowledgeSourcesPage() {
|
|||
},
|
||||
});
|
||||
|
||||
|
||||
// Debounced update function
|
||||
const debouncedUpdate = useDebounce(
|
||||
(variables: Parameters<typeof updateFlowSettingMutation.mutate>[0]) => {
|
||||
|
|
@ -219,6 +220,7 @@ function KnowledgeSourcesPage() {
|
|||
// Update processing mode
|
||||
const handleProcessingModeChange = (mode: string) => {
|
||||
setProcessingMode(mode);
|
||||
// Update the configuration setting (backend will also update the flow automatically)
|
||||
debouncedUpdate({ doclingPresets: mode });
|
||||
};
|
||||
|
||||
|
|
|
|||
18
src/agent.py
18
src/agent.py
|
|
@ -106,7 +106,6 @@ async def async_response_stream(
|
|||
model: str,
|
||||
extra_headers: dict = None,
|
||||
previous_response_id: str = None,
|
||||
tweaks: dict = None,
|
||||
log_prefix: str = "response",
|
||||
):
|
||||
logger.info("User prompt received", prompt=prompt)
|
||||
|
|
@ -121,8 +120,6 @@ async def async_response_stream(
|
|||
}
|
||||
if previous_response_id is not None:
|
||||
request_params["previous_response_id"] = previous_response_id
|
||||
if tweaks:
|
||||
request_params["tweaks"] = tweaks
|
||||
|
||||
if "x-api-key" not in client.default_headers:
|
||||
if hasattr(client, "api_key") and extra_headers is not None:
|
||||
|
|
@ -199,7 +196,6 @@ async def async_response(
|
|||
model: str,
|
||||
extra_headers: dict = None,
|
||||
previous_response_id: str = None,
|
||||
tweaks: dict = None,
|
||||
log_prefix: str = "response",
|
||||
):
|
||||
try:
|
||||
|
|
@ -214,8 +210,6 @@ async def async_response(
|
|||
}
|
||||
if previous_response_id is not None:
|
||||
request_params["previous_response_id"] = previous_response_id
|
||||
if tweaks:
|
||||
request_params["tweaks"] = tweaks
|
||||
if extra_headers:
|
||||
request_params["extra_headers"] = extra_headers
|
||||
|
||||
|
|
@ -249,7 +243,6 @@ async def async_stream(
|
|||
model: str,
|
||||
extra_headers: dict = None,
|
||||
previous_response_id: str = None,
|
||||
tweaks: dict = None,
|
||||
log_prefix: str = "response",
|
||||
):
|
||||
async for chunk in async_response_stream(
|
||||
|
|
@ -258,7 +251,6 @@ async def async_stream(
|
|||
model,
|
||||
extra_headers=extra_headers,
|
||||
previous_response_id=previous_response_id,
|
||||
tweaks=tweaks,
|
||||
log_prefix=log_prefix,
|
||||
):
|
||||
yield chunk
|
||||
|
|
@ -271,7 +263,6 @@ async def async_langflow(
|
|||
prompt: str,
|
||||
extra_headers: dict = None,
|
||||
previous_response_id: str = None,
|
||||
tweaks: dict = None,
|
||||
):
|
||||
response_text, response_id, response_obj = await async_response(
|
||||
langflow_client,
|
||||
|
|
@ -279,7 +270,6 @@ async def async_langflow(
|
|||
flow_id,
|
||||
extra_headers=extra_headers,
|
||||
previous_response_id=previous_response_id,
|
||||
tweaks=tweaks,
|
||||
log_prefix="langflow",
|
||||
)
|
||||
return response_text, response_id
|
||||
|
|
@ -292,7 +282,6 @@ async def async_langflow_stream(
|
|||
prompt: str,
|
||||
extra_headers: dict = None,
|
||||
previous_response_id: str = None,
|
||||
tweaks: dict = None,
|
||||
):
|
||||
logger.debug("Starting langflow stream", prompt=prompt)
|
||||
try:
|
||||
|
|
@ -302,8 +291,7 @@ async def async_langflow_stream(
|
|||
flow_id,
|
||||
extra_headers=extra_headers,
|
||||
previous_response_id=previous_response_id,
|
||||
tweaks=tweaks,
|
||||
log_prefix="langflow",
|
||||
log_prefix="langflow",
|
||||
):
|
||||
logger.debug(
|
||||
"Yielding chunk from langflow stream",
|
||||
|
|
@ -463,7 +451,6 @@ async def async_langflow_chat(
|
|||
user_id: str,
|
||||
extra_headers: dict = None,
|
||||
previous_response_id: str = None,
|
||||
tweaks: dict = None,
|
||||
store_conversation: bool = True,
|
||||
):
|
||||
logger.debug(
|
||||
|
|
@ -497,7 +484,6 @@ async def async_langflow_chat(
|
|||
flow_id,
|
||||
extra_headers=extra_headers,
|
||||
previous_response_id=previous_response_id,
|
||||
tweaks=tweaks,
|
||||
log_prefix="langflow",
|
||||
)
|
||||
logger.debug(
|
||||
|
|
@ -576,7 +562,6 @@ async def async_langflow_chat_stream(
|
|||
user_id: str,
|
||||
extra_headers: dict = None,
|
||||
previous_response_id: str = None,
|
||||
tweaks: dict = None,
|
||||
):
|
||||
logger.debug(
|
||||
"async_langflow_chat_stream called",
|
||||
|
|
@ -603,7 +588,6 @@ async def async_langflow_chat_stream(
|
|||
flow_id,
|
||||
extra_headers=extra_headers,
|
||||
previous_response_id=previous_response_id,
|
||||
tweaks=tweaks,
|
||||
log_prefix="langflow",
|
||||
):
|
||||
# Extract text content to build full response for history
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ from config.settings import (
|
|||
LANGFLOW_CHAT_FLOW_ID,
|
||||
LANGFLOW_INGEST_FLOW_ID,
|
||||
LANGFLOW_PUBLIC_URL,
|
||||
DOCLING_COMPONENT_ID,
|
||||
clients,
|
||||
get_openrag_config,
|
||||
config_manager,
|
||||
|
|
@ -46,22 +47,7 @@ def get_docling_preset_configs():
|
|||
}
|
||||
|
||||
|
||||
def get_docling_tweaks(docling_preset: str = None) -> dict:
|
||||
"""Get Langflow tweaks for docling component based on preset"""
|
||||
if not docling_preset:
|
||||
# Get current preset from config
|
||||
openrag_config = get_openrag_config()
|
||||
docling_preset = openrag_config.knowledge.doclingPresets
|
||||
|
||||
preset_configs = get_docling_preset_configs()
|
||||
|
||||
if docling_preset not in preset_configs:
|
||||
docling_preset = "standard" # fallback
|
||||
|
||||
preset_config = preset_configs[docling_preset]
|
||||
docling_serve_opts = json.dumps(preset_config)
|
||||
|
||||
return {"DoclingRemote-ayRdw": {"docling_serve_opts": docling_serve_opts}}
|
||||
|
||||
|
||||
async def get_settings(request, session_manager):
|
||||
|
|
@ -234,6 +220,15 @@ async def update_settings(request, session_manager):
|
|||
current_config.knowledge.doclingPresets = body["doclingPresets"]
|
||||
config_updated = True
|
||||
|
||||
# Also update the flow with the new docling preset
|
||||
try:
|
||||
await _update_flow_docling_preset(body["doclingPresets"], preset_configs[body["doclingPresets"]])
|
||||
logger.info(f"Successfully updated docling preset in flow to '{body['doclingPresets']}'")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update docling preset in flow: {str(e)}")
|
||||
# Don't fail the entire settings update if flow update fails
|
||||
# The config will still be saved
|
||||
|
||||
if "chunk_size" in body:
|
||||
if not isinstance(body["chunk_size"], int) or body["chunk_size"] <= 0:
|
||||
return JSONResponse(
|
||||
|
|
@ -527,3 +522,93 @@ async def onboarding(request, flows_service):
|
|||
{"error": f"Failed to update onboarding settings: {str(e)}"},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
|
||||
async def _update_flow_docling_preset(preset: str, preset_config: dict):
|
||||
"""Helper function to update docling preset in the ingest flow"""
|
||||
if not LANGFLOW_INGEST_FLOW_ID:
|
||||
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
||||
|
||||
# Get the current flow data from Langflow
|
||||
response = await clients.langflow_request(
|
||||
"GET", f"/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}"
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
raise Exception(f"Failed to get ingest flow: HTTP {response.status_code} - {response.text}")
|
||||
|
||||
flow_data = response.json()
|
||||
|
||||
# Find the target node in the flow using environment variable
|
||||
nodes = flow_data.get("data", {}).get("nodes", [])
|
||||
target_node = None
|
||||
target_node_index = None
|
||||
|
||||
for i, node in enumerate(nodes):
|
||||
if node.get("id") == DOCLING_COMPONENT_ID:
|
||||
target_node = node
|
||||
target_node_index = i
|
||||
break
|
||||
|
||||
if target_node is None:
|
||||
raise Exception(f"Docling component '{DOCLING_COMPONENT_ID}' not found in ingest flow")
|
||||
|
||||
# Update the docling_serve_opts value directly in the existing node
|
||||
if (target_node.get("data", {}).get("node", {}).get("template", {}).get("docling_serve_opts")):
|
||||
flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"]["docling_serve_opts"]["value"] = preset_config
|
||||
else:
|
||||
raise Exception(f"docling_serve_opts field not found in node '{DOCLING_COMPONENT_ID}'")
|
||||
|
||||
# Update the flow via PATCH request
|
||||
patch_response = await clients.langflow_request(
|
||||
"PATCH", f"/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}", json=flow_data
|
||||
)
|
||||
|
||||
if patch_response.status_code != 200:
|
||||
raise Exception(f"Failed to update ingest flow: HTTP {patch_response.status_code} - {patch_response.text}")
|
||||
|
||||
|
||||
async def update_docling_preset(request, session_manager):
|
||||
"""Update docling preset in the ingest flow"""
|
||||
try:
|
||||
# Parse request body
|
||||
body = await request.json()
|
||||
|
||||
# Validate preset parameter
|
||||
if "preset" not in body:
|
||||
return JSONResponse(
|
||||
{"error": "preset parameter is required"},
|
||||
status_code=400
|
||||
)
|
||||
|
||||
preset = body["preset"]
|
||||
preset_configs = get_docling_preset_configs()
|
||||
|
||||
if preset not in preset_configs:
|
||||
valid_presets = list(preset_configs.keys())
|
||||
return JSONResponse(
|
||||
{"error": f"Invalid preset '{preset}'. Valid presets: {', '.join(valid_presets)}"},
|
||||
status_code=400
|
||||
)
|
||||
|
||||
# Get the preset configuration
|
||||
preset_config = preset_configs[preset]
|
||||
|
||||
# Use the helper function to update the flow
|
||||
await _update_flow_docling_preset(preset, preset_config)
|
||||
|
||||
logger.info(f"Successfully updated docling preset to '{preset}' in ingest flow")
|
||||
|
||||
return JSONResponse({
|
||||
"message": f"Successfully updated docling preset to '{preset}'",
|
||||
"preset": preset,
|
||||
"preset_config": preset_config
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to update docling preset", error=str(e))
|
||||
return JSONResponse(
|
||||
{"error": f"Failed to update docling preset: {str(e)}"},
|
||||
status_code=500
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -544,6 +544,9 @@ OLLAMA_LLM_TEXT_COMPONENT_ID = os.getenv(
|
|||
"OLLAMA_LLM_TEXT_COMPONENT_ID", "OllamaModel-XDGqZ"
|
||||
)
|
||||
|
||||
# Docling component ID for ingest flow
|
||||
DOCLING_COMPONENT_ID = os.getenv("DOCLING_COMPONENT_ID", "DoclingRemote-78KoX")
|
||||
|
||||
# Global clients instance
|
||||
clients = AppClients()
|
||||
|
||||
|
|
|
|||
13
src/main.py
13
src/main.py
|
|
@ -971,12 +971,23 @@ async def create_app():
|
|||
"/onboarding",
|
||||
require_auth(services["session_manager"])(
|
||||
partial(
|
||||
settings.onboarding,
|
||||
settings.onboarding,
|
||||
flows_service=services["flows_service"]
|
||||
)
|
||||
),
|
||||
methods=["POST"],
|
||||
),
|
||||
# Docling preset update endpoint
|
||||
Route(
|
||||
"/settings/docling-preset",
|
||||
require_auth(services["session_manager"])(
|
||||
partial(
|
||||
settings.update_docling_preset,
|
||||
session_manager=services["session_manager"]
|
||||
)
|
||||
),
|
||||
methods=["PATCH"],
|
||||
),
|
||||
Route(
|
||||
"/nudges",
|
||||
require_auth(services["session_manager"])(
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ import json
|
|||
from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID
|
||||
from agent import async_chat, async_langflow, async_chat_stream
|
||||
from auth_context import set_auth_context
|
||||
from api.settings import get_docling_tweaks
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
|
@ -127,8 +126,6 @@ class ChatService:
|
|||
"Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
|
||||
)
|
||||
|
||||
# Get docling tweaks based on current configuration
|
||||
docling_tweaks = get_docling_tweaks()
|
||||
|
||||
if stream:
|
||||
from agent import async_langflow_chat_stream
|
||||
|
|
@ -140,7 +137,6 @@ class ChatService:
|
|||
user_id,
|
||||
extra_headers=extra_headers,
|
||||
previous_response_id=previous_response_id,
|
||||
tweaks=docling_tweaks,
|
||||
)
|
||||
else:
|
||||
from agent import async_langflow_chat
|
||||
|
|
@ -152,7 +148,6 @@ class ChatService:
|
|||
user_id,
|
||||
extra_headers=extra_headers,
|
||||
previous_response_id=previous_response_id,
|
||||
tweaks=docling_tweaks,
|
||||
)
|
||||
response_data = {"response": response_text}
|
||||
if response_id:
|
||||
|
|
@ -202,8 +197,6 @@ class ChatService:
|
|||
|
||||
from agent import async_langflow_chat
|
||||
|
||||
# Get docling tweaks (might not be used by nudges flow, but keeping consistent)
|
||||
docling_tweaks = get_docling_tweaks()
|
||||
|
||||
response_text, response_id = await async_langflow_chat(
|
||||
langflow_client,
|
||||
|
|
@ -211,7 +204,6 @@ class ChatService:
|
|||
prompt,
|
||||
user_id,
|
||||
extra_headers=extra_headers,
|
||||
tweaks=docling_tweaks,
|
||||
store_conversation=False,
|
||||
)
|
||||
response_data = {"response": response_text}
|
||||
|
|
@ -242,8 +234,6 @@ class ChatService:
|
|||
raise ValueError(
|
||||
"Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
|
||||
)
|
||||
# Get docling tweaks based on current configuration
|
||||
docling_tweaks = get_docling_tweaks()
|
||||
|
||||
response_text, response_id = await async_langflow(
|
||||
langflow_client=langflow_client,
|
||||
|
|
@ -251,7 +241,6 @@ class ChatService:
|
|||
prompt=document_prompt,
|
||||
extra_headers=extra_headers,
|
||||
previous_response_id=previous_response_id,
|
||||
tweaks=docling_tweaks,
|
||||
)
|
||||
else: # chat
|
||||
# Set auth context for chat tools and provide user_id
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue