import re
from typing import Iterator, Tuple

SENTENCE_ENDINGS = r"[.;!?…]"
PARAGRAPH_ENDINGS = r"[\n\r]"


def chunk_by_word(data: str) -> Iterator[Tuple[str, str]]:
    """Split *data* into word-level chunks without dropping any character.

    Every yielded item is a ``(text, kind)`` tuple where ``kind`` is one of
    ``"word"``, ``"sentence_end"`` or ``"paragraph_end"``. Concatenating the
    ``text`` parts in order reproduces *data* exactly.

    Chunking rules:

    * Leading spaces / line breaks are emitted as one ``"word"`` chunk.
    * A space closes the current chunk and stays attached to it.
    * A character matching ``SENTENCE_ENDINGS`` closes the current chunk,
      swallowing a full ``...`` ellipsis and any spaces that follow. The
      chunk is a ``"paragraph_end"`` when a line break comes next, otherwise
      a ``"sentence_end"``.
    * Each line-break character (``PARAGRAPH_ENDINGS``) is emitted as its own
      chunk. It is a ``"paragraph_end"`` when the last consumed character was
      a sentence ending, or when the next non-blank character is uppercase;
      otherwise it is a plain ``"word"``.

    Args:
        data: The text to split.

    Yields:
        ``(text, kind)`` tuples as described above.
    """
    # Compile once per call instead of going through re.match's pattern
    # lookup for every single character.
    is_paragraph_char = re.compile(PARAGRAPH_ENDINGS).match
    is_sentence_char = re.compile(SENTENCE_ENDINGS).match
    length = len(data)

    def is_blank(char):
        return char == " " or is_paragraph_char(char)

    # Leading whitespace forms a chunk of its own.
    i = 0
    while i < length and is_blank(data[i]):
        i += 1
    if i:
        yield (data[:i], "word")

    # Every chunk is a contiguous slice of `data`, so tracking where the
    # current chunk starts replaces character-by-character concatenation.
    chunk_start = i
    last_processed_character = ""
    # Cached index of the first non-blank character after the line break most
    # recently inspected. All characters between that break and this index
    # are blank, so later breaks in the same run can reuse it; this keeps
    # long runs of blank lines linear instead of quadratic.
    lookahead_end = 0

    while i < length:
        character = data[i]

        if is_paragraph_char(character):
            if chunk_start < i:
                yield (data[chunk_start:i], "word")

            if is_sentence_char(last_processed_character):
                real_paragraph_end = True
            else:
                if lookahead_end <= i:
                    j = i + 1
                    while j < length and is_blank(data[j]):
                        j += 1
                    lookahead_end = j
                real_paragraph_end = (
                    lookahead_end < length and data[lookahead_end].isupper()
                )

            yield (character, "paragraph_end" if real_paragraph_end else "word")
            i += 1
            chunk_start = i
            continue

        last_processed_character = character

        if character == " ":
            i += 1
            yield (data[chunk_start:i], "word")
            chunk_start = i
            continue

        if is_sentence_char(character):
            # Treat a three-dot ellipsis as a single sentence ending.
            if data.startswith("...", i):
                i += 2

            # Spaces directly after the ending belong to this chunk.
            next_i = i + 1
            while next_i < length and data[next_i] == " ":
                next_i += 1

            ends_paragraph = next_i < length and is_paragraph_char(data[next_i])
            yield (
                data[chunk_start:next_i],
                "paragraph_end" if ends_paragraph else "sentence_end",
            )
            i = chunk_start = next_i
            continue

        i += 1

    # Flush whatever is left after the final delimiter.
    if chunk_start < length:
        yield (data[chunk_start:], "word")