From abfcbc69d61ec8a71ed83a8dd32894f5e99d8248 Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Fri, 10 Oct 2025 15:36:36 +0200
Subject: [PATCH 01/16] refactor: Have embedding calls run in async gather

---
 cognee/api/v1/cognify/cognify.py | 6 ++---
 cognee/tasks/storage/index_data_points.py | 33 +++++++++++++----------
 2 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py
index 1292d243a..6a9f68443 100644
--- a/cognee/api/v1/cognify/cognify.py
+++ b/cognee/api/v1/cognify/cognify.py
@@ -269,13 +269,13 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
             graph_model=graph_model,
             config=config,
             custom_prompt=custom_prompt,
-            task_config={"batch_size": 10},
+            task_config={"batch_size": 30},
         ), # Generate knowledge graphs from the document chunks.
         Task(
             summarize_text,
-            task_config={"batch_size": 10},
+            task_config={"batch_size": 30},
         ),
-        Task(add_data_points, task_config={"batch_size": 10}),
+        Task(add_data_points, task_config={"batch_size": 100}),
     ]

     return default_tasks
diff --git a/cognee/tasks/storage/index_data_points.py b/cognee/tasks/storage/index_data_points.py
index 362412657..ebc4640d6 100644
--- a/cognee/tasks/storage/index_data_points.py
+++ b/cognee/tasks/storage/index_data_points.py
@@ -1,6 +1,6 @@
-from cognee.shared.logging_utils import get_logger
+import asyncio

-from cognee.infrastructure.databases.exceptions import EmbeddingException
+from cognee.shared.logging_utils import get_logger
 from cognee.infrastructure.databases.vector import get_vector_engine
 from cognee.infrastructure.engine import DataPoint

@@ -33,18 +33,23 @@ async def index_data_points(data_points: list[DataPoint]):
                 indexed_data_point.metadata["index_fields"] = [field_name]
                 index_points[index_name].append(indexed_data_point)

-    for index_name_and_field, indexable_points in index_points.items():
-        first_occurence = index_name_and_field.index("_")
-        index_name = index_name_and_field[:first_occurence]
-        field_name = index_name_and_field[first_occurence + 1 :]
-        try:
-            # In case the amount of indexable points is too large we need to send them in batches
-            batch_size = vector_engine.embedding_engine.get_batch_size()
-            for i in range(0, len(indexable_points), batch_size):
-                batch = indexable_points[i : i + batch_size]
-                await vector_engine.index_data_points(index_name, field_name, batch)
-        except EmbeddingException as e:
-            logger.warning(f"Failed to index data points for {index_name}.{field_name}: {e}")
+    tasks: list[asyncio.Task] = []
+    batch_size = vector_engine.embedding_engine.get_batch_size()
+
+    for index_name_and_field, points in index_points.items():
+        first = index_name_and_field.index("_")
+        index_name = index_name_and_field[:first]
+        field_name = index_name_and_field[first + 1 :]
+
+        # Split in the usual “range step batch_size” manner
+        for i in range(0, len(points), batch_size):
+            batch = points[i : i + batch_size]
+            tasks.append(
+                asyncio.create_task(vector_engine.index_data_points(index_name, field_name, batch))
+            )
+
+    # Fire them all and wait until every task is done.
+ await asyncio.gather(*tasks) return data_points From 757d745b5d262975c05f5fe3bb3f410f5c3d72b7 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 10 Oct 2025 17:12:09 +0200 Subject: [PATCH 02/16] refactor: Optimize cognification speed --- cognee/api/v1/cognify/cognify.py | 4 ++-- .../databases/vector/embeddings/config.py | 4 ++-- cognee/tasks/storage/index_graph_edges.py | 15 +++++++++++---- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 6a9f68443..30afb269a 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -269,11 +269,11 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": 30}, + task_config={"batch_size": 100}, ), # Generate knowledge graphs from the document chunks. Task( summarize_text, - task_config={"batch_size": 30}, + task_config={"batch_size": 100}, ), Task(add_data_points, task_config={"batch_size": 100}), ] diff --git a/cognee/infrastructure/databases/vector/embeddings/config.py b/cognee/infrastructure/databases/vector/embeddings/config.py index 24f724151..dcb55f4a4 100644 --- a/cognee/infrastructure/databases/vector/embeddings/config.py +++ b/cognee/infrastructure/databases/vector/embeddings/config.py @@ -26,9 +26,9 @@ class EmbeddingConfig(BaseSettings): def model_post_init(self, __context) -> None: # If embedding batch size is not defined use 2048 as default for OpenAI and 100 for all other embedding models if not self.embedding_batch_size and self.embedding_provider.lower() == "openai": - self.embedding_batch_size = 2048 + self.embedding_batch_size = 30 elif not self.embedding_batch_size: - self.embedding_batch_size = 100 + self.embedding_batch_size = 10 def to_dict(self) -> dict: """ diff --git a/cognee/tasks/storage/index_graph_edges.py b/cognee/tasks/storage/index_graph_edges.py index b7bf7a2b9..4fa8cfc75 100644 --- a/cognee/tasks/storage/index_graph_edges.py +++ b/cognee/tasks/storage/index_graph_edges.py @@ -1,3 +1,5 @@ +import asyncio + from cognee.modules.engine.utils.generate_edge_id import generate_edge_id from cognee.shared.logging_utils import get_logger from collections import Counter @@ -76,15 +78,20 @@ async def index_graph_edges( indexed_data_point.metadata["index_fields"] = [field_name] index_points[index_name].append(indexed_data_point) + # Get maximum batch size for embedding model + batch_size = vector_engine.embedding_engine.get_batch_size() + tasks: list[asyncio.Task] = [] + for index_name, indexable_points in index_points.items(): index_name, field_name = index_name.split(".") - # Get maximum batch size for embedding model - batch_size = vector_engine.embedding_engine.get_batch_size() - # We save the data in batches of {batch_size} to not put a lot of pressure on the database + # Create embedding tasks to run in parallel later for start in range(0, len(indexable_points), batch_size): batch = indexable_points[start : start + batch_size] - await vector_engine.index_data_points(index_name, field_name, batch) + tasks.append(vector_engine.index_data_points(index_name, field_name, batch)) + + # Start all embedding tasks and wait for completion + await asyncio.gather(*tasks) return None From 13d1133680a241a9423b57d760c2319c20b80670 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 10 Oct 2025 17:14:10 +0200 Subject: [PATCH 03/16] chore: Change comments --- cognee/tasks/storage/index_data_points.py | 4 
++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cognee/tasks/storage/index_data_points.py b/cognee/tasks/storage/index_data_points.py
index ebc4640d6..902789c80 100644
--- a/cognee/tasks/storage/index_data_points.py
+++ b/cognee/tasks/storage/index_data_points.py
@@ -41,14 +41,14 @@ async def index_data_points(data_points: list[DataPoint]):
         index_name = index_name_and_field[:first]
         field_name = index_name_and_field[first + 1 :]

-        # Split in the usual “range step batch_size” manner
+        # Create embedding requests per batch to run in parallel later
         for i in range(0, len(points), batch_size):
             batch = points[i : i + batch_size]
             tasks.append(
                 asyncio.create_task(vector_engine.index_data_points(index_name, field_name, batch))
             )

-    # Fire them all and wait until every task is done.
+    # Run all embedding requests in parallel
     await asyncio.gather(*tasks)

     return data_points

From eb631a23ad6eeaba9c1111b598a6f4f955cd6c86 Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Tue, 14 Oct 2025 13:57:41 +0200
Subject: [PATCH 04/16] refactor: set default numbers that are more reasonable

---
 cognee/api/v1/cognify/cognify.py | 6 +++---
 cognee/infrastructure/databases/vector/embeddings/config.py | 5 ++---
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py
index 30afb269a..898c35518 100644
--- a/cognee/api/v1/cognify/cognify.py
+++ b/cognee/api/v1/cognify/cognify.py
@@ -269,13 +269,13 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
             graph_model=graph_model,
             config=config,
             custom_prompt=custom_prompt,
-            task_config={"batch_size": 100},
+            task_config={"batch_size": 20},
         ), # Generate knowledge graphs from the document chunks.
         Task(
             summarize_text,
-            task_config={"batch_size": 100},
+            task_config={"batch_size": 20},
         ),
