From a0151dfaff1a1781a77ea46b1e9ab7fe5bbd0925 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 8 Sep 2025 20:10:34 -0300 Subject: [PATCH] Refactor ingestion flow JSON to enhance edge connections and update component settings This commit refines the ingestion flow JSON by reintroducing an edge connection between the File and SplitText nodes, ensuring proper data flow. It also updates the last updated timestamp and modifies the selection state of a node, contributing to improved functionality and user experience. These changes align with best practices for async development and enhance the overall robustness of the ingestion flow. --- flows/ingestion_flow.json | 79 +++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 41 deletions(-) diff --git a/flows/ingestion_flow.json b/flows/ingestion_flow.json index 7bc4949d..b671c16b 100644 --- a/flows/ingestion_flow.json +++ b/flows/ingestion_flow.json @@ -1,36 +1,6 @@ { "data": { "edges": [ - { - "animated": false, - "className": "", - "data": { - "sourceHandle": { - "dataType": "File", - "id": "File-PSU37", - "name": "message", - "output_types": [ - "Message" - ] - }, - "targetHandle": { - "fieldName": "data_inputs", - "id": "SplitText-QIKhg", - "inputTypes": [ - "Data", - "DataFrame", - "Message" - ], - "type": "other" - } - }, - "id": "reactflow__edge-File-PSU37{œdataTypeœ:œFileœ,œidœ:œFile-PSU37œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-SplitText-QIKhg{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-QIKhgœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", - "selected": false, - "source": "File-PSU37", - "sourceHandle": "{œdataTypeœ:œFileœ,œidœ:œFile-PSU37œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}", - "target": "SplitText-QIKhg", - "targetHandle": "{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-QIKhgœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}" - }, { "animated": false, "className": "", @@ -87,6 +57,36 @@ "sourceHandle": "{œdataTypeœ:œOpenAIEmbeddingsœ,œidœ:œOpenAIEmbeddings-joRJ6œ,œnameœ:œembeddingsœ,œoutput_typesœ:[œEmbeddingsœ]}", "target": "OpenSearchHybrid-Ve6bS", "targetHandle": "{œfieldNameœ:œembeddingœ,œidœ:œOpenSearchHybrid-Ve6bSœ,œinputTypesœ:[œEmbeddingsœ],œtypeœ:œotherœ}" + }, + { + "animated": false, + "className": "", + "data": { + "sourceHandle": { + "dataType": "File", + "id": "File-PSU37", + "name": "message", + "output_types": [ + "Message" + ] + }, + "targetHandle": { + "fieldName": "data_inputs", + "id": "SplitText-QIKhg", + "inputTypes": [ + "Data", + "DataFrame", + "Message" + ], + "type": "other" + } + }, + "id": "xy-edge__File-PSU37{œdataTypeœ:œFileœ,œidœ:œFile-PSU37œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-SplitText-QIKhg{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-QIKhgœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", + "selected": false, + "source": "File-PSU37", + "sourceHandle": "{œdataTypeœ:œFileœ,œidœ:œFile-PSU37œ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}", + "target": "SplitText-QIKhg", + "targetHandle": "{œfieldNameœ:œdata_inputsœ,œidœ:œSplitText-QIKhgœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}" } ], "nodes": [ @@ -884,7 +884,7 @@ ], "frozen": false, "icon": "file-text", - "last_updated": "2025-09-08T22:45:35.898Z", + "last_updated": "2025-09-08T23:05:09.886Z", "legacy": false, "lf_version": "1.5.0.post2", "metadata": {}, @@ -917,7 +917,6 @@ "name": "path", "options": null, "required_inputs": null, - "selected": "Message", "tool_mode": true, "types": [ "Message" @@ -1069,9 +1068,7 @@ "bz2", "gz" ], - "file_path": [ - "435b1280-b2e0-44eb-b917-cf6292dfc41a/Actors.txt" - ], + "file_path": [], "info": "Supported file extensions: txt, md, mdx, csv, json, yaml, yml, xml, html, htm, pdf, docx, py, sh, sql, js, ts, tsx; optionally bundled in file extensions: zip, tar, tgz, bz2, gz", "list": true, "list_add_label": "Add More", @@ -1205,7 +1202,7 @@ "legacy": false, "lf_version": "1.5.0.post2", "metadata": { - "code_hash": "d31747dc61c3", + "code_hash": "307f5461379f", "dependencies": { "dependencies": [ { @@ -1339,7 +1336,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from __future__ import annotations\n\nimport json\nimport uuid\nfrom typing import Any, Dict, List, Optional\n\nfrom langflow.base.vectorstores.model import (\n LCVectorStoreComponent,\n check_cached_vector_store,\n)\nfrom langflow.base.vectorstores.vector_store_connection_decorator import (\n vector_store_connection,\n)\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n HandleInput,\n IntInput,\n MultilineInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom opensearchpy import OpenSearch, helpers\n\n\n@vector_store_connection\nclass OpenSearchHybridComponent(LCVectorStoreComponent):\n \"\"\"OpenSearch hybrid search: KNN (k=10, boost=0.7) + multi_match (boost=0.3) with optional filters & min_score.\"\"\"\n\n display_name: str = \"OpenSearch (Hybrid)\"\n name: str = \"OpenSearchHybrid\"\n icon: str = \"OpenSearch\"\n description: str = \"Hybrid search: KNN + keyword, with optional filters, min_score, and aggregations.\"\n\n # Keys we consider baseline\n default_keys: list[str] = [\n \"opensearch_url\",\n \"index_name\",\n *[\n i.name for i in LCVectorStoreComponent.inputs\n ], # search_query, add_documents, etc.\n \"embedding\",\n \"vector_field\",\n \"number_of_results\",\n \"auth_mode\",\n \"username\",\n \"password\",\n \"jwt_token\",\n \"jwt_header\",\n \"bearer_prefix\",\n \"use_ssl\",\n \"verify_certs\",\n \"filter_expression\",\n \"engine\",\n \"space_type\",\n \"ef_construction\",\n \"m\",\n ]\n\n inputs = [\n StrInput(\n name=\"opensearch_url\",\n display_name=\"OpenSearch URL\",\n value=\"http://localhost:9200\",\n info=\"URL for your OpenSearch cluster.\",\n ),\n StrInput(\n name=\"index_name\",\n display_name=\"Index Name\",\n value=\"langflow\",\n info=\"The index to search.\",\n ),\n DropdownInput(\n name=\"engine\",\n display_name=\"Engine\",\n options=[\"jvector\", \"nmslib\", \"faiss\", \"lucene\"],\n value=\"jvector\",\n info=\"Vector search engine to use.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"space_type\",\n display_name=\"Space Type\",\n options=[\"l2\", \"l1\", \"cosinesimil\", \"linf\", \"innerproduct\"],\n value=\"l2\",\n info=\"Distance metric for vector similarity.\",\n advanced=True,\n ),\n IntInput(\n name=\"ef_construction\",\n display_name=\"EF Construction\",\n value=512,\n info=\"Size of the dynamic list used during k-NN graph creation.\",\n advanced=True,\n ),\n IntInput(\n name=\"m\",\n display_name=\"M Parameter\",\n value=16,\n info=\"Number of bidirectional links created for each new element.\",\n advanced=True,\n ),\n *LCVectorStoreComponent.inputs, # includes search_query, add_documents, etc.\n HandleInput(\n name=\"embedding\", display_name=\"Embedding\", input_types=[\"Embeddings\"]\n ),\n StrInput(\n name=\"vector_field\",\n display_name=\"Vector Field\",\n value=\"chunk_embedding\",\n advanced=True,\n info=\"Vector field used for KNN.\",\n ),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Default Size (limit)\",\n value=10,\n advanced=True,\n info=\"Default number of hits when no limit provided in filter_expression.\",\n ),\n MultilineInput(\n name=\"filter_expression\",\n display_name=\"Filter Expression (JSON)\",\n value=\"\",\n info=(\n \"Optional JSON to control filters/limit/score threshold.\\n\"\n \"Accepted shapes:\\n\"\n '1) {\"filter\": [ {\"term\": {\"filename\":\"foo\"}}, {\"terms\":{\"owner\":[\"u1\",\"u2\"]}} ], \"limit\": 10, \"score_threshold\": 1.6 }\\n'\n '2) Context-style maps: {\"data_sources\":[\"fileA\"], \"document_types\":[\"application/pdf\"], \"owners\":[\"123\"]}\\n'\n \"Placeholders with __IMPOSSIBLE_VALUE__ are ignored.\"\n ),\n ),\n # ----- Auth controls (dynamic) -----\n DropdownInput(\n name=\"auth_mode\",\n display_name=\"Auth Mode\",\n value=\"basic\",\n options=[\"basic\", \"jwt\"],\n info=\"Choose Basic (username/password) or JWT (Bearer token).\",\n real_time_refresh=True,\n advanced=False,\n ),\n StrInput(\n name=\"username\",\n display_name=\"Username\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"password\",\n display_name=\"Password\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"jwt_token\",\n display_name=\"JWT Token\",\n value=\"JWT\",\n load_from_db=True,\n show=True,\n info=\"Paste a valid JWT (sent as a header).\",\n ),\n StrInput(\n name=\"jwt_header\",\n display_name=\"JWT Header Name\",\n value=\"Authorization\",\n show=False,\n advanced=True,\n ),\n BoolInput(\n name=\"bearer_prefix\",\n display_name=\"Prefix 'Bearer '\",\n value=True,\n show=False,\n advanced=True,\n ),\n # ----- TLS -----\n BoolInput(name=\"use_ssl\", display_name=\"Use SSL\", value=True, advanced=True),\n BoolInput(\n name=\"verify_certs\",\n display_name=\"Verify Certificates\",\n value=False,\n advanced=True,\n ),\n ]\n\n # ---------- helper functions for index management ----------\n def _default_text_mapping(\n self,\n dim: int,\n engine: str = \"jvector\",\n space_type: str = \"l2\",\n ef_search: int = 512,\n ef_construction: int = 512,\n m: int = 16,\n vector_field: str = \"vector_field\",\n ) -> Dict[str, Any]:\n \"\"\"For Approximate k-NN Search, this is the default mapping to create index.\"\"\"\n return {\n \"settings\": {\"index\": {\"knn\": True, \"knn.algo_param.ef_search\": ef_search}},\n \"mappings\": {\n \"properties\": {\n vector_field: {\n \"type\": \"knn_vector\",\n \"dimension\": dim,\n \"method\": {\n \"name\": \"hnsw\",\n \"space_type\": space_type,\n \"engine\": engine,\n \"parameters\": {\"ef_construction\": ef_construction, \"m\": m},\n },\n }\n }\n },\n }\n\n def _validate_aoss_with_engines(self, is_aoss: bool, engine: str) -> None:\n \"\"\"Validate AOSS with the engine.\"\"\"\n if is_aoss and engine != \"nmslib\" and engine != \"faiss\":\n raise ValueError(\n \"Amazon OpenSearch Service Serverless only \"\n \"supports `nmslib` or `faiss` engines\"\n )\n\n def _is_aoss_enabled(self, http_auth: Any) -> bool:\n \"\"\"Check if the service is http_auth is set as `aoss`.\"\"\"\n if (\n http_auth is not None\n and hasattr(http_auth, \"service\")\n and http_auth.service == \"aoss\"\n ):\n return True\n return False\n\n def _bulk_ingest_embeddings(\n self,\n client: OpenSearch,\n index_name: str,\n embeddings: List[List[float]],\n texts: List[str],\n metadatas: Optional[List[dict]] = None,\n ids: Optional[List[str]] = None,\n vector_field: str = \"vector_field\",\n text_field: str = \"text\",\n mapping: Optional[Dict] = None,\n max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,\n is_aoss: bool = False,\n ) -> List[str]:\n \"\"\"Bulk Ingest Embeddings into given index.\"\"\"\n if not mapping:\n mapping = dict()\n try:\n from opensearchpy.exceptions import NotFoundError\n except ImportError:\n raise ImportError(\"Could not import OpenSearch exceptions\")\n\n requests = []\n return_ids = []\n\n for i, text in enumerate(texts):\n metadata = metadatas[i] if metadatas else {}\n _id = ids[i] if ids else str(uuid.uuid4())\n request = {\n \"_op_type\": \"index\",\n \"_index\": index_name,\n vector_field: embeddings[i],\n text_field: text,\n \"metadata\": metadata,\n }\n if is_aoss:\n request[\"id\"] = _id\n else:\n request[\"_id\"] = _id\n requests.append(request)\n return_ids.append(_id)\n\n helpers.bulk(client, requests, max_chunk_bytes=max_chunk_bytes)\n # if not is_aoss:\n # client.indices.refresh(index=index_name)\n return return_ids\n\n # ---------- auth / client ----------\n def _build_auth_kwargs(self) -> Dict[str, Any]:\n mode = (self.auth_mode or \"basic\").strip().lower()\n if mode == \"jwt\":\n token = (self.jwt_token or \"\").strip()\n if not token:\n raise ValueError(\"Auth Mode is 'jwt' but no jwt_token was provided.\")\n header_name = (self.jwt_header or \"Authorization\").strip()\n header_value = f\"Bearer {token}\" if self.bearer_prefix else token\n return {\"headers\": {header_name: header_value}}\n user = (self.username or \"\").strip()\n pwd = (self.password or \"\").strip()\n if not user or not pwd:\n raise ValueError(\"Auth Mode is 'basic' but username/password are missing.\")\n return {\"http_auth\": (user, pwd)}\n\n def build_client(self) -> OpenSearch:\n auth_kwargs = self._build_auth_kwargs()\n return OpenSearch(\n hosts=[self.opensearch_url],\n use_ssl=self.use_ssl,\n verify_certs=self.verify_certs,\n ssl_assert_hostname=False,\n ssl_show_warn=False,\n **auth_kwargs,\n )\n\n @check_cached_vector_store\n def build_vector_store(self) -> OpenSearch:\n # Return raw OpenSearch client as our “vector store.”\n client = self.build_client()\n self._add_documents_to_vector_store(client=client)\n return client\n\n # ---------- ingest ----------\n def _add_documents_to_vector_store(self, client: OpenSearch) -> None:\n # Convert DataFrame to Data if needed using parent's method\n self.ingest_data = self._prepare_ingest_data()\n\n docs = self.ingest_data or []\n if not docs:\n self.log(\"No documents to ingest.\")\n return\n\n # Extract texts and metadata from documents\n texts = []\n metadatas = []\n for doc_obj in docs:\n data_copy = json.loads(doc_obj.model_dump_json())\n text = data_copy.pop(doc_obj.text_key, doc_obj.default_value)\n texts.append(text)\n metadatas.append(data_copy)\n\n if not self.embedding:\n raise ValueError(\"Embedding handle is required to embed documents.\")\n\n # Generate embeddings\n vectors = self.embedding.embed_documents(texts)\n\n if not vectors:\n self.log(\"No vectors generated from documents.\")\n return\n\n # Get vector dimension for mapping\n dim = len(vectors[0]) if vectors else 768 # default fallback\n\n # Check for AOSS\n auth_kwargs = self._build_auth_kwargs()\n is_aoss = self._is_aoss_enabled(auth_kwargs.get(\"http_auth\"))\n\n # Validate engine with AOSS\n engine = getattr(self, \"engine\", \"jvector\")\n self._validate_aoss_with_engines(is_aoss, engine)\n\n # Create mapping with proper KNN settings\n space_type = getattr(self, \"space_type\", \"l2\")\n ef_construction = getattr(self, \"ef_construction\", 512)\n m = getattr(self, \"m\", 16)\n\n mapping = self._default_text_mapping(\n dim=dim,\n engine=engine,\n space_type=space_type,\n ef_construction=ef_construction,\n m=m,\n vector_field=self.vector_field,\n )\n\n self.log(\n f\"Indexing {len(texts)} documents into '{self.index_name}' with proper KNN mapping...\"\n )\n\n # Use the LangChain-style bulk ingestion\n return_ids = self._bulk_ingest_embeddings(\n client=client,\n index_name=self.index_name,\n embeddings=vectors,\n texts=texts,\n metadatas=metadatas,\n vector_field=self.vector_field,\n text_field=\"text\",\n mapping=mapping,\n is_aoss=is_aoss,\n )\n\n self.log(f\"Successfully indexed {len(return_ids)} documents.\")\n\n # ---------- helpers for filters ----------\n def _is_placeholder_term(self, term_obj: dict) -> bool:\n # term_obj like {\"filename\": \"__IMPOSSIBLE_VALUE__\"}\n return any(v == \"__IMPOSSIBLE_VALUE__\" for v in term_obj.values())\n\n def _coerce_filter_clauses(self, filter_obj: dict | None) -> List[dict]:\n \"\"\"\n Accepts either:\n A) {\"filter\":[ ...term/terms objects... ], \"limit\":..., \"score_threshold\":...}\n B) Context-style: {\"data_sources\":[...], \"document_types\":[...], \"owners\":[...]}\n Returns a list of OS filter clauses (term/terms), skipping placeholders and empty terms.\n \"\"\"\n\n if not filter_obj:\n return []\n\n # If it’s a string, try to parse it once\n if isinstance(filter_obj, str):\n try:\n filter_obj = json.loads(filter_obj)\n except Exception:\n # Not valid JSON → treat as no filters\n return []\n\n # Case A: already an explicit list/dict under \"filter\"\n if \"filter\" in filter_obj:\n raw = filter_obj[\"filter\"]\n if isinstance(raw, dict):\n raw = [raw]\n clauses: List[dict] = []\n for f in raw or []:\n if (\n \"term\" in f\n and isinstance(f[\"term\"], dict)\n and not self._is_placeholder_term(f[\"term\"])\n ):\n clauses.append(f)\n elif \"terms\" in f and isinstance(f[\"terms\"], dict):\n field, vals = next(iter(f[\"terms\"].items()))\n if isinstance(vals, list) and len(vals) > 0:\n clauses.append(f)\n return clauses\n\n # Case B: convert context-style maps into clauses\n field_mapping = {\n \"data_sources\": \"filename\",\n \"document_types\": \"mimetype\",\n \"owners\": \"owner\",\n }\n clauses: List[dict] = []\n for k, values in filter_obj.items():\n if not isinstance(values, list):\n continue\n field = field_mapping.get(k, k)\n if len(values) == 0:\n # Match-nothing placeholder (kept to mirror your tool semantics)\n clauses.append({\"term\": {field: \"__IMPOSSIBLE_VALUE__\"}})\n elif len(values) == 1:\n if values[0] != \"__IMPOSSIBLE_VALUE__\":\n clauses.append({\"term\": {field: values[0]}})\n else:\n clauses.append({\"terms\": {field: values}})\n return clauses\n\n # ---------- search (single hybrid path matching your tool) ----------\n def search(self, query: str | None = None) -> list[dict[str, Any]]:\n client = self.build_client()\n q = (query or \"\").strip()\n\n # Parse optional filter expression (can be either A or B shape; see _coerce_filter_clauses)\n filter_obj = None\n if getattr(self, \"filter_expression\", \"\") and self.filter_expression.strip():\n try:\n filter_obj = json.loads(self.filter_expression)\n except json.JSONDecodeError as e:\n raise ValueError(f\"Invalid filter_expression JSON: {e}\") from e\n\n if not self.embedding:\n raise ValueError(\n \"Embedding is required to run hybrid search (KNN + keyword).\"\n )\n\n # Embed the query\n vec = self.embedding.embed_query(q)\n\n # Build filter clauses (accept both shapes)\n clauses = self._coerce_filter_clauses(filter_obj)\n\n # Respect the tool's limit/threshold defaults\n limit = (filter_obj or {}).get(\"limit\", self.number_of_results)\n score_threshold = (filter_obj or {}).get(\"score_threshold\", 0)\n\n # Build the same hybrid body as your SearchService\n body = {\n \"query\": {\n \"bool\": {\n \"should\": [\n {\n \"knn\": {\n self.vector_field: {\n \"vector\": vec,\n \"k\": 10, # fixed to match the tool\n \"boost\": 0.7,\n }\n }\n },\n {\n \"multi_match\": {\n \"query\": q,\n \"fields\": [\"text^2\", \"filename^1.5\"],\n \"type\": \"best_fields\",\n \"fuzziness\": \"AUTO\",\n \"boost\": 0.3,\n }\n },\n ],\n \"minimum_should_match\": 1,\n }\n },\n \"aggs\": {\n \"data_sources\": {\"terms\": {\"field\": \"filename\", \"size\": 20}},\n \"document_types\": {\"terms\": {\"field\": \"mimetype\", \"size\": 10}},\n \"owners\": {\"terms\": {\"field\": \"owner\", \"size\": 10}},\n },\n \"_source\": [\n \"filename\",\n \"mimetype\",\n \"page\",\n \"text\",\n \"source_url\",\n \"owner\",\n \"allowed_users\",\n \"allowed_groups\",\n ],\n \"size\": limit,\n }\n if clauses:\n body[\"query\"][\"bool\"][\"filter\"] = clauses\n\n if isinstance(score_threshold, (int, float)) and score_threshold > 0:\n # top-level min_score (matches your tool)\n body[\"min_score\"] = score_threshold\n\n resp = client.search(index=self.index_name, body=body)\n hits = resp.get(\"hits\", {}).get(\"hits\", [])\n return [\n {\n \"page_content\": hit[\"_source\"].get(\"text\", \"\"),\n \"metadata\": {k: v for k, v in hit[\"_source\"].items() if k != \"text\"},\n \"score\": hit.get(\"_score\"),\n }\n for hit in hits\n ]\n\n def search_documents(self) -> list[Data]:\n try:\n raw = self.search(self.search_query or \"\")\n return [\n Data(\n file_path=hit[\"metadata\"].get(\"file_path\", \"\"),\n text=hit[\"page_content\"],\n )\n for hit in raw\n ]\n except Exception as e:\n self.log(f\"search_documents error: {e}\")\n raise\n\n # -------- dynamic UI handling (auth switch) --------\n async def update_build_config(\n self, build_config: dict, field_value: str, field_name: str | None = None\n ) -> dict:\n try:\n if field_name == \"auth_mode\":\n mode = (field_value or \"basic\").strip().lower()\n is_basic = mode == \"basic\"\n is_jwt = mode == \"jwt\"\n\n build_config[\"username\"][\"show\"] = is_basic\n build_config[\"password\"][\"show\"] = is_basic\n\n build_config[\"jwt_token\"][\"show\"] = is_jwt\n build_config[\"jwt_header\"][\"show\"] = is_jwt\n build_config[\"bearer_prefix\"][\"show\"] = is_jwt\n\n build_config[\"username\"][\"required\"] = is_basic\n build_config[\"password\"][\"required\"] = is_basic\n\n build_config[\"jwt_token\"][\"required\"] = is_jwt\n build_config[\"jwt_header\"][\"required\"] = is_jwt\n build_config[\"bearer_prefix\"][\"required\"] = False\n\n if is_basic:\n build_config[\"jwt_token\"][\"value\"] = \"\"\n\n return build_config\n\n return build_config\n\n except Exception as e:\n self.log(f\"update_build_config error: {e}\")\n return build_config\n" + "value": "from __future__ import annotations\n\nimport json\nimport uuid\nfrom typing import Any, Dict, List, Optional\n\nfrom langflow.base.vectorstores.model import (\n LCVectorStoreComponent,\n check_cached_vector_store,\n)\nfrom langflow.base.vectorstores.vector_store_connection_decorator import (\n vector_store_connection,\n)\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n HandleInput,\n IntInput,\n MultilineInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom opensearchpy import OpenSearch, helpers\n\n\n@vector_store_connection\nclass OpenSearchHybridComponent(LCVectorStoreComponent):\n \"\"\"OpenSearch hybrid search: KNN (k=10, boost=0.7) + multi_match (boost=0.3) with optional filters & min_score.\"\"\"\n\n display_name: str = \"OpenSearch (Hybrid)\"\n name: str = \"OpenSearchHybrid\"\n icon: str = \"OpenSearch\"\n description: str = \"Hybrid search: KNN + keyword, with optional filters, min_score, and aggregations.\"\n\n # Keys we consider baseline\n default_keys: list[str] = [\n \"opensearch_url\",\n \"index_name\",\n *[\n i.name for i in LCVectorStoreComponent.inputs\n ], # search_query, add_documents, etc.\n \"embedding\",\n \"vector_field\",\n \"number_of_results\",\n \"auth_mode\",\n \"username\",\n \"password\",\n \"jwt_token\",\n \"jwt_header\",\n \"bearer_prefix\",\n \"use_ssl\",\n \"verify_certs\",\n \"filter_expression\",\n \"engine\",\n \"space_type\",\n \"ef_construction\",\n \"m\",\n ]\n\n inputs = [\n StrInput(\n name=\"opensearch_url\",\n display_name=\"OpenSearch URL\",\n value=\"http://localhost:9200\",\n info=\"URL for your OpenSearch cluster.\",\n ),\n StrInput(\n name=\"index_name\",\n display_name=\"Index Name\",\n value=\"langflow\",\n info=\"The index to search.\",\n ),\n DropdownInput(\n name=\"engine\",\n display_name=\"Engine\",\n options=[\"jvector\", \"nmslib\", \"faiss\", \"lucene\"],\n value=\"jvector\",\n info=\"Vector search engine to use.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"space_type\",\n display_name=\"Space Type\",\n options=[\"l2\", \"l1\", \"cosinesimil\", \"linf\", \"innerproduct\"],\n value=\"l2\",\n info=\"Distance metric for vector similarity.\",\n advanced=True,\n ),\n IntInput(\n name=\"ef_construction\",\n display_name=\"EF Construction\",\n value=512,\n info=\"Size of the dynamic list used during k-NN graph creation.\",\n advanced=True,\n ),\n IntInput(\n name=\"m\",\n display_name=\"M Parameter\",\n value=16,\n info=\"Number of bidirectional links created for each new element.\",\n advanced=True,\n ),\n *LCVectorStoreComponent.inputs, # includes search_query, add_documents, etc.\n HandleInput(\n name=\"embedding\", display_name=\"Embedding\", input_types=[\"Embeddings\"]\n ),\n StrInput(\n name=\"vector_field\",\n display_name=\"Vector Field\",\n value=\"chunk_embedding\",\n advanced=True,\n info=\"Vector field used for KNN.\",\n ),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Default Size (limit)\",\n value=10,\n advanced=True,\n info=\"Default number of hits when no limit provided in filter_expression.\",\n ),\n MultilineInput(\n name=\"filter_expression\",\n display_name=\"Filter Expression (JSON)\",\n value=\"\",\n info=(\n \"Optional JSON to control filters/limit/score threshold.\\n\"\n \"Accepted shapes:\\n\"\n '1) {\"filter\": [ {\"term\": {\"filename\":\"foo\"}}, {\"terms\":{\"owner\":[\"u1\",\"u2\"]}} ], \"limit\": 10, \"score_threshold\": 1.6 }\\n'\n '2) Context-style maps: {\"data_sources\":[\"fileA\"], \"document_types\":[\"application/pdf\"], \"owners\":[\"123\"]}\\n'\n \"Placeholders with __IMPOSSIBLE_VALUE__ are ignored.\"\n ),\n ),\n # ----- Auth controls (dynamic) -----\n DropdownInput(\n name=\"auth_mode\",\n display_name=\"Auth Mode\",\n value=\"basic\",\n options=[\"basic\", \"jwt\"],\n info=\"Choose Basic (username/password) or JWT (Bearer token).\",\n real_time_refresh=True,\n advanced=False,\n ),\n StrInput(\n name=\"username\",\n display_name=\"Username\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"password\",\n display_name=\"Password\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"jwt_token\",\n display_name=\"JWT Token\",\n value=\"JWT\",\n load_from_db=True,\n show=True,\n info=\"Paste a valid JWT (sent as a header).\",\n ),\n StrInput(\n name=\"jwt_header\",\n display_name=\"JWT Header Name\",\n value=\"Authorization\",\n show=False,\n advanced=True,\n ),\n BoolInput(\n name=\"bearer_prefix\",\n display_name=\"Prefix 'Bearer '\",\n value=True,\n show=False,\n advanced=True,\n ),\n # ----- TLS -----\n BoolInput(name=\"use_ssl\", display_name=\"Use SSL\", value=True, advanced=True),\n BoolInput(\n name=\"verify_certs\",\n display_name=\"Verify Certificates\",\n value=False,\n advanced=True,\n ),\n ]\n\n # ---------- helper functions for index management ----------\n def _default_text_mapping(\n self,\n dim: int,\n engine: str = \"jvector\",\n space_type: str = \"l2\",\n ef_search: int = 512,\n ef_construction: int = 100,\n m: int = 16,\n vector_field: str = \"vector_field\",\n ) -> Dict[str, Any]:\n \"\"\"For Approximate k-NN Search, this is the default mapping to create index.\"\"\"\n return {\n \"settings\": {\"index\": {\"knn\": True, \"knn.algo_param.ef_search\": ef_search}},\n \"mappings\": {\n \"properties\": {\n vector_field: {\n \"type\": \"knn_vector\",\n \"dimension\": dim,\n \"method\": {\n \"name\": \"disk_ann\",\n \"space_type\": space_type,\n \"engine\": engine,\n \"parameters\": {\"ef_construction\": ef_construction, \"m\": m},\n },\n }\n }\n },\n }\n\n def _validate_aoss_with_engines(self, is_aoss: bool, engine: str) -> None:\n \"\"\"Validate AOSS with the engine.\"\"\"\n if is_aoss and engine != \"nmslib\" and engine != \"faiss\":\n raise ValueError(\n \"Amazon OpenSearch Service Serverless only \"\n \"supports `nmslib` or `faiss` engines\"\n )\n\n def _is_aoss_enabled(self, http_auth: Any) -> bool:\n \"\"\"Check if the service is http_auth is set as `aoss`.\"\"\"\n if (\n http_auth is not None\n and hasattr(http_auth, \"service\")\n and http_auth.service == \"aoss\"\n ):\n return True\n return False\n\n def _bulk_ingest_embeddings(\n self,\n client: OpenSearch,\n index_name: str,\n embeddings: List[List[float]],\n texts: List[str],\n metadatas: Optional[List[dict]] = None,\n ids: Optional[List[str]] = None,\n vector_field: str = \"vector_field\",\n text_field: str = \"text\",\n mapping: Optional[Dict] = None,\n max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,\n is_aoss: bool = False,\n ) -> List[str]:\n \"\"\"Bulk Ingest Embeddings into given index.\"\"\"\n if not mapping:\n mapping = dict()\n\n requests = []\n return_ids = []\n\n for i, text in enumerate(texts):\n metadata = metadatas[i] if metadatas else {}\n _id = ids[i] if ids else str(uuid.uuid4())\n request = {\n \"_op_type\": \"index\",\n \"_index\": index_name,\n vector_field: embeddings[i],\n text_field: text,\n \"metadata\": metadata,\n }\n if is_aoss:\n request[\"id\"] = _id\n else:\n request[\"_id\"] = _id\n requests.append(request)\n return_ids.append(_id)\n\n helpers.bulk(client, requests, max_chunk_bytes=max_chunk_bytes)\n return return_ids\n\n # ---------- auth / client ----------\n def _build_auth_kwargs(self) -> Dict[str, Any]:\n mode = (self.auth_mode or \"basic\").strip().lower()\n if mode == \"jwt\":\n token = (self.jwt_token or \"\").strip()\n if not token:\n raise ValueError(\"Auth Mode is 'jwt' but no jwt_token was provided.\")\n header_name = (self.jwt_header or \"Authorization\").strip()\n header_value = f\"Bearer {token}\" if self.bearer_prefix else token\n return {\"headers\": {header_name: header_value}}\n user = (self.username or \"\").strip()\n pwd = (self.password or \"\").strip()\n if not user or not pwd:\n raise ValueError(\"Auth Mode is 'basic' but username/password are missing.\")\n return {\"http_auth\": (user, pwd)}\n\n def build_client(self) -> OpenSearch:\n auth_kwargs = self._build_auth_kwargs()\n return OpenSearch(\n hosts=[self.opensearch_url],\n use_ssl=self.use_ssl,\n verify_certs=self.verify_certs,\n ssl_assert_hostname=False,\n ssl_show_warn=False,\n **auth_kwargs,\n )\n\n @check_cached_vector_store\n def build_vector_store(self) -> OpenSearch:\n # Return raw OpenSearch client as our “vector store.”\n client = self.build_client()\n self._add_documents_to_vector_store(client=client)\n return client\n\n # ---------- ingest ----------\n def _add_documents_to_vector_store(self, client: OpenSearch) -> None:\n # Convert DataFrame to Data if needed using parent's method\n self.ingest_data = self._prepare_ingest_data()\n\n docs = self.ingest_data or []\n if not docs:\n self.log(\"No documents to ingest.\")\n return\n\n # Extract texts and metadata from documents\n texts = []\n metadatas = []\n for doc_obj in docs:\n data_copy = json.loads(doc_obj.model_dump_json())\n text = data_copy.pop(doc_obj.text_key, doc_obj.default_value)\n texts.append(text)\n metadatas.append(data_copy)\n\n if not self.embedding:\n raise ValueError(\"Embedding handle is required to embed documents.\")\n\n # Generate embeddings\n vectors = self.embedding.embed_documents(texts)\n\n if not vectors:\n self.log(\"No vectors generated from documents.\")\n return\n\n # Get vector dimension for mapping\n dim = len(vectors[0]) if vectors else 768 # default fallback\n\n # Check for AOSS\n auth_kwargs = self._build_auth_kwargs()\n is_aoss = self._is_aoss_enabled(auth_kwargs.get(\"http_auth\"))\n\n # Validate engine with AOSS\n engine = getattr(self, \"engine\", \"jvector\")\n self._validate_aoss_with_engines(is_aoss, engine)\n\n # Create mapping with proper KNN settings\n space_type = getattr(self, \"space_type\", \"l2\")\n ef_construction = getattr(self, \"ef_construction\", 512)\n m = getattr(self, \"m\", 16)\n\n mapping = self._default_text_mapping(\n dim=dim,\n engine=engine,\n space_type=space_type,\n ef_construction=ef_construction,\n m=m,\n vector_field=self.vector_field,\n )\n\n self.log(\n f\"Indexing {len(texts)} documents into '{self.index_name}' with proper KNN mapping...\"\n )\n\n # Use the LangChain-style bulk ingestion\n return_ids = self._bulk_ingest_embeddings(\n client=client,\n index_name=self.index_name,\n embeddings=vectors,\n texts=texts,\n metadatas=metadatas,\n vector_field=self.vector_field,\n text_field=\"text\",\n mapping=mapping,\n is_aoss=is_aoss,\n )\n\n self.log(f\"Successfully indexed {len(return_ids)} documents.\")\n\n # ---------- helpers for filters ----------\n def _is_placeholder_term(self, term_obj: dict) -> bool:\n # term_obj like {\"filename\": \"__IMPOSSIBLE_VALUE__\"}\n return any(v == \"__IMPOSSIBLE_VALUE__\" for v in term_obj.values())\n\n def _coerce_filter_clauses(self, filter_obj: dict | None) -> List[dict]:\n \"\"\"\n Accepts either:\n A) {\"filter\":[ ...term/terms objects... ], \"limit\":..., \"score_threshold\":...}\n B) Context-style: {\"data_sources\":[...], \"document_types\":[...], \"owners\":[...]}\n Returns a list of OS filter clauses (term/terms), skipping placeholders and empty terms.\n \"\"\"\n\n if not filter_obj:\n return []\n\n # If it’s a string, try to parse it once\n if isinstance(filter_obj, str):\n try:\n filter_obj = json.loads(filter_obj)\n except Exception:\n # Not valid JSON → treat as no filters\n return []\n\n # Case A: already an explicit list/dict under \"filter\"\n if \"filter\" in filter_obj:\n raw = filter_obj[\"filter\"]\n if isinstance(raw, dict):\n raw = [raw]\n clauses: List[dict] = []\n for f in raw or []:\n if (\n \"term\" in f\n and isinstance(f[\"term\"], dict)\n and not self._is_placeholder_term(f[\"term\"])\n ):\n clauses.append(f)\n elif \"terms\" in f and isinstance(f[\"terms\"], dict):\n field, vals = next(iter(f[\"terms\"].items()))\n if isinstance(vals, list) and len(vals) > 0:\n clauses.append(f)\n return clauses\n\n # Case B: convert context-style maps into clauses\n field_mapping = {\n \"data_sources\": \"filename\",\n \"document_types\": \"mimetype\",\n \"owners\": \"owner\",\n }\n clauses: List[dict] = []\n for k, values in filter_obj.items():\n if not isinstance(values, list):\n continue\n field = field_mapping.get(k, k)\n if len(values) == 0:\n # Match-nothing placeholder (kept to mirror your tool semantics)\n clauses.append({\"term\": {field: \"__IMPOSSIBLE_VALUE__\"}})\n elif len(values) == 1:\n if values[0] != \"__IMPOSSIBLE_VALUE__\":\n clauses.append({\"term\": {field: values[0]}})\n else:\n clauses.append({\"terms\": {field: values}})\n return clauses\n\n # ---------- search (single hybrid path matching your tool) ----------\n def search(self, query: str | None = None) -> list[dict[str, Any]]:\n client = self.build_client()\n q = (query or \"\").strip()\n\n # Parse optional filter expression (can be either A or B shape; see _coerce_filter_clauses)\n filter_obj = None\n if getattr(self, \"filter_expression\", \"\") and self.filter_expression.strip():\n try:\n filter_obj = json.loads(self.filter_expression)\n except json.JSONDecodeError as e:\n raise ValueError(f\"Invalid filter_expression JSON: {e}\") from e\n\n if not self.embedding:\n raise ValueError(\n \"Embedding is required to run hybrid search (KNN + keyword).\"\n )\n\n # Embed the query\n vec = self.embedding.embed_query(q)\n\n # Build filter clauses (accept both shapes)\n clauses = self._coerce_filter_clauses(filter_obj)\n\n # Respect the tool's limit/threshold defaults\n limit = (filter_obj or {}).get(\"limit\", self.number_of_results)\n score_threshold = (filter_obj or {}).get(\"score_threshold\", 0)\n\n # Build the same hybrid body as your SearchService\n body = {\n \"query\": {\n \"bool\": {\n \"should\": [\n {\n \"knn\": {\n self.vector_field: {\n \"vector\": vec,\n \"k\": 10, # fixed to match the tool\n \"boost\": 0.7,\n }\n }\n },\n {\n \"multi_match\": {\n \"query\": q,\n \"fields\": [\"text^2\", \"filename^1.5\"],\n \"type\": \"best_fields\",\n \"fuzziness\": \"AUTO\",\n \"boost\": 0.3,\n }\n },\n ],\n \"minimum_should_match\": 1,\n }\n },\n \"aggs\": {\n \"data_sources\": {\"terms\": {\"field\": \"filename\", \"size\": 20}},\n \"document_types\": {\"terms\": {\"field\": \"mimetype\", \"size\": 10}},\n \"owners\": {\"terms\": {\"field\": \"owner\", \"size\": 10}},\n },\n \"_source\": [\n \"filename\",\n \"mimetype\",\n \"page\",\n \"text\",\n \"source_url\",\n \"owner\",\n \"allowed_users\",\n \"allowed_groups\",\n ],\n \"size\": limit,\n }\n if clauses:\n body[\"query\"][\"bool\"][\"filter\"] = clauses\n\n if isinstance(score_threshold, (int, float)) and score_threshold > 0:\n # top-level min_score (matches your tool)\n body[\"min_score\"] = score_threshold\n\n resp = client.search(index=self.index_name, body=body)\n hits = resp.get(\"hits\", {}).get(\"hits\", [])\n return [\n {\n \"page_content\": hit[\"_source\"].get(\"text\", \"\"),\n \"metadata\": {k: v for k, v in hit[\"_source\"].items() if k != \"text\"},\n \"score\": hit.get(\"_score\"),\n }\n for hit in hits\n ]\n\n def search_documents(self) -> list[Data]:\n try:\n raw = self.search(self.search_query or \"\")\n return [\n Data(\n file_path=hit[\"metadata\"].get(\"file_path\", \"\"),\n text=hit[\"page_content\"],\n )\n for hit in raw\n ]\n except Exception as e:\n self.log(f\"search_documents error: {e}\")\n raise\n\n # -------- dynamic UI handling (auth switch) --------\n async def update_build_config(\n self, build_config: dict, field_value: str, field_name: str | None = None\n ) -> dict:\n try:\n if field_name == \"auth_mode\":\n mode = (field_value or \"basic\").strip().lower()\n is_basic = mode == \"basic\"\n is_jwt = mode == \"jwt\"\n\n build_config[\"username\"][\"show\"] = is_basic\n build_config[\"password\"][\"show\"] = is_basic\n\n build_config[\"jwt_token\"][\"show\"] = is_jwt\n build_config[\"jwt_header\"][\"show\"] = is_jwt\n build_config[\"bearer_prefix\"][\"show\"] = is_jwt\n\n build_config[\"username\"][\"required\"] = is_basic\n build_config[\"password\"][\"required\"] = is_basic\n\n build_config[\"jwt_token\"][\"required\"] = is_jwt\n build_config[\"jwt_header\"][\"required\"] = is_jwt\n build_config[\"bearer_prefix\"][\"required\"] = False\n\n if is_basic:\n build_config[\"jwt_token\"][\"value\"] = \"\"\n\n return build_config\n\n return build_config\n\n except Exception as e:\n self.log(f\"update_build_config error: {e}\")\n return build_config\n" }, "ef_construction": { "_input_type": "IntInput", @@ -1505,7 +1502,7 @@ "show": true, "title_case": false, "type": "str", - "value": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwOi8vb3BlbnJhZy1iYWNrZW5kOjgwMDAiLCJzdWIiOiIxMDMwNzA3MzY1NDU0NjQyNDYxMTMiLCJhdWQiOlsib3BlbnNlYXJjaCIsIm9wZW5yYWciXSwiZXhwIjoxNzU3OTc0MDk1LCJpYXQiOjE3NTczNjkyOTUsImF1dGhfdGltZSI6MTc1NzM4MDA5NSwidXNlcl9pZCI6IjEwMzA3MDczNjU0NTQ2NDI0NjExMyIsImVtYWlsIjoiZ2FicmllbEBsYW5nZmxvdy5vcmciLCJuYW1lIjoiR2FicmllbCBBbG1laWRhIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZ2FicmllbEBsYW5nZmxvdy5vcmciLCJlbWFpbF92ZXJpZmllZCI6dHJ1ZSwicm9sZXMiOlsib3BlbnJhZ191c2VyIl19.S9MDGbI8jPuN1m1cziYk1FwL8rukmHV8QcR7DaohMWvlsC0cEKPKTb1mjwG7DJNE20jK75smp02G1guf54Xykqa3HJmvHbsF4XXT-xAHzbAW20xEsBCj318JwHUfJegFoyhPRk2c7PDGvYMa88YVvJEaYf65UCw6YRsPyTguDXSPblHI2bV49tyTc3xQYXMYQBz8OEa4fXCwlNt0WBwDDAFVW6DwMyAMalTlIqZiLGgyeADwGAhh7vgi0n3F0k16ynDvrhQIzNQnG-u6BMAKifE5LR4HvyG3UYPIh6a_d0DENM7MqfsWyDdY8V7upMz2vk_F3MVUMXfEOysDiTa4wQ" + "value": "" }, "m": { "_input_type": "IntInput", @@ -1739,13 +1736,13 @@ "x": 2218.9287723423276, "y": 1332.2598463956504 }, - "selected": true, + "selected": false, "type": "genericNode" } ], "viewport": { - "x": -679.2209660078397, - "y": -919.7176624994117, + "x": -793.2209660078397, + "y": -894.7176624994117, "zoom": 0.7898282762479812 } },