diff --git a/flows/ingestion_flow.json b/flows/ingestion_flow.json index eff0552d..5ee5d1b2 100644 --- a/flows/ingestion_flow.json +++ b/flows/ingestion_flow.json @@ -24,40 +24,12 @@ "type": "other" } }, - "id": "reactflow__edge-File-PSU37{\u0153dataType\u0153:\u0153File\u0153,\u0153id\u0153:\u0153File-PSU37\u0153,\u0153name\u0153:\u0153message\u0153,\u0153output_types\u0153:[\u0153Message\u0153]}-SplitText-QIKhg{\u0153fieldName\u0153:\u0153data_inputs\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153,\u0153Message\u0153],\u0153type\u0153:\u0153other\u0153}", + "id": "reactflow__edge-File-PSU37{œdataTypeœ:œFileœ,œidœ:œFile-PSU37œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-SplitText-QIKhg{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-QIKhgœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", "selected": false, "source": "File-PSU37", - "sourceHandle": "{\u0153dataType\u0153:\u0153File\u0153,\u0153id\u0153:\u0153File-PSU37\u0153,\u0153name\u0153:\u0153message\u0153,\u0153output_types\u0153:[\u0153Message\u0153]}", + "sourceHandle": "{œdataTypeœ:œFileœ,œidœ:œFile-PSU37œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}", "target": "SplitText-QIKhg", - "targetHandle": "{\u0153fieldName\u0153:\u0153data_inputs\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153,\u0153Message\u0153],\u0153type\u0153:\u0153other\u0153}" - }, - { - "animated": false, - "className": "", - "data": { - "sourceHandle": { - "dataType": "OpenAIEmbeddings", - "id": "OpenAIEmbeddings-joRJ6", - "name": "embeddings", - "output_types": [ - "Embeddings" - ] - }, - "targetHandle": { - "fieldName": "embedding", - "id": "OpenSearch-Mkw1W", - "inputTypes": [ - "Embeddings" - ], - "type": "other" - } - }, - "id": "xy-edge__OpenAIEmbeddings-joRJ6{\u0153dataType\u0153:\u0153OpenAIEmbeddings\u0153,\u0153id\u0153:\u0153OpenAIEmbeddings-joRJ6\u0153,\u0153name\u0153:\u0153embeddings\u0153,\u0153output_types\u0153:[\u0153Embeddings\u0153]}-OpenSearch-Mkw1W{\u0153fieldName\u0153:\u0153embedding\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Embeddings\u0153],\u0153type\u0153:\u0153other\u0153}", - "selected": false, - "source": "OpenAIEmbeddings-joRJ6", - "sourceHandle": "{\u0153dataType\u0153:\u0153OpenAIEmbeddings\u0153,\u0153id\u0153:\u0153OpenAIEmbeddings-joRJ6\u0153,\u0153name\u0153:\u0153embeddings\u0153,\u0153output_types\u0153:[\u0153Embeddings\u0153]}", - "target": "OpenSearch-Mkw1W", - "targetHandle": "{\u0153fieldName\u0153:\u0153embedding\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Embeddings\u0153],\u0153type\u0153:\u0153other\u0153}" + "targetHandle": "{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-QIKhgœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}" }, { "animated": false, @@ -73,7 +45,7 @@ }, "targetHandle": { "fieldName": "ingest_data", - "id": "OpenSearch-Mkw1W", + "id": "OpenSearchHybrid-Ve6bS", "inputTypes": [ "Data", "DataFrame" @@ -81,12 +53,40 @@ "type": "other" } }, - "id": "xy-edge__SplitText-QIKhg{\u0153dataType\u0153:\u0153SplitText\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153name\u0153:\u0153dataframe\u0153,\u0153output_types\u0153:[\u0153DataFrame\u0153]}-OpenSearch-Mkw1W{\u0153fieldName\u0153:\u0153ingest_data\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153],\u0153type\u0153:\u0153other\u0153}", + "id": 
"xy-edge__SplitText-QIKhg{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-QIKhgœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-OpenSearchHybrid-Ve6bS{œfieldNameœ:œingest_dataœ,œidœ:œOpenSearchHybrid-Ve6bSœ,œinputTypesœ:[œDataœ,œDataFrameœ],œtypeœ:œotherœ}", "selected": false, "source": "SplitText-QIKhg", - "sourceHandle": "{\u0153dataType\u0153:\u0153SplitText\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153name\u0153:\u0153dataframe\u0153,\u0153output_types\u0153:[\u0153DataFrame\u0153]}", - "target": "OpenSearch-Mkw1W", - "targetHandle": "{\u0153fieldName\u0153:\u0153ingest_data\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153],\u0153type\u0153:\u0153other\u0153}" + "sourceHandle": "{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-QIKhgœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}", + "target": "OpenSearchHybrid-Ve6bS", + "targetHandle": "{œfieldNameœ:œingest_dataœ,œidœ:œOpenSearchHybrid-Ve6bSœ,œinputTypesœ:[œDataœ,œDataFrameœ],œtypeœ:œotherœ}" + }, + { + "animated": false, + "className": "", + "data": { + "sourceHandle": { + "dataType": "OpenAIEmbeddings", + "id": "OpenAIEmbeddings-joRJ6", + "name": "embeddings", + "output_types": [ + "Embeddings" + ] + }, + "targetHandle": { + "fieldName": "embedding", + "id": "OpenSearchHybrid-Ve6bS", + "inputTypes": [ + "Embeddings" + ], + "type": "other" + } + }, + "id": "xy-edge__OpenAIEmbeddings-joRJ6{œdataTypeœ:œOpenAIEmbeddingsœ,œidœ:œOpenAIEmbeddings-joRJ6œ,œnameœ:œembeddingsœ,œoutput_typesœ:[œEmbeddingsœ]}-OpenSearchHybrid-Ve6bS{œfieldNameœ:œembeddingœ,œidœ:œOpenSearchHybrid-Ve6bSœ,œinputTypesœ:[œEmbeddingsœ],œtypeœ:œotherœ}", + "selected": false, + "source": "OpenAIEmbeddings-joRJ6", + "sourceHandle": "{œdataTypeœ:œOpenAIEmbeddingsœ,œidœ:œOpenAIEmbeddings-joRJ6œ,œnameœ:œembeddingsœ,œoutput_typesœ:[œEmbeddingsœ]}", + "target": "OpenSearchHybrid-Ve6bS", + "targetHandle": "{œfieldNameœ:œembeddingœ,œidœ:œOpenSearchHybrid-Ve6bSœ,œinputTypesœ:[œEmbeddingsœ],œtypeœ:œotherœ}" } ], "nodes": [ @@ -115,7 +115,7 @@ "frozen": false, "icon": "scissors-line-dashed", "legacy": false, - "lf_version": "1.1.1", + "lf_version": "1.5.0.post2", "metadata": { "code_hash": "dbf2e9d2319d", "dependencies": { @@ -353,7 +353,7 @@ "frozen": false, "icon": "OpenAI", "legacy": false, - "lf_version": "1.1.1", + "lf_version": "1.5.0.post2", "metadata": { "code_hash": "2691dee277c9", "dependencies": { @@ -829,7 +829,7 @@ "data": { "id": "note-Bm5Xw", "node": { - "description": "### \ud83d\udca1 Add your OpenAI API key here \ud83d\udc47", + "description": "### 💡 Add your OpenAI API key here 👇", "display_name": "", "documentation": "", "template": { @@ -884,8 +884,9 @@ ], "frozen": false, "icon": "file-text", - "last_updated": "2025-09-03T06:37:14.082Z", + "last_updated": "2025-09-08T11:35:39.784Z", "legacy": false, + "lf_version": "1.5.0.post2", "metadata": {}, "minimized": false, "output_types": [], @@ -905,6 +906,23 @@ "Message" ], "value": "__UNDEFINED__" + }, + { + "allows_loop": false, + "cache": true, + "display_name": "File Path", + "group_outputs": false, + "hidden": null, + "method": "load_files_path", + "name": "path", + "options": null, + "required_inputs": null, + "selected": "Message", + "tool_mode": true, + "types": [ + "Message" + ], + "value": "__UNDEFINED__" } ], "pinned": false, @@ -1051,7 +1069,9 @@ "bz2", "gz" ], - "file_path": [], + "file_path": [ + "242b5797-4104-4a01-8da1-b8036813100d/test_ingestion.txt" + ], "info": "Supported file extensions: txt, md, mdx, csv, json, yaml, yml, xml, html, 
htm, pdf, docx, py, sh, sql, js, ts, tsx; optionally bundled in file extensions: zip, tar, tgz, bz2, gz", "list": true, "list_add_label": "Add More", @@ -1124,13 +1144,14 @@ }, "tool_mode": false }, + "selected_output": "message", "showNode": true, "type": "File" }, "dragging": false, "id": "File-PSU37", "measured": { - "height": 230, + "height": 234, "width": 320 }, "position": { @@ -1142,9 +1163,7 @@ }, { "data": { - "description": "OpenSearch Vector Store with advanced, customizable search capabilities.", - "display_name": "OpenSearch", - "id": "OpenSearch-Mkw1W", + "id": "OpenSearchHybrid-Ve6bS", "node": { "base_classes": [ "Data", @@ -1154,45 +1173,53 @@ "beta": false, "conditional_paths": [], "custom_fields": {}, - "description": "OpenSearch Vector Store with advanced, customizable search capabilities.", - "display_name": "OpenSearch", + "description": "Hybrid search: KNN + keyword, with optional filters, min_score, and aggregations.", + "display_name": "OpenSearch (Hybrid)", "documentation": "", - "edited": false, + "edited": true, "field_order": [ "opensearch_url", "index_name", + "engine", + "space_type", + "ef_construction", + "m", "ingest_data", "search_query", "should_cache_vector_store", "embedding", - "search_type", + "vector_field", "number_of_results", - "search_score_threshold", + "filter_expression", + "auth_mode", "username", "password", + "jwt_token", + "jwt_header", + "bearer_prefix", "use_ssl", - "verify_certs", - "hybrid_search_query" + "verify_certs" ], "frozen": false, "icon": "OpenSearch", + "last_updated": "2025-09-05T21:19:52.776Z", "legacy": false, "metadata": { - "code_hash": "972b714acf6b", + "code_hash": "37e8631c902b", "dependencies": { "dependencies": [ - { - "name": "langchain_community", - "version": "0.3.21" - }, { "name": "langflow", "version": "1.5.0.post2" + }, + { + "name": "opensearchpy", + "version": "2.8.0" } ], "total_dependencies": 2 }, - "module": "custom_components.opensearch" + "module": "custom_components.opensearch_hybrid" }, "minimized": false, "output_types": [], @@ -1202,6 +1229,7 @@ "cache": true, "display_name": "Search Results", "group_outputs": false, + "hidden": null, "method": "search_documents", "name": "search_results", "options": null, @@ -1218,11 +1246,11 @@ "cache": true, "display_name": "DataFrame", "group_outputs": false, + "hidden": null, "method": "as_dataframe", "name": "dataframe", "options": null, "required_inputs": null, - "selected": "DataFrame", "tool_mode": true, "types": [ "DataFrame" @@ -1239,7 +1267,6 @@ "name": "vectorstoreconnection", "options": null, "required_inputs": null, - "selected": "VectorStore", "tool_mode": true, "types": [ "VectorStore" @@ -1250,6 +1277,50 @@ "pinned": false, "template": { "_type": "Component", + "auth_mode": { + "_input_type": "DropdownInput", + "advanced": false, + "combobox": false, + "dialog_inputs": {}, + "display_name": "Auth Mode", + "dynamic": false, + "info": "Choose Basic (username/password) or JWT (Bearer token).", + "load_from_db": false, + "name": "auth_mode", + "options": [ + "basic", + "jwt" + ], + "options_metadata": [], + "placeholder": "", + "real_time_refresh": true, + "required": false, + "show": true, + "title_case": false, + "toggle": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "jwt" + }, + "bearer_prefix": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Prefix 'Bearer '", + "dynamic": false, + "info": "", + "list": false, + "list_add_label": "Add More", + "name": "bearer_prefix", + "placeholder": 
"", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + }, "code": { "advanced": true, "dynamic": true, @@ -1266,7 +1337,25 @@ "show": true, "title_case": false, "type": "code", - "value": "import json\nfrom typing import Any\n\nfrom langchain_community.vectorstores import OpenSearchVectorSearch\n\nfrom langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store\nfrom langflow.base.vectorstores.vector_store_connection_decorator import vector_store_connection\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n FloatInput,\n HandleInput,\n IntInput,\n MultilineInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom fastapi.encoders import jsonable_encoder\nfrom langchain_core.documents import Document\nimport json\n\n@vector_store_connection\nclass OpenSearchVectorStoreComponent(LCVectorStoreComponent):\n \"\"\"OpenSearch Vector Store with advanced, customizable search capabilities.\"\"\"\n\n display_name: str = \"OpenSearch\"\n description: str = \"OpenSearch Vector Store with advanced, customizable search capabilities.\"\n name = \"OpenSearch\"\n icon = \"OpenSearch\"\n\n inputs = [\n StrInput(\n name=\"opensearch_url\",\n display_name=\"OpenSearch URL\",\n value=\"http://localhost:9200\",\n info=\"URL for OpenSearch cluster (e.g. https://192.168.1.1:9200).\",\n ),\n StrInput(\n name=\"index_name\",\n display_name=\"Index Name\",\n value=\"documents\",\n info=\"The index name where the vectors will be stored in OpenSearch cluster.\",\n ),\n StrInput(\n name=\"vector_field\",\n display_name=\"Vector Field\",\n value=\"chunk_embedding\",\n info=\"Document field embeddings are stored in.\",\n advanced=True,\n ),\n StrInput(\n name=\"text_field\",\n display_name=\"Text Field\", \n value=\"text\",\n info=\"Document field the text of the document is stored in.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"engine\",\n display_name=\"Engine\",\n options=[\"nmslib\", \"faiss\", \"lucene\"],\n value=\"nmslib\",\n info=\"Vector search engine to use.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"space_type\",\n display_name=\"Space Type\",\n options=[\"l2\", \"l1\", \"cosinesimil\", \"linf\", \"innerproduct\"],\n value=\"l2\",\n info=\"Distance metric for vector similarity.\",\n advanced=True,\n ),\n IntInput(\n name=\"ef_construction\",\n display_name=\"EF Construction\",\n value=100,\n info=\"Size of the dynamic list used during k-NN graph creation.\",\n advanced=True,\n ),\n IntInput(\n name=\"m\",\n display_name=\"M Parameter\",\n value=16,\n info=\"Number of bidirectional links created for each new element.\",\n advanced=True,\n ),\n *LCVectorStoreComponent.inputs,\n HandleInput(name=\"embedding\", display_name=\"Embedding\", input_types=[\"Embeddings\"]),\n DropdownInput(\n name=\"search_type\",\n display_name=\"Search Type\",\n options=[\"similarity\", \"similarity_score_threshold\", \"mmr\"],\n value=\"similarity\",\n advanced=True,\n ),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Number of Results\",\n info=\"Number of results to return.\",\n advanced=True,\n value=4,\n ),\n FloatInput(\n name=\"search_score_threshold\",\n display_name=\"Search Score Threshold\",\n info=\"Minimum similarity score threshold for search results.\",\n value=0.0,\n advanced=True,\n ),\n StrInput(\n name=\"username\",\n display_name=\"Username\",\n value=\"admin\",\n advanced=True,\n ),\n SecretStrInput(\n name=\"password\",\n 
display_name=\"Password\",\n value=\"OPENSEARCH_PASSWORD\",\n advanced=True,\n ),\n BoolInput(\n name=\"use_ssl\",\n display_name=\"Use SSL\",\n value=True,\n advanced=True,\n ),\n BoolInput(\n name=\"verify_certs\",\n display_name=\"Verify Certificates\",\n value=False,\n advanced=True,\n ),\n MultilineInput(\n name=\"hybrid_search_query\",\n display_name=\"Hybrid Search Query\",\n value=\"\",\n advanced=True,\n info=(\n \"Provide a custom hybrid search query in JSON format. This allows you to combine \"\n \"vector similarity and keyword matching.\"\n ),\n ),\n ]\n\n @check_cached_vector_store\n def build_vector_store(self) -> OpenSearchVectorSearch:\n \"\"\"Builds the OpenSearch Vector Store object.\"\"\"\n try:\n from langchain_community.vectorstores import OpenSearchVectorSearch\n except ImportError as e:\n error_message = f\"Failed to import required modules: {e}\"\n self.log(error_message)\n raise ImportError(error_message) from e\n\n try:\n opensearch = OpenSearchVectorSearch(\n index_name=self.index_name,\n embedding_function=self.embedding,\n opensearch_url=self.opensearch_url,\n http_auth=(self.username, self.password),\n use_ssl=self.use_ssl,\n verify_certs=self.verify_certs,\n ssl_assert_hostname=False,\n ssl_show_warn=False,\n engine=self.engine,\n vector_field=self.vector_field,\n text_field=self.text_field,\n space_type=self.space_type,\n ef_construction=self.ef_construction,\n m=self.m,\n )\n except Exception as e:\n error_message = f\"Failed to create OpenSearchVectorSearch instance: {e}\"\n self.log(error_message)\n raise RuntimeError(error_message) from e\n\n if self.ingest_data:\n self._add_documents_to_vector_store(opensearch)\n\n return opensearch\n\n def _add_documents_to_vector_store(self, vector_store: \"OpenSearchVectorSearch\") -> None:\n \"\"\"Adds documents to the Vector Store.\"\"\"\n # Convert DataFrame to Data if needed using parent's method\n self.ingest_data = self._prepare_ingest_data()\n\n documents = []\n for _input in self.ingest_data or []:\n if isinstance(_input, Data):\n doc = Document(\n page_content=_input.get_text(), \n metadata=jsonable_encoder(_input.data) if _input.data else {}\n )\n documents.append(doc)\n else:\n error_message = f\"Expected Data object, got {type(_input)}\"\n self.log(error_message)\n raise TypeError(error_message)\n\n if documents and self.embedding is not None:\n self.log(f\"Adding {len(documents)} documents to the Vector Store.\")\n try:\n vector_store.add_documents(\n documents, \n vector_field=self.vector_field,\n text_field=self.text_field\n )\n except Exception as e:\n error_message = f\"Error adding documents to Vector Store: {e}\"\n self.log(error_message)\n raise RuntimeError(error_message) from e\n else:\n self.log(\"No documents to add to the Vector Store.\")\n\n def search(self, query: str | None = None) -> list[dict[str, Any]]:\n \"\"\"Search for similar documents in the vector store or retrieve all documents if no query is provided.\"\"\"\n \n vector_store = self.build_vector_store()\n try:\n query = query or \"\"\n\n if self.hybrid_search_query.strip():\n try:\n hybrid_query = json.loads(self.hybrid_search_query)\n except json.JSONDecodeError as e:\n error_message = f\"Invalid hybrid search query JSON: {e}\"\n self.log(error_message)\n raise ValueError(error_message) from e\n\n results = vector_store.client.search(index=self.index_name, body=hybrid_query)\n\n processed_results = []\n for hit in results.get(\"hits\", {}).get(\"hits\", []):\n source = hit.get(\"_source\", {})\n text = 
source.get(self.text_field, \"\")\n metadata = source.get(\"metadata\", {})\n\n if isinstance(text, dict):\n text = text.get(\"text\", \"\")\n\n processed_results.append(\n {\n \"page_content\": text,\n \"metadata\": metadata,\n }\n )\n return processed_results\n\n search_kwargs = {\n \"k\": self.number_of_results,\n \"vector_field\": self.vector_field,\n \"text_field\": self.text_field\n }\n search_type = self.search_type.lower()\n\n if search_type == \"similarity\":\n results = vector_store.similarity_search(query, **search_kwargs)\n return [{\"page_content\": doc.page_content, \"metadata\": doc.metadata} for doc in results]\n if search_type == \"similarity_score_threshold\":\n search_kwargs[\"score_threshold\"] = self.search_score_threshold\n results = vector_store.similarity_search_with_relevance_scores(query, **search_kwargs)\n return [\n {\n \"page_content\": doc.page_content,\n \"metadata\": doc.metadata,\n \"score\": score,\n }\n for doc, score in results\n ]\n if search_type == \"mmr\":\n results = vector_store.max_marginal_relevance_search(query, **search_kwargs)\n return [{\"page_content\": doc.page_content, \"metadata\": doc.metadata} for doc in results]\n\n except Exception as e:\n error_message = f\"Error during search: {e}\"\n self.log(error_message)\n raise RuntimeError(error_message) from e\n\n error_message = f\"Error during search. Invalid search type: {self.search_type}\"\n self.log(error_message)\n raise ValueError(error_message)\n\n def search_documents(self) -> list[Data]:\n \"\"\"Search for documents in the vector store based on the search input.\n\n If no search input is provided, retrieve all documents.\n \"\"\"\n try:\n query = self.search_query.strip() if self.search_query else None\n results = self.search(query)\n retrieved_data = [\n Data(\n file_path=result[\"metadata\"].get(\"file_path\", \"\"),\n text=result[\"page_content\"],\n )\n for result in results\n ]\n except Exception as e:\n error_message = f\"Error during document search: {e}\"\n self.log(error_message)\n raise RuntimeError(error_message) from e\n\n self.status = retrieved_data\n return retrieved_data\n" + "value": "from __future__ import annotations\n\nimport json\nfrom typing import Any, Dict, List, Optional\n\nfrom langflow.base.vectorstores.model import (\n LCVectorStoreComponent,\n check_cached_vector_store,\n)\nfrom langflow.base.vectorstores.vector_store_connection_decorator import (\n vector_store_connection,\n)\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n HandleInput,\n IntInput,\n MultilineInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom opensearchpy import OpenSearch, helpers\nimport uuid\n\n\n@vector_store_connection\nclass OpenSearchHybridComponent(LCVectorStoreComponent):\n \"\"\"OpenSearch hybrid search: KNN (k=10, boost=0.7) + multi_match (boost=0.3) with optional filters & min_score.\"\"\"\n\n display_name: str = \"OpenSearch (Hybrid)\"\n name: str = \"OpenSearchHybrid\"\n icon: str = \"OpenSearch\"\n description: str = \"Hybrid search: KNN + keyword, with optional filters, min_score, and aggregations.\"\n\n # Keys we consider baseline\n default_keys: list[str] = [\n \"opensearch_url\",\n \"index_name\",\n *[\n i.name for i in LCVectorStoreComponent.inputs\n ], # search_query, add_documents, etc.\n \"embedding\",\n \"vector_field\",\n \"number_of_results\",\n \"auth_mode\",\n \"username\",\n \"password\",\n \"jwt_token\",\n \"jwt_header\",\n \"bearer_prefix\",\n \"use_ssl\",\n \"verify_certs\",\n \"filter_expression\",\n 
\"engine\",\n \"space_type\",\n \"ef_construction\",\n \"m\",\n ]\n\n inputs = [\n StrInput(\n name=\"opensearch_url\",\n display_name=\"OpenSearch URL\",\n value=\"http://localhost:9200\",\n info=\"URL for your OpenSearch cluster.\",\n ),\n StrInput(\n name=\"index_name\",\n display_name=\"Index Name\",\n value=\"langflow\",\n info=\"The index to search.\",\n ),\n DropdownInput(\n name=\"engine\",\n display_name=\"Engine\",\n options=[\"nmslib\", \"faiss\", \"lucene\"],\n value=\"nmslib\",\n info=\"Vector search engine to use.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"space_type\",\n display_name=\"Space Type\",\n options=[\"l2\", \"l1\", \"cosinesimil\", \"linf\", \"innerproduct\"],\n value=\"l2\",\n info=\"Distance metric for vector similarity.\",\n advanced=True,\n ),\n IntInput(\n name=\"ef_construction\",\n display_name=\"EF Construction\",\n value=512,\n info=\"Size of the dynamic list used during k-NN graph creation.\",\n advanced=True,\n ),\n IntInput(\n name=\"m\",\n display_name=\"M Parameter\",\n value=16,\n info=\"Number of bidirectional links created for each new element.\",\n advanced=True,\n ),\n *LCVectorStoreComponent.inputs, # includes search_query, add_documents, etc.\n HandleInput(\n name=\"embedding\", display_name=\"Embedding\", input_types=[\"Embeddings\"]\n ),\n StrInput(\n name=\"vector_field\",\n display_name=\"Vector Field\",\n value=\"chunk_embedding\",\n advanced=True,\n info=\"Vector field used for KNN.\",\n ),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Default Size (limit)\",\n value=10,\n advanced=True,\n info=\"Default number of hits when no limit provided in filter_expression.\",\n ),\n MultilineInput(\n name=\"filter_expression\",\n display_name=\"Filter Expression (JSON)\",\n value=\"\",\n info=(\n \"Optional JSON to control filters/limit/score threshold.\\n\"\n \"Accepted shapes:\\n\"\n '1) {\"filter\": [ {\"term\": {\"filename\":\"foo\"}}, {\"terms\":{\"owner\":[\"u1\",\"u2\"]}} ], \"limit\": 10, \"score_threshold\": 1.6 }\\n'\n '2) Context-style maps: {\"data_sources\":[\"fileA\"], \"document_types\":[\"application/pdf\"], \"owners\":[\"123\"]}\\n'\n \"Placeholders with __IMPOSSIBLE_VALUE__ are ignored.\"\n ),\n ),\n # ----- Auth controls (dynamic) -----\n DropdownInput(\n name=\"auth_mode\",\n display_name=\"Auth Mode\",\n value=\"basic\",\n options=[\"basic\", \"jwt\"],\n info=\"Choose Basic (username/password) or JWT (Bearer token).\",\n real_time_refresh=True,\n advanced=False,\n ),\n StrInput(\n name=\"username\",\n display_name=\"Username\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"password\",\n display_name=\"Password\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"jwt_token\",\n display_name=\"JWT Token\",\n value=\"JWT\",\n load_from_db=True,\n show=True,\n info=\"Paste a valid JWT (sent as a header).\",\n ),\n StrInput(\n name=\"jwt_header\",\n display_name=\"JWT Header Name\",\n value=\"Authorization\",\n show=False,\n advanced=True,\n ),\n BoolInput(\n name=\"bearer_prefix\",\n display_name=\"Prefix 'Bearer '\",\n value=True,\n show=False,\n advanced=True,\n ),\n # ----- TLS -----\n BoolInput(name=\"use_ssl\", display_name=\"Use SSL\", value=True, advanced=True),\n BoolInput(\n name=\"verify_certs\",\n display_name=\"Verify Certificates\",\n value=False,\n advanced=True,\n ),\n ]\n\n # ---------- helper functions for index management ----------\n def _default_text_mapping(\n self,\n dim: int,\n engine: str = \"nmslib\",\n space_type: str = \"l2\",\n ef_search: int = 
512,\n ef_construction: int = 512,\n m: int = 16,\n vector_field: str = \"vector_field\",\n ) -> Dict[str, Any]:\n \"\"\"For Approximate k-NN Search, this is the default mapping to create index.\"\"\"\n return {\n \"settings\": {\"index\": {\"knn\": True, \"knn.algo_param.ef_search\": ef_search}},\n \"mappings\": {\n \"properties\": {\n vector_field: {\n \"type\": \"knn_vector\",\n \"dimension\": dim,\n \"method\": {\n \"name\": \"hnsw\",\n \"space_type\": space_type,\n \"engine\": engine,\n \"parameters\": {\"ef_construction\": ef_construction, \"m\": m},\n },\n }\n }\n },\n }\n\n def _validate_aoss_with_engines(self, is_aoss: bool, engine: str) -> None:\n \"\"\"Validate AOSS with the engine.\"\"\"\n if is_aoss and engine != \"nmslib\" and engine != \"faiss\":\n raise ValueError(\n \"Amazon OpenSearch Service Serverless only \"\n \"supports `nmslib` or `faiss` engines\"\n )\n\n def _is_aoss_enabled(self, http_auth: Any) -> bool:\n \"\"\"Check if the service is http_auth is set as `aoss`.\"\"\"\n if (\n http_auth is not None\n and hasattr(http_auth, \"service\")\n and http_auth.service == \"aoss\"\n ):\n return True\n return False\n\n def _bulk_ingest_embeddings(\n self,\n client: OpenSearch,\n index_name: str,\n embeddings: List[List[float]],\n texts: List[str],\n metadatas: Optional[List[dict]] = None,\n ids: Optional[List[str]] = None,\n vector_field: str = \"vector_field\",\n text_field: str = \"text\",\n mapping: Optional[Dict] = None,\n max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,\n is_aoss: bool = False,\n ) -> List[str]:\n \"\"\"Bulk Ingest Embeddings into given index.\"\"\"\n if not mapping:\n mapping = dict()\n try:\n from opensearchpy.exceptions import NotFoundError\n except ImportError:\n raise ImportError(\"Could not import OpenSearch exceptions\")\n\n requests = []\n return_ids = []\n\n try:\n client.indices.get(index=index_name)\n except NotFoundError:\n client.indices.create(index=index_name, body=mapping)\n\n for i, text in enumerate(texts):\n metadata = metadatas[i] if metadatas else {}\n _id = ids[i] if ids else str(uuid.uuid4())\n request = {\n \"_op_type\": \"index\",\n \"_index\": index_name,\n vector_field: embeddings[i],\n text_field: text,\n \"metadata\": metadata,\n }\n if is_aoss:\n request[\"id\"] = _id\n else:\n request[\"_id\"] = _id\n requests.append(request)\n return_ids.append(_id)\n\n helpers.bulk(client, requests, max_chunk_bytes=max_chunk_bytes)\n if not is_aoss:\n client.indices.refresh(index=index_name)\n return return_ids\n\n # ---------- auth / client ----------\n def _build_auth_kwargs(self) -> Dict[str, Any]:\n mode = (self.auth_mode or \"basic\").strip().lower()\n if mode == \"jwt\":\n token = (self.jwt_token or \"\").strip()\n if not token:\n raise ValueError(\"Auth Mode is 'jwt' but no jwt_token was provided.\")\n header_name = (self.jwt_header or \"Authorization\").strip()\n header_value = f\"Bearer {token}\" if self.bearer_prefix else token\n return {\"headers\": {header_name: header_value}}\n user = (self.username or \"\").strip()\n pwd = (self.password or \"\").strip()\n if not user or not pwd:\n raise ValueError(\"Auth Mode is 'basic' but username/password are missing.\")\n return {\"http_auth\": (user, pwd)}\n\n def build_client(self) -> OpenSearch:\n auth_kwargs = self._build_auth_kwargs()\n return OpenSearch(\n hosts=[self.opensearch_url],\n use_ssl=self.use_ssl,\n verify_certs=self.verify_certs,\n ssl_assert_hostname=False,\n ssl_show_warn=False,\n **auth_kwargs,\n )\n\n @check_cached_vector_store\n def build_vector_store(self) -> 
OpenSearch:\n # Return raw OpenSearch client as our “vector store.”\n return self.build_client()\n\n # ---------- ingest ----------\n def _add_documents_to_vector_store(self, client: OpenSearch) -> None:\n # Convert DataFrame to Data if needed using parent's method\n self.ingest_data = self._prepare_ingest_data()\n\n docs = self.ingest_data or []\n if not docs:\n self.log(\"No documents to ingest.\")\n return\n\n # Extract texts and metadata from documents\n texts = []\n metadatas = []\n for doc_obj in docs:\n lc_doc = doc_obj.to_lc_document()\n texts.append(lc_doc.page_content)\n metadatas.append(lc_doc.metadata)\n\n if not self.embedding:\n raise ValueError(\"Embedding handle is required to embed documents.\")\n\n # Generate embeddings\n vectors = self.embedding.embed_documents(texts)\n\n if not vectors:\n self.log(\"No vectors generated from documents.\")\n return\n\n # Get vector dimension for mapping\n dim = len(vectors[0]) if vectors else 768 # default fallback\n\n # Check for AOSS\n auth_kwargs = self._build_auth_kwargs()\n is_aoss = self._is_aoss_enabled(auth_kwargs.get(\"http_auth\"))\n\n # Validate engine with AOSS\n engine = getattr(self, \"engine\", \"nmslib\")\n self._validate_aoss_with_engines(is_aoss, engine)\n\n # Create mapping with proper KNN settings\n space_type = getattr(self, \"space_type\", \"l2\")\n ef_construction = getattr(self, \"ef_construction\", 512)\n m = getattr(self, \"m\", 16)\n\n mapping = self._default_text_mapping(\n dim=dim,\n engine=engine,\n space_type=space_type,\n ef_construction=ef_construction,\n m=m,\n vector_field=self.vector_field,\n )\n\n self.log(\n f\"Indexing {len(texts)} documents into '{self.index_name}' with proper KNN mapping...\"\n )\n\n # Use the LangChain-style bulk ingestion\n return_ids = self._bulk_ingest_embeddings(\n client=client,\n index_name=self.index_name,\n embeddings=vectors,\n texts=texts,\n metadatas=metadatas,\n vector_field=self.vector_field,\n text_field=\"text\",\n mapping=mapping,\n is_aoss=is_aoss,\n )\n\n self.log(f\"Successfully indexed {len(return_ids)} documents.\")\n\n # ---------- helpers for filters ----------\n def _is_placeholder_term(self, term_obj: dict) -> bool:\n # term_obj like {\"filename\": \"__IMPOSSIBLE_VALUE__\"}\n return any(v == \"__IMPOSSIBLE_VALUE__\" for v in term_obj.values())\n\n def _coerce_filter_clauses(self, filter_obj: dict | None) -> List[dict]:\n \"\"\"\n Accepts either:\n A) {\"filter\":[ ...term/terms objects... 
], \"limit\":..., \"score_threshold\":...}\n B) Context-style: {\"data_sources\":[...], \"document_types\":[...], \"owners\":[...]}\n Returns a list of OS filter clauses (term/terms), skipping placeholders and empty terms.\n \"\"\"\n\n if not filter_obj:\n return []\n\n # If it’s a string, try to parse it once\n if isinstance(filter_obj, str):\n try:\n filter_obj = json.loads(filter_obj)\n except Exception:\n # Not valid JSON → treat as no filters\n return []\n\n # Case A: already an explicit list/dict under \"filter\"\n if \"filter\" in filter_obj:\n raw = filter_obj[\"filter\"]\n if isinstance(raw, dict):\n raw = [raw]\n clauses: List[dict] = []\n for f in raw or []:\n if (\n \"term\" in f\n and isinstance(f[\"term\"], dict)\n and not self._is_placeholder_term(f[\"term\"])\n ):\n clauses.append(f)\n elif \"terms\" in f and isinstance(f[\"terms\"], dict):\n field, vals = next(iter(f[\"terms\"].items()))\n if isinstance(vals, list) and len(vals) > 0:\n clauses.append(f)\n return clauses\n\n # Case B: convert context-style maps into clauses\n field_mapping = {\n \"data_sources\": \"filename\",\n \"document_types\": \"mimetype\",\n \"owners\": \"owner\",\n }\n clauses: List[dict] = []\n for k, values in filter_obj.items():\n if not isinstance(values, list):\n continue\n field = field_mapping.get(k, k)\n if len(values) == 0:\n # Match-nothing placeholder (kept to mirror your tool semantics)\n clauses.append({\"term\": {field: \"__IMPOSSIBLE_VALUE__\"}})\n elif len(values) == 1:\n if values[0] != \"__IMPOSSIBLE_VALUE__\":\n clauses.append({\"term\": {field: values[0]}})\n else:\n clauses.append({\"terms\": {field: values}})\n return clauses\n\n # ---------- search (single hybrid path matching your tool) ----------\n def search(self, query: str | None = None) -> list[dict[str, Any]]:\n client = self.build_client()\n q = (query or \"\").strip()\n\n # Parse optional filter expression (can be either A or B shape; see _coerce_filter_clauses)\n filter_obj = None\n if getattr(self, \"filter_expression\", \"\") and self.filter_expression.strip():\n try:\n filter_obj = json.loads(self.filter_expression)\n except json.JSONDecodeError as e:\n raise ValueError(f\"Invalid filter_expression JSON: {e}\") from e\n\n if not self.embedding:\n raise ValueError(\n \"Embedding is required to run hybrid search (KNN + keyword).\"\n )\n\n # Embed the query\n vec = self.embedding.embed_query(q)\n\n # Build filter clauses (accept both shapes)\n clauses = self._coerce_filter_clauses(filter_obj)\n\n # Respect the tool's limit/threshold defaults\n limit = (filter_obj or {}).get(\"limit\", self.number_of_results)\n score_threshold = (filter_obj or {}).get(\"score_threshold\", 0)\n\n # Build the same hybrid body as your SearchService\n body = {\n \"query\": {\n \"bool\": {\n \"should\": [\n {\n \"knn\": {\n self.vector_field: {\n \"vector\": vec,\n \"k\": 10, # fixed to match the tool\n \"boost\": 0.7,\n }\n }\n },\n {\n \"multi_match\": {\n \"query\": q,\n \"fields\": [\"text^2\", \"filename^1.5\"],\n \"type\": \"best_fields\",\n \"fuzziness\": \"AUTO\",\n \"boost\": 0.3,\n }\n },\n ],\n \"minimum_should_match\": 1,\n }\n },\n \"aggs\": {\n \"data_sources\": {\"terms\": {\"field\": \"filename\", \"size\": 20}},\n \"document_types\": {\"terms\": {\"field\": \"mimetype\", \"size\": 10}},\n \"owners\": {\"terms\": {\"field\": \"owner\", \"size\": 10}},\n },\n \"_source\": [\n \"filename\",\n \"mimetype\",\n \"page\",\n \"text\",\n \"source_url\",\n \"owner\",\n \"allowed_users\",\n \"allowed_groups\",\n ],\n \"size\": 
limit,\n }\n if clauses:\n body[\"query\"][\"bool\"][\"filter\"] = clauses\n\n if isinstance(score_threshold, (int, float)) and score_threshold > 0:\n # top-level min_score (matches your tool)\n body[\"min_score\"] = score_threshold\n\n resp = client.search(index=self.index_name, body=body)\n hits = resp.get(\"hits\", {}).get(\"hits\", [])\n return [\n {\n \"page_content\": hit[\"_source\"].get(\"text\", \"\"),\n \"metadata\": {k: v for k, v in hit[\"_source\"].items() if k != \"text\"},\n \"score\": hit.get(\"_score\"),\n }\n for hit in hits\n ]\n\n def search_documents(self) -> list[Data]:\n try:\n raw = self.search(self.search_query or \"\")\n return [\n Data(\n file_path=hit[\"metadata\"].get(\"file_path\", \"\"),\n text=hit[\"page_content\"],\n )\n for hit in raw\n ]\n except Exception as e:\n self.log(f\"search_documents error: {e}\")\n raise\n\n # -------- dynamic UI handling (auth switch) --------\n async def update_build_config(\n self, build_config: dict, field_value: str, field_name: str | None = None\n ) -> dict:\n try:\n if field_name == \"auth_mode\":\n mode = (field_value or \"basic\").strip().lower()\n is_basic = mode == \"basic\"\n is_jwt = mode == \"jwt\"\n\n build_config[\"username\"][\"show\"] = is_basic\n build_config[\"password\"][\"show\"] = is_basic\n\n build_config[\"jwt_token\"][\"show\"] = is_jwt\n build_config[\"jwt_header\"][\"show\"] = is_jwt\n build_config[\"bearer_prefix\"][\"show\"] = is_jwt\n\n build_config[\"username\"][\"required\"] = is_basic\n build_config[\"password\"][\"required\"] = is_basic\n\n build_config[\"jwt_token\"][\"required\"] = is_jwt\n build_config[\"jwt_header\"][\"required\"] = is_jwt\n build_config[\"bearer_prefix\"][\"required\"] = False\n\n if is_basic:\n build_config[\"jwt_token\"][\"value\"] = \"\"\n\n return build_config\n\n return build_config\n\n except Exception as e:\n self.log(f\"update_build_config error: {e}\")\n return build_config\n" + }, + "ef_construction": { + "_input_type": "IntInput", + "advanced": true, + "display_name": "EF Construction", + "dynamic": false, + "info": "Size of the dynamic list used during k-NN graph creation.", + "list": false, + "list_add_label": "Add More", + "name": "ef_construction", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "int", + "value": 512 }, "embedding": { "_input_type": "HandleInput", @@ -1288,13 +1377,38 @@ "type": "other", "value": "" }, - "hybrid_search_query": { - "_input_type": "MultilineInput", + "engine": { + "_input_type": "DropdownInput", "advanced": true, - "copy_field": false, - "display_name": "Hybrid Search Query", + "combobox": false, + "dialog_inputs": {}, + "display_name": "Engine", "dynamic": false, - "info": "Provide a custom hybrid search query in JSON format. 
This allows you to combine vector similarity and keyword matching.", + "info": "Vector search engine to use.", + "name": "engine", + "options": [ + "nmslib", + "faiss", + "lucene" + ], + "options_metadata": [], + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "toggle": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "nmslib" + }, + "filter_expression": { + "_input_type": "MultilineInput", + "advanced": false, + "copy_field": false, + "display_name": "Filter Expression (JSON)", + "dynamic": false, + "info": "Optional JSON to control filters/limit/score threshold.\nAccepted shapes:\n1) {\"filter\": [ {\"term\": {\"filename\":\"foo\"}}, {\"terms\":{\"owner\":[\"u1\",\"u2\"]}} ], \"limit\": 10, \"score_threshold\": 1.6 }\n2) Context-style maps: {\"data_sources\":[\"fileA\"], \"document_types\":[\"application/pdf\"], \"owners\":[\"123\"]}\nPlaceholders with __IMPOSSIBLE_VALUE__ are ignored.", "input_types": [ "Message" ], @@ -1302,7 +1416,7 @@ "list_add_label": "Add More", "load_from_db": false, "multiline": true, - "name": "hybrid_search_query", + "name": "filter_expression", "placeholder": "", "required": false, "show": true, @@ -1318,7 +1432,7 @@ "advanced": false, "display_name": "Index Name", "dynamic": false, - "info": "The index name where the vectors will be stored in OpenSearch cluster.", + "info": "The index to search.", "list": false, "list_add_label": "Add More", "load_from_db": false, @@ -1353,14 +1467,69 @@ "type": "other", "value": "" }, + "jwt_header": { + "_input_type": "StrInput", + "advanced": true, + "display_name": "JWT Header Name", + "dynamic": false, + "info": "", + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "jwt_header", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "Authorization" + }, + "jwt_token": { + "_input_type": "SecretStrInput", + "advanced": false, + "display_name": "JWT Token", + "dynamic": false, + "info": "Paste a valid JWT (sent as a header).", + "input_types": [], + "load_from_db": false, + "name": "jwt_token", + "password": true, + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "type": "str", + "value": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwOi8vb3BlbnJhZy1iYWNrZW5kOjgwMDAiLCJzdWIiOiIxMDMwNzA3MzY1NDU0NjQyNDYxMTMiLCJhdWQiOlsib3BlbnNlYXJjaCIsIm9wZW5yYWciXSwiZXhwIjoxNzU3NzExNzEyLCJpYXQiOjE3NTcxMDY5MTIsImF1dGhfdGltZSI6MTc1NzExNzcxMiwidXNlcl9pZCI6IjEwMzA3MDczNjU0NTQ2NDI0NjExMyIsImVtYWlsIjoiZ2FicmllbEBsYW5nZmxvdy5vcmciLCJuYW1lIjoiR2FicmllbCBBbG1laWRhIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZ2FicmllbEBsYW5nZmxvdy5vcmciLCJlbWFpbF92ZXJpZmllZCI6dHJ1ZSwicm9sZXMiOlsib3BlbnJhZ191c2VyIl19.JneUFesg-FuNKVdd0Nbc8dtItxtrctwldJTnrj8I2U_mGcZgX0ObnqrrrF8lvn25Su3rdyZIJ84bX16WMUMhUivzRl1od7X5_PUOr21F_MHtIVMBnmQW_DO5MjN6Op4-v54FAc9HZn6v5gS_RdUr4E0Vscv5CJIfbirFTA0B3Yip9hxg1UXocgXnc0NwiwTJnu9XBhEgPOXJLIu1PJjvVWBclO7ZgzMmgSUoZPzDH6GQphPqtWxeav-bGk38HyI2GR0QaRYjGMgKMB-xwGQWh5kvCuwEQ5ylF80yXN7lVIc7DGY69vhy24II6W8FaWZvMVqJnwcByfHJWbWQ8g8UDA" + }, + "m": { + "_input_type": "IntInput", + "advanced": true, + "display_name": "M Parameter", + "dynamic": false, + "info": "Number of bidirectional links created for each new element.", + "list": false, + "list_add_label": "Add More", + "name": "m", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + 
"type": "int", + "value": 16 + }, "number_of_results": { "_input_type": "IntInput", "advanced": true, - "display_name": "Number of Results", + "display_name": "Default Size (limit)", "dynamic": false, - "info": "Number of results to return.", + "info": "Default number of hits when no limit provided in filter_expression.", "list": false, "list_add_label": "Add More", + "load_from_db": false, "name": "number_of_results", "placeholder": "", "required": false, @@ -1376,7 +1545,7 @@ "advanced": false, "display_name": "OpenSearch URL", "dynamic": false, - "info": "URL for OpenSearch cluster (e.g. https://192.168.1.1:9200).", + "info": "URL for your OpenSearch cluster.", "list": false, "list_add_label": "Add More", "load_from_db": false, @@ -1388,24 +1557,24 @@ "tool_mode": false, "trace_as_metadata": true, "type": "str", - "value": "http://opensearch:9200" + "value": "https://opensearch:9200" }, "password": { "_input_type": "SecretStrInput", - "advanced": true, + "advanced": false, "display_name": "Password", "dynamic": false, "info": "", "input_types": [], - "load_from_db": true, + "load_from_db": false, "name": "password", "password": true, "placeholder": "", "required": false, - "show": true, + "show": false, "title_case": false, "type": "str", - "value": "OPENSEARCH_PASSWORD" + "value": "" }, "search_query": { "_input_type": "QueryInput", @@ -1430,49 +1599,6 @@ "type": "query", "value": "" }, - "search_score_threshold": { - "_input_type": "FloatInput", - "advanced": true, - "display_name": "Search Score Threshold", - "dynamic": false, - "info": "Minimum similarity score threshold for search results.", - "list": false, - "list_add_label": "Add More", - "name": "search_score_threshold", - "placeholder": "", - "required": false, - "show": true, - "title_case": false, - "tool_mode": false, - "trace_as_metadata": true, - "type": "float", - "value": 0 - }, - "search_type": { - "_input_type": "DropdownInput", - "advanced": true, - "combobox": false, - "dialog_inputs": {}, - "display_name": "Search Type", - "dynamic": false, - "info": "", - "name": "search_type", - "options": [ - "similarity", - "similarity_score_threshold", - "mmr" - ], - "options_metadata": [], - "placeholder": "", - "required": false, - "show": true, - "title_case": false, - "toggle": false, - "tool_mode": false, - "trace_as_metadata": true, - "type": "str", - "value": "similarity" - }, "should_cache_vector_store": { "_input_type": "BoolInput", "advanced": true, @@ -1491,6 +1617,33 @@ "type": "bool", "value": true }, + "space_type": { + "_input_type": "DropdownInput", + "advanced": true, + "combobox": false, + "dialog_inputs": {}, + "display_name": "Space Type", + "dynamic": false, + "info": "Distance metric for vector similarity.", + "name": "space_type", + "options": [ + "l2", + "l1", + "cosinesimil", + "linf", + "innerproduct" + ], + "options_metadata": [], + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "toggle": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "l2" + }, "use_ssl": { "_input_type": "BoolInput", "advanced": true, @@ -1511,7 +1664,7 @@ }, "username": { "_input_type": "StrInput", - "advanced": true, + "advanced": false, "display_name": "Username", "dynamic": false, "info": "", @@ -1521,13 +1674,32 @@ "name": "username", "placeholder": "", "required": false, - "show": true, + "show": false, "title_case": false, "tool_mode": false, "trace_as_metadata": true, "type": "str", "value": "admin" }, + "vector_field": { + "_input_type": "StrInput", + 
"advanced": true, + "display_name": "Vector Field", + "dynamic": false, + "info": "Vector field used for KNN.", + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "vector_field", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "chunk_embedding" + }, "verify_certs": { "_input_type": "BoolInput", "advanced": true, @@ -1551,24 +1723,24 @@ }, "selected_output": "search_results", "showNode": true, - "type": "OpenSearch" + "type": "OpenSearchHybrid" }, "dragging": false, - "id": "OpenSearch-Mkw1W", + "id": "OpenSearchHybrid-Ve6bS", "measured": { - "height": 518, + "height": 765, "width": 320 }, "position": { - "x": 2136.4456339674302, - "y": 1460.3160066924486 + "x": 2218.9287723423276, + "y": 1332.2598463956504 }, - "selected": false, + "selected": true, "type": "genericNode" } ], "viewport": { - "x": -1173.5436043881646, + "x": -1214.8709460066525, "y": -1289.0306227762003, "zoom": 1.0020797567291742 } diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index 5ac8b901..7cd3ab46 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -34,9 +34,7 @@ async def upload_user_file( logger.debug("JWT token status", jwt_present=jwt_token is not None) logger.debug("Calling langflow_file_service.upload_user_file") - result = await langflow_file_service.upload_user_file( - file_tuple, jwt_token=jwt_token - ) + result = await langflow_file_service.upload_user_file(file_tuple, jwt_token) logger.debug("Upload successful", result=result) return JSONResponse(result, status_code=201) except Exception as e: @@ -99,15 +97,20 @@ async def run_ingestion( # (search parameters are for retrieval, not document processing) logger.debug("Final tweaks with settings applied", tweaks=tweaks) - # Include user JWT if available jwt_token = getattr(request.state, "jwt_token", None) + if jwt_token: + # Set auth context for downstream services + from auth_context import set_auth_context + + user_id = getattr(request.state, "user_id", None) + set_auth_context(user_id, jwt_token) result = await langflow_file_service.run_ingestion_flow( file_paths=file_paths or [], + jwt_token=jwt_token, session_id=session_id, tweaks=tweaks, - jwt_token=jwt_token, ) return JSONResponse(result) except Exception as e: diff --git a/src/config/settings.py b/src/config/settings.py index f2e3be7e..8b5aaa4f 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -322,6 +322,10 @@ class AppClients: existing_headers = kwargs.pop("headers", {}) headers = {**default_headers, **existing_headers} + # Remove Content-Type if explicitly set to None (for file uploads) + if headers.get("Content-Type") is None: + headers.pop("Content-Type", None) + url = f"{LANGFLOW_URL}{endpoint}" return await self.langflow_http_client.request( diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 1bc4da29..d28aad16 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -3,11 +3,12 @@ from typing import Any, Dict, List, Optional from config.settings import LANGFLOW_INGEST_FLOW_ID, clients +logger = logging.getLogger(__name__) + class LangflowFileService: def __init__(self): self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID - self.logger = logging.getLogger(__name__) async def upload_user_file( self, file_tuple, jwt_token: Optional[str] = None @@ -15,15 +16,18 @@ class LangflowFileService: """Upload a file using 
Langflow Files API v2: POST /api/v2/files.

        Returns JSON with keys: id, name, path, size, provider.
        """
-        self.logger.debug("[LF] Upload (v2) -> /api/v2/files")
+        logger.debug("[LF] Upload (v2) -> /api/v2/files")
         resp = await clients.langflow_request(
-            "POST", "/api/v2/files", files={"file": file_tuple}
+            "POST",
+            "/api/v2/files",
+            files={"file": file_tuple},
+            headers={"Content-Type": None},
         )
-        self.logger.debug(
+        logger.debug(
             "[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase
         )
         if resp.status_code >= 400:
-            self.logger.error(
+            logger.error(
                 "[LF] Upload failed: %s %s | body=%s",
                 resp.status_code,
                 resp.reason_phrase,
@@ -35,13 +39,13 @@ class LangflowFileService:
     async def delete_user_file(self, file_id: str) -> None:
         """Delete a file by id using v2: DELETE /api/v2/files/{id}."""
         # NOTE: use v2 root, not /api/v1
-        self.logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id)
+        logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id)
         resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}")
-        self.logger.debug(
+        logger.debug(
             "[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase
         )
         if resp.status_code >= 400:
-            self.logger.error(
+            logger.error(
                 "[LF] Delete failed: %s %s | body=%s",
                 resp.status_code,
                 resp.reason_phrase,
@@ -52,15 +56,16 @@ class LangflowFileService:
     async def run_ingestion_flow(
         self,
         file_paths: List[str],
+        jwt_token: Optional[str],
         session_id: Optional[str] = None,
         tweaks: Optional[Dict[str, Any]] = None,
-        jwt_token: Optional[str] = None,
     ) -> Dict[str, Any]:
         """
         Trigger the ingestion flow with provided file paths.
         The flow must expose a File component path in input schema or accept files parameter.
         """
         if not self.flow_id_ingest:
+            logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured")
             raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")

         payload: Dict[str, Any] = {
@@ -68,19 +73,26 @@
             "input_type": "chat",
             "output_type": "text",  # Changed from "json" to "text"
         }
+        if not tweaks:
+            tweaks = {}

         # Pass files via tweaks to File component (File-PSU37 from the flow)
         if file_paths:
-            if not tweaks:
-                tweaks = {}
             tweaks["File-PSU37"] = {"path": file_paths}
+        # Pass the user's JWT to the OpenSearch component via a component tweak
+        if jwt_token:
+            # The component reads jwt_token from its inputs when auth_mode is "jwt"
+            tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token}
+            logger.debug("[LF] Adding JWT token to tweaks for OpenSearch components")
+        else:
+            logger.warning("[LF] No JWT token provided")

         if tweaks:
             payload["tweaks"] = tweaks
         if session_id:
             payload["session_id"] = session_id

-        self.logger.debug(
+        logger.debug(
             "[LF] Run ingestion -> /run/%s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s",
             self.flow_id_ingest,
             len(file_paths) if file_paths else 0,
@@ -90,16 +102,14 @@
         )

         # Log the full payload for debugging
-        self.logger.debug("[LF] Request payload: %s", payload)
+        logger.debug("[LF] Request payload: %s", payload)

         resp = await clients.langflow_request(
             "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload
         )
-        self.logger.debug(
-            "[LF] Run response: %s %s", resp.status_code, resp.reason_phrase
-        )
+        logger.debug("[LF] Run response: %s %s", resp.status_code, resp.reason_phrase)
         if resp.status_code >= 400:
-            self.logger.error(
+            logger.error(
                 "[LF] Run failed: %s %s | body=%s",
                 resp.status_code,
                 resp.reason_phrase,
diff --git a/src/services/task_service.py b/src/services/task_service.py
index 004385ae..ad24b188 
100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -5,7 +5,7 @@ import uuid from typing import Dict from models.tasks import FileTask, TaskStatus, UploadTask -from src.utils.gpu_detection import get_worker_count +from utils.gpu_detection import get_worker_count from utils.logging_config import get_logger logger = get_logger(__name__)
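
---

Notes (not part of the patch). The sketches below restate what the new code in this
diff builds, with placeholder values in angle brackets; anything marked as an
assumption is not taken from the diff itself.

1) KNN index mapping. On first ingest, _bulk_ingest_embeddings() creates the index
with the body from _default_text_mapping(). With the component defaults
(engine="nmslib", space_type="l2", ef_construction=512, m=16) and an assumed
1536-dimension embedding (the dimension is taken from the first vector at runtime),
the mapping sent to OpenSearch is:

    mapping = {
        "settings": {"index": {"knn": True, "knn.algo_param.ef_search": 512}},
        "mappings": {
            "properties": {
                "chunk_embedding": {  # the component's default vector_field
                    "type": "knn_vector",
                    "dimension": 1536,  # assumption: depends on the embedding model
                    "method": {
                        "name": "hnsw",
                        "space_type": "l2",
                        "engine": "nmslib",
                        "parameters": {"ef_construction": 512, "m": 16},
                    },
                }
            }
        },
    }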
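
2) Hybrid search body. For a query such as "quarterly report", search() embeds the
query and issues a single request that combines KNN (k=10, boost=0.7) with keyword
matching (boost=0.3). The vector below is truncated for readability:

    body = {
        "query": {
            "bool": {
                "should": [
                    {"knn": {"chunk_embedding": {"vector": [0.12, -0.03], "k": 10, "boost": 0.7}}},
                    {
                        "multi_match": {
                            "query": "quarterly report",
                            "fields": ["text^2", "filename^1.5"],
                            "type": "best_fields",
                            "fuzziness": "AUTO",
                            "boost": 0.3,
                        }
                    },
                ],
                "minimum_should_match": 1,
                # "filter": [...] is added only when filter_expression yields clauses
            }
        },
        "aggs": {
            "data_sources": {"terms": {"field": "filename", "size": 20}},
            "document_types": {"terms": {"field": "mimetype", "size": 10}},
            "owners": {"terms": {"field": "owner", "size": 10}},
        },
        "size": 10,  # number_of_results, unless the filter expression sets "limit"
        # "min_score" is set only when score_threshold > 0
    }

The component also restricts _source to the metadata fields listed in the code
(filename, mimetype, page, text, source_url, owner, allowed_users, allowed_groups).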
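
3) filter_expression shapes. _coerce_filter_clauses() accepts either explicit
OpenSearch clauses or context-style maps and reduces both to term/terms clauses
(values here are illustrative):

    explicit = {
        "filter": [
            {"term": {"filename": "report.pdf"}},
            {"terms": {"owner": ["u1", "u2"]}},
        ],
        "limit": 10,
        "score_threshold": 1.6,
    }

    context_style = {
        "data_sources": ["report.pdf"],         # -> {"term": {"filename": "report.pdf"}}
        "document_types": ["application/pdf"],  # -> {"term": {"mimetype": "application/pdf"}}
        "owners": [],                           # -> {"term": {"owner": "__IMPOSSIBLE_VALUE__"}}
    }

A single-item list becomes a term clause, a multi-item list becomes terms, and an
empty list becomes the match-nothing placeholder; __IMPOSSIBLE_VALUE__ entries
inside explicit term clauses are skipped.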
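
4) Auth modes. _build_auth_kwargs() returns the client kwargs for the selected
auth_mode, which build_client() spreads into the OpenSearch() constructor:

    # auth_mode="jwt" (jwt_header="Authorization", bearer_prefix=True)
    jwt_kwargs = {"headers": {"Authorization": "Bearer <jwt>"}}

    # auth_mode="basic"
    basic_kwargs = {"http_auth": ("admin", "<password>")}

With bearer_prefix=False the raw token is sent instead, and jwt_header can rename
the header for deployments that expect something other than Authorization. Note the
flow JSON above pins a literal JWT in the jwt_token template value; at runtime the
token is expected to arrive via tweaks, as in the payload sketch below.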
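
5) Ingestion run payload. run_ingestion_flow() posts to
/api/v1/run/{LANGFLOW_INGEST_FLOW_ID} with the user's file paths and JWT injected as
component tweaks. The component IDs come from the flow JSON above; any keys of the
payload before "input_type" fall outside the shown hunk, so they are omitted here:

    payload = {
        "input_type": "chat",
        "output_type": "text",
        "session_id": "<session-id>",  # only when provided
        "tweaks": {
            "File-PSU37": {"path": ["<user-id>/test_ingestion.txt"]},
            "OpenSearchHybrid-Ve6bS": {"jwt_token": "<jwt from request.state>"},
        },
    }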
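
6) Multipart uploads. upload_user_file() now passes headers={"Content-Type": None}
so the settings.py change can strip the merged default Content-Type before the
request goes out; the HTTP client then sets its own multipart boundary. A minimal
sketch of the merge logic (the JSON default is an assumption about the configured
default headers):

    default_headers = {"Content-Type": "application/json"}  # assumed default
    caller_headers = {"Content-Type": None}

    headers = {**default_headers, **caller_headers}
    if headers.get("Content-Type") is None:
        headers.pop("Content-Type", None)

    assert "Content-Type" not in headers  # client now sets multipart/form-data; boundary=...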