From 2543b2103be59ea10463d7fce0a75065f550509f Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Mon, 8 Sep 2025 08:42:46 -0300
Subject: [PATCH 1/6] fix import statement

---
 src/services/task_service.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/services/task_service.py b/src/services/task_service.py
index 004385ae..ad24b188 100644
--- a/src/services/task_service.py
+++ b/src/services/task_service.py
@@ -5,7 +5,7 @@ import uuid
 from typing import Dict
 
 from models.tasks import FileTask, TaskStatus, UploadTask
-from src.utils.gpu_detection import get_worker_count
+from utils.gpu_detection import get_worker_count
 from utils.logging_config import get_logger
 
 logger = get_logger(__name__)

From 7b5589653a1ca93ad9caa2beabafec8f77b7cc3b Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Mon, 8 Sep 2025 08:43:32 -0300
Subject: [PATCH 2/6] Add handling for Content-Type header in AppClients

Remove the Content-Type header in the AppClients request helper when a
caller explicitly sets it to None. File uploads rely on this: with the
header removed, the underlying HTTP client can generate its own multipart
Content-Type (including the boundary) instead of inheriting one from the
default headers.
---
 src/config/settings.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/config/settings.py b/src/config/settings.py
index f2e3be7e..8b5aaa4f 100644
--- a/src/config/settings.py
+++ b/src/config/settings.py
@@ -322,6 +322,10 @@ class AppClients:
         existing_headers = kwargs.pop("headers", {})
         headers = {**default_headers, **existing_headers}
 
+        # Remove Content-Type if explicitly set to None (for file uploads)
+        if headers.get("Content-Type") is None:
+            headers.pop("Content-Type", None)
+
         url = f"{LANGFLOW_URL}{endpoint}"
 
         return await self.langflow_http_client.request(

From deb39a6e5bec258d7d7536cf99fac06a8eb6c99f Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Mon, 8 Sep 2025 08:43:53 -0300
Subject: [PATCH 3/6] Refactor user file upload and ingestion flow to
 streamline JWT token handling

Simplify JWT handling in upload_user_file and run_ingestion: pass the token
straight through to the file service, and set the auth context (user id plus
JWT) before triggering the ingestion flow so downstream services can pick it
up.
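
A minimal sketch of the resulting call pattern in the route handler (names
are taken from the diff below; that set_auth_context tolerates a None
user_id is an assumption, not shown in this series):

    jwt_token = getattr(request.state, "jwt_token", None)
    if jwt_token:
        from auth_context import set_auth_context

        # user_id may be None if the middleware did not resolve a user (assumed OK)
        user_id = getattr(request.state, "user_id", None)
        set_auth_context(user_id, jwt_token)

    result = await langflow_file_service.run_ingestion_flow(
        file_paths=file_paths or [],
        jwt_token=jwt_token,
        session_id=session_id,
        tweaks=tweaks,
    )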
---
 src/api/langflow_files.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py
index 1fa1f9c7..23bd33ec 100644
--- a/src/api/langflow_files.py
+++ b/src/api/langflow_files.py
@@ -34,9 +34,7 @@ async def upload_user_file(
         logger.debug("JWT token status", jwt_present=jwt_token is not None)
 
         logger.debug("Calling langflow_file_service.upload_user_file")
-        result = await langflow_file_service.upload_user_file(
-            file_tuple, jwt_token=jwt_token
-        )
+        result = await langflow_file_service.upload_user_file(file_tuple, jwt_token)
         logger.debug("Upload successful", result=result)
         return JSONResponse(result, status_code=201)
     except Exception as e:
@@ -99,15 +97,20 @@ async def run_ingestion(
         # (search parameters are for retrieval, not document processing)
         logger.debug("Final tweaks with settings applied", tweaks=tweaks)
 
-        # Include user JWT if available
         jwt_token = getattr(request.state, "jwt_token", None)
+        if jwt_token:
+            # Set auth context for downstream services
+            from auth_context import set_auth_context
+
+            user_id = getattr(request.state, "user_id", None)
+            set_auth_context(user_id, jwt_token)
 
         result = await langflow_file_service.run_ingestion_flow(
             file_paths=file_paths or [],
+            jwt_token=jwt_token,
             session_id=session_id,
             tweaks=tweaks,
-            jwt_token=jwt_token,
         )
 
         return JSONResponse(result)
     except Exception as e:

From 18e1874c881038c0159bd2df50da7f2dae7345aa Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Mon, 8 Sep 2025 08:44:05 -0300
Subject: [PATCH 4/6] Enhance logging and JWT token handling in
 LangflowFileService

Switch LangflowFileService from an instance attribute to a module-level
logger, make jwt_token a required positional parameter of run_ingestion_flow,
and forward the token to the OpenSearch component via tweaks so it reaches
downstream services.
---
 src/services/langflow_file_service.py | 43 ++++++++++++++++-----------
 1 file changed, 26 insertions(+), 17 deletions(-)

diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py
index 452ecb86..6b343670 100644
--- a/src/services/langflow_file_service.py
+++ b/src/services/langflow_file_service.py
@@ -3,11 +3,12 @@ from typing import Any, Dict, List, Optional
 
 from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
 
+logger = logging.getLogger(__name__)
+
 
 class LangflowFileService:
     def __init__(self):
         self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID
-        self.logger = logging.getLogger(__name__)
 
     async def upload_user_file(
         self, file_tuple, jwt_token: Optional[str] = None
     ):
         """Upload a file using Langflow Files API v2: POST /api/v2/files.
 
         Returns JSON with keys: id, name, path, size, provider.
""" - self.logger.debug("[LF] Upload (v2) -> /api/v2/files") + logger.debug("[LF] Upload (v2) -> /api/v2/files") resp = await clients.langflow_request( - "POST", "/api/v2/files", files={"file": file_tuple} + "POST", + "/api/v2/files", + files={"file": file_tuple}, + headers={"Content-Type": None}, ) - self.logger.debug( + logger.debug( "[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase ) if resp.status_code >= 400: - self.logger.error( + logger.error( "[LF] Upload failed: %s %s | body=%s", resp.status_code, resp.reason_phrase, @@ -35,13 +39,13 @@ class LangflowFileService: async def delete_user_file(self, file_id: str) -> None: """Delete a file by id using v2: DELETE /api/v2/files/{id}.""" # NOTE: use v2 root, not /api/v1 - self.logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id) + logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id) resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}") - self.logger.debug( + logger.debug( "[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase ) if resp.status_code >= 400: - self.logger.error( + logger.error( "[LF] Delete failed: %s %s | body=%s", resp.status_code, resp.reason_phrase, @@ -52,9 +56,9 @@ class LangflowFileService: async def run_ingestion_flow( self, file_paths: List[str], + jwt_token: str, session_id: Optional[str] = None, tweaks: Optional[Dict[str, Any]] = None, - jwt_token: Optional[str] = None, ) -> Dict[str, Any]: """ Trigger the ingestion flow with provided file paths. @@ -68,19 +72,26 @@ class LangflowFileService: "input_type": "chat", "output_type": "text", # Changed from "json" to "text" } + if not tweaks: + tweaks = {} # Pass files via tweaks to File component (File-PSU37 from the flow) if file_paths: - if not tweaks: - tweaks = {} tweaks["File-PSU37"] = {"path": file_paths} + # Pass JWT token via tweaks using the x-langflow-global-var- pattern + if jwt_token: + # Using the global variable pattern that Langflow expects for OpenSearch components + tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token} + logger.error("[LF] Adding JWT token to tweaks for OpenSearch components") + else: + logger.error("[LF] No JWT token provided") if tweaks: payload["tweaks"] = tweaks if session_id: payload["session_id"] = session_id - self.logger.debug( + logger.debug( "[LF] Run ingestion -> /run/%s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s", self.flow_id_ingest, len(file_paths) if file_paths else 0, @@ -90,16 +101,14 @@ class LangflowFileService: ) # Log the full payload for debugging - self.logger.debug("[LF] Request payload: %s", payload) + logger.debug("[LF] Request payload: %s", payload) resp = await clients.langflow_request( "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload ) - self.logger.debug( - "[LF] Run response: %s %s", resp.status_code, resp.reason_phrase - ) + logger.debug("[LF] Run response: %s %s", resp.status_code, resp.reason_phrase) if resp.status_code >= 400: - self.logger.error( + logger.error( "[LF] Run failed: %s %s | body=%s", resp.status_code, resp.reason_phrase, From 20a5648b3d7c8f11d242fd466a0f062fd9a12d05 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 8 Sep 2025 09:14:03 -0300 Subject: [PATCH 5/6] Refactor ingestion flow configuration for OpenSearch hybrid component This commit updates the ingestion flow JSON configuration to enhance the OpenSearch hybrid component. 
Key changes include renaming the component to OpenSearchHybrid, updating input fields for better clarity, and improving the handling of JWT authentication. Additionally, the commit introduces new parameters for search configuration, such as engine selection and filter expressions, while ensuring the overall structure adheres to robust async coding practices and well-documented code. --- flows/ingestion_flow.json | 428 ++++++++++++++++++++++++++------------ 1 file changed, 300 insertions(+), 128 deletions(-) diff --git a/flows/ingestion_flow.json b/flows/ingestion_flow.json index eff0552d..5ee5d1b2 100644 --- a/flows/ingestion_flow.json +++ b/flows/ingestion_flow.json @@ -24,40 +24,12 @@ "type": "other" } }, - "id": "reactflow__edge-File-PSU37{\u0153dataType\u0153:\u0153File\u0153,\u0153id\u0153:\u0153File-PSU37\u0153,\u0153name\u0153:\u0153message\u0153,\u0153output_types\u0153:[\u0153Message\u0153]}-SplitText-QIKhg{\u0153fieldName\u0153:\u0153data_inputs\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153,\u0153Message\u0153],\u0153type\u0153:\u0153other\u0153}", + "id": "reactflow__edge-File-PSU37{œdataTypeœ:œFileœ,œidœ:œFile-PSU37œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-SplitText-QIKhg{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-QIKhgœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", "selected": false, "source": "File-PSU37", - "sourceHandle": "{\u0153dataType\u0153:\u0153File\u0153,\u0153id\u0153:\u0153File-PSU37\u0153,\u0153name\u0153:\u0153message\u0153,\u0153output_types\u0153:[\u0153Message\u0153]}", + "sourceHandle": "{œdataTypeœ:œFileœ,œidœ:œFile-PSU37œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}", "target": "SplitText-QIKhg", - "targetHandle": "{\u0153fieldName\u0153:\u0153data_inputs\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153,\u0153Message\u0153],\u0153type\u0153:\u0153other\u0153}" - }, - { - "animated": false, - "className": "", - "data": { - "sourceHandle": { - "dataType": "OpenAIEmbeddings", - "id": "OpenAIEmbeddings-joRJ6", - "name": "embeddings", - "output_types": [ - "Embeddings" - ] - }, - "targetHandle": { - "fieldName": "embedding", - "id": "OpenSearch-Mkw1W", - "inputTypes": [ - "Embeddings" - ], - "type": "other" - } - }, - "id": "xy-edge__OpenAIEmbeddings-joRJ6{\u0153dataType\u0153:\u0153OpenAIEmbeddings\u0153,\u0153id\u0153:\u0153OpenAIEmbeddings-joRJ6\u0153,\u0153name\u0153:\u0153embeddings\u0153,\u0153output_types\u0153:[\u0153Embeddings\u0153]}-OpenSearch-Mkw1W{\u0153fieldName\u0153:\u0153embedding\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Embeddings\u0153],\u0153type\u0153:\u0153other\u0153}", - "selected": false, - "source": "OpenAIEmbeddings-joRJ6", - "sourceHandle": "{\u0153dataType\u0153:\u0153OpenAIEmbeddings\u0153,\u0153id\u0153:\u0153OpenAIEmbeddings-joRJ6\u0153,\u0153name\u0153:\u0153embeddings\u0153,\u0153output_types\u0153:[\u0153Embeddings\u0153]}", - "target": "OpenSearch-Mkw1W", - "targetHandle": "{\u0153fieldName\u0153:\u0153embedding\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Embeddings\u0153],\u0153type\u0153:\u0153other\u0153}" + "targetHandle": "{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-QIKhgœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}" }, { "animated": false, @@ -73,7 +45,7 @@ }, "targetHandle": { "fieldName": "ingest_data", - "id": "OpenSearch-Mkw1W", + "id": "OpenSearchHybrid-Ve6bS", "inputTypes": [ 
"Data", "DataFrame" @@ -81,12 +53,40 @@ "type": "other" } }, - "id": "xy-edge__SplitText-QIKhg{\u0153dataType\u0153:\u0153SplitText\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153name\u0153:\u0153dataframe\u0153,\u0153output_types\u0153:[\u0153DataFrame\u0153]}-OpenSearch-Mkw1W{\u0153fieldName\u0153:\u0153ingest_data\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153],\u0153type\u0153:\u0153other\u0153}", + "id": "xy-edge__SplitText-QIKhg{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-QIKhgœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-OpenSearchHybrid-Ve6bS{œfieldNameœ:œingest_dataœ,œidœ:œOpenSearchHybrid-Ve6bSœ,œinputTypesœ:[œDataœ,œDataFrameœ],œtypeœ:œotherœ}", "selected": false, "source": "SplitText-QIKhg", - "sourceHandle": "{\u0153dataType\u0153:\u0153SplitText\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153name\u0153:\u0153dataframe\u0153,\u0153output_types\u0153:[\u0153DataFrame\u0153]}", - "target": "OpenSearch-Mkw1W", - "targetHandle": "{\u0153fieldName\u0153:\u0153ingest_data\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153],\u0153type\u0153:\u0153other\u0153}" + "sourceHandle": "{œdataTypeœ:œSplitTextœ,œidœ:œSplitText-QIKhgœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}", + "target": "OpenSearchHybrid-Ve6bS", + "targetHandle": "{œfieldNameœ:œingest_dataœ,œidœ:œOpenSearchHybrid-Ve6bSœ,œinputTypesœ:[œDataœ,œDataFrameœ],œtypeœ:œotherœ}" + }, + { + "animated": false, + "className": "", + "data": { + "sourceHandle": { + "dataType": "OpenAIEmbeddings", + "id": "OpenAIEmbeddings-joRJ6", + "name": "embeddings", + "output_types": [ + "Embeddings" + ] + }, + "targetHandle": { + "fieldName": "embedding", + "id": "OpenSearchHybrid-Ve6bS", + "inputTypes": [ + "Embeddings" + ], + "type": "other" + } + }, + "id": "xy-edge__OpenAIEmbeddings-joRJ6{œdataTypeœ:œOpenAIEmbeddingsœ,œidœ:œOpenAIEmbeddings-joRJ6œ,œnameœ:œembeddingsœ,œoutput_typesœ:[œEmbeddingsœ]}-OpenSearchHybrid-Ve6bS{œfieldNameœ:œembeddingœ,œidœ:œOpenSearchHybrid-Ve6bSœ,œinputTypesœ:[œEmbeddingsœ],œtypeœ:œotherœ}", + "selected": false, + "source": "OpenAIEmbeddings-joRJ6", + "sourceHandle": "{œdataTypeœ:œOpenAIEmbeddingsœ,œidœ:œOpenAIEmbeddings-joRJ6œ,œnameœ:œembeddingsœ,œoutput_typesœ:[œEmbeddingsœ]}", + "target": "OpenSearchHybrid-Ve6bS", + "targetHandle": "{œfieldNameœ:œembeddingœ,œidœ:œOpenSearchHybrid-Ve6bSœ,œinputTypesœ:[œEmbeddingsœ],œtypeœ:œotherœ}" } ], "nodes": [ @@ -115,7 +115,7 @@ "frozen": false, "icon": "scissors-line-dashed", "legacy": false, - "lf_version": "1.1.1", + "lf_version": "1.5.0.post2", "metadata": { "code_hash": "dbf2e9d2319d", "dependencies": { @@ -353,7 +353,7 @@ "frozen": false, "icon": "OpenAI", "legacy": false, - "lf_version": "1.1.1", + "lf_version": "1.5.0.post2", "metadata": { "code_hash": "2691dee277c9", "dependencies": { @@ -829,7 +829,7 @@ "data": { "id": "note-Bm5Xw", "node": { - "description": "### \ud83d\udca1 Add your OpenAI API key here \ud83d\udc47", + "description": "### 💡 Add your OpenAI API key here 👇", "display_name": "", "documentation": "", "template": { @@ -884,8 +884,9 @@ ], "frozen": false, "icon": "file-text", - "last_updated": "2025-09-03T06:37:14.082Z", + "last_updated": "2025-09-08T11:35:39.784Z", "legacy": false, + "lf_version": "1.5.0.post2", "metadata": {}, "minimized": false, "output_types": [], @@ -905,6 +906,23 @@ "Message" ], "value": "__UNDEFINED__" + }, + { + "allows_loop": false, + "cache": true, + "display_name": "File Path", + 
"group_outputs": false, + "hidden": null, + "method": "load_files_path", + "name": "path", + "options": null, + "required_inputs": null, + "selected": "Message", + "tool_mode": true, + "types": [ + "Message" + ], + "value": "__UNDEFINED__" } ], "pinned": false, @@ -1051,7 +1069,9 @@ "bz2", "gz" ], - "file_path": [], + "file_path": [ + "242b5797-4104-4a01-8da1-b8036813100d/test_ingestion.txt" + ], "info": "Supported file extensions: txt, md, mdx, csv, json, yaml, yml, xml, html, htm, pdf, docx, py, sh, sql, js, ts, tsx; optionally bundled in file extensions: zip, tar, tgz, bz2, gz", "list": true, "list_add_label": "Add More", @@ -1124,13 +1144,14 @@ }, "tool_mode": false }, + "selected_output": "message", "showNode": true, "type": "File" }, "dragging": false, "id": "File-PSU37", "measured": { - "height": 230, + "height": 234, "width": 320 }, "position": { @@ -1142,9 +1163,7 @@ }, { "data": { - "description": "OpenSearch Vector Store with advanced, customizable search capabilities.", - "display_name": "OpenSearch", - "id": "OpenSearch-Mkw1W", + "id": "OpenSearchHybrid-Ve6bS", "node": { "base_classes": [ "Data", @@ -1154,45 +1173,53 @@ "beta": false, "conditional_paths": [], "custom_fields": {}, - "description": "OpenSearch Vector Store with advanced, customizable search capabilities.", - "display_name": "OpenSearch", + "description": "Hybrid search: KNN + keyword, with optional filters, min_score, and aggregations.", + "display_name": "OpenSearch (Hybrid)", "documentation": "", - "edited": false, + "edited": true, "field_order": [ "opensearch_url", "index_name", + "engine", + "space_type", + "ef_construction", + "m", "ingest_data", "search_query", "should_cache_vector_store", "embedding", - "search_type", + "vector_field", "number_of_results", - "search_score_threshold", + "filter_expression", + "auth_mode", "username", "password", + "jwt_token", + "jwt_header", + "bearer_prefix", "use_ssl", - "verify_certs", - "hybrid_search_query" + "verify_certs" ], "frozen": false, "icon": "OpenSearch", + "last_updated": "2025-09-05T21:19:52.776Z", "legacy": false, "metadata": { - "code_hash": "972b714acf6b", + "code_hash": "37e8631c902b", "dependencies": { "dependencies": [ - { - "name": "langchain_community", - "version": "0.3.21" - }, { "name": "langflow", "version": "1.5.0.post2" + }, + { + "name": "opensearchpy", + "version": "2.8.0" } ], "total_dependencies": 2 }, - "module": "custom_components.opensearch" + "module": "custom_components.opensearch_hybrid" }, "minimized": false, "output_types": [], @@ -1202,6 +1229,7 @@ "cache": true, "display_name": "Search Results", "group_outputs": false, + "hidden": null, "method": "search_documents", "name": "search_results", "options": null, @@ -1218,11 +1246,11 @@ "cache": true, "display_name": "DataFrame", "group_outputs": false, + "hidden": null, "method": "as_dataframe", "name": "dataframe", "options": null, "required_inputs": null, - "selected": "DataFrame", "tool_mode": true, "types": [ "DataFrame" @@ -1239,7 +1267,6 @@ "name": "vectorstoreconnection", "options": null, "required_inputs": null, - "selected": "VectorStore", "tool_mode": true, "types": [ "VectorStore" @@ -1250,6 +1277,50 @@ "pinned": false, "template": { "_type": "Component", + "auth_mode": { + "_input_type": "DropdownInput", + "advanced": false, + "combobox": false, + "dialog_inputs": {}, + "display_name": "Auth Mode", + "dynamic": false, + "info": "Choose Basic (username/password) or JWT (Bearer token).", + "load_from_db": false, + "name": "auth_mode", + "options": [ + "basic", + "jwt" + 
], + "options_metadata": [], + "placeholder": "", + "real_time_refresh": true, + "required": false, + "show": true, + "title_case": false, + "toggle": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "jwt" + }, + "bearer_prefix": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Prefix 'Bearer '", + "dynamic": false, + "info": "", + "list": false, + "list_add_label": "Add More", + "name": "bearer_prefix", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + }, "code": { "advanced": true, "dynamic": true, @@ -1266,7 +1337,25 @@ "show": true, "title_case": false, "type": "code", - "value": "import json\nfrom typing import Any\n\nfrom langchain_community.vectorstores import OpenSearchVectorSearch\n\nfrom langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store\nfrom langflow.base.vectorstores.vector_store_connection_decorator import vector_store_connection\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n FloatInput,\n HandleInput,\n IntInput,\n MultilineInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom fastapi.encoders import jsonable_encoder\nfrom langchain_core.documents import Document\nimport json\n\n@vector_store_connection\nclass OpenSearchVectorStoreComponent(LCVectorStoreComponent):\n \"\"\"OpenSearch Vector Store with advanced, customizable search capabilities.\"\"\"\n\n display_name: str = \"OpenSearch\"\n description: str = \"OpenSearch Vector Store with advanced, customizable search capabilities.\"\n name = \"OpenSearch\"\n icon = \"OpenSearch\"\n\n inputs = [\n StrInput(\n name=\"opensearch_url\",\n display_name=\"OpenSearch URL\",\n value=\"http://localhost:9200\",\n info=\"URL for OpenSearch cluster (e.g. 
https://192.168.1.1:9200).\",\n ),\n StrInput(\n name=\"index_name\",\n display_name=\"Index Name\",\n value=\"documents\",\n info=\"The index name where the vectors will be stored in OpenSearch cluster.\",\n ),\n StrInput(\n name=\"vector_field\",\n display_name=\"Vector Field\",\n value=\"chunk_embedding\",\n info=\"Document field embeddings are stored in.\",\n advanced=True,\n ),\n StrInput(\n name=\"text_field\",\n display_name=\"Text Field\", \n value=\"text\",\n info=\"Document field the text of the document is stored in.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"engine\",\n display_name=\"Engine\",\n options=[\"nmslib\", \"faiss\", \"lucene\"],\n value=\"nmslib\",\n info=\"Vector search engine to use.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"space_type\",\n display_name=\"Space Type\",\n options=[\"l2\", \"l1\", \"cosinesimil\", \"linf\", \"innerproduct\"],\n value=\"l2\",\n info=\"Distance metric for vector similarity.\",\n advanced=True,\n ),\n IntInput(\n name=\"ef_construction\",\n display_name=\"EF Construction\",\n value=100,\n info=\"Size of the dynamic list used during k-NN graph creation.\",\n advanced=True,\n ),\n IntInput(\n name=\"m\",\n display_name=\"M Parameter\",\n value=16,\n info=\"Number of bidirectional links created for each new element.\",\n advanced=True,\n ),\n *LCVectorStoreComponent.inputs,\n HandleInput(name=\"embedding\", display_name=\"Embedding\", input_types=[\"Embeddings\"]),\n DropdownInput(\n name=\"search_type\",\n display_name=\"Search Type\",\n options=[\"similarity\", \"similarity_score_threshold\", \"mmr\"],\n value=\"similarity\",\n advanced=True,\n ),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Number of Results\",\n info=\"Number of results to return.\",\n advanced=True,\n value=4,\n ),\n FloatInput(\n name=\"search_score_threshold\",\n display_name=\"Search Score Threshold\",\n info=\"Minimum similarity score threshold for search results.\",\n value=0.0,\n advanced=True,\n ),\n StrInput(\n name=\"username\",\n display_name=\"Username\",\n value=\"admin\",\n advanced=True,\n ),\n SecretStrInput(\n name=\"password\",\n display_name=\"Password\",\n value=\"OPENSEARCH_PASSWORD\",\n advanced=True,\n ),\n BoolInput(\n name=\"use_ssl\",\n display_name=\"Use SSL\",\n value=True,\n advanced=True,\n ),\n BoolInput(\n name=\"verify_certs\",\n display_name=\"Verify Certificates\",\n value=False,\n advanced=True,\n ),\n MultilineInput(\n name=\"hybrid_search_query\",\n display_name=\"Hybrid Search Query\",\n value=\"\",\n advanced=True,\n info=(\n \"Provide a custom hybrid search query in JSON format. 
This allows you to combine \"\n \"vector similarity and keyword matching.\"\n ),\n ),\n ]\n\n @check_cached_vector_store\n def build_vector_store(self) -> OpenSearchVectorSearch:\n \"\"\"Builds the OpenSearch Vector Store object.\"\"\"\n try:\n from langchain_community.vectorstores import OpenSearchVectorSearch\n except ImportError as e:\n error_message = f\"Failed to import required modules: {e}\"\n self.log(error_message)\n raise ImportError(error_message) from e\n\n try:\n opensearch = OpenSearchVectorSearch(\n index_name=self.index_name,\n embedding_function=self.embedding,\n opensearch_url=self.opensearch_url,\n http_auth=(self.username, self.password),\n use_ssl=self.use_ssl,\n verify_certs=self.verify_certs,\n ssl_assert_hostname=False,\n ssl_show_warn=False,\n engine=self.engine,\n vector_field=self.vector_field,\n text_field=self.text_field,\n space_type=self.space_type,\n ef_construction=self.ef_construction,\n m=self.m,\n )\n except Exception as e:\n error_message = f\"Failed to create OpenSearchVectorSearch instance: {e}\"\n self.log(error_message)\n raise RuntimeError(error_message) from e\n\n if self.ingest_data:\n self._add_documents_to_vector_store(opensearch)\n\n return opensearch\n\n def _add_documents_to_vector_store(self, vector_store: \"OpenSearchVectorSearch\") -> None:\n \"\"\"Adds documents to the Vector Store.\"\"\"\n # Convert DataFrame to Data if needed using parent's method\n self.ingest_data = self._prepare_ingest_data()\n\n documents = []\n for _input in self.ingest_data or []:\n if isinstance(_input, Data):\n doc = Document(\n page_content=_input.get_text(), \n metadata=jsonable_encoder(_input.data) if _input.data else {}\n )\n documents.append(doc)\n else:\n error_message = f\"Expected Data object, got {type(_input)}\"\n self.log(error_message)\n raise TypeError(error_message)\n\n if documents and self.embedding is not None:\n self.log(f\"Adding {len(documents)} documents to the Vector Store.\")\n try:\n vector_store.add_documents(\n documents, \n vector_field=self.vector_field,\n text_field=self.text_field\n )\n except Exception as e:\n error_message = f\"Error adding documents to Vector Store: {e}\"\n self.log(error_message)\n raise RuntimeError(error_message) from e\n else:\n self.log(\"No documents to add to the Vector Store.\")\n\n def search(self, query: str | None = None) -> list[dict[str, Any]]:\n \"\"\"Search for similar documents in the vector store or retrieve all documents if no query is provided.\"\"\"\n \n vector_store = self.build_vector_store()\n try:\n query = query or \"\"\n\n if self.hybrid_search_query.strip():\n try:\n hybrid_query = json.loads(self.hybrid_search_query)\n except json.JSONDecodeError as e:\n error_message = f\"Invalid hybrid search query JSON: {e}\"\n self.log(error_message)\n raise ValueError(error_message) from e\n\n results = vector_store.client.search(index=self.index_name, body=hybrid_query)\n\n processed_results = []\n for hit in results.get(\"hits\", {}).get(\"hits\", []):\n source = hit.get(\"_source\", {})\n text = source.get(self.text_field, \"\")\n metadata = source.get(\"metadata\", {})\n\n if isinstance(text, dict):\n text = text.get(\"text\", \"\")\n\n processed_results.append(\n {\n \"page_content\": text,\n \"metadata\": metadata,\n }\n )\n return processed_results\n\n search_kwargs = {\n \"k\": self.number_of_results,\n \"vector_field\": self.vector_field,\n \"text_field\": self.text_field\n }\n search_type = self.search_type.lower()\n\n if search_type == \"similarity\":\n results = 
vector_store.similarity_search(query, **search_kwargs)\n return [{\"page_content\": doc.page_content, \"metadata\": doc.metadata} for doc in results]\n if search_type == \"similarity_score_threshold\":\n search_kwargs[\"score_threshold\"] = self.search_score_threshold\n results = vector_store.similarity_search_with_relevance_scores(query, **search_kwargs)\n return [\n {\n \"page_content\": doc.page_content,\n \"metadata\": doc.metadata,\n \"score\": score,\n }\n for doc, score in results\n ]\n if search_type == \"mmr\":\n results = vector_store.max_marginal_relevance_search(query, **search_kwargs)\n return [{\"page_content\": doc.page_content, \"metadata\": doc.metadata} for doc in results]\n\n except Exception as e:\n error_message = f\"Error during search: {e}\"\n self.log(error_message)\n raise RuntimeError(error_message) from e\n\n error_message = f\"Error during search. Invalid search type: {self.search_type}\"\n self.log(error_message)\n raise ValueError(error_message)\n\n def search_documents(self) -> list[Data]:\n \"\"\"Search for documents in the vector store based on the search input.\n\n If no search input is provided, retrieve all documents.\n \"\"\"\n try:\n query = self.search_query.strip() if self.search_query else None\n results = self.search(query)\n retrieved_data = [\n Data(\n file_path=result[\"metadata\"].get(\"file_path\", \"\"),\n text=result[\"page_content\"],\n )\n for result in results\n ]\n except Exception as e:\n error_message = f\"Error during document search: {e}\"\n self.log(error_message)\n raise RuntimeError(error_message) from e\n\n self.status = retrieved_data\n return retrieved_data\n" + "value": "from __future__ import annotations\n\nimport json\nfrom typing import Any, Dict, List, Optional\n\nfrom langflow.base.vectorstores.model import (\n LCVectorStoreComponent,\n check_cached_vector_store,\n)\nfrom langflow.base.vectorstores.vector_store_connection_decorator import (\n vector_store_connection,\n)\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n HandleInput,\n IntInput,\n MultilineInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom opensearchpy import OpenSearch, helpers\nimport uuid\n\n\n@vector_store_connection\nclass OpenSearchHybridComponent(LCVectorStoreComponent):\n \"\"\"OpenSearch hybrid search: KNN (k=10, boost=0.7) + multi_match (boost=0.3) with optional filters & min_score.\"\"\"\n\n display_name: str = \"OpenSearch (Hybrid)\"\n name: str = \"OpenSearchHybrid\"\n icon: str = \"OpenSearch\"\n description: str = \"Hybrid search: KNN + keyword, with optional filters, min_score, and aggregations.\"\n\n # Keys we consider baseline\n default_keys: list[str] = [\n \"opensearch_url\",\n \"index_name\",\n *[\n i.name for i in LCVectorStoreComponent.inputs\n ], # search_query, add_documents, etc.\n \"embedding\",\n \"vector_field\",\n \"number_of_results\",\n \"auth_mode\",\n \"username\",\n \"password\",\n \"jwt_token\",\n \"jwt_header\",\n \"bearer_prefix\",\n \"use_ssl\",\n \"verify_certs\",\n \"filter_expression\",\n \"engine\",\n \"space_type\",\n \"ef_construction\",\n \"m\",\n ]\n\n inputs = [\n StrInput(\n name=\"opensearch_url\",\n display_name=\"OpenSearch URL\",\n value=\"http://localhost:9200\",\n info=\"URL for your OpenSearch cluster.\",\n ),\n StrInput(\n name=\"index_name\",\n display_name=\"Index Name\",\n value=\"langflow\",\n info=\"The index to search.\",\n ),\n DropdownInput(\n name=\"engine\",\n display_name=\"Engine\",\n options=[\"nmslib\", \"faiss\", \"lucene\"],\n 
value=\"nmslib\",\n info=\"Vector search engine to use.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"space_type\",\n display_name=\"Space Type\",\n options=[\"l2\", \"l1\", \"cosinesimil\", \"linf\", \"innerproduct\"],\n value=\"l2\",\n info=\"Distance metric for vector similarity.\",\n advanced=True,\n ),\n IntInput(\n name=\"ef_construction\",\n display_name=\"EF Construction\",\n value=512,\n info=\"Size of the dynamic list used during k-NN graph creation.\",\n advanced=True,\n ),\n IntInput(\n name=\"m\",\n display_name=\"M Parameter\",\n value=16,\n info=\"Number of bidirectional links created for each new element.\",\n advanced=True,\n ),\n *LCVectorStoreComponent.inputs, # includes search_query, add_documents, etc.\n HandleInput(\n name=\"embedding\", display_name=\"Embedding\", input_types=[\"Embeddings\"]\n ),\n StrInput(\n name=\"vector_field\",\n display_name=\"Vector Field\",\n value=\"chunk_embedding\",\n advanced=True,\n info=\"Vector field used for KNN.\",\n ),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Default Size (limit)\",\n value=10,\n advanced=True,\n info=\"Default number of hits when no limit provided in filter_expression.\",\n ),\n MultilineInput(\n name=\"filter_expression\",\n display_name=\"Filter Expression (JSON)\",\n value=\"\",\n info=(\n \"Optional JSON to control filters/limit/score threshold.\\n\"\n \"Accepted shapes:\\n\"\n '1) {\"filter\": [ {\"term\": {\"filename\":\"foo\"}}, {\"terms\":{\"owner\":[\"u1\",\"u2\"]}} ], \"limit\": 10, \"score_threshold\": 1.6 }\\n'\n '2) Context-style maps: {\"data_sources\":[\"fileA\"], \"document_types\":[\"application/pdf\"], \"owners\":[\"123\"]}\\n'\n \"Placeholders with __IMPOSSIBLE_VALUE__ are ignored.\"\n ),\n ),\n # ----- Auth controls (dynamic) -----\n DropdownInput(\n name=\"auth_mode\",\n display_name=\"Auth Mode\",\n value=\"basic\",\n options=[\"basic\", \"jwt\"],\n info=\"Choose Basic (username/password) or JWT (Bearer token).\",\n real_time_refresh=True,\n advanced=False,\n ),\n StrInput(\n name=\"username\",\n display_name=\"Username\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"password\",\n display_name=\"Password\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"jwt_token\",\n display_name=\"JWT Token\",\n value=\"JWT\",\n load_from_db=True,\n show=True,\n info=\"Paste a valid JWT (sent as a header).\",\n ),\n StrInput(\n name=\"jwt_header\",\n display_name=\"JWT Header Name\",\n value=\"Authorization\",\n show=False,\n advanced=True,\n ),\n BoolInput(\n name=\"bearer_prefix\",\n display_name=\"Prefix 'Bearer '\",\n value=True,\n show=False,\n advanced=True,\n ),\n # ----- TLS -----\n BoolInput(name=\"use_ssl\", display_name=\"Use SSL\", value=True, advanced=True),\n BoolInput(\n name=\"verify_certs\",\n display_name=\"Verify Certificates\",\n value=False,\n advanced=True,\n ),\n ]\n\n # ---------- helper functions for index management ----------\n def _default_text_mapping(\n self,\n dim: int,\n engine: str = \"nmslib\",\n space_type: str = \"l2\",\n ef_search: int = 512,\n ef_construction: int = 512,\n m: int = 16,\n vector_field: str = \"vector_field\",\n ) -> Dict[str, Any]:\n \"\"\"For Approximate k-NN Search, this is the default mapping to create index.\"\"\"\n return {\n \"settings\": {\"index\": {\"knn\": True, \"knn.algo_param.ef_search\": ef_search}},\n \"mappings\": {\n \"properties\": {\n vector_field: {\n \"type\": \"knn_vector\",\n \"dimension\": dim,\n \"method\": {\n \"name\": \"hnsw\",\n \"space_type\": space_type,\n 
\"engine\": engine,\n \"parameters\": {\"ef_construction\": ef_construction, \"m\": m},\n },\n }\n }\n },\n }\n\n def _validate_aoss_with_engines(self, is_aoss: bool, engine: str) -> None:\n \"\"\"Validate AOSS with the engine.\"\"\"\n if is_aoss and engine != \"nmslib\" and engine != \"faiss\":\n raise ValueError(\n \"Amazon OpenSearch Service Serverless only \"\n \"supports `nmslib` or `faiss` engines\"\n )\n\n def _is_aoss_enabled(self, http_auth: Any) -> bool:\n \"\"\"Check if the service is http_auth is set as `aoss`.\"\"\"\n if (\n http_auth is not None\n and hasattr(http_auth, \"service\")\n and http_auth.service == \"aoss\"\n ):\n return True\n return False\n\n def _bulk_ingest_embeddings(\n self,\n client: OpenSearch,\n index_name: str,\n embeddings: List[List[float]],\n texts: List[str],\n metadatas: Optional[List[dict]] = None,\n ids: Optional[List[str]] = None,\n vector_field: str = \"vector_field\",\n text_field: str = \"text\",\n mapping: Optional[Dict] = None,\n max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,\n is_aoss: bool = False,\n ) -> List[str]:\n \"\"\"Bulk Ingest Embeddings into given index.\"\"\"\n if not mapping:\n mapping = dict()\n try:\n from opensearchpy.exceptions import NotFoundError\n except ImportError:\n raise ImportError(\"Could not import OpenSearch exceptions\")\n\n requests = []\n return_ids = []\n\n try:\n client.indices.get(index=index_name)\n except NotFoundError:\n client.indices.create(index=index_name, body=mapping)\n\n for i, text in enumerate(texts):\n metadata = metadatas[i] if metadatas else {}\n _id = ids[i] if ids else str(uuid.uuid4())\n request = {\n \"_op_type\": \"index\",\n \"_index\": index_name,\n vector_field: embeddings[i],\n text_field: text,\n \"metadata\": metadata,\n }\n if is_aoss:\n request[\"id\"] = _id\n else:\n request[\"_id\"] = _id\n requests.append(request)\n return_ids.append(_id)\n\n helpers.bulk(client, requests, max_chunk_bytes=max_chunk_bytes)\n if not is_aoss:\n client.indices.refresh(index=index_name)\n return return_ids\n\n # ---------- auth / client ----------\n def _build_auth_kwargs(self) -> Dict[str, Any]:\n mode = (self.auth_mode or \"basic\").strip().lower()\n if mode == \"jwt\":\n token = (self.jwt_token or \"\").strip()\n if not token:\n raise ValueError(\"Auth Mode is 'jwt' but no jwt_token was provided.\")\n header_name = (self.jwt_header or \"Authorization\").strip()\n header_value = f\"Bearer {token}\" if self.bearer_prefix else token\n return {\"headers\": {header_name: header_value}}\n user = (self.username or \"\").strip()\n pwd = (self.password or \"\").strip()\n if not user or not pwd:\n raise ValueError(\"Auth Mode is 'basic' but username/password are missing.\")\n return {\"http_auth\": (user, pwd)}\n\n def build_client(self) -> OpenSearch:\n auth_kwargs = self._build_auth_kwargs()\n return OpenSearch(\n hosts=[self.opensearch_url],\n use_ssl=self.use_ssl,\n verify_certs=self.verify_certs,\n ssl_assert_hostname=False,\n ssl_show_warn=False,\n **auth_kwargs,\n )\n\n @check_cached_vector_store\n def build_vector_store(self) -> OpenSearch:\n # Return raw OpenSearch client as our “vector store.”\n return self.build_client()\n\n # ---------- ingest ----------\n def _add_documents_to_vector_store(self, client: OpenSearch) -> None:\n # Convert DataFrame to Data if needed using parent's method\n self.ingest_data = self._prepare_ingest_data()\n\n docs = self.ingest_data or []\n if not docs:\n self.log(\"No documents to ingest.\")\n return\n\n # Extract texts and metadata from documents\n texts = []\n 
metadatas = []\n for doc_obj in docs:\n lc_doc = doc_obj.to_lc_document()\n texts.append(lc_doc.page_content)\n metadatas.append(lc_doc.metadata)\n\n if not self.embedding:\n raise ValueError(\"Embedding handle is required to embed documents.\")\n\n # Generate embeddings\n vectors = self.embedding.embed_documents(texts)\n\n if not vectors:\n self.log(\"No vectors generated from documents.\")\n return\n\n # Get vector dimension for mapping\n dim = len(vectors[0]) if vectors else 768 # default fallback\n\n # Check for AOSS\n auth_kwargs = self._build_auth_kwargs()\n is_aoss = self._is_aoss_enabled(auth_kwargs.get(\"http_auth\"))\n\n # Validate engine with AOSS\n engine = getattr(self, \"engine\", \"nmslib\")\n self._validate_aoss_with_engines(is_aoss, engine)\n\n # Create mapping with proper KNN settings\n space_type = getattr(self, \"space_type\", \"l2\")\n ef_construction = getattr(self, \"ef_construction\", 512)\n m = getattr(self, \"m\", 16)\n\n mapping = self._default_text_mapping(\n dim=dim,\n engine=engine,\n space_type=space_type,\n ef_construction=ef_construction,\n m=m,\n vector_field=self.vector_field,\n )\n\n self.log(\n f\"Indexing {len(texts)} documents into '{self.index_name}' with proper KNN mapping...\"\n )\n\n # Use the LangChain-style bulk ingestion\n return_ids = self._bulk_ingest_embeddings(\n client=client,\n index_name=self.index_name,\n embeddings=vectors,\n texts=texts,\n metadatas=metadatas,\n vector_field=self.vector_field,\n text_field=\"text\",\n mapping=mapping,\n is_aoss=is_aoss,\n )\n\n self.log(f\"Successfully indexed {len(return_ids)} documents.\")\n\n # ---------- helpers for filters ----------\n def _is_placeholder_term(self, term_obj: dict) -> bool:\n # term_obj like {\"filename\": \"__IMPOSSIBLE_VALUE__\"}\n return any(v == \"__IMPOSSIBLE_VALUE__\" for v in term_obj.values())\n\n def _coerce_filter_clauses(self, filter_obj: dict | None) -> List[dict]:\n \"\"\"\n Accepts either:\n A) {\"filter\":[ ...term/terms objects... 
], \"limit\":..., \"score_threshold\":...}\n B) Context-style: {\"data_sources\":[...], \"document_types\":[...], \"owners\":[...]}\n Returns a list of OS filter clauses (term/terms), skipping placeholders and empty terms.\n \"\"\"\n\n if not filter_obj:\n return []\n\n # If it’s a string, try to parse it once\n if isinstance(filter_obj, str):\n try:\n filter_obj = json.loads(filter_obj)\n except Exception:\n # Not valid JSON → treat as no filters\n return []\n\n # Case A: already an explicit list/dict under \"filter\"\n if \"filter\" in filter_obj:\n raw = filter_obj[\"filter\"]\n if isinstance(raw, dict):\n raw = [raw]\n clauses: List[dict] = []\n for f in raw or []:\n if (\n \"term\" in f\n and isinstance(f[\"term\"], dict)\n and not self._is_placeholder_term(f[\"term\"])\n ):\n clauses.append(f)\n elif \"terms\" in f and isinstance(f[\"terms\"], dict):\n field, vals = next(iter(f[\"terms\"].items()))\n if isinstance(vals, list) and len(vals) > 0:\n clauses.append(f)\n return clauses\n\n # Case B: convert context-style maps into clauses\n field_mapping = {\n \"data_sources\": \"filename\",\n \"document_types\": \"mimetype\",\n \"owners\": \"owner\",\n }\n clauses: List[dict] = []\n for k, values in filter_obj.items():\n if not isinstance(values, list):\n continue\n field = field_mapping.get(k, k)\n if len(values) == 0:\n # Match-nothing placeholder (kept to mirror your tool semantics)\n clauses.append({\"term\": {field: \"__IMPOSSIBLE_VALUE__\"}})\n elif len(values) == 1:\n if values[0] != \"__IMPOSSIBLE_VALUE__\":\n clauses.append({\"term\": {field: values[0]}})\n else:\n clauses.append({\"terms\": {field: values}})\n return clauses\n\n # ---------- search (single hybrid path matching your tool) ----------\n def search(self, query: str | None = None) -> list[dict[str, Any]]:\n client = self.build_client()\n q = (query or \"\").strip()\n\n # Parse optional filter expression (can be either A or B shape; see _coerce_filter_clauses)\n filter_obj = None\n if getattr(self, \"filter_expression\", \"\") and self.filter_expression.strip():\n try:\n filter_obj = json.loads(self.filter_expression)\n except json.JSONDecodeError as e:\n raise ValueError(f\"Invalid filter_expression JSON: {e}\") from e\n\n if not self.embedding:\n raise ValueError(\n \"Embedding is required to run hybrid search (KNN + keyword).\"\n )\n\n # Embed the query\n vec = self.embedding.embed_query(q)\n\n # Build filter clauses (accept both shapes)\n clauses = self._coerce_filter_clauses(filter_obj)\n\n # Respect the tool's limit/threshold defaults\n limit = (filter_obj or {}).get(\"limit\", self.number_of_results)\n score_threshold = (filter_obj or {}).get(\"score_threshold\", 0)\n\n # Build the same hybrid body as your SearchService\n body = {\n \"query\": {\n \"bool\": {\n \"should\": [\n {\n \"knn\": {\n self.vector_field: {\n \"vector\": vec,\n \"k\": 10, # fixed to match the tool\n \"boost\": 0.7,\n }\n }\n },\n {\n \"multi_match\": {\n \"query\": q,\n \"fields\": [\"text^2\", \"filename^1.5\"],\n \"type\": \"best_fields\",\n \"fuzziness\": \"AUTO\",\n \"boost\": 0.3,\n }\n },\n ],\n \"minimum_should_match\": 1,\n }\n },\n \"aggs\": {\n \"data_sources\": {\"terms\": {\"field\": \"filename\", \"size\": 20}},\n \"document_types\": {\"terms\": {\"field\": \"mimetype\", \"size\": 10}},\n \"owners\": {\"terms\": {\"field\": \"owner\", \"size\": 10}},\n },\n \"_source\": [\n \"filename\",\n \"mimetype\",\n \"page\",\n \"text\",\n \"source_url\",\n \"owner\",\n \"allowed_users\",\n \"allowed_groups\",\n ],\n \"size\": 
limit,\n }\n if clauses:\n body[\"query\"][\"bool\"][\"filter\"] = clauses\n\n if isinstance(score_threshold, (int, float)) and score_threshold > 0:\n # top-level min_score (matches your tool)\n body[\"min_score\"] = score_threshold\n\n resp = client.search(index=self.index_name, body=body)\n hits = resp.get(\"hits\", {}).get(\"hits\", [])\n return [\n {\n \"page_content\": hit[\"_source\"].get(\"text\", \"\"),\n \"metadata\": {k: v for k, v in hit[\"_source\"].items() if k != \"text\"},\n \"score\": hit.get(\"_score\"),\n }\n for hit in hits\n ]\n\n def search_documents(self) -> list[Data]:\n try:\n raw = self.search(self.search_query or \"\")\n return [\n Data(\n file_path=hit[\"metadata\"].get(\"file_path\", \"\"),\n text=hit[\"page_content\"],\n )\n for hit in raw\n ]\n except Exception as e:\n self.log(f\"search_documents error: {e}\")\n raise\n\n # -------- dynamic UI handling (auth switch) --------\n async def update_build_config(\n self, build_config: dict, field_value: str, field_name: str | None = None\n ) -> dict:\n try:\n if field_name == \"auth_mode\":\n mode = (field_value or \"basic\").strip().lower()\n is_basic = mode == \"basic\"\n is_jwt = mode == \"jwt\"\n\n build_config[\"username\"][\"show\"] = is_basic\n build_config[\"password\"][\"show\"] = is_basic\n\n build_config[\"jwt_token\"][\"show\"] = is_jwt\n build_config[\"jwt_header\"][\"show\"] = is_jwt\n build_config[\"bearer_prefix\"][\"show\"] = is_jwt\n\n build_config[\"username\"][\"required\"] = is_basic\n build_config[\"password\"][\"required\"] = is_basic\n\n build_config[\"jwt_token\"][\"required\"] = is_jwt\n build_config[\"jwt_header\"][\"required\"] = is_jwt\n build_config[\"bearer_prefix\"][\"required\"] = False\n\n if is_basic:\n build_config[\"jwt_token\"][\"value\"] = \"\"\n\n return build_config\n\n return build_config\n\n except Exception as e:\n self.log(f\"update_build_config error: {e}\")\n return build_config\n" + }, + "ef_construction": { + "_input_type": "IntInput", + "advanced": true, + "display_name": "EF Construction", + "dynamic": false, + "info": "Size of the dynamic list used during k-NN graph creation.", + "list": false, + "list_add_label": "Add More", + "name": "ef_construction", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "int", + "value": 512 }, "embedding": { "_input_type": "HandleInput", @@ -1288,13 +1377,38 @@ "type": "other", "value": "" }, - "hybrid_search_query": { - "_input_type": "MultilineInput", + "engine": { + "_input_type": "DropdownInput", "advanced": true, - "copy_field": false, - "display_name": "Hybrid Search Query", + "combobox": false, + "dialog_inputs": {}, + "display_name": "Engine", "dynamic": false, - "info": "Provide a custom hybrid search query in JSON format. 
This allows you to combine vector similarity and keyword matching.", + "info": "Vector search engine to use.", + "name": "engine", + "options": [ + "nmslib", + "faiss", + "lucene" + ], + "options_metadata": [], + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "toggle": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "nmslib" + }, + "filter_expression": { + "_input_type": "MultilineInput", + "advanced": false, + "copy_field": false, + "display_name": "Filter Expression (JSON)", + "dynamic": false, + "info": "Optional JSON to control filters/limit/score threshold.\nAccepted shapes:\n1) {\"filter\": [ {\"term\": {\"filename\":\"foo\"}}, {\"terms\":{\"owner\":[\"u1\",\"u2\"]}} ], \"limit\": 10, \"score_threshold\": 1.6 }\n2) Context-style maps: {\"data_sources\":[\"fileA\"], \"document_types\":[\"application/pdf\"], \"owners\":[\"123\"]}\nPlaceholders with __IMPOSSIBLE_VALUE__ are ignored.", "input_types": [ "Message" ], @@ -1302,7 +1416,7 @@ "list_add_label": "Add More", "load_from_db": false, "multiline": true, - "name": "hybrid_search_query", + "name": "filter_expression", "placeholder": "", "required": false, "show": true, @@ -1318,7 +1432,7 @@ "advanced": false, "display_name": "Index Name", "dynamic": false, - "info": "The index name where the vectors will be stored in OpenSearch cluster.", + "info": "The index to search.", "list": false, "list_add_label": "Add More", "load_from_db": false, @@ -1353,14 +1467,69 @@ "type": "other", "value": "" }, + "jwt_header": { + "_input_type": "StrInput", + "advanced": true, + "display_name": "JWT Header Name", + "dynamic": false, + "info": "", + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "jwt_header", + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "Authorization" + }, + "jwt_token": { + "_input_type": "SecretStrInput", + "advanced": false, + "display_name": "JWT Token", + "dynamic": false, + "info": "Paste a valid JWT (sent as a header).", + "input_types": [], + "load_from_db": false, + "name": "jwt_token", + "password": true, + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "type": "str", + "value": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwOi8vb3BlbnJhZy1iYWNrZW5kOjgwMDAiLCJzdWIiOiIxMDMwNzA3MzY1NDU0NjQyNDYxMTMiLCJhdWQiOlsib3BlbnNlYXJjaCIsIm9wZW5yYWciXSwiZXhwIjoxNzU3NzExNzEyLCJpYXQiOjE3NTcxMDY5MTIsImF1dGhfdGltZSI6MTc1NzExNzcxMiwidXNlcl9pZCI6IjEwMzA3MDczNjU0NTQ2NDI0NjExMyIsImVtYWlsIjoiZ2FicmllbEBsYW5nZmxvdy5vcmciLCJuYW1lIjoiR2FicmllbCBBbG1laWRhIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZ2FicmllbEBsYW5nZmxvdy5vcmciLCJlbWFpbF92ZXJpZmllZCI6dHJ1ZSwicm9sZXMiOlsib3BlbnJhZ191c2VyIl19.JneUFesg-FuNKVdd0Nbc8dtItxtrctwldJTnrj8I2U_mGcZgX0ObnqrrrF8lvn25Su3rdyZIJ84bX16WMUMhUivzRl1od7X5_PUOr21F_MHtIVMBnmQW_DO5MjN6Op4-v54FAc9HZn6v5gS_RdUr4E0Vscv5CJIfbirFTA0B3Yip9hxg1UXocgXnc0NwiwTJnu9XBhEgPOXJLIu1PJjvVWBclO7ZgzMmgSUoZPzDH6GQphPqtWxeav-bGk38HyI2GR0QaRYjGMgKMB-xwGQWh5kvCuwEQ5ylF80yXN7lVIc7DGY69vhy24II6W8FaWZvMVqJnwcByfHJWbWQ8g8UDA" + }, + "m": { + "_input_type": "IntInput", + "advanced": true, + "display_name": "M Parameter", + "dynamic": false, + "info": "Number of bidirectional links created for each new element.", + "list": false, + "list_add_label": "Add More", + "name": "m", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + 
"type": "int", + "value": 16 + }, "number_of_results": { "_input_type": "IntInput", "advanced": true, - "display_name": "Number of Results", + "display_name": "Default Size (limit)", "dynamic": false, - "info": "Number of results to return.", + "info": "Default number of hits when no limit provided in filter_expression.", "list": false, "list_add_label": "Add More", + "load_from_db": false, "name": "number_of_results", "placeholder": "", "required": false, @@ -1376,7 +1545,7 @@ "advanced": false, "display_name": "OpenSearch URL", "dynamic": false, - "info": "URL for OpenSearch cluster (e.g. https://192.168.1.1:9200).", + "info": "URL for your OpenSearch cluster.", "list": false, "list_add_label": "Add More", "load_from_db": false, @@ -1388,24 +1557,24 @@ "tool_mode": false, "trace_as_metadata": true, "type": "str", - "value": "http://opensearch:9200" + "value": "https://opensearch:9200" }, "password": { "_input_type": "SecretStrInput", - "advanced": true, + "advanced": false, "display_name": "Password", "dynamic": false, "info": "", "input_types": [], - "load_from_db": true, + "load_from_db": false, "name": "password", "password": true, "placeholder": "", "required": false, - "show": true, + "show": false, "title_case": false, "type": "str", - "value": "OPENSEARCH_PASSWORD" + "value": "" }, "search_query": { "_input_type": "QueryInput", @@ -1430,49 +1599,6 @@ "type": "query", "value": "" }, - "search_score_threshold": { - "_input_type": "FloatInput", - "advanced": true, - "display_name": "Search Score Threshold", - "dynamic": false, - "info": "Minimum similarity score threshold for search results.", - "list": false, - "list_add_label": "Add More", - "name": "search_score_threshold", - "placeholder": "", - "required": false, - "show": true, - "title_case": false, - "tool_mode": false, - "trace_as_metadata": true, - "type": "float", - "value": 0 - }, - "search_type": { - "_input_type": "DropdownInput", - "advanced": true, - "combobox": false, - "dialog_inputs": {}, - "display_name": "Search Type", - "dynamic": false, - "info": "", - "name": "search_type", - "options": [ - "similarity", - "similarity_score_threshold", - "mmr" - ], - "options_metadata": [], - "placeholder": "", - "required": false, - "show": true, - "title_case": false, - "toggle": false, - "tool_mode": false, - "trace_as_metadata": true, - "type": "str", - "value": "similarity" - }, "should_cache_vector_store": { "_input_type": "BoolInput", "advanced": true, @@ -1491,6 +1617,33 @@ "type": "bool", "value": true }, + "space_type": { + "_input_type": "DropdownInput", + "advanced": true, + "combobox": false, + "dialog_inputs": {}, + "display_name": "Space Type", + "dynamic": false, + "info": "Distance metric for vector similarity.", + "name": "space_type", + "options": [ + "l2", + "l1", + "cosinesimil", + "linf", + "innerproduct" + ], + "options_metadata": [], + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "toggle": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "l2" + }, "use_ssl": { "_input_type": "BoolInput", "advanced": true, @@ -1511,7 +1664,7 @@ }, "username": { "_input_type": "StrInput", - "advanced": true, + "advanced": false, "display_name": "Username", "dynamic": false, "info": "", @@ -1521,13 +1674,32 @@ "name": "username", "placeholder": "", "required": false, - "show": true, + "show": false, "title_case": false, "tool_mode": false, "trace_as_metadata": true, "type": "str", "value": "admin" }, + "vector_field": { + "_input_type": "StrInput", + 
"advanced": true, + "display_name": "Vector Field", + "dynamic": false, + "info": "Vector field used for KNN.", + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "vector_field", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "chunk_embedding" + }, "verify_certs": { "_input_type": "BoolInput", "advanced": true, @@ -1551,24 +1723,24 @@ }, "selected_output": "search_results", "showNode": true, - "type": "OpenSearch" + "type": "OpenSearchHybrid" }, "dragging": false, - "id": "OpenSearch-Mkw1W", + "id": "OpenSearchHybrid-Ve6bS", "measured": { - "height": 518, + "height": 765, "width": 320 }, "position": { - "x": 2136.4456339674302, - "y": 1460.3160066924486 + "x": 2218.9287723423276, + "y": 1332.2598463956504 }, - "selected": false, + "selected": true, "type": "genericNode" } ], "viewport": { - "x": -1173.5436043881646, + "x": -1214.8709460066525, "y": -1289.0306227762003, "zoom": 1.0020797567291742 } From 9f7c506cce56fbb1ecd898c49fd7756b2f363fda Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 8 Sep 2025 09:58:28 -0400 Subject: [PATCH 6/6] log error when INGEST_FLOW_ID is not configured --- src/services/langflow_file_service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 6b343670..694e71e0 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -65,6 +65,7 @@ class LangflowFileService: The flow must expose a File component path in input schema or accept files parameter. """ if not self.flow_id_ingest: + logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured") raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") payload: Dict[str, Any] = {