""" Copyright 2024, Zep Software, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import json import logging from collections.abc import Iterable from typing import Any import numpy as np from pydantic import Field from .client import EmbedderClient, EmbedderConfig logger = logging.getLogger(__name__) DEFAULT_EMBEDDING_MODEL = 'vscode-embedder' DEFAULT_EMBEDDING_DIM = 1024 class VSCodeEmbedderConfig(EmbedderConfig): """Configuration for VS Code Embedder Client.""" embedding_model: str = DEFAULT_EMBEDDING_MODEL embedding_dim: int = Field(default=DEFAULT_EMBEDDING_DIM, frozen=True) use_fallback: bool = Field(default=True, description="Use fallback embeddings when VS Code unavailable") class VSCodeEmbedder(EmbedderClient): """ VS Code Embedder Client This client integrates with VS Code's embedding capabilities or provides intelligent fallback embeddings when VS Code is not available. Features: - Native VS Code embedding integration - Consistent fallback embeddings - Batch processing support - Semantic similarity preservation """ def __init__(self, config: VSCodeEmbedderConfig | None = None): if config is None: config = VSCodeEmbedderConfig() self.config = config self.vscode_available = self._check_vscode_availability() self._embedding_cache: dict[str, list[float]] = {} # Initialize semantic similarity components for fallback self._init_fallback_components() logger.info(f"VSCodeEmbedder initialized - VS Code available: {self.vscode_available}") def _check_vscode_availability(self) -> bool: """Check if VS Code embedding integration is available.""" try: import os # Check if we're running in a VS Code context return ( 'VSCODE_PID' in os.environ or 'VSCODE_IPC_HOOK' in os.environ or os.environ.get('USE_VSCODE_MODELS', 'false').lower() == 'true' ) except Exception: return False def _init_fallback_components(self): """Initialize components for fallback embedding generation.""" # Pre-computed word vectors for common terms (simplified TF-IDF approach) self._common_words = { # Entities 'person': 0.1, 'people': 0.1, 'user': 0.1, 'customer': 0.1, 'client': 0.1, 'company': 0.2, 'organization': 0.2, 'business': 0.2, 'enterprise': 0.2, 'product': 0.3, 'service': 0.3, 'item': 0.3, 'feature': 0.3, 'project': 0.4, 'task': 0.4, 'work': 0.4, 'job': 0.4, 'meeting': 0.5, 'discussion': 0.5, 'conversation': 0.5, 'talk': 0.5, # Actions 'create': 0.6, 'make': 0.6, 'build': 0.6, 'develop': 0.6, 'manage': 0.7, 'handle': 0.7, 'process': 0.7, 'organize': 0.7, 'analyze': 0.8, 'review': 0.8, 'evaluate': 0.8, 'assess': 0.8, 'design': 0.9, 'plan': 0.9, 'strategy': 0.9, 'approach': 0.9, # Relationships 'works': 1.1, 'manages': 1.1, 'leads': 1.1, 'supervises': 1.1, 'owns': 1.2, 'has': 1.2, 'contains': 1.2, 'includes': 1.2, 'uses': 1.3, 'utilizes': 1.3, 'operates': 1.3, 'handles': 1.3, 'knows': 1.4, 'understands': 1.4, 'familiar': 1.4, 'expert': 1.4, } # Semantic clusters for better similarity self._semantic_clusters = { 'person_cluster': ['person', 'people', 'user', 'customer', 'client', 'individual'], 'organization_cluster': ['company', 'organization', 'business', 'enterprise', 'firm'], 'product_cluster': ['product', 'service', 'item', 'feature', 'solution'], 'action_cluster': ['create', 'make', 'build', 'develop', 'design'], 'management_cluster': ['manage', 'handle', 'process', 'organize', 'coordinate'], } def _generate_fallback_embedding(self, text: str) -> list[float]: """ Generate a fallback embedding using semantic analysis. This creates consistent, meaningful embeddings without external APIs. """ if not text or not text.strip(): return [0.0] * self.config.embedding_dim # Check cache first cache_key = text.lower().strip() if cache_key in self._embedding_cache: return self._embedding_cache[cache_key] # Normalize text words = text.lower().replace(',', ' ').replace('.', ' ').split() # Initialize embedding vector embedding = np.zeros(self.config.embedding_dim) # Generate base embedding using word importance and semantic clusters for i, word in enumerate(words): # Get word weight word_weight = self._common_words.get(word, 0.05) # Position weight (earlier words are more important) position_weight = 1.0 / (i + 1) * 0.1 # Generate word-specific vector word_hash = hash(word) % self.config.embedding_dim word_vector = np.zeros(self.config.embedding_dim) # Create sparse vector based on word hash for j in range(min(10, self.config.embedding_dim)): # Use 10 dimensions per word idx = (word_hash + j * 31) % self.config.embedding_dim word_vector[idx] = word_weight + position_weight # Add semantic cluster information for cluster_name, cluster_words in self._semantic_clusters.items(): if word in cluster_words: cluster_hash = hash(cluster_name) % self.config.embedding_dim for k in range(5): # Use 5 dimensions for cluster idx = (cluster_hash + k * 17) % self.config.embedding_dim word_vector[idx] += 0.1 embedding += word_vector # Normalize the embedding if np.linalg.norm(embedding) > 0: embedding = embedding / np.linalg.norm(embedding) # Add some text-specific characteristics text_length_factor = min(len(text) / 100.0, 1.0) # Text length influence text_complexity = len(set(words)) / max(len(words), 1) # Vocabulary richness # Apply text characteristics to embedding embedding[0] = text_length_factor embedding[1] = text_complexity # Convert to list and cache result = embedding.tolist() self._embedding_cache[cache_key] = result return result async def _call_vscode_embedder(self, input_data: str | list[str]) -> list[float] | list[list[float]]: """ Call VS Code's embedding service through available integration methods. """ try: # Method 1: Try VS Code extension API for embeddings result = await self._try_vscode_embedding_api(input_data) if result: return result # Method 2: Try MCP protocol for embeddings result = await self._try_mcp_embedding_protocol(input_data) if result: return result # Method 3: Fallback to local embeddings return await self._fallback_embedding_generation(input_data) except Exception as e: logger.warning(f"VS Code embedding integration failed, using fallback: {e}") return await self._fallback_embedding_generation(input_data) async def _try_vscode_embedding_api(self, input_data: str | list[str]) -> list[float] | list[list[float]] | None: """Try to use VS Code extension API for embeddings.""" try: # This would integrate with VS Code's embedding API # In a real implementation, this would use VS Code's extension context # For now, return None to indicate this method is not available return None except Exception: return None async def _try_mcp_embedding_protocol(self, input_data: str | list[str]) -> list[float] | list[list[float]] | None: """Try to use MCP protocol to communicate with VS Code embedding service.""" try: # This would use MCP to communicate with VS Code's embedding server # Implementation would depend on available MCP clients and VS Code setup # For now, return None to indicate this method is not available return None except Exception: return None async def _fallback_embedding_generation(self, input_data: str | list[str]) -> list[float] | list[list[float]]: """ Generate fallback embeddings using local semantic analysis. """ if isinstance(input_data, str): return self._generate_fallback_embedding(input_data) else: # Batch processing return [self._generate_fallback_embedding(text) for text in input_data] async def create( self, input_data: str | list[str] | Iterable[int] | Iterable[Iterable[int]] ) -> list[float]: """ Create embeddings for input data. Args: input_data: Text string or list of strings to embed Returns: List of floats representing the embedding """ if not self.vscode_available and not self.config.use_fallback: raise RuntimeError("VS Code embeddings not available and fallback disabled") # Handle different input types if isinstance(input_data, str): text = input_data elif isinstance(input_data, list) and len(input_data) > 0 and isinstance(input_data[0], str): # Take first string from list text = input_data[0] else: # Convert other iterables to string representation text = str(input_data) try: result = await self._call_vscode_embedder(text) if isinstance(result, list) and isinstance(result[0], (int, float)): return result[:self.config.embedding_dim] elif isinstance(result, list) and isinstance(result[0], list): return result[0][:self.config.embedding_dim] else: raise ValueError(f"Unexpected embedding result format: {type(result)}") except Exception as e: logger.error(f"Error creating VS Code embedding: {e}") if self.config.use_fallback: return self._generate_fallback_embedding(text) else: raise async def create_batch(self, input_data_list: list[str]) -> list[list[float]]: """ Create embeddings for a batch of input strings. Args: input_data_list: List of strings to embed Returns: List of embedding vectors """ if not self.vscode_available and not self.config.use_fallback: raise RuntimeError("VS Code embeddings not available and fallback disabled") try: result = await self._call_vscode_embedder(input_data_list) if isinstance(result, list) and len(result) > 0: if isinstance(result[0], list): # Batch result return [emb[:self.config.embedding_dim] for emb in result] else: # Single result, wrap in list return [result[:self.config.embedding_dim]] else: raise ValueError(f"Unexpected batch embedding result: {type(result)}") except Exception as e: logger.error(f"Error creating VS Code batch embeddings: {e}") if self.config.use_fallback: return [self._generate_fallback_embedding(text) for text in input_data_list] else: raise def get_embedding_info(self) -> dict[str, Any]: """Get information about the current embedding configuration.""" return { "provider": "vscode", "model": self.config.embedding_model, "embedding_dim": self.config.embedding_dim, "vscode_available": self.vscode_available, "use_fallback": self.config.use_fallback, "cache_size": len(self._embedding_cache), }