183 lines
7 KiB
Python
183 lines
7 KiB
Python
"""
|
|
Copyright 2024, Zep Software, Inc.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
"""
|
|
|
|
import logging
|
|
from collections.abc import Iterable
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from google import genai
|
|
from google.genai import types
|
|
else:
|
|
try:
|
|
from google import genai
|
|
from google.genai import types
|
|
except ImportError:
|
|
raise ImportError(
|
|
'google-genai is required for GeminiEmbedder. '
|
|
'Install it with: pip install graphiti-core[google-genai]'
|
|
) from None
|
|
|
|
from pydantic import Field
|
|
|
|
from .client import EmbedderClient, EmbedderConfig
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
DEFAULT_EMBEDDING_MODEL = 'text-embedding-001' # gemini-embedding-001 or text-embedding-005
|
|
|
|
DEFAULT_BATCH_SIZE = 100
|
|
|
|
|
|
class GeminiEmbedderConfig(EmbedderConfig):
|
|
embedding_model: str = Field(default=DEFAULT_EMBEDDING_MODEL)
|
|
api_key: str | None = None
|
|
|
|
|
|
class GeminiEmbedder(EmbedderClient):
|
|
"""
|
|
Google Gemini Embedder Client
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
config: GeminiEmbedderConfig | None = None,
|
|
client: 'genai.Client | None' = None,
|
|
batch_size: int | None = None,
|
|
):
|
|
"""
|
|
Initialize the GeminiEmbedder with the provided configuration and client.
|
|
|
|
Args:
|
|
config (GeminiEmbedderConfig | None): The configuration for the GeminiEmbedder, including API key, model, base URL, temperature, and max tokens.
|
|
client (genai.Client | None): An optional async client instance to use. If not provided, a new genai.Client is created.
|
|
batch_size (int | None): An optional batch size to use. If not provided, the default batch size will be used.
|
|
"""
|
|
if config is None:
|
|
config = GeminiEmbedderConfig()
|
|
|
|
self.config = config
|
|
|
|
if client is None:
|
|
self.client = genai.Client(api_key=config.api_key)
|
|
else:
|
|
self.client = client
|
|
|
|
if batch_size is None and self.config.embedding_model == 'gemini-embedding-001':
|
|
# Gemini API has a limit on the number of instances per request
|
|
# https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/text-embeddings-api
|
|
self.batch_size = 1
|
|
elif batch_size is None:
|
|
self.batch_size = DEFAULT_BATCH_SIZE
|
|
else:
|
|
self.batch_size = batch_size
|
|
|
|
async def create(
|
|
self, input_data: str | list[str] | Iterable[int] | Iterable[Iterable[int]]
|
|
) -> list[float]:
|
|
"""
|
|
Create embeddings for the given input data using Google's Gemini embedding model.
|
|
|
|
Args:
|
|
input_data: The input data to create embeddings for. Can be a string, list of strings,
|
|
or an iterable of integers or iterables of integers.
|
|
|
|
Returns:
|
|
A list of floats representing the embedding vector.
|
|
"""
|
|
# Generate embeddings
|
|
result = await self.client.aio.models.embed_content(
|
|
model=self.config.embedding_model or DEFAULT_EMBEDDING_MODEL,
|
|
contents=[input_data], # type: ignore[arg-type] # mypy fails on broad union type
|
|
config=types.EmbedContentConfig(output_dimensionality=self.config.embedding_dim),
|
|
)
|
|
|
|
if not result.embeddings or len(result.embeddings) == 0 or not result.embeddings[0].values:
|
|
raise ValueError('No embeddings returned from Gemini API in create()')
|
|
|
|
return result.embeddings[0].values
|
|
|
|
async def create_batch(self, input_data_list: list[str]) -> list[list[float]]:
|
|
"""
|
|
Create embeddings for a batch of input data using Google's Gemini embedding model.
|
|
|
|
This method handles batching to respect the Gemini API's limits on the number
|
|
of instances that can be processed in a single request.
|
|
|
|
Args:
|
|
input_data_list: A list of strings to create embeddings for.
|
|
|
|
Returns:
|
|
A list of embedding vectors (each vector is a list of floats).
|
|
"""
|
|
if not input_data_list:
|
|
return []
|
|
|
|
batch_size = self.batch_size
|
|
all_embeddings = []
|
|
|
|
# Process inputs in batches
|
|
for i in range(0, len(input_data_list), batch_size):
|
|
batch = input_data_list[i : i + batch_size]
|
|
|
|
try:
|
|
# Generate embeddings for this batch
|
|
result = await self.client.aio.models.embed_content(
|
|
model=self.config.embedding_model or DEFAULT_EMBEDDING_MODEL,
|
|
contents=batch, # type: ignore[arg-type] # mypy fails on broad union type
|
|
config=types.EmbedContentConfig(
|
|
output_dimensionality=self.config.embedding_dim
|
|
),
|
|
)
|
|
|
|
if not result.embeddings or len(result.embeddings) == 0:
|
|
raise Exception('No embeddings returned')
|
|
|
|
# Process embeddings from this batch
|
|
for embedding in result.embeddings:
|
|
if not embedding.values:
|
|
raise ValueError('Empty embedding values returned')
|
|
all_embeddings.append(embedding.values)
|
|
|
|
except Exception as e:
|
|
# If batch processing fails, fall back to individual processing
|
|
logger.warning(
|
|
f'Batch embedding failed for batch {i // batch_size + 1}, falling back to individual processing: {e}'
|
|
)
|
|
|
|
for item in batch:
|
|
try:
|
|
# Process each item individually
|
|
result = await self.client.aio.models.embed_content(
|
|
model=self.config.embedding_model or DEFAULT_EMBEDDING_MODEL,
|
|
contents=[item], # type: ignore[arg-type] # mypy fails on broad union type
|
|
config=types.EmbedContentConfig(
|
|
output_dimensionality=self.config.embedding_dim
|
|
),
|
|
)
|
|
|
|
if not result.embeddings or len(result.embeddings) == 0:
|
|
raise ValueError('No embeddings returned from Gemini API')
|
|
if not result.embeddings[0].values:
|
|
raise ValueError('Empty embedding values returned')
|
|
|
|
all_embeddings.append(result.embeddings[0].values)
|
|
|
|
except Exception as individual_error:
|
|
logger.error(f'Failed to embed individual item: {individual_error}')
|
|
raise individual_error
|
|
|
|
return all_embeddings
|