From 6f0637a02849a8c457b9a8619c9da37208404438 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Wed, 13 Nov 2024 11:30:30 +0100 Subject: [PATCH] Small cosmetic changes --- cognee/modules/chunking/TextChunker.py | 20 ++++++++++---------- cognee/tasks/chunks/chunk_by_word.py | 21 ++++++++++----------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/cognee/modules/chunking/TextChunker.py b/cognee/modules/chunking/TextChunker.py index 714383804..f0a72b58a 100644 --- a/cognee/modules/chunking/TextChunker.py +++ b/cognee/modules/chunking/TextChunker.py @@ -9,7 +9,6 @@ class TextChunker(): chunk_index = 0 chunk_size = 0 - paragraph_chunks = [] def __init__(self, document, get_text: callable, chunk_size: int = 1024): self.document = document @@ -17,6 +16,7 @@ class TextChunker(): self.get_text = get_text def read(self): + paragraph_chunks = [] for content_text in self.get_text(): for chunk_data in chunk_by_paragraph( content_text, @@ -24,10 +24,10 @@ class TextChunker(): batch_paragraphs = True, ): if self.chunk_size + chunk_data["word_count"] <= self.max_chunk_size: - self.paragraph_chunks.append(chunk_data) + paragraph_chunks.append(chunk_data) self.chunk_size += chunk_data["word_count"] else: - if len(self.paragraph_chunks) == 0: + if len(paragraph_chunks) == 0: yield DocumentChunk( id = chunk_data["chunk_id"], text = chunk_data["text"], @@ -36,10 +36,10 @@ class TextChunker(): chunk_index = self.chunk_index, cut_type = chunk_data["cut_type"], ) - self.paragraph_chunks = [] + paragraph_chunks = [] self.chunk_size = 0 else: - chunk_text = " ".join(chunk["text"] for chunk in self.paragraph_chunks) + chunk_text = " ".join(chunk["text"] for chunk in paragraph_chunks) try: yield DocumentChunk( id = uuid5(NAMESPACE_OID, f"{str(self.document.id)}-{self.chunk_index}"), @@ -47,24 +47,24 @@ class TextChunker(): word_count = self.chunk_size, is_part_of = self.document, chunk_index = self.chunk_index, - cut_type = self.paragraph_chunks[len(self.paragraph_chunks) - 1]["cut_type"], + cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], ) except Exception as e: print(e) - self.paragraph_chunks = [chunk_data] + paragraph_chunks = [chunk_data] self.chunk_size = chunk_data["word_count"] self.chunk_index += 1 - if len(self.paragraph_chunks) > 0: + if len(paragraph_chunks) > 0: try: yield DocumentChunk( id = uuid5(NAMESPACE_OID, f"{str(self.document.id)}-{self.chunk_index}"), - text = " ".join(chunk["text"] for chunk in self.paragraph_chunks), + text = " ".join(chunk["text"] for chunk in paragraph_chunks), word_count = self.chunk_size, is_part_of = self.document, chunk_index = self.chunk_index, - cut_type = self.paragraph_chunks[len(self.paragraph_chunks) - 1]["cut_type"], + cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], ) except Exception as e: print(e) diff --git a/cognee/tasks/chunks/chunk_by_word.py b/cognee/tasks/chunks/chunk_by_word.py index 8621754d5..e82a9cd98 100644 --- a/cognee/tasks/chunks/chunk_by_word.py +++ b/cognee/tasks/chunks/chunk_by_word.py @@ -1,27 +1,26 @@ import re -def chunk_by_word(data: str): - sentence_endings = r"[.;!?…]" - paragraph_endings = r"[\n\r]" - last_processed_character = "" +SENTENCE_ENDINGS = r"[.;!?…]" +PARAGRAPH_ENDINGS = r"[\n\r]" +def chunk_by_word(data: str): + last_processed_character = "" word = "" i = 0 - while i < len(data): character = data[i] - if word == "" and (re.match(paragraph_endings, character) or character == " "): + if word == "" and (re.match(PARAGRAPH_ENDINGS, character) or character == " "): i = i + 1 continue def is_real_paragraph_end(): - if re.match(sentence_endings, last_processed_character): + if re.match(SENTENCE_ENDINGS, last_processed_character): return True j = i + 1 next_character = data[j] if j < len(data) else None - while next_character is not None and (re.match(paragraph_endings, next_character) or next_character == " "): + while next_character is not None and (re.match(PARAGRAPH_ENDINGS, next_character) or next_character == " "): j += 1 next_character = data[j] if j < len(data) else None if next_character and next_character.isupper(): @@ -29,7 +28,7 @@ def chunk_by_word(data: str): return False - if re.match(paragraph_endings, character): + if re.match(PARAGRAPH_ENDINGS, character): yield (word, "paragraph_end" if is_real_paragraph_end() else "word") word = "" i = i + 1 @@ -44,13 +43,13 @@ def chunk_by_word(data: str): word += character last_processed_character = character - if re.match(sentence_endings, character): + if re.match(SENTENCE_ENDINGS, character): # Check for ellipses. if i + 2 <= len(data) and data[i] == "." and data[i + 1] == "." and data[i + 2] == ".": word += ".." i = i + 2 - is_paragraph_end = i + 1 < len(data) and re.match(paragraph_endings, data[i + 1]) + is_paragraph_end = i + 1 < len(data) and re.match(PARAGRAPH_ENDINGS, data[i + 1]) yield (word, "paragraph_end" if is_paragraph_end else "sentence_end") word = ""