Enable different chunking methods

This commit is contained in:
Vasilije 2024-08-08 19:59:26 +02:00
parent d1ae1aeedc
commit e80d39167b
18 changed files with 134 additions and 19 deletions

View file

@ -55,10 +55,10 @@ async def cognify(datasets: Union[str, list[str]] = None, user: User = None):
data: list[Data] = await get_dataset_data(dataset_id = dataset.id)
documents = [
PdfDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location) if data_item.extension == "pdf" else
AudioDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location) if data_item.extension == "audio" else
ImageDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location) if data_item.extension == "image" else
TextDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location)
PdfDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location, chunking_strategy=None) if data_item.extension == "pdf" else
AudioDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location, chunking_strategy=None) if data_item.extension == "audio" else
ImageDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location, chunking_strategy=None) if data_item.extension == "image" else
TextDocument(id = data_item.id, title=f"{data_item.name}.{data_item.extension}", file_path=data_item.raw_data_location, chunking_strategy=None)
for data_item in data
]

View file

@ -9,6 +9,8 @@ class BaseConfig(BaseSettings):
monitoring_tool: object = MonitoringTool.LANGFUSE
graphistry_username: Optional[str] = None
graphistry_password: Optional[str] = None
aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None
model_config = SettingsConfigDict(env_file = ".env", extra = "allow")

View file

@ -4,6 +4,8 @@ import re
from cognee.shared.data_models import ChunkStrategy
# /Users/vasa/Projects/cognee/cognee/infrastructure/data/chunking/DefaultChunkEngine.py
class DefaultChunkEngine():
def __init__(self, chunk_strategy=None, chunk_size=None, chunk_overlap=None):
self.chunk_strategy = chunk_strategy

View file

@ -2,19 +2,22 @@ from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.modules.data.chunking import chunk_by_paragraph
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
from cognee.modules.data.processing.document_types.Document import Document
from cognee.tasks.chunking.chunking_registry import get_chunking_function
class AudioReader:
id: UUID
file_path: str
chunking_function:callable
def __init__(self, id: UUID, file_path: str):
def __init__(self, id: UUID, file_path: str, chunking_strategy:str = "paragraph"):
self.id = id
self.file_path = file_path
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
self.chunking_function = get_chunking_function(chunking_strategy)
def read(self, max_chunk_size: Optional[int] = 1024):
chunk_index = 0
@ -37,7 +40,7 @@ class AudioReader:
chunked_pages.append(page_index)
page_index += 1
for chunk_data in chunk_by_paragraph(page_text, max_chunk_size, batch_paragraphs=True):
for chunk_data in self.chunking_function(page_text, max_chunk_size, batch_paragraphs=True):
if chunk_size + chunk_data["word_count"] <= max_chunk_size:
paragraph_chunks.append(chunk_data)
chunk_size += chunk_data["word_count"]

View file

@ -6,3 +6,4 @@ class Document(Protocol):
type: str
title: str
file_path: str
chunking_strategy:str

View file

@ -2,19 +2,23 @@ from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.modules.data.chunking import chunk_by_paragraph
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
from cognee.modules.data.processing.document_types.Document import Document
from cognee.tasks.chunking import chunk_by_paragraph
from cognee.tasks.chunking.chunking_registry import get_chunking_function
class ImageReader:
id: UUID
file_path: str
chunking_strategy:str
def __init__(self, id: UUID, file_path: str):
def __init__(self, id: UUID, file_path: str, chunking_strategy:str = "paragraph"):
self.id = id
self.file_path = file_path
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
self.llm_client = get_llm_client() # You can choose different models like "tiny", "base", "small", etc.
self.chunking_function = get_chunking_function(chunking_strategy)
def read(self, max_chunk_size: Optional[int] = 1024):
chunk_index = 0

View file

@ -3,17 +3,22 @@ import logging
from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from pypdf import PdfReader as pypdf_PdfReader
from cognee.modules.data.chunking import chunk_by_paragraph
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
from cognee.tasks.chunking import chunk_by_paragraph
from cognee.tasks.chunking.chunking_registry import get_chunking_function
from .Document import Document
class PdfReader():
id: UUID
file_path: str
def __init__(self, id: UUID, file_path: str):
def __init__(self, id: UUID, file_path: str, chunking_strategy:str = "paragraph"):
self.id = id
self.file_path = file_path
self.chunking_strategy = chunking_strategy
self.chunking_function = get_chunking_function(chunking_strategy)
def get_number_of_pages(self):
file = pypdf_PdfReader(self.file_path)
@ -33,7 +38,7 @@ class PdfReader():
page_text = page.extract_text()
chunked_pages.append(page_index)
for chunk_data in chunk_by_paragraph(page_text, max_chunk_size, batch_paragraphs = True):
for chunk_data in self.chunking_function(page_text, max_chunk_size, batch_paragraphs = True):
if chunk_size + chunk_data["word_count"] <= max_chunk_size:
paragraph_chunks.append(chunk_data)
chunk_size += chunk_data["word_count"]

