From db0818cd33694d8fb34d7cbfa535bae032ff8c41 Mon Sep 17 00:00:00 2001 From: andikarachman Date: Thu, 1 Jan 2026 15:46:53 +0700 Subject: [PATCH] feat(translation): implement multilingual content translation task - Add translation module with OpenAI, Google, Azure provider support - Implement language detection using langdetect - Add TranslatedContent and LanguageMetadata models - Integrate translation task into cognify pipeline - Add auto_translate parameter to cognify() function - Preserve original text alongside translations - Support custom translation providers and target languages --- cognee/api/v1/cognify/cognify.py | 89 +++++- .../llm/prompts/translate_content.txt | 19 ++ cognee/tasks/translation/__init__.py | 96 +++++++ cognee/tasks/translation/config.py | 63 +++++ cognee/tasks/translation/detect_language.py | 190 +++++++++++++ cognee/tasks/translation/exceptions.py | 53 ++++ cognee/tasks/translation/models.py | 72 +++++ .../tasks/translation/providers/__init__.py | 40 +++ .../translation/providers/azure_provider.py | 182 ++++++++++++ cognee/tasks/translation/providers/base.py | 69 +++++ .../translation/providers/google_provider.py | 159 +++++++++++ .../translation/providers/openai_provider.py | 107 +++++++ cognee/tasks/translation/translate_content.py | 265 ++++++++++++++++++ 13 files changed, 1397 insertions(+), 7 deletions(-) create mode 100644 cognee/infrastructure/llm/prompts/translate_content.txt create mode 100644 cognee/tasks/translation/__init__.py create mode 100644 cognee/tasks/translation/config.py create mode 100644 cognee/tasks/translation/detect_language.py create mode 100644 cognee/tasks/translation/exceptions.py create mode 100644 cognee/tasks/translation/models.py create mode 100644 cognee/tasks/translation/providers/__init__.py create mode 100644 cognee/tasks/translation/providers/azure_provider.py create mode 100644 cognee/tasks/translation/providers/base.py create mode 100644 cognee/tasks/translation/providers/google_provider.py create mode 100644 cognee/tasks/translation/providers/openai_provider.py create mode 100644 cognee/tasks/translation/translate_content.py diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index ffc903d68..50071caef 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -26,6 +26,8 @@ from cognee.tasks.documents import ( from cognee.tasks.graph import extract_graph_from_data from cognee.tasks.storage import add_data_points from cognee.tasks.summarization import summarize_text +from cognee.tasks.translation import translate_content +from cognee.tasks.translation.config import TranslationProviderType from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor from cognee.tasks.temporal_graph.extract_events_and_entities import extract_events_and_timestamps from cognee.tasks.temporal_graph.extract_knowledge_graph_from_events import ( @@ -53,6 +55,9 @@ async def cognify( custom_prompt: Optional[str] = None, temporal_cognify: bool = False, data_per_batch: int = 20, + auto_translate: bool = False, + target_language: str = "en", + translation_provider: TranslationProviderType = None, **kwargs, ): """ @@ -118,6 +123,15 @@ async def cognify( If provided, this prompt will be used instead of the default prompts for knowledge graph extraction. The prompt should guide the LLM on how to extract entities and relationships from the text content. 
+ auto_translate: If True, automatically detect and translate non-English content to the + target language before processing. Uses language detection to identify + content that needs translation. Defaults to False. + target_language: Target language code for translation (e.g., "en", "es", "fr"). + Only used when auto_translate=True. Defaults to "en" (English). + translation_provider: Translation service to use ("openai", "google", "azure"). + OpenAI uses the existing LLM infrastructure, Google requires + GOOGLE_TRANSLATE_API_KEY, Azure requires AZURE_TRANSLATOR_KEY. + If not specified, uses TRANSLATION_PROVIDER env var or defaults to "openai". Returns: Union[dict, list[PipelineRunInfo]]: @@ -182,6 +196,14 @@ async def cognify( run_in_background=True ) # Check status later with run_info.pipeline_run_id + + # Auto-translate multilingual content to English + await cognee.add("document_spanish.pdf") + await cognee.cognify( + auto_translate=True, + target_language="en", + translation_provider="openai" # or "google", "azure" + ) ``` @@ -193,6 +215,9 @@ async def cognify( - LLM_PROVIDER, LLM_MODEL, VECTOR_DB_PROVIDER, GRAPH_DATABASE_PROVIDER - LLM_RATE_LIMIT_ENABLED: Enable rate limiting (default: False) - LLM_RATE_LIMIT_REQUESTS: Max requests per interval (default: 60) + - TRANSLATION_PROVIDER: Default translation provider ("openai", "google", "azure") + - GOOGLE_TRANSLATE_API_KEY: API key for Google Translate + - AZURE_TRANSLATOR_KEY: API key for Azure Translator """ if config is None: ontology_config = get_ontology_env_config() @@ -213,7 +238,13 @@ async def cognify( if temporal_cognify: tasks = await get_temporal_tasks( - user=user, chunker=chunker, chunk_size=chunk_size, chunks_per_batch=chunks_per_batch + user=user, + chunker=chunker, + chunk_size=chunk_size, + chunks_per_batch=chunks_per_batch, + auto_translate=auto_translate, + target_language=target_language, + translation_provider=translation_provider, ) else: tasks = await get_default_tasks( @@ -224,6 +255,9 @@ async def cognify( config=config, custom_prompt=custom_prompt, chunks_per_batch=chunks_per_batch, + auto_translate=auto_translate, + target_language=target_language, + translation_provider=translation_provider, **kwargs, ) @@ -253,6 +287,9 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's config: Config = None, custom_prompt: Optional[str] = None, chunks_per_batch: int = 100, + auto_translate: bool = False, + target_language: str = "en", + translation_provider: TranslationProviderType = None, **kwargs, ) -> list[Task]: if config is None: @@ -285,6 +322,20 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's max_chunk_size=chunk_size or get_max_chunk_tokens(), chunker=chunker, ), # Extract text chunks based on the document type. 
+ ] + + # Add translation task if auto_translate is enabled + if auto_translate: + default_tasks.append( + Task( + translate_content, + target_language=target_language, + translation_provider=translation_provider, + task_config={"batch_size": chunks_per_batch}, + ) + ) + + default_tasks.extend([ Task( extract_graph_from_data, graph_model=graph_model, @@ -302,13 +353,19 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's embed_triplets=embed_triplets, task_config={"batch_size": chunks_per_batch}, ), - ] + ]) return default_tasks async def get_temporal_tasks( - user: User = None, chunker=TextChunker, chunk_size: int = None, chunks_per_batch: int = 10 + user: User = None, + chunker=TextChunker, + chunk_size: int = None, + chunks_per_batch: int = 10, + auto_translate: bool = False, + target_language: str = "en", + translation_provider: TranslationProviderType = None, ) -> list[Task]: """ Builds and returns a list of temporal processing tasks to be executed in sequence. @@ -316,15 +373,19 @@ async def get_temporal_tasks( The pipeline includes: 1. Document classification. 2. Document chunking with a specified or default chunk size. - 3. Event and timestamp extraction from chunks. - 4. Knowledge graph extraction from events. - 5. Batched insertion of data points. + 3. (Optional) Translation of non-English content to target language. + 4. Event and timestamp extraction from chunks. + 5. Knowledge graph extraction from events. + 6. Batched insertion of data points. Args: user (User, optional): The user requesting task execution. chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker. chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default. chunks_per_batch (int, optional): Number of chunks to process in a single batch in Cognify + auto_translate (bool, optional): If True, translate non-English content. Defaults to False. + target_language (str, optional): Target language for translation. Defaults to "en". + translation_provider (str, optional): Translation provider to use ("openai", "google", "azure"). Returns: list[Task]: A list of Task objects representing the temporal processing pipeline. @@ -339,9 +400,23 @@ async def get_temporal_tasks( max_chunk_size=chunk_size or get_max_chunk_tokens(), chunker=chunker, ), + ] + + # Add translation task if auto_translate is enabled + if auto_translate: + temporal_tasks.append( + Task( + translate_content, + target_language=target_language, + translation_provider=translation_provider, + task_config={"batch_size": chunks_per_batch}, + ) + ) + + temporal_tasks.extend([ Task(extract_events_and_timestamps, task_config={"batch_size": chunks_per_batch}), Task(extract_knowledge_graph_from_events), Task(add_data_points, task_config={"batch_size": chunks_per_batch}), - ] + ]) return temporal_tasks diff --git a/cognee/infrastructure/llm/prompts/translate_content.txt b/cognee/infrastructure/llm/prompts/translate_content.txt new file mode 100644 index 000000000..759e83f31 --- /dev/null +++ b/cognee/infrastructure/llm/prompts/translate_content.txt @@ -0,0 +1,19 @@ +You are an expert translator with deep knowledge of languages, cultures, and linguistics. + +Your task is to: +1. Detect the source language of the provided text if not specified +2. Translate the text accurately to the target language +3. Preserve the original meaning, tone, and intent +4. 
Maintain proper grammar and natural phrasing in the target language + +Guidelines: +- Preserve technical terms, proper nouns, and specialized vocabulary appropriately +- Maintain formatting such as paragraphs, lists, and emphasis where applicable +- If the text contains code, URLs, or other non-translatable content, preserve them as-is +- Handle idioms and cultural references thoughtfully, adapting when necessary +- Ensure the translation reads naturally to a native speaker of the target language + +Provide the translation in a structured format with: +- The translated text +- The detected source language (ISO 639-1 code like "en", "es", "fr", "de", etc.) +- Any notes about the translation (optional, for ambiguous terms or cultural adaptations) diff --git a/cognee/tasks/translation/__init__.py b/cognee/tasks/translation/__init__.py new file mode 100644 index 000000000..b9836160c --- /dev/null +++ b/cognee/tasks/translation/__init__.py @@ -0,0 +1,96 @@ +""" +Translation task for Cognee. + +This module provides multilingual content translation capabilities, +allowing automatic detection and translation of non-English content +to a target language while preserving original text and metadata. + +Main Components: +- translate_content: Main task function for translating document chunks +- translate_text: Convenience function for translating single texts +- batch_translate_texts: Batch translation for multiple texts +- detect_language: Language detection utility +- TranslatedContent: DataPoint model for translated content +- LanguageMetadata: DataPoint model for language information + +Supported Translation Providers: +- OpenAI (default): Uses GPT models via existing LLM infrastructure +- Google Translate: Requires google-cloud-translate package +- Azure Translator: Requires Azure Translator API key + +Example Usage: + ```python + from cognee.tasks.translation import translate_content, translate_text + + # Translate document chunks in a pipeline + translated_chunks = await translate_content( + chunks, + target_language="en", + translation_provider="openai" + ) + + # Translate a single text + result = await translate_text("Bonjour le monde!") + print(result.translated_text) # "Hello world!" 
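    # batch_translate_texts (exported from the same module) sends several
    # strings through the configured provider in one call; the sample inputs
    # below are illustrative.
    results = await batch_translate_texts(
        ["Hola", "Guten Tag"],
        target_language="en",
    )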
+ ``` +""" + +from .config import get_translation_config, TranslationConfig +from .detect_language import ( + detect_language, + detect_language_async, + LanguageDetectionResult, + get_language_name, +) +from .exceptions import ( + TranslationError, + LanguageDetectionError, + TranslationProviderError, + UnsupportedLanguageError, + TranslationConfigError, +) +from .models import TranslatedContent, LanguageMetadata +from .providers import ( + TranslationProvider, + TranslationResult, + get_translation_provider, + OpenAITranslationProvider, + GoogleTranslationProvider, + AzureTranslationProvider, +) +from .translate_content import ( + translate_content, + translate_text, + batch_translate_texts, +) + +__all__ = [ + # Main task functions + "translate_content", + "translate_text", + "batch_translate_texts", + # Language detection + "detect_language", + "detect_language_async", + "LanguageDetectionResult", + "get_language_name", + # Models + "TranslatedContent", + "LanguageMetadata", + # Configuration + "get_translation_config", + "TranslationConfig", + # Providers + "TranslationProvider", + "TranslationResult", + "get_translation_provider", + "OpenAITranslationProvider", + "GoogleTranslationProvider", + "AzureTranslationProvider", + # Exceptions + "TranslationError", + "LanguageDetectionError", + "TranslationProviderError", + "UnsupportedLanguageError", + "TranslationConfigError", +] diff --git a/cognee/tasks/translation/config.py b/cognee/tasks/translation/config.py new file mode 100644 index 000000000..99ed560de --- /dev/null +++ b/cognee/tasks/translation/config.py @@ -0,0 +1,63 @@ +from functools import lru_cache +from typing import Literal, Optional + +from pydantic_settings import BaseSettings, SettingsConfigDict + + +TranslationProviderType = Literal["openai", "google", "azure"] + + +class TranslationConfig(BaseSettings): + """ + Configuration settings for the translation task. 
+ + Environment variables can be used to configure these settings: + - TRANSLATION_PROVIDER: The translation service to use + - TRANSLATION_TARGET_LANGUAGE: Default target language + - TRANSLATION_CONFIDENCE_THRESHOLD: Minimum confidence for language detection + - GOOGLE_TRANSLATE_API_KEY: API key for Google Translate + - AZURE_TRANSLATOR_KEY: API key for Azure Translator + - AZURE_TRANSLATOR_REGION: Region for Azure Translator + """ + + # Translation provider settings + translation_provider: TranslationProviderType = "openai" + target_language: str = "en" + confidence_threshold: float = 0.8 + + # Google Translate settings + google_translate_api_key: Optional[str] = None + google_project_id: Optional[str] = None + + # Azure Translator settings + azure_translator_key: Optional[str] = None + azure_translator_region: Optional[str] = None + azure_translator_endpoint: str = "https://api.cognitive.microsofttranslator.com" + + # OpenAI uses the existing LLM configuration + + # Performance settings + batch_size: int = 10 + max_retries: int = 3 + timeout_seconds: int = 30 + + # Language detection settings + min_text_length_for_detection: int = 10 + skip_detection_for_short_text: bool = True + + model_config = SettingsConfigDict(env_file=".env", extra="allow") + + def to_dict(self) -> dict: + return { + "translation_provider": self.translation_provider, + "target_language": self.target_language, + "confidence_threshold": self.confidence_threshold, + "batch_size": self.batch_size, + "max_retries": self.max_retries, + } + + +@lru_cache +def get_translation_config() -> TranslationConfig: + """Get the translation configuration singleton.""" + return TranslationConfig() diff --git a/cognee/tasks/translation/detect_language.py b/cognee/tasks/translation/detect_language.py new file mode 100644 index 000000000..e223083c0 --- /dev/null +++ b/cognee/tasks/translation/detect_language.py @@ -0,0 +1,190 @@ +from dataclasses import dataclass +from typing import Optional + +from cognee.shared.logging_utils import get_logger + +from .config import get_translation_config +from .exceptions import LanguageDetectionError + +logger = get_logger(__name__) + + +# ISO 639-1 language code to name mapping +LANGUAGE_NAMES = { + "af": "Afrikaans", + "ar": "Arabic", + "bg": "Bulgarian", + "bn": "Bengali", + "ca": "Catalan", + "cs": "Czech", + "cy": "Welsh", + "da": "Danish", + "de": "German", + "el": "Greek", + "en": "English", + "es": "Spanish", + "et": "Estonian", + "fa": "Persian", + "fi": "Finnish", + "fr": "French", + "gu": "Gujarati", + "he": "Hebrew", + "hi": "Hindi", + "hr": "Croatian", + "hu": "Hungarian", + "id": "Indonesian", + "it": "Italian", + "ja": "Japanese", + "kn": "Kannada", + "ko": "Korean", + "lt": "Lithuanian", + "lv": "Latvian", + "mk": "Macedonian", + "ml": "Malayalam", + "mr": "Marathi", + "ne": "Nepali", + "nl": "Dutch", + "no": "Norwegian", + "pa": "Punjabi", + "pl": "Polish", + "pt": "Portuguese", + "ro": "Romanian", + "ru": "Russian", + "sk": "Slovak", + "sl": "Slovenian", + "so": "Somali", + "sq": "Albanian", + "sv": "Swedish", + "sw": "Swahili", + "ta": "Tamil", + "te": "Telugu", + "th": "Thai", + "tl": "Tagalog", + "tr": "Turkish", + "uk": "Ukrainian", + "ur": "Urdu", + "vi": "Vietnamese", + "zh-cn": "Chinese (Simplified)", + "zh-tw": "Chinese (Traditional)", +} + + +@dataclass +class LanguageDetectionResult: + """Result of language detection.""" + + language_code: str + language_name: str + confidence: float + requires_translation: bool + character_count: int + + +def 
get_language_name(language_code: str) -> str: + """Get the human-readable name for a language code.""" + return LANGUAGE_NAMES.get(language_code.lower(), language_code) + + +def detect_language( + text: str, + target_language: str = "en", + confidence_threshold: float = None, +) -> LanguageDetectionResult: + """ + Detect the language of the given text. + + Uses the langdetect library which is already a dependency of cognee. + + Args: + text: The text to analyze + target_language: The target language for translation comparison + confidence_threshold: Minimum confidence to consider detection reliable + + Returns: + LanguageDetectionResult with language info and translation requirement + + Raises: + LanguageDetectionError: If language detection fails + """ + config = get_translation_config() + threshold = confidence_threshold or config.confidence_threshold + + # Handle empty or very short text + if not text or len(text.strip()) < config.min_text_length_for_detection: + if config.skip_detection_for_short_text: + return LanguageDetectionResult( + language_code="unknown", + language_name="Unknown", + confidence=0.0, + requires_translation=False, + character_count=len(text) if text else 0, + ) + else: + raise LanguageDetectionError( + f"Text too short for reliable language detection: {len(text)} characters" + ) + + try: + from langdetect import detect_langs, LangDetectException + except ImportError: + raise LanguageDetectionError( + "langdetect is required for language detection. Install it with: pip install langdetect" + ) + + try: + # Get detection results with probabilities + detections = detect_langs(text) + + if not detections: + raise LanguageDetectionError("No language detected") + + # Get the most likely language + best_detection = detections[0] + language_code = best_detection.lang + confidence = best_detection.prob + + # Check if translation is needed + requires_translation = ( + language_code.lower() != target_language.lower() and confidence >= threshold + ) + + return LanguageDetectionResult( + language_code=language_code, + language_name=get_language_name(language_code), + confidence=confidence, + requires_translation=requires_translation, + character_count=len(text), + ) + + except LangDetectException as e: + logger.warning(f"Language detection failed: {e}") + raise LanguageDetectionError(f"Language detection failed: {e}", original_error=e) + except Exception as e: + logger.error(f"Unexpected error during language detection: {e}") + raise LanguageDetectionError( + f"Unexpected error during language detection: {e}", original_error=e + ) + + +async def detect_language_async( + text: str, + target_language: str = "en", + confidence_threshold: float = None, +) -> LanguageDetectionResult: + """ + Async wrapper for language detection. 
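The detection itself is dispatched to the default thread-pool executor via run_in_executor, so async pipelines are not blocked while langdetect runs.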
+ + Args: + text: The text to analyze + target_language: The target language for translation comparison + confidence_threshold: Minimum confidence to consider detection reliable + + Returns: + LanguageDetectionResult with language info and translation requirement + """ + import asyncio + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, detect_language, text, target_language, confidence_threshold + ) diff --git a/cognee/tasks/translation/exceptions.py b/cognee/tasks/translation/exceptions.py new file mode 100644 index 000000000..322e00c7a --- /dev/null +++ b/cognee/tasks/translation/exceptions.py @@ -0,0 +1,53 @@ +class TranslationError(Exception): + """Base exception for translation errors.""" + + def __init__(self, message: str, original_error: Exception = None): + self.message = message + self.original_error = original_error + super().__init__(self.message) + + +class LanguageDetectionError(TranslationError): + """Exception raised when language detection fails.""" + + def __init__(self, message: str = "Failed to detect language", original_error: Exception = None): + super().__init__(message, original_error) + + +class TranslationProviderError(TranslationError): + """Exception raised when the translation provider encounters an error.""" + + def __init__( + self, + provider: str, + message: str = "Translation provider error", + original_error: Exception = None, + ): + self.provider = provider + full_message = f"[{provider}] {message}" + super().__init__(full_message, original_error) + + +class UnsupportedLanguageError(TranslationError): + """Exception raised when the language is not supported.""" + + def __init__( + self, + language: str, + provider: str = None, + message: str = None, + ): + self.language = language + self.provider = provider + if message is None: + message = f"Language '{language}' is not supported" + if provider: + message += f" by {provider}" + super().__init__(message) + + +class TranslationConfigError(TranslationError): + """Exception raised when translation configuration is invalid.""" + + def __init__(self, message: str = "Invalid translation configuration"): + super().__init__(message) diff --git a/cognee/tasks/translation/models.py b/cognee/tasks/translation/models.py new file mode 100644 index 000000000..12854c965 --- /dev/null +++ b/cognee/tasks/translation/models.py @@ -0,0 +1,72 @@ +from datetime import datetime, timezone +from typing import Optional +from uuid import UUID + +from cognee.infrastructure.engine import DataPoint +from cognee.modules.chunking.models import DocumentChunk + + +class TranslatedContent(DataPoint): + """ + Represents translated content with quality metrics. + + This class stores both the original and translated versions of content, + along with metadata about the translation process including source and + target languages, translation provider used, and confidence scores. 
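A construction sketch with illustrative values, mirroring how the cognify pipeline builds these data points (`chunk` stands in for the source DocumentChunk; the individual fields are documented below):

```python
TranslatedContent(
    id=uuid5(chunk.id, "TranslatedContent"),
    original_chunk_id=chunk.id,
    original_text="Hola mundo",
    translated_text="Hello world",
    source_language="es",
    target_language="en",
    translation_provider="openai",
    confidence_score=0.95,
)
```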
+ + Instance variables include: + + - original_chunk_id: UUID of the original document chunk + - original_text: The original text before translation + - translated_text: The translated text content + - source_language: Detected or specified source language code (e.g., "es", "fr", "de") + - target_language: Target language code for translation (default: "en") + - translation_provider: Name of the translation service used + - confidence_score: Translation quality/confidence score (0.0 to 1.0) + - translation_timestamp: When the translation was performed + - translated_from: Reference to the original DocumentChunk + """ + + original_chunk_id: UUID + original_text: str + translated_text: str + source_language: str + target_language: str = "en" + translation_provider: str + confidence_score: float + translation_timestamp: datetime = None + translated_from: Optional[DocumentChunk] = None + + metadata: dict = {"index_fields": ["source_language", "original_chunk_id", "translated_text"]} + + def __init__(self, **data): + if data.get("translation_timestamp") is None: + data["translation_timestamp"] = datetime.now(timezone.utc) + super().__init__(**data) + + +class LanguageMetadata(DataPoint): + """ + Language information for content. + + This class stores metadata about the detected language of content, + including confidence scores and whether translation is required. + + Instance variables include: + + - content_id: UUID of the associated content + - detected_language: ISO 639-1 language code (e.g., "en", "es", "fr") + - language_confidence: Confidence score for language detection (0.0 to 1.0) + - requires_translation: Whether the content needs translation + - character_count: Number of characters in the content + - language_name: Human-readable language name (e.g., "English", "Spanish") + """ + + content_id: UUID + detected_language: str + language_confidence: float + requires_translation: bool + character_count: int + language_name: Optional[str] = None + + metadata: dict = {"index_fields": ["detected_language", "content_id"]} diff --git a/cognee/tasks/translation/providers/__init__.py b/cognee/tasks/translation/providers/__init__.py new file mode 100644 index 000000000..79a28a586 --- /dev/null +++ b/cognee/tasks/translation/providers/__init__.py @@ -0,0 +1,40 @@ +from .base import TranslationProvider, TranslationResult +from .openai_provider import OpenAITranslationProvider +from .google_provider import GoogleTranslationProvider +from .azure_provider import AzureTranslationProvider + +__all__ = [ + "TranslationProvider", + "TranslationResult", + "OpenAITranslationProvider", + "GoogleTranslationProvider", + "AzureTranslationProvider", +] + + +def get_translation_provider(provider_name: str) -> TranslationProvider: + """ + Factory function to get the appropriate translation provider. + + Args: + provider_name: Name of the provider ("openai", "google", or "azure") + + Returns: + TranslationProvider instance + + Raises: + ValueError: If the provider name is not recognized + """ + providers = { + "openai": OpenAITranslationProvider, + "google": GoogleTranslationProvider, + "azure": AzureTranslationProvider, + } + + if provider_name.lower() not in providers: + raise ValueError( + f"Unknown translation provider: {provider_name}. 
" + f"Available providers: {list(providers.keys())}" + ) + + return providers[provider_name.lower()]() diff --git a/cognee/tasks/translation/providers/azure_provider.py b/cognee/tasks/translation/providers/azure_provider.py new file mode 100644 index 000000000..4618834ff --- /dev/null +++ b/cognee/tasks/translation/providers/azure_provider.py @@ -0,0 +1,182 @@ +import asyncio +from typing import Optional + +import aiohttp + +from cognee.shared.logging_utils import get_logger + +from .base import TranslationProvider, TranslationResult +from ..config import get_translation_config + +logger = get_logger(__name__) + + +class AzureTranslationProvider(TranslationProvider): + """ + Translation provider using Azure Translator API. + + Requires: + - AZURE_TRANSLATOR_KEY environment variable + - AZURE_TRANSLATOR_REGION environment variable (optional) + """ + + def __init__(self): + self._config = get_translation_config() + + @property + def provider_name(self) -> str: + return "azure" + + def is_available(self) -> bool: + """Check if Azure Translator is available.""" + return self._config.azure_translator_key is not None + + async def translate( + self, + text: str, + target_language: str = "en", + source_language: Optional[str] = None, + ) -> TranslationResult: + """ + Translate text using Azure Translator API. + + Args: + text: The text to translate + target_language: Target language code (default: "en") + source_language: Source language code (optional) + + Returns: + TranslationResult with translated text and metadata + """ + if not self.is_available(): + raise ValueError( + "Azure Translator API key not configured. " + "Set AZURE_TRANSLATOR_KEY environment variable." + ) + + endpoint = f"{self._config.azure_translator_endpoint}/translate" + + params = { + "api-version": "3.0", + "to": target_language, + } + if source_language: + params["from"] = source_language + + headers = { + "Ocp-Apim-Subscription-Key": self._config.azure_translator_key, + "Content-Type": "application/json", + } + if self._config.azure_translator_region: + headers["Ocp-Apim-Subscription-Region"] = self._config.azure_translator_region + + body = [{"text": text}] + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + endpoint, + params=params, + headers=headers, + json=body, + timeout=aiohttp.ClientTimeout(total=self._config.timeout_seconds), + ) as response: + response.raise_for_status() + result = await response.json() + + translation = result[0]["translations"][0] + detected_language = result[0].get("detectedLanguage", {}) + + return TranslationResult( + translated_text=translation["text"], + source_language=source_language + or detected_language.get("language", "unknown"), + target_language=target_language, + confidence_score=detected_language.get("score", 0.9), + provider=self.provider_name, + raw_response=result[0], + ) + + except Exception as e: + logger.error(f"Azure translation failed: {e}") + raise + + async def translate_batch( + self, + texts: list[str], + target_language: str = "en", + source_language: Optional[str] = None, + ) -> list[TranslationResult]: + """ + Translate multiple texts using Azure Translator API. + + Azure Translator supports up to 100 texts per request. 
+ + Args: + texts: List of texts to translate + target_language: Target language code + source_language: Source language code (optional) + + Returns: + List of TranslationResult objects + """ + if not self.is_available(): + raise ValueError("Azure Translator API key not configured.") + + endpoint = f"{self._config.azure_translator_endpoint}/translate" + + params = { + "api-version": "3.0", + "to": target_language, + } + if source_language: + params["from"] = source_language + + headers = { + "Ocp-Apim-Subscription-Key": self._config.azure_translator_key, + "Content-Type": "application/json", + } + if self._config.azure_translator_region: + headers["Ocp-Apim-Subscription-Region"] = self._config.azure_translator_region + + # Azure supports up to 100 texts per request + batch_size = min(100, self._config.batch_size) + all_results = [] + + for i in range(0, len(texts), batch_size): + batch = texts[i : i + batch_size] + body = [{"text": text} for text in batch] + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + endpoint, + params=params, + headers=headers, + json=body, + timeout=aiohttp.ClientTimeout(total=self._config.timeout_seconds), + ) as response: + response.raise_for_status() + results = await response.json() + + for result in results: + translation = result["translations"][0] + detected_language = result.get("detectedLanguage", {}) + + all_results.append( + TranslationResult( + translated_text=translation["text"], + source_language=source_language + or detected_language.get("language", "unknown"), + target_language=target_language, + confidence_score=detected_language.get("score", 0.9), + provider=self.provider_name, + raw_response=result, + ) + ) + + except Exception as e: + logger.error(f"Azure batch translation failed: {e}") + raise + + return all_results diff --git a/cognee/tasks/translation/providers/base.py b/cognee/tasks/translation/providers/base.py new file mode 100644 index 000000000..d8e5e981e --- /dev/null +++ b/cognee/tasks/translation/providers/base.py @@ -0,0 +1,69 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class TranslationResult: + """Result of a translation operation.""" + + translated_text: str + source_language: str + target_language: str + confidence_score: float + provider: str + raw_response: Optional[dict] = None + + +class TranslationProvider(ABC): + """Abstract base class for translation providers.""" + + @property + @abstractmethod + def provider_name(self) -> str: + """Return the name of this translation provider.""" + pass + + @abstractmethod + async def translate( + self, + text: str, + target_language: str = "en", + source_language: Optional[str] = None, + ) -> TranslationResult: + """ + Translate text to the target language. + + Args: + text: The text to translate + target_language: Target language code (default: "en") + source_language: Source language code (optional, will be auto-detected if not provided) + + Returns: + TranslationResult with translated text and metadata + """ + pass + + @abstractmethod + async def translate_batch( + self, + texts: list[str], + target_language: str = "en", + source_language: Optional[str] = None, + ) -> list[TranslationResult]: + """ + Translate multiple texts to the target language. 
+ + Args: + texts: List of texts to translate + target_language: Target language code (default: "en") + source_language: Source language code (optional) + + Returns: + List of TranslationResult objects + """ + pass + + def is_available(self) -> bool: + """Check if this provider is available (has required credentials).""" + return True diff --git a/cognee/tasks/translation/providers/google_provider.py b/cognee/tasks/translation/providers/google_provider.py new file mode 100644 index 000000000..0a7373b54 --- /dev/null +++ b/cognee/tasks/translation/providers/google_provider.py @@ -0,0 +1,159 @@ +import asyncio +from typing import Optional + +from cognee.shared.logging_utils import get_logger + +from .base import TranslationProvider, TranslationResult +from ..config import get_translation_config + +logger = get_logger(__name__) + + +class GoogleTranslationProvider(TranslationProvider): + """ + Translation provider using Google Cloud Translation API. + + Requires: + - google-cloud-translate package + - GOOGLE_TRANSLATE_API_KEY or GOOGLE_PROJECT_ID environment variable + """ + + def __init__(self): + self._client = None + self._config = get_translation_config() + + @property + def provider_name(self) -> str: + return "google" + + def _get_client(self): + """Lazy initialization of Google Translate client.""" + if self._client is None: + try: + from google.cloud import translate_v2 as translate + + self._client = translate.Client() + except ImportError: + raise ImportError( + "google-cloud-translate is required for Google translation. " + "Install it with: pip install google-cloud-translate" + ) + except Exception as e: + logger.error(f"Failed to initialize Google Translate client: {e}") + raise + return self._client + + def is_available(self) -> bool: + """Check if Google Translate is available.""" + try: + self._get_client() + return True + except Exception: + return False + + async def translate( + self, + text: str, + target_language: str = "en", + source_language: Optional[str] = None, + ) -> TranslationResult: + """ + Translate text using Google Translate API. + + Args: + text: The text to translate + target_language: Target language code (default: "en") + source_language: Source language code (optional) + + Returns: + TranslationResult with translated text and metadata + """ + try: + client = self._get_client() + + # Run in thread pool since google-cloud-translate is synchronous + loop = asyncio.get_event_loop() + + if source_language: + result = await loop.run_in_executor( + None, + lambda: client.translate( + text, target_language=target_language, source_language=source_language + ), + ) + else: + result = await loop.run_in_executor( + None, lambda: client.translate(text, target_language=target_language) + ) + + detected_language = result.get("detectedSourceLanguage", source_language or "unknown") + + return TranslationResult( + translated_text=result["translatedText"], + source_language=detected_language, + target_language=target_language, + confidence_score=0.9, # Google Translate is generally reliable + provider=self.provider_name, + raw_response=result, + ) + + except Exception as e: + logger.error(f"Google translation failed: {e}") + raise + + async def translate_batch( + self, + texts: list[str], + target_language: str = "en", + source_language: Optional[str] = None, + ) -> list[TranslationResult]: + """ + Translate multiple texts using Google Translate API. + + Google Translate supports batch translation natively. 
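The whole list is passed to a single client.translate() call, executed in a thread-pool executor so the event loop is not blocked.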
+ + Args: + texts: List of texts to translate + target_language: Target language code + source_language: Source language code (optional) + + Returns: + List of TranslationResult objects + """ + try: + client = self._get_client() + loop = asyncio.get_event_loop() + + if source_language: + results = await loop.run_in_executor( + None, + lambda: client.translate( + texts, target_language=target_language, source_language=source_language + ), + ) + else: + results = await loop.run_in_executor( + None, lambda: client.translate(texts, target_language=target_language) + ) + + translation_results = [] + for result in results: + detected_language = result.get( + "detectedSourceLanguage", source_language or "unknown" + ) + translation_results.append( + TranslationResult( + translated_text=result["translatedText"], + source_language=detected_language, + target_language=target_language, + confidence_score=0.9, + provider=self.provider_name, + raw_response=result, + ) + ) + + return translation_results + + except Exception as e: + logger.error(f"Google batch translation failed: {e}") + raise diff --git a/cognee/tasks/translation/providers/openai_provider.py b/cognee/tasks/translation/providers/openai_provider.py new file mode 100644 index 000000000..2c70c6edb --- /dev/null +++ b/cognee/tasks/translation/providers/openai_provider.py @@ -0,0 +1,107 @@ +import asyncio +from typing import Optional + +from pydantic import BaseModel + +from cognee.infrastructure.llm.LLMGateway import LLMGateway +from cognee.infrastructure.llm.prompts import read_query_prompt +from cognee.shared.logging_utils import get_logger + +from .base import TranslationProvider, TranslationResult + +logger = get_logger(__name__) + + +class TranslationOutput(BaseModel): + """Pydantic model for structured translation output from LLM.""" + + translated_text: str + detected_source_language: str + translation_notes: Optional[str] = None + + +class OpenAITranslationProvider(TranslationProvider): + """ + Translation provider using OpenAI's LLM for translation. + + This provider leverages the existing LLM infrastructure in Cognee + to perform translations using GPT models. + """ + + @property + def provider_name(self) -> str: + return "openai" + + async def translate( + self, + text: str, + target_language: str = "en", + source_language: Optional[str] = None, + ) -> TranslationResult: + """ + Translate text using OpenAI's LLM. + + Args: + text: The text to translate + target_language: Target language code (default: "en") + source_language: Source language code (optional) + + Returns: + TranslationResult with translated text and metadata + """ + try: + system_prompt = read_query_prompt("translate_content.txt") + + # Build the input with context + if source_language: + input_text = ( + f"Translate the following text from {source_language} to {target_language}.\n\n" + f"Text to translate:\n{text}" + ) + else: + input_text = ( + f"Translate the following text to {target_language}. 
" + f"First detect the source language.\n\n" + f"Text to translate:\n{text}" + ) + + result = await LLMGateway.acreate_structured_output( + text_input=input_text, + system_prompt=system_prompt, + response_model=TranslationOutput, + ) + + return TranslationResult( + translated_text=result.translated_text, + source_language=source_language or result.detected_source_language, + target_language=target_language, + confidence_score=0.95, # LLM translations are generally high quality + provider=self.provider_name, + raw_response={"notes": result.translation_notes}, + ) + + except Exception as e: + logger.error(f"OpenAI translation failed: {e}") + raise + + async def translate_batch( + self, + texts: list[str], + target_language: str = "en", + source_language: Optional[str] = None, + ) -> list[TranslationResult]: + """ + Translate multiple texts using OpenAI's LLM. + + Args: + texts: List of texts to translate + target_language: Target language code + source_language: Source language code (optional) + + Returns: + List of TranslationResult objects + """ + tasks = [ + self.translate(text, target_language, source_language) for text in texts + ] + return await asyncio.gather(*tasks) diff --git a/cognee/tasks/translation/translate_content.py b/cognee/tasks/translation/translate_content.py new file mode 100644 index 000000000..e200f659d --- /dev/null +++ b/cognee/tasks/translation/translate_content.py @@ -0,0 +1,265 @@ +import asyncio +from typing import List, Optional +from uuid import uuid5 + +from cognee.modules.chunking.models import DocumentChunk +from cognee.shared.logging_utils import get_logger + +from .config import get_translation_config, TranslationProviderType +from .detect_language import detect_language_async, LanguageDetectionResult +from .exceptions import TranslationError, LanguageDetectionError +from .models import TranslatedContent, LanguageMetadata +from .providers import get_translation_provider, TranslationResult + +logger = get_logger(__name__) + + +async def translate_content( + data_chunks: List[DocumentChunk], + target_language: str = "en", + translation_provider: TranslationProviderType = None, + confidence_threshold: float = None, + skip_if_target_language: bool = True, + preserve_original: bool = True, +) -> List[DocumentChunk]: + """ + Translate non-English content to the target language. + + This task detects the language of each document chunk and translates + non-target-language content using the specified translation provider. + Original text is preserved alongside translated versions. + + Args: + data_chunks: List of DocumentChunk objects to process + target_language: Target language code (default: "en" for English) + translation_provider: Translation service to use ("openai", "google", "azure") + If not provided, uses config default + confidence_threshold: Minimum confidence for language detection (0.0 to 1.0) + If not provided, uses config default + skip_if_target_language: If True, skip chunks already in target language + preserve_original: If True, store original text in TranslatedContent + + Returns: + List of DocumentChunk objects with translated content. + Chunks that required translation will have TranslatedContent + objects in their 'contains' list. 
+ + Example: + ```python + from cognee.tasks.translation import translate_content + + # Translate chunks using default settings + translated_chunks = await translate_content(chunks) + + # Translate with specific provider + translated_chunks = await translate_content( + chunks, + translation_provider="openai", + confidence_threshold=0.9 + ) + ``` + """ + if not isinstance(data_chunks, list): + raise TranslationError("data_chunks must be a list") + + if len(data_chunks) == 0: + return data_chunks + + # Get configuration + config = get_translation_config() + provider_name = translation_provider or config.translation_provider + threshold = confidence_threshold or config.confidence_threshold + + logger.info( + f"Starting translation task for {len(data_chunks)} chunks " + f"using {provider_name} provider, target language: {target_language}" + ) + + # Get the translation provider + provider = get_translation_provider(provider_name) + + # Process chunks + processed_chunks = [] + + for chunk in data_chunks: + if not hasattr(chunk, "text") or not chunk.text: + processed_chunks.append(chunk) + continue + + try: + # Detect language + detection = await detect_language_async( + chunk.text, target_language, threshold + ) + + # Create language metadata + language_metadata = LanguageMetadata( + id=uuid5(chunk.id, "LanguageMetadata"), + content_id=chunk.id, + detected_language=detection.language_code, + language_confidence=detection.confidence, + requires_translation=detection.requires_translation, + character_count=detection.character_count, + language_name=detection.language_name, + ) + + # Skip if already in target language + if not detection.requires_translation: + if skip_if_target_language: + logger.debug( + f"Skipping chunk {chunk.id}: already in target language " + f"({detection.language_code})" + ) + # Add language metadata to chunk + _add_to_chunk_contains(chunk, language_metadata) + processed_chunks.append(chunk) + continue + + # Translate the content + logger.debug( + f"Translating chunk {chunk.id} from {detection.language_code} " + f"to {target_language}" + ) + + translation_result = await provider.translate( + text=chunk.text, + target_language=target_language, + source_language=detection.language_code, + ) + + # Create TranslatedContent data point + translated_content = TranslatedContent( + id=uuid5(chunk.id, "TranslatedContent"), + original_chunk_id=chunk.id, + original_text=chunk.text if preserve_original else "", + translated_text=translation_result.translated_text, + source_language=translation_result.source_language, + target_language=translation_result.target_language, + translation_provider=translation_result.provider, + confidence_score=translation_result.confidence_score, + translated_from=chunk, + ) + + # Update chunk text with translated content + chunk.text = translation_result.translated_text + + # Add metadata to chunk's contains list + _add_to_chunk_contains(chunk, language_metadata) + _add_to_chunk_contains(chunk, translated_content) + + processed_chunks.append(chunk) + + logger.debug( + f"Successfully translated chunk {chunk.id}: " + f"{detection.language_code} -> {target_language}" + ) + + except LanguageDetectionError as e: + logger.warning(f"Language detection failed for chunk {chunk.id}: {e}") + processed_chunks.append(chunk) + except TranslationError as e: + logger.error(f"Translation failed for chunk {chunk.id}: {e}") + processed_chunks.append(chunk) + except Exception as e: + logger.error(f"Unexpected error processing chunk {chunk.id}: {e}") + 
processed_chunks.append(chunk) + + logger.info(f"Translation task completed for {len(processed_chunks)} chunks") + return processed_chunks + + +def _add_to_chunk_contains(chunk: DocumentChunk, item) -> None: + """Helper to add an item to a chunk's contains list.""" + if chunk.contains is None: + chunk.contains = [] + chunk.contains.append(item) + + +async def translate_text( + text: str, + target_language: str = "en", + translation_provider: TranslationProviderType = None, + source_language: Optional[str] = None, +) -> TranslationResult: + """ + Translate a single text string. + + This is a convenience function for translating individual texts + without creating DocumentChunk objects. + + Args: + text: The text to translate + target_language: Target language code (default: "en") + translation_provider: Translation service to use + source_language: Source language code (optional, auto-detected if not provided) + + Returns: + TranslationResult with translated text and metadata + + Example: + ```python + from cognee.tasks.translation import translate_text + + result = await translate_text( + "Bonjour le monde!", + target_language="en" + ) + print(result.translated_text) # "Hello world!" + print(result.source_language) # "fr" + ``` + """ + config = get_translation_config() + provider_name = translation_provider or config.translation_provider + + provider = get_translation_provider(provider_name) + + return await provider.translate( + text=text, + target_language=target_language, + source_language=source_language, + ) + + +async def batch_translate_texts( + texts: List[str], + target_language: str = "en", + translation_provider: TranslationProviderType = None, + source_language: Optional[str] = None, +) -> List[TranslationResult]: + """ + Translate multiple text strings in batch. + + This is more efficient than translating texts individually, + especially for providers that support native batch operations. + + Args: + texts: List of texts to translate + target_language: Target language code (default: "en") + translation_provider: Translation service to use + source_language: Source language code (optional) + + Returns: + List of TranslationResult objects + + Example: + ```python + from cognee.tasks.translation import batch_translate_texts + + results = await batch_translate_texts( + ["Hola", "¿Cómo estás?", "Adiós"], + target_language="en" + ) + for result in results: + print(f"{result.source_language}: {result.translated_text}") + ``` + """ + config = get_translation_config() + provider_name = translation_provider or config.translation_provider + + provider = get_translation_provider(provider_name) + + return await provider.translate_batch( + texts=texts, + target_language=target_language, + source_language=source_language, + )
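A minimal end-to-end sketch of the new helpers used outside the cognify pipeline, assuming the same LLM credentials cognee already uses for the default "openai" provider; the sample text and printed values are illustrative:

```python
import asyncio

from cognee.tasks.translation import detect_language, translate_text


async def main():
    text = "El zorro marrón salta sobre el perro perezoso."

    # Detect the source language first (langdetect under the hood).
    detection = detect_language(text, target_language="en")
    print(detection.language_code, detection.confidence)

    # Translate only when detection says the text is not already in the target language.
    if detection.requires_translation:
        result = await translate_text(
            text,
            target_language="en",
            translation_provider="openai",
            source_language=detection.language_code,
        )
        print(result.translated_text)


asyncio.run(main())
```

Inside the pipeline, translate_content applies the same detect-then-translate steps to each chunk and attaches the results to the chunk's contains list as LanguageMetadata and TranslatedContent data points.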