cognee/cognee/modules/chunking/TextChunker.py
Boris 46c4463cb2
feat: s3 storage (#988)
<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

---------

Co-authored-by: vasilije <vas.markovic@gmail.com>
Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
2025-07-14 21:47:08 +02:00

78 lines
3.3 KiB
Python

from cognee.shared.logging_utils import get_logger
from uuid import NAMESPACE_OID, uuid5
from cognee.tasks.chunks import chunk_by_paragraph
from cognee.modules.chunking.Chunker import Chunker
from .models.DocumentChunk import DocumentChunk
logger = get_logger()
class TextChunker(Chunker):
    """Chunker that splits a document's text into ``DocumentChunk`` objects.

    Text obtained from ``self.get_text()`` is partitioned into paragraph
    pieces by ``chunk_by_paragraph`` and buffered until adding the next
    piece would exceed ``self.max_chunk_size``; the buffer is then flushed
    as a single ``DocumentChunk``.
    """

    async def read(self):
        """Asynchronously yield ``DocumentChunk`` objects for the document.

        Yields:
            DocumentChunk: chunks whose ``chunk_size`` stays within
            ``self.max_chunk_size``, except when a single paragraph piece
            is already larger than the limit — that piece is emitted as an
            oversized chunk of its own, keeping its original paragraph id.

        Raises:
            Exception: re-raises (after logging) any error from
            ``DocumentChunk`` construction.
        """
        paragraph_chunks = []

        async for content_text in self.get_text():
            for chunk_data in chunk_by_paragraph(
                content_text,
                self.max_chunk_size,
                batch_paragraphs=True,
            ):
                if self.chunk_size + chunk_data["chunk_size"] <= self.max_chunk_size:
                    # Still fits: keep buffering paragraphs for the current chunk.
                    paragraph_chunks.append(chunk_data)
                    self.chunk_size += chunk_data["chunk_size"]
                    continue

                if not paragraph_chunks:
                    # A single paragraph piece already exceeds max_chunk_size:
                    # emit it on its own, reusing the paragraph's chunk_id.
                    # (The buffer is empty here, so no reset of it is needed.)
                    yield DocumentChunk(
                        id=chunk_data["chunk_id"],
                        text=chunk_data["text"],
                        chunk_size=chunk_data["chunk_size"],
                        is_part_of=self.document,
                        chunk_index=self.chunk_index,
                        cut_type=chunk_data["cut_type"],
                        contains=[],
                        metadata={
                            "index_fields": ["text"],
                        },
                    )
                    self.chunk_size = 0
                else:
                    # Flush the buffered paragraphs as one chunk, then start
                    # a new buffer with the piece that did not fit.
                    chunk_text = " ".join(chunk["text"] for chunk in paragraph_chunks)
                    try:
                        yield DocumentChunk(
                            id=uuid5(
                                NAMESPACE_OID, f"{str(self.document.id)}-{self.chunk_index}"
                            ),
                            text=chunk_text,
                            chunk_size=self.chunk_size,
                            is_part_of=self.document,
                            chunk_index=self.chunk_index,
                            cut_type=paragraph_chunks[-1]["cut_type"],
                            contains=[],
                            metadata={
                                "index_fields": ["text"],
                            },
                        )
                    except Exception as e:
                        logger.error(e)
                        # Bare raise preserves the original traceback
                        # (``raise e`` would restart it at this frame).
                        raise
                    paragraph_chunks = [chunk_data]
                    self.chunk_size = chunk_data["chunk_size"]

                self.chunk_index += 1

        # Flush whatever is still buffered as the final chunk.
        if paragraph_chunks:
            try:
                yield DocumentChunk(
                    id=uuid5(NAMESPACE_OID, f"{str(self.document.id)}-{self.chunk_index}"),
                    text=" ".join(chunk["text"] for chunk in paragraph_chunks),
                    chunk_size=self.chunk_size,
                    is_part_of=self.document,
                    chunk_index=self.chunk_index,
                    cut_type=paragraph_chunks[-1]["cut_type"],
                    contains=[],
                    metadata={"index_fields": ["text"]},
                )
            except Exception as e:
                logger.error(e)
                raise