Merge branch 'dev' into feature/cog-3160-redis-session-conversation
commit 5a27c37cc2

19 changed files with 1471 additions and 1400 deletions
@@ -28,11 +28,10 @@ EMBEDDING_ENDPOINT=""
EMBEDDING_API_VERSION=""
EMBEDDING_DIMENSIONS=3072
EMBEDDING_MAX_TOKENS=8191
+EMBEDDING_BATCH_SIZE=36
# If embedding key is not provided same key set for LLM_API_KEY will be used
#EMBEDDING_API_KEY="your_api_key"
-# Note: OpenAI support up to 2048 elements and Gemini supports a maximum of 100 elements in an embedding batch,
-# Cognee sets the optimal batch size for OpenAI and Gemini, but a custom size can be defined if necessary for other models
-#EMBEDDING_BATCH_SIZE=2048

# If using BAML structured output these env variables will be used
BAML_LLM_PROVIDER=openai
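As a quick illustration of what this setting controls, here is a small, self-contained sketch (not Cognee's actual client code; the helper names are made up) that reads EMBEDDING_BATCH_SIZE with the 36-element fallback shown above and splits inputs into embedding requests of at most that size:

```python
import os


def read_batch_size(default: int = 36) -> int:
    # Mirrors the .env default above; falls back when the variable is unset or blank.
    raw = os.getenv("EMBEDDING_BATCH_SIZE", "").strip()
    return int(raw) if raw else default


def split_into_batches(texts: list[str], batch_size: int) -> list[list[str]]:
    # Each sub-list becomes one embedding request, so the batch size caps
    # how many elements are sent to the provider per call.
    return [texts[i : i + batch_size] for i in range(0, len(texts), batch_size)]


texts = [f"chunk {i}" for i in range(100)]
print(len(split_into_batches(texts, read_batch_size())))  # 3 batches of up to 36 elements
```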
@@ -248,10 +247,10 @@ LITELLM_LOG="ERROR"
#LLM_PROVIDER="ollama"
#LLM_ENDPOINT="http://localhost:11434/v1"
#EMBEDDING_PROVIDER="ollama"
#EMBEDDING_MODEL="avr/sfr-embedding-mistral:latest"
#EMBEDDING_MODEL="nomic-embed-text:latest"
#EMBEDDING_ENDPOINT="http://localhost:11434/api/embeddings"
#EMBEDDING_DIMENSIONS=4096
#HUGGINGFACE_TOKENIZER="Salesforce/SFR-Embedding-Mistral"
#EMBEDDING_DIMENSIONS=768
#HUGGINGFACE_TOKENIZER="nomic-ai/nomic-embed-text-v1.5"

########## OpenRouter (also free) #########################################################
@@ -44,6 +44,7 @@ async def cognify(
    graph_model: BaseModel = KnowledgeGraph,
    chunker=TextChunker,
    chunk_size: int = None,
+    chunks_per_batch: int = None,
    config: Config = None,
    vector_db_config: dict = None,
    graph_db_config: dict = None,

@@ -106,6 +107,7 @@ async def cognify(
            Formula: min(embedding_max_completion_tokens, llm_max_completion_tokens // 2)
            Default limits: ~512-8192 tokens depending on models.
            Smaller chunks = more granular but potentially fragmented knowledge.
+        chunks_per_batch: Number of chunks to be processed in a single batch in Cognify tasks.
        vector_db_config: Custom vector database configuration for embeddings storage.
        graph_db_config: Custom graph database configuration for relationship storage.
        run_in_background: If True, starts processing asynchronously and returns immediately.
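The chunk-size rule quoted in the docstring can be written out as a one-line helper; a sketch with purely illustrative token limits (not the defaults of any particular model):

```python
def max_chunk_tokens(embedding_max_completion_tokens: int, llm_max_completion_tokens: int) -> int:
    # Formula from the docstring: a chunk must fit the embedding model's window
    # and still leave the LLM at least half of its completion budget.
    return min(embedding_max_completion_tokens, llm_max_completion_tokens // 2)


# Illustrative limits only: an 8191-token embedding window and a 16,000-token LLM budget.
print(max_chunk_tokens(8191, 16000))  # 8000
```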
@@ -210,10 +212,18 @@ async def cognify(
        }

    if temporal_cognify:
-        tasks = await get_temporal_tasks(user, chunker, chunk_size)
+        tasks = await get_temporal_tasks(
+            user=user, chunker=chunker, chunk_size=chunk_size, chunks_per_batch=chunks_per_batch
+        )
    else:
        tasks = await get_default_tasks(
-            user, graph_model, chunker, chunk_size, config, custom_prompt
+            user=user,
+            graph_model=graph_model,
+            chunker=chunker,
+            chunk_size=chunk_size,
+            config=config,
+            custom_prompt=custom_prompt,
+            chunks_per_batch=chunks_per_batch,
        )

    # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for
@@ -240,6 +250,7 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
    chunk_size: int = None,
    config: Config = None,
    custom_prompt: Optional[str] = None,
+    chunks_per_batch: int = 100,
) -> list[Task]:
    if config is None:
        ontology_config = get_ontology_env_config()

@@ -258,6 +269,9 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
            "ontology_config": {"ontology_resolver": get_default_ontology_resolver()}
        }

+    if chunks_per_batch is None:
+        chunks_per_batch = 100
+
    default_tasks = [
        Task(classify_documents),
        Task(check_permissions_on_dataset, user=user, permissions=["write"]),
@@ -271,20 +285,20 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
            graph_model=graph_model,
            config=config,
            custom_prompt=custom_prompt,
-            task_config={"batch_size": 10},
+            task_config={"batch_size": chunks_per_batch},
        ),  # Generate knowledge graphs from the document chunks.
        Task(
            summarize_text,
-            task_config={"batch_size": 10},
+            task_config={"batch_size": chunks_per_batch},
        ),
-        Task(add_data_points, task_config={"batch_size": 10}),
+        Task(add_data_points, task_config={"batch_size": chunks_per_batch}),
    ]

    return default_tasks


async def get_temporal_tasks(
-    user: User = None, chunker=TextChunker, chunk_size: int = None
+    user: User = None, chunker=TextChunker, chunk_size: int = None, chunks_per_batch: int = 10
) -> list[Task]:
    """
    Builds and returns a list of temporal processing tasks to be executed in sequence.
@@ -301,10 +315,14 @@ async def get_temporal_tasks(
        user (User, optional): The user requesting task execution, used for permission checks.
        chunker (Callable, optional): A text chunking function/class to split documents. Defaults to TextChunker.
        chunk_size (int, optional): Maximum token size per chunk. If not provided, uses system default.
+        chunks_per_batch (int, optional): Number of chunks to process in a single batch in Cognify

    Returns:
        list[Task]: A list of Task objects representing the temporal processing pipeline.
    """
+    if chunks_per_batch is None:
+        chunks_per_batch = 10
+
    temporal_tasks = [
        Task(classify_documents),
        Task(check_permissions_on_dataset, user=user, permissions=["write"]),

@@ -313,9 +331,9 @@
            max_chunk_size=chunk_size or get_max_chunk_tokens(),
            chunker=chunker,
        ),
-        Task(extract_events_and_timestamps, task_config={"chunk_size": 10}),
+        Task(extract_events_and_timestamps, task_config={"batch_size": chunks_per_batch}),
        Task(extract_knowledge_graph_from_events),
-        Task(add_data_points, task_config={"batch_size": 10}),
+        Task(add_data_points, task_config={"batch_size": chunks_per_batch}),
    ]

    return temporal_tasks
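Taken together, these hunks thread a single chunks_per_batch knob from the public cognify() call down to the task_config batch sizes of the individual pipeline tasks. A hedged usage sketch, assuming the usual cognee.add / cognee.cognify entry points and an invented dataset name:

```python
import asyncio

import cognee


async def main():
    # Hypothetical sample data; any previously added dataset works the same way.
    await cognee.add("Tenacity adds retry behaviour to flaky network calls.", dataset_name="demo")

    # chunks_per_batch now controls how many chunks each Cognify task
    # (graph extraction, summarization, add_data_points) processes per batch.
    await cognee.cognify(datasets=["demo"], chunks_per_batch=50)


if __name__ == "__main__":
    asyncio.run(main())
```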
@@ -1,8 +1,17 @@
-from cognee.shared.logging_utils import get_logger
+import os
+import logging
from typing import List, Optional
from fastembed import TextEmbedding
+import litellm
-import os
+from tenacity import (
+    retry,
+    stop_after_delay,
+    wait_exponential_jitter,
+    retry_if_not_exception_type,
+    before_sleep_log,
+)

+from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine
from cognee.infrastructure.databases.exceptions import EmbeddingException
from cognee.infrastructure.llm.tokenizer.TikToken import (
@@ -57,6 +66,13 @@ class FastembedEmbeddingEngine(EmbeddingEngine):
        enable_mocking = str(enable_mocking).lower()
        self.mock = enable_mocking in ("true", "1", "yes")

+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def embed_text(self, text: List[str]) -> List[List[float]]:
        """
        Embed the given text into numerical vectors.
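This tenacity decorator stack is repeated on the embed/completion entry points of every engine and adapter touched in this merge. A standalone sketch of the pattern, with a stand-in fetch_embeddings coroutine instead of any real Cognee engine method:

```python
import asyncio
import logging

import litellm
from tenacity import (
    retry,
    stop_after_delay,
    wait_exponential_jitter,
    retry_if_not_exception_type,
    before_sleep_log,
)

logger = logging.getLogger(__name__)


# Retry transient failures for up to ~128 seconds with jittered exponential
# backoff (2s initial, capped at 128s between attempts); a NotFoundError
# (wrong model or endpoint) is not transient, so it is re-raised immediately.
@retry(
    stop=stop_after_delay(128),
    wait=wait_exponential_jitter(2, 128),
    retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
    before_sleep=before_sleep_log(logger, logging.DEBUG),
    reraise=True,
)
async def fetch_embeddings(texts: list[str]) -> list[list[float]]:
    # Stand-in for the provider call that the decorator wraps in the diff.
    response = await litellm.aembedding(model="text-embedding-3-large", input=texts)
    return [item["embedding"] for item in response.data]
```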
@@ -1,15 +1,21 @@
import asyncio
import logging

from cognee.shared.logging_utils import get_logger
from typing import List, Optional
import numpy as np
import math
+from tenacity import (
+    retry,
+    stop_after_delay,
+    wait_exponential_jitter,
+    retry_if_not_exception_type,
+    before_sleep_log,
+)
import litellm
import os
from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine
from cognee.infrastructure.databases.exceptions import EmbeddingException
from cognee.infrastructure.llm.tokenizer.Gemini import (
    GeminiTokenizer,
)
from cognee.infrastructure.llm.tokenizer.HuggingFace import (
    HuggingFaceTokenizer,
)

@@ -19,10 +25,6 @@ from cognee.infrastructure.llm.tokenizer.Mistral import (
from cognee.infrastructure.llm.tokenizer.TikToken import (
    TikTokenTokenizer,
)
-from cognee.infrastructure.databases.vector.embeddings.embedding_rate_limiter import (
-    embedding_rate_limit_async,
-    embedding_sleep_and_retry_async,
-)

litellm.set_verbose = False
logger = get_logger("LiteLLMEmbeddingEngine")

@@ -76,8 +78,13 @@ class LiteLLMEmbeddingEngine(EmbeddingEngine):
        enable_mocking = str(enable_mocking).lower()
        self.mock = enable_mocking in ("true", "1", "yes")

-    @embedding_sleep_and_retry_async()
-    @embedding_rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def embed_text(self, text: List[str]) -> List[List[float]]:
        """
        Embed a list of text strings into vector representations.
@@ -3,8 +3,16 @@ from cognee.shared.logging_utils import get_logger
import aiohttp
from typing import List, Optional
import os

+import litellm
+import logging
import aiohttp.http_exceptions
+from tenacity import (
+    retry,
+    stop_after_delay,
+    wait_exponential_jitter,
+    retry_if_not_exception_type,
+    before_sleep_log,
+)

from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine
from cognee.infrastructure.llm.tokenizer.HuggingFace import (

@@ -69,7 +77,6 @@ class OllamaEmbeddingEngine(EmbeddingEngine):
        enable_mocking = str(enable_mocking).lower()
        self.mock = enable_mocking in ("true", "1", "yes")

    @embedding_rate_limit_async
    async def embed_text(self, text: List[str]) -> List[List[float]]:
        """
        Generate embedding vectors for a list of text prompts.

@@ -92,7 +99,13 @@ class OllamaEmbeddingEngine(EmbeddingEngine):
        embeddings = await asyncio.gather(*[self._get_embedding(prompt) for prompt in text])
        return embeddings

    @embedding_sleep_and_retry_async()
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def _get_embedding(self, prompt: str) -> List[float]:
        """
        Internal method to call the Ollama embeddings endpoint for a single prompt.
@@ -24,11 +24,10 @@ class EmbeddingConfig(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="allow")

    def model_post_init(self, __context) -> None:
        # If embedding batch size is not defined use 2048 as default for OpenAI and 100 for all other embedding models
        if not self.embedding_batch_size and self.embedding_provider.lower() == "openai":
-            self.embedding_batch_size = 2048
+            self.embedding_batch_size = 36
        elif not self.embedding_batch_size:
-            self.embedding_batch_size = 100
+            self.embedding_batch_size = 36

    def to_dict(self) -> dict:
        """
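In other words, both branches of model_post_init now resolve an unset batch size to 36, so EMBEDDING_BATCH_SIZE in .env becomes the only way to get a different value. A minimal sketch of that behaviour (a stand-in settings class, not the real EmbeddingConfig):

```python
from typing import Optional

from pydantic_settings import BaseSettings, SettingsConfigDict


class EmbeddingSettingsSketch(BaseSettings):
    # Stand-in for the config class in the diff: an unset EMBEDDING_BATCH_SIZE
    # now resolves to 36 regardless of the embedding provider.
    embedding_provider: str = "openai"
    embedding_batch_size: Optional[int] = None

    model_config = SettingsConfigDict(env_file=".env", extra="allow")

    def model_post_init(self, __context) -> None:
        if not self.embedding_batch_size:
            self.embedding_batch_size = 36


print(EmbeddingSettingsSketch().embedding_batch_size)  # 36 unless EMBEDDING_BATCH_SIZE is set
```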
@@ -1,19 +1,24 @@
+import logging
from typing import Type
from pydantic import BaseModel
import litellm
import instructor
from cognee.shared.logging_utils import get_logger
+from tenacity import (
+    retry,
+    stop_after_delay,
+    wait_exponential_jitter,
+    retry_if_not_exception_type,
+    before_sleep_log,
+)

from cognee.infrastructure.llm.exceptions import MissingSystemPromptPathError
from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import (
    LLMInterface,
)
-from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import (
-    rate_limit_async,
-    sleep_and_retry_async,
-)

from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.llm.config import get_llm_config

logger = get_logger()


class AnthropicAdapter(LLMInterface):
    """

@@ -35,8 +40,13 @@ class AnthropicAdapter(LLMInterface):
        self.model = model
        self.max_completion_tokens = max_completion_tokens

-    @sleep_and_retry_async()
-    @rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def acreate_structured_output(
        self, text_input: str, system_prompt: str, response_model: Type[BaseModel]
    ) -> BaseModel:
@@ -12,11 +12,18 @@ from cognee.infrastructure.llm.exceptions import ContentPolicyFilterError
from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import (
    LLMInterface,
)
-from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import (
-    rate_limit_async,
-    sleep_and_retry_async,
+import logging
+from cognee.shared.logging_utils import get_logger
+from tenacity import (
+    retry,
+    stop_after_delay,
+    wait_exponential_jitter,
+    retry_if_not_exception_type,
+    before_sleep_log,
+)

+logger = get_logger()


class GeminiAdapter(LLMInterface):
    """

@@ -58,8 +65,13 @@ class GeminiAdapter(LLMInterface):

        self.aclient = instructor.from_litellm(litellm.acompletion, mode=instructor.Mode.JSON)

-    @sleep_and_retry_async()
-    @rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def acreate_structured_output(
        self, text_input: str, system_prompt: str, response_model: Type[BaseModel]
    ) -> BaseModel:
@@ -12,11 +12,18 @@ from cognee.infrastructure.llm.exceptions import ContentPolicyFilterError
from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import (
    LLMInterface,
)
-from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import (
-    rate_limit_async,
-    sleep_and_retry_async,
+import logging
+from cognee.shared.logging_utils import get_logger
+from tenacity import (
+    retry,
+    stop_after_delay,
+    wait_exponential_jitter,
+    retry_if_not_exception_type,
+    before_sleep_log,
+)

+logger = get_logger()


class GenericAPIAdapter(LLMInterface):
    """

@@ -58,8 +65,13 @@ class GenericAPIAdapter(LLMInterface):

        self.aclient = instructor.from_litellm(litellm.acompletion, mode=instructor.Mode.JSON)

-    @sleep_and_retry_async()
-    @rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def acreate_structured_output(
        self, text_input: str, system_prompt: str, response_model: Type[BaseModel]
    ) -> BaseModel:
@@ -1,20 +1,23 @@
import litellm
import instructor
from pydantic import BaseModel
-from typing import Type, Optional
-from litellm import acompletion, JSONSchemaValidationError
+from typing import Type
+from litellm import JSONSchemaValidationError

from cognee.shared.logging_utils import get_logger
from cognee.modules.observability.get_observe import get_observe
from cognee.infrastructure.llm.exceptions import MissingSystemPromptPathError
from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import (
    LLMInterface,
)
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.llm.config import get_llm_config
-from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import (
-    rate_limit_async,
-    sleep_and_retry_async,

+import logging
+from tenacity import (
+    retry,
+    stop_after_delay,
+    wait_exponential_jitter,
+    retry_if_not_exception_type,
+    before_sleep_log,
+)

logger = get_logger()

@@ -47,8 +50,13 @@ class MistralAdapter(LLMInterface):
            api_key=get_llm_config().llm_api_key,
        )

-    @sleep_and_retry_async()
-    @rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def acreate_structured_output(
        self, text_input: str, system_prompt: str, response_model: Type[BaseModel]
    ) -> BaseModel:

@@ -99,31 +107,3 @@ class MistralAdapter(LLMInterface):
            logger.error(f"Schema validation failed: {str(e)}")
            logger.debug(f"Raw response: {e.raw_response}")
            raise ValueError(f"Response failed schema validation: {str(e)}")

-    def show_prompt(self, text_input: str, system_prompt: str) -> str:
-        """
-        Format and display the prompt for a user query.
-
-        Parameters:
-        -----------
-        - text_input (str): Input text from the user to be included in the prompt.
-        - system_prompt (str): The system prompt that will be shown alongside the user input.
-
-        Returns:
-        --------
-        - str: The formatted prompt string combining system prompt and user input.
-        """
-        if not text_input:
-            text_input = "No user input provided."
-        if not system_prompt:
-            raise MissingSystemPromptPathError()
-
-        system_prompt = LLMGateway.read_query_prompt(system_prompt)
-
-        formatted_prompt = (
-            f"""System Prompt:\n{system_prompt}\n\nUser Input:\n{text_input}\n"""
-            if system_prompt
-            else None
-        )
-
-        return formatted_prompt
@@ -1,4 +1,6 @@
import base64
+import litellm
+import logging
import instructor
from typing import Type
from openai import OpenAI

@@ -7,11 +9,17 @@ from pydantic import BaseModel
from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import (
    LLMInterface,
)
-from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import (
-    rate_limit_async,
-    sleep_and_retry_async,
-)
from cognee.infrastructure.files.utils.open_data_file import open_data_file
+from cognee.shared.logging_utils import get_logger
+from tenacity import (
+    retry,
+    stop_after_delay,
+    wait_exponential_jitter,
+    retry_if_not_exception_type,
+    before_sleep_log,
+)

+logger = get_logger()


class OllamaAPIAdapter(LLMInterface):

@@ -47,8 +55,13 @@ class OllamaAPIAdapter(LLMInterface):
            OpenAI(base_url=self.endpoint, api_key=self.api_key), mode=instructor.Mode.JSON
        )

-    @sleep_and_retry_async()
-    @rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def acreate_structured_output(
        self, text_input: str, system_prompt: str, response_model: Type[BaseModel]
    ) -> BaseModel:

@@ -90,7 +103,13 @@ class OllamaAPIAdapter(LLMInterface):

        return response

-    @rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def create_transcript(self, input_file: str) -> str:
        """
        Generate an audio transcript from a user query.

@@ -123,7 +142,13 @@ class OllamaAPIAdapter(LLMInterface):

        return transcription.text

-    @rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def transcribe_image(self, input_file: str) -> str:
        """
        Transcribe content from an image using base64 encoding.
@@ -7,6 +7,15 @@ from openai import ContentFilterFinishReasonError
from litellm.exceptions import ContentPolicyViolationError
from instructor.core import InstructorRetryException

+import logging
+from tenacity import (
+    retry,
+    stop_after_delay,
+    wait_exponential_jitter,
+    retry_if_not_exception_type,
+    before_sleep_log,
+)

from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.llm_interface import (
    LLMInterface,
)

@@ -14,19 +23,13 @@ from cognee.infrastructure.llm.exceptions import (
    ContentPolicyFilterError,
)
from cognee.infrastructure.files.utils.open_data_file import open_data_file
-from cognee.infrastructure.llm.structured_output_framework.litellm_instructor.llm.rate_limiter import (
-    rate_limit_async,
-    rate_limit_sync,
-    sleep_and_retry_async,
-    sleep_and_retry_sync,
-)
from cognee.modules.observability.get_observe import get_observe
from cognee.shared.logging_utils import get_logger

-observe = get_observe()
-
logger = get_logger()

+observe = get_observe()
+

class OpenAIAdapter(LLMInterface):
    """

@@ -97,8 +100,13 @@ class OpenAIAdapter(LLMInterface):
        self.fallback_endpoint = fallback_endpoint

    @observe(as_type="generation")
-    @sleep_and_retry_async()
-    @rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def acreate_structured_output(
        self, text_input: str, system_prompt: str, response_model: Type[BaseModel]
    ) -> BaseModel:

@@ -148,10 +156,7 @@ class OpenAIAdapter(LLMInterface):
                InstructorRetryException,
            ) as e:
                if not (self.fallback_model and self.fallback_api_key):
                    raise ContentPolicyFilterError(
                        f"The provided input contains content that is not aligned with our content policy: {text_input}"
                    ) from e

                raise e
                try:
                    return await self.aclient.chat.completions.create(
                        model=self.fallback_model,

@@ -186,8 +191,13 @@ class OpenAIAdapter(LLMInterface):
            ) from error

    @observe
-    @sleep_and_retry_sync()
-    @rate_limit_sync
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    def create_structured_output(
        self, text_input: str, system_prompt: str, response_model: Type[BaseModel]
    ) -> BaseModel:

@@ -231,7 +241,13 @@ class OpenAIAdapter(LLMInterface):
            max_retries=self.MAX_RETRIES,
        )

-    @rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def create_transcript(self, input):
        """
        Generate an audio transcript from a user query.

@@ -263,7 +279,13 @@ class OpenAIAdapter(LLMInterface):

        return transcription

-    @rate_limit_async
+    @retry(
+        stop=stop_after_delay(128),
+        wait=wait_exponential_jitter(2, 128),
+        retry=retry_if_not_exception_type(litellm.exceptions.NotFoundError),
+        before_sleep=before_sleep_log(logger, logging.DEBUG),
+        reraise=True,
+    )
    async def transcribe_image(self, input) -> BaseModel:
        """
        Generate a transcription of an image from a user query.
@@ -14,14 +14,6 @@ from cognee.infrastructure.loaders.external.pypdf_loader import PyPdfLoader

logger = get_logger(__name__)

-try:
-    from unstructured.partition.pdf import partition_pdf
-except ImportError as e:
-    logger.info(
-        "unstructured[pdf] not installed, can't use AdvancedPdfLoader, will use PyPdfLoader instead."
-    )
-    raise ImportError from e
-

@dataclass
class _PageBuffer:

@@ -88,6 +80,8 @@ class AdvancedPdfLoader(LoaderInterface):
            **kwargs,
        }
+        # Use partition to extract elements
+        from unstructured.partition.pdf import partition_pdf

        elements = partition_pdf(**partition_kwargs)

        # Process elements into text content
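Deferring the unstructured import from module load to call time is what keeps AdvancedPdfLoader importable when the optional extra is missing. A generic sketch of that lazy-import pattern (the function is a stand-in, not Cognee's loader):

```python
import logging

logger = logging.getLogger(__name__)


def extract_pdf_text(path: str) -> str:
    """Use the optional unstructured backend if available; otherwise degrade gracefully."""
    try:
        # Imported lazily so that importing this module never fails when the
        # optional unstructured[pdf] extra is not installed.
        from unstructured.partition.pdf import partition_pdf
    except ImportError:
        logger.info("unstructured[pdf] not installed; advanced PDF parsing is unavailable.")
        raise

    elements = partition_pdf(filename=path)
    return "\n".join(str(element) for element in elements)
```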
@@ -1,6 +1,6 @@
-from cognee.shared.logging_utils import get_logger
import asyncio

from cognee.infrastructure.databases.exceptions import EmbeddingException
+from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.engine import DataPoint

@@ -33,18 +33,23 @@ async def index_data_points(data_points: list[DataPoint]):
            indexed_data_point.metadata["index_fields"] = [field_name]
            index_points[index_name].append(indexed_data_point)

-    for index_name_and_field, indexable_points in index_points.items():
-        first_occurence = index_name_and_field.index("_")
-        index_name = index_name_and_field[:first_occurence]
-        field_name = index_name_and_field[first_occurence + 1 :]
-        try:
-            # In case the amount of indexable points is too large we need to send them in batches
-            batch_size = vector_engine.embedding_engine.get_batch_size()
-            for i in range(0, len(indexable_points), batch_size):
-                batch = indexable_points[i : i + batch_size]
-                await vector_engine.index_data_points(index_name, field_name, batch)
-        except EmbeddingException as e:
-            logger.warning(f"Failed to index data points for {index_name}.{field_name}: {e}")
+    tasks: list[asyncio.Task] = []
+    batch_size = vector_engine.embedding_engine.get_batch_size()
+
+    for index_name_and_field, points in index_points.items():
+        first = index_name_and_field.index("_")
+        index_name = index_name_and_field[:first]
+        field_name = index_name_and_field[first + 1 :]
+
+        # Create embedding requests per batch to run in parallel later
+        for i in range(0, len(points), batch_size):
+            batch = points[i : i + batch_size]
+            tasks.append(
+                asyncio.create_task(vector_engine.index_data_points(index_name, field_name, batch))
+            )
+
+    # Run all embedding requests in parallel
+    await asyncio.gather(*tasks)

    return data_points
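The new version batches the points and fans the batches out concurrently instead of awaiting each index call in sequence. A self-contained sketch of the batching-plus-gather pattern, with index_batch standing in for vector_engine.index_data_points:

```python
import asyncio


async def index_batch(index_name: str, field_name: str, batch: list[str]) -> None:
    # Stand-in for vector_engine.index_data_points(index_name, field_name, batch).
    await asyncio.sleep(0.01)
    print(f"indexed {len(batch)} points into {index_name}.{field_name}")


async def index_all(points_by_index: dict[str, list[str]], batch_size: int) -> None:
    tasks: list[asyncio.Task] = []

    for index_name_and_field, points in points_by_index.items():
        index_name, field_name = index_name_and_field.split("_", 1)

        # One task per batch so the embedding calls run in parallel
        # instead of being awaited one after another.
        for i in range(0, len(points), batch_size):
            batch = points[i : i + batch_size]
            tasks.append(asyncio.create_task(index_batch(index_name, field_name, batch)))

    await asyncio.gather(*tasks)


asyncio.run(index_all({"Entity_name": [f"point-{i}" for i in range(7)]}, batch_size=3))
```

One side effect visible in the diff: the old per-index try/except around EmbeddingException is gone, so a failing batch now propagates out of asyncio.gather instead of being logged and skipped.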
@@ -1,3 +1,5 @@
+import asyncio
+
from cognee.modules.engine.utils.generate_edge_id import generate_edge_id
from cognee.shared.logging_utils import get_logger
from collections import Counter

@@ -76,15 +78,20 @@ async def index_graph_edges(
            indexed_data_point.metadata["index_fields"] = [field_name]
            index_points[index_name].append(indexed_data_point)

+    # Get maximum batch size for embedding model
+    batch_size = vector_engine.embedding_engine.get_batch_size()
+    tasks: list[asyncio.Task] = []
+
    for index_name, indexable_points in index_points.items():
        index_name, field_name = index_name.split(".")

-        # Get maximum batch size for embedding model
-        batch_size = vector_engine.embedding_engine.get_batch_size()
-        # We save the data in batches of {batch_size} to not put a lot of pressure on the database
+        # Create embedding tasks to run in parallel later
        for start in range(0, len(indexable_points), batch_size):
            batch = indexable_points[start : start + batch_size]

-            await vector_engine.index_data_points(index_name, field_name, batch)
+            tasks.append(vector_engine.index_data_points(index_name, field_name, batch))
+
+    # Start all embedding tasks and wait for completion
+    await asyncio.gather(*tasks)

    return None
@@ -1,7 +1,6 @@
from typing import List
from cognee.infrastructure.engine import DataPoint
from cognee.tasks.storage.add_data_points import add_data_points
from cognee.infrastructure.databases.graph.get_graph_engine import create_graph_engine
import cognee
from cognee.infrastructure.databases.graph import get_graph_engine
import json

@@ -64,7 +63,6 @@ async def create_connected_test_graph():


async def get_metrics(provider: str, include_optional=True):
    create_graph_engine.cache_clear()
    cognee.config.set_graph_database_provider(provider)
    graph_engine = await get_graph_engine()
    await graph_engine.delete_graph()
@@ -1,7 +1,12 @@
-from cognee.tests.tasks.descriptive_metrics.metrics_test_utils import assert_metrics
import asyncio


+async def main():
+    from cognee.tests.tasks.descriptive_metrics.metrics_test_utils import assert_metrics
+
+    await assert_metrics(provider="neo4j", include_optional=False)
+    await assert_metrics(provider="neo4j", include_optional=True)


if __name__ == "__main__":
-    asyncio.run(assert_metrics(provider="neo4j", include_optional=False))
-    asyncio.run(assert_metrics(provider="neo4j", include_optional=True))
+    asyncio.run(main())
@@ -56,6 +56,7 @@ dependencies = [
    "gunicorn>=20.1.0,<24",
    "websockets>=15.0.1,<16.0.0",
    "mistralai>=1.9.10",
+    "tenacity>=9.0.0",
]

[project.optional-dependencies]

@@ -64,6 +65,7 @@ api=[]
distributed = [
    "modal>=1.0.5,<2.0.0",
]

scraping = [
    "tavily-python>=0.7.12",
    "beautifulsoup4>=4.13.1",

@@ -72,6 +74,7 @@ scraping = [
    "protego>=0.1",
    "APScheduler>=3.10.0,<=3.11.0"
]

neo4j = ["neo4j>=5.28.0,<6"]
neptune = ["langchain_aws>=0.2.22"]
postgres = [