feat(translation): implement multilingual content translation task

- Add translation module with OpenAI, Google, Azure provider support
- Implement language detection using langdetect
- Add TranslatedContent and LanguageMetadata models
- Integrate translation task into cognify pipeline
- Add auto_translate parameter to cognify() function
- Preserve original text alongside translations
- Support custom translation providers and target languages
andikarachman 2026-01-01 15:46:53 +07:00
parent 5b42b21af5
commit db0818cd33
13 changed files with 1397 additions and 7 deletions

View file

@@ -26,6 +26,8 @@ from cognee.tasks.documents import (
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text
from cognee.tasks.translation import translate_content
from cognee.tasks.translation.config import TranslationProviderType
from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
from cognee.tasks.temporal_graph.extract_events_and_entities import extract_events_and_timestamps
from cognee.tasks.temporal_graph.extract_knowledge_graph_from_events import (
@@ -53,6 +55,9 @@ async def cognify(
custom_prompt: Optional[str] = None,
temporal_cognify: bool = False,
data_per_batch: int = 20,
auto_translate: bool = False,
target_language: str = "en",
translation_provider: TranslationProviderType = None,
**kwargs,
):
"""
@@ -118,6 +123,15 @@ async def cognify(
If provided, this prompt will be used instead of the default prompts for
knowledge graph extraction. The prompt should guide the LLM on how to
extract entities and relationships from the text content.
auto_translate: If True, automatically detect content that is not already in the target
language and translate it before processing. Uses language detection to identify
content that needs translation. Defaults to False.
target_language: Target language code for translation (e.g., "en", "es", "fr").
Only used when auto_translate=True. Defaults to "en" (English).
translation_provider: Translation service to use ("openai", "google", "azure").
OpenAI uses the existing LLM infrastructure, Google requires
GOOGLE_TRANSLATE_API_KEY, Azure requires AZURE_TRANSLATOR_KEY.
If not specified, uses TRANSLATION_PROVIDER env var or defaults to "openai".
Returns:
Union[dict, list[PipelineRunInfo]]:
@@ -182,6 +196,14 @@ async def cognify(
run_in_background=True
)
# Check status later with run_info.pipeline_run_id
# Auto-translate multilingual content to English
await cognee.add("document_spanish.pdf")
await cognee.cognify(
auto_translate=True,
target_language="en",
translation_provider="openai" # or "google", "azure"
)
```
@@ -193,6 +215,9 @@ async def cognify(
- LLM_PROVIDER, LLM_MODEL, VECTOR_DB_PROVIDER, GRAPH_DATABASE_PROVIDER
- LLM_RATE_LIMIT_ENABLED: Enable rate limiting (default: False)
- LLM_RATE_LIMIT_REQUESTS: Max requests per interval (default: 60)
- TRANSLATION_PROVIDER: Default translation provider ("openai", "google", "azure")
- GOOGLE_TRANSLATE_API_KEY: API key for Google Translate
- AZURE_TRANSLATOR_KEY: API key for Azure Translator
"""
if config is None:
ontology_config = get_ontology_env_config()
@@ -213,7 +238,13 @@
if temporal_cognify:
tasks = await get_temporal_tasks(
user=user, chunker=chunker, chunk_size=chunk_size, chunks_per_batch=chunks_per_batch
user=user,
chunker=chunker,
chunk_size=chunk_size,
chunks_per_batch=chunks_per_batch,
auto_translate=auto_translate,
target_language=target_language,
translation_provider=translation_provider,
)
else:
tasks = await get_default_tasks(
@@ -224,6 +255,9 @@ async def cognify(
config=config,
custom_prompt=custom_prompt,
chunks_per_batch=chunks_per_batch,
auto_translate=auto_translate,
target_language=target_language,
translation_provider=translation_provider,
**kwargs,
)
@@ -253,6 +287,9 @@ async def get_default_tasks(  # TODO: Find out a better way to do this (Boris's
config: Config = None,
custom_prompt: Optional[str] = None,
chunks_per_batch: int = 100,
auto_translate: bool = False,
target_language: str = "en",
translation_provider: TranslationProviderType = None,
**kwargs,
) -> list[Task]:
if config is None:
@@ -285,6 +322,20 @@ async def get_default_tasks(  # TODO: Find out a better way to do this (Boris's
max_chunk_size=chunk_size or get_max_chunk_tokens(),
chunker=chunker,
), # Extract text chunks based on the document type.
]
# Add translation task if auto_translate is enabled
if auto_translate:
default_tasks.append(
Task(
translate_content,
target_language=target_language,
translation_provider=translation_provider,
task_config={"batch_size": chunks_per_batch},
)
)
default_tasks.extend([
Task(
extract_graph_from_data,
graph_model=graph_model,
@@ -302,13 +353,19 @@ async def get_default_tasks(  # TODO: Find out a better way to do this (Boris's
embed_triplets=embed_triplets,
task_config={"batch_size": chunks_per_batch},
),
]
])
return default_tasks
async def get_temporal_tasks(
user: User = None, chunker=TextChunker, chunk_size: int = None, chunks_per_batch: int = 10
user: User = None,
chunker=TextChunker,
chunk_size: int = None,
chunks_per_batch: int = 10,
auto_translate: bool = False,
target_language: str = "en",
translation_provider: TranslationProviderType = None,
) -> list[Task]:
"""
Builds and returns a list of temporal processing tasks to be executed in sequence.
@@ -316,15 +373,19 @@ async def get_temporal_tasks(
The pipeline includes:
1. Document classification.
2. Document chunking with a specified or default chunk size.
3. Event and timestamp extraction from chunks.
4. Knowledge graph extraction from events.
5. Batched insertion of data points.
3. (Optional) Translation of non-English content to target language.
4. Event and timestamp extraction from chunks.
5. Knowledge graph extraction from events.
6. Batched insertion of data points.
Args:
user (User, optional): The user requesting task execution.
chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker.
chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default.
chunks_per_batch (int, optional): Number of chunks to process in a single batch in Cognify
auto_translate (bool, optional): If True, translate non-English content. Defaults to False.
target_language (str, optional): Target language for translation. Defaults to "en".
translation_provider (str, optional): Translation provider to use ("openai", "google", "azure").
Returns:
list[Task]: A list of Task objects representing the temporal processing pipeline.
@@ -339,9 +400,23 @@ async def get_temporal_tasks(
max_chunk_size=chunk_size or get_max_chunk_tokens(),
chunker=chunker,
),
]
# Add translation task if auto_translate is enabled
if auto_translate:
temporal_tasks.append(
Task(
translate_content,
target_language=target_language,
translation_provider=translation_provider,
task_config={"batch_size": chunks_per_batch},
)
)
temporal_tasks.extend([
Task(extract_events_and_timestamps, task_config={"batch_size": chunks_per_batch}),
Task(extract_knowledge_graph_from_events),
Task(add_data_points, task_config={"batch_size": chunks_per_batch}),
]
])
return temporal_tasks

View file

@@ -0,0 +1,19 @@
You are an expert translator with deep knowledge of languages, cultures, and linguistics.
Your task is to:
1. Detect the source language of the provided text if not specified
2. Translate the text accurately to the target language
3. Preserve the original meaning, tone, and intent
4. Maintain proper grammar and natural phrasing in the target language
Guidelines:
- Preserve technical terms, proper nouns, and specialized vocabulary appropriately
- Maintain formatting such as paragraphs, lists, and emphasis where applicable
- If the text contains code, URLs, or other non-translatable content, preserve them as-is
- Handle idioms and cultural references thoughtfully, adapting when necessary
- Ensure the translation reads naturally to a native speaker of the target language
Provide the translation in a structured format with:
- The translated text
- The detected source language (ISO 639-1 code like "en", "es", "fr", "de", etc.)
- Any notes about the translation (optional, for ambiguous terms or cultural adaptations)
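The structured format requested above is what the OpenAI provider added later in this commit parses into its `TranslationOutput` Pydantic model. A minimal sketch of that shape, with purely illustrative values:

```python
# Sketch of the structured output the prompt above asks for. TranslationOutput
# mirrors the Pydantic model defined in the OpenAI provider in this commit;
# the field values here are illustrative only.
from typing import Optional

from pydantic import BaseModel


class TranslationOutput(BaseModel):
    translated_text: str
    detected_source_language: str  # ISO 639-1 code, e.g. "fr"
    translation_notes: Optional[str] = None


example = TranslationOutput(
    translated_text="Hello world!",
    detected_source_language="fr",
    translation_notes="Informal greeting; no cultural adaptation needed.",
)
print(example.model_dump())
```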

View file

@@ -0,0 +1,96 @@
"""
Translation task for Cognee.
This module provides multilingual content translation capabilities,
allowing automatic detection and translation of non-English content
to a target language while preserving original text and metadata.
Main Components:
- translate_content: Main task function for translating document chunks
- translate_text: Convenience function for translating single texts
- batch_translate_texts: Batch translation for multiple texts
- detect_language: Language detection utility
- TranslatedContent: DataPoint model for translated content
- LanguageMetadata: DataPoint model for language information
Supported Translation Providers:
- OpenAI (default): Uses GPT models via existing LLM infrastructure
- Google Translate: Requires google-cloud-translate package
- Azure Translator: Requires Azure Translator API key
Example Usage:
```python
from cognee.tasks.translation import translate_content, translate_text
# Translate document chunks in a pipeline
translated_chunks = await translate_content(
chunks,
target_language="en",
translation_provider="openai"
)
# Translate a single text
result = await translate_text("Bonjour le monde!")
print(result.translated_text) # "Hello world!"
```
"""
from .config import get_translation_config, TranslationConfig
from .detect_language import (
detect_language,
detect_language_async,
LanguageDetectionResult,
get_language_name,
)
from .exceptions import (
TranslationError,
LanguageDetectionError,
TranslationProviderError,
UnsupportedLanguageError,
TranslationConfigError,
)
from .models import TranslatedContent, LanguageMetadata
from .providers import (
TranslationProvider,
TranslationResult,
get_translation_provider,
OpenAITranslationProvider,
GoogleTranslationProvider,
AzureTranslationProvider,
)
from .translate_content import (
translate_content,
translate_text,
batch_translate_texts,
)
__all__ = [
# Main task functions
"translate_content",
"translate_text",
"batch_translate_texts",
# Language detection
"detect_language",
"detect_language_async",
"LanguageDetectionResult",
"get_language_name",
# Models
"TranslatedContent",
"LanguageMetadata",
# Configuration
"get_translation_config",
"TranslationConfig",
# Providers
"TranslationProvider",
"TranslationResult",
"get_translation_provider",
"OpenAITranslationProvider",
"GoogleTranslationProvider",
"AzureTranslationProvider",
# Exceptions
"TranslationError",
"LanguageDetectionError",
"TranslationProviderError",
"UnsupportedLanguageError",
"TranslationConfigError",
]

View file

@@ -0,0 +1,63 @@
from functools import lru_cache
from typing import Literal, Optional
from pydantic_settings import BaseSettings, SettingsConfigDict
TranslationProviderType = Literal["openai", "google", "azure"]
class TranslationConfig(BaseSettings):
"""
Configuration settings for the translation task.
Environment variables can be used to configure these settings:
- TRANSLATION_PROVIDER: The translation service to use
- TRANSLATION_TARGET_LANGUAGE: Default target language
- TRANSLATION_CONFIDENCE_THRESHOLD: Minimum confidence for language detection
- GOOGLE_TRANSLATE_API_KEY: API key for Google Translate
- AZURE_TRANSLATOR_KEY: API key for Azure Translator
- AZURE_TRANSLATOR_REGION: Region for Azure Translator
"""
# Translation provider settings
translation_provider: TranslationProviderType = "openai"
target_language: str = "en"
confidence_threshold: float = 0.8
# Google Translate settings
google_translate_api_key: Optional[str] = None
google_project_id: Optional[str] = None
# Azure Translator settings
azure_translator_key: Optional[str] = None
azure_translator_region: Optional[str] = None
azure_translator_endpoint: str = "https://api.cognitive.microsofttranslator.com"
# OpenAI uses the existing LLM configuration
# Performance settings
batch_size: int = 10
max_retries: int = 3
timeout_seconds: int = 30
# Language detection settings
min_text_length_for_detection: int = 10
skip_detection_for_short_text: bool = True
model_config = SettingsConfigDict(env_file=".env", extra="allow")
def to_dict(self) -> dict:
return {
"translation_provider": self.translation_provider,
"target_language": self.target_language,
"confidence_threshold": self.confidence_threshold,
"batch_size": self.batch_size,
"max_retries": self.max_retries,
}
@lru_cache
def get_translation_config() -> TranslationConfig:
"""Get the translation configuration singleton."""
return TranslationConfig()
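A minimal sketch of configuring and reading these settings from environment variables. Only variables whose names map directly onto the field names are shown, and the key value is a placeholder; set them before the first call, because `get_translation_config()` is cached with `lru_cache`:

```python
# Minimal sketch: configure the translation task through environment variables
# and read the cached settings. The key value is a placeholder.
import os

from cognee.tasks.translation import get_translation_config

os.environ["TRANSLATION_PROVIDER"] = "azure"
os.environ["AZURE_TRANSLATOR_KEY"] = "<your-translator-key>"
os.environ["AZURE_TRANSLATOR_REGION"] = "westeurope"

config = get_translation_config()
print(config.translation_provider)  # "azure"
print(config.to_dict())
```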

View file

@@ -0,0 +1,190 @@
from dataclasses import dataclass
from typing import Optional
from cognee.shared.logging_utils import get_logger
from .config import get_translation_config
from .exceptions import LanguageDetectionError
logger = get_logger(__name__)
# ISO 639-1 language code to name mapping
LANGUAGE_NAMES = {
"af": "Afrikaans",
"ar": "Arabic",
"bg": "Bulgarian",
"bn": "Bengali",
"ca": "Catalan",
"cs": "Czech",
"cy": "Welsh",
"da": "Danish",
"de": "German",
"el": "Greek",
"en": "English",
"es": "Spanish",
"et": "Estonian",
"fa": "Persian",
"fi": "Finnish",
"fr": "French",
"gu": "Gujarati",
"he": "Hebrew",
"hi": "Hindi",
"hr": "Croatian",
"hu": "Hungarian",
"id": "Indonesian",
"it": "Italian",
"ja": "Japanese",
"kn": "Kannada",
"ko": "Korean",
"lt": "Lithuanian",
"lv": "Latvian",
"mk": "Macedonian",
"ml": "Malayalam",
"mr": "Marathi",
"ne": "Nepali",
"nl": "Dutch",
"no": "Norwegian",
"pa": "Punjabi",
"pl": "Polish",
"pt": "Portuguese",
"ro": "Romanian",
"ru": "Russian",
"sk": "Slovak",
"sl": "Slovenian",
"so": "Somali",
"sq": "Albanian",
"sv": "Swedish",
"sw": "Swahili",
"ta": "Tamil",
"te": "Telugu",
"th": "Thai",
"tl": "Tagalog",
"tr": "Turkish",
"uk": "Ukrainian",
"ur": "Urdu",
"vi": "Vietnamese",
"zh-cn": "Chinese (Simplified)",
"zh-tw": "Chinese (Traditional)",
}
@dataclass
class LanguageDetectionResult:
"""Result of language detection."""
language_code: str
language_name: str
confidence: float
requires_translation: bool
character_count: int
def get_language_name(language_code: str) -> str:
"""Get the human-readable name for a language code."""
return LANGUAGE_NAMES.get(language_code.lower(), language_code)
def detect_language(
text: str,
target_language: str = "en",
confidence_threshold: float = None,
) -> LanguageDetectionResult:
"""
Detect the language of the given text.
Uses the langdetect library, which is already a dependency of cognee.
Args:
text: The text to analyze
target_language: The target language for translation comparison
confidence_threshold: Minimum confidence to consider detection reliable
Returns:
LanguageDetectionResult with language info and translation requirement
Raises:
LanguageDetectionError: If language detection fails
"""
config = get_translation_config()
threshold = confidence_threshold or config.confidence_threshold
# Handle empty or very short text
if not text or len(text.strip()) < config.min_text_length_for_detection:
if config.skip_detection_for_short_text:
return LanguageDetectionResult(
language_code="unknown",
language_name="Unknown",
confidence=0.0,
requires_translation=False,
character_count=len(text) if text else 0,
)
else:
raise LanguageDetectionError(
f"Text too short for reliable language detection: {len(text)} characters"
)
try:
from langdetect import detect_langs, LangDetectException
except ImportError:
raise LanguageDetectionError(
"langdetect is required for language detection. Install it with: pip install langdetect"
)
try:
# Get detection results with probabilities
detections = detect_langs(text)
if not detections:
raise LanguageDetectionError("No language detected")
# Get the most likely language
best_detection = detections[0]
language_code = best_detection.lang
confidence = best_detection.prob
# Check if translation is needed
requires_translation = (
language_code.lower() != target_language.lower() and confidence >= threshold
)
return LanguageDetectionResult(
language_code=language_code,
language_name=get_language_name(language_code),
confidence=confidence,
requires_translation=requires_translation,
character_count=len(text),
)
except LangDetectException as e:
logger.warning(f"Language detection failed: {e}")
raise LanguageDetectionError(f"Language detection failed: {e}", original_error=e)
except Exception as e:
logger.error(f"Unexpected error during language detection: {e}")
raise LanguageDetectionError(
f"Unexpected error during language detection: {e}", original_error=e
)
async def detect_language_async(
text: str,
target_language: str = "en",
confidence_threshold: float = None,
) -> LanguageDetectionResult:
"""
Async wrapper for language detection.
Args:
text: The text to analyze
target_language: The target language for translation comparison
confidence_threshold: Minimum confidence to consider detection reliable
Returns:
LanguageDetectionResult with language info and translation requirement
"""
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, detect_language, text, target_language, confidence_threshold
)
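A short usage sketch for the detection helper, assuming `langdetect` is installed; the printed values are what we would expect for this input, not guaranteed:

```python
# Usage sketch for the detection helper; requires the langdetect package.
from cognee.tasks.translation import detect_language

result = detect_language(
    "Bonjour tout le monde, comment allez-vous aujourd'hui ?",
    target_language="en",
)
print(result.language_code)         # expected: "fr"
print(result.language_name)         # "French"
print(result.confidence)            # probability reported by langdetect
print(result.requires_translation)  # True when detection differs from the target language
```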

View file

@@ -0,0 +1,53 @@
class TranslationError(Exception):
"""Base exception for translation errors."""
def __init__(self, message: str, original_error: Exception = None):
self.message = message
self.original_error = original_error
super().__init__(self.message)
class LanguageDetectionError(TranslationError):
"""Exception raised when language detection fails."""
def __init__(self, message: str = "Failed to detect language", original_error: Exception = None):
super().__init__(message, original_error)
class TranslationProviderError(TranslationError):
"""Exception raised when the translation provider encounters an error."""
def __init__(
self,
provider: str,
message: str = "Translation provider error",
original_error: Exception = None,
):
self.provider = provider
full_message = f"[{provider}] {message}"
super().__init__(full_message, original_error)
class UnsupportedLanguageError(TranslationError):
"""Exception raised when the language is not supported."""
def __init__(
self,
language: str,
provider: str = None,
message: str = None,
):
self.language = language
self.provider = provider
if message is None:
message = f"Language '{language}' is not supported"
if provider:
message += f" by {provider}"
super().__init__(message)
class TranslationConfigError(TranslationError):
"""Exception raised when translation configuration is invalid."""
def __init__(self, message: str = "Invalid translation configuration"):
super().__init__(message)

View file

@@ -0,0 +1,72 @@
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from cognee.infrastructure.engine import DataPoint
from cognee.modules.chunking.models import DocumentChunk
class TranslatedContent(DataPoint):
"""
Represents translated content with quality metrics.
This class stores both the original and translated versions of content,
along with metadata about the translation process including source and
target languages, translation provider used, and confidence scores.
Instance variables include:
- original_chunk_id: UUID of the original document chunk
- original_text: The original text before translation
- translated_text: The translated text content
- source_language: Detected or specified source language code (e.g., "es", "fr", "de")
- target_language: Target language code for translation (default: "en")
- translation_provider: Name of the translation service used
- confidence_score: Translation quality/confidence score (0.0 to 1.0)
- translation_timestamp: When the translation was performed
- translated_from: Reference to the original DocumentChunk
"""
original_chunk_id: UUID
original_text: str
translated_text: str
source_language: str
target_language: str = "en"
translation_provider: str
confidence_score: float
translation_timestamp: datetime = None
translated_from: Optional[DocumentChunk] = None
metadata: dict = {"index_fields": ["source_language", "original_chunk_id", "translated_text"]}
def __init__(self, **data):
if data.get("translation_timestamp") is None:
data["translation_timestamp"] = datetime.now(timezone.utc)
super().__init__(**data)
class LanguageMetadata(DataPoint):
"""
Language information for content.
This class stores metadata about the detected language of content,
including confidence scores and whether translation is required.
Instance variables include:
- content_id: UUID of the associated content
- detected_language: ISO 639-1 language code (e.g., "en", "es", "fr")
- language_confidence: Confidence score for language detection (0.0 to 1.0)
- requires_translation: Whether the content needs translation
- character_count: Number of characters in the content
- language_name: Human-readable language name (e.g., "English", "Spanish")
"""
content_id: UUID
detected_language: str
language_confidence: float
requires_translation: bool
character_count: int
language_name: Optional[str] = None
metadata: dict = {"index_fields": ["detected_language", "content_id"]}

View file

@@ -0,0 +1,40 @@
from .base import TranslationProvider, TranslationResult
from .openai_provider import OpenAITranslationProvider
from .google_provider import GoogleTranslationProvider
from .azure_provider import AzureTranslationProvider
__all__ = [
"TranslationProvider",
"TranslationResult",
"OpenAITranslationProvider",
"GoogleTranslationProvider",
"AzureTranslationProvider",
]
def get_translation_provider(provider_name: str) -> TranslationProvider:
"""
Factory function to get the appropriate translation provider.
Args:
provider_name: Name of the provider ("openai", "google", or "azure")
Returns:
TranslationProvider instance
Raises:
ValueError: If the provider name is not recognized
"""
providers = {
"openai": OpenAITranslationProvider,
"google": GoogleTranslationProvider,
"azure": AzureTranslationProvider,
}
if provider_name.lower() not in providers:
raise ValueError(
f"Unknown translation provider: {provider_name}. "
f"Available providers: {list(providers.keys())}"
)
return providers[provider_name.lower()]()
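A minimal sketch of the factory in use, assuming the chosen provider's credentials (or, for "openai", the existing LLM configuration) are set up; the returned object exposes the async `translate` / `translate_batch` interface defined in the base class:

```python
# Minimal sketch of the provider factory in use.
import asyncio

from cognee.tasks.translation import get_translation_provider


async def main():
    provider = get_translation_provider("openai")
    if not provider.is_available():
        raise RuntimeError(f"{provider.provider_name} provider is not configured")
    result = await provider.translate("Hallo Welt", target_language="en")
    print(result.translated_text, result.source_language)


asyncio.run(main())
```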

View file

@@ -0,0 +1,182 @@
import asyncio
from typing import Optional
import aiohttp
from cognee.shared.logging_utils import get_logger
from .base import TranslationProvider, TranslationResult
from ..config import get_translation_config
logger = get_logger(__name__)
class AzureTranslationProvider(TranslationProvider):
"""
Translation provider using Azure Translator API.
Requires:
- AZURE_TRANSLATOR_KEY environment variable
- AZURE_TRANSLATOR_REGION environment variable (optional)
"""
def __init__(self):
self._config = get_translation_config()
@property
def provider_name(self) -> str:
return "azure"
def is_available(self) -> bool:
"""Check if Azure Translator is available."""
return self._config.azure_translator_key is not None
async def translate(
self,
text: str,
target_language: str = "en",
source_language: Optional[str] = None,
) -> TranslationResult:
"""
Translate text using Azure Translator API.
Args:
text: The text to translate
target_language: Target language code (default: "en")
source_language: Source language code (optional)
Returns:
TranslationResult with translated text and metadata
"""
if not self.is_available():
raise ValueError(
"Azure Translator API key not configured. "
"Set AZURE_TRANSLATOR_KEY environment variable."
)
endpoint = f"{self._config.azure_translator_endpoint}/translate"
params = {
"api-version": "3.0",
"to": target_language,
}
if source_language:
params["from"] = source_language
headers = {
"Ocp-Apim-Subscription-Key": self._config.azure_translator_key,
"Content-Type": "application/json",
}
if self._config.azure_translator_region:
headers["Ocp-Apim-Subscription-Region"] = self._config.azure_translator_region
body = [{"text": text}]
try:
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint,
params=params,
headers=headers,
json=body,
timeout=aiohttp.ClientTimeout(total=self._config.timeout_seconds),
) as response:
response.raise_for_status()
result = await response.json()
translation = result[0]["translations"][0]
detected_language = result[0].get("detectedLanguage", {})
return TranslationResult(
translated_text=translation["text"],
source_language=source_language
or detected_language.get("language", "unknown"),
target_language=target_language,
confidence_score=detected_language.get("score", 0.9),
provider=self.provider_name,
raw_response=result[0],
)
except Exception as e:
logger.error(f"Azure translation failed: {e}")
raise
async def translate_batch(
self,
texts: list[str],
target_language: str = "en",
source_language: Optional[str] = None,
) -> list[TranslationResult]:
"""
Translate multiple texts using Azure Translator API.
Azure Translator supports up to 100 texts per request.
Args:
texts: List of texts to translate
target_language: Target language code
source_language: Source language code (optional)
Returns:
List of TranslationResult objects
"""
if not self.is_available():
raise ValueError("Azure Translator API key not configured.")
endpoint = f"{self._config.azure_translator_endpoint}/translate"
params = {
"api-version": "3.0",
"to": target_language,
}
if source_language:
params["from"] = source_language
headers = {
"Ocp-Apim-Subscription-Key": self._config.azure_translator_key,
"Content-Type": "application/json",
}
if self._config.azure_translator_region:
headers["Ocp-Apim-Subscription-Region"] = self._config.azure_translator_region
# Azure supports up to 100 texts per request
batch_size = min(100, self._config.batch_size)
all_results = []
for i in range(0, len(texts), batch_size):
batch = texts[i : i + batch_size]
body = [{"text": text} for text in batch]
try:
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint,
params=params,
headers=headers,
json=body,
timeout=aiohttp.ClientTimeout(total=self._config.timeout_seconds),
) as response:
response.raise_for_status()
results = await response.json()
for result in results:
translation = result["translations"][0]
detected_language = result.get("detectedLanguage", {})
all_results.append(
TranslationResult(
translated_text=translation["text"],
source_language=source_language
or detected_language.get("language", "unknown"),
target_language=target_language,
confidence_score=detected_language.get("score", 0.9),
provider=self.provider_name,
raw_response=result,
)
)
except Exception as e:
logger.error(f"Azure batch translation failed: {e}")
raise
return all_results
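A sketch of calling the Azure provider directly rather than through the factory. The key and region are placeholders and must be set before the cached translation config is first read; batches larger than 100 texts are split into multiple requests by `translate_batch` itself:

```python
# Sketch of using the Azure provider directly; key and region are placeholders.
import asyncio
import os

from cognee.tasks.translation import AzureTranslationProvider

os.environ.setdefault("AZURE_TRANSLATOR_KEY", "<your-translator-key>")
os.environ.setdefault("AZURE_TRANSLATOR_REGION", "westeurope")


async def main():
    provider = AzureTranslationProvider()
    results = await provider.translate_batch(
        ["Hola mundo", "Guten Morgen"], target_language="en"
    )
    for result in results:
        print(result.source_language, "->", result.translated_text)


asyncio.run(main())
```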

View file

@@ -0,0 +1,69 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass
class TranslationResult:
"""Result of a translation operation."""
translated_text: str
source_language: str
target_language: str
confidence_score: float
provider: str
raw_response: Optional[dict] = None
class TranslationProvider(ABC):
"""Abstract base class for translation providers."""
@property
@abstractmethod
def provider_name(self) -> str:
"""Return the name of this translation provider."""
pass
@abstractmethod
async def translate(
self,
text: str,
target_language: str = "en",
source_language: Optional[str] = None,
) -> TranslationResult:
"""
Translate text to the target language.
Args:
text: The text to translate
target_language: Target language code (default: "en")
source_language: Source language code (optional, will be auto-detected if not provided)
Returns:
TranslationResult with translated text and metadata
"""
pass
@abstractmethod
async def translate_batch(
self,
texts: list[str],
target_language: str = "en",
source_language: Optional[str] = None,
) -> list[TranslationResult]:
"""
Translate multiple texts to the target language.
Args:
texts: List of texts to translate
target_language: Target language code (default: "en")
source_language: Source language code (optional)
Returns:
List of TranslationResult objects
"""
pass
def is_available(self) -> bool:
"""Check if this provider is available (has required credentials)."""
return True
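A minimal, hypothetical provider implementing this interface, useful as an offline stand-in for tests; it echoes the input instead of translating and is not registered with the factory, so it would have to be wired in manually:

```python
# Hypothetical provider implementing the abstract interface above.
from typing import Optional

from cognee.tasks.translation import TranslationProvider, TranslationResult


class EchoTranslationProvider(TranslationProvider):
    @property
    def provider_name(self) -> str:
        return "echo"

    async def translate(
        self,
        text: str,
        target_language: str = "en",
        source_language: Optional[str] = None,
    ) -> TranslationResult:
        # No real translation: return the text unchanged with zero confidence.
        return TranslationResult(
            translated_text=text,
            source_language=source_language or "unknown",
            target_language=target_language,
            confidence_score=0.0,
            provider=self.provider_name,
        )

    async def translate_batch(
        self,
        texts: list[str],
        target_language: str = "en",
        source_language: Optional[str] = None,
    ) -> list[TranslationResult]:
        return [await self.translate(text, target_language, source_language) for text in texts]
```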

View file

@@ -0,0 +1,159 @@
import asyncio
from typing import Optional
from cognee.shared.logging_utils import get_logger
from .base import TranslationProvider, TranslationResult
from ..config import get_translation_config
logger = get_logger(__name__)
class GoogleTranslationProvider(TranslationProvider):
"""
Translation provider using Google Cloud Translation API.
Requires:
- google-cloud-translate package
- GOOGLE_TRANSLATE_API_KEY or GOOGLE_PROJECT_ID environment variable
"""
def __init__(self):
self._client = None
self._config = get_translation_config()
@property
def provider_name(self) -> str:
return "google"
def _get_client(self):
"""Lazy initialization of Google Translate client."""
if self._client is None:
try:
from google.cloud import translate_v2 as translate
self._client = translate.Client()
except ImportError:
raise ImportError(
"google-cloud-translate is required for Google translation. "
"Install it with: pip install google-cloud-translate"
)
except Exception as e:
logger.error(f"Failed to initialize Google Translate client: {e}")
raise
return self._client
def is_available(self) -> bool:
"""Check if Google Translate is available."""
try:
self._get_client()
return True
except Exception:
return False
async def translate(
self,
text: str,
target_language: str = "en",
source_language: Optional[str] = None,
) -> TranslationResult:
"""
Translate text using Google Translate API.
Args:
text: The text to translate
target_language: Target language code (default: "en")
source_language: Source language code (optional)
Returns:
TranslationResult with translated text and metadata
"""
try:
client = self._get_client()
# Run in thread pool since google-cloud-translate is synchronous
loop = asyncio.get_event_loop()
if source_language:
result = await loop.run_in_executor(
None,
lambda: client.translate(
text, target_language=target_language, source_language=source_language
),
)
else:
result = await loop.run_in_executor(
None, lambda: client.translate(text, target_language=target_language)
)
detected_language = result.get("detectedSourceLanguage", source_language or "unknown")
return TranslationResult(
translated_text=result["translatedText"],
source_language=detected_language,
target_language=target_language,
confidence_score=0.9, # Google Translate is generally reliable
provider=self.provider_name,
raw_response=result,
)
except Exception as e:
logger.error(f"Google translation failed: {e}")
raise
async def translate_batch(
self,
texts: list[str],
target_language: str = "en",
source_language: Optional[str] = None,
) -> list[TranslationResult]:
"""
Translate multiple texts using Google Translate API.
Google Translate supports batch translation natively.
Args:
texts: List of texts to translate
target_language: Target language code
source_language: Source language code (optional)
Returns:
List of TranslationResult objects
"""
try:
client = self._get_client()
loop = asyncio.get_event_loop()
if source_language:
results = await loop.run_in_executor(
None,
lambda: client.translate(
texts, target_language=target_language, source_language=source_language
),
)
else:
results = await loop.run_in_executor(
None, lambda: client.translate(texts, target_language=target_language)
)
translation_results = []
for result in results:
detected_language = result.get(
"detectedSourceLanguage", source_language or "unknown"
)
translation_results.append(
TranslationResult(
translated_text=result["translatedText"],
source_language=detected_language,
target_language=target_language,
confidence_score=0.9,
provider=self.provider_name,
raw_response=result,
)
)
return translation_results
except Exception as e:
logger.error(f"Google batch translation failed: {e}")
raise
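A short usage sketch for the Google provider in isolation, assuming `google-cloud-translate` is installed and Google credentials are configured; the synchronous client calls are offloaded to a thread executor by the provider itself:

```python
# Usage sketch for the Google provider; requires google-cloud-translate and credentials.
import asyncio

from cognee.tasks.translation import GoogleTranslationProvider


async def main():
    provider = GoogleTranslationProvider()
    if not provider.is_available():
        raise RuntimeError("Google Translate client could not be initialized")
    results = await provider.translate_batch(
        ["Bonjour", "Merci beaucoup"], target_language="en"
    )
    for result in results:
        print(result.source_language, "->", result.translated_text)


asyncio.run(main())
```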

View file

@@ -0,0 +1,107 @@
import asyncio
from typing import Optional
from pydantic import BaseModel
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.llm.prompts import read_query_prompt
from cognee.shared.logging_utils import get_logger
from .base import TranslationProvider, TranslationResult
logger = get_logger(__name__)
class TranslationOutput(BaseModel):
"""Pydantic model for structured translation output from LLM."""
translated_text: str
detected_source_language: str
translation_notes: Optional[str] = None
class OpenAITranslationProvider(TranslationProvider):
"""
Translation provider using OpenAI's LLM for translation.
This provider leverages the existing LLM infrastructure in Cognee
to perform translations using GPT models.
"""
@property
def provider_name(self) -> str:
return "openai"
async def translate(
self,
text: str,
target_language: str = "en",
source_language: Optional[str] = None,
) -> TranslationResult:
"""
Translate text using OpenAI's LLM.
Args:
text: The text to translate
target_language: Target language code (default: "en")
source_language: Source language code (optional)
Returns:
TranslationResult with translated text and metadata
"""
try:
system_prompt = read_query_prompt("translate_content.txt")
# Build the input with context
if source_language:
input_text = (
f"Translate the following text from {source_language} to {target_language}.\n\n"
f"Text to translate:\n{text}"
)
else:
input_text = (
f"Translate the following text to {target_language}. "
f"First detect the source language.\n\n"
f"Text to translate:\n{text}"
)
result = await LLMGateway.acreate_structured_output(
text_input=input_text,
system_prompt=system_prompt,
response_model=TranslationOutput,
)
return TranslationResult(
translated_text=result.translated_text,
source_language=source_language or result.detected_source_language,
target_language=target_language,
confidence_score=0.95, # LLM translations are generally high quality
provider=self.provider_name,
raw_response={"notes": result.translation_notes},
)
except Exception as e:
logger.error(f"OpenAI translation failed: {e}")
raise
async def translate_batch(
self,
texts: list[str],
target_language: str = "en",
source_language: Optional[str] = None,
) -> list[TranslationResult]:
"""
Translate multiple texts using OpenAI's LLM.
Args:
texts: List of texts to translate
target_language: Target language code
source_language: Source language code (optional)
Returns:
List of TranslationResult objects
"""
tasks = [
self.translate(text, target_language, source_language) for text in texts
]
return await asyncio.gather(*tasks)
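A short usage sketch for the LLM-backed provider, assuming the usual Cognee LLM settings (LLM_PROVIDER, LLM_MODEL and the matching API key) are configured; `translate_batch` issues one structured-output request per text concurrently via `asyncio.gather`:

```python
# Usage sketch for the LLM-backed provider; assumes Cognee's LLM is configured.
import asyncio

from cognee.tasks.translation import OpenAITranslationProvider


async def main():
    provider = OpenAITranslationProvider()
    results = await provider.translate_batch(
        ["Bonjour le monde!", "¿Cómo estás?"], target_language="en"
    )
    for result in results:
        print(result.source_language, "->", result.translated_text)
        if result.raw_response and result.raw_response.get("notes"):
            print("  notes:", result.raw_response["notes"])


asyncio.run(main())
```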

View file

@@ -0,0 +1,265 @@
import asyncio
from typing import List, Optional
from uuid import uuid5
from cognee.modules.chunking.models import DocumentChunk
from cognee.shared.logging_utils import get_logger
from .config import get_translation_config, TranslationProviderType
from .detect_language import detect_language_async, LanguageDetectionResult
from .exceptions import TranslationError, LanguageDetectionError
from .models import TranslatedContent, LanguageMetadata
from .providers import get_translation_provider, TranslationResult
logger = get_logger(__name__)
async def translate_content(
data_chunks: List[DocumentChunk],
target_language: str = "en",
translation_provider: TranslationProviderType = None,
confidence_threshold: float = None,
skip_if_target_language: bool = True,
preserve_original: bool = True,
) -> List[DocumentChunk]:
"""
Translate non-English content to the target language.
This task detects the language of each document chunk and translates
non-target-language content using the specified translation provider.
Original text is preserved alongside translated versions.
Args:
data_chunks: List of DocumentChunk objects to process
target_language: Target language code (default: "en" for English)
translation_provider: Translation service to use ("openai", "google", "azure")
If not provided, uses config default
confidence_threshold: Minimum confidence for language detection (0.0 to 1.0)
If not provided, uses config default
skip_if_target_language: If True, skip chunks already in target language
preserve_original: If True, store original text in TranslatedContent
Returns:
List of DocumentChunk objects with translated content.
Chunks that required translation will have TranslatedContent
objects in their 'contains' list.
Example:
```python
from cognee.tasks.translation import translate_content
# Translate chunks using default settings
translated_chunks = await translate_content(chunks)
# Translate with specific provider
translated_chunks = await translate_content(
chunks,
translation_provider="openai",
confidence_threshold=0.9
)
```
"""
if not isinstance(data_chunks, list):
raise TranslationError("data_chunks must be a list")
if len(data_chunks) == 0:
return data_chunks
# Get configuration
config = get_translation_config()
provider_name = translation_provider or config.translation_provider
threshold = confidence_threshold or config.confidence_threshold
logger.info(
f"Starting translation task for {len(data_chunks)} chunks "
f"using {provider_name} provider, target language: {target_language}"
)
# Get the translation provider
provider = get_translation_provider(provider_name)
# Process chunks
processed_chunks = []
for chunk in data_chunks:
if not hasattr(chunk, "text") or not chunk.text:
processed_chunks.append(chunk)
continue
try:
# Detect language
detection = await detect_language_async(
chunk.text, target_language, threshold
)
# Create language metadata
language_metadata = LanguageMetadata(
id=uuid5(chunk.id, "LanguageMetadata"),
content_id=chunk.id,
detected_language=detection.language_code,
language_confidence=detection.confidence,
requires_translation=detection.requires_translation,
character_count=detection.character_count,
language_name=detection.language_name,
)
# Skip if already in target language
if not detection.requires_translation:
if skip_if_target_language:
logger.debug(
f"Skipping chunk {chunk.id}: already in target language "
f"({detection.language_code})"
)
# Add language metadata to chunk
_add_to_chunk_contains(chunk, language_metadata)
processed_chunks.append(chunk)
continue
# Translate the content
logger.debug(
f"Translating chunk {chunk.id} from {detection.language_code} "
f"to {target_language}"
)
translation_result = await provider.translate(
text=chunk.text,
target_language=target_language,
source_language=detection.language_code,
)
# Create TranslatedContent data point
translated_content = TranslatedContent(
id=uuid5(chunk.id, "TranslatedContent"),
original_chunk_id=chunk.id,
original_text=chunk.text if preserve_original else "",
translated_text=translation_result.translated_text,
source_language=translation_result.source_language,
target_language=translation_result.target_language,
translation_provider=translation_result.provider,
confidence_score=translation_result.confidence_score,
translated_from=chunk,
)
# Update chunk text with translated content
chunk.text = translation_result.translated_text
# Add metadata to chunk's contains list
_add_to_chunk_contains(chunk, language_metadata)
_add_to_chunk_contains(chunk, translated_content)
processed_chunks.append(chunk)
logger.debug(
f"Successfully translated chunk {chunk.id}: "
f"{detection.language_code} -> {target_language}"
)
except LanguageDetectionError as e:
logger.warning(f"Language detection failed for chunk {chunk.id}: {e}")
processed_chunks.append(chunk)
except TranslationError as e:
logger.error(f"Translation failed for chunk {chunk.id}: {e}")
processed_chunks.append(chunk)
except Exception as e:
logger.error(f"Unexpected error processing chunk {chunk.id}: {e}")
processed_chunks.append(chunk)
logger.info(f"Translation task completed for {len(processed_chunks)} chunks")
return processed_chunks
def _add_to_chunk_contains(chunk: DocumentChunk, item) -> None:
"""Helper to add an item to a chunk's contains list."""
if chunk.contains is None:
chunk.contains = []
chunk.contains.append(item)
async def translate_text(
text: str,
target_language: str = "en",
translation_provider: TranslationProviderType = None,
source_language: Optional[str] = None,
) -> TranslationResult:
"""
Translate a single text string.
This is a convenience function for translating individual texts
without creating DocumentChunk objects.
Args:
text: The text to translate
target_language: Target language code (default: "en")
translation_provider: Translation service to use
source_language: Source language code (optional, auto-detected if not provided)
Returns:
TranslationResult with translated text and metadata
Example:
```python
from cognee.tasks.translation import translate_text
result = await translate_text(
"Bonjour le monde!",
target_language="en"
)
print(result.translated_text) # "Hello world!"
print(result.source_language) # "fr"
```
"""
config = get_translation_config()
provider_name = translation_provider or config.translation_provider
provider = get_translation_provider(provider_name)
return await provider.translate(
text=text,
target_language=target_language,
source_language=source_language,
)
async def batch_translate_texts(
texts: List[str],
target_language: str = "en",
translation_provider: TranslationProviderType = None,
source_language: Optional[str] = None,
) -> List[TranslationResult]:
"""
Translate multiple text strings in batch.
This is more efficient than translating texts individually,
especially for providers that support native batch operations.
Args:
texts: List of texts to translate
target_language: Target language code (default: "en")
translation_provider: Translation service to use
source_language: Source language code (optional)
Returns:
List of TranslationResult objects
Example:
```python
from cognee.tasks.translation import batch_translate_texts
results = await batch_translate_texts(
["Hola", "¿Cómo estás?", "Adiós"],
target_language="en"
)
for result in results:
print(f"{result.source_language}: {result.translated_text}")
```
"""
config = get_translation_config()
provider_name = translation_provider or config.translation_provider
provider = get_translation_provider(provider_name)
return await provider.translate_batch(
texts=texts,
target_language=target_language,
source_language=source_language,
)