-        Task(add_data_points, task_config={"batch_size": 100}),
+        Task(add_data_points, task_config={"batch_size": 20}),
     ]

     return default_tasks
diff --git a/cognee/infrastructure/databases/vector/embeddings/config.py b/cognee/infrastructure/databases/vector/embeddings/config.py
index dcb55f4a4..314adbd99 100644
--- a/cognee/infrastructure/databases/vector/embeddings/config.py
+++ b/cognee/infrastructure/databases/vector/embeddings/config.py
@@ -24,11 +24,10 @@ class EmbeddingConfig(BaseSettings):
     model_config = SettingsConfigDict(env_file=".env", extra="allow")

     def model_post_init(self, __context) -> None:
-        # If embedding batch size is not defined use 2048 as default for OpenAI and 100 for all other embedding models
         if not self.embedding_batch_size and self.embedding_provider.lower() == "openai":
-            self.embedding_batch_size = 30
+            self.embedding_batch_size = 1024
         elif not self.embedding_batch_size:
-            self.embedding_batch_size = 10
+            self.embedding_batch_size = 100

     def to_dict(self) -> dict:
         """

From 84a23756f5c77ef3c7e0c78c4aff122416249341 Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Tue, 14 Oct 2025 14:25:38 +0200
Subject: [PATCH 05/16] fix: Change chunk_size to batch_size for temporal task

---
 cognee/api/v1/cognify/cognify.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py
index 898c35518..2c87dbc4b 100644
--- a/cognee/api/v1/cognify/cognify.py
+++ b/cognee/api/v1/cognify/cognify.py
@@ -311,7 +311,7 @@ async def get_temporal_tasks(
             max_chunk_size=chunk_size or get_max_chunk_tokens(),
             chunker=chunker,
         ),
-        Task(extract_events_and_timestamps, task_config={"chunk_size": 10}),
+
Task(extract_events_and_timestamps, task_config={"batch_size": 10}), Task(extract_knowledge_graph_from_events), Task(add_data_points, task_config={"batch_size": 10}), ] From 98daadbb0461ae99935032bde96d8c056f874050 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 14 Oct 2025 20:29:55 +0200 Subject: [PATCH 06/16] refactor: Add tenacity retry mechanism --- .../embeddings/LiteLLMEmbeddingEngine.py | 18 ++++++++++++++++-- .../embeddings/OllamaEmbeddingEngine.py | 19 ++++++++++++++++--- poetry.lock | 2 +- pyproject.toml | 3 ++- uv.lock | 4 +++- 5 files changed, 38 insertions(+), 8 deletions(-) diff --git a/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py index d68941d25..2a71d674d 100644 --- a/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py +++ b/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py @@ -1,8 +1,17 @@ import asyncio +import logging + from cognee.shared.logging_utils import get_logger from typing import List, Optional import numpy as np import math +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, +) import litellm import os from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine @@ -76,8 +85,13 @@ class LiteLLMEmbeddingEngine(EmbeddingEngine): enable_mocking = str(enable_mocking).lower() self.mock = enable_mocking in ("true", "1", "yes") - @embedding_sleep_and_retry_async() - @embedding_rate_limit_async + @retry( + stop=stop_after_delay(180), + wait=wait_exponential_jitter(1, 180), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def embed_text(self, text: List[str]) -> List[List[float]]: """ Embed a list of text strings into vector representations. diff --git a/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py index e79ba3f6a..b8ee9c7df 100644 --- a/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py +++ b/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py @@ -3,8 +3,16 @@ from cognee.shared.logging_utils import get_logger import aiohttp from typing import List, Optional import os - +import litellm +import logging import aiohttp.http_exceptions +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, +) from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine from cognee.infrastructure.llm.tokenizer.HuggingFace import ( @@ -69,7 +77,6 @@ class OllamaEmbeddingEngine(EmbeddingEngine): enable_mocking = str(enable_mocking).lower() self.mock = enable_mocking in ("true", "1", "yes") - @embedding_rate_limit_async async def embed_text(self, text: List[str]) -> List[List[float]]: """ Generate embedding vectors for a list of text prompts. 
@@ -92,7 +99,13 @@ class OllamaEmbeddingEngine(EmbeddingEngine): embeddings = await asyncio.gather(*[self._get_embedding(prompt) for prompt in text]) return embeddings - @embedding_sleep_and_retry_async() + @retry( + stop=stop_after_delay(180), + wait=wait_exponential_jitter(1, 180), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def _get_embedding(self, prompt: str) -> List[float]: """ Internal method to call the Ollama embeddings endpoint for a single prompt. diff --git a/poetry.lock b/poetry.lock index 551295733..ffc5ec575 100644 --- a/poetry.lock +++ b/poetry.lock @@ -12738,4 +12738,4 @@ posthog = ["posthog"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<=3.13" -content-hash = "38353807b06e5c06caaa107979529937b978204f0f405c6b38cee283f4a49d3c" +content-hash = "d8cd8a8db46416e0c844ff90df5bd64551ebf9a0c338fbb2023a61008ff5941d" diff --git a/pyproject.toml b/pyproject.toml index 3df57e1f5..7ac2915d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,8 @@ dependencies = [ "networkx>=3.4.2,<4", "uvicorn>=0.34.0,<1.0.0", "gunicorn>=20.1.0,<24", - "websockets>=15.0.1,<16.0.0" + "websockets>=15.0.1,<16.0.0", + "tenacity>=9.0.0", ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index 570da9289..5c06b96be 100644 --- a/uv.lock +++ b/uv.lock @@ -856,7 +856,7 @@ wheels = [ [[package]] name = "cognee" -version = "0.3.4" +version = "0.3.5" source = { editable = "." } dependencies = [ { name = "aiofiles" }, @@ -892,6 +892,7 @@ dependencies = [ { name = "rdflib" }, { name = "sqlalchemy" }, { name = "structlog" }, + { name = "tenacity" }, { name = "tiktoken" }, { name = "typing-extensions" }, { name = "uvicorn" }, @@ -1086,6 +1087,7 @@ requires-dist = [ { name = "sentry-sdk", extras = ["fastapi"], marker = "extra == 'monitoring'", specifier = ">=2.9.0,<3" }, { name = "sqlalchemy", specifier = ">=2.0.39,<3.0.0" }, { name = "structlog", specifier = ">=25.2.0,<26" }, + { name = "tenacity", specifier = ">=9.0.0" }, { name = "tiktoken", specifier = ">=0.8.0,<1.0.0" }, { name = "transformers", marker = "extra == 'codegraph'", specifier = ">=4.46.3,<5" }, { name = "transformers", marker = "extra == 'huggingface'", specifier = ">=4.46.3,<5" }, From 1b28f137431d30c940568406fab1678db9276c28 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 13:32:17 +0200 Subject: [PATCH 07/16] refactor: Optimize Cognee speed --- cognee/api/v1/cognify/cognify.py | 6 +++--- .../embeddings/FastembedEmbeddingEngine.py | 20 +++++++++++++++++-- .../embeddings/LiteLLMEmbeddingEngine.py | 11 ++-------- .../embeddings/OllamaEmbeddingEngine.py | 4 ++-- .../databases/vector/embeddings/config.py | 4 ++-- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 9215c9369..3032bd4e8 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -269,13 +269,13 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": 20}, + task_config={"batch_size": 100}, ), # Generate knowledge graphs from the document chunks. 
Task( summarize_text, - task_config={"batch_size": 20}, + task_config={"batch_size": 100}, ), - Task(add_data_points, task_config={"batch_size": 20}), + Task(add_data_points, task_config={"batch_size": 100}), ] return default_tasks diff --git a/cognee/infrastructure/databases/vector/embeddings/FastembedEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/FastembedEmbeddingEngine.py index e34ab5d9d..c2acd516e 100644 --- a/cognee/infrastructure/databases/vector/embeddings/FastembedEmbeddingEngine.py +++ b/cognee/infrastructure/databases/vector/embeddings/FastembedEmbeddingEngine.py @@ -1,8 +1,17 @@ -from cognee.shared.logging_utils import get_logger +import os +import logging from typing import List, Optional from fastembed import TextEmbedding import litellm -import os +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, +) + +from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine from cognee.infrastructure.databases.exceptions import EmbeddingException from cognee.infrastructure.llm.tokenizer.TikToken import ( @@ -57,6 +66,13 @@ class FastembedEmbeddingEngine(EmbeddingEngine): enable_mocking = str(enable_mocking).lower() self.mock = enable_mocking in ("true", "1", "yes") + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def embed_text(self, text: List[str]) -> List[List[float]]: """ Embed the given text into numerical vectors. diff --git a/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py index 302950f66..03ce86bee 100644 --- a/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py +++ b/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py @@ -16,9 +16,6 @@ import litellm import os from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine from cognee.infrastructure.databases.exceptions import EmbeddingException -from cognee.infrastructure.llm.tokenizer.Gemini import ( - GeminiTokenizer, -) from cognee.infrastructure.llm.tokenizer.HuggingFace import ( HuggingFaceTokenizer, ) @@ -28,10 +25,6 @@ from cognee.infrastructure.llm.tokenizer.Mistral import ( from cognee.infrastructure.llm.tokenizer.TikToken import ( TikTokenTokenizer, ) -from cognee.infrastructure.databases.vector.embeddings.embedding_rate_limiter import ( - embedding_rate_limit_async, - embedding_sleep_and_retry_async, -) litellm.set_verbose = False logger = get_logger("LiteLLMEmbeddingEngine") @@ -86,8 +79,8 @@ class LiteLLMEmbeddingEngine(EmbeddingEngine): self.mock = enable_mocking in ("true", "1", "yes") @retry( - stop=stop_after_delay(180), - wait=wait_exponential_jitter(1, 180), + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), before_sleep=before_sleep_log(logger, logging.DEBUG), reraise=True, diff --git a/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py index b8ee9c7df..2882b679a 100644 --- a/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py +++ 
b/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py @@ -100,8 +100,8 @@ class OllamaEmbeddingEngine(EmbeddingEngine): return embeddings @retry( - stop=stop_after_delay(180), - wait=wait_exponential_jitter(1, 180), + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), before_sleep=before_sleep_log(logger, logging.DEBUG), reraise=True, diff --git a/cognee/infrastructure/databases/vector/embeddings/config.py b/cognee/infrastructure/databases/vector/embeddings/config.py index 314adbd99..56cd79678 100644 --- a/cognee/infrastructure/databases/vector/embeddings/config.py +++ b/cognee/infrastructure/databases/vector/embeddings/config.py @@ -25,9 +25,9 @@ class EmbeddingConfig(BaseSettings): def model_post_init(self, __context) -> None: if not self.embedding_batch_size and self.embedding_provider.lower() == "openai": - self.embedding_batch_size = 1024 + self.embedding_batch_size = 36 elif not self.embedding_batch_size: - self.embedding_batch_size = 100 + self.embedding_batch_size = 36 def to_dict(self) -> dict: """ From fc4440da8c7b7cdfd4087f34c40ac90cc86bb839 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 14:43:21 +0200 Subject: [PATCH 08/16] refactor: update env template --- .env.template | 5 ++--- .../loaders/external/advanced_pdf_loader.py | 10 ++-------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/.env.template b/.env.template index 7fd3ba9e8..3137636d3 100644 --- a/.env.template +++ b/.env.template @@ -28,11 +28,10 @@ EMBEDDING_ENDPOINT="" EMBEDDING_API_VERSION="" EMBEDDING_DIMENSIONS=3072 EMBEDDING_MAX_TOKENS=8191 +EMBEDDING_BATCH_SIZE=36 # If embedding key is not provided same key set for LLM_API_KEY will be used #EMBEDDING_API_KEY="your_api_key" -# Note: OpenAI support up to 2048 elements and Gemini supports a maximum of 100 elements in an embedding batch, -# Cognee sets the optimal batch size for OpenAI and Gemini, but a custom size can be defined if necessary for other models -#EMBEDDING_BATCH_SIZE=2048 + # If using BAML structured output these env variables will be used BAML_LLM_PROVIDER=openai diff --git a/cognee/infrastructure/loaders/external/advanced_pdf_loader.py b/cognee/infrastructure/loaders/external/advanced_pdf_loader.py index 7bab8cac6..6d1412b77 100644 --- a/cognee/infrastructure/loaders/external/advanced_pdf_loader.py +++ b/cognee/infrastructure/loaders/external/advanced_pdf_loader.py @@ -14,14 +14,6 @@ from cognee.infrastructure.loaders.external.pypdf_loader import PyPdfLoader logger = get_logger(__name__) -try: - from unstructured.partition.pdf import partition_pdf -except ImportError as e: - logger.info( - "unstructured[pdf] not installed, can't use AdvancedPdfLoader, will use PyPdfLoader instead." 
- ) - raise ImportError from e - @dataclass class _PageBuffer: @@ -88,6 +80,8 @@ class AdvancedPdfLoader(LoaderInterface): **kwargs, } # Use partition to extract elements + from unstructured.partition.pdf import partition_pdf + elements = partition_pdf(**partition_kwargs) # Process elements into text content From 5663c3fe3ab80f0eee7adb3576af4b579a1d8306 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 17:38:18 +0200 Subject: [PATCH 09/16] refactor: add batch size param to temporal graphs --- cognee/api/v1/cognify/cognify.py | 34 ++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 3032bd4e8..d29d8c939 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -44,6 +44,7 @@ async def cognify( graph_model: BaseModel = KnowledgeGraph, chunker=TextChunker, chunk_size: int = None, + batch_size: int = None, config: Config = None, vector_db_config: dict = None, graph_db_config: dict = None, @@ -105,6 +106,7 @@ async def cognify( Formula: min(embedding_max_completion_tokens, llm_max_completion_tokens // 2) Default limits: ~512-8192 tokens depending on models. Smaller chunks = more granular but potentially fragmented knowledge. + batch_size: Number of chunks to be processed in a single batch in Cognify tasks. vector_db_config: Custom vector database configuration for embeddings storage. graph_db_config: Custom graph database configuration for relationship storage. run_in_background: If True, starts processing asynchronously and returns immediately. @@ -209,10 +211,18 @@ async def cognify( } if temporal_cognify: - tasks = await get_temporal_tasks(user, chunker, chunk_size) + tasks = await get_temporal_tasks( + user=user, chunker=chunker, chunk_size=chunk_size, batch_size=batch_size + ) else: tasks = await get_default_tasks( - user, graph_model, chunker, chunk_size, config, custom_prompt + user=user, + graph_model=graph_model, + chunker=chunker, + chunk_size=chunk_size, + config=config, + custom_prompt=custom_prompt, + batch_size=batch_size, ) # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for @@ -238,6 +248,7 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's chunk_size: int = None, config: Config = None, custom_prompt: Optional[str] = None, + batch_size: int = 100, ) -> list[Task]: if config is None: ontology_config = get_ontology_env_config() @@ -256,6 +267,9 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's "ontology_config": {"ontology_resolver": get_default_ontology_resolver()} } + if batch_size is None: + batch_size = 100 + default_tasks = [ Task(classify_documents), Task(check_permissions_on_dataset, user=user, permissions=["write"]), @@ -269,20 +283,20 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": 100}, + task_config={"batch_size": batch_size}, ), # Generate knowledge graphs from the document chunks. 
Task( summarize_text, - task_config={"batch_size": 100}, + task_config={"batch_size": batch_size}, ), - Task(add_data_points, task_config={"batch_size": 100}), + Task(add_data_points, task_config={"batch_size": batch_size}), ] return default_tasks async def get_temporal_tasks( - user: User = None, chunker=TextChunker, chunk_size: int = None + user: User = None, chunker=TextChunker, chunk_size: int = None, batch_size: int = 10 ) -> list[Task]: """ Builds and returns a list of temporal processing tasks to be executed in sequence. @@ -299,10 +313,14 @@ async def get_temporal_tasks( user (User, optional): The user requesting task execution, used for permission checks. chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker. chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default. + batch_size (int, optional): Number of chunks to process in a single batch in Cognify Returns: list[Task]: A list of Task objects representing the temporal processing pipeline. """ + if batch_size is None: + batch_size = 10 + temporal_tasks = [ Task(classify_documents), Task(check_permissions_on_dataset, user=user, permissions=["write"]), @@ -311,9 +329,9 @@ async def get_temporal_tasks( max_chunk_size=chunk_size or get_max_chunk_tokens(), chunker=chunker, ), - Task(extract_events_and_timestamps, task_config={"batch_size": 10}), + Task(extract_events_and_timestamps, task_config={"batch_size": batch_size}), Task(extract_knowledge_graph_from_events), - Task(add_data_points, task_config={"batch_size": 10}), + Task(add_data_points, task_config={"batch_size": batch_size}), ] return temporal_tasks From 96496f38ed1e4ce2dd63190c9cbf6a16338fbeb0 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 18:08:18 +0200 Subject: [PATCH 10/16] refactor: Switch to using tenacity for rate limiting --- .../llm/anthropic/adapter.py | 28 ++++++---- .../litellm_instructor/llm/gemini/adapter.py | 22 ++++++-- .../llm/generic_llm_api/adapter.py | 22 ++++++-- .../litellm_instructor/llm/mistral/adapter.py | 54 ++++++------------- .../litellm_instructor/llm/ollama/adapter.py | 41 +++++++++++--- .../litellm_instructor/llm/openai/adapter.py | 53 +++++++++++++----- 6 files changed, 142 insertions(+), 78 deletions(-) diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/anthropic/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/anthropic/adapter.py index 2d88a8271..bf19d6e86 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/anthropic/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/anthropic/adapter.py @@ -1,19 +1,24 @@ +import logging from typing import Type from pydantic import BaseModel +import litellm import instructor +from cognee.shared.logging_utils import get_logger +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, +) -from cognee.infrastructure.llm.exceptions import MissingSystemPromptPathError from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - sleep_and_retry_async, -) - -from cognee.infrastructure.llm.LLMGateway import LLMGateway from cognee.infrastructure.llm.config import get_llm_config +logger = get_logger() 
+ class AnthropicAdapter(LLMInterface): """ @@ -35,8 +40,13 @@ class AnthropicAdapter(LLMInterface): self.model = model self.max_completion_tokens = max_completion_tokens - @sleep_and_retry_async() - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/gemini/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/gemini/adapter.py index 510d29ce8..1187e0cad 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/gemini/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/gemini/adapter.py @@ -12,11 +12,18 @@ from cognee.infrastructure.llm.exceptions import ContentPolicyFilterError from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - sleep_and_retry_async, +import logging +from cognee.shared.logging_utils import get_logger +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, ) +logger = get_logger() + class GeminiAdapter(LLMInterface): """ @@ -58,8 +65,13 @@ class GeminiAdapter(LLMInterface): self.aclient = instructor.from_litellm(litellm.acompletion, mode=instructor.Mode.JSON) - @sleep_and_retry_async() - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/generic_llm_api/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/generic_llm_api/adapter.py index 917599d4d..8bbbaa2cc 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/generic_llm_api/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/generic_llm_api/adapter.py @@ -12,11 +12,18 @@ from cognee.infrastructure.llm.exceptions import ContentPolicyFilterError from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - sleep_and_retry_async, +import logging +from cognee.shared.logging_utils import get_logger +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, ) +logger = get_logger() + class GenericAPIAdapter(LLMInterface): """ @@ -58,8 +65,13 @@ class GenericAPIAdapter(LLMInterface): self.aclient = instructor.from_litellm(litellm.acompletion, mode=instructor.Mode.JSON) - @sleep_and_retry_async() - @rate_limit_async + @retry( + stop=stop_after_delay(128), + 
wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/mistral/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/mistral/adapter.py index c4e51b70b..78a3cbff5 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/mistral/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/mistral/adapter.py @@ -1,20 +1,23 @@ import litellm import instructor from pydantic import BaseModel -from typing import Type, Optional -from litellm import acompletion, JSONSchemaValidationError +from typing import Type +from litellm import JSONSchemaValidationError from cognee.shared.logging_utils import get_logger from cognee.modules.observability.get_observe import get_observe -from cognee.infrastructure.llm.exceptions import MissingSystemPromptPathError from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) -from cognee.infrastructure.llm.LLMGateway import LLMGateway from cognee.infrastructure.llm.config import get_llm_config -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - sleep_and_retry_async, + +import logging +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, ) logger = get_logger() @@ -47,8 +50,13 @@ class MistralAdapter(LLMInterface): api_key=get_llm_config().llm_api_key, ) - @sleep_and_retry_async() - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: @@ -99,31 +107,3 @@ class MistralAdapter(LLMInterface): logger.error(f"Schema validation failed: {str(e)}") logger.debug(f"Raw response: {e.raw_response}") raise ValueError(f"Response failed schema validation: {str(e)}") - - def show_prompt(self, text_input: str, system_prompt: str) -> str: - """ - Format and display the prompt for a user query. - - Parameters: - ----------- - - text_input (str): Input text from the user to be included in the prompt. - - system_prompt (str): The system prompt that will be shown alongside the user input. - - Returns: - -------- - - str: The formatted prompt string combining system prompt and user input. - """ - if not text_input: - text_input = "No user input provided." 
- if not system_prompt: - raise MissingSystemPromptPathError() - - system_prompt = LLMGateway.read_query_prompt(system_prompt) - - formatted_prompt = ( - f"""System Prompt:\n{system_prompt}\n\nUser Input:\n{text_input}\n""" - if system_prompt - else None - ) - - return formatted_prompt diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/ollama/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/ollama/adapter.py index 314cb79d8..9c3d185aa 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/ollama/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/ollama/adapter.py @@ -1,4 +1,6 @@ import base64 +import litellm +import logging import instructor from typing import Type from openai import OpenAI @@ -7,11 +9,17 @@ from pydantic import BaseModel from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - sleep_and_retry_async, -) from cognee.infrastructure.files.utils.open_data_file import open_data_file +from cognee.shared.logging_utils import get_logger +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, +) + +logger = get_logger() class OllamaAPIAdapter(LLMInterface): @@ -47,8 +55,13 @@ class OllamaAPIAdapter(LLMInterface): OpenAI(base_url=self.endpoint, api_key=self.api_key), mode=instructor.Mode.JSON ) - @sleep_and_retry_async() - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: @@ -90,7 +103,13 @@ class OllamaAPIAdapter(LLMInterface): return response - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def create_transcript(self, input_file: str) -> str: """ Generate an audio transcript from a user query. @@ -123,7 +142,13 @@ class OllamaAPIAdapter(LLMInterface): return transcription.text - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def transcribe_image(self, input_file: str) -> str: """ Transcribe content from an image using base64 encoding. 
diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py index 527f64d75..8877c2bdf 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py @@ -7,6 +7,15 @@ from openai import ContentFilterFinishReasonError from litellm.exceptions import ContentPolicyViolationError from instructor.core import InstructorRetryException +import logging +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, +) + from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) @@ -14,19 +23,13 @@ from cognee.infrastructure.llm.exceptions import ( ContentPolicyFilterError, ) from cognee.infrastructure.files.utils.open_data_file import open_data_file -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - rate_limit_sync, - sleep_and_retry_async, - sleep_and_retry_sync, -) from cognee.modules.observability.get_observe import get_observe from cognee.shared.logging_utils import get_logger -observe = get_observe() - logger = get_logger() +observe = get_observe() + class OpenAIAdapter(LLMInterface): """ @@ -97,8 +100,13 @@ class OpenAIAdapter(LLMInterface): self.fallback_endpoint = fallback_endpoint @observe(as_type="generation") - @sleep_and_retry_async() - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: @@ -186,8 +194,13 @@ class OpenAIAdapter(LLMInterface): ) from error @observe - @sleep_and_retry_sync() - @rate_limit_sync + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) def create_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: @@ -231,7 +244,13 @@ class OpenAIAdapter(LLMInterface): max_retries=self.MAX_RETRIES, ) - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def create_transcript(self, input): """ Generate an audio transcript from a user query. @@ -263,7 +282,13 @@ class OpenAIAdapter(LLMInterface): return transcription - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def transcribe_image(self, input) -> BaseModel: """ Generate a transcription of an image from a user query. 
From 99dc35f23e26e4cd2016f50a6c783f6a0a1749e1 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 20:01:09 +0200 Subject: [PATCH 11/16] fix: resolve issue with neo4j metrics test --- .../tasks/descriptive_metrics/metrics_test_utils.py | 2 -- .../tasks/descriptive_metrics/neo4j_metrics_test.py | 11 ++++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/cognee/tests/tasks/descriptive_metrics/metrics_test_utils.py b/cognee/tests/tasks/descriptive_metrics/metrics_test_utils.py index 911d9c33b..579a499fd 100644 --- a/cognee/tests/tasks/descriptive_metrics/metrics_test_utils.py +++ b/cognee/tests/tasks/descriptive_metrics/metrics_test_utils.py @@ -1,7 +1,6 @@ from typing import List from cognee.infrastructure.engine import DataPoint from cognee.tasks.storage.add_data_points import add_data_points -from cognee.infrastructure.databases.graph.get_graph_engine import create_graph_engine import cognee from cognee.infrastructure.databases.graph import get_graph_engine import json @@ -64,7 +63,6 @@ async def create_connected_test_graph(): async def get_metrics(provider: str, include_optional=True): - create_graph_engine.cache_clear() cognee.config.set_graph_database_provider(provider) graph_engine = await get_graph_engine() await graph_engine.delete_graph() diff --git a/cognee/tests/tasks/descriptive_metrics/neo4j_metrics_test.py b/cognee/tests/tasks/descriptive_metrics/neo4j_metrics_test.py index 2ca9e9f7e..8d7a6ab02 100644 --- a/cognee/tests/tasks/descriptive_metrics/neo4j_metrics_test.py +++ b/cognee/tests/tasks/descriptive_metrics/neo4j_metrics_test.py @@ -1,7 +1,12 @@ -from cognee.tests.tasks.descriptive_metrics.metrics_test_utils import assert_metrics import asyncio +async def main(): + from cognee.tests.tasks.descriptive_metrics.metrics_test_utils import assert_metrics + + await assert_metrics(provider="neo4j", include_optional=False) + await assert_metrics(provider="neo4j", include_optional=True) + + if __name__ == "__main__": - asyncio.run(assert_metrics(provider="neo4j", include_optional=False)) - asyncio.run(assert_metrics(provider="neo4j", include_optional=True)) + asyncio.run(main()) From 3a9022a26c1e26b1b70867b441096b266f884cc5 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 20:22:29 +0200 Subject: [PATCH 12/16] refactor: Rename batch size for tasks to chunk batch size --- cognee/api/v1/cognify/cognify.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index d29d8c939..e0f6253d8 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -44,7 +44,7 @@ async def cognify( graph_model: BaseModel = KnowledgeGraph, chunker=TextChunker, chunk_size: int = None, - batch_size: int = None, + chunk_batch_size: int = None, config: Config = None, vector_db_config: dict = None, graph_db_config: dict = None, @@ -106,7 +106,7 @@ async def cognify( Formula: min(embedding_max_completion_tokens, llm_max_completion_tokens // 2) Default limits: ~512-8192 tokens depending on models. Smaller chunks = more granular but potentially fragmented knowledge. - batch_size: Number of chunks to be processed in a single batch in Cognify tasks. + chunk_batch_size: Number of chunks to be processed in a single batch in Cognify tasks. vector_db_config: Custom vector database configuration for embeddings storage. graph_db_config: Custom graph database configuration for relationship storage. 
run_in_background: If True, starts processing asynchronously and returns immediately. @@ -212,7 +212,7 @@ async def cognify( if temporal_cognify: tasks = await get_temporal_tasks( - user=user, chunker=chunker, chunk_size=chunk_size, batch_size=batch_size + user=user, chunker=chunker, chunk_size=chunk_size, chunk_batch_size=chunk_batch_size ) else: tasks = await get_default_tasks( @@ -222,7 +222,7 @@ async def cognify( chunk_size=chunk_size, config=config, custom_prompt=custom_prompt, - batch_size=batch_size, + chunk_batch_size=chunk_batch_size, ) # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for @@ -248,7 +248,7 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's chunk_size: int = None, config: Config = None, custom_prompt: Optional[str] = None, - batch_size: int = 100, + chunk_batch_size: int = 100, ) -> list[Task]: if config is None: ontology_config = get_ontology_env_config() @@ -267,8 +267,8 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's "ontology_config": {"ontology_resolver": get_default_ontology_resolver()} } - if batch_size is None: - batch_size = 100 + if chunk_batch_size is None: + chunk_batch_size = 100 default_tasks = [ Task(classify_documents), @@ -283,20 +283,20 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": batch_size}, + task_config={"batch_size": chunk_batch_size}, ), # Generate knowledge graphs from the document chunks. Task( summarize_text, - task_config={"batch_size": batch_size}, + task_config={"batch_size": chunk_batch_size}, ), - Task(add_data_points, task_config={"batch_size": batch_size}), + Task(add_data_points, task_config={"batch_size": chunk_batch_size}), ] return default_tasks async def get_temporal_tasks( - user: User = None, chunker=TextChunker, chunk_size: int = None, batch_size: int = 10 + user: User = None, chunker=TextChunker, chunk_size: int = None, chunk_batch_size: int = 10 ) -> list[Task]: """ Builds and returns a list of temporal processing tasks to be executed in sequence. @@ -313,13 +313,13 @@ async def get_temporal_tasks( user (User, optional): The user requesting task execution, used for permission checks. chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker. chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default. - batch_size (int, optional): Number of chunks to process in a single batch in Cognify + chunk_batch_size (int, optional): Number of chunks to process in a single batch in Cognify Returns: list[Task]: A list of Task objects representing the temporal processing pipeline. 
""" - if batch_size is None: - batch_size = 10 + if chunk_batch_size is None: + chunk_batch_size = 10 temporal_tasks = [ Task(classify_documents), @@ -329,9 +329,9 @@ async def get_temporal_tasks( max_chunk_size=chunk_size or get_max_chunk_tokens(), chunker=chunker, ), - Task(extract_events_and_timestamps, task_config={"batch_size": batch_size}), + Task(extract_events_and_timestamps, task_config={"batch_size": chunk_batch_size}), Task(extract_knowledge_graph_from_events), - Task(add_data_points, task_config={"batch_size": batch_size}), + Task(add_data_points, task_config={"batch_size": chunk_batch_size}), ] return temporal_tasks From a210bd59054dd353675589c63e57fe9d7349b766 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 20:24:36 +0200 Subject: [PATCH 13/16] refactor: rename chunk_batch_size to chunks_per_batch --- cognee/api/v1/cognify/cognify.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index e0f6253d8..1d5c36a3c 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -44,7 +44,7 @@ async def cognify( graph_model: BaseModel = KnowledgeGraph, chunker=TextChunker, chunk_size: int = None, - chunk_batch_size: int = None, + chunks_per_batch: int = None, config: Config = None, vector_db_config: dict = None, graph_db_config: dict = None, @@ -106,7 +106,7 @@ async def cognify( Formula: min(embedding_max_completion_tokens, llm_max_completion_tokens // 2) Default limits: ~512-8192 tokens depending on models. Smaller chunks = more granular but potentially fragmented knowledge. - chunk_batch_size: Number of chunks to be processed in a single batch in Cognify tasks. + chunks_per_batch: Number of chunks to be processed in a single batch in Cognify tasks. vector_db_config: Custom vector database configuration for embeddings storage. graph_db_config: Custom graph database configuration for relationship storage. run_in_background: If True, starts processing asynchronously and returns immediately. 
@@ -212,7 +212,7 @@ async def cognify( if temporal_cognify: tasks = await get_temporal_tasks( - user=user, chunker=chunker, chunk_size=chunk_size, chunk_batch_size=chunk_batch_size + user=user, chunker=chunker, chunk_size=chunk_size, chunks_per_batch=chunks_per_batch ) else: tasks = await get_default_tasks( @@ -222,7 +222,7 @@ async def cognify( chunk_size=chunk_size, config=config, custom_prompt=custom_prompt, - chunk_batch_size=chunk_batch_size, + chunks_per_batch=chunks_per_batch, ) # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for @@ -248,7 +248,7 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's chunk_size: int = None, config: Config = None, custom_prompt: Optional[str] = None, - chunk_batch_size: int = 100, + chunks_per_batch: int = 100, ) -> list[Task]: if config is None: ontology_config = get_ontology_env_config() @@ -267,8 +267,8 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's "ontology_config": {"ontology_resolver": get_default_ontology_resolver()} } - if chunk_batch_size is None: - chunk_batch_size = 100 + if chunks_per_batch is None: + chunks_per_batch = 100 default_tasks = [ Task(classify_documents), @@ -283,20 +283,20 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": chunk_batch_size}, + task_config={"batch_size": chunks_per_batch}, ), # Generate knowledge graphs from the document chunks. Task( summarize_text, - task_config={"batch_size": chunk_batch_size}, + task_config={"batch_size": chunks_per_batch}, ), - Task(add_data_points, task_config={"batch_size": chunk_batch_size}), + Task(add_data_points, task_config={"batch_size": chunks_per_batch}), ] return default_tasks async def get_temporal_tasks( - user: User = None, chunker=TextChunker, chunk_size: int = None, chunk_batch_size: int = 10 + user: User = None, chunker=TextChunker, chunk_size: int = None, chunks_per_batch: int = 10 ) -> list[Task]: """ Builds and returns a list of temporal processing tasks to be executed in sequence. @@ -313,13 +313,13 @@ async def get_temporal_tasks( user (User, optional): The user requesting task execution, used for permission checks. chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker. chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default. - chunk_batch_size (int, optional): Number of chunks to process in a single batch in Cognify + chunks_per_batch (int, optional): Number of chunks to process in a single batch in Cognify Returns: list[Task]: A list of Task objects representing the temporal processing pipeline. 
""" - if chunk_batch_size is None: - chunk_batch_size = 10 + if chunks_per_batch is None: + chunks_per_batch = 10 temporal_tasks = [ Task(classify_documents), @@ -329,9 +329,9 @@ async def get_temporal_tasks( max_chunk_size=chunk_size or get_max_chunk_tokens(), chunker=chunker, ), - Task(extract_events_and_timestamps, task_config={"batch_size": chunk_batch_size}), + Task(extract_events_and_timestamps, task_config={"batch_size": chunks_per_batch}), Task(extract_knowledge_graph_from_events), - Task(add_data_points, task_config={"batch_size": chunk_batch_size}), + Task(add_data_points, task_config={"batch_size": chunks_per_batch}), ] return temporal_tasks From 2998802c00961e36115bed93f5eda446e8500c75 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 17 Oct 2025 11:58:14 +0200 Subject: [PATCH 14/16] fix: Resolve issue with wrong error for OpenAI --- .../litellm_instructor/llm/openai/adapter.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py index 8877c2bdf..305b426b8 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py @@ -156,10 +156,7 @@ class OpenAIAdapter(LLMInterface): InstructorRetryException, ) as e: if not (self.fallback_model and self.fallback_api_key): - raise ContentPolicyFilterError( - f"The provided input contains content that is not aligned with our content policy: {text_input}" - ) from e - + raise e try: return await self.aclient.chat.completions.create( model=self.fallback_model, From 04719129a64809e28ed9c5e0af40dcd77a2e32dc Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 19 Oct 2025 15:53:38 +0200 Subject: [PATCH 15/16] updated env template --- .env.template | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.env.template b/.env.template index 3137636d3..89ac06830 100644 --- a/.env.template +++ b/.env.template @@ -247,10 +247,10 @@ LITELLM_LOG="ERROR" #LLM_PROVIDER="ollama" #LLM_ENDPOINT="http://localhost:11434/v1" #EMBEDDING_PROVIDER="ollama" -#EMBEDDING_MODEL="avr/sfr-embedding-mistral:latest" +#EMBEDDING_MODEL="nomic-embed-text:latest" #EMBEDDING_ENDPOINT="http://localhost:11434/api/embeddings" -#EMBEDDING_DIMENSIONS=4096 -#HUGGINGFACE_TOKENIZER="Salesforce/SFR-Embedding-Mistral" +#EMBEDDING_DIMENSIONS=768 +#HUGGINGFACE_TOKENIZER="nomic-ai/nomic-embed-text-v1.5" ########## OpenRouter (also free) ######################################################### From 612a2252ce012fc8929ffe6523ed6bc948a4db55 Mon Sep 17 00:00:00 2001 From: vasilije Date: Tue, 21 Oct 2025 07:22:52 +0200 Subject: [PATCH 16/16] fix --- poetry.lock | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index 80263027e..2773e61b9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. 
[[package]] name = "accelerate" @@ -4366,6 +4366,8 @@ groups = ["main"] markers = "extra == \"dlt\"" files = [ {file = "jsonpath-ng-1.7.0.tar.gz", hash = "sha256:f6f5f7fd4e5ff79c785f1573b394043b39849fb2bb47bcead935d12b00beab3c"}, + {file = "jsonpath_ng-1.7.0-py2-none-any.whl", hash = "sha256:898c93fc173f0c336784a3fa63d7434297544b7198124a68f9a3ef9597b0ae6e"}, + {file = "jsonpath_ng-1.7.0-py3-none-any.whl", hash = "sha256:f3d7f9e848cba1b6da28c55b1c26ff915dc9e0b1ba7e752a53d6da8d5cbd00b6"}, ] [package.dependencies] @@ -10208,6 +10210,13 @@ optional = false python-versions = ">=3.8" groups = ["main"] files = [ + {file = "PyYAML-6.0.3-cp38-cp38-macosx_10_13_x86_64.whl", hash = "sha256:c2514fceb77bc5e7a2f7adfaa1feb2fb311607c9cb518dbc378688ec73d8292f"}, + {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9c57bb8c96f6d1808c030b1687b9b5fb476abaa47f0db9c0101f5e9f394e97f4"}, + {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:efd7b85f94a6f21e4932043973a7ba2613b059c4a000551892ac9f1d11f5baf3"}, + {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:22ba7cfcad58ef3ecddc7ed1db3409af68d023b7f940da23c6c2a1890976eda6"}, + {file = "PyYAML-6.0.3-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:6344df0d5755a2c9a276d4473ae6b90647e216ab4757f8426893b5dd2ac3f369"}, + {file = "PyYAML-6.0.3-cp38-cp38-win32.whl", hash = "sha256:3ff07ec89bae51176c0549bc4c63aa6202991da2d9a6129d7aef7f1407d3f295"}, + {file = "PyYAML-6.0.3-cp38-cp38-win_amd64.whl", hash = "sha256:5cf4e27da7e3fbed4d6c3d8e797387aaad68102272f8f9752883bc32d61cb87b"}, {file = "pyyaml-6.0.3-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:214ed4befebe12df36bcc8bc2b64b396ca31be9304b8f59e25c11cf94a4c033b"}, {file = "pyyaml-6.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:02ea2dfa234451bbb8772601d7b8e426c2bfa197136796224e50e35a78777956"}, {file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b30236e45cf30d2b8e7b3e85881719e98507abed1011bf463a8fa23e9c3e98a8"},