View file

@ -1,16 +1,22 @@
from uuid import UUID, uuid5, NAMESPACE_OID
from typing import Optional
from cognee.modules.data.chunking import chunk_by_paragraph
from cognee.modules.data.processing.chunk_types.DocumentChunk import DocumentChunk
from cognee.tasks.chunking.chunking_registry import get_chunking_function
from .Document import Document
class TextReader():
id: UUID
file_path: str
chunking_strategy:str
def __init__(self, id: UUID, file_path: str):
def __init__(self, id: UUID, file_path: str, chunking_strategy:str="paragraph"):
self.id = id
self.file_path = file_path
self.chunking_strategy = chunking_strategy
self.chunking_function = get_chunking_function(chunking_strategy)
def get_number_of_pages(self):
num_pages = 1 # Pure text is not formatted
@ -39,7 +45,7 @@ class TextReader():
chunked_pages.append(page_index)
page_index += 1
for chunk_data in chunk_by_paragraph(page_text, max_chunk_size, batch_paragraphs = True):
for chunk_data in self.chunking_function(page_text, max_chunk_size, batch_paragraphs = True):
if chunk_size + chunk_data["word_count"] <= max_chunk_size:
paragraph_chunks.append(chunk_data)
chunk_size += chunk_data["word_count"]

View file

View file

@ -0,0 +1,39 @@
import logging
from cognee.base_config import get_base_config
BaseConfig = get_base_config()
async def translate_text(data, source_language:str='sr', target_language:str='en', region_name='eu-west-1'):
"""
Translate text from source language to target language using AWS Translate.
Parameters:
data (str): The text to be translated.
source_language (str): The source language code (e.g., 'sr' for Serbian). ISO 639-2 Code https://www.loc.gov/standards/iso639-2/php/code_list.php
target_language (str): The target language code (e.g., 'en' for English). ISO 639-2 Code https://www.loc.gov/standards/iso639-2/php/code_list.php
region_name (str): AWS region name.
Returns:
str: Translated text or an error message.
"""
import boto3
from botocore.exceptions import BotoCoreError, ClientError
if not data:
yield "No text provided for translation."
if not source_language or not target_language:
yield "Both source and target language codes are required."
try:
translate = boto3.client(service_name='translate', region_name=region_name, use_ssl=True)
result = translate.translate_text(Text=data, SourceLanguageCode=source_language, TargetLanguageCode=target_language)
yield result.get('TranslatedText', 'No translation found.')
except BotoCoreError as e:
logging.info(f"BotoCoreError occurred: {e}")
yield "Error with AWS Translate service configuration or request."
except ClientError as e:
logging.info(f"ClientError occurred: {e}")
yield "Error with AWS client or network issue."

View file

@ -1,4 +1,4 @@
from cognee.modules.data.chunking import chunk_by_paragraph
from cognee.tasks.chunking import chunk_by_paragraph
if __name__ == "__main__":
def test_chunking_on_whole_text():

View file

@ -1,6 +1,8 @@
from uuid import uuid5, NAMESPACE_OID
from .chunk_by_sentence import chunk_by_sentence
from cognee.tasks.chunking.chunking_registry import register_chunking_function
@register_chunking_function("paragraph")
def chunk_by_paragraph(data: str, paragraph_length: int = 1024, batch_paragraphs = True):
paragraph = ""
last_cut_type = None

View file

@ -1,6 +1,11 @@
from uuid import uuid4
from .chunk_by_word import chunk_by_word
from cognee.tasks.chunking.chunking_registry import register_chunking_function
@register_chunking_function("sentence")
def chunk_by_sentence(data: str):
sentence = ""
paragraph_id = uuid4()

View file

@ -0,0 +1,10 @@
chunking_registry = {}
def register_chunking_function(name):
def decorator(func):
chunking_registry[name] = func
return func
return decorator
def get_chunking_function(name: str):
return chunking_registry.get(name)

View file

@ -0,0 +1,36 @@
import logging
async def detect_language(data:str):
"""
Detect the language of the given text and return its ISO 639-1 language code.
If the detected language is Croatian ('hr'), it maps to Serbian ('sr').
The text is trimmed to the first 100 characters for efficient processing.
Parameters:
text (str): The text for language detection.
Returns:
str: The ISO 639-1 language code of the detected language, or 'None' in case of an error.
"""
# Trim the text to the first 100 characters
from langdetect import detect, LangDetectException
trimmed_text = data[:100]
try:
# Detect the language using langdetect
detected_lang_iso639_1 = detect(trimmed_text)
logging.info(f"Detected ISO 639-1 code: {detected_lang_iso639_1}")
# Special case: map 'hr' (Croatian) to 'sr' (Serbian ISO 639-2)
if detected_lang_iso639_1 == 'hr':
yield 'sr'
yield detected_lang_iso639_1
except LangDetectException as e:
logging.error(f"Language detection error: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
yield None