From 9206d8536b89d4292c0286a35985665ce6f133d1 Mon Sep 17 00:00:00 2001 From: Andrej Milicevic Date: Mon, 6 Oct 2025 17:45:22 +0200 Subject: [PATCH 01/35] initial changes, still need to work on this. commit so I can checkout to diff branch --- .github/workflows/examples_tests.yml | 58 ++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 3 deletions(-) diff --git a/.github/workflows/examples_tests.yml b/.github/workflows/examples_tests.yml index 4eb9e184f..406420351 100644 --- a/.github/workflows/examples_tests.yml +++ b/.github/workflows/examples_tests.yml @@ -85,8 +85,8 @@ jobs: run: uv run python ./cognee/tests/tasks/descriptive_metrics/neo4j_metrics_test.py - test-dynamic-steps-metrics: - name: Run Dynamic Steps Example + test-multiple-examples: + name: Run Multiple Example Scripts runs-on: ubuntu-22.04 steps: - name: Check out repository @@ -97,7 +97,7 @@ jobs: with: python-version: '3.11.x' - - name: Run Dynamic Steps Tests + - name: Run Dynamic Steps Example env: OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} LLM_MODEL: ${{ secrets.LLM_MODEL }} @@ -110,6 +110,58 @@ jobs: EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} run: uv run python ./examples/python/dynamic_steps_example.py + - name: Run Temporal Example + env: + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + LLM_MODEL: ${{ secrets.LLM_MODEL }} + LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }} + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }} + EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }} + EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }} + EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} + EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} + run: uv run python ./examples/python/temporal_example.py + + - name: Run Ontology Demo Example + env: + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + LLM_MODEL: ${{ secrets.LLM_MODEL }} + LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }} + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + 
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }} + EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }} + EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }} + EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} + EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} + run: uv run python ./examples/python/ontology_demo_example.py + + - name: Run Temporal Example + env: + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + LLM_MODEL: ${{ secrets.LLM_MODEL }} + LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }} + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }} + EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }} + EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }} + EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} + EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} + run: uv run python ./examples/python/temporal_example.py + + - name: Run Agentic Reasoning Example + env: + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + LLM_MODEL: ${{ secrets.LLM_MODEL }} + LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }} + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }} + EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }} + EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }} + EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} + EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} + run: uv run python ./examples/python/agentic_reasoning_procurement_example.py + test-memify: name: Run Memify Example runs-on: ubuntu-22.04 From abfcbc69d61ec8a71ed83a8dd32894f5e99d8248 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 10 Oct 2025 15:36:36 +0200 Subject: [PATCH 02/35] refactor: Have embedding calls run in async gather --- cognee/api/v1/cognify/cognify.py | 6 ++--- cognee/tasks/storage/index_data_points.py | 33 +++++++++++++---------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 1292d243a..6a9f68443 100644 --- 
a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -269,13 +269,13 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": 10}, + task_config={"batch_size": 30}, ), # Generate knowledge graphs from the document chunks. Task( summarize_text, - task_config={"batch_size": 10}, + task_config={"batch_size": 30}, ), - Task(add_data_points, task_config={"batch_size": 10}), + Task(add_data_points, task_config={"batch_size": 100}), ] return default_tasks diff --git a/cognee/tasks/storage/index_data_points.py b/cognee/tasks/storage/index_data_points.py index 362412657..ebc4640d6 100644 --- a/cognee/tasks/storage/index_data_points.py +++ b/cognee/tasks/storage/index_data_points.py @@ -1,6 +1,6 @@ -from cognee.shared.logging_utils import get_logger +import asyncio -from cognee.infrastructure.databases.exceptions import EmbeddingException +from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.engine import DataPoint @@ -33,18 +33,23 @@ async def index_data_points(data_points: list[DataPoint]): indexed_data_point.metadata["index_fields"] = [field_name] index_points[index_name].append(indexed_data_point) - for index_name_and_field, indexable_points in index_points.items(): - first_occurence = index_name_and_field.index("_") - index_name = index_name_and_field[:first_occurence] - field_name = index_name_and_field[first_occurence + 1 :] - try: - # In case the amount of indexable points is too large we need to send them in batches - batch_size = vector_engine.embedding_engine.get_batch_size() - for i in range(0, len(indexable_points), batch_size): - batch = indexable_points[i : i + batch_size] - await vector_engine.index_data_points(index_name, field_name, batch) - except EmbeddingException as e: - logger.warning(f"Failed to index data points 
for {index_name}.{field_name}: {e}") + tasks: list[asyncio.Task] = [] + batch_size = vector_engine.embedding_engine.get_batch_size() + + for index_name_and_field, points in index_points.items(): + first = index_name_and_field.index("_") + index_name = index_name_and_field[:first] + field_name = index_name_and_field[first + 1 :] + + # Split in the usual “range step batch_size” manner + for i in range(0, len(points), batch_size): + batch = points[i : i + batch_size] + tasks.append( + asyncio.create_task(vector_engine.index_data_points(index_name, field_name, batch)) + ) + + # Fire them all and wait until every task is done. + await asyncio.gather(*tasks) return data_points From 757d745b5d262975c05f5fe3bb3f410f5c3d72b7 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 10 Oct 2025 17:12:09 +0200 Subject: [PATCH 03/35] refactor: Optimize cognification speed --- cognee/api/v1/cognify/cognify.py | 4 ++-- .../databases/vector/embeddings/config.py | 4 ++-- cognee/tasks/storage/index_graph_edges.py | 15 +++++++++++---- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 6a9f68443..30afb269a 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -269,11 +269,11 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": 30}, + task_config={"batch_size": 100}, ), # Generate knowledge graphs from the document chunks. 
Task( summarize_text, - task_config={"batch_size": 30}, + task_config={"batch_size": 100}, ), Task(add_data_points, task_config={"batch_size": 100}), ] diff --git a/cognee/infrastructure/databases/vector/embeddings/config.py b/cognee/infrastructure/databases/vector/embeddings/config.py index 24f724151..dcb55f4a4 100644 --- a/cognee/infrastructure/databases/vector/embeddings/config.py +++ b/cognee/infrastructure/databases/vector/embeddings/config.py @@ -26,9 +26,9 @@ class EmbeddingConfig(BaseSettings): def model_post_init(self, __context) -> None: # If embedding batch size is not defined use 2048 as default for OpenAI and 100 for all other embedding models if not self.embedding_batch_size and self.embedding_provider.lower() == "openai": - self.embedding_batch_size = 2048 + self.embedding_batch_size = 30 elif not self.embedding_batch_size: - self.embedding_batch_size = 100 + self.embedding_batch_size = 10 def to_dict(self) -> dict: """ diff --git a/cognee/tasks/storage/index_graph_edges.py b/cognee/tasks/storage/index_graph_edges.py index b7bf7a2b9..4fa8cfc75 100644 --- a/cognee/tasks/storage/index_graph_edges.py +++ b/cognee/tasks/storage/index_graph_edges.py @@ -1,3 +1,5 @@ +import asyncio + from cognee.modules.engine.utils.generate_edge_id import generate_edge_id from cognee.shared.logging_utils import get_logger from collections import Counter @@ -76,15 +78,20 @@ async def index_graph_edges( indexed_data_point.metadata["index_fields"] = [field_name] index_points[index_name].append(indexed_data_point) + # Get maximum batch size for embedding model + batch_size = vector_engine.embedding_engine.get_batch_size() + tasks: list[asyncio.Task] = [] + for index_name, indexable_points in index_points.items(): index_name, field_name = index_name.split(".") - # Get maximum batch size for embedding model - batch_size = vector_engine.embedding_engine.get_batch_size() - # We save the data in batches of {batch_size} to not put a lot of pressure on the database + # Create 
embedding tasks to run in parallel later for start in range(0, len(indexable_points), batch_size): batch = indexable_points[start : start + batch_size] - await vector_engine.index_data_points(index_name, field_name, batch) + tasks.append(vector_engine.index_data_points(index_name, field_name, batch)) + + # Start all embedding tasks and wait for completion + await asyncio.gather(*tasks) return None From 13d1133680a241a9423b57d760c2319c20b80670 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 10 Oct 2025 17:14:10 +0200 Subject: [PATCH 04/35] chore: Change comments --- cognee/tasks/storage/index_data_points.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cognee/tasks/storage/index_data_points.py b/cognee/tasks/storage/index_data_points.py index ebc4640d6..902789c80 100644 --- a/cognee/tasks/storage/index_data_points.py +++ b/cognee/tasks/storage/index_data_points.py @@ -41,14 +41,14 @@ async def index_data_points(data_points: list[DataPoint]): index_name = index_name_and_field[:first] field_name = index_name_and_field[first + 1 :] - # Split in the usual “range step batch_size” manner + # Create embedding requests per batch to run in parallel later for i in range(0, len(points), batch_size): batch = points[i : i + batch_size] tasks.append( asyncio.create_task(vector_engine.index_data_points(index_name, field_name, batch)) ) - # Fire them all and wait until every task is done. 
+ # Run all embedding requests in parallel await asyncio.gather(*tasks) return data_points From eb631a23ad6eeaba9c1111b598a6f4f955cd6c86 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 14 Oct 2025 13:57:41 +0200 Subject: [PATCH 05/35] refactor: set default numbers that are more reasonable --- cognee/api/v1/cognify/cognify.py | 6 +++--- cognee/infrastructure/databases/vector/embeddings/config.py | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 30afb269a..898c35518 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -269,13 +269,13 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": 100}, + task_config={"batch_size": 20}, ), # Generate knowledge graphs from the document chunks. Task( summarize_text, - task_config={"batch_size": 100}, + task_config={"batch_size": 20}, ), - Task(add_data_points, task_config={"batch_size": 100}), + Task(add_data_points, task_config={"batch_size": 20}), ] return default_tasks diff --git a/cognee/infrastructure/databases/vector/embeddings/config.py b/cognee/infrastructure/databases/vector/embeddings/config.py index dcb55f4a4..314adbd99 100644 --- a/cognee/infrastructure/databases/vector/embeddings/config.py +++ b/cognee/infrastructure/databases/vector/embeddings/config.py @@ -24,11 +24,10 @@ class EmbeddingConfig(BaseSettings): model_config = SettingsConfigDict(env_file=".env", extra="allow") def model_post_init(self, __context) -> None: - # If embedding batch size is not defined use 2048 as default for OpenAI and 100 for all other embedding models if not self.embedding_batch_size and self.embedding_provider.lower() == "openai": - self.embedding_batch_size = 30 + self.embedding_batch_size = 1024 elif not self.embedding_batch_size: - self.embedding_batch_size = 10 + 
+ self.embedding_batch_size = 100 def to_dict(self) -> dict: """ From 84a23756f5c77ef3c7e0c78c4aff122416249341 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 14 Oct 2025 14:25:38 +0200 Subject: [PATCH 06/35] fix: Change chunk_size to batch_size for temporal task --- cognee/api/v1/cognify/cognify.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 898c35518..2c87dbc4b 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -311,7 +311,7 @@ async def get_temporal_tasks( max_chunk_size=chunk_size or get_max_chunk_tokens(), chunker=chunker, ), - Task(extract_events_and_timestamps, task_config={"chunk_size": 10}), + Task(extract_events_and_timestamps, task_config={"batch_size": 10}), Task(extract_knowledge_graph_from_events), Task(add_data_points, task_config={"batch_size": 10}), ] From 98daadbb0461ae99935032bde96d8c056f874050 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 14 Oct 2025 20:29:55 +0200 Subject: [PATCH 07/35] refactor: Add tenacity retry mechanism --- .../embeddings/LiteLLMEmbeddingEngine.py | 18 ++++++++++++++++-- .../embeddings/OllamaEmbeddingEngine.py | 19 ++++++++++++++++--- poetry.lock | 2 +- pyproject.toml | 3 ++- uv.lock | 4 +++- 5 files changed, 38 insertions(+), 8 deletions(-) diff --git a/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py index d68941d25..2a71d674d 100644 --- a/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py +++ b/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py @@ -1,8 +1,17 @@ import asyncio +import logging + from cognee.shared.logging_utils import get_logger from typing import List, Optional import numpy as np import math +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + 
before_sleep_log, +) import litellm import os from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine @@ -76,8 +85,13 @@ class LiteLLMEmbeddingEngine(EmbeddingEngine): enable_mocking = str(enable_mocking).lower() self.mock = enable_mocking in ("true", "1", "yes") - @embedding_sleep_and_retry_async() - @embedding_rate_limit_async + @retry( + stop=stop_after_delay(180), + wait=wait_exponential_jitter(1, 180), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def embed_text(self, text: List[str]) -> List[List[float]]: """ Embed a list of text strings into vector representations. diff --git a/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py index e79ba3f6a..b8ee9c7df 100644 --- a/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py +++ b/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py @@ -3,8 +3,16 @@ from cognee.shared.logging_utils import get_logger import aiohttp from typing import List, Optional import os - +import litellm +import logging import aiohttp.http_exceptions +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, +) from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine from cognee.infrastructure.llm.tokenizer.HuggingFace import ( @@ -69,7 +77,6 @@ class OllamaEmbeddingEngine(EmbeddingEngine): enable_mocking = str(enable_mocking).lower() self.mock = enable_mocking in ("true", "1", "yes") - @embedding_rate_limit_async async def embed_text(self, text: List[str]) -> List[List[float]]: """ Generate embedding vectors for a list of text prompts. 
@@ -92,7 +99,13 @@ class OllamaEmbeddingEngine(EmbeddingEngine): embeddings = await asyncio.gather(*[self._get_embedding(prompt) for prompt in text]) return embeddings - @embedding_sleep_and_retry_async() + @retry( + stop=stop_after_delay(180), + wait=wait_exponential_jitter(1, 180), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def _get_embedding(self, prompt: str) -> List[float]: """ Internal method to call the Ollama embeddings endpoint for a single prompt. diff --git a/poetry.lock b/poetry.lock index 551295733..ffc5ec575 100644 --- a/poetry.lock +++ b/poetry.lock @@ -12738,4 +12738,4 @@ posthog = ["posthog"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<=3.13" -content-hash = "38353807b06e5c06caaa107979529937b978204f0f405c6b38cee283f4a49d3c" +content-hash = "d8cd8a8db46416e0c844ff90df5bd64551ebf9a0c338fbb2023a61008ff5941d" diff --git a/pyproject.toml b/pyproject.toml index 3df57e1f5..7ac2915d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,8 @@ dependencies = [ "networkx>=3.4.2,<4", "uvicorn>=0.34.0,<1.0.0", "gunicorn>=20.1.0,<24", - "websockets>=15.0.1,<16.0.0" + "websockets>=15.0.1,<16.0.0", + "tenacity>=9.0.0", ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index 570da9289..5c06b96be 100644 --- a/uv.lock +++ b/uv.lock @@ -856,7 +856,7 @@ wheels = [ [[package]] name = "cognee" -version = "0.3.4" +version = "0.3.5" source = { editable = "." 
} dependencies = [ { name = "aiofiles" }, @@ -892,6 +892,7 @@ dependencies = [ { name = "rdflib" }, { name = "sqlalchemy" }, { name = "structlog" }, + { name = "tenacity" }, { name = "tiktoken" }, { name = "typing-extensions" }, { name = "uvicorn" }, @@ -1086,6 +1087,7 @@ requires-dist = [ { name = "sentry-sdk", extras = ["fastapi"], marker = "extra == 'monitoring'", specifier = ">=2.9.0,<3" }, { name = "sqlalchemy", specifier = ">=2.0.39,<3.0.0" }, { name = "structlog", specifier = ">=25.2.0,<26" }, + { name = "tenacity", specifier = ">=9.0.0" }, { name = "tiktoken", specifier = ">=0.8.0,<1.0.0" }, { name = "transformers", marker = "extra == 'codegraph'", specifier = ">=4.46.3,<5" }, { name = "transformers", marker = "extra == 'huggingface'", specifier = ">=4.46.3,<5" }, From 1b28f137431d30c940568406fab1678db9276c28 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 13:32:17 +0200 Subject: [PATCH 08/35] refactor: Optimize Cognee speed --- cognee/api/v1/cognify/cognify.py | 6 +++--- .../embeddings/FastembedEmbeddingEngine.py | 20 +++++++++++++++++-- .../embeddings/LiteLLMEmbeddingEngine.py | 11 ++-------- .../embeddings/OllamaEmbeddingEngine.py | 4 ++-- .../databases/vector/embeddings/config.py | 4 ++-- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 9215c9369..3032bd4e8 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -269,13 +269,13 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": 20}, + task_config={"batch_size": 100}, ), # Generate knowledge graphs from the document chunks. 
Task( summarize_text, - task_config={"batch_size": 20}, + task_config={"batch_size": 100}, ), - Task(add_data_points, task_config={"batch_size": 20}), + Task(add_data_points, task_config={"batch_size": 100}), ] return default_tasks diff --git a/cognee/infrastructure/databases/vector/embeddings/FastembedEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/FastembedEmbeddingEngine.py index e34ab5d9d..c2acd516e 100644 --- a/cognee/infrastructure/databases/vector/embeddings/FastembedEmbeddingEngine.py +++ b/cognee/infrastructure/databases/vector/embeddings/FastembedEmbeddingEngine.py @@ -1,8 +1,17 @@ -from cognee.shared.logging_utils import get_logger +import os +import logging from typing import List, Optional from fastembed import TextEmbedding import litellm -import os +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, +) + +from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine from cognee.infrastructure.databases.exceptions import EmbeddingException from cognee.infrastructure.llm.tokenizer.TikToken import ( @@ -57,6 +66,13 @@ class FastembedEmbeddingEngine(EmbeddingEngine): enable_mocking = str(enable_mocking).lower() self.mock = enable_mocking in ("true", "1", "yes") + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def embed_text(self, text: List[str]) -> List[List[float]]: """ Embed the given text into numerical vectors. 
diff --git a/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py index 302950f66..03ce86bee 100644 --- a/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py +++ b/cognee/infrastructure/databases/vector/embeddings/LiteLLMEmbeddingEngine.py @@ -16,9 +16,6 @@ import litellm import os from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine from cognee.infrastructure.databases.exceptions import EmbeddingException -from cognee.infrastructure.llm.tokenizer.Gemini import ( - GeminiTokenizer, -) from cognee.infrastructure.llm.tokenizer.HuggingFace import ( HuggingFaceTokenizer, ) @@ -28,10 +25,6 @@ from cognee.infrastructure.llm.tokenizer.Mistral import ( from cognee.infrastructure.llm.tokenizer.TikToken import ( TikTokenTokenizer, ) -from cognee.infrastructure.databases.vector.embeddings.embedding_rate_limiter import ( - embedding_rate_limit_async, - embedding_sleep_and_retry_async, -) litellm.set_verbose = False logger = get_logger("LiteLLMEmbeddingEngine") @@ -86,8 +79,8 @@ class LiteLLMEmbeddingEngine(EmbeddingEngine): self.mock = enable_mocking in ("true", "1", "yes") @retry( - stop=stop_after_delay(180), - wait=wait_exponential_jitter(1, 180), + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), before_sleep=before_sleep_log(logger, logging.DEBUG), reraise=True, diff --git a/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py b/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py index b8ee9c7df..2882b679a 100644 --- a/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py +++ b/cognee/infrastructure/databases/vector/embeddings/OllamaEmbeddingEngine.py @@ -100,8 +100,8 @@ class OllamaEmbeddingEngine(EmbeddingEngine): return embeddings @retry( - 
stop=stop_after_delay(180), - wait=wait_exponential_jitter(1, 180), + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), before_sleep=before_sleep_log(logger, logging.DEBUG), reraise=True, diff --git a/cognee/infrastructure/databases/vector/embeddings/config.py b/cognee/infrastructure/databases/vector/embeddings/config.py index 314adbd99..56cd79678 100644 --- a/cognee/infrastructure/databases/vector/embeddings/config.py +++ b/cognee/infrastructure/databases/vector/embeddings/config.py @@ -25,9 +25,9 @@ class EmbeddingConfig(BaseSettings): def model_post_init(self, __context) -> None: if not self.embedding_batch_size and self.embedding_provider.lower() == "openai": - self.embedding_batch_size = 1024 + self.embedding_batch_size = 36 elif not self.embedding_batch_size: - self.embedding_batch_size = 100 + self.embedding_batch_size = 36 def to_dict(self) -> dict: """ From fc4440da8c7b7cdfd4087f34c40ac90cc86bb839 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 14:43:21 +0200 Subject: [PATCH 09/35] refactor: update env template --- .env.template | 5 ++--- .../loaders/external/advanced_pdf_loader.py | 10 ++-------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/.env.template b/.env.template index 7fd3ba9e8..3137636d3 100644 --- a/.env.template +++ b/.env.template @@ -28,11 +28,10 @@ EMBEDDING_ENDPOINT="" EMBEDDING_API_VERSION="" EMBEDDING_DIMENSIONS=3072 EMBEDDING_MAX_TOKENS=8191 +EMBEDDING_BATCH_SIZE=36 # If embedding key is not provided same key set for LLM_API_KEY will be used #EMBEDDING_API_KEY="your_api_key" -# Note: OpenAI support up to 2048 elements and Gemini supports a maximum of 100 elements in an embedding batch, -# Cognee sets the optimal batch size for OpenAI and Gemini, but a custom size can be defined if necessary for other models -#EMBEDDING_BATCH_SIZE=2048 + # If using BAML structured output these env variables will be used 
BAML_LLM_PROVIDER=openai diff --git a/cognee/infrastructure/loaders/external/advanced_pdf_loader.py b/cognee/infrastructure/loaders/external/advanced_pdf_loader.py index 7bab8cac6..6d1412b77 100644 --- a/cognee/infrastructure/loaders/external/advanced_pdf_loader.py +++ b/cognee/infrastructure/loaders/external/advanced_pdf_loader.py @@ -14,14 +14,6 @@ from cognee.infrastructure.loaders.external.pypdf_loader import PyPdfLoader logger = get_logger(__name__) -try: - from unstructured.partition.pdf import partition_pdf -except ImportError as e: - logger.info( - "unstructured[pdf] not installed, can't use AdvancedPdfLoader, will use PyPdfLoader instead." - ) - raise ImportError from e - @dataclass class _PageBuffer: @@ -88,6 +80,8 @@ class AdvancedPdfLoader(LoaderInterface): **kwargs, } # Use partition to extract elements + from unstructured.partition.pdf import partition_pdf + elements = partition_pdf(**partition_kwargs) # Process elements into text content From 5663c3fe3ab80f0eee7adb3576af4b579a1d8306 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 17:38:18 +0200 Subject: [PATCH 10/35] refactor: add batch size param to temporal graphs --- cognee/api/v1/cognify/cognify.py | 34 ++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 3032bd4e8..d29d8c939 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -44,6 +44,7 @@ async def cognify( graph_model: BaseModel = KnowledgeGraph, chunker=TextChunker, chunk_size: int = None, + batch_size: int = None, config: Config = None, vector_db_config: dict = None, graph_db_config: dict = None, @@ -105,6 +106,7 @@ async def cognify( Formula: min(embedding_max_completion_tokens, llm_max_completion_tokens // 2) Default limits: ~512-8192 tokens depending on models. Smaller chunks = more granular but potentially fragmented knowledge. 
+ batch_size: Number of chunks to be processed in a single batch in Cognify tasks. vector_db_config: Custom vector database configuration for embeddings storage. graph_db_config: Custom graph database configuration for relationship storage. run_in_background: If True, starts processing asynchronously and returns immediately. @@ -209,10 +211,18 @@ async def cognify( } if temporal_cognify: - tasks = await get_temporal_tasks(user, chunker, chunk_size) + tasks = await get_temporal_tasks( + user=user, chunker=chunker, chunk_size=chunk_size, batch_size=batch_size + ) else: tasks = await get_default_tasks( - user, graph_model, chunker, chunk_size, config, custom_prompt + user=user, + graph_model=graph_model, + chunker=chunker, + chunk_size=chunk_size, + config=config, + custom_prompt=custom_prompt, + batch_size=batch_size, ) # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for @@ -238,6 +248,7 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's chunk_size: int = None, config: Config = None, custom_prompt: Optional[str] = None, + batch_size: int = 100, ) -> list[Task]: if config is None: ontology_config = get_ontology_env_config() @@ -256,6 +267,9 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's "ontology_config": {"ontology_resolver": get_default_ontology_resolver()} } + if batch_size is None: + batch_size = 100 + default_tasks = [ Task(classify_documents), Task(check_permissions_on_dataset, user=user, permissions=["write"]), @@ -269,20 +283,20 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": 100}, + task_config={"batch_size": batch_size}, ), # Generate knowledge graphs from the document chunks. 
Task( summarize_text, - task_config={"batch_size": 100}, + task_config={"batch_size": batch_size}, ), - Task(add_data_points, task_config={"batch_size": 100}), + Task(add_data_points, task_config={"batch_size": batch_size}), ] return default_tasks async def get_temporal_tasks( - user: User = None, chunker=TextChunker, chunk_size: int = None + user: User = None, chunker=TextChunker, chunk_size: int = None, batch_size: int = 10 ) -> list[Task]: """ Builds and returns a list of temporal processing tasks to be executed in sequence. @@ -299,10 +313,14 @@ async def get_temporal_tasks( user (User, optional): The user requesting task execution, used for permission checks. chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker. chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default. + batch_size (int, optional): Number of chunks to process in a single batch in Cognify Returns: list[Task]: A list of Task objects representing the temporal processing pipeline. 
""" + if batch_size is None: + batch_size = 10 + temporal_tasks = [ Task(classify_documents), Task(check_permissions_on_dataset, user=user, permissions=["write"]), @@ -311,9 +329,9 @@ async def get_temporal_tasks( max_chunk_size=chunk_size or get_max_chunk_tokens(), chunker=chunker, ), - Task(extract_events_and_timestamps, task_config={"batch_size": 10}), + Task(extract_events_and_timestamps, task_config={"batch_size": batch_size}), Task(extract_knowledge_graph_from_events), - Task(add_data_points, task_config={"batch_size": 10}), + Task(add_data_points, task_config={"batch_size": batch_size}), ] return temporal_tasks From 96496f38ed1e4ce2dd63190c9cbf6a16338fbeb0 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 18:08:18 +0200 Subject: [PATCH 11/35] refactor: Switch to using tenacity for rate limiting --- .../llm/anthropic/adapter.py | 28 ++++++---- .../litellm_instructor/llm/gemini/adapter.py | 22 ++++++-- .../llm/generic_llm_api/adapter.py | 22 ++++++-- .../litellm_instructor/llm/mistral/adapter.py | 54 ++++++------------- .../litellm_instructor/llm/ollama/adapter.py | 41 +++++++++++--- .../litellm_instructor/llm/openai/adapter.py | 53 +++++++++++++----- 6 files changed, 142 insertions(+), 78 deletions(-) diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/anthropic/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/anthropic/adapter.py index 2d88a8271..bf19d6e86 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/anthropic/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/anthropic/adapter.py @@ -1,19 +1,24 @@ +import logging from typing import Type from pydantic import BaseModel +import litellm import instructor +from cognee.shared.logging_utils import get_logger +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, 
+) -from cognee.infrastructure.llm.exceptions import MissingSystemPromptPathError from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - sleep_and_retry_async, -) - -from cognee.infrastructure.llm.LLMGateway import LLMGateway from cognee.infrastructure.llm.config import get_llm_config +logger = get_logger() + class AnthropicAdapter(LLMInterface): """ @@ -35,8 +40,13 @@ class AnthropicAdapter(LLMInterface): self.model = model self.max_completion_tokens = max_completion_tokens - @sleep_and_retry_async() - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/gemini/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/gemini/adapter.py index 510d29ce8..1187e0cad 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/gemini/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/gemini/adapter.py @@ -12,11 +12,18 @@ from cognee.infrastructure.llm.exceptions import ContentPolicyFilterError from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - sleep_and_retry_async, +import logging +from cognee.shared.logging_utils import get_logger +from tenacity import ( + retry, + 
stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, ) +logger = get_logger() + class GeminiAdapter(LLMInterface): """ @@ -58,8 +65,13 @@ class GeminiAdapter(LLMInterface): self.aclient = instructor.from_litellm(litellm.acompletion, mode=instructor.Mode.JSON) - @sleep_and_retry_async() - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/generic_llm_api/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/generic_llm_api/adapter.py index 917599d4d..8bbbaa2cc 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/generic_llm_api/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/generic_llm_api/adapter.py @@ -12,11 +12,18 @@ from cognee.infrastructure.llm.exceptions import ContentPolicyFilterError from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - sleep_and_retry_async, +import logging +from cognee.shared.logging_utils import get_logger +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, ) +logger = get_logger() + class GenericAPIAdapter(LLMInterface): """ @@ -58,8 +65,13 @@ class GenericAPIAdapter(LLMInterface): self.aclient = instructor.from_litellm(litellm.acompletion, mode=instructor.Mode.JSON) - @sleep_and_retry_async() - 
@rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/mistral/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/mistral/adapter.py index c4e51b70b..78a3cbff5 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/mistral/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/mistral/adapter.py @@ -1,20 +1,23 @@ import litellm import instructor from pydantic import BaseModel -from typing import Type, Optional -from litellm import acompletion, JSONSchemaValidationError +from typing import Type +from litellm import JSONSchemaValidationError from cognee.shared.logging_utils import get_logger from cognee.modules.observability.get_observe import get_observe -from cognee.infrastructure.llm.exceptions import MissingSystemPromptPathError from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) -from cognee.infrastructure.llm.LLMGateway import LLMGateway from cognee.infrastructure.llm.config import get_llm_config -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - sleep_and_retry_async, + +import logging +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, ) logger = get_logger() @@ -47,8 +50,13 @@ class MistralAdapter(LLMInterface): api_key=get_llm_config().llm_api_key, ) - @sleep_and_retry_async() - @rate_limit_async + @retry( + 
stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: @@ -99,31 +107,3 @@ class MistralAdapter(LLMInterface): logger.error(f"Schema validation failed: {str(e)}") logger.debug(f"Raw response: {e.raw_response}") raise ValueError(f"Response failed schema validation: {str(e)}") - - def show_prompt(self, text_input: str, system_prompt: str) -> str: - """ - Format and display the prompt for a user query. - - Parameters: - ----------- - - text_input (str): Input text from the user to be included in the prompt. - - system_prompt (str): The system prompt that will be shown alongside the user input. - - Returns: - -------- - - str: The formatted prompt string combining system prompt and user input. - """ - if not text_input: - text_input = "No user input provided." 
- if not system_prompt: - raise MissingSystemPromptPathError() - - system_prompt = LLMGateway.read_query_prompt(system_prompt) - - formatted_prompt = ( - f"""System Prompt:\n{system_prompt}\n\nUser Input:\n{text_input}\n""" - if system_prompt - else None - ) - - return formatted_prompt diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/ollama/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/ollama/adapter.py index 314cb79d8..9c3d185aa 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/ollama/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/ollama/adapter.py @@ -1,4 +1,6 @@ import base64 +import litellm +import logging import instructor from typing import Type from openai import OpenAI @@ -7,11 +9,17 @@ from pydantic import BaseModel from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) -from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - sleep_and_retry_async, -) from cognee.infrastructure.files.utils.open_data_file import open_data_file +from cognee.shared.logging_utils import get_logger +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, +) + +logger = get_logger() class OllamaAPIAdapter(LLMInterface): @@ -47,8 +55,13 @@ class OllamaAPIAdapter(LLMInterface): OpenAI(base_url=self.endpoint, api_key=self.api_key), mode=instructor.Mode.JSON ) - @sleep_and_retry_async() - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, 
response_model: Type[BaseModel] ) -> BaseModel: @@ -90,7 +103,13 @@ class OllamaAPIAdapter(LLMInterface): return response - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def create_transcript(self, input_file: str) -> str: """ Generate an audio transcript from a user query. @@ -123,7 +142,13 @@ class OllamaAPIAdapter(LLMInterface): return transcription.text - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def transcribe_image(self, input_file: str) -> str: """ Transcribe content from an image using base64 encoding. diff --git a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py index 527f64d75..8877c2bdf 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py @@ -7,6 +7,15 @@ from openai import ContentFilterFinishReasonError from litellm.exceptions import ContentPolicyViolationError from instructor.core import InstructorRetryException +import logging +from tenacity import ( + retry, + stop_after_delay, + wait_exponential_jitter, + retry_if_not_exception_type, + before_sleep_log, +) + from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import ( LLMInterface, ) @@ -14,19 +23,13 @@ from cognee.infrastructure.llm.exceptions import ( ContentPolicyFilterError, ) from cognee.infrastructure.files.utils.open_data_file import open_data_file 
-from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import ( - rate_limit_async, - rate_limit_sync, - sleep_and_retry_async, - sleep_and_retry_sync, -) from cognee.modules.observability.get_observe import get_observe from cognee.shared.logging_utils import get_logger -observe = get_observe() - logger = get_logger() +observe = get_observe() + class OpenAIAdapter(LLMInterface): """ @@ -97,8 +100,13 @@ class OpenAIAdapter(LLMInterface): self.fallback_endpoint = fallback_endpoint @observe(as_type="generation") - @sleep_and_retry_async() - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def acreate_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: @@ -186,8 +194,13 @@ class OpenAIAdapter(LLMInterface): ) from error @observe - @sleep_and_retry_sync() - @rate_limit_sync + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) def create_structured_output( self, text_input: str, system_prompt: str, response_model: Type[BaseModel] ) -> BaseModel: @@ -231,7 +244,13 @@ class OpenAIAdapter(LLMInterface): max_retries=self.MAX_RETRIES, ) - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def create_transcript(self, input): """ Generate an audio transcript from a user query. 
@@ -263,7 +282,13 @@ class OpenAIAdapter(LLMInterface): return transcription - @rate_limit_async + @retry( + stop=stop_after_delay(128), + wait=wait_exponential_jitter(2, 128), + retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError), + before_sleep=before_sleep_log(logger, logging.DEBUG), + reraise=True, + ) async def transcribe_image(self, input) -> BaseModel: """ Generate a transcription of an image from a user query. From 38406a0ab1b3d4d47f8d9fb4e95b4612cd3ce117 Mon Sep 17 00:00:00 2001 From: Daulet Amirkhanov Date: Wed, 15 Oct 2025 17:32:48 +0100 Subject: [PATCH 12/35] chore: remove memgraph from cognee repo --- .../databases/graph/get_graph_engine.py | 2 +- .../graph/memgraph/memgraph_adapter.py | 1116 ----------------- cognee/tests/test_memgraph.py | 105 -- notebooks/neptune-analytics-example.ipynb | 82 +- 4 files changed, 42 insertions(+), 1263 deletions(-) delete mode 100644 cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py delete mode 100644 cognee/tests/test_memgraph.py diff --git a/cognee/infrastructure/databases/graph/get_graph_engine.py b/cognee/infrastructure/databases/graph/get_graph_engine.py index 1861aa15c..1ea61d29f 100644 --- a/cognee/infrastructure/databases/graph/get_graph_engine.py +++ b/cognee/infrastructure/databases/graph/get_graph_engine.py @@ -162,5 +162,5 @@ def create_graph_engine( raise EnvironmentError( f"Unsupported graph database provider: {graph_database_provider}. 
" - f"Supported providers are: {', '.join(list(supported_databases.keys()) + ['neo4j', 'kuzu', 'kuzu-remote', 'memgraph', 'neptune', 'neptune_analytics'])}" + f"Supported providers are: {', '.join(list(supported_databases.keys()) + ['neo4j', 'kuzu', 'kuzu-remote', 'neptune', 'neptune_analytics'])}" ) diff --git a/cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py b/cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py deleted file mode 100644 index 3612e3277..000000000 --- a/cognee/infrastructure/databases/graph/memgraph/memgraph_adapter.py +++ /dev/null @@ -1,1116 +0,0 @@ -"""Memgraph Adapter for Graph Database""" - -import json -from cognee.shared.logging_utils import get_logger, ERROR -import asyncio -from textwrap import dedent -from typing import Optional, Any, List, Dict, Type, Tuple -from contextlib import asynccontextmanager -from uuid import UUID -from neo4j import AsyncSession -from neo4j import AsyncGraphDatabase -from neo4j.exceptions import Neo4jError -from cognee.infrastructure.engine import DataPoint -from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface -from cognee.modules.storage.utils import JSONEncoder -from cognee.infrastructure.databases.exceptions.exceptions import NodesetFilterNotSupportedError - -logger = get_logger("MemgraphAdapter", level=ERROR) - - -class MemgraphAdapter(GraphDBInterface): - """ - Handles interaction with a Memgraph database through various graph operations. 
- - Public methods include: - - get_session - - query - - has_node - - add_node - - add_nodes - - extract_node - - extract_nodes - - delete_node - - delete_nodes - - has_edge - - has_edges - - add_edge - - add_edges - - get_edges - - get_disconnected_nodes - - get_predecessors - - get_successors - - get_neighbours - - get_connections - - remove_connection_to_predecessors_of - - remove_connection_to_successors_of - - delete_graph - - serialize_properties - - get_model_independent_graph_data - - get_graph_data - - get_nodeset_subgraph - - get_filtered_graph_data - - get_node_labels_string - - get_relationship_labels_string - - get_graph_metrics - """ - - def __init__( - self, - graph_database_url: str, - graph_database_username: Optional[str] = None, - graph_database_password: Optional[str] = None, - driver: Optional[Any] = None, - ): - # Only use auth if both username and password are provided - auth = None - if graph_database_username and graph_database_password: - auth = (graph_database_username, graph_database_password) - - self.driver = driver or AsyncGraphDatabase.driver( - graph_database_url, - auth=auth, - max_connection_lifetime=120, - ) - - @asynccontextmanager - async def get_session(self) -> AsyncSession: - """ - Manage a session with the database, yielding the session for use in operations. - """ - async with self.driver.session() as session: - yield session - - async def query( - self, - query: str, - params: Optional[Dict[str, Any]] = None, - ) -> List[Dict[str, Any]]: - """ - Execute a provided query on the Memgraph database and return the results. - - Parameters: - ----------- - - - query (str): The Cypher query to be executed against the database. - - params (Optional[Dict[str, Any]]): Optional parameters to be used in the query. - (default None) - - Returns: - -------- - - - List[Dict[str, Any]]: A list of dictionaries representing the result set of the - query. 
- """ - try: - async with self.get_session() as session: - result = await session.run(query, params) - data = await result.data() - return data - except Neo4jError as error: - logger.error("Memgraph query error: %s", error, exc_info=True) - raise error - - async def has_node(self, node_id: str) -> bool: - """ - Determine if a node with the given ID exists in the database. - - Parameters: - ----------- - - - node_id (str): The ID of the node to check for existence. - - Returns: - -------- - - - bool: True if the node exists; otherwise, False. - """ - results = await self.query( - """ - MATCH (n) - WHERE n.id = $node_id - RETURN COUNT(n) > 0 AS node_exists - """, - {"node_id": node_id}, - ) - return results[0]["node_exists"] if len(results) > 0 else False - - async def add_node(self, node: DataPoint): - """ - Add a new node to the database with specified properties. - - Parameters: - ----------- - - - node (DataPoint): The DataPoint object representing the node to add. - - Returns: - -------- - - The result of the node addition, including its internal ID and node ID. - """ - serialized_properties = self.serialize_properties(node.model_dump()) - - query = """ - MERGE (node {id: $node_id}) - ON CREATE SET node:$node_label, node += $properties, node.updated_at = timestamp() - ON MATCH SET node:$node_label, node += $properties, node.updated_at = timestamp() - RETURN ID(node) AS internal_id, node.id AS nodeId - """ - - params = { - "node_id": str(node.id), - "node_label": type(node).__name__, - "properties": serialized_properties, - } - return await self.query(query, params) - - async def add_nodes(self, nodes: list[DataPoint]) -> None: - """ - Add multiple nodes to the database in a single operation. - - Parameters: - ----------- - - - nodes (list[DataPoint]): A list of DataPoint objects representing the nodes to - add. - - Returns: - -------- - - - None: None. 
- """ - query = """ - UNWIND $nodes AS node - MERGE (n {id: node.node_id}) - ON CREATE SET n:node.label, n += node.properties, n.updated_at = timestamp() - ON MATCH SET n:node.label, n += node.properties, n.updated_at = timestamp() - RETURN ID(n) AS internal_id, n.id AS nodeId - """ - - nodes = [ - { - "node_id": str(node.id), - "label": type(node).__name__, - "properties": self.serialize_properties(node.model_dump()), - } - for node in nodes - ] - - results = await self.query(query, dict(nodes=nodes)) - return results - - async def extract_node(self, node_id: str): - """ - Retrieve a single node based on its ID. - - Parameters: - ----------- - - - node_id (str): The ID of the node to retrieve. - - Returns: - -------- - - The node corresponding to the provided ID, or None if not found. - """ - results = await self.extract_nodes([node_id]) - - return results[0] if len(results) > 0 else None - - async def extract_nodes(self, node_ids: List[str]): - """ - Retrieve multiple nodes based on their IDs. - - Parameters: - ----------- - - - node_ids (List[str]): A list of IDs for the nodes to retrieve. - - Returns: - -------- - - A list of nodes corresponding to the provided IDs. - """ - query = """ - UNWIND $node_ids AS id - MATCH (node {id: id}) - RETURN node""" - - params = {"node_ids": node_ids} - - results = await self.query(query, params) - - return [result["node"] for result in results] - - async def delete_node(self, node_id: str): - """ - Delete a node from the database based on its ID. - - Parameters: - ----------- - - - node_id (str): The ID of the node to delete. - - Returns: - -------- - - None. - """ - sanitized_id = node_id.replace(":", "_") - - query = "MATCH (node: {{id: $node_id}}) DETACH DELETE node" - params = {"node_id": sanitized_id} - - return await self.query(query, params) - - async def delete_nodes(self, node_ids: list[str]) -> None: - """ - Delete multiple nodes from the database based on their IDs. 
- - Parameters: - ----------- - - - node_ids (list[str]): A list of IDs for the nodes to delete. - - Returns: - -------- - - - None: None. - """ - query = """ - UNWIND $node_ids AS id - MATCH (node {id: id}) - DETACH DELETE node""" - - params = {"node_ids": node_ids} - - return await self.query(query, params) - - async def has_edge(self, from_node: UUID, to_node: UUID, edge_label: str) -> bool: - """ - Check if a directed edge exists between two nodes identified by their IDs. - - Parameters: - ----------- - - - from_node (UUID): The ID of the source node. - - to_node (UUID): The ID of the target node. - - edge_label (str): The label of the edge to check. - - Returns: - -------- - - - bool: True if the edge exists; otherwise, False. - """ - query = """ - MATCH (from_node)-[relationship]->(to_node) - WHERE from_node.id = $from_node_id AND to_node.id = $to_node_id AND type(relationship) = $edge_label - RETURN COUNT(relationship) > 0 AS edge_exists - """ - - params = { - "from_node_id": str(from_node), - "to_node_id": str(to_node), - "edge_label": edge_label, - } - - records = await self.query(query, params) - return records[0]["edge_exists"] if records else False - - async def has_edges(self, edges): - """ - Check for the existence of multiple edges based on provided criteria. - - Parameters: - ----------- - - - edges: A list of edges to verify existence for. - - Returns: - -------- - - A list of boolean values indicating the existence of each edge. 
- """ - query = """ - UNWIND $edges AS edge - MATCH (a)-[r]->(b) - WHERE id(a) = edge.from_node AND id(b) = edge.to_node AND type(r) = edge.relationship_name - RETURN edge.from_node AS from_node, edge.to_node AS to_node, edge.relationship_name AS relationship_name, count(r) > 0 AS edge_exists - """ - - try: - params = { - "edges": [ - { - "from_node": str(edge[0]), - "to_node": str(edge[1]), - "relationship_name": edge[2], - } - for edge in edges - ], - } - - results = await self.query(query, params) - return [result["edge_exists"] for result in results] - except Neo4jError as error: - logger.error("Memgraph query error: %s", error, exc_info=True) - raise error - - async def add_edge( - self, - from_node: UUID, - to_node: UUID, - relationship_name: str, - edge_properties: Optional[Dict[str, Any]] = None, - ): - """ - Add a directed edge between two nodes with optional properties. - - Parameters: - ----------- - - - from_node (UUID): The ID of the source node. - - to_node (UUID): The ID of the target node. - - relationship_name (str): The type/label of the relationship to create. - - edge_properties (Optional[Dict[str, Any]]): Optional properties associated with - the edge. (default None) - - Returns: - -------- - - The result of the edge addition operation, including relationship details. 
- """ - - exists = await asyncio.gather(self.has_node(str(from_node)), self.has_node(str(to_node))) - - if not all(exists): - return None - - serialized_properties = self.serialize_properties(edge_properties or {}) - - query = dedent( - f"""\ - MATCH (from_node {{id: $from_node}}), - (to_node {{id: $to_node}}) - WHERE from_node IS NOT NULL AND to_node IS NOT NULL - MERGE (from_node)-[r:{relationship_name}]->(to_node) - ON CREATE SET r += $properties, r.updated_at = timestamp() - ON MATCH SET r += $properties, r.updated_at = timestamp() - RETURN r - """ - ) - - params = { - "from_node": str(from_node), - "to_node": str(to_node), - "relationship_name": relationship_name, - "properties": serialized_properties, - } - - return await self.query(query, params) - - async def add_edges(self, edges: list[tuple[str, str, str, dict[str, Any]]]) -> None: - """ - Batch add multiple edges between nodes, enforcing specified relationships. - - Parameters: - ----------- - - - edges (list[tuple[str, str, str, dict[str, Any]]): A list of tuples containing - specifications for each edge to add. - - Returns: - -------- - - - None: None. 
- """ - query = """ - UNWIND $edges AS edge - MATCH (from_node {id: edge.from_node}) - MATCH (to_node {id: edge.to_node}) - CALL merge.relationship( - from_node, - edge.relationship_name, - { - source_node_id: edge.from_node, - target_node_id: edge.to_node - }, - edge.properties, - to_node, - {} - ) YIELD rel - RETURN rel""" - - edges = [ - { - "from_node": str(edge[0]), - "to_node": str(edge[1]), - "relationship_name": edge[2], - "properties": { - **(edge[3] if edge[3] else {}), - "source_node_id": str(edge[0]), - "target_node_id": str(edge[1]), - }, - } - for edge in edges - ] - - try: - results = await self.query(query, dict(edges=edges)) - return results - except Neo4jError as error: - logger.error("Memgraph query error: %s", error, exc_info=True) - raise error - - async def get_edges(self, node_id: str): - """ - Retrieve all edges connected to a specific node identified by its ID. - - Parameters: - ----------- - - - node_id (str): The ID of the node for which to retrieve connected edges. - - Returns: - -------- - - A list of tuples representing the edges connected to the node. - """ - query = """ - MATCH (n {id: $node_id})-[r]-(m) - RETURN n, r, m - """ - - results = await self.query(query, dict(node_id=node_id)) - - return [ - (result["n"]["id"], result["m"]["id"], {"relationship_name": result["r"][1]}) - for result in results - ] - - async def get_disconnected_nodes(self) -> list[str]: - """ - Identify nodes in the graph that do not belong to the largest connected component. - - Returns: - -------- - - - list[str]: A list of IDs representing the disconnected nodes. 
- """ - query = """ - // Step 1: Collect all nodes - MATCH (n) - WITH COLLECT(n) AS nodes - - // Step 2: Find all connected components - WITH nodes - CALL { - WITH nodes - UNWIND nodes AS startNode - MATCH path = (startNode)-[*]-(connectedNode) - WITH startNode, COLLECT(DISTINCT connectedNode) AS component - RETURN component - } - - // Step 3: Aggregate components - WITH COLLECT(component) AS components - - // Step 4: Identify the largest connected component - UNWIND components AS component - WITH component - ORDER BY SIZE(component) DESC - LIMIT 1 - WITH component AS largestComponent - - // Step 5: Find nodes not in the largest connected component - MATCH (n) - WHERE NOT n IN largestComponent - RETURN COLLECT(ID(n)) AS ids - """ - - results = await self.query(query) - return results[0]["ids"] if len(results) > 0 else [] - - async def get_predecessors(self, node_id: str, edge_label: str = None) -> list[str]: - """ - Retrieve all predecessors of a node based on its ID and optional edge label. - - Parameters: - ----------- - - - node_id (str): The ID of the node to find predecessors for. - - edge_label (str): Optional edge label to filter predecessors. (default None) - - Returns: - -------- - - - list[str]: A list of predecessor node IDs. - """ - if edge_label is not None: - query = """ - MATCH (node)<-[r]-(predecessor) - WHERE node.id = $node_id AND type(r) = $edge_label - RETURN predecessor - """ - - results = await self.query( - query, - dict( - node_id=node_id, - edge_label=edge_label, - ), - ) - - return [result["predecessor"] for result in results] - else: - query = """ - MATCH (node)<-[r]-(predecessor) - WHERE node.id = $node_id - RETURN predecessor - """ - - results = await self.query( - query, - dict( - node_id=node_id, - ), - ) - - return [result["predecessor"] for result in results] - - async def get_successors(self, node_id: str, edge_label: str = None) -> list[str]: - """ - Retrieve all successors of a node based on its ID and optional edge label. 
- - Parameters: - ----------- - - - node_id (str): The ID of the node to find successors for. - - edge_label (str): Optional edge label to filter successors. (default None) - - Returns: - -------- - - - list[str]: A list of successor node IDs. - """ - if edge_label is not None: - query = """ - MATCH (node)-[r]->(successor) - WHERE node.id = $node_id AND type(r) = $edge_label - RETURN successor - """ - - results = await self.query( - query, - dict( - node_id=node_id, - edge_label=edge_label, - ), - ) - - return [result["successor"] for result in results] - else: - query = """ - MATCH (node)-[r]->(successor) - WHERE node.id = $node_id - RETURN successor - """ - - results = await self.query( - query, - dict( - node_id=node_id, - ), - ) - - return [result["successor"] for result in results] - - async def get_neighbors(self, node_id: str) -> List[Dict[str, Any]]: - """ - Get both predecessors and successors of a node. - - Parameters: - ----------- - - - node_id (str): The ID of the node to find neighbors for. - - Returns: - -------- - - - List[Dict[str, Any]]: A combined list of neighbor node IDs. 
- """ - predecessors, successors = await asyncio.gather( - self.get_predecessors(node_id), self.get_successors(node_id) - ) - - return predecessors + successors - - async def get_node(self, node_id: str) -> Optional[Dict[str, Any]]: - """Get a single node by ID.""" - query = """ - MATCH (node {id: $node_id}) - RETURN node - """ - results = await self.query(query, {"node_id": node_id}) - return results[0]["node"] if results else None - - async def get_nodes(self, node_ids: List[str]) -> List[Dict[str, Any]]: - """Get multiple nodes by their IDs.""" - query = """ - UNWIND $node_ids AS id - MATCH (node {id: id}) - RETURN node - """ - results = await self.query(query, {"node_ids": node_ids}) - return [result["node"] for result in results] - - async def get_connections(self, node_id: UUID) -> list: - """ - Retrieve connections for a given node, including both predecessors and successors. - - Parameters: - ----------- - - - node_id (UUID): The ID of the node for which to retrieve connections. - - Returns: - -------- - - - list: A list of connections associated with the node. 
- """ - predecessors_query = """ - MATCH (node)<-[relation]-(neighbour) - WHERE node.id = $node_id - RETURN neighbour, relation, node - """ - successors_query = """ - MATCH (node)-[relation]->(neighbour) - WHERE node.id = $node_id - RETURN node, relation, neighbour - """ - - predecessors, successors = await asyncio.gather( - self.query(predecessors_query, dict(node_id=str(node_id))), - self.query(successors_query, dict(node_id=str(node_id))), - ) - - connections = [] - - for neighbour in predecessors: - neighbour = neighbour["relation"] - connections.append((neighbour[0], {"relationship_name": neighbour[1]}, neighbour[2])) - - for neighbour in successors: - neighbour = neighbour["relation"] - connections.append((neighbour[0], {"relationship_name": neighbour[1]}, neighbour[2])) - - return connections - - async def remove_connection_to_predecessors_of( - self, node_ids: list[str], edge_label: str - ) -> None: - """ - Remove specified connections to the predecessors of the given node IDs. - - Parameters: - ----------- - - - node_ids (list[str]): A list of node IDs from which to remove predecessor - connections. - - edge_label (str): The label of the edges to remove. - - Returns: - -------- - - - None: None. - """ - query = f""" - UNWIND $node_ids AS nid - MATCH (node {id: nid})-[r]->(predecessor) - WHERE type(r) = $edge_label - DELETE r; - """ - - params = {"node_ids": node_ids, "edge_label": edge_label} - - return await self.query(query, params) - - async def remove_connection_to_successors_of( - self, node_ids: list[str], edge_label: str - ) -> None: - """ - Remove specified connections to the successors of the given node IDs. - - Parameters: - ----------- - - - node_ids (list[str]): A list of node IDs from which to remove successor - connections. - - edge_label (str): The label of the edges to remove. - - Returns: - -------- - - - None: None. 
- """ - query = f""" - UNWIND $node_ids AS id - MATCH (node:`{id}`)<-[r:{edge_label}]-(successor) - DELETE r; - """ - - params = {"node_ids": node_ids} - - return await self.query(query, params) - - async def delete_graph(self): - """ - Completely delete the graph from the database, removing all nodes and edges. - - Returns: - -------- - - None. - """ - query = """MATCH (node) - DETACH DELETE node;""" - - return await self.query(query) - - def serialize_properties(self, properties=dict()): - """ - Convert property values to a suitable representation for storage. - - Parameters: - ----------- - - - properties: A dictionary of properties to serialize. (default dict()) - - Returns: - -------- - - A dictionary of serialized properties. - """ - serialized_properties = {} - - for property_key, property_value in properties.items(): - if isinstance(property_value, UUID): - serialized_properties[property_key] = str(property_value) - continue - - if isinstance(property_value, dict): - serialized_properties[property_key] = json.dumps(property_value, cls=JSONEncoder) - continue - - serialized_properties[property_key] = property_value - - return serialized_properties - - async def get_model_independent_graph_data(self): - """ - Fetch nodes and relationships without any specific model filtering. - - Returns: - -------- - - A tuple containing nodes and edges as collections. - """ - query_nodes = "MATCH (n) RETURN collect(n) AS nodes" - nodes = await self.query(query_nodes) - - query_edges = "MATCH (n)-[r]->(m) RETURN collect([n, r, m]) AS elements" - edges = await self.query(query_edges) - - return (nodes, edges) - - async def get_graph_data(self): - """ - Retrieve all nodes and edges from the graph, including their properties. - - Returns: - -------- - - A tuple containing lists of nodes and edges. 
- """ - query = "MATCH (n) RETURN ID(n) AS id, labels(n) AS labels, properties(n) AS properties" - - result = await self.query(query) - - nodes = [ - ( - record["id"], - record["properties"], - ) - for record in result - ] - - query = """ - MATCH (n)-[r]->(m) - RETURN ID(n) AS source, ID(m) AS target, TYPE(r) AS type, properties(r) AS properties - """ - result = await self.query(query) - edges = [ - ( - record["source"], - record["target"], - record["type"], - record["properties"], - ) - for record in result - ] - - return (nodes, edges) - - async def get_nodeset_subgraph( - self, node_type: Type[Any], node_name: List[str] - ) -> Tuple[List[Tuple[int, dict]], List[Tuple[int, int, str, dict]]]: - """ - Throw an error indicating that node set filtering is not supported. - - Parameters: - ----------- - - - node_type (Type[Any]): The type of nodes to filter. - - node_name (List[str]): A list of node names to filter. - """ - raise NodesetFilterNotSupportedError - - async def get_filtered_graph_data(self, attribute_filters): - """ - Fetch nodes and relationships based on specified attribute filters. - - Parameters: - ----------- - - - attribute_filters: A list of criteria to filter nodes and relationships. - - Returns: - -------- - - A tuple containing filtered nodes and edges. 
- """ - where_clauses = [] - for attribute, values in attribute_filters[0].items(): - values_str = ", ".join( - f"'{value}'" if isinstance(value, str) else str(value) for value in values - ) - where_clauses.append(f"n.{attribute} IN [{values_str}]") - - where_clause = " AND ".join(where_clauses) - - query_nodes = f""" - MATCH (n) - WHERE {where_clause} - RETURN ID(n) AS id, labels(n) AS labels, properties(n) AS properties - """ - result_nodes = await self.query(query_nodes) - - nodes = [ - ( - record["id"], - record["properties"], - ) - for record in result_nodes - ] - - query_edges = f""" - MATCH (n)-[r]->(m) - WHERE {where_clause} AND {where_clause.replace("n.", "m.")} - RETURN ID(n) AS source, ID(m) AS target, TYPE(r) AS type, properties(r) AS properties - """ - result_edges = await self.query(query_edges) - - edges = [ - ( - record["source"], - record["target"], - record["type"], - record["properties"], - ) - for record in result_edges - ] - - return (nodes, edges) - - async def get_node_labels_string(self): - """ - Retrieve a string representation of all unique node labels in the graph. - - Returns: - -------- - - A string containing unique node labels. - """ - node_labels_query = """ - MATCH (n) - WITH DISTINCT labels(n) AS labelList - UNWIND labelList AS label - RETURN collect(DISTINCT label) AS labels; - """ - node_labels_result = await self.query(node_labels_query) - node_labels = node_labels_result[0]["labels"] if node_labels_result else [] - - if not node_labels: - raise ValueError("No node labels found in the database") - - node_labels_str = "[" + ", ".join(f"'{label}'" for label in node_labels) + "]" - return node_labels_str - - async def get_relationship_labels_string(self): - """ - Retrieve a string representation of all unique relationship types in the graph. - - Returns: - -------- - - A string containing unique relationship types. 
- """ - relationship_types_query = ( - "MATCH ()-[r]->() RETURN collect(DISTINCT type(r)) AS relationships;" - ) - relationship_types_result = await self.query(relationship_types_query) - relationship_types = ( - relationship_types_result[0]["relationships"] if relationship_types_result else [] - ) - - if not relationship_types: - raise ValueError("No relationship types found in the database.") - - relationship_types_undirected_str = ( - "{" - + ", ".join(f"{rel}" + ": {orientation: 'UNDIRECTED'}" for rel in relationship_types) - + "}" - ) - return relationship_types_undirected_str - - async def get_graph_metrics(self, include_optional=False): - """ - Calculate and return various metrics of the graph, including mandatory and optional - metrics. - - Parameters: - ----------- - - - include_optional: Specify whether to include optional metrics in the results. - (default False) - - Returns: - -------- - - A dictionary containing calculated graph metrics. - """ - - try: - # Basic metrics - node_count = await self.query("MATCH (n) RETURN count(n)") - edge_count = await self.query("MATCH ()-[r]->() RETURN count(r)") - num_nodes = node_count[0][0] if node_count else 0 - num_edges = edge_count[0][0] if edge_count else 0 - - # Calculate mandatory metrics - mandatory_metrics = { - "num_nodes": num_nodes, - "num_edges": num_edges, - "mean_degree": (2 * num_edges) / num_nodes if num_nodes > 0 else 0, - "edge_density": (num_edges) / (num_nodes * (num_nodes - 1)) if num_nodes > 1 else 0, - } - - # Calculate connected components - components_query = """ - MATCH (n:Node) - WITH n.id AS node_id - MATCH path = (n)-[:EDGE*0..]-() - WITH COLLECT(DISTINCT node_id) AS component - RETURN COLLECT(component) AS components - """ - components_result = await self.query(components_query) - component_sizes = ( - [len(comp) for comp in components_result[0][0]] if components_result else [] - ) - - mandatory_metrics.update( - { - "num_connected_components": len(component_sizes), - 
"sizes_of_connected_components": component_sizes, - } - ) - - if include_optional: - # Self-loops - self_loops_query = """ - MATCH (n:Node)-[r:EDGE]->(n) - RETURN COUNT(r) - """ - self_loops = await self.query(self_loops_query) - num_selfloops = self_loops[0][0] if self_loops else 0 - - # Shortest paths (simplified for Kuzu) - paths_query = """ - MATCH (n:Node), (m:Node) - WHERE n.id < m.id - MATCH path = (n)-[:EDGE*]-(m) - RETURN MIN(LENGTH(path)) AS length - """ - paths = await self.query(paths_query) - path_lengths = [p[0] for p in paths if p[0] is not None] - - # Local clustering coefficient - clustering_query = """ - /// Step 1: Get each node with its neighbors and degree - MATCH (n:Node)-[:EDGE]-(neighbor) - WITH n, COLLECT(DISTINCT neighbor) AS neighbors, COUNT(DISTINCT neighbor) AS degree - - // Step 2: Pair up neighbors and check if they are connected - UNWIND neighbors AS n1 - UNWIND neighbors AS n2 - WITH n, degree, n1, n2 - WHERE id(n1) < id(n2) // avoid duplicate pairs - - // Step 3: Use OPTIONAL MATCH to see if n1 and n2 are connected - OPTIONAL MATCH (n1)-[:EDGE]-(n2) - WITH n, degree, COUNT(n2) AS triangle_count - - // Step 4: Compute local clustering coefficient - WITH n, degree, - CASE WHEN degree <= 1 THEN 0.0 - ELSE (1.0 * triangle_count) / (degree * (degree - 1) / 2.0) - END AS local_cc - - // Step 5: Compute average - RETURN AVG(local_cc) AS avg_clustering_coefficient - """ - clustering = await self.query(clustering_query) - - optional_metrics = { - "num_selfloops": num_selfloops, - "diameter": max(path_lengths) if path_lengths else -1, - "avg_shortest_path_length": sum(path_lengths) / len(path_lengths) - if path_lengths - else -1, - "avg_clustering": clustering[0][0] if clustering and clustering[0][0] else -1, - } - else: - optional_metrics = { - "num_selfloops": -1, - "diameter": -1, - "avg_shortest_path_length": -1, - "avg_clustering": -1, - } - - return {**mandatory_metrics, **optional_metrics} - - except Exception as e: - 
logger.error(f"Failed to get graph metrics: {e}") - return { - "num_nodes": 0, - "num_edges": 0, - "mean_degree": 0, - "edge_density": 0, - "num_connected_components": 0, - "sizes_of_connected_components": [], - "num_selfloops": -1, - "diameter": -1, - "avg_shortest_path_length": -1, - "avg_clustering": -1, - } diff --git a/cognee/tests/test_memgraph.py b/cognee/tests/test_memgraph.py deleted file mode 100644 index d0d968fc4..000000000 --- a/cognee/tests/test_memgraph.py +++ /dev/null @@ -1,105 +0,0 @@ -import os - -import pathlib -import cognee -from cognee.infrastructure.files.storage import get_storage_config -from cognee.modules.search.operations import get_history -from cognee.modules.users.methods import get_default_user -from cognee.shared.logging_utils import get_logger -from cognee.modules.search.types import SearchType - - -logger = get_logger() - - -async def main(): - cognee.config.set_graph_database_provider("memgraph") - data_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_memgraph") - ).resolve() - ) - cognee.config.data_root_directory(data_directory_path) - cognee_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_memgraph") - ).resolve() - ) - cognee.config.system_root_directory(cognee_directory_path) - - await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata=True) - - dataset_name = "cs_explanations" - - explanation_file_path_nlp = os.path.join( - pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt" - ) - await cognee.add([explanation_file_path_nlp], dataset_name) - - explanation_file_path_quantum = os.path.join( - pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt" - ) - - await cognee.add([explanation_file_path_quantum], dataset_name) - - await cognee.cognify([dataset_name]) - - from cognee.infrastructure.databases.vector import get_vector_engine - - vector_engine = 
get_vector_engine() - random_node = (await vector_engine.search("Entity_name", "Quantum computer"))[0] - random_node_name = random_node.payload["text"] - - search_results = await cognee.search( - query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name - ) - assert len(search_results) != 0, "The search results list is empty." - print("\n\nExtracted sentences are:\n") - for result in search_results: - print(f"{result}\n") - - search_results = await cognee.search(query_type=SearchType.CHUNKS, query_text=random_node_name) - assert len(search_results) != 0, "The search results list is empty." - print("\n\nExtracted chunks are:\n") - for result in search_results: - print(f"{result}\n") - - search_results = await cognee.search( - query_type=SearchType.SUMMARIES, query_text=random_node_name - ) - assert len(search_results) != 0, "Query related summaries don't exist." - print("\nExtracted results are:\n") - for result in search_results: - print(f"{result}\n") - - search_results = await cognee.search( - query_type=SearchType.NATURAL_LANGUAGE, - query_text=f"Find nodes connected to node with name {random_node_name}", - ) - assert len(search_results) != 0, "Query related natural language don't exist." - print("\nExtracted results are:\n") - for result in search_results: - print(f"{result}\n") - - user = await get_default_user() - history = await get_history(user.id) - - assert len(history) == 8, "Search history is not correct." 
- - await cognee.prune.prune_data() - data_root_directory = get_storage_config()["data_root_directory"] - assert not os.path.isdir(data_root_directory), "Local data files are not deleted" - - await cognee.prune.prune_system(metadata=True) - from cognee.infrastructure.databases.graph import get_graph_engine - - graph_engine = await get_graph_engine() - nodes, edges = await graph_engine.get_graph_data() - assert len(nodes) == 0 and len(edges) == 0, "Memgraph graph database is not empty" - - -if __name__ == "__main__": - import asyncio - - asyncio.run(main()) diff --git a/notebooks/neptune-analytics-example.ipynb b/notebooks/neptune-analytics-example.ipynb index e80ea4dcb..c85ccf58a 100644 --- a/notebooks/neptune-analytics-example.ipynb +++ b/notebooks/neptune-analytics-example.ipynb @@ -83,16 +83,16 @@ ] }, { - "metadata": {}, "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "import os\n", "import pathlib\n", "from cognee import config, add, cognify, search, SearchType, prune, visualize_graph\n", "from dotenv import load_dotenv" - ], - "outputs": [], - "execution_count": null + ] }, { "cell_type": "markdown", @@ -106,7 +106,9 @@ }, { "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ "# load environment variables from file .env\n", "load_dotenv()\n", @@ -145,9 +147,7 @@ " \"vector_db_url\": f\"neptune-graph://{graph_identifier}\", # Neptune Analytics endpoint with the format neptune-graph://\n", " }\n", ")" - ], - "outputs": [], - "execution_count": null + ] }, { "cell_type": "markdown", @@ -159,19 +159,19 @@ ] }, { - "metadata": {}, "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "# Prune data and system metadata before running, only if we want \"fresh\" state.\n", "await prune.prune_data()\n", "await prune.prune_system(metadata=True)" - ], - "outputs": [], - "execution_count": null + ] }, { - "metadata": {}, "cell_type": "markdown", 
+ "metadata": {}, "source": [ "## Setup data and cognify\n", "\n", @@ -180,7 +180,9 @@ }, { "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ "# Add sample text to the dataset\n", "sample_text_1 = \"\"\"Neptune Analytics is a memory-optimized graph database engine for analytics. With Neptune\n", @@ -205,9 +207,7 @@ "\n", "# Cognify the text data.\n", "await cognify([dataset_name])" - ], - "outputs": [], - "execution_count": null + ] }, { "cell_type": "markdown", @@ -215,14 +215,16 @@ "source": [ "## Graph Memory visualization\n", "\n", - "Initialize Memgraph as a Graph Memory store and save to .artefacts/graph_visualization.html\n", + "Initialize Neptune as a Graph Memory store and save to .artefacts/graph_visualization.html\n", "\n", "![visualization](./neptune_analytics_demo.png)" ] }, { "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ "# Get a graphistry url (Register for a free account at https://www.graphistry.com)\n", "# url = await render_graph()\n", @@ -235,9 +237,7 @@ " ).resolve()\n", ")\n", "await visualize_graph(graph_file_path)" - ], - "outputs": [], - "execution_count": null + ] }, { "cell_type": "markdown", @@ -250,19 +250,19 @@ }, { "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ "# Completion query that uses graph data to form context.\n", "graph_completion = await search(query_text=\"What is Neptune Analytics?\", query_type=SearchType.GRAPH_COMPLETION)\n", "print(\"\\nGraph completion result is:\")\n", "print(graph_completion)" - ], - "outputs": [], - "execution_count": null + ] }, { - "metadata": {}, "cell_type": "markdown", + "metadata": {}, "source": [ "## SEARCH: RAG Completion\n", "\n", @@ -271,19 +271,19 @@ }, { "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ "# Completion query that uses document chunks to form context.\n", "rag_completion = await 
search(query_text=\"What is Neptune Analytics?\", query_type=SearchType.RAG_COMPLETION)\n", "print(\"\\nRAG Completion result is:\")\n", "print(rag_completion)" - ], - "outputs": [], - "execution_count": null + ] }, { - "metadata": {}, "cell_type": "markdown", + "metadata": {}, "source": [ "## SEARCH: Graph Insights\n", "\n", @@ -291,8 +291,10 @@ ] }, { - "metadata": {}, "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "# Search graph insights\n", "insights_results = await search(query_text=\"Neptune Analytics\", query_type=SearchType.GRAPH_COMPLETION)\n", @@ -302,13 +304,11 @@ " tgt_node = result[2].get(\"name\", result[2][\"type\"])\n", " relationship = result[1].get(\"relationship_name\", \"__relationship__\")\n", " print(f\"- {src_node} -[{relationship}]-> {tgt_node}\")" - ], - "outputs": [], - "execution_count": null + ] }, { - "metadata": {}, "cell_type": "markdown", + "metadata": {}, "source": [ "## SEARCH: Entity Summaries\n", "\n", @@ -316,8 +316,10 @@ ] }, { - "metadata": {}, "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "# Query all summaries related to query.\n", "summaries = await search(query_text=\"Neptune Analytics\", query_type=SearchType.SUMMARIES)\n", @@ -326,13 +328,11 @@ " type = summary[\"type\"]\n", " text = summary[\"text\"]\n", " print(f\"- {type}: {text}\")" - ], - "outputs": [], - "execution_count": null + ] }, { - "metadata": {}, "cell_type": "markdown", + "metadata": {}, "source": [ "## SEARCH: Chunks\n", "\n", @@ -340,8 +340,10 @@ ] }, { - "metadata": {}, "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "chunks = await search(query_text=\"Neptune Analytics\", query_type=SearchType.CHUNKS)\n", "print(\"\\nChunk results are:\")\n", @@ -349,9 +351,7 @@ " type = chunk[\"type\"]\n", " text = chunk[\"text\"]\n", " print(f\"- {type}: {text}\")" - ], - "outputs": [], - "execution_count": null + ] } ], 
"metadata": { From 99dc35f23e26e4cd2016f50a6c783f6a0a1749e1 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 20:01:09 +0200 Subject: [PATCH 13/35] fix: resolve issue with neo4j metrics test --- .../tasks/descriptive_metrics/metrics_test_utils.py | 2 -- .../tasks/descriptive_metrics/neo4j_metrics_test.py | 11 ++++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/cognee/tests/tasks/descriptive_metrics/metrics_test_utils.py b/cognee/tests/tasks/descriptive_metrics/metrics_test_utils.py index 911d9c33b..579a499fd 100644 --- a/cognee/tests/tasks/descriptive_metrics/metrics_test_utils.py +++ b/cognee/tests/tasks/descriptive_metrics/metrics_test_utils.py @@ -1,7 +1,6 @@ from typing import List from cognee.infrastructure.engine import DataPoint from cognee.tasks.storage.add_data_points import add_data_points -from cognee.infrastructure.databases.graph.get_graph_engine import create_graph_engine import cognee from cognee.infrastructure.databases.graph import get_graph_engine import json @@ -64,7 +63,6 @@ async def create_connected_test_graph(): async def get_metrics(provider: str, include_optional=True): - create_graph_engine.cache_clear() cognee.config.set_graph_database_provider(provider) graph_engine = await get_graph_engine() await graph_engine.delete_graph() diff --git a/cognee/tests/tasks/descriptive_metrics/neo4j_metrics_test.py b/cognee/tests/tasks/descriptive_metrics/neo4j_metrics_test.py index 2ca9e9f7e..8d7a6ab02 100644 --- a/cognee/tests/tasks/descriptive_metrics/neo4j_metrics_test.py +++ b/cognee/tests/tasks/descriptive_metrics/neo4j_metrics_test.py @@ -1,7 +1,12 @@ -from cognee.tests.tasks.descriptive_metrics.metrics_test_utils import assert_metrics import asyncio +async def main(): + from cognee.tests.tasks.descriptive_metrics.metrics_test_utils import assert_metrics + + await assert_metrics(provider="neo4j", include_optional=False) + await assert_metrics(provider="neo4j", include_optional=True) + + if __name__ == 
"__main__": - asyncio.run(assert_metrics(provider="neo4j", include_optional=False)) - asyncio.run(assert_metrics(provider="neo4j", include_optional=True)) + asyncio.run(main()) From 3a9022a26c1e26b1b70867b441096b266f884cc5 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 20:22:29 +0200 Subject: [PATCH 14/35] refactor: Rename batch size for tasks to chunk batch size --- cognee/api/v1/cognify/cognify.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index d29d8c939..e0f6253d8 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -44,7 +44,7 @@ async def cognify( graph_model: BaseModel = KnowledgeGraph, chunker=TextChunker, chunk_size: int = None, - batch_size: int = None, + chunk_batch_size: int = None, config: Config = None, vector_db_config: dict = None, graph_db_config: dict = None, @@ -106,7 +106,7 @@ async def cognify( Formula: min(embedding_max_completion_tokens, llm_max_completion_tokens // 2) Default limits: ~512-8192 tokens depending on models. Smaller chunks = more granular but potentially fragmented knowledge. - batch_size: Number of chunks to be processed in a single batch in Cognify tasks. + chunk_batch_size: Number of chunks to be processed in a single batch in Cognify tasks. vector_db_config: Custom vector database configuration for embeddings storage. graph_db_config: Custom graph database configuration for relationship storage. run_in_background: If True, starts processing asynchronously and returns immediately. 
@@ -212,7 +212,7 @@ async def cognify( if temporal_cognify: tasks = await get_temporal_tasks( - user=user, chunker=chunker, chunk_size=chunk_size, batch_size=batch_size + user=user, chunker=chunker, chunk_size=chunk_size, chunk_batch_size=chunk_batch_size ) else: tasks = await get_default_tasks( @@ -222,7 +222,7 @@ async def cognify( chunk_size=chunk_size, config=config, custom_prompt=custom_prompt, - batch_size=batch_size, + chunk_batch_size=chunk_batch_size, ) # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for @@ -248,7 +248,7 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's chunk_size: int = None, config: Config = None, custom_prompt: Optional[str] = None, - batch_size: int = 100, + chunk_batch_size: int = 100, ) -> list[Task]: if config is None: ontology_config = get_ontology_env_config() @@ -267,8 +267,8 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's "ontology_config": {"ontology_resolver": get_default_ontology_resolver()} } - if batch_size is None: - batch_size = 100 + if chunk_batch_size is None: + chunk_batch_size = 100 default_tasks = [ Task(classify_documents), @@ -283,20 +283,20 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": batch_size}, + task_config={"batch_size": chunk_batch_size}, ), # Generate knowledge graphs from the document chunks. 
Task( summarize_text, - task_config={"batch_size": batch_size}, + task_config={"batch_size": chunk_batch_size}, ), - Task(add_data_points, task_config={"batch_size": batch_size}), + Task(add_data_points, task_config={"batch_size": chunk_batch_size}), ] return default_tasks async def get_temporal_tasks( - user: User = None, chunker=TextChunker, chunk_size: int = None, batch_size: int = 10 + user: User = None, chunker=TextChunker, chunk_size: int = None, chunk_batch_size: int = 10 ) -> list[Task]: """ Builds and returns a list of temporal processing tasks to be executed in sequence. @@ -313,13 +313,13 @@ async def get_temporal_tasks( user (User, optional): The user requesting task execution, used for permission checks. chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker. chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default. - batch_size (int, optional): Number of chunks to process in a single batch in Cognify + chunk_batch_size (int, optional): Number of chunks to process in a single batch in Cognify Returns: list[Task]: A list of Task objects representing the temporal processing pipeline. 
""" - if batch_size is None: - batch_size = 10 + if chunk_batch_size is None: + chunk_batch_size = 10 temporal_tasks = [ Task(classify_documents), @@ -329,9 +329,9 @@ async def get_temporal_tasks( max_chunk_size=chunk_size or get_max_chunk_tokens(), chunker=chunker, ), - Task(extract_events_and_timestamps, task_config={"batch_size": batch_size}), + Task(extract_events_and_timestamps, task_config={"batch_size": chunk_batch_size}), Task(extract_knowledge_graph_from_events), - Task(add_data_points, task_config={"batch_size": batch_size}), + Task(add_data_points, task_config={"batch_size": chunk_batch_size}), ] return temporal_tasks From a210bd59054dd353675589c63e57fe9d7349b766 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 20:24:36 +0200 Subject: [PATCH 15/35] refactor: rename chunk_batch_size to chunks_per_batch --- cognee/api/v1/cognify/cognify.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index e0f6253d8..1d5c36a3c 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -44,7 +44,7 @@ async def cognify( graph_model: BaseModel = KnowledgeGraph, chunker=TextChunker, chunk_size: int = None, - chunk_batch_size: int = None, + chunks_per_batch: int = None, config: Config = None, vector_db_config: dict = None, graph_db_config: dict = None, @@ -106,7 +106,7 @@ async def cognify( Formula: min(embedding_max_completion_tokens, llm_max_completion_tokens // 2) Default limits: ~512-8192 tokens depending on models. Smaller chunks = more granular but potentially fragmented knowledge. - chunk_batch_size: Number of chunks to be processed in a single batch in Cognify tasks. + chunks_per_batch: Number of chunks to be processed in a single batch in Cognify tasks. vector_db_config: Custom vector database configuration for embeddings storage. 
graph_db_config: Custom graph database configuration for relationship storage. run_in_background: If True, starts processing asynchronously and returns immediately. @@ -212,7 +212,7 @@ async def cognify( if temporal_cognify: tasks = await get_temporal_tasks( - user=user, chunker=chunker, chunk_size=chunk_size, chunk_batch_size=chunk_batch_size + user=user, chunker=chunker, chunk_size=chunk_size, chunks_per_batch=chunks_per_batch ) else: tasks = await get_default_tasks( @@ -222,7 +222,7 @@ async def cognify( chunk_size=chunk_size, config=config, custom_prompt=custom_prompt, - chunk_batch_size=chunk_batch_size, + chunks_per_batch=chunks_per_batch, ) # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for @@ -248,7 +248,7 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's chunk_size: int = None, config: Config = None, custom_prompt: Optional[str] = None, - chunk_batch_size: int = 100, + chunks_per_batch: int = 100, ) -> list[Task]: if config is None: ontology_config = get_ontology_env_config() @@ -267,8 +267,8 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's "ontology_config": {"ontology_resolver": get_default_ontology_resolver()} } - if chunk_batch_size is None: - chunk_batch_size = 100 + if chunks_per_batch is None: + chunks_per_batch = 100 default_tasks = [ Task(classify_documents), @@ -283,20 +283,20 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": chunk_batch_size}, + task_config={"batch_size": chunks_per_batch}, ), # Generate knowledge graphs from the document chunks. 
Task( summarize_text, - task_config={"batch_size": chunk_batch_size}, + task_config={"batch_size": chunks_per_batch}, ), - Task(add_data_points, task_config={"batch_size": chunk_batch_size}), + Task(add_data_points, task_config={"batch_size": chunks_per_batch}), ] return default_tasks async def get_temporal_tasks( - user: User = None, chunker=TextChunker, chunk_size: int = None, chunk_batch_size: int = 10 + user: User = None, chunker=TextChunker, chunk_size: int = None, chunks_per_batch: int = 10 ) -> list[Task]: """ Builds and returns a list of temporal processing tasks to be executed in sequence. @@ -313,13 +313,13 @@ async def get_temporal_tasks( user (User, optional): The user requesting task execution, used for permission checks. chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker. chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default. - chunk_batch_size (int, optional): Number of chunks to process in a single batch in Cognify + chunks_per_batch (int, optional): Number of chunks to process in a single batch in Cognify Returns: list[Task]: A list of Task objects representing the temporal processing pipeline. 
""" - if chunk_batch_size is None: - chunk_batch_size = 10 + if chunks_per_batch is None: + chunks_per_batch = 10 temporal_tasks = [ Task(classify_documents), @@ -329,9 +329,9 @@ async def get_temporal_tasks( max_chunk_size=chunk_size or get_max_chunk_tokens(), chunker=chunker, ), - Task(extract_events_and_timestamps, task_config={"batch_size": chunk_batch_size}), + Task(extract_events_and_timestamps, task_config={"batch_size": chunks_per_batch}), Task(extract_knowledge_graph_from_events), - Task(add_data_points, task_config={"batch_size": chunk_batch_size}), + Task(add_data_points, task_config={"batch_size": chunks_per_batch}), ] return temporal_tasks From 88cc7af4d7b41b764ddb9db8517ddd56d04677a8 Mon Sep 17 00:00:00 2001 From: Andrej Milicevic Date: Thu, 16 Oct 2025 10:50:50 +0200 Subject: [PATCH 16/35] test: Add a few more examples to the workflow. --- .github/workflows/examples_tests.yml | 53 ++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/.github/workflows/examples_tests.yml b/.github/workflows/examples_tests.yml index 406420351..df007a576 100644 --- a/.github/workflows/examples_tests.yml +++ b/.github/workflows/examples_tests.yml @@ -85,8 +85,8 @@ jobs: run: uv run python ./cognee/tests/tasks/descriptive_metrics/neo4j_metrics_test.py - test-multiple-examples: - name: Run Multiple Example Scripts + test-dynamic-steps-metrics: + name: Run Dynamic Steps Example runs-on: ubuntu-22.04 steps: - name: Check out repository @@ -97,7 +97,7 @@ jobs: with: python-version: '3.11.x' - - name: Run Dynamic Steps Example + - name: Run Dynamic Steps Tests env: OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} LLM_MODEL: ${{ secrets.LLM_MODEL }} @@ -110,6 +110,18 @@ jobs: EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} run: uv run python ./examples/python/dynamic_steps_example.py + test-temporal-example: + name: Run Temporal Tests + runs-on: ubuntu-22.04 + steps: + - name: Check out repository + uses: actions/checkout@v4 + + - 
name: Cognee Setup + uses: ./.github/actions/cognee_setup + with: + python-version: '3.11.x' + - name: Run Temporal Example env: OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} @@ -123,6 +135,18 @@ jobs: EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} run: uv run python ./examples/python/temporal_example.py + test-ontology-example: + name: Run Ontology Tests + runs-on: ubuntu-22.04 + steps: + - name: Check out repository + uses: actions/checkout@v4 + + - name: Cognee Setup + uses: ./.github/actions/cognee_setup + with: + python-version: '3.11.x' + - name: Run Ontology Demo Example env: OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} @@ -136,18 +160,17 @@ jobs: EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} run: uv run python ./examples/python/ontology_demo_example.py - - name: Run Temporal Example - env: - OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} - LLM_MODEL: ${{ secrets.LLM_MODEL }} - LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }} - LLM_API_KEY: ${{ secrets.LLM_API_KEY }} - LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }} - EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }} - EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }} - EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} - EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} - run: uv run python ./examples/python/temporal_example.py + test-agentic-reasoning: + name: Run Agentic Reasoning Tests + runs-on: ubuntu-22.04 + steps: + - name: Check out repository + uses: actions/checkout@v4 + + - name: Cognee Setup + uses: ./.github/actions/cognee_setup + with: + python-version: '3.11.x' - name: Run Agentic Reasoning Example env: From 6a693d319add7eaf7875599e2f965c0da5d43096 Mon Sep 17 00:00:00 2001 From: Daulet Amirkhanov Date: Thu, 16 Oct 2025 15:45:21 +0100 Subject: [PATCH 17/35] fix: preferred_loaders is always None in `data_item_to_text_file.load_file()` --- .../tasks/ingestion/data_item_to_text_file.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff 
--git a/cognee/tasks/ingestion/data_item_to_text_file.py b/cognee/tasks/ingestion/data_item_to_text_file.py index 9fcafca57..dc0d1d0a7 100644 --- a/cognee/tasks/ingestion/data_item_to_text_file.py +++ b/cognee/tasks/ingestion/data_item_to_text_file.py @@ -48,17 +48,17 @@ async def data_item_to_text_file( await pull_from_s3(data_item_path, temp_file) temp_file.flush() # Data needs to be saved to local storage loader = get_loader_engine() - return await loader.load_file(temp_file.name, preferred_loaders), loader.get_loader( - temp_file.name, preferred_loaders - ) + return await loader.load_file( + temp_file.name, None, preferred_loaders + ), loader.get_loader(temp_file.name, preferred_loaders) # data is local file path elif parsed_url.scheme == "file": if settings.accept_local_file_path: loader = get_loader_engine() - return await loader.load_file(data_item_path, preferred_loaders), loader.get_loader( - data_item_path, preferred_loaders - ) + return await loader.load_file( + data_item_path, None, preferred_loaders + ), loader.get_loader(data_item_path, preferred_loaders) else: raise IngestionError(message="Local files are not accepted.") @@ -69,9 +69,9 @@ async def data_item_to_text_file( # Handle both Unix absolute paths (/path) and Windows absolute paths (C:\path) if settings.accept_local_file_path: loader = get_loader_engine() - return await loader.load_file(data_item_path, preferred_loaders), loader.get_loader( - data_item_path, preferred_loaders - ) + return await loader.load_file( + data_item_path, None, preferred_loaders + ), loader.get_loader(data_item_path, preferred_loaders) else: raise IngestionError(message="Local files are not accepted.") From 2998802c00961e36115bed93f5eda446e8500c75 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 17 Oct 2025 11:58:14 +0200 Subject: [PATCH 18/35] fix: Resolve issue with wrong error for OpenAI --- .../litellm_instructor/llm/openai/adapter.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git 
a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py index 8877c2bdf..305b426b8 100644 --- a/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py +++ b/cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/openai/adapter.py @@ -156,10 +156,7 @@ class OpenAIAdapter(LLMInterface): InstructorRetryException, ) as e: if not (self.fallback_model and self.fallback_api_key): - raise ContentPolicyFilterError( - f"The provided input contains content that is not aligned with our content policy: {text_input}" - ) from e - + raise e try: return await self.aclient.chat.completions.create( model=self.fallback_model, From 50aa8aac115f8fcf4011e1001e86adf9afc89594 Mon Sep 17 00:00:00 2001 From: Daulet Amirkhanov Date: Fri, 17 Oct 2025 17:33:25 +0100 Subject: [PATCH 19/35] refactor: remove `filestream` arg from `LoaderEngine.load_file(...)` --- cognee/infrastructure/loaders/LoaderEngine.py | 1 - .../tasks/ingestion/data_item_to_text_file.py | 18 +++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/cognee/infrastructure/loaders/LoaderEngine.py b/cognee/infrastructure/loaders/LoaderEngine.py index 87a008660..6b62f7641 100644 --- a/cognee/infrastructure/loaders/LoaderEngine.py +++ b/cognee/infrastructure/loaders/LoaderEngine.py @@ -105,7 +105,6 @@ class LoaderEngine: async def load_file( self, file_path: str, - file_stream: Optional[Any], preferred_loaders: Optional[List[str]] = None, **kwargs, ): diff --git a/cognee/tasks/ingestion/data_item_to_text_file.py b/cognee/tasks/ingestion/data_item_to_text_file.py index dc0d1d0a7..9fcafca57 100644 --- a/cognee/tasks/ingestion/data_item_to_text_file.py +++ b/cognee/tasks/ingestion/data_item_to_text_file.py @@ -48,17 +48,17 @@ async def data_item_to_text_file( await pull_from_s3(data_item_path, temp_file) 
temp_file.flush() # Data needs to be saved to local storage loader = get_loader_engine() - return await loader.load_file( - temp_file.name, None, preferred_loaders - ), loader.get_loader(temp_file.name, preferred_loaders) + return await loader.load_file(temp_file.name, preferred_loaders), loader.get_loader( + temp_file.name, preferred_loaders + ) # data is local file path elif parsed_url.scheme == "file": if settings.accept_local_file_path: loader = get_loader_engine() - return await loader.load_file( - data_item_path, None, preferred_loaders - ), loader.get_loader(data_item_path, preferred_loaders) + return await loader.load_file(data_item_path, preferred_loaders), loader.get_loader( + data_item_path, preferred_loaders + ) else: raise IngestionError(message="Local files are not accepted.") @@ -69,9 +69,9 @@ async def data_item_to_text_file( # Handle both Unix absolute paths (/path) and Windows absolute paths (C:\path) if settings.accept_local_file_path: loader = get_loader_engine() - return await loader.load_file( - data_item_path, None, preferred_loaders - ), loader.get_loader(data_item_path, preferred_loaders) + return await loader.load_file(data_item_path, preferred_loaders), loader.get_loader( + data_item_path, preferred_loaders + ) else: raise IngestionError(message="Local files are not accepted.") From 3f7efd8b888829d3e89a8120e4345782495ed3af Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 19 Oct 2025 13:33:02 +0200 Subject: [PATCH 20/35] added fixes for tests --- .github/workflows/test_different_operating_systems.yml | 2 +- .github/workflows/test_suites.yml | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test_different_operating_systems.yml b/.github/workflows/test_different_operating_systems.yml index 00e387ac4..64f1a14f9 100644 --- a/.github/workflows/test_different_operating_systems.yml +++ b/.github/workflows/test_different_operating_systems.yml @@ -9,7 +9,7 @@ on: python-versions: 
required: false type: string - default: '["3.10.x", "3.11.x", "3.12.x"]' + default: '["3.10.x", "3.12.x", "3.13.x"]' secrets: LLM_PROVIDER: required: true diff --git a/.github/workflows/test_suites.yml b/.github/workflows/test_suites.yml index 2f1bdebf0..5c1597a93 100644 --- a/.github/workflows/test_suites.yml +++ b/.github/workflows/test_suites.yml @@ -85,7 +85,7 @@ jobs: needs: [basic-tests, e2e-tests] uses: ./.github/workflows/test_different_operating_systems.yml with: - python-versions: '["3.10.x", "3.11.x", "3.12.x"]' + python-versions: '["3.10.x", "3.11.x", "3.12.x", "3.13.x"]' secrets: inherit # Matrix-based vector database tests diff --git a/pyproject.toml b/pyproject.toml index 30889a61e..417786e90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ authors = [ { name = "Vasilije Markovic" }, { name = "Boris Arzentar" }, ] -requires-python = ">=3.10,<=3.13" +requires-python = ">=3.10,<3.14" readme = "README.md" license = "Apache-2.0" classifiers = [ From 66876daf8581ef27d6fad1c50c17628f9a3f5d03 Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 19 Oct 2025 14:38:34 +0200 Subject: [PATCH 21/35] removed docs --- .github/actions/cognee_setup/action.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/actions/cognee_setup/action.yml b/.github/actions/cognee_setup/action.yml index 4017d524b..06e5bae6b 100644 --- a/.github/actions/cognee_setup/action.yml +++ b/.github/actions/cognee_setup/action.yml @@ -41,4 +41,4 @@ runs: EXTRA_ARGS="$EXTRA_ARGS --extra $extra" done fi - uv sync --extra api --extra docs --extra evals --extra codegraph --extra ollama --extra dev --extra neo4j --extra redis $EXTRA_ARGS + uv sync --extra api --extra evals --extra codegraph --extra ollama --extra dev --extra neo4j --extra redis $EXTRA_ARGS From a1927548adf0ba4197251d8008daef19cfc4030b Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 19 Oct 2025 14:52:02 +0200 Subject: [PATCH 22/35] added --- .github/actions/cognee_setup/action.yml | 2 +- 
pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/actions/cognee_setup/action.yml b/.github/actions/cognee_setup/action.yml index 06e5bae6b..4017d524b 100644 --- a/.github/actions/cognee_setup/action.yml +++ b/.github/actions/cognee_setup/action.yml @@ -41,4 +41,4 @@ runs: EXTRA_ARGS="$EXTRA_ARGS --extra $extra" done fi - uv sync --extra api --extra evals --extra codegraph --extra ollama --extra dev --extra neo4j --extra redis $EXTRA_ARGS + uv sync --extra api --extra docs --extra evals --extra codegraph --extra ollama --extra dev --extra neo4j --extra redis $EXTRA_ARGS diff --git a/pyproject.toml b/pyproject.toml index 417786e90..390028a6c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,7 @@ chromadb = [ "chromadb>=0.6,<0.7", "pypika==0.48.9", ] -docs = ["unstructured[csv, doc, docx, epub, md, odt, org, ppt, pptx, rst, rtf, tsv, xlsx, pdf]>=0.18.1,<19"] +docs = ["lxml=6.0.2, unstructured[csv, doc, docx, epub, md, odt, org, ppt, pptx, rst, rtf, tsv, xlsx, pdf]>=0.18.1,<19"] codegraph = [ "fastembed<=0.6.0 ; python_version < '3.13'", "transformers>=4.46.3,<5", From 0c62916e75fac2281a6152ed84a74d476cb11437 Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 19 Oct 2025 14:54:00 +0200 Subject: [PATCH 23/35] added --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 390028a6c..0f3c8c287 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,7 @@ chromadb = [ "chromadb>=0.6,<0.7", "pypika==0.48.9", ] -docs = ["lxml=6.0.2, unstructured[csv, doc, docx, epub, md, odt, org, ppt, pptx, rst, rtf, tsv, xlsx, pdf]>=0.18.1,<19"] +docs = ["lxml==6.0.2, unstructured[csv, doc, docx, epub, md, odt, org, ppt, pptx, rst, rtf, tsv, xlsx, pdf]>=0.18.1,<19"] codegraph = [ "fastembed<=0.6.0 ; python_version < '3.13'", "transformers>=4.46.3,<5", From 8900b31decbac106ccb4b985c7d90590ad4d87ff Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 19 Oct 2025 
14:57:40 +0200 Subject: [PATCH 24/35] added --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 0f3c8c287..461aee301 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,7 @@ chromadb = [ "chromadb>=0.6,<0.7", "pypika==0.48.9", ] -docs = ["lxml==6.0.2, unstructured[csv, doc, docx, epub, md, odt, org, ppt, pptx, rst, rtf, tsv, xlsx, pdf]>=0.18.1,<19"] +docs = ["lxml==6.0.2", "unstructured[csv, doc, docx, epub, md, odt, org, ppt, pptx, rst, rtf, tsv, xlsx, pdf]>=0.18.1,<19"] codegraph = [ "fastembed<=0.6.0 ; python_version < '3.13'", "transformers>=4.46.3,<5", From aa577d438444fd0e82f892c7b23f5ca2b04c5a65 Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 19 Oct 2025 15:02:53 +0200 Subject: [PATCH 25/35] added --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 461aee301..dae648f80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,10 +65,10 @@ distributed = [ "modal>=1.0.5,<2.0.0", ] scraping = [ - "tavily-python>=0.7.0", + "tavily-python>=0.7.12", "beautifulsoup4>=4.13.1", "playwright>=1.9.0", - "lxml>=4.9.3,<5.0.0", + "lxml>=4.9.3", "protego>=0.1", "APScheduler>=3.10.0,<=3.11.0" ] From 86ec2e9685aabbb0d54fc45a99c0ac131e3a89c4 Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 19 Oct 2025 15:06:38 +0200 Subject: [PATCH 26/35] added --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index dae648f80..d4d8d535d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,7 @@ chromadb = [ "chromadb>=0.6,<0.7", "pypika==0.48.9", ] -docs = ["lxml==6.0.2", "unstructured[csv, doc, docx, epub, md, odt, org, ppt, pptx, rst, rtf, tsv, xlsx, pdf]>=0.18.1,<19"] +docs = ["lxml<6.0.0", "unstructured[csv, doc, docx, epub, md, odt, org, ppt, pptx, rst, rtf, tsv, xlsx, pdf]>=0.18.1,<19"] codegraph = [ "fastembed<=0.6.0 ; python_version < '3.13'", 
"transformers>=4.46.3,<5", From cbfa360b8f7726c1eec9bfd97d8297f2024664e3 Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 19 Oct 2025 15:26:06 +0200 Subject: [PATCH 27/35] added lock file --- poetry.lock | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/poetry.lock b/poetry.lock index 62ae7be8d..c974a0b43 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. [[package]] name = "accelerate" @@ -6633,7 +6633,7 @@ description = "Fundamental package for array computing in Python" optional = false python-versions = ">=3.11" groups = ["main"] -markers = "python_version == \"3.12\" or python_full_version == \"3.13.0\"" +markers = "python_version >= \"3.12\"" files = [ {file = "numpy-2.3.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:0ffc4f5caba7dfcbe944ed674b7eef683c7e94874046454bb79ed7ee0236f59d"}, {file = "numpy-2.3.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:e7e946c7170858a0295f79a60214424caac2ffdb0063d4d79cb681f9aa0aa569"}, @@ -8532,7 +8532,6 @@ files = [ {file = "psycopg2-2.9.10-cp311-cp311-win_amd64.whl", hash = "sha256:0435034157049f6846e95103bd8f5a668788dd913a7c30162ca9503fdf542cb4"}, {file = "psycopg2-2.9.10-cp312-cp312-win32.whl", hash = "sha256:65a63d7ab0e067e2cdb3cf266de39663203d38d6a8ed97f5ca0cb315c73fe067"}, {file = "psycopg2-2.9.10-cp312-cp312-win_amd64.whl", hash = "sha256:4a579d6243da40a7b3182e0430493dbd55950c493d8c68f4eec0b302f6bbf20e"}, - {file = "psycopg2-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:91fd603a2155da8d0cfcdbf8ab24a2d54bca72795b90d2a3ed2b6da8d979dee2"}, {file = "psycopg2-2.9.10-cp39-cp39-win32.whl", hash = "sha256:9d5b3b94b79a844a986d029eee38998232451119ad653aea42bb9220a8c5066b"}, {file = "psycopg2-2.9.10-cp39-cp39-win_amd64.whl", hash = 
"sha256:88138c8dedcbfa96408023ea2b0c369eda40fe5d75002c0964c78f46f11fa442"}, {file = "psycopg2-2.9.10.tar.gz", hash = "sha256:12ec0b40b0273f95296233e8750441339298e6a572f7039da5b260e3c8b60e11"}, @@ -11203,7 +11202,7 @@ description = "Easily download, build, install, upgrade, and uninstall Python pa optional = true python-versions = ">=3.9" groups = ["main"] -markers = "platform_system == \"Linux\" and platform_machine == \"x86_64\" and (extra == \"docs\" or extra == \"docling\" or extra == \"notebook\" or extra == \"dev\" or extra == \"llama-index\" or extra == \"deepeval\" or extra == \"dlt\") or python_version == \"3.12\" and (extra == \"notebook\" or extra == \"dev\" or extra == \"llama-index\" or extra == \"deepeval\" or extra == \"dlt\" or extra == \"docs\" or extra == \"docling\") or python_full_version == \"3.13.0\" and (extra == \"notebook\" or extra == \"dev\" or extra == \"llama-index\" or extra == \"deepeval\" or extra == \"dlt\" or extra == \"docs\" or extra == \"docling\") or extra == \"notebook\" or extra == \"dev\" or extra == \"llama-index\" or extra == \"deepeval\" or extra == \"dlt\"" +markers = "platform_system == \"Linux\" and platform_machine == \"x86_64\" and (extra == \"docs\" or extra == \"docling\" or extra == \"notebook\" or extra == \"dev\" or extra == \"llama-index\" or extra == \"deepeval\" or extra == \"dlt\") or python_version >= \"3.12\" and (extra == \"notebook\" or extra == \"dev\" or extra == \"llama-index\" or extra == \"deepeval\" or extra == \"dlt\" or extra == \"docs\" or extra == \"docling\") or extra == \"notebook\" or extra == \"dev\" or extra == \"llama-index\" or extra == \"deepeval\" or extra == \"dlt\"" files = [ {file = "setuptools-80.9.0-py3-none-any.whl", hash = "sha256:062d34222ad13e0cc312a4c02d73f059e86a4acbfbdea8f8f76b28c99f306922"}, {file = "setuptools-80.9.0.tar.gz", hash = "sha256:f36b47402ecde768dbfafc46e8e4207b4360c654f1f3bb84475f0a28628fb19c"}, @@ -13510,7 +13509,7 @@ dev = ["coverage", "deptry", "gitpython", 
"mkdocs-material", "mkdocs-minify-plug distributed = ["modal"] dlt = ["dlt"] docling = ["docling", "transformers"] -docs = ["unstructured"] +docs = ["lxml", "unstructured"] evals = ["gdown", "matplotlib", "pandas", "plotly", "scikit-learn"] graphiti = ["graphiti-core"] groq = ["groq"] @@ -13531,5 +13530,5 @@ scraping = ["APScheduler", "beautifulsoup4", "lxml", "playwright", "protego", "t [metadata] lock-version = "2.1" -python-versions = ">=3.10,<=3.13" -content-hash = "8d8172ac8ddc3c30ca79a1677ecf2a28897d52c0a564d8fb5646c8565c313a0f" +python-versions = ">=3.10,<3.14" +content-hash = "bcab5420339473ec08b89cde588899b60999762fb8ca9a011240d47ea86198e3" From 04719129a64809e28ed9c5e0af40dcd77a2e32dc Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 19 Oct 2025 15:53:38 +0200 Subject: [PATCH 28/35] updated env template --- .env.template | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.env.template b/.env.template index 3137636d3..89ac06830 100644 --- a/.env.template +++ b/.env.template @@ -247,10 +247,10 @@ LITELLM_LOG="ERROR" #LLM_PROVIDER="ollama" #LLM_ENDPOINT="http://localhost:11434/v1" #EMBEDDING_PROVIDER="ollama" -#EMBEDDING_MODEL="avr/sfr-embedding-mistral:latest" +#EMBEDDING_MODEL="nomic-embed-text:latest" #EMBEDDING_ENDPOINT="http://localhost:11434/api/embeddings" -#EMBEDDING_DIMENSIONS=4096 -#HUGGINGFACE_TOKENIZER="Salesforce/SFR-Embedding-Mistral" +#EMBEDDING_DIMENSIONS=768 +#HUGGINGFACE_TOKENIZER="nomic-ai/nomic-embed-text-v1.5" ########## OpenRouter (also free) ######################################################### From 400095d76df23c33b7c4783654d381255459d0a4 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Sun, 19 Oct 2025 21:30:13 +0200 Subject: [PATCH 29/35] fix: Resolve issue with multi-user mode search --- cognee/api/v1/search/search.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cognee/api/v1/search/search.py b/cognee/api/v1/search/search.py index 9f158e9d0..4051bae86 100644 --- a/cognee/api/v1/search/search.py 
+++ b/cognee/api/v1/search/search.py @@ -179,13 +179,6 @@ async def search( if not datasets: raise DatasetNotFoundError(message="No datasets found.") - graph_engine = await get_graph_engine() - is_empty = await graph_engine.is_empty() - - if is_empty: - logger.warning("Search attempt on an empty knowledge graph") - return [] - filtered_search_results = await search_function( query_text=query_text, query_type=query_type, From f88277c467e81f3d63b0e2f713be3d06c3c19276 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Sun, 19 Oct 2025 23:10:53 +0200 Subject: [PATCH 30/35] fix: Resolve issue with plain text files not having magic file info --- cognee/infrastructure/files/utils/guess_file_type.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cognee/infrastructure/files/utils/guess_file_type.py b/cognee/infrastructure/files/utils/guess_file_type.py index edd2d89b0..dcdd68cad 100644 --- a/cognee/infrastructure/files/utils/guess_file_type.py +++ b/cognee/infrastructure/files/utils/guess_file_type.py @@ -124,6 +124,12 @@ def guess_file_type(file: BinaryIO) -> filetype.Type: """ file_type = filetype.guess(file) + # If file type could not be determined consider it a plain text file as they don't have magic number encoding + if file_type is None: + from filetype.types.base import Type + + file_type = Type("text/plain", "txt") + if file_type is None: raise FileTypeException(f"Unknown file detected: {file.name}.") From 8c627d9e10df49d8c2315592b664081fab45e486 Mon Sep 17 00:00:00 2001 From: Hande <159312713+hande-k@users.noreply.github.com> Date: Mon, 20 Oct 2025 12:03:40 +0200 Subject: [PATCH 31/35] chore: update colab notebook on README --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index a1eebae73..305bffdfe 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ Build dynamic memory for Agents and replace RAG using scalable, modular ECL (Ext ## Get Started -Get started quickly with a Google Colab notebook , 
Deepnote notebook or starter repo +Get started quickly with a Google Colab notebook , Deepnote notebook or starter repo ## About cognee @@ -224,12 +224,12 @@ We now have a paper you can cite: ```bibtex @misc{markovic2025optimizinginterfaceknowledgegraphs, - title={Optimizing the Interface Between Knowledge Graphs and LLMs for Complex Reasoning}, + title={Optimizing the Interface Between Knowledge Graphs and LLMs for Complex Reasoning}, author={Vasilije Markovic and Lazar Obradovic and Laszlo Hajdu and Jovan Pavlovic}, year={2025}, eprint={2505.24478}, archivePrefix={arXiv}, primaryClass={cs.AI}, - url={https://arxiv.org/abs/2505.24478}, + url={https://arxiv.org/abs/2505.24478}, } ``` From 279d6e80f03420838ae9e4ca81648563290d4d36 Mon Sep 17 00:00:00 2001 From: Daulet Amirkhanov Date: Mon, 20 Oct 2025 11:56:15 +0100 Subject: [PATCH 32/35] Revert "fix: search without prior cognify" --- cognee/api/v1/search/search.py | 4 ---- .../databases/graph/graph_db_interface.py | 5 ---- .../databases/graph/kuzu/adapter.py | 9 -------- .../databases/graph/neo4j_driver/adapter.py | 9 -------- cognee/tests/test_kuzu.py | 23 ++++--------------- cognee/tests/test_neo4j.py | 22 ++++-------------- cognee/tests/unit/api/test_search.py | 21 ----------------- 7 files changed, 9 insertions(+), 84 deletions(-) delete mode 100644 cognee/tests/unit/api/test_search.py diff --git a/cognee/api/v1/search/search.py b/cognee/api/v1/search/search.py index 4051bae86..0a9e76e96 100644 --- a/cognee/api/v1/search/search.py +++ b/cognee/api/v1/search/search.py @@ -1,7 +1,6 @@ from uuid import UUID from typing import Union, Optional, List, Type -from cognee.infrastructure.databases.graph import get_graph_engine from cognee.modules.engine.models.node_set import NodeSet from cognee.modules.users.models import User from cognee.modules.search.types import SearchResult, SearchType, CombinedSearchResult @@ -9,9 +8,6 @@ from cognee.modules.users.methods import get_default_user from cognee.modules.search.methods 
import search as search_function from cognee.modules.data.methods import get_authorized_existing_datasets from cognee.modules.data.exceptions import DatasetNotFoundError -from cognee.shared.logging_utils import get_logger - -logger = get_logger() async def search( diff --git a/cognee/infrastructure/databases/graph/graph_db_interface.py b/cognee/infrastructure/databases/graph/graph_db_interface.py index 67df1a27c..65afdf275 100644 --- a/cognee/infrastructure/databases/graph/graph_db_interface.py +++ b/cognee/infrastructure/databases/graph/graph_db_interface.py @@ -159,11 +159,6 @@ class GraphDBInterface(ABC): - get_connections """ - @abstractmethod - async def is_empty(self) -> bool: - logger.warning("is_empty() is not implemented") - return True - @abstractmethod async def query(self, query: str, params: dict) -> List[Any]: """ diff --git a/cognee/infrastructure/databases/graph/kuzu/adapter.py b/cognee/infrastructure/databases/graph/kuzu/adapter.py index 2d3866888..3f0fb0c57 100644 --- a/cognee/infrastructure/databases/graph/kuzu/adapter.py +++ b/cognee/infrastructure/databases/graph/kuzu/adapter.py @@ -198,15 +198,6 @@ class KuzuAdapter(GraphDBInterface): except FileNotFoundError: logger.warning(f"Kuzu S3 storage file not found: {self.db_path}") - async def is_empty(self) -> bool: - query = """ - MATCH (n) - RETURN true - LIMIT 1; - """ - query_result = await self.query(query) - return len(query_result) == 0 - async def query(self, query: str, params: Optional[dict] = None) -> List[Tuple]: """ Execute a Kuzu query asynchronously with automatic reconnection. 
diff --git a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py index 5861b69cb..520295ed2 100644 --- a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py +++ b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py @@ -87,15 +87,6 @@ class Neo4jAdapter(GraphDBInterface): async with self.driver.session(database=self.graph_database_name) as session: yield session - async def is_empty(self) -> bool: - query = """ - RETURN EXISTS { - MATCH (n) - } AS node_exists; - """ - query_result = await self.query(query) - return not query_result[0]["node_exists"] - @deadlock_retry() async def query( self, diff --git a/cognee/tests/test_kuzu.py b/cognee/tests/test_kuzu.py index fe9da6dcb..8749e42d0 100644 --- a/cognee/tests/test_kuzu.py +++ b/cognee/tests/test_kuzu.py @@ -47,26 +47,10 @@ async def main(): pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt" ) - from cognee.infrastructure.databases.graph import get_graph_engine - - graph_engine = await get_graph_engine() - - is_empty = await graph_engine.is_empty() - - assert is_empty, "Kuzu graph database is not empty" - await cognee.add([explanation_file_path_quantum], dataset_name) - is_empty = await graph_engine.is_empty() - - assert is_empty, "Kuzu graph database should be empty before cognify" - await cognee.cognify([dataset_name]) - is_empty = await graph_engine.is_empty() - - assert not is_empty, "Kuzu graph database should not be empty" - from cognee.infrastructure.databases.vector import get_vector_engine vector_engine = get_vector_engine() @@ -130,10 +114,11 @@ async def main(): assert not os.path.isdir(data_root_directory), "Local data files are not deleted" await cognee.prune.prune_system(metadata=True) + from cognee.infrastructure.databases.graph import get_graph_engine - is_empty = await graph_engine.is_empty() - - assert is_empty, "Kuzu graph database is not empty" + graph_engine = await get_graph_engine() + 
nodes, edges = await graph_engine.get_graph_data() + assert len(nodes) == 0 and len(edges) == 0, "Kuzu graph database is not empty" finally: # Ensure cleanup even if tests fail diff --git a/cognee/tests/test_neo4j.py b/cognee/tests/test_neo4j.py index 925614e67..c74b4ab65 100644 --- a/cognee/tests/test_neo4j.py +++ b/cognee/tests/test_neo4j.py @@ -35,14 +35,6 @@ async def main(): explanation_file_path_nlp = os.path.join( pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt" ) - from cognee.infrastructure.databases.graph import get_graph_engine - - graph_engine = await get_graph_engine() - - is_empty = await graph_engine.is_empty() - - assert is_empty, "Graph has to be empty" - await cognee.add([explanation_file_path_nlp], dataset_name) explanation_file_path_quantum = os.path.join( @@ -50,16 +42,9 @@ async def main(): ) await cognee.add([explanation_file_path_quantum], dataset_name) - is_empty = await graph_engine.is_empty() - - assert is_empty, "Graph has to be empty before cognify" await cognee.cognify([dataset_name]) - is_empty = await graph_engine.is_empty() - - assert not is_empty, "Graph shouldn't be empty" - from cognee.infrastructure.databases.vector import get_vector_engine vector_engine = get_vector_engine() @@ -132,8 +117,11 @@ async def main(): assert not os.path.isdir(data_root_directory), "Local data files are not deleted" await cognee.prune.prune_system(metadata=True) - is_empty = await graph_engine.is_empty() - assert is_empty, "Neo4j graph database is not empty" + from cognee.infrastructure.databases.graph import get_graph_engine + + graph_engine = await get_graph_engine() + nodes, edges = await graph_engine.get_graph_data() + assert len(nodes) == 0 and len(edges) == 0, "Neo4j graph database is not empty" if __name__ == "__main__": diff --git a/cognee/tests/unit/api/test_search.py b/cognee/tests/unit/api/test_search.py deleted file mode 100644 index 54a4cc35f..000000000 --- a/cognee/tests/unit/api/test_search.py +++ /dev/null @@ 
-1,21 +0,0 @@ -import pytest -import cognee - - -@pytest.mark.asyncio -async def test_empty_search_raises_SearchOnEmptyGraphError_on_empty_graph(): - await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata=True) - await cognee.add("Sample input") - result = await cognee.search("Sample query") - assert result == [] - - -@pytest.mark.asyncio -async def test_empty_search_doesnt_raise_SearchOnEmptyGraphError(): - await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata=True) - await cognee.add("Sample input") - await cognee.cognify() - result = await cognee.search("Sample query") - assert result != [] From 3e54b67b4d7f20c385afad0bc878943df9a0b86c Mon Sep 17 00:00:00 2001 From: Igor Ilic <30923996+dexters1@users.noreply.github.com> Date: Mon, 20 Oct 2025 15:03:35 +0200 Subject: [PATCH 33/35] fix: Resolve missing argument for distributed (#1563) ## Description Resolve missing argument for distributed ## Type of Change - [ ] Bug fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [ ] Code refactoring - [ ] Performance improvement - [ ] Other (please specify): ## Screenshots/Videos (if applicable) ## Pre-submission Checklist - [ ] **I have tested my changes thoroughly before submitting this PR** - [ ] **This PR contains minimal changes necessary to address the issue/feature** - [ ] My code follows the project's coding standards and style guidelines - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] I have added necessary documentation (if applicable) - [ ] All new and existing tests pass - [ ] I have searched existing PRs to ensure this change hasn't been submitted already - [ ] I have linked any relevant issues in the description - [ ] My commits have clear and descriptive messages ## DCO Affirmation I affirm that all code in 
every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin. Co-authored-by: hajdul88 <52442977+hajdul88@users.noreply.github.com> --- cognee/modules/pipelines/operations/run_tasks_distributed.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cognee/modules/pipelines/operations/run_tasks_distributed.py b/cognee/modules/pipelines/operations/run_tasks_distributed.py index 95cdb0266..3fce3763d 100644 --- a/cognee/modules/pipelines/operations/run_tasks_distributed.py +++ b/cognee/modules/pipelines/operations/run_tasks_distributed.py @@ -88,6 +88,7 @@ async def run_tasks_distributed( pipeline_name: str = "unknown_pipeline", context: dict = None, incremental_loading: bool = False, + data_per_batch: int = 20, ): if not user: user = await get_default_user() From df038365c848775229e1c9255d56992352b1990e Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Mon, 20 Oct 2025 17:27:49 +0200 Subject: [PATCH 34/35] fix: fixes id in get_filtered_graph_data (#1569) ## Description Fixes get_filtered_graph_data method in neo4jAdapter. 
## Type of Change - [x] Bug fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [ ] Code refactoring - [ ] Performance improvement - [ ] Other (please specify): ## Screenshots/Videos (if applicable) ## Pre-submission Checklist - [x] **I have tested my changes thoroughly before submitting this PR** - [x] **This PR contains minimal changes necessary to address the issue/feature** - [x] My code follows the project's coding standards and style guidelines - [x] I have added tests that prove my fix is effective or that my feature works - [x] I have added necessary documentation (if applicable) - [x] All new and existing tests pass - [x] I have searched existing PRs to ensure this change hasn't been submitted already - [x] I have linked any relevant issues in the description - [x] My commits have clear and descriptive messages ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin. 
--- cognee/infrastructure/databases/graph/neo4j_driver/adapter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py index 520295ed2..365d02979 100644 --- a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py +++ b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py @@ -1067,7 +1067,7 @@ class Neo4jAdapter(GraphDBInterface): query_nodes = f""" MATCH (n) WHERE {where_clause} - RETURN ID(n) AS id, labels(n) AS labels, properties(n) AS properties + RETURN n.id AS id, labels(n) AS labels, properties(n) AS properties """ result_nodes = await self.query(query_nodes) @@ -1082,7 +1082,7 @@ class Neo4jAdapter(GraphDBInterface): query_edges = f""" MATCH (n)-[r]->(m) WHERE {where_clause} AND {where_clause.replace("n.", "m.")} - RETURN ID(n) AS source, ID(m) AS target, TYPE(r) AS type, properties(r) AS properties + RETURN n.id AS source, m.id AS target, TYPE(r) AS type, properties(r) AS properties """ result_edges = await self.query(query_edges) From 612a2252ce012fc8929ffe6523ed6bc948a4db55 Mon Sep 17 00:00:00 2001 From: vasilije Date: Tue, 21 Oct 2025 07:22:52 +0200 Subject: [PATCH 35/35] fix --- poetry.lock | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index 80263027e..2773e61b9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
[[package]] name = "accelerate" @@ -4366,6 +4366,8 @@ groups = ["main"] markers = "extra == \"dlt\"" files = [ {file = "jsonpath-ng-1.7.0.tar.gz", hash = "sha256:f6f5f7fd4e5ff79c785f1573b394043b39849fb2bb47bcead935d12b00beab3c"}, + {file = "jsonpath_ng-1.7.0-py2-none-any.whl", hash = "sha256:898c93fc173f0c336784a3fa63d7434297544b7198124a68f9a3ef9597b0ae6e"}, + {file = "jsonpath_ng-1.7.0-py3-none-any.whl", hash = "sha256:f3d7f9e848cba1b6da28c55b1c26ff915dc9e0b1ba7e752a53d6da8d5cbd00b6"}, ] [package.dependencies] @@ -10208,6 +10210,13 @@ optional = false python-versions = ">=3.8" groups = ["main"] files = [ + {file = "PyYAML-6.0.3-cp38-cp38-macosx_10_13_x86_64.whl", hash = "sha256:c2514fceb77bc5e7a2f7adfaa1feb2fb311607c9cb518dbc378688ec73d8292f"}, + {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9c57bb8c96f6d1808c030b1687b9b5fb476abaa47f0db9c0101f5e9f394e97f4"}, + {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:efd7b85f94a6f21e4932043973a7ba2613b059c4a000551892ac9f1d11f5baf3"}, + {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:22ba7cfcad58ef3ecddc7ed1db3409af68d023b7f940da23c6c2a1890976eda6"}, + {file = "PyYAML-6.0.3-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:6344df0d5755a2c9a276d4473ae6b90647e216ab4757f8426893b5dd2ac3f369"}, + {file = "PyYAML-6.0.3-cp38-cp38-win32.whl", hash = "sha256:3ff07ec89bae51176c0549bc4c63aa6202991da2d9a6129d7aef7f1407d3f295"}, + {file = "PyYAML-6.0.3-cp38-cp38-win_amd64.whl", hash = "sha256:5cf4e27da7e3fbed4d6c3d8e797387aaad68102272f8f9752883bc32d61cb87b"}, {file = "pyyaml-6.0.3-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:214ed4befebe12df36bcc8bc2b64b396ca31be9304b8f59e25c11cf94a4c033b"}, {file = "pyyaml-6.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = 
"sha256:02ea2dfa234451bbb8772601d7b8e426c2bfa197136796224e50e35a78777956"}, {file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b30236e45cf30d2b8e7b3e85881719e98507abed1011bf463a8fa23e9c3e98a8"},