From 30f63a76b65af3b2011a4f83817e57528a37f3c7 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Fri, 3 Oct 2025 11:02:45 -0400 Subject: [PATCH] add mcp Flows --- flows/openrag_agent.json | 302 ++++++++++++++- flows/openrag_url_mcp.json | 748 ++++++++++++++++++++++++++++++++++++- 2 files changed, 1031 insertions(+), 19 deletions(-) diff --git a/flows/openrag_agent.json b/flows/openrag_agent.json index 73dccee4..9aa875d1 100644 --- a/flows/openrag_agent.json +++ b/flows/openrag_agent.json @@ -142,6 +142,34 @@ "sourceHandle": "{œdataTypeœ:œLanguageModelComponentœ,œidœ:œLanguageModelComponent-0YME7œ,œnameœ:œmodel_outputœ,œoutput_typesœ:[œLanguageModelœ]}", "target": "Agent-crjWf", "targetHandle": "{œfieldNameœ:œagent_llmœ,œidœ:œAgent-crjWfœ,œinputTypesœ:[œLanguageModelœ],œtypeœ:œstrœ}" + }, + { + "animated": false, + "className": "", + "data": { + "sourceHandle": { + "dataType": "MCP", + "id": "MCP-Uk5Vf", + "name": "component_as_tool", + "output_types": [ + "Tool" + ] + }, + "targetHandle": { + "fieldName": "tools", + "id": "Agent-crjWf", + "inputTypes": [ + "Tool" + ], + "type": "other" + } + }, + "id": "xy-edge__MCP-Uk5Vf{œdataTypeœ:œMCPœ,œidœ:œMCP-Uk5Vfœ,œnameœ:œcomponent_as_toolœ,œoutput_typesœ:[œToolœ]}-Agent-crjWf{œfieldNameœ:œtoolsœ,œidœ:œAgent-crjWfœ,œinputTypesœ:[œToolœ],œtypeœ:œotherœ}", + "selected": false, + "source": "MCP-Uk5Vf", + "sourceHandle": "{œdataTypeœ:œMCPœ,œidœ:œMCP-Uk5Vfœ,œnameœ:œcomponent_as_toolœ,œoutput_typesœ:[œToolœ]}", + "target": "Agent-crjWf", + "targetHandle": "{œfieldNameœ:œtoolsœ,œidœ:œAgent-crjWfœ,œinputTypesœ:[œToolœ],œtypeœ:œotherœ}" } ], "nodes": [ @@ -702,7 +730,7 @@ ], "frozen": false, "icon": "OpenSearch", - "last_updated": "2025-10-01T15:31:22.241Z", + "last_updated": "2025-10-03T14:42:51.280Z", "legacy": false, "lf_version": "1.6.0", "metadata": { @@ -1356,7 +1384,7 @@ ], "frozen": false, "icon": "binary", - "last_updated": "2025-10-01T15:31:22.243Z", + "last_updated": "2025-10-03T14:42:51.281Z", "legacy": false, "lf_version": "1.6.0", "metadata": { @@ -1681,7 +1709,7 @@ ], "frozen": false, "icon": "bot", - "last_updated": "2025-10-01T15:31:45.879Z", + "last_updated": "2025-10-03T14:42:51.349Z", "legacy": false, "lf_version": "1.6.0", "metadata": { @@ -2220,7 +2248,7 @@ ], "frozen": false, "icon": "brain-circuit", - "last_updated": "2025-10-01T15:31:22.244Z", + "last_updated": "2025-10-03T14:42:51.281Z", "legacy": false, "lf_version": "1.6.0", "metadata": { @@ -2523,12 +2551,272 @@ }, "selected": false, "type": "genericNode" + }, + { + "data": { + "id": "MCP-Uk5Vf", + "node": { + "base_classes": [ + "DataFrame" + ], + "beta": false, + "category": "MCP", + "conditional_paths": [], + "custom_fields": {}, + "description": "Connect to an MCP server to use its tools.", + "display_name": "MCP Tools", + "documentation": "https://docs.langflow.org/mcp-client", + "edited": false, + "field_order": [ + "mcp_server", + "use_cache", + "tool", + "tool_placeholder" + ], + "frozen": false, + "icon": "Mcp", + "key": "mcp_lf-starter_project", + "last_updated": "2025-10-03T14:42:51.282Z", + "legacy": false, + "mcpServerName": "lf-starter_project", + "metadata": { + "code_hash": "756d1e10d0ca", + "dependencies": { + "dependencies": [ + { + "name": "langchain_core", + "version": "0.3.76" + }, + { + "name": "lfx", + "version": null + }, + { + "name": "langflow", + "version": null + } + ], + "total_dependencies": 3 + }, + "module": "lfx.components.agents.mcp_component.MCPToolsComponent" + }, + "minimized": false, + "output_types": [], + "outputs": [ + { + "allows_loop": false, + "cache": true, + "display_name": "Toolset", + "group_outputs": false, + "hidden": null, + "method": "to_toolkit", + "name": "component_as_tool", + "options": null, + "required_inputs": null, + "selected": "Tool", + "tool_mode": true, + "types": [ + "Tool" + ], + "value": "__UNDEFINED__" + } + ], + "pinned": false, + "template": { + "_type": "Component", + "code": { + "advanced": true, + "dynamic": true, + "fileTypes": [], + "file_path": "", + "info": "", + "list": false, + "load_from_db": false, + "multiline": true, + "name": "code", + "password": false, + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "type": "code", + "value": "from __future__ import annotations\n\nimport asyncio\nimport uuid\nfrom typing import Any\n\nfrom langchain_core.tools import StructuredTool # noqa: TC002\n\nfrom lfx.base.agents.utils import maybe_unflatten_dict, safe_cache_get, safe_cache_set\nfrom lfx.base.mcp.util import MCPSseClient, MCPStdioClient, create_input_schema_from_json_schema, update_tools\nfrom lfx.custom.custom_component.component_with_cache import ComponentWithCache\nfrom lfx.inputs.inputs import InputTypes # noqa: TC001\nfrom lfx.io import BoolInput, DropdownInput, McpInput, MessageTextInput, Output\nfrom lfx.io.schema import flatten_schema, schema_to_langflow_inputs\nfrom lfx.log.logger import logger\nfrom lfx.schema.dataframe import DataFrame\nfrom lfx.schema.message import Message\nfrom lfx.services.deps import get_settings_service, get_storage_service, session_scope\n\n\nclass MCPToolsComponent(ComponentWithCache):\n schema_inputs: list = []\n tools: list[StructuredTool] = []\n _not_load_actions: bool = False\n _tool_cache: dict = {}\n _last_selected_server: str | None = None # Cache for the last selected server\n\n def __init__(self, **data) -> None:\n super().__init__(**data)\n # Initialize cache keys to avoid CacheMiss when accessing them\n self._ensure_cache_structure()\n\n # Initialize clients with access to the component cache\n self.stdio_client: MCPStdioClient = MCPStdioClient(component_cache=self._shared_component_cache)\n self.sse_client: MCPSseClient = MCPSseClient(component_cache=self._shared_component_cache)\n\n def _ensure_cache_structure(self):\n \"\"\"Ensure the cache has the required structure.\"\"\"\n # Check if servers key exists and is not CacheMiss\n servers_value = safe_cache_get(self._shared_component_cache, \"servers\")\n if servers_value is None:\n safe_cache_set(self._shared_component_cache, \"servers\", {})\n\n # Check if last_selected_server key exists and is not CacheMiss\n last_server_value = safe_cache_get(self._shared_component_cache, \"last_selected_server\")\n if last_server_value is None:\n safe_cache_set(self._shared_component_cache, \"last_selected_server\", \"\")\n\n default_keys: list[str] = [\n \"code\",\n \"_type\",\n \"tool_mode\",\n \"tool_placeholder\",\n \"mcp_server\",\n \"tool\",\n \"use_cache\",\n ]\n\n display_name = \"MCP Tools\"\n description = \"Connect to an MCP server to use its tools.\"\n documentation: str = \"https://docs.langflow.org/mcp-client\"\n icon = \"Mcp\"\n name = \"MCPTools\"\n\n inputs = [\n McpInput(\n name=\"mcp_server\",\n display_name=\"MCP Server\",\n info=\"Select the MCP Server that will be used by this component\",\n real_time_refresh=True,\n ),\n BoolInput(\n name=\"use_cache\",\n display_name=\"Use Cached Server\",\n info=(\n \"Enable caching of MCP Server and tools to improve performance. \"\n \"Disable to always fetch fresh tools and server updates.\"\n ),\n value=False,\n advanced=True,\n ),\n DropdownInput(\n name=\"tool\",\n display_name=\"Tool\",\n options=[],\n value=\"\",\n info=\"Select the tool to execute\",\n show=False,\n required=True,\n real_time_refresh=True,\n ),\n MessageTextInput(\n name=\"tool_placeholder\",\n display_name=\"Tool Placeholder\",\n info=\"Placeholder for the tool\",\n value=\"\",\n show=False,\n tool_mode=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Response\", name=\"response\", method=\"build_output\"),\n ]\n\n async def _validate_schema_inputs(self, tool_obj) -> list[InputTypes]:\n \"\"\"Validate and process schema inputs for a tool.\"\"\"\n try:\n if not tool_obj or not hasattr(tool_obj, \"args_schema\"):\n msg = \"Invalid tool object or missing input schema\"\n raise ValueError(msg)\n\n flat_schema = flatten_schema(tool_obj.args_schema.schema())\n input_schema = create_input_schema_from_json_schema(flat_schema)\n if not input_schema:\n msg = f\"Empty input schema for tool '{tool_obj.name}'\"\n raise ValueError(msg)\n\n schema_inputs = schema_to_langflow_inputs(input_schema)\n if not schema_inputs:\n msg = f\"No input parameters defined for tool '{tool_obj.name}'\"\n await logger.awarning(msg)\n return []\n\n except Exception as e:\n msg = f\"Error validating schema inputs: {e!s}\"\n await logger.aexception(msg)\n raise ValueError(msg) from e\n else:\n return schema_inputs\n\n async def update_tool_list(self, mcp_server_value=None):\n # Accepts mcp_server_value as dict {name, config} or uses self.mcp_server\n mcp_server = mcp_server_value if mcp_server_value is not None else getattr(self, \"mcp_server\", None)\n server_name = None\n server_config_from_value = None\n if isinstance(mcp_server, dict):\n server_name = mcp_server.get(\"name\")\n server_config_from_value = mcp_server.get(\"config\")\n else:\n server_name = mcp_server\n if not server_name:\n self.tools = []\n return [], {\"name\": server_name, \"config\": server_config_from_value}\n\n # Check if caching is enabled, default to False\n use_cache = getattr(self, \"use_cache\", False)\n\n # Use shared cache if available and caching is enabled\n cached = None\n if use_cache:\n servers_cache = safe_cache_get(self._shared_component_cache, \"servers\", {})\n cached = servers_cache.get(server_name) if isinstance(servers_cache, dict) else None\n\n if cached is not None:\n try:\n self.tools = cached[\"tools\"]\n self.tool_names = cached[\"tool_names\"]\n self._tool_cache = cached[\"tool_cache\"]\n server_config_from_value = cached[\"config\"]\n except (TypeError, KeyError, AttributeError) as e:\n # Handle corrupted cache data by clearing it and continuing to fetch fresh tools\n msg = f\"Unable to use cached data for MCP Server{server_name}: {e}\"\n await logger.awarning(msg)\n # Clear the corrupted cache entry\n current_servers_cache = safe_cache_get(self._shared_component_cache, \"servers\", {})\n if isinstance(current_servers_cache, dict) and server_name in current_servers_cache:\n current_servers_cache.pop(server_name)\n safe_cache_set(self._shared_component_cache, \"servers\", current_servers_cache)\n else:\n return self.tools, {\"name\": server_name, \"config\": server_config_from_value}\n\n try:\n try:\n from langflow.api.v2.mcp import get_server\n from langflow.services.database.models.user.crud import get_user_by_id\n except ImportError as e:\n msg = (\n \"Langflow MCP server functionality is not available. \"\n \"This feature requires the full Langflow installation.\"\n )\n raise ImportError(msg) from e\n async with session_scope() as db:\n if not self.user_id:\n msg = \"User ID is required for fetching MCP tools.\"\n raise ValueError(msg)\n current_user = await get_user_by_id(db, self.user_id)\n\n # Try to get server config from DB/API\n server_config = await get_server(\n server_name,\n current_user,\n db,\n storage_service=get_storage_service(),\n settings_service=get_settings_service(),\n )\n\n # If get_server returns empty but we have a config, use it\n if not server_config and server_config_from_value:\n server_config = server_config_from_value\n\n if not server_config:\n self.tools = []\n return [], {\"name\": server_name, \"config\": server_config}\n\n _, tool_list, tool_cache = await update_tools(\n server_name=server_name,\n server_config=server_config,\n mcp_stdio_client=self.stdio_client,\n mcp_sse_client=self.sse_client,\n )\n\n self.tool_names = [tool.name for tool in tool_list if hasattr(tool, \"name\")]\n self._tool_cache = tool_cache\n self.tools = tool_list\n\n # Cache the result only if caching is enabled\n if use_cache:\n cache_data = {\n \"tools\": tool_list,\n \"tool_names\": self.tool_names,\n \"tool_cache\": tool_cache,\n \"config\": server_config,\n }\n\n # Safely update the servers cache\n current_servers_cache = safe_cache_get(self._shared_component_cache, \"servers\", {})\n if isinstance(current_servers_cache, dict):\n current_servers_cache[server_name] = cache_data\n safe_cache_set(self._shared_component_cache, \"servers\", current_servers_cache)\n\n except (TimeoutError, asyncio.TimeoutError) as e:\n msg = f\"Timeout updating tool list: {e!s}\"\n await logger.aexception(msg)\n raise TimeoutError(msg) from e\n except Exception as e:\n msg = f\"Error updating tool list: {e!s}\"\n await logger.aexception(msg)\n raise ValueError(msg) from e\n else:\n return tool_list, {\"name\": server_name, \"config\": server_config}\n\n async def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None) -> dict:\n \"\"\"Toggle the visibility of connection-specific fields based on the selected mode.\"\"\"\n try:\n if field_name == \"tool\":\n try:\n if len(self.tools) == 0:\n try:\n self.tools, build_config[\"mcp_server\"][\"value\"] = await self.update_tool_list()\n build_config[\"tool\"][\"options\"] = [tool.name for tool in self.tools]\n build_config[\"tool\"][\"placeholder\"] = \"Select a tool\"\n except (TimeoutError, asyncio.TimeoutError) as e:\n msg = f\"Timeout updating tool list: {e!s}\"\n await logger.aexception(msg)\n if not build_config[\"tools_metadata\"][\"show\"]:\n build_config[\"tool\"][\"show\"] = True\n build_config[\"tool\"][\"options\"] = []\n build_config[\"tool\"][\"value\"] = \"\"\n build_config[\"tool\"][\"placeholder\"] = \"Timeout on MCP server\"\n else:\n build_config[\"tool\"][\"show\"] = False\n except ValueError:\n if not build_config[\"tools_metadata\"][\"show\"]:\n build_config[\"tool\"][\"show\"] = True\n build_config[\"tool\"][\"options\"] = []\n build_config[\"tool\"][\"value\"] = \"\"\n build_config[\"tool\"][\"placeholder\"] = \"Error on MCP Server\"\n else:\n build_config[\"tool\"][\"show\"] = False\n\n if field_value == \"\":\n return build_config\n tool_obj = None\n for tool in self.tools:\n if tool.name == field_value:\n tool_obj = tool\n break\n if tool_obj is None:\n msg = f\"Tool {field_value} not found in available tools: {self.tools}\"\n await logger.awarning(msg)\n return build_config\n await self._update_tool_config(build_config, field_value)\n except Exception as e:\n build_config[\"tool\"][\"options\"] = []\n msg = f\"Failed to update tools: {e!s}\"\n raise ValueError(msg) from e\n else:\n return build_config\n elif field_name == \"mcp_server\":\n if not field_value:\n build_config[\"tool\"][\"show\"] = False\n build_config[\"tool\"][\"options\"] = []\n build_config[\"tool\"][\"value\"] = \"\"\n build_config[\"tool\"][\"placeholder\"] = \"\"\n build_config[\"tool_placeholder\"][\"tool_mode\"] = False\n self.remove_non_default_keys(build_config)\n return build_config\n\n build_config[\"tool_placeholder\"][\"tool_mode\"] = True\n\n current_server_name = field_value.get(\"name\") if isinstance(field_value, dict) else field_value\n _last_selected_server = safe_cache_get(self._shared_component_cache, \"last_selected_server\", \"\")\n\n # To avoid unnecessary updates, only proceed if the server has actually changed\n if (_last_selected_server in (current_server_name, \"\")) and build_config[\"tool\"][\"show\"]:\n if current_server_name:\n servers_cache = safe_cache_get(self._shared_component_cache, \"servers\", {})\n if isinstance(servers_cache, dict):\n cached = servers_cache.get(current_server_name)\n if cached is not None and cached.get(\"tool_names\"):\n cached_tools = cached[\"tool_names\"]\n current_tools = build_config[\"tool\"][\"options\"]\n if current_tools == cached_tools:\n return build_config\n else:\n return build_config\n\n # Determine if \"Tool Mode\" is active by checking if the tool dropdown is hidden.\n is_in_tool_mode = build_config[\"tools_metadata\"][\"show\"]\n safe_cache_set(self._shared_component_cache, \"last_selected_server\", current_server_name)\n\n # Check if tools are already cached for this server before clearing\n cached_tools = None\n if current_server_name:\n use_cache = getattr(self, \"use_cache\", True)\n if use_cache:\n servers_cache = safe_cache_get(self._shared_component_cache, \"servers\", {})\n if isinstance(servers_cache, dict):\n cached = servers_cache.get(current_server_name)\n if cached is not None:\n try:\n cached_tools = cached[\"tools\"]\n self.tools = cached_tools\n self.tool_names = cached[\"tool_names\"]\n self._tool_cache = cached[\"tool_cache\"]\n except (TypeError, KeyError, AttributeError) as e:\n # Handle corrupted cache data by ignoring it\n msg = f\"Unable to use cached data for MCP Server,{current_server_name}: {e}\"\n await logger.awarning(msg)\n cached_tools = None\n\n # Only clear tools if we don't have cached tools for the current server\n if not cached_tools:\n self.tools = [] # Clear previous tools only if no cache\n\n self.remove_non_default_keys(build_config) # Clear previous tool inputs\n\n # Only show the tool dropdown if not in tool_mode\n if not is_in_tool_mode:\n build_config[\"tool\"][\"show\"] = True\n if cached_tools:\n # Use cached tools to populate options immediately\n build_config[\"tool\"][\"options\"] = [tool.name for tool in cached_tools]\n build_config[\"tool\"][\"placeholder\"] = \"Select a tool\"\n else:\n # Show loading state only when we need to fetch tools\n build_config[\"tool\"][\"placeholder\"] = \"Loading tools...\"\n build_config[\"tool\"][\"options\"] = []\n build_config[\"tool\"][\"value\"] = uuid.uuid4()\n else:\n # Keep the tool dropdown hidden if in tool_mode\n self._not_load_actions = True\n build_config[\"tool\"][\"show\"] = False\n\n elif field_name == \"tool_mode\":\n build_config[\"tool\"][\"placeholder\"] = \"\"\n build_config[\"tool\"][\"show\"] = not bool(field_value) and bool(build_config[\"mcp_server\"])\n self.remove_non_default_keys(build_config)\n self.tool = build_config[\"tool\"][\"value\"]\n if field_value:\n self._not_load_actions = True\n else:\n build_config[\"tool\"][\"value\"] = uuid.uuid4()\n build_config[\"tool\"][\"options\"] = []\n build_config[\"tool\"][\"show\"] = True\n build_config[\"tool\"][\"placeholder\"] = \"Loading tools...\"\n elif field_name == \"tools_metadata\":\n self._not_load_actions = False\n\n except Exception as e:\n msg = f\"Error in update_build_config: {e!s}\"\n await logger.aexception(msg)\n raise ValueError(msg) from e\n else:\n return build_config\n\n def get_inputs_for_all_tools(self, tools: list) -> dict:\n \"\"\"Get input schemas for all tools.\"\"\"\n inputs = {}\n for tool in tools:\n if not tool or not hasattr(tool, \"name\"):\n continue\n try:\n flat_schema = flatten_schema(tool.args_schema.schema())\n input_schema = create_input_schema_from_json_schema(flat_schema)\n langflow_inputs = schema_to_langflow_inputs(input_schema)\n inputs[tool.name] = langflow_inputs\n except (AttributeError, ValueError, TypeError, KeyError) as e:\n msg = f\"Error getting inputs for tool {getattr(tool, 'name', 'unknown')}: {e!s}\"\n logger.exception(msg)\n continue\n return inputs\n\n def remove_input_schema_from_build_config(\n self, build_config: dict, tool_name: str, input_schema: dict[list[InputTypes], Any]\n ):\n \"\"\"Remove the input schema for the tool from the build config.\"\"\"\n # Keep only schemas that don't belong to the current tool\n input_schema = {k: v for k, v in input_schema.items() if k != tool_name}\n # Remove all inputs from other tools\n for value in input_schema.values():\n for _input in value:\n if _input.name in build_config:\n build_config.pop(_input.name)\n\n def remove_non_default_keys(self, build_config: dict) -> None:\n \"\"\"Remove non-default keys from the build config.\"\"\"\n for key in list(build_config.keys()):\n if key not in self.default_keys:\n build_config.pop(key)\n\n async def _update_tool_config(self, build_config: dict, tool_name: str) -> None:\n \"\"\"Update tool configuration with proper error handling.\"\"\"\n if not self.tools:\n self.tools, build_config[\"mcp_server\"][\"value\"] = await self.update_tool_list()\n\n if not tool_name:\n return\n\n tool_obj = next((tool for tool in self.tools if tool.name == tool_name), None)\n if not tool_obj:\n msg = f\"Tool {tool_name} not found in available tools: {self.tools}\"\n self.remove_non_default_keys(build_config)\n build_config[\"tool\"][\"value\"] = \"\"\n await logger.awarning(msg)\n return\n\n try:\n # Store current values before removing inputs\n current_values = {}\n for key, value in build_config.items():\n if key not in self.default_keys and isinstance(value, dict) and \"value\" in value:\n current_values[key] = value[\"value\"]\n\n # Get all tool inputs and remove old ones\n input_schema_for_all_tools = self.get_inputs_for_all_tools(self.tools)\n self.remove_input_schema_from_build_config(build_config, tool_name, input_schema_for_all_tools)\n\n # Get and validate new inputs\n self.schema_inputs = await self._validate_schema_inputs(tool_obj)\n if not self.schema_inputs:\n msg = f\"No input parameters to configure for tool '{tool_name}'\"\n await logger.ainfo(msg)\n return\n\n # Add new inputs to build config\n for schema_input in self.schema_inputs:\n if not schema_input or not hasattr(schema_input, \"name\"):\n msg = \"Invalid schema input detected, skipping\"\n await logger.awarning(msg)\n continue\n\n try:\n name = schema_input.name\n input_dict = schema_input.to_dict()\n input_dict.setdefault(\"value\", None)\n input_dict.setdefault(\"required\", True)\n\n build_config[name] = input_dict\n\n # Preserve existing value if the parameter name exists in current_values\n if name in current_values:\n build_config[name][\"value\"] = current_values[name]\n\n except (AttributeError, KeyError, TypeError) as e:\n msg = f\"Error processing schema input {schema_input}: {e!s}\"\n await logger.aexception(msg)\n continue\n except ValueError as e:\n msg = f\"Schema validation error for tool {tool_name}: {e!s}\"\n await logger.aexception(msg)\n self.schema_inputs = []\n return\n except (AttributeError, KeyError, TypeError) as e:\n msg = f\"Error updating tool config: {e!s}\"\n await logger.aexception(msg)\n raise ValueError(msg) from e\n\n async def build_output(self) -> DataFrame:\n \"\"\"Build output with improved error handling and validation.\"\"\"\n try:\n self.tools, _ = await self.update_tool_list()\n if self.tool != \"\":\n # Set session context for persistent MCP sessions using Langflow session ID\n session_context = self._get_session_context()\n if session_context:\n self.stdio_client.set_session_context(session_context)\n self.sse_client.set_session_context(session_context)\n\n exec_tool = self._tool_cache[self.tool]\n tool_args = self.get_inputs_for_all_tools(self.tools)[self.tool]\n kwargs = {}\n for arg in tool_args:\n value = getattr(self, arg.name, None)\n if value is not None:\n if isinstance(value, Message):\n kwargs[arg.name] = value.text\n else:\n kwargs[arg.name] = value\n\n unflattened_kwargs = maybe_unflatten_dict(kwargs)\n\n output = await exec_tool.coroutine(**unflattened_kwargs)\n\n tool_content = []\n for item in output.content:\n item_dict = item.model_dump()\n tool_content.append(item_dict)\n return DataFrame(data=tool_content)\n return DataFrame(data=[{\"error\": \"You must select a tool\"}])\n except Exception as e:\n msg = f\"Error in build_output: {e!s}\"\n await logger.aexception(msg)\n raise ValueError(msg) from e\n\n def _get_session_context(self) -> str | None:\n \"\"\"Get the Langflow session ID for MCP session caching.\"\"\"\n # Try to get session ID from the component's execution context\n if hasattr(self, \"graph\") and hasattr(self.graph, \"session_id\"):\n session_id = self.graph.session_id\n # Include server name to ensure different servers get different sessions\n server_name = \"\"\n mcp_server = getattr(self, \"mcp_server\", None)\n if isinstance(mcp_server, dict):\n server_name = mcp_server.get(\"name\", \"\")\n elif mcp_server:\n server_name = str(mcp_server)\n return f\"{session_id}_{server_name}\" if session_id else None\n return None\n\n async def _get_tools(self):\n \"\"\"Get cached tools or update if necessary.\"\"\"\n mcp_server = getattr(self, \"mcp_server\", None)\n if not self._not_load_actions:\n tools, _ = await self.update_tool_list(mcp_server)\n return tools\n return []\n" + }, + "mcp_server": { + "_input_type": "McpInput", + "advanced": false, + "display_name": "MCP Server", + "dynamic": false, + "info": "Select the MCP Server that will be used by this component", + "name": "mcp_server", + "placeholder": "", + "real_time_refresh": true, + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "mcp", + "value": { + "config": { + "args": [ + "mcp-proxy", + "--headers", + "x-api-key", + "sk-yG6gCxpN87BUG3PYWlQWrePJxVNo__UHipbFtyxweCU", + "http://localhost:7860/api/v1/mcp/project/93eb5c48-0b07-4b83-93a5-1d5ba4a521e6/sse", + "--headers", + "X-Langflow-Global-Var-JWT", + "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwOi8vb3BlbnJhZy1iYWNrZW5kOjgwMDAiLCJzdWIiOiIxMDc1NDkxMDcwMzY3OTcwMTcwODIiLCJhdWQiOlsib3BlbnNlYXJjaCIsIm9wZW5yYWciXSwiZXhwIjoxNzYwMTA2NTg3LCJpYXQiOjE3NTk1MDE3ODcsImF1dGhfdGltZSI6MTc1OTUwMTc4NywidXNlcl9pZCI6IjEwNzU0OTEwNzAzNjc5NzAxNzA4MiIsImVtYWlsIjoibGFuZ2Zsb3dkZW1vQGdtYWlsLmNvbSIsIm5hbWUiOiJEZW1vIEVtYWlsIiwicHJlZmVycmVkX3VzZXJuYW1lIjoibGFuZ2Zsb3dkZW1vQGdtYWlsLmNvbSIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJyb2xlcyI6WyJvcGVucmFnX3VzZXIiXX0.KXIVlw1MImvZdM5M5GDVFCbizdBTeMScRgPz2zGO6tZaO48-EIkRA-j2557XdPNgVTmHD4ARl8N66aok-CY1rysSoeDJZ0zK2kAXWORtt1Jrs_pdJOwfG9HwHHkaAbSFxmhvAUFr4akE8Ye83tRQQnBxw3f03Js_E37rYeKiMiNTuOSEHQQXkI-4rPDbnz1srDz6xW_JPv-PBD5y5004uJbqUdfSzYJQ9D7Tprq2DvzO0YIFHGgzJ7S6inZ4ky3yyBQ0nLgQt9ISjlNtLEkn2GvAjXDbVNZzwGFmnu8OgqFM9RL9TNtAdfUFDaip0nW-7Xc2KvVNZWzxG7asuf77BQ", + "--headers", + "X-Langflow-Global-Var-OWNER", + "107549107036797017082", + "--headers", + "X-Langflow-Global-Var-OWNER_NAME", + "Demo Email", + "--headers", + "X-Langflow-Global-Var-OWNER_EMAIL", + "langflowdemo@gmail.com" + ], + "command": "uvx" + }, + "name": "lf-starter_project" + } + }, + "tool": { + "_input_type": "DropdownInput", + "advanced": false, + "combobox": false, + "dialog_inputs": {}, + "display_name": "Tool", + "dynamic": false, + "external_options": {}, + "info": "Select the tool to execute", + "name": "tool", + "options": [ + "opensearch_url_ingestion_flow" + ], + "options_metadata": [], + "placeholder": "", + "real_time_refresh": true, + "required": true, + "show": false, + "title_case": false, + "toggle": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "tool_placeholder": { + "_input_type": "MessageTextInput", + "advanced": false, + "display_name": "Tool Placeholder", + "dynamic": false, + "info": "Placeholder for the tool", + "input_types": [ + "Message" + ], + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "tool_placeholder", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": true, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "tools_metadata": { + "_input_type": "ToolsInput", + "advanced": false, + "display_name": "Actions", + "dynamic": false, + "info": "Modify tool names and descriptions to help agents understand when to use each tool.", + "is_list": true, + "list_add_label": "Add More", + "name": "tools_metadata", + "placeholder": "", + "real_time_refresh": true, + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "tools", + "value": [ + { + "args": { + "input_value": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Message to be passed as input.", + "title": "Input Value" + } + }, + "description": "This flow is to ingest the URL to open search.", + "display_description": "This flow is to ingest the URL to open search.", + "display_name": "opensearch_url_ingestion_flow", + "name": "opensearch_url_ingestion_flow", + "readonly": false, + "status": true, + "tags": [ + "opensearch_url_ingestion_flow" + ] + } + ] + }, + "use_cache": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Use Cached Server", + "dynamic": false, + "info": "Enable caching of MCP Server and tools to improve performance. Disable to always fetch fresh tools and server updates.", + "list": false, + "list_add_label": "Add More", + "name": "use_cache", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "bool", + "value": false + } + }, + "tool_mode": true + }, + "showNode": true, + "type": "MCP" + }, + "id": "MCP-Uk5Vf", + "measured": { + "height": 284, + "width": 320 + }, + "position": { + "x": 554.5386324758331, + "y": 845.4231682975634 + }, + "selected": false, + "type": "genericNode" } ], "viewport": { - "x": -325.1448543831343, - "y": 117.81831162150468, - "zoom": 0.6031333914833354 + "x": -184.98015964664273, + "y": 154.6885920024542, + "zoom": 0.602433700773958 } }, "description": "OpenRAG Open Search Agent", diff --git a/flows/openrag_url_mcp.json b/flows/openrag_url_mcp.json index 9bf255ff..c8f8cb70 100644 --- a/flows/openrag_url_mcp.json +++ b/flows/openrag_url_mcp.json @@ -199,6 +199,58 @@ "sourceHandle": "{œdataTypeœ:œDataFrameOperationsœ,œidœ:œDataFrameOperations-A98BLœ,œnameœ:œoutputœ,œoutput_typesœ:[œDataFrameœ]}", "target": "SplitText-QIKhg", "targetHandle": "{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-QIKhgœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}" + }, + { + "data": { + "sourceHandle": { + "dataType": "SplitText", + "id": "SplitText-QIKhg", + "name": "dataframe", + "output_types": [ + "DataFrame" + ] + }, + "targetHandle": { + "fieldName": "df", + "id": "DataFrameOperations-RhKoe", + "inputTypes": [ + "DataFrame" + ], + "type": "other" + } + }, + "id": "xy-edge__SplitText-QIKhg{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-QIKhgœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-DataFrameOperations-RhKoe{œfieldNameœ:œdfœ,œidœ:œDataFrameOperations-RhKoeœ,œinputTypesœ:[œDataFrameœ],œtypeœ:œotherœ}", + "source": "SplitText-QIKhg", + "sourceHandle": "{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-QIKhgœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}", + "target": "DataFrameOperations-RhKoe", + "targetHandle": "{œfieldNameœ:œdfœ,œidœ:œDataFrameOperations-RhKoeœ,œinputTypesœ:[œDataFrameœ],œtypeœ:œotherœ}" + }, + { + "data": { + "sourceHandle": { + "dataType": "DataFrameOperations", + "id": "DataFrameOperations-RhKoe", + "name": "output", + "output_types": [ + "DataFrame" + ] + }, + "targetHandle": { + "fieldName": "input_value", + "id": "ChatOutput-Q1dhr", + "inputTypes": [ + "Data", + "DataFrame", + "Message" + ], + "type": "other" + } + }, + "id": "xy-edge__DataFrameOperations-RhKoe{œdataTypeœ:œDataFrameOperationsœ,œidœ:œDataFrameOperations-RhKoeœ,œnameœ:œoutputœ,œoutput_typesœ:[œDataFrameœ]}-ChatOutput-Q1dhr{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-Q1dhrœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", + "source": "DataFrameOperations-RhKoe", + "sourceHandle": "{œdataTypeœ:œDataFrameOperationsœ,œidœ:œDataFrameOperations-RhKoeœ,œnameœ:œoutputœ,œoutput_typesœ:[œDataFrameœ]}", + "target": "ChatOutput-Q1dhr", + "targetHandle": "{œfieldNameœ:œinput_valueœ,œidœ:œChatOutput-Q1dhrœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}" } ], "nodes": [ @@ -487,6 +539,7 @@ "frozen": false, "icon": "OpenAI", "legacy": false, + "lf_version": "1.6.0", "metadata": { "code_hash": "8a658ed6d4c9", "dependencies": { @@ -773,7 +826,7 @@ "dynamic": false, "info": "", "input_types": [], - "load_from_db": false, + "load_from_db": true, "name": "openai_api_key", "password": true, "placeholder": "", @@ -781,7 +834,7 @@ "show": true, "title_case": false, "type": "str", - "value": "" + "value": "OPENAI_API_KEY" }, "openai_api_type": { "_input_type": "MessageTextInput", @@ -1070,6 +1123,7 @@ "frozen": false, "icon": "OpenSearch", "legacy": false, + "lf_version": "1.6.0", "metadata": { "code_hash": "08d808984c3d", "dependencies": { @@ -1100,7 +1154,6 @@ "name": "search_results", "options": null, "required_inputs": null, - "selected": "Data", "tool_mode": true, "types": [ "Data" @@ -1134,7 +1187,6 @@ "name": "vectorstoreconnection", "options": null, "required_inputs": null, - "selected": "VectorStore", "tool_mode": true, "types": [ "VectorStore" @@ -1644,7 +1696,7 @@ }, "tool_mode": false }, - "selected_output": "search_results", + "selected_output": "dataframe", "showNode": true, "type": "OpenSearchVectorStoreComponent" }, @@ -1658,7 +1710,7 @@ "x": 2694.183983837566, "y": 1425.777807367294 }, - "selected": true, + "selected": false, "type": "genericNode" }, { @@ -2330,7 +2382,7 @@ ], "frozen": false, "icon": "table", - "last_updated": "2025-10-02T18:55:27.635Z", + "last_updated": "2025-10-03T14:31:53.355Z", "legacy": false, "lf_version": "1.6.0", "metadata": { @@ -2746,7 +2798,7 @@ ], "frozen": false, "icon": "table", - "last_updated": "2025-10-02T18:55:27.636Z", + "last_updated": "2025-10-03T14:31:53.355Z", "legacy": false, "lf_version": "1.6.0", "metadata": { @@ -3131,12 +3183,684 @@ }, "selected": false, "type": "genericNode" + }, + { + "data": { + "id": "ChatOutput-Q1dhr", + "node": { + "base_classes": [ + "Message" + ], + "beta": false, + "conditional_paths": [], + "custom_fields": {}, + "description": "Display a chat message in the Playground.", + "display_name": "Chat Output", + "documentation": "https://docs.langflow.org/components-io#chat-output", + "edited": false, + "field_order": [ + "input_value", + "should_store_message", + "sender", + "sender_name", + "session_id", + "data_template", + "clean_data" + ], + "frozen": false, + "icon": "MessagesSquare", + "legacy": false, + "metadata": { + "code_hash": "9647f4d2f4b4", + "dependencies": { + "dependencies": [ + { + "name": "orjson", + "version": "3.10.15" + }, + { + "name": "fastapi", + "version": "0.117.1" + }, + { + "name": "lfx", + "version": null + } + ], + "total_dependencies": 3 + }, + "module": "lfx.components.input_output.chat_output.ChatOutput" + }, + "minimized": true, + "output_types": [], + "outputs": [ + { + "allows_loop": false, + "cache": true, + "display_name": "Output Message", + "group_outputs": false, + "method": "message_response", + "name": "message", + "selected": "Message", + "tool_mode": true, + "types": [ + "Message" + ], + "value": "__UNDEFINED__" + } + ], + "pinned": false, + "template": { + "_type": "Component", + "clean_data": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Basic Clean Data", + "dynamic": false, + "info": "Whether to clean data before converting to string.", + "list": false, + "list_add_label": "Add More", + "name": "clean_data", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + }, + "code": { + "advanced": true, + "dynamic": true, + "fileTypes": [], + "file_path": "", + "info": "", + "list": false, + "load_from_db": false, + "multiline": true, + "name": "code", + "password": false, + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "type": "code", + "value": "from collections.abc import Generator\nfrom typing import Any\n\nimport orjson\nfrom fastapi.encoders import jsonable_encoder\n\nfrom lfx.base.io.chat import ChatComponent\nfrom lfx.helpers.data import safe_convert\nfrom lfx.inputs.inputs import BoolInput, DropdownInput, HandleInput, MessageTextInput\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame\nfrom lfx.schema.message import Message\nfrom lfx.schema.properties import Source\nfrom lfx.template.field.base import Output\nfrom lfx.utils.constants import (\n MESSAGE_SENDER_AI,\n MESSAGE_SENDER_NAME_AI,\n MESSAGE_SENDER_USER,\n)\n\n\nclass ChatOutput(ChatComponent):\n display_name = \"Chat Output\"\n description = \"Display a chat message in the Playground.\"\n documentation: str = \"https://docs.langflow.org/components-io#chat-output\"\n icon = \"MessagesSquare\"\n name = \"ChatOutput\"\n minimized = True\n\n inputs = [\n HandleInput(\n name=\"input_value\",\n display_name=\"Inputs\",\n info=\"Message to be passed as output.\",\n input_types=[\"Data\", \"DataFrame\", \"Message\"],\n required=True,\n ),\n BoolInput(\n name=\"should_store_message\",\n display_name=\"Store Messages\",\n info=\"Store the message in the history.\",\n value=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"sender\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER],\n value=MESSAGE_SENDER_AI,\n advanced=True,\n info=\"Type of sender.\",\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Name of the sender.\",\n value=MESSAGE_SENDER_NAME_AI,\n advanced=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"data_template\",\n display_name=\"Data Template\",\n value=\"{text}\",\n advanced=True,\n info=\"Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.\",\n ),\n BoolInput(\n name=\"clean_data\",\n display_name=\"Basic Clean Data\",\n value=True,\n advanced=True,\n info=\"Whether to clean data before converting to string.\",\n ),\n ]\n outputs = [\n Output(\n display_name=\"Output Message\",\n name=\"message\",\n method=\"message_response\",\n ),\n ]\n\n def _build_source(self, id_: str | None, display_name: str | None, source: str | None) -> Source:\n source_dict = {}\n if id_:\n source_dict[\"id\"] = id_\n if display_name:\n source_dict[\"display_name\"] = display_name\n if source:\n # Handle case where source is a ChatOpenAI object\n if hasattr(source, \"model_name\"):\n source_dict[\"source\"] = source.model_name\n elif hasattr(source, \"model\"):\n source_dict[\"source\"] = str(source.model)\n else:\n source_dict[\"source\"] = str(source)\n return Source(**source_dict)\n\n async def message_response(self) -> Message:\n # First convert the input to string if needed\n text = self.convert_to_string()\n\n # Get source properties\n source, _, display_name, source_id = self.get_properties_from_source_component()\n\n # Create or use existing Message object\n if isinstance(self.input_value, Message):\n message = self.input_value\n # Update message properties\n message.text = text\n else:\n message = Message(text=text)\n\n # Set message properties\n message.sender = self.sender\n message.sender_name = self.sender_name\n message.session_id = self.session_id\n message.flow_id = self.graph.flow_id if hasattr(self, \"graph\") else None\n message.properties.source = self._build_source(source_id, display_name, source)\n\n # Store message if needed\n if self.session_id and self.should_store_message:\n stored_message = await self.send_message(message)\n self.message.value = stored_message\n message = stored_message\n\n self.status = message\n return message\n\n def _serialize_data(self, data: Data) -> str:\n \"\"\"Serialize Data object to JSON string.\"\"\"\n # Convert data.data to JSON-serializable format\n serializable_data = jsonable_encoder(data.data)\n # Serialize with orjson, enabling pretty printing with indentation\n json_bytes = orjson.dumps(serializable_data, option=orjson.OPT_INDENT_2)\n # Convert bytes to string and wrap in Markdown code blocks\n return \"```json\\n\" + json_bytes.decode(\"utf-8\") + \"\\n```\"\n\n def _validate_input(self) -> None:\n \"\"\"Validate the input data and raise ValueError if invalid.\"\"\"\n if self.input_value is None:\n msg = \"Input data cannot be None\"\n raise ValueError(msg)\n if isinstance(self.input_value, list) and not all(\n isinstance(item, Message | Data | DataFrame | str) for item in self.input_value\n ):\n invalid_types = [\n type(item).__name__\n for item in self.input_value\n if not isinstance(item, Message | Data | DataFrame | str)\n ]\n msg = f\"Expected Data or DataFrame or Message or str, got {invalid_types}\"\n raise TypeError(msg)\n if not isinstance(\n self.input_value,\n Message | Data | DataFrame | str | list | Generator | type(None),\n ):\n type_name = type(self.input_value).__name__\n msg = f\"Expected Data or DataFrame or Message or str, Generator or None, got {type_name}\"\n raise TypeError(msg)\n\n def convert_to_string(self) -> str | Generator[Any, None, None]:\n \"\"\"Convert input data to string with proper error handling.\"\"\"\n self._validate_input()\n if isinstance(self.input_value, list):\n clean_data: bool = getattr(self, \"clean_data\", False)\n return \"\\n\".join([safe_convert(item, clean_data=clean_data) for item in self.input_value])\n if isinstance(self.input_value, Generator):\n return self.input_value\n return safe_convert(self.input_value)\n" + }, + "data_template": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "Data Template", + "dynamic": false, + "info": "Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.", + "input_types": [ + "Message" + ], + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "data_template", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "{text}" + }, + "input_value": { + "_input_type": "HandleInput", + "advanced": false, + "display_name": "Inputs", + "dynamic": false, + "info": "Message to be passed as output.", + "input_types": [ + "Data", + "DataFrame", + "Message" + ], + "list": false, + "list_add_label": "Add More", + "name": "input_value", + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "other", + "value": "" + }, + "sender": { + "_input_type": "DropdownInput", + "advanced": true, + "combobox": false, + "dialog_inputs": {}, + "display_name": "Sender Type", + "dynamic": false, + "external_options": {}, + "info": "Type of sender.", + "name": "sender", + "options": [ + "Machine", + "User" + ], + "options_metadata": [], + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "toggle": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "Machine" + }, + "sender_name": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "Sender Name", + "dynamic": false, + "info": "Name of the sender.", + "input_types": [ + "Message" + ], + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "sender_name", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "AI" + }, + "session_id": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "Session ID", + "dynamic": false, + "info": "The session ID of the chat. If empty, the current session ID parameter will be used.", + "input_types": [ + "Message" + ], + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "session_id", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "should_store_message": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Store Messages", + "dynamic": false, + "info": "Store the message in the history.", + "list": false, + "list_add_label": "Add More", + "name": "should_store_message", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + } + }, + "tool_mode": false + }, + "showNode": false, + "type": "ChatOutput" + }, + "dragging": false, + "id": "ChatOutput-Q1dhr", + "measured": { + "height": 48, + "width": 192 + }, + "position": { + "x": 3135.060664388748, + "y": 2541.1604512818694 + }, + "selected": false, + "type": "genericNode" + }, + { + "data": { + "id": "DataFrameOperations-RhKoe", + "node": { + "base_classes": [ + "DataFrame" + ], + "beta": false, + "conditional_paths": [], + "custom_fields": {}, + "description": "Perform various operations on a DataFrame.", + "display_name": "DataFrame Operations", + "documentation": "https://docs.langflow.org/components-processing#dataframe-operations", + "edited": false, + "field_order": [ + "df", + "operation", + "column_name", + "filter_value", + "filter_operator", + "ascending", + "new_column_name", + "new_column_value", + "columns_to_select", + "num_rows", + "replace_value", + "replacement_value" + ], + "frozen": false, + "icon": "table", + "last_updated": "2025-10-03T14:40:28.935Z", + "legacy": false, + "metadata": { + "code_hash": "b4d6b19b6eef", + "dependencies": { + "dependencies": [ + { + "name": "pandas", + "version": "2.2.3" + }, + { + "name": "lfx", + "version": null + } + ], + "total_dependencies": 2 + }, + "module": "lfx.components.processing.dataframe_operations.DataFrameOperationsComponent" + }, + "minimized": false, + "output_types": [], + "outputs": [ + { + "allows_loop": false, + "cache": true, + "display_name": "DataFrame", + "group_outputs": false, + "method": "perform_operation", + "name": "output", + "options": null, + "required_inputs": null, + "selected": "DataFrame", + "tool_mode": true, + "types": [ + "DataFrame" + ], + "value": "__UNDEFINED__" + } + ], + "pinned": false, + "template": { + "_type": "Component", + "ascending": { + "_input_type": "BoolInput", + "advanced": false, + "display_name": "Sort Ascending", + "dynamic": true, + "info": "Whether to sort in ascending order.", + "list": false, + "list_add_label": "Add More", + "name": "ascending", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + }, + "code": { + "advanced": true, + "dynamic": true, + "fileTypes": [], + "file_path": "", + "info": "", + "list": false, + "load_from_db": false, + "multiline": true, + "name": "code", + "password": false, + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "type": "code", + "value": "import pandas as pd\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.inputs import SortableListInput\nfrom lfx.io import BoolInput, DataFrameInput, DropdownInput, IntInput, MessageTextInput, Output, StrInput\nfrom lfx.log.logger import logger\nfrom lfx.schema.dataframe import DataFrame\n\n\nclass DataFrameOperationsComponent(Component):\n display_name = \"DataFrame Operations\"\n description = \"Perform various operations on a DataFrame.\"\n documentation: str = \"https://docs.langflow.org/components-processing#dataframe-operations\"\n icon = \"table\"\n name = \"DataFrameOperations\"\n\n OPERATION_CHOICES = [\n \"Add Column\",\n \"Drop Column\",\n \"Filter\",\n \"Head\",\n \"Rename Column\",\n \"Replace Value\",\n \"Select Columns\",\n \"Sort\",\n \"Tail\",\n \"Drop Duplicates\",\n ]\n\n inputs = [\n DataFrameInput(\n name=\"df\",\n display_name=\"DataFrame\",\n info=\"The input DataFrame to operate on.\",\n required=True,\n ),\n SortableListInput(\n name=\"operation\",\n display_name=\"Operation\",\n placeholder=\"Select Operation\",\n info=\"Select the DataFrame operation to perform.\",\n options=[\n {\"name\": \"Add Column\", \"icon\": \"plus\"},\n {\"name\": \"Drop Column\", \"icon\": \"minus\"},\n {\"name\": \"Filter\", \"icon\": \"filter\"},\n {\"name\": \"Head\", \"icon\": \"arrow-up\"},\n {\"name\": \"Rename Column\", \"icon\": \"pencil\"},\n {\"name\": \"Replace Value\", \"icon\": \"replace\"},\n {\"name\": \"Select Columns\", \"icon\": \"columns\"},\n {\"name\": \"Sort\", \"icon\": \"arrow-up-down\"},\n {\"name\": \"Tail\", \"icon\": \"arrow-down\"},\n {\"name\": \"Drop Duplicates\", \"icon\": \"copy-x\"},\n ],\n real_time_refresh=True,\n limit=1,\n ),\n StrInput(\n name=\"column_name\",\n display_name=\"Column Name\",\n info=\"The column name to use for the operation.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"filter_value\",\n display_name=\"Filter Value\",\n info=\"The value to filter rows by.\",\n dynamic=True,\n show=False,\n ),\n DropdownInput(\n name=\"filter_operator\",\n display_name=\"Filter Operator\",\n options=[\n \"equals\",\n \"not equals\",\n \"contains\",\n \"not contains\",\n \"starts with\",\n \"ends with\",\n \"greater than\",\n \"less than\",\n ],\n value=\"equals\",\n info=\"The operator to apply for filtering rows.\",\n advanced=False,\n dynamic=True,\n show=False,\n ),\n BoolInput(\n name=\"ascending\",\n display_name=\"Sort Ascending\",\n info=\"Whether to sort in ascending order.\",\n dynamic=True,\n show=False,\n value=True,\n ),\n StrInput(\n name=\"new_column_name\",\n display_name=\"New Column Name\",\n info=\"The new column name when renaming or adding a column.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"new_column_value\",\n display_name=\"New Column Value\",\n info=\"The value to populate the new column with.\",\n dynamic=True,\n show=False,\n ),\n StrInput(\n name=\"columns_to_select\",\n display_name=\"Columns to Select\",\n dynamic=True,\n is_list=True,\n show=False,\n ),\n IntInput(\n name=\"num_rows\",\n display_name=\"Number of Rows\",\n info=\"Number of rows to return (for head/tail).\",\n dynamic=True,\n show=False,\n value=5,\n ),\n MessageTextInput(\n name=\"replace_value\",\n display_name=\"Value to Replace\",\n info=\"The value to replace in the column.\",\n dynamic=True,\n show=False,\n ),\n MessageTextInput(\n name=\"replacement_value\",\n display_name=\"Replacement Value\",\n info=\"The value to replace with.\",\n dynamic=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"DataFrame\",\n name=\"output\",\n method=\"perform_operation\",\n info=\"The resulting DataFrame after the operation.\",\n )\n ]\n\n def update_build_config(self, build_config, field_value, field_name=None):\n dynamic_fields = [\n \"column_name\",\n \"filter_value\",\n \"filter_operator\",\n \"ascending\",\n \"new_column_name\",\n \"new_column_value\",\n \"columns_to_select\",\n \"num_rows\",\n \"replace_value\",\n \"replacement_value\",\n ]\n for field in dynamic_fields:\n build_config[field][\"show\"] = False\n\n if field_name == \"operation\":\n # Handle SortableListInput format\n if isinstance(field_value, list):\n operation_name = field_value[0].get(\"name\", \"\") if field_value else \"\"\n else:\n operation_name = field_value or \"\"\n\n # If no operation selected, all dynamic fields stay hidden (already set to False above)\n if not operation_name:\n return build_config\n\n if operation_name == \"Filter\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"filter_value\"][\"show\"] = True\n build_config[\"filter_operator\"][\"show\"] = True\n elif operation_name == \"Sort\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"ascending\"][\"show\"] = True\n elif operation_name == \"Drop Column\":\n build_config[\"column_name\"][\"show\"] = True\n elif operation_name == \"Rename Column\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"new_column_name\"][\"show\"] = True\n elif operation_name == \"Add Column\":\n build_config[\"new_column_name\"][\"show\"] = True\n build_config[\"new_column_value\"][\"show\"] = True\n elif operation_name == \"Select Columns\":\n build_config[\"columns_to_select\"][\"show\"] = True\n elif operation_name in {\"Head\", \"Tail\"}:\n build_config[\"num_rows\"][\"show\"] = True\n elif operation_name == \"Replace Value\":\n build_config[\"column_name\"][\"show\"] = True\n build_config[\"replace_value\"][\"show\"] = True\n build_config[\"replacement_value\"][\"show\"] = True\n elif operation_name == \"Drop Duplicates\":\n build_config[\"column_name\"][\"show\"] = True\n\n return build_config\n\n def perform_operation(self) -> DataFrame:\n df_copy = self.df.copy()\n\n # Handle SortableListInput format for operation\n operation_input = getattr(self, \"operation\", [])\n if isinstance(operation_input, list) and len(operation_input) > 0:\n op = operation_input[0].get(\"name\", \"\")\n else:\n op = \"\"\n\n # If no operation selected, return original DataFrame\n if not op:\n return df_copy\n\n if op == \"Filter\":\n return self.filter_rows_by_value(df_copy)\n if op == \"Sort\":\n return self.sort_by_column(df_copy)\n if op == \"Drop Column\":\n return self.drop_column(df_copy)\n if op == \"Rename Column\":\n return self.rename_column(df_copy)\n if op == \"Add Column\":\n return self.add_column(df_copy)\n if op == \"Select Columns\":\n return self.select_columns(df_copy)\n if op == \"Head\":\n return self.head(df_copy)\n if op == \"Tail\":\n return self.tail(df_copy)\n if op == \"Replace Value\":\n return self.replace_values(df_copy)\n if op == \"Drop Duplicates\":\n return self.drop_duplicates(df_copy)\n msg = f\"Unsupported operation: {op}\"\n logger.error(msg)\n raise ValueError(msg)\n\n def filter_rows_by_value(self, df: DataFrame) -> DataFrame:\n column = df[self.column_name]\n filter_value = self.filter_value\n\n # Handle regular DropdownInput format (just a string value)\n operator = getattr(self, \"filter_operator\", \"equals\") # Default to equals for backward compatibility\n\n if operator == \"equals\":\n mask = column == filter_value\n elif operator == \"not equals\":\n mask = column != filter_value\n elif operator == \"contains\":\n mask = column.astype(str).str.contains(str(filter_value), na=False)\n elif operator == \"not contains\":\n mask = ~column.astype(str).str.contains(str(filter_value), na=False)\n elif operator == \"starts with\":\n mask = column.astype(str).str.startswith(str(filter_value), na=False)\n elif operator == \"ends with\":\n mask = column.astype(str).str.endswith(str(filter_value), na=False)\n elif operator == \"greater than\":\n try:\n # Try to convert filter_value to numeric for comparison\n numeric_value = pd.to_numeric(filter_value)\n mask = column > numeric_value\n except (ValueError, TypeError):\n # If conversion fails, compare as strings\n mask = column.astype(str) > str(filter_value)\n elif operator == \"less than\":\n try:\n # Try to convert filter_value to numeric for comparison\n numeric_value = pd.to_numeric(filter_value)\n mask = column < numeric_value\n except (ValueError, TypeError):\n # If conversion fails, compare as strings\n mask = column.astype(str) < str(filter_value)\n else:\n mask = column == filter_value # Fallback to equals\n\n return DataFrame(df[mask])\n\n def sort_by_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.sort_values(by=self.column_name, ascending=self.ascending))\n\n def drop_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.drop(columns=[self.column_name]))\n\n def rename_column(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.rename(columns={self.column_name: self.new_column_name}))\n\n def add_column(self, df: DataFrame) -> DataFrame:\n df[self.new_column_name] = [self.new_column_value] * len(df)\n return DataFrame(df)\n\n def select_columns(self, df: DataFrame) -> DataFrame:\n columns = [col.strip() for col in self.columns_to_select]\n return DataFrame(df[columns])\n\n def head(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.head(self.num_rows))\n\n def tail(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.tail(self.num_rows))\n\n def replace_values(self, df: DataFrame) -> DataFrame:\n df[self.column_name] = df[self.column_name].replace(self.replace_value, self.replacement_value)\n return DataFrame(df)\n\n def drop_duplicates(self, df: DataFrame) -> DataFrame:\n return DataFrame(df.drop_duplicates(subset=self.column_name))\n" + }, + "column_name": { + "_input_type": "StrInput", + "advanced": false, + "display_name": "Column Name", + "dynamic": true, + "info": "The column name to use for the operation.", + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "column_name", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "columns_to_select": { + "_input_type": "StrInput", + "advanced": false, + "display_name": "Columns to Select", + "dynamic": true, + "info": "", + "list": true, + "list_add_label": "Add More", + "load_from_db": false, + "name": "columns_to_select", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "df": { + "_input_type": "DataFrameInput", + "advanced": false, + "display_name": "DataFrame", + "dynamic": false, + "info": "The input DataFrame to operate on.", + "input_types": [ + "DataFrame" + ], + "list": false, + "list_add_label": "Add More", + "name": "df", + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "other", + "value": "" + }, + "filter_operator": { + "_input_type": "DropdownInput", + "advanced": false, + "combobox": false, + "dialog_inputs": {}, + "display_name": "Filter Operator", + "dynamic": true, + "external_options": {}, + "info": "The operator to apply for filtering rows.", + "name": "filter_operator", + "options": [ + "equals", + "not equals", + "contains", + "not contains", + "starts with", + "ends with", + "greater than", + "less than" + ], + "options_metadata": [], + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "toggle": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "equals" + }, + "filter_value": { + "_input_type": "MessageTextInput", + "advanced": false, + "display_name": "Filter Value", + "dynamic": true, + "info": "The value to filter rows by.", + "input_types": [ + "Message" + ], + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "filter_value", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "new_column_name": { + "_input_type": "StrInput", + "advanced": false, + "display_name": "New Column Name", + "dynamic": true, + "info": "The new column name when renaming or adding a column.", + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "new_column_name", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "new_column_value": { + "_input_type": "MessageTextInput", + "advanced": false, + "display_name": "New Column Value", + "dynamic": true, + "info": "The value to populate the new column with.", + "input_types": [ + "Message" + ], + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "new_column_value", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "num_rows": { + "_input_type": "IntInput", + "advanced": false, + "display_name": "Number of Rows", + "dynamic": true, + "info": "Number of rows to return (for head/tail).", + "list": false, + "list_add_label": "Add More", + "name": "num_rows", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "int", + "value": 5 + }, + "operation": { + "_input_type": "SortableListInput", + "advanced": false, + "display_name": "Operation", + "dynamic": false, + "info": "Select the DataFrame operation to perform.", + "limit": 1, + "name": "operation", + "options": [ + { + "icon": "plus", + "name": "Add Column" + }, + { + "icon": "minus", + "name": "Drop Column" + }, + { + "icon": "filter", + "name": "Filter" + }, + { + "icon": "arrow-up", + "name": "Head" + }, + { + "icon": "pencil", + "name": "Rename Column" + }, + { + "icon": "replace", + "name": "Replace Value" + }, + { + "icon": "columns", + "name": "Select Columns" + }, + { + "icon": "arrow-up-down", + "name": "Sort" + }, + { + "icon": "arrow-down", + "name": "Tail" + }, + { + "icon": "copy-x", + "name": "Drop Duplicates" + } + ], + "placeholder": "Select Operation", + "real_time_refresh": true, + "required": false, + "search_category": [], + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "sortableList", + "value": [ + { + "chosen": false, + "icon": "arrow-up", + "name": "Head", + "selected": false + } + ] + }, + "replace_value": { + "_input_type": "MessageTextInput", + "advanced": false, + "display_name": "Value to Replace", + "dynamic": true, + "info": "The value to replace in the column.", + "input_types": [ + "Message" + ], + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "replace_value", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "replacement_value": { + "_input_type": "MessageTextInput", + "advanced": false, + "display_name": "Replacement Value", + "dynamic": true, + "info": "The value to replace with.", + "input_types": [ + "Message" + ], + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "replacement_value", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + } + }, + "tool_mode": false + }, + "showNode": true, + "type": "DataFrameOperations" + }, + "dragging": false, + "id": "DataFrameOperations-RhKoe", + "measured": { + "height": 317, + "width": 320 + }, + "position": { + "x": 2773.2060092972047, + "y": 2337.54590413581 + }, + "selected": true, + "type": "genericNode" } ], "viewport": { - "x": -455.58644249058534, - "y": -635.4279993708312, - "zoom": 0.5860911118774581 + "x": -1399.3594540015256, + "y": -1163.4584036787048, + "zoom": 0.7595394813583742 } }, "description": "This flow is to ingest the URL to open search.", @@ -3144,8 +3868,8 @@ "id": "72c3d17c-2dac-4a73-b48a-6518473d7830", "is_component": false, "last_tested_version": "1.6.0", - "mcp_enabled": true, "name": "OpenSearch URL Ingestion Flow", + "mcp_enabled": true, "tags": [ "openai", "astradb",