Merge pull request #620 from langflow-ai/fix/watsonx_fixes
commit 5bf2076b05
6 changed files with 1825 additions and 49 deletions
```diff
@@ -865,57 +865,98 @@ class OpenSearchVectorStoreComponentMultimodalMultiEmbedding(LCVectorStoreCompon
             metadatas.append(data_copy)

         self.log(metadatas)

-        # Generate embeddings (threaded for concurrency) with retries
-        def embed_chunk(chunk_text: str) -> list[float]:
-            return selected_embedding.embed_documents([chunk_text])[0]
-
-        vectors: list[list[float]] | None = None
-        last_exception: Exception | None = None
-        delay = 1.0
-        attempts = 0
-        max_attempts = 3
-
-        while attempts < max_attempts:
-            attempts += 1
-            try:
+        # Generate embeddings with rate-limit-aware retry logic using tenacity
+        from tenacity import (
+            retry,
+            retry_if_exception,
+            stop_after_attempt,
+            wait_exponential,
+        )
+
+        def is_rate_limit_error(exception: Exception) -> bool:
+            """Check if exception is a rate limit error (429)."""
+            error_str = str(exception).lower()
+            return "429" in error_str or "rate_limit" in error_str or "rate limit" in error_str
+
+        def is_other_retryable_error(exception: Exception) -> bool:
+            """Check if exception is retryable but not a rate limit error."""
+            # Retry on most exceptions except for specific non-retryable ones
+            # Add other non-retryable exceptions here if needed
+            return not is_rate_limit_error(exception)
+
+        # Create retry decorator for rate limit errors (longer backoff)
+        retry_on_rate_limit = retry(
+            retry=retry_if_exception(is_rate_limit_error),
+            stop=stop_after_attempt(5),
+            wait=wait_exponential(multiplier=2, min=2, max=30),
+            reraise=True,
+            before_sleep=lambda retry_state: logger.warning(
+                f"Rate limit hit for chunk (attempt {retry_state.attempt_number}/5), "
+                f"backing off for {retry_state.next_action.sleep:.1f}s"
+            ),
+        )
+
+        # Create retry decorator for other errors (shorter backoff)
+        retry_on_other_errors = retry(
+            retry=retry_if_exception(is_other_retryable_error),
+            stop=stop_after_attempt(3),
+            wait=wait_exponential(multiplier=1, min=1, max=8),
+            reraise=True,
+            before_sleep=lambda retry_state: logger.warning(
+                f"Error embedding chunk (attempt {retry_state.attempt_number}/3), "
+                f"retrying in {retry_state.next_action.sleep:.1f}s: {retry_state.outcome.exception()}"
+            ),
+        )
+
+        def embed_chunk_with_retry(chunk_text: str, chunk_idx: int) -> list[float]:
+            """Embed a single chunk with rate-limit-aware retry logic."""
+
+            @retry_on_rate_limit
+            @retry_on_other_errors
+            def _embed(text: str) -> list[float]:
+                return selected_embedding.embed_documents([text])[0]
+
+            try:
+                return _embed(chunk_text)
+            except Exception as e:
+                logger.error(
+                    f"Failed to embed chunk {chunk_idx} after all retries: {e}",
+                    error=str(e),
+                )
+                raise
```
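A note on how the two stacked tenacity decorators interact: the inner `retry_on_other_errors` sees every exception first. Its predicate rejects 429s, so with `reraise=True` the original exception is re-raised immediately and the outer `retry_on_rate_limit` applies its longer backoff; conversely, a non-429 exhausts the inner tier's three attempts and then propagates past the outer tier untouched. Each outer attempt also restarts the inner tier's counter, so mixed error sequences can multiply attempts. The sketch below reproduces the pattern outside the component; `fake_embed` and the logger setup are hypothetical stand-ins, not part of the PR.

```python
# Minimal standalone sketch of the two-tier retry pattern in the diff above.
# fake_embed is a hypothetical stand-in for selected_embedding.embed_documents.
import logging

from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)


def is_rate_limit_error(exc: BaseException) -> bool:
    """Heuristic: treat anything mentioning HTTP 429 / rate limiting as a rate-limit error."""
    text = str(exc).lower()
    return "429" in text or "rate_limit" in text or "rate limit" in text


# Outer tier: patient backoff (2-30 s, up to 5 attempts) reserved for rate limits.
retry_on_rate_limit = retry(
    retry=retry_if_exception(is_rate_limit_error),
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, min=2, max=30),
    reraise=True,
)

# Inner tier: quick backoff (1-8 s, up to 3 attempts) for transient non-429 failures.
retry_on_other_errors = retry(
    retry=retry_if_exception(lambda e: not is_rate_limit_error(e)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=8),
    reraise=True,
)


@retry_on_rate_limit
@retry_on_other_errors
def fake_embed(text: str) -> list[float]:
    # Replace this body with a raising call to watch either retry tier engage.
    return [0.0] * 3


if __name__ == "__main__":
    print(fake_embed("hello"))
```

The same hunk continues with the concurrency changes: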
```diff
-                # Restrict concurrency for IBM/Watsonx models to avoid rate limits
-                is_ibm = (embedding_model and "ibm" in str(embedding_model).lower()) or (
-                    selected_embedding and "watsonx" in type(selected_embedding).__name__.lower()
-                )
-                logger.debug(f"Is IBM: {is_ibm}")
-                max_workers = 1 if is_ibm else min(max(len(texts), 1), 8)
-                with ThreadPoolExecutor(max_workers=max_workers) as executor:
-                    futures = {executor.submit(embed_chunk, chunk): idx for idx, chunk in enumerate(texts)}
-                    vectors = [None] * len(texts)
-                    for future in as_completed(futures):
-                        idx = futures[future]
-                        vectors[idx] = future.result()
-                break
-            except Exception as exc:
-                last_exception = exc
-                if attempts >= max_attempts:
-                    logger.error(
-                        f"Embedding generation failed for model {embedding_model} after retries",
-                        error=str(exc),
-                    )
-                    raise
-                logger.warning(
-                    "Threaded embedding generation failed for model %s (attempt %s/%s), retrying in %.1fs",
-                    embedding_model,
-                    attempts,
-                    max_attempts,
-                    delay,
-                )
-                time.sleep(delay)
-                delay = min(delay * 2, 8.0)
-
-        if vectors is None:
-            raise RuntimeError(
-                f"Embedding generation failed for {embedding_model}: {last_exception}"
-                if last_exception
-                else f"Embedding generation failed for {embedding_model}"
-            )
+        # Restrict concurrency for IBM/Watsonx models to avoid rate limits
+        is_ibm = (embedding_model and "ibm" in str(embedding_model).lower()) or (
+            selected_embedding and "watsonx" in type(selected_embedding).__name__.lower()
+        )
+        logger.debug(f"Is IBM: {is_ibm}")
+
+        # For IBM models, use sequential processing with rate limiting
+        # For other models, use parallel processing
+        vectors: list[list[float]] = [None] * len(texts)
+
+        if is_ibm:
+            # Sequential processing with inter-request delay for IBM models
+            inter_request_delay = 0.6  # ~1.67 req/s, safely under 2 req/s limit
+            logger.info(
+                f"Using sequential processing for IBM model with {inter_request_delay}s delay between requests"
+            )
+
+            for idx, chunk in enumerate(texts):
+                if idx > 0:
+                    # Add delay between requests (but not before the first one)
+                    time.sleep(inter_request_delay)
+                vectors[idx] = embed_chunk_with_retry(chunk, idx)
+        else:
+            # Parallel processing for non-IBM models
+            max_workers = min(max(len(texts), 1), 8)
+            logger.debug(f"Using parallel processing with {max_workers} workers")
+
+            with ThreadPoolExecutor(max_workers=max_workers) as executor:
+                futures = {executor.submit(embed_chunk_with_retry, chunk, idx): idx for idx, chunk in enumerate(texts)}
+                for future in as_completed(futures):
+                    idx = futures[future]
+                    vectors[idx] = future.result()

         if not vectors:
             self.log(f"No vectors generated from documents for model {embedding_model}.")
```
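On the sequential path, the 0.6 s inter-request delay caps throughput at 1 / 0.6 ≈ 1.67 requests per second, comfortably under the 2 req/s limit the comment cites. A minimal sketch of that throttled loop, with a hypothetical `embed_fn` parameter standing in for `embed_chunk_with_retry`:

```python
import time
from collections.abc import Callable, Sequence


def embed_sequential(
    texts: Sequence[str],
    embed_fn: Callable[[str], list[float]],
    inter_request_delay: float = 0.6,  # 1 / 0.6 ≈ 1.67 req/s, under a 2 req/s limit
) -> list[list[float]]:
    """Embed texts one at a time, sleeping between requests to respect a rate limit."""
    vectors: list[list[float]] = []
    for idx, chunk in enumerate(texts):
        if idx > 0:
            # Sleep between requests, but never before the first one.
            time.sleep(inter_request_delay)
        vectors.append(embed_fn(chunk))
    return vectors
```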
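For the non-IBM path, `as_completed` yields futures in completion order, not submission order, which is why the diff keys each future to its input index so `vectors` stays aligned with `texts`. A self-contained sketch of that ordered fan-out, again with a hypothetical `embed_fn` stand-in:

```python
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor, as_completed


def embed_parallel(
    texts: Sequence[str],
    embed_fn: Callable[[str], list[float]],
    worker_cap: int = 8,
) -> list[list[float]]:
    """Embed texts concurrently while preserving input order in the output."""
    vectors: list[list[float]] = [[] for _ in texts]  # placeholders, overwritten below
    max_workers = min(max(len(texts), 1), worker_cap)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the index of the text it embeds.
        futures = {executor.submit(embed_fn, chunk): idx for idx, chunk in enumerate(texts)}
        for future in as_completed(futures):
            # result() re-raises any worker exception, so every surviving slot is filled.
            vectors[futures[future]] = future.result()
    return vectors
```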
flows/components/opensearch_multimodel.py (new file, 1735 lines)
File diff suppressed because it is too large
File diff suppressed because one or more lines are too long