From a6beb4a4aa6068a96052cdba92d755fcd8120320 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 8 Sep 2025 23:49:20 -0300 Subject: [PATCH] Update ingestion flow JSON to enhance component configurations and add metadata input This commit modifies the ingestion flow JSON by adjusting the position of nodes for better layout, adding a new input field for ingestion metadata, and updating the code hash. These changes improve the clarity and functionality of the component configurations, aligning with best practices for robust async development and enhancing the overall user experience. --- flows/ingestion_flow.json | 65 ++++++++++++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/flows/ingestion_flow.json b/flows/ingestion_flow.json index 8a5b9256..0582548b 100644 --- a/flows/ingestion_flow.json +++ b/flows/ingestion_flow.json @@ -833,8 +833,8 @@ "width": 320 }, "position": { - "x": 1750.4286955907494, - "y": 1934.8525614216642 + "x": 1704.8491676318172, + "y": 1879.144249471858 }, "positionAbsolute": { "x": 1690.9220896443658, @@ -1395,6 +1395,7 @@ "documentation": "", "edited": true, "field_order": [ + "docs_metadata", "opensearch_url", "index_name", "engine", @@ -1420,9 +1421,8 @@ "frozen": false, "icon": "OpenSearch", "legacy": false, - "lf_version": "1.5.0.post2", "metadata": { - "code_hash": "7f0062789e1c", + "code_hash": "f979f0872c35", "dependencies": { "dependencies": [ { @@ -1556,7 +1556,58 @@ "show": true, "title_case": false, "type": "code", - "value": "from __future__ import annotations\n\nimport json\nimport uuid\nfrom typing import Any, Dict, List, Optional\n\nfrom langflow.base.vectorstores.model import (\n LCVectorStoreComponent,\n check_cached_vector_store,\n)\nfrom langflow.base.vectorstores.vector_store_connection_decorator import (\n vector_store_connection,\n)\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n HandleInput,\n IntInput,\n MultilineInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.logging import logger\nfrom langflow.schema.data import Data\nfrom opensearchpy import OpenSearch, helpers\n\n\n@vector_store_connection\nclass OpenSearchHybridComponent(LCVectorStoreComponent):\n \"\"\"OpenSearch hybrid search: KNN (k=10, boost=0.7) + multi_match (boost=0.3) with optional filters & min_score.\"\"\"\n\n display_name: str = \"OpenSearch (Hybrid)\"\n name: str = \"OpenSearchHybrid\"\n icon: str = \"OpenSearch\"\n description: str = \"Hybrid search: KNN + keyword, with optional filters, min_score, and aggregations.\"\n\n # Keys we consider baseline\n default_keys: list[str] = [\n \"opensearch_url\",\n \"index_name\",\n *[\n i.name for i in LCVectorStoreComponent.inputs\n ], # search_query, add_documents, etc.\n \"embedding\",\n \"vector_field\",\n \"number_of_results\",\n \"auth_mode\",\n \"username\",\n \"password\",\n \"jwt_token\",\n \"jwt_header\",\n \"bearer_prefix\",\n \"use_ssl\",\n \"verify_certs\",\n \"filter_expression\",\n \"engine\",\n \"space_type\",\n \"ef_construction\",\n \"m\",\n ]\n\n inputs = [\n StrInput(\n name=\"opensearch_url\",\n display_name=\"OpenSearch URL\",\n value=\"http://localhost:9200\",\n info=\"URL for your OpenSearch cluster.\",\n ),\n StrInput(\n name=\"index_name\",\n display_name=\"Index Name\",\n value=\"langflow\",\n info=\"The index to search.\",\n ),\n DropdownInput(\n name=\"engine\",\n display_name=\"Engine\",\n options=[\"jvector\", \"nmslib\", \"faiss\", \"lucene\"],\n value=\"jvector\",\n info=\"Vector search engine to use.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"space_type\",\n display_name=\"Space Type\",\n options=[\"l2\", \"l1\", \"cosinesimil\", \"linf\", \"innerproduct\"],\n value=\"l2\",\n info=\"Distance metric for vector similarity.\",\n advanced=True,\n ),\n IntInput(\n name=\"ef_construction\",\n display_name=\"EF Construction\",\n value=512,\n info=\"Size of the dynamic list used during k-NN graph creation.\",\n advanced=True,\n ),\n IntInput(\n name=\"m\",\n display_name=\"M Parameter\",\n value=16,\n info=\"Number of bidirectional links created for each new element.\",\n advanced=True,\n ),\n *LCVectorStoreComponent.inputs, # includes search_query, add_documents, etc.\n HandleInput(\n name=\"embedding\", display_name=\"Embedding\", input_types=[\"Embeddings\"]\n ),\n StrInput(\n name=\"vector_field\",\n display_name=\"Vector Field\",\n value=\"chunk_embedding\",\n advanced=True,\n info=\"Vector field used for KNN.\",\n ),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Default Size (limit)\",\n value=10,\n advanced=True,\n info=\"Default number of hits when no limit provided in filter_expression.\",\n ),\n MultilineInput(\n name=\"filter_expression\",\n display_name=\"Filter Expression (JSON)\",\n value=\"\",\n info=(\n \"Optional JSON to control filters/limit/score threshold.\\n\"\n \"Accepted shapes:\\n\"\n '1) {\"filter\": [ {\"term\": {\"filename\":\"foo\"}}, {\"terms\":{\"owner\":[\"u1\",\"u2\"]}} ], \"limit\": 10, \"score_threshold\": 1.6 }\\n'\n '2) Context-style maps: {\"data_sources\":[\"fileA\"], \"document_types\":[\"application/pdf\"], \"owners\":[\"123\"]}\\n'\n \"Placeholders with __IMPOSSIBLE_VALUE__ are ignored.\"\n ),\n ),\n # ----- Auth controls (dynamic) -----\n DropdownInput(\n name=\"auth_mode\",\n display_name=\"Auth Mode\",\n value=\"basic\",\n options=[\"basic\", \"jwt\"],\n info=\"Choose Basic (username/password) or JWT (Bearer token).\",\n real_time_refresh=True,\n advanced=False,\n ),\n StrInput(\n name=\"username\",\n display_name=\"Username\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"password\",\n display_name=\"Password\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"jwt_token\",\n display_name=\"JWT Token\",\n value=\"JWT\",\n load_from_db=True,\n show=True,\n info=\"Paste a valid JWT (sent as a header).\",\n ),\n StrInput(\n name=\"jwt_header\",\n display_name=\"JWT Header Name\",\n value=\"Authorization\",\n show=False,\n advanced=True,\n ),\n BoolInput(\n name=\"bearer_prefix\",\n display_name=\"Prefix 'Bearer '\",\n value=True,\n show=False,\n advanced=True,\n ),\n # ----- TLS -----\n BoolInput(name=\"use_ssl\", display_name=\"Use SSL\", value=True, advanced=True),\n BoolInput(\n name=\"verify_certs\",\n display_name=\"Verify Certificates\",\n value=False,\n advanced=True,\n ),\n ]\n\n # ---------- helper functions for index management ----------\n def _default_text_mapping(\n self,\n dim: int,\n engine: str = \"jvector\",\n space_type: str = \"l2\",\n ef_search: int = 512,\n ef_construction: int = 100,\n m: int = 16,\n vector_field: str = \"vector_field\",\n ) -> Dict[str, Any]:\n \"\"\"For Approximate k-NN Search, this is the default mapping to create index.\"\"\"\n return {\n \"settings\": {\"index\": {\"knn\": True, \"knn.algo_param.ef_search\": ef_search}},\n \"mappings\": {\n \"properties\": {\n vector_field: {\n \"type\": \"knn_vector\",\n \"dimension\": dim,\n \"method\": {\n \"name\": \"disk_ann\",\n \"space_type\": space_type,\n \"engine\": engine,\n \"parameters\": {\"ef_construction\": ef_construction, \"m\": m},\n },\n }\n }\n },\n }\n\n def _validate_aoss_with_engines(self, is_aoss: bool, engine: str) -> None:\n \"\"\"Validate AOSS with the engine.\"\"\"\n if is_aoss and engine != \"nmslib\" and engine != \"faiss\":\n raise ValueError(\n \"Amazon OpenSearch Service Serverless only \"\n \"supports `nmslib` or `faiss` engines\"\n )\n\n def _is_aoss_enabled(self, http_auth: Any) -> bool:\n \"\"\"Check if the service is http_auth is set as `aoss`.\"\"\"\n if (\n http_auth is not None\n and hasattr(http_auth, \"service\")\n and http_auth.service == \"aoss\"\n ):\n return True\n return False\n\n def _bulk_ingest_embeddings(\n self,\n client: OpenSearch,\n index_name: str,\n embeddings: List[List[float]],\n texts: List[str],\n metadatas: Optional[List[dict]] = None,\n ids: Optional[List[str]] = None,\n vector_field: str = \"vector_field\",\n text_field: str = \"text\",\n mapping: Optional[Dict] = None,\n max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,\n is_aoss: bool = False,\n ) -> List[str]:\n \"\"\"Bulk Ingest Embeddings into given index.\"\"\"\n if not mapping:\n mapping = dict()\n\n requests = []\n return_ids = []\n\n for i, text in enumerate(texts):\n metadata = metadatas[i] if metadatas else {}\n _id = ids[i] if ids else str(uuid.uuid4())\n request = {\n \"_op_type\": \"index\",\n \"_index\": index_name,\n vector_field: embeddings[i],\n text_field: text,\n **metadata,\n }\n if is_aoss:\n request[\"id\"] = _id\n else:\n request[\"_id\"] = _id\n requests.append(request)\n return_ids.append(_id)\n self.log(metadatas[i])\n helpers.bulk(client, requests, max_chunk_bytes=max_chunk_bytes)\n return return_ids\n\n # ---------- auth / client ----------\n def _build_auth_kwargs(self) -> Dict[str, Any]:\n mode = (self.auth_mode or \"basic\").strip().lower()\n if mode == \"jwt\":\n token = (self.jwt_token or \"\").strip()\n if not token:\n raise ValueError(\"Auth Mode is 'jwt' but no jwt_token was provided.\")\n header_name = (self.jwt_header or \"Authorization\").strip()\n header_value = f\"Bearer {token}\" if self.bearer_prefix else token\n return {\"headers\": {header_name: header_value}}\n user = (self.username or \"\").strip()\n pwd = (self.password or \"\").strip()\n if not user or not pwd:\n raise ValueError(\"Auth Mode is 'basic' but username/password are missing.\")\n return {\"http_auth\": (user, pwd)}\n\n def build_client(self) -> OpenSearch:\n auth_kwargs = self._build_auth_kwargs()\n return OpenSearch(\n hosts=[self.opensearch_url],\n use_ssl=self.use_ssl,\n verify_certs=self.verify_certs,\n ssl_assert_hostname=False,\n ssl_show_warn=False,\n **auth_kwargs,\n )\n\n @check_cached_vector_store\n def build_vector_store(self) -> OpenSearch:\n # Return raw OpenSearch client as our “vector store.”\n self.log(self.ingest_data)\n client = self.build_client()\n self._add_documents_to_vector_store(client=client)\n return client\n\n # ---------- ingest ----------\n def _add_documents_to_vector_store(self, client: OpenSearch) -> None:\n # Convert DataFrame to Data if needed using parent's method\n self.ingest_data = self._prepare_ingest_data()\n\n docs = self.ingest_data or []\n if not docs:\n self.log(\"No documents to ingest.\")\n return\n\n # Extract texts and metadata from documents\n texts = []\n metadatas = []\n # Track document sizes by filename\n document_sizes = {}\n \n for doc_obj in docs:\n data_copy = json.loads(doc_obj.model_dump_json())\n text = data_copy.pop(doc_obj.text_key, doc_obj.default_value)\n texts.append(text)\n\n # Calculate and track document size\n filename = data_copy.get(\"filename\", \"unknown\")\n text_size = len(text.encode(\"utf-8\")) if text else 0\n if filename in document_sizes:\n document_sizes[filename] += text_size\n else:\n document_sizes[filename] = text_size\n\n # Add calculated size to metadata (per chunk, but represents total document size)\n metadatas.append(data_copy)\n self.log(metadatas)\n if not self.embedding:\n raise ValueError(\"Embedding handle is required to embed documents.\")\n\n # Generate embeddings\n vectors = self.embedding.embed_documents(texts)\n\n if not vectors:\n self.log(\"No vectors generated from documents.\")\n return\n\n # Get vector dimension for mapping\n dim = len(vectors[0]) if vectors else 768 # default fallback\n\n # Check for AOSS\n auth_kwargs = self._build_auth_kwargs()\n is_aoss = self._is_aoss_enabled(auth_kwargs.get(\"http_auth\"))\n\n # Validate engine with AOSS\n engine = getattr(self, \"engine\", \"jvector\")\n self._validate_aoss_with_engines(is_aoss, engine)\n\n # Create mapping with proper KNN settings\n space_type = getattr(self, \"space_type\", \"l2\")\n ef_construction = getattr(self, \"ef_construction\", 512)\n m = getattr(self, \"m\", 16)\n\n mapping = self._default_text_mapping(\n dim=dim,\n engine=engine,\n space_type=space_type,\n ef_construction=ef_construction,\n m=m,\n vector_field=self.vector_field,\n )\n\n self.log(\n f\"Indexing {len(texts)} documents into '{self.index_name}' with proper KNN mapping...\"\n )\n\n # Use the LangChain-style bulk ingestion\n return_ids = self._bulk_ingest_embeddings(\n client=client,\n index_name=self.index_name,\n embeddings=vectors,\n texts=texts,\n metadatas=metadatas,\n vector_field=self.vector_field,\n text_field=\"text\",\n mapping=mapping,\n is_aoss=is_aoss,\n )\n self.log(metadatas)\n\n self.log(f\"Successfully indexed {len(return_ids)} documents.\")\n\n # ---------- helpers for filters ----------\n def _is_placeholder_term(self, term_obj: dict) -> bool:\n # term_obj like {\"filename\": \"__IMPOSSIBLE_VALUE__\"}\n return any(v == \"__IMPOSSIBLE_VALUE__\" for v in term_obj.values())\n\n def _coerce_filter_clauses(self, filter_obj: dict | None) -> List[dict]:\n \"\"\"\n Accepts either:\n A) {\"filter\":[ ...term/terms objects... ], \"limit\":..., \"score_threshold\":...}\n B) Context-style: {\"data_sources\":[...], \"document_types\":[...], \"owners\":[...]}\n Returns a list of OS filter clauses (term/terms), skipping placeholders and empty terms.\n \"\"\"\n\n if not filter_obj:\n return []\n\n # If it’s a string, try to parse it once\n if isinstance(filter_obj, str):\n try:\n filter_obj = json.loads(filter_obj)\n except Exception:\n # Not valid JSON → treat as no filters\n return []\n\n # Case A: already an explicit list/dict under \"filter\"\n if \"filter\" in filter_obj:\n raw = filter_obj[\"filter\"]\n if isinstance(raw, dict):\n raw = [raw]\n clauses: List[dict] = []\n for f in raw or []:\n if (\n \"term\" in f\n and isinstance(f[\"term\"], dict)\n and not self._is_placeholder_term(f[\"term\"])\n ):\n clauses.append(f)\n elif \"terms\" in f and isinstance(f[\"terms\"], dict):\n field, vals = next(iter(f[\"terms\"].items()))\n if isinstance(vals, list) and len(vals) > 0:\n clauses.append(f)\n return clauses\n\n # Case B: convert context-style maps into clauses\n field_mapping = {\n \"data_sources\": \"filename\",\n \"document_types\": \"mimetype\",\n \"owners\": \"owner\",\n }\n clauses: List[dict] = []\n for k, values in filter_obj.items():\n if not isinstance(values, list):\n continue\n field = field_mapping.get(k, k)\n if len(values) == 0:\n # Match-nothing placeholder (kept to mirror your tool semantics)\n clauses.append({\"term\": {field: \"__IMPOSSIBLE_VALUE__\"}})\n elif len(values) == 1:\n if values[0] != \"__IMPOSSIBLE_VALUE__\":\n clauses.append({\"term\": {field: values[0]}})\n else:\n clauses.append({\"terms\": {field: values}})\n return clauses\n\n # ---------- search (single hybrid path matching your tool) ----------\n def search(self, query: str | None = None) -> list[dict[str, Any]]:\n logger.info(self.ingest_data)\n client = self.build_client()\n q = (query or \"\").strip()\n\n # Parse optional filter expression (can be either A or B shape; see _coerce_filter_clauses)\n filter_obj = None\n if getattr(self, \"filter_expression\", \"\") and self.filter_expression.strip():\n try:\n filter_obj = json.loads(self.filter_expression)\n except json.JSONDecodeError as e:\n raise ValueError(f\"Invalid filter_expression JSON: {e}\") from e\n\n if not self.embedding:\n raise ValueError(\n \"Embedding is required to run hybrid search (KNN + keyword).\"\n )\n\n # Embed the query\n vec = self.embedding.embed_query(q)\n\n # Build filter clauses (accept both shapes)\n clauses = self._coerce_filter_clauses(filter_obj)\n\n # Respect the tool's limit/threshold defaults\n limit = (filter_obj or {}).get(\"limit\", self.number_of_results)\n score_threshold = (filter_obj or {}).get(\"score_threshold\", 0)\n\n # Build the same hybrid body as your SearchService\n body = {\n \"query\": {\n \"bool\": {\n \"should\": [\n {\n \"knn\": {\n self.vector_field: {\n \"vector\": vec,\n \"k\": 10, # fixed to match the tool\n \"boost\": 0.7,\n }\n }\n },\n {\n \"multi_match\": {\n \"query\": q,\n \"fields\": [\"text^2\", \"filename^1.5\"],\n \"type\": \"best_fields\",\n \"fuzziness\": \"AUTO\",\n \"boost\": 0.3,\n }\n },\n ],\n \"minimum_should_match\": 1,\n }\n },\n \"aggs\": {\n \"data_sources\": {\"terms\": {\"field\": \"filename\", \"size\": 20}},\n \"document_types\": {\"terms\": {\"field\": \"mimetype\", \"size\": 10}},\n \"owners\": {\"terms\": {\"field\": \"owner\", \"size\": 10}},\n },\n \"_source\": [\n \"filename\",\n \"mimetype\",\n \"page\",\n \"text\",\n \"source_url\",\n \"owner\",\n \"allowed_users\",\n \"allowed_groups\",\n ],\n \"size\": limit,\n }\n if clauses:\n body[\"query\"][\"bool\"][\"filter\"] = clauses\n\n if isinstance(score_threshold, (int, float)) and score_threshold > 0:\n # top-level min_score (matches your tool)\n body[\"min_score\"] = score_threshold\n\n resp = client.search(index=self.index_name, body=body)\n hits = resp.get(\"hits\", {}).get(\"hits\", [])\n return [\n {\n \"page_content\": hit[\"_source\"].get(\"text\", \"\"),\n \"metadata\": {k: v for k, v in hit[\"_source\"].items() if k != \"text\"},\n \"score\": hit.get(\"_score\"),\n }\n for hit in hits\n ]\n\n def search_documents(self) -> list[Data]:\n try:\n raw = self.search(self.search_query or \"\")\n return [\n Data(\n text=hit[\"page_content\"],\n **hit[\"metadata\"]\n )\n for hit in raw\n ]\n self.log(self.ingest_data)\n except Exception as e:\n self.log(f\"search_documents error: {e}\")\n raise\n\n # -------- dynamic UI handling (auth switch) --------\n async def update_build_config(\n self, build_config: dict, field_value: str, field_name: str | None = None\n ) -> dict:\n try:\n if field_name == \"auth_mode\":\n mode = (field_value or \"basic\").strip().lower()\n is_basic = mode == \"basic\"\n is_jwt = mode == \"jwt\"\n\n build_config[\"username\"][\"show\"] = is_basic\n build_config[\"password\"][\"show\"] = is_basic\n\n build_config[\"jwt_token\"][\"show\"] = is_jwt\n build_config[\"jwt_header\"][\"show\"] = is_jwt\n build_config[\"bearer_prefix\"][\"show\"] = is_jwt\n\n build_config[\"username\"][\"required\"] = is_basic\n build_config[\"password\"][\"required\"] = is_basic\n\n build_config[\"jwt_token\"][\"required\"] = is_jwt\n build_config[\"jwt_header\"][\"required\"] = is_jwt\n build_config[\"bearer_prefix\"][\"required\"] = False\n\n if is_basic:\n build_config[\"jwt_token\"][\"value\"] = \"\"\n\n return build_config\n\n return build_config\n\n except Exception as e:\n self.log(f\"update_build_config error: {e}\")\n return build_config\n" + "value": "from __future__ import annotations\n\nimport json\nimport uuid\nfrom typing import Any, Dict, List, Optional\n\nfrom langflow.base.vectorstores.model import (\n LCVectorStoreComponent,\n check_cached_vector_store,\n)\nfrom langflow.base.vectorstores.vector_store_connection_decorator import (\n vector_store_connection,\n)\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n HandleInput,\n IntInput,\n MultilineInput,\n SecretStrInput,\n StrInput,\n TableInput\n)\nfrom langflow.logging import logger\nfrom langflow.schema.data import Data\nfrom opensearchpy import OpenSearch, helpers\n\n\n@vector_store_connection\nclass OpenSearchHybridComponent(LCVectorStoreComponent):\n \"\"\"OpenSearch hybrid search: KNN (k=10, boost=0.7) + multi_match (boost=0.3) with optional filters & min_score.\"\"\"\n\n display_name: str = \"OpenSearch (Hybrid)\"\n name: str = \"OpenSearchHybrid\"\n icon: str = \"OpenSearch\"\n description: str = \"Hybrid search: KNN + keyword, with optional filters, min_score, and aggregations.\"\n\n # Keys we consider baseline\n default_keys: list[str] = [\n \"opensearch_url\",\n \"index_name\",\n *[\n i.name for i in LCVectorStoreComponent.inputs\n ], # search_query, add_documents, etc.\n \"embedding\",\n \"vector_field\",\n \"number_of_results\",\n \"auth_mode\",\n \"username\",\n \"password\",\n \"jwt_token\",\n \"jwt_header\",\n \"bearer_prefix\",\n \"use_ssl\",\n \"verify_certs\",\n \"filter_expression\",\n \"engine\",\n \"space_type\",\n \"ef_construction\",\n \"m\",\n ]\n\n inputs = [\n TableInput(\n name=\"docs_metadata\",\n display_name=\"Ingestion Metadata\",\n info=\"Key value pairs to be inserted into each ingested document.\",\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Key\",\n \"type\": \"str\",\n \"description\": \"Key name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"type\": \"str\",\n \"description\": \"Value of the metadata\",\n },\n ],\n value=[],\n advanced=True,\n \n ),\n StrInput(\n name=\"opensearch_url\",\n display_name=\"OpenSearch URL\",\n value=\"http://localhost:9200\",\n info=\"URL for your OpenSearch cluster.\",\n ),\n StrInput(\n name=\"index_name\",\n display_name=\"Index Name\",\n value=\"langflow\",\n info=\"The index to search.\",\n ),\n DropdownInput(\n name=\"engine\",\n display_name=\"Engine\",\n options=[\"jvector\", \"nmslib\", \"faiss\", \"lucene\"],\n value=\"jvector\",\n info=\"Vector search engine to use.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"space_type\",\n display_name=\"Space Type\",\n options=[\"l2\", \"l1\", \"cosinesimil\", \"linf\", \"innerproduct\"],\n value=\"l2\",\n info=\"Distance metric for vector similarity.\",\n advanced=True,\n ),\n IntInput(\n name=\"ef_construction\",\n display_name=\"EF Construction\",\n value=512,\n info=\"Size of the dynamic list used during k-NN graph creation.\",\n advanced=True,\n ),\n IntInput(\n name=\"m\",\n display_name=\"M Parameter\",\n value=16,\n info=\"Number of bidirectional links created for each new element.\",\n advanced=True,\n ),\n *LCVectorStoreComponent.inputs, # includes search_query, add_documents, etc.\n HandleInput(\n name=\"embedding\", display_name=\"Embedding\", input_types=[\"Embeddings\"]\n ),\n StrInput(\n name=\"vector_field\",\n display_name=\"Vector Field\",\n value=\"chunk_embedding\",\n advanced=True,\n info=\"Vector field used for KNN.\",\n ),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Default Size (limit)\",\n value=10,\n advanced=True,\n info=\"Default number of hits when no limit provided in filter_expression.\",\n ),\n MultilineInput(\n name=\"filter_expression\",\n display_name=\"Filter Expression (JSON)\",\n value=\"\",\n info=(\n \"Optional JSON to control filters/limit/score threshold.\\n\"\n \"Accepted shapes:\\n\"\n '1) {\"filter\": [ {\"term\": {\"filename\":\"foo\"}}, {\"terms\":{\"owner\":[\"u1\",\"u2\"]}} ], \"limit\": 10, \"score_threshold\": 1.6 }\\n'\n '2) Context-style maps: {\"data_sources\":[\"fileA\"], \"document_types\":[\"application/pdf\"], \"owners\":[\"123\"]}\\n'\n \"Placeholders with __IMPOSSIBLE_VALUE__ are ignored.\"\n ),\n ),\n # ----- Auth controls (dynamic) -----\n DropdownInput(\n name=\"auth_mode\",\n display_name=\"Auth Mode\",\n value=\"basic\",\n options=[\"basic\", \"jwt\"],\n info=\"Choose Basic (username/password) or JWT (Bearer token).\",\n real_time_refresh=True,\n advanced=False,\n ),\n StrInput(\n name=\"username\",\n display_name=\"Username\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"password\",\n display_name=\"Password\",\n value=\"admin\",\n show=False,\n ),\n SecretStrInput(\n name=\"jwt_token\",\n display_name=\"JWT Token\",\n value=\"JWT\",\n load_from_db=True,\n show=True,\n info=\"Paste a valid JWT (sent as a header).\",\n ),\n StrInput(\n name=\"jwt_header\",\n display_name=\"JWT Header Name\",\n value=\"Authorization\",\n show=False,\n advanced=True,\n ),\n BoolInput(\n name=\"bearer_prefix\",\n display_name=\"Prefix 'Bearer '\",\n value=True,\n show=False,\n advanced=True,\n ),\n # ----- TLS -----\n BoolInput(name=\"use_ssl\", display_name=\"Use SSL\", value=True, advanced=True),\n BoolInput(\n name=\"verify_certs\",\n display_name=\"Verify Certificates\",\n value=False,\n advanced=True,\n ),\n ]\n\n # ---------- helper functions for index management ----------\n def _default_text_mapping(\n self,\n dim: int,\n engine: str = \"jvector\",\n space_type: str = \"l2\",\n ef_search: int = 512,\n ef_construction: int = 100,\n m: int = 16,\n vector_field: str = \"vector_field\",\n ) -> Dict[str, Any]:\n \"\"\"For Approximate k-NN Search, this is the default mapping to create index.\"\"\"\n return {\n \"settings\": {\"index\": {\"knn\": True, \"knn.algo_param.ef_search\": ef_search}},\n \"mappings\": {\n \"properties\": {\n vector_field: {\n \"type\": \"knn_vector\",\n \"dimension\": dim,\n \"method\": {\n \"name\": \"disk_ann\",\n \"space_type\": space_type,\n \"engine\": engine,\n \"parameters\": {\"ef_construction\": ef_construction, \"m\": m},\n },\n }\n }\n },\n }\n\n def _validate_aoss_with_engines(self, is_aoss: bool, engine: str) -> None:\n \"\"\"Validate AOSS with the engine.\"\"\"\n if is_aoss and engine != \"nmslib\" and engine != \"faiss\":\n raise ValueError(\n \"Amazon OpenSearch Service Serverless only \"\n \"supports `nmslib` or `faiss` engines\"\n )\n\n def _is_aoss_enabled(self, http_auth: Any) -> bool:\n \"\"\"Check if the service is http_auth is set as `aoss`.\"\"\"\n if (\n http_auth is not None\n and hasattr(http_auth, \"service\")\n and http_auth.service == \"aoss\"\n ):\n return True\n return False\n\n def _bulk_ingest_embeddings(\n self,\n client: OpenSearch,\n index_name: str,\n embeddings: List[List[float]],\n texts: List[str],\n metadatas: Optional[List[dict]] = None,\n ids: Optional[List[str]] = None,\n vector_field: str = \"vector_field\",\n text_field: str = \"text\",\n mapping: Optional[Dict] = None,\n max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,\n is_aoss: bool = False,\n ) -> List[str]:\n \"\"\"Bulk Ingest Embeddings into given index.\"\"\"\n if not mapping:\n mapping = dict()\n\n requests = []\n return_ids = []\n\n for i, text in enumerate(texts):\n metadata = metadatas[i] if metadatas else {}\n _id = ids[i] if ids else str(uuid.uuid4())\n request = {\n \"_op_type\": \"index\",\n \"_index\": index_name,\n vector_field: embeddings[i],\n text_field: text,\n **metadata,\n }\n if is_aoss:\n request[\"id\"] = _id\n else:\n request[\"_id\"] = _id\n requests.append(request)\n return_ids.append(_id)\n self.log(metadatas[i])\n helpers.bulk(client, requests, max_chunk_bytes=max_chunk_bytes)\n return return_ids\n\n # ---------- auth / client ----------\n def _build_auth_kwargs(self) -> Dict[str, Any]:\n mode = (self.auth_mode or \"basic\").strip().lower()\n if mode == \"jwt\":\n token = (self.jwt_token or \"\").strip()\n if not token:\n raise ValueError(\"Auth Mode is 'jwt' but no jwt_token was provided.\")\n header_name = (self.jwt_header or \"Authorization\").strip()\n header_value = f\"Bearer {token}\" if self.bearer_prefix else token\n return {\"headers\": {header_name: header_value}}\n user = (self.username or \"\").strip()\n pwd = (self.password or \"\").strip()\n if not user or not pwd:\n raise ValueError(\"Auth Mode is 'basic' but username/password are missing.\")\n return {\"http_auth\": (user, pwd)}\n\n def build_client(self) -> OpenSearch:\n auth_kwargs = self._build_auth_kwargs()\n return OpenSearch(\n hosts=[self.opensearch_url],\n use_ssl=self.use_ssl,\n verify_certs=self.verify_certs,\n ssl_assert_hostname=False,\n ssl_show_warn=False,\n **auth_kwargs,\n )\n\n @check_cached_vector_store\n def build_vector_store(self) -> OpenSearch:\n # Return raw OpenSearch client as our “vector store.”\n self.log(self.ingest_data)\n client = self.build_client()\n self._add_documents_to_vector_store(client=client)\n return client\n\n # ---------- ingest ----------\n def _add_documents_to_vector_store(self, client: OpenSearch) -> None:\n # Convert DataFrame to Data if needed using parent's method\n self.ingest_data = self._prepare_ingest_data()\n\n docs = self.ingest_data or []\n if not docs:\n self.log(\"No documents to ingest.\")\n return\n\n # Extract texts and metadata from documents\n texts = []\n metadatas = []\n # Track document sizes by filename\n document_sizes = {}\n \n for doc_obj in docs:\n data_copy = json.loads(doc_obj.model_dump_json())\n text = data_copy.pop(doc_obj.text_key, doc_obj.default_value)\n texts.append(text)\n\n # Calculate and track document size\n filename = data_copy.get(\"filename\", \"unknown\")\n text_size = len(text.encode(\"utf-8\")) if text else 0\n if filename in document_sizes:\n document_sizes[filename] += text_size\n else:\n document_sizes[filename] = text_size\n\n # Add calculated size to metadata (per chunk, but represents total document size)\n metadatas.append(data_copy)\n self.log(metadatas)\n if not self.embedding:\n raise ValueError(\"Embedding handle is required to embed documents.\")\n\n # Generate embeddings\n vectors = self.embedding.embed_documents(texts)\n\n if not vectors:\n self.log(\"No vectors generated from documents.\")\n return\n\n # Get vector dimension for mapping\n dim = len(vectors[0]) if vectors else 768 # default fallback\n\n # Check for AOSS\n auth_kwargs = self._build_auth_kwargs()\n is_aoss = self._is_aoss_enabled(auth_kwargs.get(\"http_auth\"))\n\n # Validate engine with AOSS\n engine = getattr(self, \"engine\", \"jvector\")\n self._validate_aoss_with_engines(is_aoss, engine)\n\n # Create mapping with proper KNN settings\n space_type = getattr(self, \"space_type\", \"l2\")\n ef_construction = getattr(self, \"ef_construction\", 512)\n m = getattr(self, \"m\", 16)\n\n mapping = self._default_text_mapping(\n dim=dim,\n engine=engine,\n space_type=space_type,\n ef_construction=ef_construction,\n m=m,\n vector_field=self.vector_field,\n )\n\n self.log(\n f\"Indexing {len(texts)} documents into '{self.index_name}' with proper KNN mapping...\"\n )\n\n # Use the LangChain-style bulk ingestion\n return_ids = self._bulk_ingest_embeddings(\n client=client,\n index_name=self.index_name,\n embeddings=vectors,\n texts=texts,\n metadatas=metadatas,\n vector_field=self.vector_field,\n text_field=\"text\",\n mapping=mapping,\n is_aoss=is_aoss,\n )\n self.log(metadatas)\n\n self.log(f\"Successfully indexed {len(return_ids)} documents.\")\n\n # ---------- helpers for filters ----------\n def _is_placeholder_term(self, term_obj: dict) -> bool:\n # term_obj like {\"filename\": \"__IMPOSSIBLE_VALUE__\"}\n return any(v == \"__IMPOSSIBLE_VALUE__\" for v in term_obj.values())\n\n def _coerce_filter_clauses(self, filter_obj: dict | None) -> List[dict]:\n \"\"\"\n Accepts either:\n A) {\"filter\":[ ...term/terms objects... ], \"limit\":..., \"score_threshold\":...}\n B) Context-style: {\"data_sources\":[...], \"document_types\":[...], \"owners\":[...]}\n Returns a list of OS filter clauses (term/terms), skipping placeholders and empty terms.\n \"\"\"\n\n if not filter_obj:\n return []\n\n # If it’s a string, try to parse it once\n if isinstance(filter_obj, str):\n try:\n filter_obj = json.loads(filter_obj)\n except Exception:\n # Not valid JSON → treat as no filters\n return []\n\n # Case A: already an explicit list/dict under \"filter\"\n if \"filter\" in filter_obj:\n raw = filter_obj[\"filter\"]\n if isinstance(raw, dict):\n raw = [raw]\n clauses: List[dict] = []\n for f in raw or []:\n if (\n \"term\" in f\n and isinstance(f[\"term\"], dict)\n and not self._is_placeholder_term(f[\"term\"])\n ):\n clauses.append(f)\n elif \"terms\" in f and isinstance(f[\"terms\"], dict):\n field, vals = next(iter(f[\"terms\"].items()))\n if isinstance(vals, list) and len(vals) > 0:\n clauses.append(f)\n return clauses\n\n # Case B: convert context-style maps into clauses\n field_mapping = {\n \"data_sources\": \"filename\",\n \"document_types\": \"mimetype\",\n \"owners\": \"owner\",\n }\n clauses: List[dict] = []\n for k, values in filter_obj.items():\n if not isinstance(values, list):\n continue\n field = field_mapping.get(k, k)\n if len(values) == 0:\n # Match-nothing placeholder (kept to mirror your tool semantics)\n clauses.append({\"term\": {field: \"__IMPOSSIBLE_VALUE__\"}})\n elif len(values) == 1:\n if values[0] != \"__IMPOSSIBLE_VALUE__\":\n clauses.append({\"term\": {field: values[0]}})\n else:\n clauses.append({\"terms\": {field: values}})\n return clauses\n\n # ---------- search (single hybrid path matching your tool) ----------\n def search(self, query: str | None = None) -> list[dict[str, Any]]:\n logger.info(self.ingest_data)\n client = self.build_client()\n q = (query or \"\").strip()\n\n # Parse optional filter expression (can be either A or B shape; see _coerce_filter_clauses)\n filter_obj = None\n if getattr(self, \"filter_expression\", \"\") and self.filter_expression.strip():\n try:\n filter_obj = json.loads(self.filter_expression)\n except json.JSONDecodeError as e:\n raise ValueError(f\"Invalid filter_expression JSON: {e}\") from e\n\n if not self.embedding:\n raise ValueError(\n \"Embedding is required to run hybrid search (KNN + keyword).\"\n )\n\n # Embed the query\n vec = self.embedding.embed_query(q)\n\n # Build filter clauses (accept both shapes)\n clauses = self._coerce_filter_clauses(filter_obj)\n\n # Respect the tool's limit/threshold defaults\n limit = (filter_obj or {}).get(\"limit\", self.number_of_results)\n score_threshold = (filter_obj or {}).get(\"score_threshold\", 0)\n\n # Build the same hybrid body as your SearchService\n body = {\n \"query\": {\n \"bool\": {\n \"should\": [\n {\n \"knn\": {\n self.vector_field: {\n \"vector\": vec,\n \"k\": 10, # fixed to match the tool\n \"boost\": 0.7,\n }\n }\n },\n {\n \"multi_match\": {\n \"query\": q,\n \"fields\": [\"text^2\", \"filename^1.5\"],\n \"type\": \"best_fields\",\n \"fuzziness\": \"AUTO\",\n \"boost\": 0.3,\n }\n },\n ],\n \"minimum_should_match\": 1,\n }\n },\n \"aggs\": {\n \"data_sources\": {\"terms\": {\"field\": \"filename\", \"size\": 20}},\n \"document_types\": {\"terms\": {\"field\": \"mimetype\", \"size\": 10}},\n \"owners\": {\"terms\": {\"field\": \"owner\", \"size\": 10}},\n },\n \"_source\": [\n \"filename\",\n \"mimetype\",\n \"page\",\n \"text\",\n \"source_url\",\n \"owner\",\n \"allowed_users\",\n \"allowed_groups\",\n ],\n \"size\": limit,\n }\n if clauses:\n body[\"query\"][\"bool\"][\"filter\"] = clauses\n\n if isinstance(score_threshold, (int, float)) and score_threshold > 0:\n # top-level min_score (matches your tool)\n body[\"min_score\"] = score_threshold\n\n resp = client.search(index=self.index_name, body=body)\n hits = resp.get(\"hits\", {}).get(\"hits\", [])\n return [\n {\n \"page_content\": hit[\"_source\"].get(\"text\", \"\"),\n \"metadata\": {k: v for k, v in hit[\"_source\"].items() if k != \"text\"},\n \"score\": hit.get(\"_score\"),\n }\n for hit in hits\n ]\n\n def search_documents(self) -> list[Data]:\n try:\n raw = self.search(self.search_query or \"\")\n return [\n Data(\n text=hit[\"page_content\"],\n **hit[\"metadata\"]\n )\n for hit in raw\n ]\n self.log(self.ingest_data)\n except Exception as e:\n self.log(f\"search_documents error: {e}\")\n raise\n\n # -------- dynamic UI handling (auth switch) --------\n async def update_build_config(\n self, build_config: dict, field_value: str, field_name: str | None = None\n ) -> dict:\n try:\n if field_name == \"auth_mode\":\n mode = (field_value or \"basic\").strip().lower()\n is_basic = mode == \"basic\"\n is_jwt = mode == \"jwt\"\n\n build_config[\"username\"][\"show\"] = is_basic\n build_config[\"password\"][\"show\"] = is_basic\n\n build_config[\"jwt_token\"][\"show\"] = is_jwt\n build_config[\"jwt_header\"][\"show\"] = is_jwt\n build_config[\"bearer_prefix\"][\"show\"] = is_jwt\n\n build_config[\"username\"][\"required\"] = is_basic\n build_config[\"password\"][\"required\"] = is_basic\n\n build_config[\"jwt_token\"][\"required\"] = is_jwt\n build_config[\"jwt_header\"][\"required\"] = is_jwt\n build_config[\"bearer_prefix\"][\"required\"] = False\n\n if is_basic:\n build_config[\"jwt_token\"][\"value\"] = \"\"\n\n return build_config\n\n return build_config\n\n except Exception as e:\n self.log(f\"update_build_config error: {e}\")\n return build_config\n" + }, + "docs_metadata": { + "_input_type": "TableInput", + "advanced": true, + "display_name": "Ingestion Metadata", + "dynamic": false, + "info": "Key value pairs to be inserted into each ingested document.", + "is_list": true, + "list_add_label": "Add More", + "name": "docs_metadata", + "placeholder": "", + "required": false, + "show": true, + "table_icon": "Table", + "table_schema": { + "columns": [ + { + "default": "None", + "description": "Key name", + "disable_edit": false, + "display_name": "Key", + "edit_mode": "popover", + "filterable": true, + "formatter": "text", + "hidden": false, + "name": "key", + "sortable": true, + "type": "str" + }, + { + "default": "None", + "description": "Value of the metadata", + "disable_edit": false, + "display_name": "Value", + "edit_mode": "popover", + "filterable": true, + "formatter": "text", + "hidden": false, + "name": "value", + "sortable": true, + "type": "str" + } + ] + }, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "trigger_icon": "Table", + "trigger_text": "Open table", + "type": "table", + "value": [] }, "ef_construction": { "_input_type": "IntInput", @@ -1956,12 +2007,12 @@ "x": 2218.9287723423276, "y": 1332.2598463956504 }, - "selected": true, + "selected": false, "type": "genericNode" } ], "viewport": { - "x": -960.2209660078397, + "x": -960.2209660078395, "y": -896.7176624994117, "zoom": 0.7898282762479812 }