refactor: remove auto-translate from cognify pipeline

The translation module is kept as a standalone task that can be used
independently. Users can manually add translate_content to their
custom pipelines if needed.
This commit is contained in:
andikarachman 2026-01-13 19:08:15 +07:00
parent aac115cc84
commit d2f98fe880

View file

@ -26,8 +26,6 @@ from cognee.tasks.documents import (
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text
from cognee.tasks.translation import translate_content
from cognee.tasks.translation.config import TranslationProviderType
from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
from cognee.tasks.temporal_graph.extract_events_and_entities import extract_events_and_timestamps
from cognee.tasks.temporal_graph.extract_knowledge_graph_from_events import (
@ -55,9 +53,6 @@ async def cognify(
custom_prompt: Optional[str] = None,
temporal_cognify: bool = False,
data_per_batch: int = 20,
auto_translate: bool = False,
target_language: str = "en",
translation_provider: TranslationProviderType = None,
**kwargs,
):
"""
@ -123,15 +118,6 @@ async def cognify(
If provided, this prompt will be used instead of the default prompts for
knowledge graph extraction. The prompt should guide the LLM on how to
extract entities and relationships from the text content.
auto_translate: If True, automatically detect and translate non-English content to the
target language before processing. Uses language detection to identify
content that needs translation. Defaults to False.
target_language: Target language code for translation (e.g., "en", "es", "fr").
Only used when auto_translate=True. Defaults to "en" (English).
translation_provider: Translation service to use ("llm", "google", "azure").
LLM uses the existing LLM infrastructure, Google requires
GOOGLE_TRANSLATE_API_KEY, Azure requires AZURE_TRANSLATOR_KEY.
If not specified, uses TRANSLATION_PROVIDER env var or defaults to "llm".
Returns:
Union[dict, list[PipelineRunInfo]]:
@ -196,14 +182,6 @@ async def cognify(
run_in_background=True
)
# Check status later with run_info.pipeline_run_id
# Auto-translate multilingual content to English
await cognee.add("document_spanish.pdf")
await cognee.cognify(
auto_translate=True,
target_language="en",
translation_provider="llm" # or "google", "azure"
)
```
@ -215,9 +193,6 @@ async def cognify(
- LLM_PROVIDER, LLM_MODEL, VECTOR_DB_PROVIDER, GRAPH_DATABASE_PROVIDER
- LLM_RATE_LIMIT_ENABLED: Enable rate limiting (default: False)
- LLM_RATE_LIMIT_REQUESTS: Max requests per interval (default: 60)
- TRANSLATION_PROVIDER: Default translation provider ("llm", "google", "azure")
- GOOGLE_TRANSLATE_API_KEY: API key for Google Translate
- AZURE_TRANSLATOR_KEY: API key for Azure Translator
"""
if config is None:
ontology_config = get_ontology_env_config()
@ -238,13 +213,7 @@ async def cognify(
if temporal_cognify:
tasks = await get_temporal_tasks(
user=user,
chunker=chunker,
chunk_size=chunk_size,
chunks_per_batch=chunks_per_batch,
auto_translate=auto_translate,
target_language=target_language,
translation_provider=translation_provider,
user=user, chunker=chunker, chunk_size=chunk_size, chunks_per_batch=chunks_per_batch
)
else:
tasks = await get_default_tasks(
@ -255,9 +224,6 @@ async def cognify(
config=config,
custom_prompt=custom_prompt,
chunks_per_batch=chunks_per_batch,
auto_translate=auto_translate,
target_language=target_language,
translation_provider=translation_provider,
**kwargs,
)
@ -287,9 +253,6 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
config: Config = None,
custom_prompt: Optional[str] = None,
chunks_per_batch: int = 100,
auto_translate: bool = False,
target_language: str = "en",
translation_provider: TranslationProviderType = None,
**kwargs,
) -> list[Task]:
if config is None:
@ -322,52 +285,30 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
max_chunk_size=chunk_size or get_max_chunk_tokens(),
chunker=chunker,
), # Extract text chunks based on the document type.
Task(
extract_graph_from_data,
graph_model=graph_model,
config=config,
custom_prompt=custom_prompt,
task_config={"batch_size": chunks_per_batch},
**kwargs,
), # Generate knowledge graphs from the document chunks.
Task(
summarize_text,
task_config={"batch_size": chunks_per_batch},
),
Task(
add_data_points,
embed_triplets=embed_triplets,
task_config={"batch_size": chunks_per_batch},
),
]
# Add translation task if auto_translate is enabled
if auto_translate:
default_tasks.append(
Task(
translate_content,
target_language=target_language,
translation_provider=translation_provider,
task_config={"batch_size": chunks_per_batch},
)
)
default_tasks.extend(
[
Task(
extract_graph_from_data,
graph_model=graph_model,
config=config,
custom_prompt=custom_prompt,
task_config={"batch_size": chunks_per_batch},
**kwargs,
), # Generate knowledge graphs from the document chunks.
Task(
summarize_text,
task_config={"batch_size": chunks_per_batch},
),
Task(
add_data_points,
embed_triplets=embed_triplets,
task_config={"batch_size": chunks_per_batch},
),
]
)
return default_tasks
async def get_temporal_tasks(
user: User = None,
chunker=TextChunker,
chunk_size: int = None,
chunks_per_batch: int = 10,
auto_translate: bool = False,
target_language: str = "en",
translation_provider: TranslationProviderType = None,
user: User = None, chunker=TextChunker, chunk_size: int = None, chunks_per_batch: int = 10
) -> list[Task]:
"""
Builds and returns a list of temporal processing tasks to be executed in sequence.
@ -375,19 +316,15 @@ async def get_temporal_tasks(
The pipeline includes:
1. Document classification.
2. Document chunking with a specified or default chunk size.
3. (Optional) Translation of non-English content to target language.
4. Event and timestamp extraction from chunks.
5. Knowledge graph extraction from events.
6. Batched insertion of data points.
3. Event and timestamp extraction from chunks.
4. Knowledge graph extraction from events.
5. Batched insertion of data points.
Args:
user (User, optional): The user requesting task execution.
chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker.
chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default.
chunks_per_batch (int, optional): Number of chunks to process in a single batch in Cognify
auto_translate (bool, optional): If True, translate non-English content. Defaults to False.
target_language (str, optional): Target language for translation. Defaults to "en".
translation_provider (str, optional): Translation provider to use ("llm", "google", "azure").
Returns:
list[Task]: A list of Task objects representing the temporal processing pipeline.
@ -402,25 +339,9 @@ async def get_temporal_tasks(
max_chunk_size=chunk_size or get_max_chunk_tokens(),
chunker=chunker,
),
Task(extract_events_and_timestamps, task_config={"batch_size": chunks_per_batch}),
Task(extract_knowledge_graph_from_events),
Task(add_data_points, task_config={"batch_size": chunks_per_batch}),
]
# Add translation task if auto_translate is enabled
if auto_translate:
temporal_tasks.append(
Task(
translate_content,
target_language=target_language,
translation_provider=translation_provider,
task_config={"batch_size": chunks_per_batch},
)
)
temporal_tasks.extend(
[
Task(extract_events_and_timestamps, task_config={"batch_size": chunks_per_batch}),
Task(extract_knowledge_graph_from_events),
Task(add_data_points, task_config={"batch_size": chunks_per_batch}),
]
)
return temporal_tasks