# openrag/flows/components/opensearch.py
from __future__ import annotations
import json
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
from opensearchpy import OpenSearch, helpers
from opensearchpy.exceptions import RequestError
from lfx.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from lfx.base.vectorstores.vector_store_connection_decorator import vector_store_connection
from lfx.io import BoolInput, DropdownInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput, TableInput
from lfx.log import logger
from lfx.schema.data import Data
@vector_store_connection
class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
"""OpenSearch Vector Store Component with Hybrid Search Capabilities.
This component provides vector storage and retrieval using OpenSearch, combining semantic
similarity search (KNN) with keyword-based search for optimal results. It supports document
ingestion, vector embeddings, and advanced filtering with authentication options.
Features:
- Vector storage with configurable engines (jvector, nmslib, faiss, lucene)
- Hybrid search combining KNN vector similarity and keyword matching
- Flexible authentication (Basic auth, JWT tokens)
- Advanced filtering and aggregations
- Metadata injection during document ingestion
"""
display_name: str = "OpenSearch"
icon: str = "OpenSearch"
description: str = (
"Store and search documents using OpenSearch with hybrid semantic and keyword search capabilities."
)
# Keys we consider baseline
default_keys: list[str] = [
"opensearch_url",
"index_name",
*[i.name for i in LCVectorStoreComponent.inputs], # search_query, add_documents, etc.
"embedding",
"vector_field",
"number_of_results",
"auth_mode",
"username",
"password",
"jwt_token",
"jwt_header",
"bearer_prefix",
"use_ssl",
"verify_certs",
"filter_expression",
"engine",
"space_type",
"ef_construction",
"m",
"docs_metadata",
]
inputs = [
TableInput(
name="docs_metadata",
display_name="Document Metadata",
info=(
"Additional metadata key-value pairs to be added to all ingested documents. "
"Useful for tagging documents with source information, categories, or other custom attributes."
),
table_schema=[
{
"name": "key",
"display_name": "Key",
"type": "str",
"description": "Key name",
},
{
"name": "value",
"display_name": "Value",
"type": "str",
"description": "Value of the metadata",
},
],
value=[],
# advanced=True,
input_types=["Data"]
),
StrInput(
name="opensearch_url",
display_name="OpenSearch URL",
value="http://localhost:9200",
info=(
"The connection URL for your OpenSearch cluster "
"(e.g., http://localhost:9200 for local development or your cloud endpoint)."
),
),
StrInput(
name="index_name",
display_name="Index Name",
value="langflow",
info=(
"The OpenSearch index name where documents will be stored and searched. "
"Will be created automatically if it doesn't exist."
),
),
DropdownInput(
name="engine",
display_name="Vector Engine",
options=["jvector", "nmslib", "faiss", "lucene"],
value="jvector",
info=(
"Vector search engine for similarity calculations. 'jvector' is recommended for most use cases. "
"Note: Amazon OpenSearch Serverless only supports 'nmslib' or 'faiss'."
),
advanced=True,
),
DropdownInput(
name="space_type",
display_name="Distance Metric",
options=["l2", "l1", "cosinesimil", "linf", "innerproduct"],
value="l2",
info=(
"Distance metric for calculating vector similarity. 'l2' (Euclidean) is most common, "
"'cosinesimil' for cosine similarity, 'innerproduct' for dot product."
),
advanced=True,
),
IntInput(
name="ef_construction",
display_name="EF Construction",
value=512,
info=(
"Size of the dynamic candidate list during index construction. "
"Higher values improve recall but increase indexing time and memory usage."
),
advanced=True,
),
IntInput(
name="m",
display_name="M Parameter",
value=16,
info=(
"Number of bidirectional connections for each vector in the HNSW graph. "
"Higher values improve search quality but increase memory usage and indexing time."
),
advanced=True,
),
*LCVectorStoreComponent.inputs, # includes search_query, add_documents, etc.
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
StrInput(
name="vector_field",
display_name="Vector Field Name",
value="chunk_embedding",
advanced=True,
info="Name of the field in OpenSearch documents that stores the vector embeddings for similarity search.",
),
IntInput(
name="number_of_results",
display_name="Default Result Limit",
value=10,
advanced=True,
info=(
"Default maximum number of search results to return when no limit is "
"specified in the filter expression."
),
),
MultilineInput(
name="filter_expression",
display_name="Search Filters (JSON)",
value="",
info=(
"Optional JSON configuration for search filtering, result limits, and score thresholds.\n\n"
"Format 1 - Explicit filters:\n"
'{"filter": [{"term": {"filename":"doc.pdf"}}, '
'{"terms":{"owner":["user1","user2"]}}], "limit": 10, "score_threshold": 1.6}\n\n'
"Format 2 - Context-style mapping:\n"
'{"data_sources":["file.pdf"], "document_types":["application/pdf"], "owners":["user123"]}\n\n'
"Use __IMPOSSIBLE_VALUE__ as placeholder to ignore specific filters."
),
),
# ----- Auth controls (dynamic) -----
DropdownInput(
name="auth_mode",
display_name="Authentication Mode",
value="basic",
options=["basic", "jwt"],
info=(
"Authentication method: 'basic' for username/password authentication, "
"or 'jwt' for JSON Web Token (Bearer) authentication."
),
real_time_refresh=True,
advanced=False,
),
StrInput(
name="username",
display_name="Username",
value="admin",
show=False,
),
SecretStrInput(
name="password",
display_name="OpenSearch Password",
value="admin",
show=False,
),
SecretStrInput(
name="jwt_token",
display_name="JWT Token",
value="JWT",
load_from_db=False,
show=True,
info=(
"Valid JSON Web Token for authentication. "
"Will be sent in the Authorization header (with optional 'Bearer ' prefix)."
),
),
StrInput(
name="jwt_header",
display_name="JWT Header Name",
value="Authorization",
show=False,
advanced=True,
),
BoolInput(
name="bearer_prefix",
display_name="Prefix 'Bearer '",
value=True,
show=False,
advanced=True,
),
# ----- TLS -----
BoolInput(
name="use_ssl",
display_name="Use SSL/TLS",
value=True,
advanced=True,
info="Enable SSL/TLS encryption for secure connections to OpenSearch.",
),
BoolInput(
name="verify_certs",
display_name="Verify SSL Certificates",
value=False,
advanced=True,
info=(
"Verify SSL certificates when connecting. "
"Disable for self-signed certificates in development environments."
),
),
]
# ---------- helper functions for index management ----------
def _default_text_mapping(
self,
dim: int,
engine: str = "jvector",
space_type: str = "l2",
ef_search: int = 512,
ef_construction: int = 100,
m: int = 16,
vector_field: str = "vector_field",
) -> dict[str, Any]:
"""Create the default OpenSearch index mapping for vector search.
This method generates the index configuration with k-NN settings optimized
for approximate nearest neighbor search using the specified vector engine.
Args:
dim: Dimensionality of the vector embeddings
engine: Vector search engine (jvector, nmslib, faiss, lucene)
space_type: Distance metric for similarity calculation
ef_search: Size of dynamic list used during search
ef_construction: Size of dynamic list used during index construction
m: Number of bidirectional links for each vector
vector_field: Name of the field storing vector embeddings
Returns:
Dictionary containing OpenSearch index mapping configuration
"""
return {
"settings": {"index": {"knn": True, "knn.algo_param.ef_search": ef_search}},
"mappings": {
"properties": {
vector_field: {
"type": "knn_vector",
"dimension": dim,
"method": {
"name": "disk_ann",
"space_type": space_type,
"engine": engine,
"parameters": {"ef_construction": ef_construction, "m": m},
},
},
"embedding_dimensions": {
"type": "integer"
}
}
},
}
def _ensure_vector_field_mapping(
self,
client: OpenSearch,
index_name: str,
vector_field: str,
dim: int,
engine: str,
space_type: str,
ef_construction: int,
m: int,
) -> None:
"""Ensure the target vector field exists with the correct mapping."""
try:
mapping = {
"properties": {
vector_field: {
"type": "knn_vector",
"dimension": dim,
"method": {
"name": "disk_ann",
"space_type": space_type,
"engine": engine,
"parameters": {"ef_construction": ef_construction, "m": m},
},
},
"embedding_dimensions": {
"type": "integer"
}
}
}
client.indices.put_mapping(index=index_name, body=mapping)
logger.info(
"Added/updated vector field mapping for %s in index %s",
vector_field,
index_name,
)
except Exception as exc:
logger.warning(
"Could not ensure vector field mapping for %s: %s",
vector_field,
exc,
)
def _validate_aoss_with_engines(self, *, is_aoss: bool, engine: str) -> None:
"""Validate engine compatibility with Amazon OpenSearch Serverless (AOSS).
Amazon OpenSearch Serverless has restrictions on which vector engines
can be used. This method ensures the selected engine is compatible.
Args:
is_aoss: Whether the connection is to Amazon OpenSearch Serverless
engine: The selected vector search engine
Raises:
ValueError: If AOSS is used with an incompatible engine
"""
if is_aoss and engine not in {"nmslib", "faiss"}:
msg = "Amazon OpenSearch Service Serverless only supports `nmslib` or `faiss` engines"
raise ValueError(msg)
def _is_aoss_enabled(self, http_auth: Any) -> bool:
"""Determine if Amazon OpenSearch Serverless (AOSS) is being used.
Args:
http_auth: The HTTP authentication object
Returns:
True if AOSS is enabled, False otherwise
"""
return http_auth is not None and hasattr(http_auth, "service") and http_auth.service == "aoss"
def _bulk_ingest_embeddings(
self,
client: OpenSearch,
index_name: str,
embeddings: list[list[float]],
texts: list[str],
metadatas: list[dict] | None = None,
ids: list[str] | None = None,
vector_field: str = "vector_field",
text_field: str = "text",
mapping: dict | None = None,
max_chunk_bytes: int | None = 1 * 1024 * 1024,
*,
is_aoss: bool = False,
) -> list[str]:
"""Efficiently ingest multiple documents with embeddings into OpenSearch.
This method uses bulk operations to insert documents with their vector
embeddings and metadata into the specified OpenSearch index.
Args:
client: OpenSearch client instance
index_name: Target index for document storage
embeddings: List of vector embeddings for each document
texts: List of document texts
metadatas: Optional metadata dictionaries for each document
ids: Optional document IDs (UUIDs generated if not provided)
vector_field: Field name for storing vector embeddings
text_field: Field name for storing document text
mapping: Optional index mapping configuration
max_chunk_bytes: Maximum size per bulk request chunk
is_aoss: Whether using Amazon OpenSearch Serverless
Returns:
List of document IDs that were successfully ingested
"""
if not mapping:
mapping = {}
requests = []
return_ids = []
vector_dimensions = len(embeddings[0]) if embeddings else None
for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
if vector_dimensions is not None and "embedding_dimensions" not in metadata:
metadata = {**metadata, "embedding_dimensions": vector_dimensions}
_id = ids[i] if ids else str(uuid.uuid4())
request = {
"_op_type": "index",
"_index": index_name,
vector_field: embeddings[i],
text_field: text,
**metadata,
}
if is_aoss:
request["id"] = _id
else:
request["_id"] = _id
requests.append(request)
return_ids.append(_id)
if metadatas:
self.log(f"Sample metadata: {metadatas[0] if metadatas else {}}")
helpers.bulk(client, requests, max_chunk_bytes=max_chunk_bytes)
return return_ids
# ---------- auth / client ----------
def _build_auth_kwargs(self) -> dict[str, Any]:
"""Build authentication configuration for OpenSearch client.
Constructs the appropriate authentication parameters based on the
selected auth mode (basic username/password or JWT token).
Returns:
Dictionary containing authentication configuration
Raises:
ValueError: If required authentication parameters are missing
"""
mode = (self.auth_mode or "basic").strip().lower()
if mode == "jwt":
token = (self.jwt_token or "").strip()
if not token:
msg = "Auth Mode is 'jwt' but no jwt_token was provided."
raise ValueError(msg)
header_name = (self.jwt_header or "Authorization").strip()
header_value = f"Bearer {token}" if self.bearer_prefix else token
return {"headers": {header_name: header_value}}
user = (self.username or "").strip()
pwd = (self.password or "").strip()
if not user or not pwd:
msg = "Auth Mode is 'basic' but username/password are missing."
raise ValueError(msg)
return {"http_auth": (user, pwd)}
def build_client(self) -> OpenSearch:
"""Create and configure an OpenSearch client instance.
Returns:
Configured OpenSearch client ready for operations
"""
auth_kwargs = self._build_auth_kwargs()
return OpenSearch(
hosts=[self.opensearch_url],
use_ssl=self.use_ssl,
verify_certs=self.verify_certs,
ssl_assert_hostname=False,
ssl_show_warn=False,
**auth_kwargs,
)
@check_cached_vector_store
def build_vector_store(self) -> OpenSearch:
# Return raw OpenSearch client as our “vector store.”
self.log(self.ingest_data)
client = self.build_client()
self._add_documents_to_vector_store(client=client)
return client
# ---------- ingest ----------
def _add_documents_to_vector_store(self, client: OpenSearch) -> None:
"""Process and ingest documents into the OpenSearch vector store.
This method handles the complete document ingestion pipeline:
- Prepares document data and metadata
- Generates vector embeddings
- Creates appropriate index mappings
- Bulk inserts documents with vectors
Args:
client: OpenSearch client for performing operations
"""
# Convert DataFrame to Data if needed using parent's method
self.ingest_data = self._prepare_ingest_data()
docs = self.ingest_data or []
if not docs:
self.log("No documents to ingest.")
return
# Extract texts and metadata from documents
texts = []
metadatas = []
# Process docs_metadata table input into a dict
additional_metadata = {}
if hasattr(self, "docs_metadata") and self.docs_metadata:
logger.info(f"[LF] Docs metadata {self.docs_metadata}")
if isinstance(self.docs_metadata[-1], Data):
logger.info(f"[LF] Docs metadata is a Data object {self.docs_metadata}")
self.docs_metadata = self.docs_metadata[-1].data
logger.info(f"[LF] Docs metadata is a Data object {self.docs_metadata}")
additional_metadata.update(self.docs_metadata)
else:
for item in self.docs_metadata:
if isinstance(item, dict) and "key" in item and "value" in item:
additional_metadata[item["key"]] = item["value"]
logger.info(f"[LF] Additional metadata {additional_metadata}")
for doc_obj in docs:
data_copy = json.loads(doc_obj.model_dump_json())
text = data_copy.pop(doc_obj.text_key, doc_obj.default_value)
texts.append(text)
# Merge additional metadata from table input
data_copy.update(additional_metadata)
metadatas.append(data_copy)
self.log(metadatas)
if not self.embedding:
msg = "Embedding handle is required to embed documents."
raise ValueError(msg)
# Generate embeddings using a thread pool for concurrency
def embed_chunk(chunk_text: str) -> list[float]:
return self.embedding.embed_documents([chunk_text])[0]
try:
max_workers = min(max(len(texts), 1), 8)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(embed_chunk, chunk): idx for idx, chunk in enumerate(texts)}
vectors = [None] * len(texts)
for future in as_completed(futures):
idx = futures[future]
vectors[idx] = future.result()
except Exception as exc:
logger.warning(
"Threaded embedding generation failed, falling back to synchronous mode: %s",
exc,
)
vectors = self.embedding.embed_documents(texts)
if not vectors:
self.log("No vectors generated from documents.")
return
# Get vector dimension for mapping
dim = len(vectors[0]) if vectors else 768 # default fallback
# Check for AOSS
auth_kwargs = self._build_auth_kwargs()
is_aoss = self._is_aoss_enabled(auth_kwargs.get("http_auth"))
# Validate engine with AOSS
engine = getattr(self, "engine", "jvector")
self._validate_aoss_with_engines(is_aoss=is_aoss, engine=engine)
# Create mapping with proper KNN settings
space_type = getattr(self, "space_type", "l2")
ef_construction = getattr(self, "ef_construction", 512)
m = getattr(self, "m", 16)
mapping = self._default_text_mapping(
dim=dim,
engine=engine,
space_type=space_type,
ef_construction=ef_construction,
m=m,
vector_field=self.vector_field,
)
try:
if not client.indices.exists(index=self.index_name):
self.log(f"Creating index '{self.index_name}' with base mapping")
client.indices.create(index=self.index_name, body=mapping)
except RequestError as creation_error:
if getattr(creation_error, "error", "") != "resource_already_exists_exception":
logger.warning(
"Failed to create index %s: %s",
self.index_name,
creation_error,
)
self._ensure_vector_field_mapping(
client=client,
index_name=self.index_name,
vector_field=self.vector_field,
dim=dim,
engine=engine,
space_type=space_type,
ef_construction=ef_construction,
m=m,
)
self.log(f"Indexing {len(texts)} documents into '{self.index_name}' with proper KNN mapping...")
# Use the LangChain-style bulk ingestion
return_ids = self._bulk_ingest_embeddings(
client=client,
index_name=self.index_name,
embeddings=vectors,
texts=texts,
metadatas=metadatas,
vector_field=self.vector_field,
text_field="text",
mapping=mapping,
is_aoss=is_aoss,
)
self.log(metadatas)
self.log(f"Successfully indexed {len(return_ids)} documents.")
# ---------- helpers for filters ----------
def _is_placeholder_term(self, term_obj: dict) -> bool:
# term_obj like {"filename": "__IMPOSSIBLE_VALUE__"}
return any(v == "__IMPOSSIBLE_VALUE__" for v in term_obj.values())
def _coerce_filter_clauses(self, filter_obj: dict | None) -> list[dict]:
"""Convert filter expressions into OpenSearch-compatible filter clauses.
This method accepts two filter formats and converts them to standardized
OpenSearch query clauses:
Format A - Explicit filters:
{"filter": [{"term": {"field": "value"}}, {"terms": {"field": ["val1", "val2"]}}],
"limit": 10, "score_threshold": 1.5}
Format B - Context-style mapping:
{"data_sources": ["file1.pdf"], "document_types": ["pdf"], "owners": ["user1"]}
Args:
filter_obj: Filter configuration dictionary or None
Returns:
List of OpenSearch filter clauses (term/terms objects)
Placeholder values with "__IMPOSSIBLE_VALUE__" are ignored
"""
if not filter_obj:
return []
# If it is a string, try to parse it once
if isinstance(filter_obj, str):
try:
filter_obj = json.loads(filter_obj)
except json.JSONDecodeError:
# Not valid JSON - treat as no filters
return []
# Case A: already an explicit list/dict under "filter"
if "filter" in filter_obj:
raw = filter_obj["filter"]
if isinstance(raw, dict):
raw = [raw]
explicit_clauses: list[dict] = []
for f in raw or []:
if "term" in f and isinstance(f["term"], dict) and not self._is_placeholder_term(f["term"]):
explicit_clauses.append(f)
elif "terms" in f and isinstance(f["terms"], dict):
field, vals = next(iter(f["terms"].items()))
if isinstance(vals, list) and len(vals) > 0:
explicit_clauses.append(f)
return explicit_clauses
# Case B: convert context-style maps into clauses
field_mapping = {
"data_sources": "filename",
"document_types": "mimetype",
"owners": "owner",
}
context_clauses: list[dict] = []
for k, values in filter_obj.items():
if not isinstance(values, list):
continue
field = field_mapping.get(k, k)
if len(values) == 0:
# Match-nothing placeholder (kept to mirror your tool semantics)
context_clauses.append({"term": {field: "__IMPOSSIBLE_VALUE__"}})
elif len(values) == 1:
if values[0] != "__IMPOSSIBLE_VALUE__":
context_clauses.append({"term": {field: values[0]}})
else:
context_clauses.append({"terms": {field: values}})
return context_clauses
# ---------- search (single hybrid path matching your tool) ----------
def search(self, query: str | None = None) -> list[dict[str, Any]]:
"""Perform hybrid search combining vector similarity and keyword matching.
This method executes a sophisticated search that combines:
- K-nearest neighbor (KNN) vector similarity search (70% weight)
- Multi-field keyword search with fuzzy matching (30% weight)
- Optional filtering and score thresholds
- Aggregations for faceted search results
Args:
query: Search query string (used for both vector embedding and keyword search)
Returns:
List of search results with page_content, metadata, and relevance scores
Raises:
ValueError: If embedding component is not provided or filter JSON is invalid
"""
logger.info(self.ingest_data)
client = self.build_client()
q = (query or "").strip()
# Parse optional filter expression (can be either A or B shape; see _coerce_filter_clauses)
filter_obj = None
if getattr(self, "filter_expression", "") and self.filter_expression.strip():
try:
filter_obj = json.loads(self.filter_expression)
except json.JSONDecodeError as e:
msg = f"Invalid filter_expression JSON: {e}"
raise ValueError(msg) from e
if not self.embedding:
msg = "Embedding is required to run hybrid search (KNN + keyword)."
raise ValueError(msg)
# Embed the query
vec = self.embedding.embed_query(q)
# Build filter clauses (accept both shapes)
filter_clauses = self._coerce_filter_clauses(filter_obj)
# Respect the tool's limit/threshold defaults
limit = (filter_obj or {}).get("limit", self.number_of_results)
score_threshold = (filter_obj or {}).get("score_threshold", 0)
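        # For example (hypothetical values), a filter_expression of
        #   {"data_sources": ["report.pdf"], "limit": 5, "score_threshold": 1.2}
        # restricts hits to that filename, caps the result size at 5, and is applied
        # as a top-level min_score on the query body built below.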
# Build the same hybrid body as your SearchService
body = {
"query": {
"bool": {
"should": [
{
"knn": {
self.vector_field: {
"vector": vec,
"k": 10, # fixed to match the tool
"boost": 0.7,
}
}
},
{
"multi_match": {
"query": q,
"fields": ["text^2", "filename^1.5"],
"type": "best_fields",
"fuzziness": "AUTO",
"boost": 0.3,
}
},
],
"minimum_should_match": 1,
}
},
"aggs": {
"data_sources": {"terms": {"field": "filename", "size": 20}},
"document_types": {"terms": {"field": "mimetype", "size": 10}},
"owners": {"terms": {"field": "owner", "size": 10}},
},
"_source": [
"filename",
"mimetype",
"page",
"text",
"source_url",
"owner",
"embedding_dimensions",
"allowed_users",
"allowed_groups",
],
"size": limit,
}
if filter_clauses:
body["query"]["bool"]["filter"] = filter_clauses
if isinstance(score_threshold, (int, float)) and score_threshold > 0:
# top-level min_score (matches your tool)
body["min_score"] = score_threshold
resp = client.search(index=self.index_name, body=body)
hits = resp.get("hits", {}).get("hits", [])
return [
{
"page_content": hit["_source"].get("text", ""),
"metadata": {k: v for k, v in hit["_source"].items() if k != "text"},
"score": hit.get("_score"),
}
for hit in hits
]
def search_documents(self) -> list[Data]:
"""Search documents and return results as Data objects.
This is the main interface method that performs the search using the
configured search_query and returns results in Langflow's Data format.
Returns:
List of Data objects containing search results with text and metadata
Raises:
Exception: If search operation fails
"""
try:
raw = self.search(self.search_query or "")
return [Data(text=hit["page_content"], **hit["metadata"]) for hit in raw]
except Exception as e:
self.log(f"search_documents error: {e}")
raise
# -------- dynamic UI handling (auth switch) --------
async def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None) -> dict:
"""Dynamically update component configuration based on field changes.
This method handles real-time UI updates, particularly for authentication
mode changes that show/hide relevant input fields.
Args:
build_config: Current component configuration
field_value: New value for the changed field
field_name: Name of the field that changed
Returns:
Updated build configuration with appropriate field visibility
"""
try:
if field_name == "auth_mode":
mode = (field_value or "basic").strip().lower()
is_basic = mode == "basic"
is_jwt = mode == "jwt"
build_config["username"]["show"] = is_basic
build_config["password"]["show"] = is_basic
build_config["jwt_token"]["show"] = is_jwt
build_config["jwt_header"]["show"] = is_jwt
build_config["bearer_prefix"]["show"] = is_jwt
build_config["username"]["required"] = is_basic
build_config["password"]["required"] = is_basic
build_config["jwt_token"]["required"] = is_jwt
build_config["jwt_header"]["required"] = is_jwt
build_config["bearer_prefix"]["required"] = False
if is_basic:
build_config["jwt_token"]["value"] = ""
return build_config
except (KeyError, ValueError) as e:
self.log(f"update_build_config error: {e}")
return build_config