let the agent run full OpenSearch queries
parent 015a4d955e
commit ab0c734595
5 changed files with 134 additions and 90 deletions
@@ -12,7 +12,18 @@ from opensearchpy.exceptions import OpenSearchException, RequestError
 from lfx.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
 from lfx.base.vectorstores.vector_store_connection_decorator import vector_store_connection
-from lfx.io import BoolInput, DropdownInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput, TableInput
+from lfx.inputs.inputs import DictInput
+from lfx.io import (
+    BoolInput,
+    DropdownInput,
+    HandleInput,
+    IntInput,
+    MultilineInput,
+    Output,
+    SecretStrInput,
+    StrInput,
+    TableInput,
+)
 from lfx.log import logger
 from lfx.schema.data import Data

@@ -85,6 +96,32 @@ class OpenSearchVectorStoreComponentMultimodalMultiEmbedding(LCVectorStoreCompon
     icon: str = "OpenSearch"
-    description: str = (
-        "Store and search documents using OpenSearch with multi-model hybrid semantic and keyword search."
-    )
+    description: str = (
+        "Store and search documents using OpenSearch with multi-model hybrid semantic and keyword search. "
+        "To search, use the tools search_documents and raw_search. search_documents takes a query for vector search, for example:\n"
+        "  {search_query: \"components in openrag\"}"
+        "\n"
+        "You can also limit the hybrid query in search_documents by passing a filter_expression, "
+        "for example:\n"
+        "  {search_query: \"components in openrag\", filter_expression: {\"data_sources\":[\"my_doc.md\"],\"document_types\":[\"*\"],\"owners\":[\"*\"],\"connector_types\":[\"*\"]},\"limit\":10,\"scoreThreshold\":0}"
+        "\n"
+        "raw_search takes actual OpenSearch queries, for example:\n"
+        "  {"
+        "    \"size\": 100,"
+        "    \"query\": {"
+        "      \"term\": {\"filename\": \"AmericanAirlines-2017_4.md\"}"
+        "    },"
+        "    \"_source\": [\"filename\", \"text\", \"page\"]"
+        "  }"
+        "\n"
+        "or:\n"
+        "  {"
+        "    \"size\": 0,"
+        "    \"aggs\": {"
+        "      \"distinct_filenames\": {"
+        "        \"cardinality\": {\"field\": \"filename\"}"
+        "      }"
+        "    }"
+        "  }"
+    )

     # Keys we consider baseline

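The raw_search examples in the description above are plain OpenSearch query DSL, so they can be checked outside the component as well. A minimal sketch (the localhost endpoint, credentials, and the "documents" index name are assumptions for illustration) that runs the same distinct-filenames aggregation directly with opensearch-py, the client library this component uses internally:

from opensearchpy import OpenSearch

# Hypothetical connection details; the component builds its own client from its configured inputs.
client = OpenSearch(hosts=["https://localhost:9200"], http_auth=("admin", "admin"), verify_certs=False)

# Same aggregation as in the description: count distinct filenames without returning any hits.
body = {
    "size": 0,
    "aggs": {"distinct_filenames": {"cardinality": {"field": "filename"}}},
}
resp = client.search(index="documents", body=body)
print(resp["aggregations"]["distinct_filenames"]["value"])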
@@ -325,7 +362,55 @@ class OpenSearchVectorStoreComponentMultimodalMultiEmbedding(LCVectorStoreCompon
                 "Disable for self-signed certificates in development environments."
             ),
         ),
+        # DictInput(name="query", display_name="Query", input_types=["Data"], is_list=False, tool_mode=True),
     ]

+    outputs = [
+        Output(
+            display_name="Search Results",
+            name="search_results",
+            method="search_documents",
+        ),
+        Output(display_name="DataFrame", name="dataframe", method="as_dataframe"),
+        Output(display_name="Raw Search", name="raw_search", method="raw_search"),
+    ]
+
+    def raw_search(self, query: str | None = None) -> Data:
+        """Execute a raw OpenSearch query against the target index.
+
+        The query is read from self.search_query (the tool input); a JSON string is
+        parsed into a dict before being sent to OpenSearch. The optional query
+        argument is currently unused.
+
+        Returns:
+            Data: Search results as a Data object, with embedding vectors stripped
+            from each hit's _source.
+
+        Raises:
+            json.JSONDecodeError: If self.search_query is a string that is not valid JSON.
+        """
+        query = self.search_query
+        if isinstance(query, str):
+            query = json.loads(query)
+        client = self.build_client()
+        logger.info(f"query: {query}")
+        resp = client.search(
+            index=self.index_name,
+            body=query,
+            params={"terminate_after": 0},
+        )
+
+        # Remove any _source keys whose value is a list of floats (embedding vectors)
+        def is_vector(val):
+            # Treat long lists of numbers (length > 100) as embedding vectors
+            return (
+                isinstance(val, list) and len(val) > 100 and all(isinstance(x, (float, int)) for x in val)
+            )
+
+        if "hits" in resp and "hits" in resp["hits"]:
+            for hit in resp["hits"]["hits"]:
+                source = hit.get("_source")
+                if isinstance(source, dict):
+                    keys_to_remove = [k for k, v in source.items() if is_vector(v)]
+                    for k in keys_to_remove:
+                        source.pop(k)
+        logger.info(f"Raw search response (all embedding vectors removed): {resp}")
+        return Data(**resp)

     def _get_embedding_model_name(self, embedding_obj=None) -> str:
         """Get the embedding model name from component config or embedding object.

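The is_vector cleanup in raw_search can be shown in isolation. A small self-contained sketch (the sample hit and the chunk_embedding field name are made up) of how embedding-sized lists are dropped from _source while ordinary fields survive:

def is_vector(val):
    # Same heuristic as the component: a long list (> 100 entries) of numbers is treated as an embedding.
    return isinstance(val, list) and len(val) > 100 and all(isinstance(x, (float, int)) for x in val)

# Hypothetical hit shaped like an entry from an OpenSearch response.
hit = {"_source": {"filename": "AmericanAirlines-2017_4.md", "page": 4, "chunk_embedding": [0.1] * 384}}

source = hit["_source"]
for key in [k for k, v in source.items() if is_vector(v)]:
    source.pop(key)

print(sorted(source))  # ['filename', 'page'] - the 384-float vector is gone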
@@ -865,99 +950,58 @@ class OpenSearchVectorStoreComponentMultimodalMultiEmbedding(LCVectorStoreCompon
             metadatas.append(data_copy)
         self.log(metadatas)

-        # Generate embeddings with rate-limit-aware retry logic using tenacity
-        from tenacity import (
-            retry,
-            retry_if_exception,
-            stop_after_attempt,
-            wait_exponential,
-        )
-
-        def is_rate_limit_error(exception: Exception) -> bool:
-            """Check if exception is a rate limit error (429)."""
-            error_str = str(exception).lower()
-            return "429" in error_str or "rate_limit" in error_str or "rate limit" in error_str
-
-        def is_other_retryable_error(exception: Exception) -> bool:
-            """Check if exception is retryable but not a rate limit error."""
-            # Retry on most exceptions except for specific non-retryable ones
-            # Add other non-retryable exceptions here if needed
-            return not is_rate_limit_error(exception)
-
-        # Create retry decorator for rate limit errors (longer backoff)
-        retry_on_rate_limit = retry(
-            retry=retry_if_exception(is_rate_limit_error),
-            stop=stop_after_attempt(5),
-            wait=wait_exponential(multiplier=2, min=2, max=30),
-            reraise=True,
-            before_sleep=lambda retry_state: logger.warning(
-                f"Rate limit hit for chunk (attempt {retry_state.attempt_number}/5), "
-                f"backing off for {retry_state.next_action.sleep:.1f}s"
-            ),
-        )
-
-        # Create retry decorator for other errors (shorter backoff)
-        retry_on_other_errors = retry(
-            retry=retry_if_exception(is_other_retryable_error),
-            stop=stop_after_attempt(3),
-            wait=wait_exponential(multiplier=1, min=1, max=8),
-            reraise=True,
-            before_sleep=lambda retry_state: logger.warning(
-                f"Error embedding chunk (attempt {retry_state.attempt_number}/3), "
-                f"retrying in {retry_state.next_action.sleep:.1f}s: {retry_state.outcome.exception()}"
-            ),
-        )
-
-        def embed_chunk_with_retry(chunk_text: str, chunk_idx: int) -> list[float]:
-            """Embed a single chunk with rate-limit-aware retry logic."""
-
-            @retry_on_rate_limit
-            @retry_on_other_errors
-            def _embed(text: str) -> list[float]:
-                return selected_embedding.embed_documents([text])[0]
-
-            try:
-                return _embed(chunk_text)
-            except Exception as e:
-                logger.error(
-                    f"Failed to embed chunk {chunk_idx} after all retries: {e}",
-                    error=str(e),
-                )
-                raise
-
-        # Restrict concurrency for IBM/Watsonx models to avoid rate limits
-        is_ibm = (embedding_model and "ibm" in str(embedding_model).lower()) or (
-            selected_embedding and "watsonx" in type(selected_embedding).__name__.lower()
-        )
-        logger.debug(f"Is IBM: {is_ibm}")
-
-        # For IBM models, use sequential processing with rate limiting
-        # For other models, use parallel processing
-        vectors: list[list[float]] = [None] * len(texts)
-
-        if is_ibm:
-            # Sequential processing with inter-request delay for IBM models
-            inter_request_delay = 0.6  # ~1.67 req/s, safely under 2 req/s limit
-            logger.info(
-                f"Using sequential processing for IBM model with {inter_request_delay}s delay between requests"
-            )
-            for idx, chunk in enumerate(texts):
-                if idx > 0:
-                    # Add delay between requests (but not before the first one)
-                    time.sleep(inter_request_delay)
-                vectors[idx] = embed_chunk_with_retry(chunk, idx)
-        else:
-            # Parallel processing for non-IBM models
-            max_workers = min(max(len(texts), 1), 8)
-            logger.debug(f"Using parallel processing with {max_workers} workers")
-
-            with ThreadPoolExecutor(max_workers=max_workers) as executor:
-                futures = {executor.submit(embed_chunk_with_retry, chunk, idx): idx for idx, chunk in enumerate(texts)}
-                for future in as_completed(futures):
-                    idx = futures[future]
-                    vectors[idx] = future.result()
+        # Generate embeddings (threaded for concurrency) with retries
+        def embed_chunk(chunk_text: str) -> list[float]:
+            return selected_embedding.embed_documents([chunk_text])[0]
+
+        vectors: list[list[float]] | None = None
+        last_exception: Exception | None = None
+        delay = 1.0
+        attempts = 0
+        max_attempts = 3
+
+        while attempts < max_attempts:
+            attempts += 1
+            try:
+                # Restrict concurrency for IBM/Watsonx models to avoid rate limits
+                is_ibm = (embedding_model and "ibm" in str(embedding_model).lower()) or (
+                    selected_embedding and "watsonx" in type(selected_embedding).__name__.lower()
+                )
+                logger.debug(f"Is IBM: {is_ibm}")
+                max_workers = 1 if is_ibm else min(max(len(texts), 1), 8)
+
+                with ThreadPoolExecutor(max_workers=max_workers) as executor:
+                    futures = {executor.submit(embed_chunk, chunk): idx for idx, chunk in enumerate(texts)}
+                    vectors = [None] * len(texts)
+                    for future in as_completed(futures):
+                        idx = futures[future]
+                        vectors[idx] = future.result()
+                break
+            except Exception as exc:
+                last_exception = exc
+                if attempts >= max_attempts:
+                    logger.error(
+                        f"Embedding generation failed for model {embedding_model} after retries",
+                        error=str(exc),
+                    )
+                    raise
+                logger.warning(
+                    "Threaded embedding generation failed for model %s (attempt %s/%s), retrying in %.1fs",
+                    embedding_model,
+                    attempts,
+                    max_attempts,
+                    delay,
+                )
+                time.sleep(delay)
+                delay = min(delay * 2, 8.0)
+
+        if vectors is None:
+            raise RuntimeError(
+                f"Embedding generation failed for {embedding_model}: {last_exception}"
+                if last_exception
+                else f"Embedding generation failed for {embedding_model}"
+            )

         if not vectors:
             self.log(f"No vectors generated from documents for model {embedding_model}.")
             return

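The threaded path above keeps results aligned with their input order by mapping each future back to its chunk index before collecting completions. A minimal standalone sketch of that pattern (the stub embedder and sample texts are made up for illustration):

from concurrent.futures import ThreadPoolExecutor, as_completed

def embed_chunk(chunk_text: str) -> list[float]:
    # Stub embedder: a real implementation would call embed_documents([chunk_text])[0].
    return [float(len(chunk_text))]

texts = ["alpha", "bravo charlie", "d"]
vectors: list[list[float] | None] = [None] * len(texts)

with ThreadPoolExecutor(max_workers=min(max(len(texts), 1), 8)) as executor:
    # Map each future back to its index so results land in input order,
    # even though as_completed yields them in completion order.
    futures = {executor.submit(embed_chunk, chunk): idx for idx, chunk in enumerate(texts)}
    for future in as_completed(futures):
        vectors[futures[future]] = future.result()

print(vectors)  # [[5.0], [13.0], [1.0]]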
4 file diffs suppressed because one or more lines are too long