diff --git a/flows/ingestion_flow.json b/flows/ingestion_flow.json index 5d872b42..6e8e09ed 100644 --- a/flows/ingestion_flow.json +++ b/flows/ingestion_flow.json @@ -884,7 +884,7 @@ ], "frozen": false, "icon": "file-text", - "last_updated": "2025-09-08T17:45:33.714Z", + "last_updated": "2025-09-08T22:16:40.365Z", "legacy": false, "lf_version": "1.5.0.post2", "metadata": {}, @@ -1132,7 +1132,7 @@ "dragging": false, "id": "File-PSU37", "measured": { - "height": 230, + "height": 229, "width": 320 }, "position": { @@ -1183,10 +1183,9 @@ ], "frozen": false, "icon": "OpenSearch", - "last_updated": "2025-09-05T21:19:52.776Z", "legacy": false, "metadata": { - "code_hash": "37e8631c902b", + "code_hash": "b14dce621594", "dependencies": { "dependencies": [ { @@ -1232,6 +1231,7 @@ "name": "dataframe", "options": null, "required_inputs": null, + "selected": "DataFrame", "tool_mode": true, "types": [ "DataFrame" @@ -1248,6 +1248,7 @@ "name": "vectorstoreconnection", "options": null, "required_inputs": null, + "selected": "VectorStore", "tool_mode": true, "types": [ "VectorStore" @@ -1318,7 +1319,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from __future__ import annotations\n\nimport json\nfrom typing import Any, Dict, List, Optional\n\nfrom langflow.base.vectorstores.model import (\n LCVectorStoreComponent,\n check_cached_vector_store,\n)\nfrom langflow.base.vectorstores.vector_store_connection_decorator import (\n vector_store_connection,\n)\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n HandleInput,\n IntInput,\n MultilineInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom opensearchpy import OpenSearch, helpers\nimport uuid\n\n\n@vector_store_connection\nclass OpenSearchHybridComponent(LCVectorStoreComponent):\n \"\"\"OpenSearch hybrid search: KNN (k=10, boost=0.7) + multi_match (boost=0.3) with optional filters & min_score.\"\"\"\n\n display_name: str = \"OpenSearch (Hybrid)\"\n name: str = \"OpenSearchHybrid\"\n icon: str = \"OpenSearch\"\n description: str = \"Hybrid search: KNN + keyword, with optional filters, min_score, and aggregations.\"\n\n # Keys we consider baseline\n default_keys: list[str] = [\n \"opensearch_url\",\n \"index_name\",\n *[\n i.name for i in LCVectorStoreComponent.inputs\n ], # search_query, add_documents, etc.\n \"embedding\",\n \"vector_field\",\n \"number_of_results\",\n \"auth_mode\",\n \"username\",\n \"password\",\n \"jwt_token\",\n \"jwt_header\",\n \"bearer_prefix\",\n \"use_ssl\",\n \"verify_certs\",\n \"filter_expression\",\n \"engine\",\n \"space_type\",\n \"ef_construction\",\n \"m\",\n ]\n\n inputs = [\n StrInput(\n name=\"opensearch_url\",\n display_name=\"OpenSearch URL\",\n value=\"http://localhost:9200\",\n info=\"URL for your OpenSearch cluster.\",\n ),\n StrInput(\n name=\"index_name\",\n display_name=\"Index Name\",\n value=\"langflow\",\n info=\"The index to search.\",\n ),\n DropdownInput(\n name=\"engine\",\n display_name=\"Engine\",\n options=[\"nmslib\", \"faiss\", \"lucene\"],\n value=\"nmslib\",\n info=\"Vector search engine to use.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"space_type\",\n display_name=\"Space Type\",\n options=[\"l2\", \"l1\", \"cosinesimil\", \"linf\", \"innerproduct\"],\n value=\"l2\",\n info=\"Distance metric for vector similarity.\",\n advanced=True,\n ),\n IntInput(\n name=\"ef_construction\",\n display_name=\"EF Construction\",\n value=512,\n info=\"Size of the dynamic list used during k-NN graph creation.\",\n advanced=True,\n ),\n IntInput(\n name=\"m\",\n display_name=\"M Parameter\",\n value=16,\n info=\"Number of bidirectional links created for each new element.\",\n advanced=True,\n ),\n *LCVectorStoreComponent.inputs, # includes search_query, add_documents, etc.\n HandleInput(\n name=\"embedding\", display_name=\"Embedding\", input_types=[\"Embeddings\"]\n ),\n StrInput(\n name=\"vector_field\",\n display_name=\"Vector Field\",\n value=\"chunk_embedding\",\n advanced=True,\n info=\"Vector field used for KNN.\",\n ),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Default Size (limit)\",\n value=10,\n advanced=True,\n info=\"Default number of hits when no limit provided in filter_expression.\",\n ),\n MultilineInput(\n name=\"filter_expression\",\n display_name=\"Filter Expression (JSON)\",\n value=\"\",\n info=(\n \"Optional JSON to control filters/limit/score threshold.\\n\"\n \"Accepted shapes:\\n\"\n '1) {\"filter\": [ {\"term\": {\"filename\":\"foo\"}}, {\"terms\":{\"owner\":[\"u1\",\"u2\"]}} ], \"limit\": 10, \"score_threshold\": 1.6 }\\n'\n '2) Context-style maps: {\"data_sources\":[\"fileA\"], \"document_types\":[\"application/pdf\"], \"owners\":[\"123\"]}\\n'\n \"Placeholders with __IMPOSSIBLE_VALUE__ are ignored.\"\n ),\n ),\n # ----- Auth controls (dynamic) -----\n DropdownInput(\n name=\"auth_mode\",\n display_name=\"Auth Mode\",\n value=\"basic\",\n options=[\"basic\", \"jwt\"],\n info=\"Choose Basic (username/password) or JWT (Bearer token).\",\n real_time_refresh=True,\n advanced=False,\n ),\n StrInput(\n name=\"username\",\n display_name=\"Username\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"password\",\n display_name=\"Password\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"jwt_token\",\n display_name=\"JWT Token\",\n value=\"JWT\",\n load_from_db=True,\n show=True,\n info=\"Paste a valid JWT (sent as a header).\",\n ),\n StrInput(\n name=\"jwt_header\",\n display_name=\"JWT Header Name\",\n value=\"Authorization\",\n show=False,\n advanced=True,\n ),\n BoolInput(\n name=\"bearer_prefix\",\n display_name=\"Prefix 'Bearer '\",\n value=True,\n show=False,\n advanced=True,\n ),\n # ----- TLS -----\n BoolInput(name=\"use_ssl\", display_name=\"Use SSL\", value=True, advanced=True),\n BoolInput(\n name=\"verify_certs\",\n display_name=\"Verify Certificates\",\n value=False,\n advanced=True,\n ),\n ]\n\n # ---------- helper functions for index management ----------\n def _default_text_mapping(\n self,\n dim: int,\n engine: str = \"nmslib\",\n space_type: str = \"l2\",\n ef_search: int = 512,\n ef_construction: int = 512,\n m: int = 16,\n vector_field: str = \"vector_field\",\n ) -> Dict[str, Any]:\n \"\"\"For Approximate k-NN Search, this is the default mapping to create index.\"\"\"\n return {\n \"settings\": {\"index\": {\"knn\": True, \"knn.algo_param.ef_search\": ef_search}},\n \"mappings\": {\n \"properties\": {\n vector_field: {\n \"type\": \"knn_vector\",\n \"dimension\": dim,\n \"method\": {\n \"name\": \"hnsw\",\n \"space_type\": space_type,\n \"engine\": engine,\n \"parameters\": {\"ef_construction\": ef_construction, \"m\": m},\n },\n }\n }\n },\n }\n\n def _validate_aoss_with_engines(self, is_aoss: bool, engine: str) -> None:\n \"\"\"Validate AOSS with the engine.\"\"\"\n if is_aoss and engine != \"nmslib\" and engine != \"faiss\":\n raise ValueError(\n \"Amazon OpenSearch Service Serverless only \"\n \"supports `nmslib` or `faiss` engines\"\n )\n\n def _is_aoss_enabled(self, http_auth: Any) -> bool:\n \"\"\"Check if the service is http_auth is set as `aoss`.\"\"\"\n if (\n http_auth is not None\n and hasattr(http_auth, \"service\")\n and http_auth.service == \"aoss\"\n ):\n return True\n return False\n\n def _bulk_ingest_embeddings(\n self,\n client: OpenSearch,\n index_name: str,\n embeddings: List[List[float]],\n texts: List[str],\n metadatas: Optional[List[dict]] = None,\n ids: Optional[List[str]] = None,\n vector_field: str = \"vector_field\",\n text_field: str = \"text\",\n mapping: Optional[Dict] = None,\n max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,\n is_aoss: bool = False,\n ) -> List[str]:\n \"\"\"Bulk Ingest Embeddings into given index.\"\"\"\n if not mapping:\n mapping = dict()\n try:\n from opensearchpy.exceptions import NotFoundError\n except ImportError:\n raise ImportError(\"Could not import OpenSearch exceptions\")\n\n requests = []\n return_ids = []\n\n try:\n client.indices.get(index=index_name)\n except NotFoundError:\n client.indices.create(index=index_name, body=mapping)\n\n for i, text in enumerate(texts):\n metadata = metadatas[i] if metadatas else {}\n _id = ids[i] if ids else str(uuid.uuid4())\n request = {\n \"_op_type\": \"index\",\n \"_index\": index_name,\n vector_field: embeddings[i],\n text_field: text,\n \"metadata\": metadata,\n }\n if is_aoss:\n request[\"id\"] = _id\n else:\n request[\"_id\"] = _id\n requests.append(request)\n return_ids.append(_id)\n\n helpers.bulk(client, requests, max_chunk_bytes=max_chunk_bytes)\n if not is_aoss:\n client.indices.refresh(index=index_name)\n return return_ids\n\n # ---------- auth / client ----------\n def _build_auth_kwargs(self) -> Dict[str, Any]:\n mode = (self.auth_mode or \"basic\").strip().lower()\n if mode == \"jwt\":\n token = (self.jwt_token or \"\").strip()\n if not token:\n raise ValueError(\"Auth Mode is 'jwt' but no jwt_token was provided.\")\n header_name = (self.jwt_header or \"Authorization\").strip()\n header_value = f\"Bearer {token}\" if self.bearer_prefix else token\n return {\"headers\": {header_name: header_value}}\n user = (self.username or \"\").strip()\n pwd = (self.password or \"\").strip()\n if not user or not pwd:\n raise ValueError(\"Auth Mode is 'basic' but username/password are missing.\")\n return {\"http_auth\": (user, pwd)}\n\n def build_client(self) -> OpenSearch:\n auth_kwargs = self._build_auth_kwargs()\n return OpenSearch(\n hosts=[self.opensearch_url],\n use_ssl=self.use_ssl,\n verify_certs=self.verify_certs,\n ssl_assert_hostname=False,\n ssl_show_warn=False,\n **auth_kwargs,\n )\n\n @check_cached_vector_store\n def build_vector_store(self) -> OpenSearch:\n # Return raw OpenSearch client as our “vector store.”\n return self.build_client()\n\n # ---------- ingest ----------\n def _add_documents_to_vector_store(self, client: OpenSearch) -> None:\n # Convert DataFrame to Data if needed using parent's method\n self.ingest_data = self._prepare_ingest_data()\n\n docs = self.ingest_data or []\n if not docs:\n self.log(\"No documents to ingest.\")\n return\n\n # Extract texts and metadata from documents\n texts = []\n metadatas = []\n for doc_obj in docs:\n lc_doc = doc_obj.to_lc_document()\n texts.append(lc_doc.page_content)\n metadatas.append(lc_doc.metadata)\n\n if not self.embedding:\n raise ValueError(\"Embedding handle is required to embed documents.\")\n\n # Generate embeddings\n vectors = self.embedding.embed_documents(texts)\n\n if not vectors:\n self.log(\"No vectors generated from documents.\")\n return\n\n # Get vector dimension for mapping\n dim = len(vectors[0]) if vectors else 768 # default fallback\n\n # Check for AOSS\n auth_kwargs = self._build_auth_kwargs()\n is_aoss = self._is_aoss_enabled(auth_kwargs.get(\"http_auth\"))\n\n # Validate engine with AOSS\n engine = getattr(self, \"engine\", \"nmslib\")\n self._validate_aoss_with_engines(is_aoss, engine)\n\n # Create mapping with proper KNN settings\n space_type = getattr(self, \"space_type\", \"l2\")\n ef_construction = getattr(self, \"ef_construction\", 512)\n m = getattr(self, \"m\", 16)\n\n mapping = self._default_text_mapping(\n dim=dim,\n engine=engine,\n space_type=space_type,\n ef_construction=ef_construction,\n m=m,\n vector_field=self.vector_field,\n )\n\n self.log(\n f\"Indexing {len(texts)} documents into '{self.index_name}' with proper KNN mapping...\"\n )\n\n # Use the LangChain-style bulk ingestion\n return_ids = self._bulk_ingest_embeddings(\n client=client,\n index_name=self.index_name,\n embeddings=vectors,\n texts=texts,\n metadatas=metadatas,\n vector_field=self.vector_field,\n text_field=\"text\",\n mapping=mapping,\n is_aoss=is_aoss,\n )\n\n self.log(f\"Successfully indexed {len(return_ids)} documents.\")\n\n # ---------- helpers for filters ----------\n def _is_placeholder_term(self, term_obj: dict) -> bool:\n # term_obj like {\"filename\": \"__IMPOSSIBLE_VALUE__\"}\n return any(v == \"__IMPOSSIBLE_VALUE__\" for v in term_obj.values())\n\n def _coerce_filter_clauses(self, filter_obj: dict | None) -> List[dict]:\n \"\"\"\n Accepts either:\n A) {\"filter\":[ ...term/terms objects... ], \"limit\":..., \"score_threshold\":...}\n B) Context-style: {\"data_sources\":[...], \"document_types\":[...], \"owners\":[...]}\n Returns a list of OS filter clauses (term/terms), skipping placeholders and empty terms.\n \"\"\"\n\n if not filter_obj:\n return []\n\n # If it’s a string, try to parse it once\n if isinstance(filter_obj, str):\n try:\n filter_obj = json.loads(filter_obj)\n except Exception:\n # Not valid JSON → treat as no filters\n return []\n\n # Case A: already an explicit list/dict under \"filter\"\n if \"filter\" in filter_obj:\n raw = filter_obj[\"filter\"]\n if isinstance(raw, dict):\n raw = [raw]\n clauses: List[dict] = []\n for f in raw or []:\n if (\n \"term\" in f\n and isinstance(f[\"term\"], dict)\n and not self._is_placeholder_term(f[\"term\"])\n ):\n clauses.append(f)\n elif \"terms\" in f and isinstance(f[\"terms\"], dict):\n field, vals = next(iter(f[\"terms\"].items()))\n if isinstance(vals, list) and len(vals) > 0:\n clauses.append(f)\n return clauses\n\n # Case B: convert context-style maps into clauses\n field_mapping = {\n \"data_sources\": \"filename\",\n \"document_types\": \"mimetype\",\n \"owners\": \"owner\",\n }\n clauses: List[dict] = []\n for k, values in filter_obj.items():\n if not isinstance(values, list):\n continue\n field = field_mapping.get(k, k)\n if len(values) == 0:\n # Match-nothing placeholder (kept to mirror your tool semantics)\n clauses.append({\"term\": {field: \"__IMPOSSIBLE_VALUE__\"}})\n elif len(values) == 1:\n if values[0] != \"__IMPOSSIBLE_VALUE__\":\n clauses.append({\"term\": {field: values[0]}})\n else:\n clauses.append({\"terms\": {field: values}})\n return clauses\n\n # ---------- search (single hybrid path matching your tool) ----------\n def search(self, query: str | None = None) -> list[dict[str, Any]]:\n client = self.build_client()\n q = (query or \"\").strip()\n\n # Parse optional filter expression (can be either A or B shape; see _coerce_filter_clauses)\n filter_obj = None\n if getattr(self, \"filter_expression\", \"\") and self.filter_expression.strip():\n try:\n filter_obj = json.loads(self.filter_expression)\n except json.JSONDecodeError as e:\n raise ValueError(f\"Invalid filter_expression JSON: {e}\") from e\n\n if not self.embedding:\n raise ValueError(\n \"Embedding is required to run hybrid search (KNN + keyword).\"\n )\n\n # Embed the query\n vec = self.embedding.embed_query(q)\n\n # Build filter clauses (accept both shapes)\n clauses = self._coerce_filter_clauses(filter_obj)\n\n # Respect the tool's limit/threshold defaults\n limit = (filter_obj or {}).get(\"limit\", self.number_of_results)\n score_threshold = (filter_obj or {}).get(\"score_threshold\", 0)\n\n # Build the same hybrid body as your SearchService\n body = {\n \"query\": {\n \"bool\": {\n \"should\": [\n {\n \"knn\": {\n self.vector_field: {\n \"vector\": vec,\n \"k\": 10, # fixed to match the tool\n \"boost\": 0.7,\n }\n }\n },\n {\n \"multi_match\": {\n \"query\": q,\n \"fields\": [\"text^2\", \"filename^1.5\"],\n \"type\": \"best_fields\",\n \"fuzziness\": \"AUTO\",\n \"boost\": 0.3,\n }\n },\n ],\n \"minimum_should_match\": 1,\n }\n },\n \"aggs\": {\n \"data_sources\": {\"terms\": {\"field\": \"filename\", \"size\": 20}},\n \"document_types\": {\"terms\": {\"field\": \"mimetype\", \"size\": 10}},\n \"owners\": {\"terms\": {\"field\": \"owner\", \"size\": 10}},\n },\n \"_source\": [\n \"filename\",\n \"mimetype\",\n \"page\",\n \"text\",\n \"source_url\",\n \"owner\",\n \"allowed_users\",\n \"allowed_groups\",\n ],\n \"size\": limit,\n }\n if clauses:\n body[\"query\"][\"bool\"][\"filter\"] = clauses\n\n if isinstance(score_threshold, (int, float)) and score_threshold > 0:\n # top-level min_score (matches your tool)\n body[\"min_score\"] = score_threshold\n\n resp = client.search(index=self.index_name, body=body)\n hits = resp.get(\"hits\", {}).get(\"hits\", [])\n return [\n {\n \"page_content\": hit[\"_source\"].get(\"text\", \"\"),\n \"metadata\": {k: v for k, v in hit[\"_source\"].items() if k != \"text\"},\n \"score\": hit.get(\"_score\"),\n }\n for hit in hits\n ]\n\n def search_documents(self) -> list[Data]:\n try:\n raw = self.search(self.search_query or \"\")\n return [\n Data(\n file_path=hit[\"metadata\"].get(\"file_path\", \"\"),\n text=hit[\"page_content\"],\n )\n for hit in raw\n ]\n except Exception as e:\n self.log(f\"search_documents error: {e}\")\n raise\n\n # -------- dynamic UI handling (auth switch) --------\n async def update_build_config(\n self, build_config: dict, field_value: str, field_name: str | None = None\n ) -> dict:\n try:\n if field_name == \"auth_mode\":\n mode = (field_value or \"basic\").strip().lower()\n is_basic = mode == \"basic\"\n is_jwt = mode == \"jwt\"\n\n build_config[\"username\"][\"show\"] = is_basic\n build_config[\"password\"][\"show\"] = is_basic\n\n build_config[\"jwt_token\"][\"show\"] = is_jwt\n build_config[\"jwt_header\"][\"show\"] = is_jwt\n build_config[\"bearer_prefix\"][\"show\"] = is_jwt\n\n build_config[\"username\"][\"required\"] = is_basic\n build_config[\"password\"][\"required\"] = is_basic\n\n build_config[\"jwt_token\"][\"required\"] = is_jwt\n build_config[\"jwt_header\"][\"required\"] = is_jwt\n build_config[\"bearer_prefix\"][\"required\"] = False\n\n if is_basic:\n build_config[\"jwt_token\"][\"value\"] = \"\"\n\n return build_config\n\n return build_config\n\n except Exception as e:\n self.log(f\"update_build_config error: {e}\")\n return build_config\n" + "value": "from __future__ import annotations\n\nimport json\nfrom typing import Any, Dict, List, Optional\n\nfrom langflow.base.vectorstores.model import (\n LCVectorStoreComponent,\n check_cached_vector_store,\n)\nfrom langflow.base.vectorstores.vector_store_connection_decorator import (\n vector_store_connection,\n)\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n HandleInput,\n IntInput,\n MultilineInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom opensearchpy import OpenSearch, helpers\nimport uuid\n\n\n@vector_store_connection\nclass OpenSearchHybridComponent(LCVectorStoreComponent):\n \"\"\"OpenSearch hybrid search: KNN (k=10, boost=0.7) + multi_match (boost=0.3) with optional filters & min_score.\"\"\"\n\n display_name: str = \"OpenSearch (Hybrid)\"\n name: str = \"OpenSearchHybrid\"\n icon: str = \"OpenSearch\"\n description: str = \"Hybrid search: KNN + keyword, with optional filters, min_score, and aggregations.\"\n\n # Keys we consider baseline\n default_keys: list[str] = [\n \"opensearch_url\",\n \"index_name\",\n *[\n i.name for i in LCVectorStoreComponent.inputs\n ], # search_query, add_documents, etc.\n \"embedding\",\n \"vector_field\",\n \"number_of_results\",\n \"auth_mode\",\n \"username\",\n \"password\",\n \"jwt_token\",\n \"jwt_header\",\n \"bearer_prefix\",\n \"use_ssl\",\n \"verify_certs\",\n \"filter_expression\",\n \"engine\",\n \"space_type\",\n \"ef_construction\",\n \"m\",\n ]\n\n inputs = [\n StrInput(\n name=\"opensearch_url\",\n display_name=\"OpenSearch URL\",\n value=\"http://localhost:9200\",\n info=\"URL for your OpenSearch cluster.\",\n ),\n StrInput(\n name=\"index_name\",\n display_name=\"Index Name\",\n value=\"langflow\",\n info=\"The index to search.\",\n ),\n DropdownInput(\n name=\"engine\",\n display_name=\"Engine\",\n options=[\"nmslib\", \"faiss\", \"lucene\"],\n value=\"nmslib\",\n info=\"Vector search engine to use.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"space_type\",\n display_name=\"Space Type\",\n options=[\"l2\", \"l1\", \"cosinesimil\", \"linf\", \"innerproduct\"],\n value=\"l2\",\n info=\"Distance metric for vector similarity.\",\n advanced=True,\n ),\n IntInput(\n name=\"ef_construction\",\n display_name=\"EF Construction\",\n value=512,\n info=\"Size of the dynamic list used during k-NN graph creation.\",\n advanced=True,\n ),\n IntInput(\n name=\"m\",\n display_name=\"M Parameter\",\n value=16,\n info=\"Number of bidirectional links created for each new element.\",\n advanced=True,\n ),\n *LCVectorStoreComponent.inputs, # includes search_query, add_documents, etc.\n HandleInput(\n name=\"embedding\", display_name=\"Embedding\", input_types=[\"Embeddings\"]\n ),\n StrInput(\n name=\"vector_field\",\n display_name=\"Vector Field\",\n value=\"chunk_embedding\",\n advanced=True,\n info=\"Vector field used for KNN.\",\n ),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Default Size (limit)\",\n value=10,\n advanced=True,\n info=\"Default number of hits when no limit provided in filter_expression.\",\n ),\n MultilineInput(\n name=\"filter_expression\",\n display_name=\"Filter Expression (JSON)\",\n value=\"\",\n info=(\n \"Optional JSON to control filters/limit/score threshold.\\n\"\n \"Accepted shapes:\\n\"\n '1) {\"filter\": [ {\"term\": {\"filename\":\"foo\"}}, {\"terms\":{\"owner\":[\"u1\",\"u2\"]}} ], \"limit\": 10, \"score_threshold\": 1.6 }\\n'\n '2) Context-style maps: {\"data_sources\":[\"fileA\"], \"document_types\":[\"application/pdf\"], \"owners\":[\"123\"]}\\n'\n \"Placeholders with __IMPOSSIBLE_VALUE__ are ignored.\"\n ),\n ),\n # ----- Auth controls (dynamic) -----\n DropdownInput(\n name=\"auth_mode\",\n display_name=\"Auth Mode\",\n value=\"basic\",\n options=[\"basic\", \"jwt\"],\n info=\"Choose Basic (username/password) or JWT (Bearer token).\",\n real_time_refresh=True,\n advanced=False,\n ),\n StrInput(\n name=\"username\",\n display_name=\"Username\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"password\",\n display_name=\"Password\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"jwt_token\",\n display_name=\"JWT Token\",\n value=\"JWT\",\n load_from_db=True,\n show=True,\n info=\"Paste a valid JWT (sent as a header).\",\n ),\n StrInput(\n name=\"jwt_header\",\n display_name=\"JWT Header Name\",\n value=\"Authorization\",\n show=False,\n advanced=True,\n ),\n BoolInput(\n name=\"bearer_prefix\",\n display_name=\"Prefix 'Bearer '\",\n value=True,\n show=False,\n advanced=True,\n ),\n # ----- TLS -----\n BoolInput(name=\"use_ssl\", display_name=\"Use SSL\", value=True, advanced=True),\n BoolInput(\n name=\"verify_certs\",\n display_name=\"Verify Certificates\",\n value=False,\n advanced=True,\n ),\n ]\n\n # ---------- helper functions for index management ----------\n def _default_text_mapping(\n self,\n dim: int,\n engine: str = \"nmslib\",\n space_type: str = \"l2\",\n ef_search: int = 512,\n ef_construction: int = 512,\n m: int = 16,\n vector_field: str = \"vector_field\",\n ) -> Dict[str, Any]:\n \"\"\"For Approximate k-NN Search, this is the default mapping to create index.\"\"\"\n return {\n \"settings\": {\"index\": {\"knn\": True, \"knn.algo_param.ef_search\": ef_search}},\n \"mappings\": {\n \"properties\": {\n vector_field: {\n \"type\": \"knn_vector\",\n \"dimension\": dim,\n \"method\": {\n \"name\": \"hnsw\",\n \"space_type\": space_type,\n \"engine\": engine,\n \"parameters\": {\"ef_construction\": ef_construction, \"m\": m},\n },\n }\n }\n },\n }\n\n def _validate_aoss_with_engines(self, is_aoss: bool, engine: str) -> None:\n \"\"\"Validate AOSS with the engine.\"\"\"\n if is_aoss and engine != \"nmslib\" and engine != \"faiss\":\n raise ValueError(\n \"Amazon OpenSearch Service Serverless only \"\n \"supports `nmslib` or `faiss` engines\"\n )\n\n def _is_aoss_enabled(self, http_auth: Any) -> bool:\n \"\"\"Check if the service is http_auth is set as `aoss`.\"\"\"\n if (\n http_auth is not None\n and hasattr(http_auth, \"service\")\n and http_auth.service == \"aoss\"\n ):\n return True\n return False\n\n def _bulk_ingest_embeddings(\n self,\n client: OpenSearch,\n index_name: str,\n embeddings: List[List[float]],\n texts: List[str],\n metadatas: Optional[List[dict]] = None,\n ids: Optional[List[str]] = None,\n vector_field: str = \"vector_field\",\n text_field: str = \"text\",\n mapping: Optional[Dict] = None,\n max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,\n is_aoss: bool = False,\n ) -> List[str]:\n \"\"\"Bulk Ingest Embeddings into given index.\"\"\"\n if not mapping:\n mapping = dict()\n try:\n from opensearchpy.exceptions import NotFoundError\n except ImportError:\n raise ImportError(\"Could not import OpenSearch exceptions\")\n\n requests = []\n return_ids = []\n\n try:\n client.indices.get(index=index_name)\n except NotFoundError:\n client.indices.create(index=index_name, body=mapping)\n\n for i, text in enumerate(texts):\n metadata = metadatas[i] if metadatas else {}\n _id = ids[i] if ids else str(uuid.uuid4())\n request = {\n \"_op_type\": \"index\",\n \"_index\": index_name,\n vector_field: embeddings[i],\n text_field: text,\n \"metadata\": metadata,\n }\n if is_aoss:\n request[\"id\"] = _id\n else:\n request[\"_id\"] = _id\n requests.append(request)\n return_ids.append(_id)\n\n helpers.bulk(client, requests, max_chunk_bytes=max_chunk_bytes)\n if not is_aoss:\n client.indices.refresh(index=index_name)\n return return_ids\n\n # ---------- auth / client ----------\n def _build_auth_kwargs(self) -> Dict[str, Any]:\n mode = (self.auth_mode or \"basic\").strip().lower()\n if mode == \"jwt\":\n token = (self.jwt_token or \"\").strip()\n if not token:\n raise ValueError(\"Auth Mode is 'jwt' but no jwt_token was provided.\")\n header_name = (self.jwt_header or \"Authorization\").strip()\n header_value = f\"Bearer {token}\" if self.bearer_prefix else token\n return {\"headers\": {header_name: header_value}}\n user = (self.username or \"\").strip()\n pwd = (self.password or \"\").strip()\n if not user or not pwd:\n raise ValueError(\"Auth Mode is 'basic' but username/password are missing.\")\n return {\"http_auth\": (user, pwd)}\n\n def build_client(self) -> OpenSearch:\n auth_kwargs = self._build_auth_kwargs()\n return OpenSearch(\n hosts=[self.opensearch_url],\n use_ssl=self.use_ssl,\n verify_certs=self.verify_certs,\n ssl_assert_hostname=False,\n ssl_show_warn=False,\n **auth_kwargs,\n )\n\n @check_cached_vector_store\n def build_vector_store(self) -> OpenSearch:\n # Return raw OpenSearch client as our \"vector store.\"\n client = self.build_client()\n self._add_documents_to_vector_store(client=client)\n return client\n\n # ---------- ingest ----------\n def _add_documents_to_vector_store(self, client: OpenSearch) -> None:\n # Convert DataFrame to Data if needed using parent's method\n self.ingest_data = self._prepare_ingest_data()\n\n docs = self.ingest_data or []\n if not docs:\n self.log(\"No documents to ingest.\")\n return\n\n # Extract texts and metadata from documents\n texts = []\n metadatas = []\n for doc_obj in docs:\n lc_doc = doc_obj.to_lc_document()\n texts.append(lc_doc.page_content)\n metadatas.append(lc_doc.metadata)\n\n if not self.embedding:\n raise ValueError(\"Embedding handle is required to embed documents.\")\n\n # Generate embeddings\n vectors = self.embedding.embed_documents(texts)\n\n if not vectors:\n self.log(\"No vectors generated from documents.\")\n return\n\n # Get vector dimension for mapping\n dim = len(vectors[0]) if vectors else 768 # default fallback\n\n # Check for AOSS\n auth_kwargs = self._build_auth_kwargs()\n is_aoss = self._is_aoss_enabled(auth_kwargs.get(\"http_auth\"))\n\n # Validate engine with AOSS\n engine = getattr(self, \"engine\", \"nmslib\")\n self._validate_aoss_with_engines(is_aoss, engine)\n\n # Create mapping with proper KNN settings\n space_type = getattr(self, \"space_type\", \"l2\")\n ef_construction = getattr(self, \"ef_construction\", 512)\n m = getattr(self, \"m\", 16)\n\n mapping = self._default_text_mapping(\n dim=dim,\n engine=engine,\n space_type=space_type,\n ef_construction=ef_construction,\n m=m,\n vector_field=self.vector_field,\n )\n\n self.log(\n f\"Indexing {len(texts)} documents into '{self.index_name}' with proper KNN mapping...\"\n )\n\n # Use the LangChain-style bulk ingestion\n return_ids = self._bulk_ingest_embeddings(\n client=client,\n index_name=self.index_name,\n embeddings=vectors,\n texts=texts,\n metadatas=metadatas,\n vector_field=self.vector_field,\n text_field=\"text\",\n mapping=mapping,\n is_aoss=is_aoss,\n )\n\n self.log(f\"Successfully indexed {len(return_ids)} documents.\")\n\n # ---------- helpers for filters ----------\n def _is_placeholder_term(self, term_obj: dict) -> bool:\n # term_obj like {\"filename\": \"__IMPOSSIBLE_VALUE__\"}\n return any(v == \"__IMPOSSIBLE_VALUE__\" for v in term_obj.values())\n\n def _coerce_filter_clauses(self, filter_obj: dict | None) -> List[dict]:\n \"\"\"\n Accepts either:\n A) {\"filter\":[ ...term/terms objects... ], \"limit\":..., \"score_threshold\":...}\n B) Context-style: {\"data_sources\":[...], \"document_types\":[...], \"owners\":[...]}\n Returns a list of OS filter clauses (term/terms), skipping placeholders and empty terms.\n \"\"\"\n\n if not filter_obj:\n return []\n\n # If it’s a string, try to parse it once\n if isinstance(filter_obj, str):\n try:\n filter_obj = json.loads(filter_obj)\n except Exception:\n # Not valid JSON → treat as no filters\n return []\n\n # Case A: already an explicit list/dict under \"filter\"\n if \"filter\" in filter_obj:\n raw = filter_obj[\"filter\"]\n if isinstance(raw, dict):\n raw = [raw]\n clauses: List[dict] = []\n for f in raw or []:\n if (\n \"term\" in f\n and isinstance(f[\"term\"], dict)\n and not self._is_placeholder_term(f[\"term\"])\n ):\n clauses.append(f)\n elif \"terms\" in f and isinstance(f[\"terms\"], dict):\n field, vals = next(iter(f[\"terms\"].items()))\n if isinstance(vals, list) and len(vals) > 0:\n clauses.append(f)\n return clauses\n\n # Case B: convert context-style maps into clauses\n field_mapping = {\n \"data_sources\": \"filename\",\n \"document_types\": \"mimetype\",\n \"owners\": \"owner\",\n }\n clauses: List[dict] = []\n for k, values in filter_obj.items():\n if not isinstance(values, list):\n continue\n field = field_mapping.get(k, k)\n if len(values) == 0:\n # Match-nothing placeholder (kept to mirror your tool semantics)\n clauses.append({\"term\": {field: \"__IMPOSSIBLE_VALUE__\"}})\n elif len(values) == 1:\n if values[0] != \"__IMPOSSIBLE_VALUE__\":\n clauses.append({\"term\": {field: values[0]}})\n else:\n clauses.append({\"terms\": {field: values}})\n return clauses\n\n # ---------- search (single hybrid path matching your tool) ----------\n def search(self, query: str | None = None) -> list[dict[str, Any]]:\n client = self.build_client()\n q = (query or \"\").strip()\n\n # Parse optional filter expression (can be either A or B shape; see _coerce_filter_clauses)\n filter_obj = None\n if getattr(self, \"filter_expression\", \"\") and self.filter_expression.strip():\n try:\n filter_obj = json.loads(self.filter_expression)\n except json.JSONDecodeError as e:\n raise ValueError(f\"Invalid filter_expression JSON: {e}\") from e\n\n if not self.embedding:\n raise ValueError(\n \"Embedding is required to run hybrid search (KNN + keyword).\"\n )\n\n # Embed the query\n vec = self.embedding.embed_query(q)\n\n # Build filter clauses (accept both shapes)\n clauses = self._coerce_filter_clauses(filter_obj)\n\n # Respect the tool's limit/threshold defaults\n limit = (filter_obj or {}).get(\"limit\", self.number_of_results)\n score_threshold = (filter_obj or {}).get(\"score_threshold\", 0)\n\n # Build the same hybrid body as your SearchService\n body = {\n \"query\": {\n \"bool\": {\n \"should\": [\n {\n \"knn\": {\n self.vector_field: {\n \"vector\": vec,\n \"k\": 10, # fixed to match the tool\n \"boost\": 0.7,\n }\n }\n },\n {\n \"multi_match\": {\n \"query\": q,\n \"fields\": [\"text^2\", \"filename^1.5\"],\n \"type\": \"best_fields\",\n \"fuzziness\": \"AUTO\",\n \"boost\": 0.3,\n }\n },\n ],\n \"minimum_should_match\": 1,\n }\n },\n \"aggs\": {\n \"data_sources\": {\"terms\": {\"field\": \"filename\", \"size\": 20}},\n \"document_types\": {\"terms\": {\"field\": \"mimetype\", \"size\": 10}},\n \"owners\": {\"terms\": {\"field\": \"owner\", \"size\": 10}},\n },\n \"_source\": [\n \"filename\",\n \"mimetype\",\n \"page\",\n \"text\",\n \"source_url\",\n \"owner\",\n \"allowed_users\",\n \"allowed_groups\",\n ],\n \"size\": limit,\n }\n if clauses:\n body[\"query\"][\"bool\"][\"filter\"] = clauses\n\n if isinstance(score_threshold, (int, float)) and score_threshold > 0:\n # top-level min_score (matches your tool)\n body[\"min_score\"] = score_threshold\n\n resp = client.search(index=self.index_name, body=body)\n hits = resp.get(\"hits\", {}).get(\"hits\", [])\n return [\n {\n \"page_content\": hit[\"_source\"].get(\"text\", \"\"),\n \"metadata\": {k: v for k, v in hit[\"_source\"].items() if k != \"text\"},\n \"score\": hit.get(\"_score\"),\n }\n for hit in hits\n ]\n\n def search_documents(self) -> list[Data]:\n try:\n raw = self.search(self.search_query or \"\")\n return [\n Data(\n file_path=hit[\"metadata\"].get(\"file_path\", \"\"),\n text=hit[\"page_content\"],\n )\n for hit in raw\n ]\n except Exception as e:\n self.log(f\"search_documents error: {e}\")\n raise\n\n # -------- dynamic UI handling (auth switch) --------\n async def update_build_config(\n self, build_config: dict, field_value: str, field_name: str | None = None\n ) -> dict:\n try:\n if field_name == \"auth_mode\":\n mode = (field_value or \"basic\").strip().lower()\n is_basic = mode == \"basic\"\n is_jwt = mode == \"jwt\"\n\n build_config[\"username\"][\"show\"] = is_basic\n build_config[\"password\"][\"show\"] = is_basic\n\n build_config[\"jwt_token\"][\"show\"] = is_jwt\n build_config[\"jwt_header\"][\"show\"] = is_jwt\n build_config[\"bearer_prefix\"][\"show\"] = is_jwt\n\n build_config[\"username\"][\"required\"] = is_basic\n build_config[\"password\"][\"required\"] = is_basic\n\n build_config[\"jwt_token\"][\"required\"] = is_jwt\n build_config[\"jwt_header\"][\"required\"] = is_jwt\n build_config[\"bearer_prefix\"][\"required\"] = False\n\n if is_basic:\n build_config[\"jwt_token\"][\"value\"] = \"\"\n\n return build_config\n\n return build_config\n\n except Exception as e:\n self.log(f\"update_build_config error: {e}\")\n return build_config\n" }, "ef_construction": { "_input_type": "IntInput", @@ -1482,7 +1483,7 @@ "show": true, "title_case": false, "type": "str", - "value": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwOi8vb3BlbnJhZy1iYWNrZW5kOjgwMDAiLCJzdWIiOiIxMDMwNzA3MzY1NDU0NjQyNDYxMTMiLCJhdWQiOlsib3BlbnNlYXJjaCIsIm9wZW5yYWciXSwiZXhwIjoxNzU3NzExNzEyLCJpYXQiOjE3NTcxMDY5MTIsImF1dGhfdGltZSI6MTc1NzExNzcxMiwidXNlcl9pZCI6IjEwMzA3MDczNjU0NTQ2NDI0NjExMyIsImVtYWlsIjoiZ2FicmllbEBsYW5nZmxvdy5vcmciLCJuYW1lIjoiR2FicmllbCBBbG1laWRhIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZ2FicmllbEBsYW5nZmxvdy5vcmciLCJlbWFpbF92ZXJpZmllZCI6dHJ1ZSwicm9sZXMiOlsib3BlbnJhZ191c2VyIl19.JneUFesg-FuNKVdd0Nbc8dtItxtrctwldJTnrj8I2U_mGcZgX0ObnqrrrF8lvn25Su3rdyZIJ84bX16WMUMhUivzRl1od7X5_PUOr21F_MHtIVMBnmQW_DO5MjN6Op4-v54FAc9HZn6v5gS_RdUr4E0Vscv5CJIfbirFTA0B3Yip9hxg1UXocgXnc0NwiwTJnu9XBhEgPOXJLIu1PJjvVWBclO7ZgzMmgSUoZPzDH6GQphPqtWxeav-bGk38HyI2GR0QaRYjGMgKMB-xwGQWh5kvCuwEQ5ylF80yXN7lVIc7DGY69vhy24II6W8FaWZvMVqJnwcByfHJWbWQ8g8UDA" + "value": "" }, "m": { "_input_type": "IntInput", @@ -1702,14 +1703,14 @@ }, "tool_mode": false }, - "selected_output": "search_results", + "selected_output": "dataframe", "showNode": true, "type": "OpenSearchHybrid" }, "dragging": false, "id": "OpenSearchHybrid-Ve6bS", "measured": { - "height": 765, + "height": 761, "width": 320 }, "position": { @@ -1721,9 +1722,9 @@ } ], "viewport": { - "x": -1214.8709460066525, - "y": -1289.0306227762003, - "zoom": 1.0020797567291742 + "x": -988.2209660078397, + "y": -990.7176624994117, + "zoom": 0.7898282762479812 } }, "description": "Load your data for chat context with Retrieval Augmented Generation.",