Merge pull request #205 from topoteretes/COG-417-chunking-unit-tests

Cog 417 chunking unit tests
0xideas 2024-11-18 12:52:44 +01:00 committed by GitHub
commit ced5385186
18 changed files with 762 additions and 157 deletions

View file

@ -20,5 +20,8 @@ class DataPoint(BaseModel):
def get_embeddable_data(self):
if self._metadata and len(self._metadata["index_fields"]) > 0 \
and hasattr(self, self._metadata["index_fields"][0]):
return getattr(self, self._metadata["index_fields"][0])
attribute = getattr(self, self._metadata["index_fields"][0])
if isinstance(attribute, str):
return(attribute.strip())
else:
return (attribute)
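A minimal standalone sketch of the new behavior (illustrative only, not the real DataPoint model; the "text" index field is made up): string attributes are now stripped of surrounding whitespace before being returned, while non-string attributes pass through unchanged.

class FakeDataPoint:
    # Stand-in that reproduces only the lookup-and-strip logic added above.
    _metadata = {"index_fields": ["text"]}

    def __init__(self, text):
        self.text = text

    def get_embeddable_data(self):
        if self._metadata and len(self._metadata["index_fields"]) > 0 \
            and hasattr(self, self._metadata["index_fields"][0]):
            attribute = getattr(self, self._metadata["index_fields"][0])
            if isinstance(attribute, str):
                return attribute.strip()
            return attribute

print(FakeDataPoint("  hello world \n").get_embeddable_data())  # -> 'hello world'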

View file

@ -9,7 +9,6 @@ class TextChunker():
chunk_index = 0
chunk_size = 0
paragraph_chunks = []
def __init__(self, document, get_text: callable, chunk_size: int = 1024):
self.document = document
@ -17,7 +16,7 @@ class TextChunker():
self.get_text = get_text
def read(self):
self.paragraph_chunks = []
paragraph_chunks = []
for content_text in self.get_text():
for chunk_data in chunk_by_paragraph(
content_text,
@ -25,10 +24,10 @@ class TextChunker():
batch_paragraphs = True,
):
if self.chunk_size + chunk_data["word_count"] <= self.max_chunk_size:
self.paragraph_chunks.append(chunk_data)
paragraph_chunks.append(chunk_data)
self.chunk_size += chunk_data["word_count"]
else:
if len(self.paragraph_chunks) == 0:
if len(paragraph_chunks) == 0:
yield DocumentChunk(
id = chunk_data["chunk_id"],
text = chunk_data["text"],
@ -37,10 +36,10 @@ class TextChunker():
chunk_index = self.chunk_index,
cut_type = chunk_data["cut_type"],
)
self.paragraph_chunks = []
paragraph_chunks = []
self.chunk_size = 0
else:
chunk_text = " ".join(chunk["text"] for chunk in self.paragraph_chunks)
chunk_text = " ".join(chunk["text"] for chunk in paragraph_chunks)
try:
yield DocumentChunk(
id = uuid5(NAMESPACE_OID, f"{str(self.document.id)}-{self.chunk_index}"),
@ -48,24 +47,24 @@ class TextChunker():
word_count = self.chunk_size,
is_part_of = self.document,
chunk_index = self.chunk_index,
cut_type = self.paragraph_chunks[len(self.paragraph_chunks) - 1]["cut_type"],
cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
)
except Exception as e:
print(e)
self.paragraph_chunks = [chunk_data]
paragraph_chunks = [chunk_data]
self.chunk_size = chunk_data["word_count"]
self.chunk_index += 1
if len(self.paragraph_chunks) > 0:
if len(paragraph_chunks) > 0:
try:
yield DocumentChunk(
id = uuid5(NAMESPACE_OID, f"{str(self.document.id)}-{self.chunk_index}"),
text = " ".join(chunk["text"] for chunk in self.paragraph_chunks),
text = " ".join(chunk["text"] for chunk in paragraph_chunks),
word_count = self.chunk_size,
is_part_of = self.document,
chunk_index = self.chunk_index,
cut_type = self.paragraph_chunks[len(self.paragraph_chunks) - 1]["cut_type"],
cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
)
except Exception as e:
print(e)
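A hedged usage sketch of the reworked chunker, with the TextChunker import path assumed (the TextDocument path matches the tests below): get_text must return an iterable of strings, which is why the document classes below now pass lambda: [text], and the paragraph buffer is a local variable inside read() rather than the class-level list it used to be.

from uuid import uuid4
# The TextChunker module path here is an assumption; adjust it to where the class lives in the repo.
from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.data.processing.document_types.TextDocument import TextDocument

document = TextDocument(id=uuid4(), name="example.txt", raw_data_location="example.txt")
chunker = TextChunker(
    document,
    chunk_size=64,
    get_text=lambda: ["First paragraph.\n\nSecond paragraph, somewhat longer than the first one."],
)
for chunk in chunker.read():
    print(chunk.chunk_index, chunk.word_count, chunk.cut_type)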

View file

@ -5,11 +5,15 @@ from .Document import Document
class AudioDocument(Document):
type: str = "audio"
def create_transcript(self):
result = get_llm_client().create_transcript(self.raw_data_location)
return(result.text)
def read(self, chunk_size: int):
# Transcribe the audio file
result = get_llm_client().create_transcript(self.raw_data_location)
text = result.text
text = self.create_transcript()
chunker = TextChunker(self, chunk_size = chunk_size, get_text = lambda: text)
chunker = TextChunker(self, chunk_size = chunk_size, get_text = lambda: [text])
yield from chunker.read()

View file

@ -5,11 +5,15 @@ from .Document import Document
class ImageDocument(Document):
type: str = "image"
def transcribe_image(self):
result = get_llm_client().transcribe_image(self.raw_data_location)
return(result.choices[0].message.content)
def read(self, chunk_size: int):
# Transcribe the image file
result = get_llm_client().transcribe_image(self.raw_data_location)
text = result.choices[0].message.content
text = self.transcribe_image()
chunker = TextChunker(self, chunk_size = chunk_size, get_text = lambda: text)
chunker = TextChunker(self, chunk_size = chunk_size, get_text = lambda: [text])
yield from chunker.read()
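The lambda: [text] change in both document classes matters because TextChunker iterates over whatever get_text() returns; a bare string would be consumed character by character. A tiny illustration, independent of the cognee classes:

text = "One sentence."
for piece in (lambda: text)():
    print(repr(piece))  # iterating a string yields single characters: 'O', 'n', 'e', ...
    break
for piece in (lambda: [text])():
    print(repr(piece))  # iterating a one-element list yields the whole string once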

View file

@ -1,69 +1,72 @@
from uuid import uuid5, NAMESPACE_OID
from typing import Dict, Any, Iterator
from .chunk_by_sentence import chunk_by_sentence
def chunk_by_paragraph(data: str, paragraph_length: int = 1024, batch_paragraphs = True):
paragraph = ""
def chunk_by_paragraph(data: str, paragraph_length: int = 1024, batch_paragraphs: bool = True) -> Iterator[Dict[str, Any]]:
"""
Chunks text by paragraph while preserving exact text reconstruction capability.
When chunks are joined with the empty string "", they reproduce the original text exactly.
"""
current_chunk = ""
current_word_count = 0
chunk_index = 0
paragraph_ids = []
last_cut_type = None
last_paragraph_id = None
paragraph_word_count = 0
paragraph_chunk_index = 0
for (paragraph_id, __, sentence, word_count, end_type) in chunk_by_sentence(data):
if paragraph_word_count > 0 and paragraph_word_count + word_count > paragraph_length:
if batch_paragraphs is True:
chunk_id = uuid5(NAMESPACE_OID, paragraph)
yield dict(
text = paragraph.strip(),
word_count = paragraph_word_count,
id = chunk_id, # When batching paragraphs, the paragraph_id is the same as chunk_id.
# paragraph_id doesn't mean anything since multiple paragraphs are merged.
chunk_id = chunk_id,
chunk_index = paragraph_chunk_index,
cut_type = last_cut_type,
)
else:
yield dict(
text = paragraph.strip(),
word_count = paragraph_word_count,
id = last_paragraph_id,
chunk_id = uuid5(NAMESPACE_OID, paragraph),
chunk_index = paragraph_chunk_index,
cut_type = last_cut_type,
)
paragraph_chunk_index += 1
paragraph_word_count = 0
paragraph = ""
paragraph += (" " if len(paragraph) > 0 else "") + sentence
paragraph_word_count += word_count
if end_type == "paragraph_end" or end_type == "sentence_cut":
if batch_paragraphs is True:
paragraph += "\n\n" if end_type == "paragraph_end" else ""
else:
yield dict(
text = paragraph.strip(),
word_count = paragraph_word_count,
paragraph_id = paragraph_id,
chunk_id = uuid5(NAMESPACE_OID, paragraph),
chunk_index = paragraph_chunk_index,
cut_type = end_type,
)
paragraph_chunk_index = 0
paragraph_word_count = 0
paragraph = ""
for paragraph_id, sentence, word_count, end_type in chunk_by_sentence(data, maximum_length=paragraph_length):
# Check whether adding this sentence would exceed the length limit
if current_word_count > 0 and current_word_count + word_count > paragraph_length:
# Yield current chunk
chunk_dict = {
"text": current_chunk,
"word_count": current_word_count,
"chunk_id": uuid5(NAMESPACE_OID, current_chunk),
"paragraph_ids": paragraph_ids,
"chunk_index": chunk_index,
"cut_type": last_cut_type,
}
yield chunk_dict
# Start new chunk with current sentence
paragraph_ids = []
current_chunk = ""
current_word_count = 0
chunk_index += 1
paragraph_ids.append(paragraph_id)
current_chunk += sentence
current_word_count += word_count
# Handle end of paragraph
if end_type in ("paragraph_end", "sentence_cut") and not batch_paragraphs:
# For non-batch mode, yield each paragraph separately
chunk_dict = {
"text": current_chunk,
"word_count": current_word_count,
"paragraph_ids": paragraph_ids,
"chunk_id": uuid5(NAMESPACE_OID, current_chunk),
"chunk_index": chunk_index,
"cut_type": end_type
}
yield chunk_dict
paragraph_ids = []
current_chunk = ""
current_word_count = 0
chunk_index += 1
last_cut_type = end_type
last_paragraph_id = paragraph_id
if len(paragraph) > 0:
yield dict(
chunk_id = uuid5(NAMESPACE_OID, paragraph),
text = paragraph,
word_count = paragraph_word_count,
paragraph_id = last_paragraph_id,
chunk_index = paragraph_chunk_index,
cut_type = last_cut_type,
)
# Yield any remaining text
if current_chunk:
chunk_dict = {
"text": current_chunk,
"word_count": current_word_count,
"chunk_id": uuid5(NAMESPACE_OID, current_chunk),
"paragraph_ids": paragraph_ids,
"chunk_index": chunk_index,
"cut_type": "sentence_cut" if last_cut_type == "word" else last_cut_type
}
yield chunk_dict
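A short sketch of the reconstruction guarantee stated in the new docstring, using the same public import path as the tests below (the sample text is illustrative): joining the emitted chunk texts with "" reproduces the input exactly.

from cognee.tasks.chunks import chunk_by_paragraph

text = "First sentence. Second one.\n\nA new paragraph follows, and it runs a little longer."
chunks = list(chunk_by_paragraph(text, paragraph_length=8, batch_paragraphs=False))
assert "".join(chunk["text"] for chunk in chunks) == text
for chunk in chunks:
    print(chunk["chunk_index"], chunk["word_count"], chunk["cut_type"])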

View file

@ -2,30 +2,43 @@
from uuid import uuid4
from typing import Optional
from .chunk_by_word import chunk_by_word
def chunk_by_sentence(data: str):
def chunk_by_sentence(data: str, maximum_length: Optional[int] = None):
sentence = ""
paragraph_id = uuid4()
chunk_index = 0
word_count = 0
section_end = False
word_type_state = None
# the yielded word_type_state is identical to word_type, except when
# word_type is 'word' and the word contains no letters; such words
# keep the preceding 'paragraph_end' or 'sentence_end' state, so they
# stay attached to that boundary
for (word, word_type) in chunk_by_word(data):
sentence += (" " if len(sentence) > 0 else "") + word
sentence += word
word_count += 1
if word_type == "paragraph_end" or word_type == "sentence_end":
yield (paragraph_id, chunk_index, sentence, word_count, word_type)
if word_type in ["paragraph_end", "sentence_end"]:
word_type_state = word_type
else:
for character in word:
if character.isalpha():
word_type_state = word_type
break
if word_type in ["paragraph_end", "sentence_end"] or (maximum_length and (word_count == maximum_length)):
yield (paragraph_id, sentence, word_count, word_type_state)
sentence = ""
word_count = 0
paragraph_id = uuid4() if word_type == "paragraph_end" else paragraph_id
chunk_index = 0 if word_type == "paragraph_end" else chunk_index + 1
if len(sentence) > 0:
section_end = "sentence_cut" if word_type_state == "word" else word_type_state
yield (
paragraph_id,
chunk_index,
sentence,
word_count,
"sentence_cut",
section_end,
)
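A brief sketch of the revised generator, which now yields 4-tuples of (paragraph_id, sentence, word_count, end_type) instead of the earlier 5-tuples carrying a chunk_index, and accepts an optional maximum_length cap on the number of words per yielded sentence; the import path matches the tests below and the sample text is illustrative.

from cognee.tasks.chunks import chunk_by_sentence

sample = "One short sentence. Another one follows it here.\nAnd a final line without a dot"
for paragraph_id, sentence, word_count, end_type in chunk_by_sentence(sample, maximum_length=8):
    print(repr(sentence), word_count, end_type)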

View file

@ -1,60 +1,71 @@
import re
SENTENCE_ENDINGS = r"[.;!?…]"
PARAGRAPH_ENDINGS = r"[\n\r]"
def is_real_paragraph_end(last_char: str, current_pos: int, text: str) -> bool:
"""
Determines if the current position represents a real paragraph ending.
Args:
last_char: The last processed character
current_pos: Current position in the text
text: The input text
Returns:
bool: True if this is a real paragraph end, False otherwise
"""
if re.match(SENTENCE_ENDINGS, last_char):
return True
j = current_pos + 1
if j >= len(text):
return False
next_character = text[j]
while j < len(text) and (re.match(PARAGRAPH_ENDINGS, next_character) or next_character == " "):
j += 1
if j >= len(text):
return False
next_character = text[j]
if next_character.isupper():
return True
return False
def chunk_by_word(data: str):
sentence_endings = r"[.;!?…]"
paragraph_endings = r"[\n\r]"
last_processed_character = ""
word = ""
"""
Chunks text into words and endings while preserving whitespace.
Whitespace is included with the preceding word.
Outputs can be joined with "" to recreate the original input.
"""
current_chunk = ""
i = 0
while i < len(data):
character = data[i]
if word == "" and (re.match(paragraph_endings, character) or character == " "):
i = i + 1
continue
def is_real_paragraph_end():
if re.match(sentence_endings, last_processed_character):
return True
j = i + 1
next_character = data[j] if j < len(data) else None
while next_character is not None and (re.match(paragraph_endings, next_character) or next_character == " "):
j += 1
next_character = data[j] if j < len(data) else None
if next_character and next_character.isupper():
return True
return False
if re.match(paragraph_endings, character):
yield (word, "paragraph_end" if is_real_paragraph_end() else "word")
word = ""
i = i + 1
continue
current_chunk += character
if character == " ":
yield [word, "word"]
word = ""
i = i + 1
yield (current_chunk, "word")
current_chunk = ""
i += 1
continue
word += character
last_processed_character = character
if re.match(sentence_endings, character):
# Check for ellipses.
if i + 2 <= len(data) and data[i] == "." and data[i + 1] == "." and data[i + 2] == ".":
word += ".."
i = i + 2
is_paragraph_end = i + 1 < len(data) and re.match(paragraph_endings, data[i + 1])
yield (word, "paragraph_end" if is_paragraph_end else "sentence_end")
word = ""
if re.match(SENTENCE_ENDINGS, character):
# Look ahead for whitespace
next_i = i + 1
while next_i < len(data) and data[next_i] == " ":
current_chunk += data[next_i]
next_i += 1
is_paragraph_end = next_i < len(data) and re.match(PARAGRAPH_ENDINGS, data[next_i])
yield (current_chunk, "paragraph_end" if is_paragraph_end else "sentence_end")
current_chunk = ""
i = next_i
continue
i += 1
if len(word) > 0:
yield (word, "word")
if current_chunk:
yield (current_chunk, "word")
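A small sketch of the whitespace-preserving contract described in the new docstring: trailing whitespace travels with the preceding word, so concatenating the yielded pieces with "" recreates the input, which is exactly what test_chunk_by_word_isomorphism below checks.

from cognee.tasks.chunks import chunk_by_word

sample = "Hello there. New paragraph?\nYes, indeed."
pieces = list(chunk_by_word(sample))
assert "".join(word for word, _ in pieces) == sample
for word, word_type in pieces:
    print(repr(word), word_type)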

View file

@ -0,0 +1,44 @@
import uuid
from unittest.mock import patch
from cognee.modules.data.processing.document_types.AudioDocument import AudioDocument
GROUND_TRUTH = [
{"word_count": 57, "len_text": 353, "cut_type": "sentence_end"},
{"word_count": 58, "len_text": 358, "cut_type": "sentence_end"},
{"word_count": 41, "len_text": 219, "cut_type": "sentence_end"},
]
TEST_TEXT = """
"Mike, we need to talk about the payment processing service."
"Good timing. The board wants one-click checkout by end of quarter."
"That's exactly the problem. The service is held together with duct tape. One wrong move and—"
"Sarah, we've been over this. The market won't wait."
"And neither will a system collapse! The technical debt is crushing us. Every new feature takes twice as long as it should."
"Then work twice as hard. Our competitors—"
"Our competitors will laugh when our whole system goes down during Black Friday! We're talking about financial transactions here, not some blog comments section."
"Write up your concerns in a doc. Right now, we ship one-click."
"Then you'll ship it without me. I won't stake my reputation on a house of cards."
"Are you threatening to quit?"
"No, I'm threatening to be right. And when it breaks, I want it in writing that you chose this."
"The feature ships, Sarah. That's final.\""""
def test_AudioDocument():
document = AudioDocument(
id=uuid.uuid4(), name="audio-dummy-test", raw_data_location=""
)
with patch.object(AudioDocument, "create_transcript", return_value=TEST_TEXT):
for ground_truth, paragraph_data in zip(
GROUND_TRUTH, document.read(chunk_size=64)
):
assert (
ground_truth["word_count"] == paragraph_data.word_count
), f'{ground_truth["word_count"] = } != {paragraph_data.word_count = }'
assert ground_truth["len_text"] == len(
paragraph_data.text
), f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
assert (
ground_truth["cut_type"] == paragraph_data.cut_type
), f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'

View file

@ -0,0 +1,34 @@
import uuid
from unittest.mock import patch
from cognee.modules.data.processing.document_types.ImageDocument import ImageDocument
GROUND_TRUTH = [
{"word_count": 51, "len_text": 298, "cut_type": "sentence_end"},
{"word_count": 62, "len_text": 369, "cut_type": "sentence_end"},
{"word_count": 44, "len_text": 294, "cut_type": "sentence_end"},
]
TEST_TEXT = """A dramatic confrontation unfolds as a red fox and river otter engage in an energetic wrestling match at the water's edge. The fox, teeth bared in a playful snarl, has its front paws locked with the otter's flippers as they roll through the shallow stream, sending water spraying in all directions. The otter, displaying its surprising agility on land, counters by twisting its sleek body and attempting to wrap itself around the fox's shoulders, its whiskered face inches from the fox's muzzle.
The commotion has attracted an audience: a murder of crows has gathered in the low branches, their harsh calls adding to the chaos as they hop excitedly from limb to limb. One particularly bold crow dive-bombs the wrestling pair, causing both animals to momentarily freeze mid-tussle, creating a perfect snapshot of suspended actionthe fox's fur dripping wet, the otter's body coiled like a spring, and the crow's wings spread wide against the golden morning light."""
def test_ImageDocument():
document = ImageDocument(
id=uuid.uuid4(), name="image-dummy-test", raw_data_location=""
)
with patch.object(ImageDocument, "transcribe_image", return_value=TEST_TEXT):
for ground_truth, paragraph_data in zip(
GROUND_TRUTH, document.read(chunk_size=64)
):
assert (
ground_truth["word_count"] == paragraph_data.word_count
), f'{ground_truth["word_count"] = } != {paragraph_data.word_count = }'
assert ground_truth["len_text"] == len(
paragraph_data.text
), f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
assert (
ground_truth["cut_type"] == paragraph_data.cut_type
), f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'

View file

@ -4,8 +4,8 @@ import uuid
from cognee.modules.data.processing.document_types.PdfDocument import PdfDocument
GROUND_TRUTH = [
{"word_count": 879, "len_text": 5622, "cut_type": "sentence_end"},
{"word_count": 951, "len_text": 6384, "cut_type": "sentence_end"},
{"word_count": 879, "len_text": 5607, "cut_type": "sentence_end"},
{"word_count": 953, "len_text": 6363, "cut_type": "sentence_end"},
]
@ -16,12 +16,12 @@ def test_PdfDocument():
"test_data",
"artificial-intelligence.pdf",
)
pdf_doc = PdfDocument(
document = PdfDocument(
id=uuid.uuid4(), name="Test document.pdf", raw_data_location=test_file_path
)
for ground_truth, paragraph_data in zip(
GROUND_TRUTH, pdf_doc.read(chunk_size=1024)
GROUND_TRUTH, document.read(chunk_size=1024)
):
assert (
ground_truth["word_count"] == paragraph_data.word_count

View file

@ -0,0 +1,46 @@
import os
import uuid
import pytest
from cognee.modules.data.processing.document_types.TextDocument import TextDocument
GROUND_TRUTH = {
"code.txt": [
{"word_count": 205, "len_text": 1024, "cut_type": "sentence_cut"},
{"word_count": 104, "len_text": 833, "cut_type": "paragraph_end"},
],
"Natural_language_processing.txt": [
{"word_count": 128, "len_text": 984, "cut_type": "paragraph_end"},
{"word_count": 1, "len_text": 1, "cut_type": "paragraph_end"},
],
}
@pytest.mark.parametrize(
"input_file,chunk_size",
[("code.txt", 256), ("Natural_language_processing.txt", 128)],
)
def test_TextDocument(input_file, chunk_size):
test_file_path = os.path.join(
os.sep,
*(os.path.dirname(__file__).split(os.sep)[:-2]),
"test_data",
input_file,
)
document = TextDocument(
id=uuid.uuid4(), name=input_file, raw_data_location=test_file_path
)
for ground_truth, paragraph_data in zip(
GROUND_TRUTH[input_file], document.read(chunk_size=chunk_size)
):
assert (
ground_truth["word_count"] == paragraph_data.word_count
), f'{ground_truth["word_count"] = } != {paragraph_data.word_count = }'
assert ground_truth["len_text"] == len(
paragraph_data.text
), f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
assert (
ground_truth["cut_type"] == paragraph_data.cut_type
), f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'

View file

@ -1,7 +1,7 @@
import pytest
import numpy as np
import pytest
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Node, Edge
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge, Node
def test_node_initialization():
@ -12,11 +12,13 @@ def test_node_initialization():
assert len(node.status) == 2
assert np.all(node.status == 1)
def test_node_invalid_dimension():
"""Test that initializing a Node with a non-positive dimension raises an error."""
with pytest.raises(ValueError, match="Dimension must be a positive integer"):
Node("node1", dimension=0)
def test_add_skeleton_neighbor():
"""Test adding a neighbor to a node."""
node1 = Node("node1")
@ -24,6 +26,7 @@ def test_add_skeleton_neighbor():
node1.add_skeleton_neighbor(node2)
assert node2 in node1.skeleton_neighbours
def test_remove_skeleton_neighbor():
"""Test removing a neighbor from a node."""
node1 = Node("node1")
@ -32,6 +35,7 @@ def test_remove_skeleton_neighbor():
node1.remove_skeleton_neighbor(node2)
assert node2 not in node1.skeleton_neighbours
def test_add_skeleton_edge():
"""Test adding an edge updates both skeleton_edges and skeleton_neighbours."""
node1 = Node("node1")
@ -41,6 +45,7 @@ def test_add_skeleton_edge():
assert edge in node1.skeleton_edges
assert node2 in node1.skeleton_neighbours
def test_remove_skeleton_edge():
"""Test removing an edge updates both skeleton_edges and skeleton_neighbours."""
node1 = Node("node1")
@ -51,6 +56,7 @@ def test_remove_skeleton_edge():
assert edge not in node1.skeleton_edges
assert node2 not in node1.skeleton_neighbours
def test_is_node_alive_in_dimension():
"""Test checking node's alive status in a specific dimension."""
node = Node("node1", dimension=2)
@ -58,25 +64,30 @@ def test_is_node_alive_in_dimension():
node.status[1] = 0
assert not node.is_node_alive_in_dimension(1)
def test_node_alive_invalid_dimension():
"""Test that checking alive status with an invalid dimension raises an error."""
node = Node("node1", dimension=1)
with pytest.raises(ValueError, match="Dimension 1 is out of range"):
node.is_node_alive_in_dimension(1)
def test_node_equality():
"""Test equality between nodes."""
node1 = Node("node1")
node2 = Node("node1")
assert node1 == node2
def test_node_hash():
"""Test hashing for Node."""
node = Node("node1")
assert hash(node) == hash("node1")
### Tests for Edge ###
def test_edge_initialization():
"""Test that an Edge is initialized correctly."""
node1 = Node("node1")
@ -89,6 +100,7 @@ def test_edge_initialization():
assert len(edge.status) == 2
assert np.all(edge.status == 1)
def test_edge_invalid_dimension():
"""Test that initializing an Edge with a non-positive dimension raises an error."""
node1 = Node("node1")
@ -96,6 +108,7 @@ def test_edge_invalid_dimension():
with pytest.raises(ValueError, match="Dimensions must be a positive integer."):
Edge(node1, node2, dimension=0)
def test_is_edge_alive_in_dimension():
"""Test checking edge's alive status in a specific dimension."""
node1 = Node("node1")
@ -105,6 +118,7 @@ def test_is_edge_alive_in_dimension():
edge.status[1] = 0
assert not edge.is_edge_alive_in_dimension(1)
def test_edge_alive_invalid_dimension():
"""Test that checking alive status with an invalid dimension raises an error."""
node1 = Node("node1")
@ -113,6 +127,7 @@ def test_edge_alive_invalid_dimension():
with pytest.raises(ValueError, match="Dimension 1 is out of range"):
edge.is_edge_alive_in_dimension(1)
def test_edge_equality_directed():
"""Test equality between directed edges."""
node1 = Node("node1")
@ -121,6 +136,7 @@ def test_edge_equality_directed():
edge2 = Edge(node1, node2, directed=True)
assert edge1 == edge2
def test_edge_equality_undirected():
"""Test equality between undirected edges."""
node1 = Node("node1")
@ -129,6 +145,7 @@ def test_edge_equality_undirected():
edge2 = Edge(node2, node1, directed=False)
assert edge1 == edge2
def test_edge_hash_directed():
"""Test hashing for directed edges."""
node1 = Node("node1")
@ -136,9 +153,10 @@ def test_edge_hash_directed():
edge = Edge(node1, node2, directed=True)
assert hash(edge) == hash((node1, node2))
def test_edge_hash_undirected():
"""Test hashing for undirected edges."""
node1 = Node("node1")
node2 = Node("node2")
edge = Edge(node1, node2, directed=False)
assert hash(edge) == hash(frozenset({node1, node2}))
assert hash(edge) == hash(frozenset({node1, node2}))

View file

@ -1,7 +1,7 @@
import pytest
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Node, Edge
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge, Node
@pytest.fixture
@ -9,6 +9,7 @@ def setup_graph():
"""Fixture to initialize a CogneeGraph instance."""
return CogneeGraph()
def test_add_node_success(setup_graph):
"""Test successful addition of a node."""
graph = setup_graph
@ -16,6 +17,7 @@ def test_add_node_success(setup_graph):
graph.add_node(node)
assert graph.get_node("node1") == node
def test_add_duplicate_node(setup_graph):
"""Test adding a duplicate node raises an exception."""
graph = setup_graph
@ -24,6 +26,7 @@ def test_add_duplicate_node(setup_graph):
with pytest.raises(ValueError, match="Node with id node1 already exists."):
graph.add_node(node)
def test_add_edge_success(setup_graph):
"""Test successful addition of an edge."""
graph = setup_graph
@ -37,6 +40,7 @@ def test_add_edge_success(setup_graph):
assert edge in node1.skeleton_edges
assert edge in node2.skeleton_edges
def test_add_duplicate_edge(setup_graph):
"""Test adding a duplicate edge raises an exception."""
graph = setup_graph
@ -49,6 +53,7 @@ def test_add_duplicate_edge(setup_graph):
with pytest.raises(ValueError, match="Edge .* already exists in the graph."):
graph.add_edge(edge)
def test_get_node_success(setup_graph):
"""Test retrieving an existing node."""
graph = setup_graph
@ -56,11 +61,13 @@ def test_get_node_success(setup_graph):
graph.add_node(node)
assert graph.get_node("node1") == node
def test_get_node_nonexistent(setup_graph):
"""Test retrieving a nonexistent node returns None."""
graph = setup_graph
assert graph.get_node("nonexistent") is None
def test_get_edges_success(setup_graph):
"""Test retrieving edges of a node."""
graph = setup_graph
@ -72,6 +79,7 @@ def test_get_edges_success(setup_graph):
graph.add_edge(edge)
assert edge in graph.get_edges("node1")
def test_get_edges_nonexistent_node(setup_graph):
"""Test retrieving edges for a nonexistent node raises an exception."""
graph = setup_graph

View file

@ -0,0 +1,53 @@
from itertools import product
import numpy as np
import pytest
from cognee.tasks.chunks import chunk_by_paragraph, chunk_by_word
from cognee.tests.unit.processing.chunks.test_input import INPUT_TEXTS
paragraph_lengths = [64, 256, 1024]
batch_paragraphs_vals = [True, False]
@pytest.mark.parametrize(
"input_text,paragraph_length,batch_paragraphs",
list(product(list(INPUT_TEXTS.values()), paragraph_lengths, batch_paragraphs_vals)),
)
def test_chunk_by_paragraph_isomorphism(input_text, paragraph_length, batch_paragraphs):
chunks = chunk_by_paragraph(input_text, paragraph_length, batch_paragraphs)
reconstructed_text = "".join([chunk["text"] for chunk in chunks])
assert (
reconstructed_text == input_text
), f"texts are not identical: {len(input_text) = }, {len(reconstructed_text) = }"
@pytest.mark.parametrize(
"input_text,paragraph_length,batch_paragraphs",
list(product(list(INPUT_TEXTS.values()), paragraph_lengths, batch_paragraphs_vals)),
)
def test_paragraph_chunk_length(input_text, paragraph_length, batch_paragraphs):
chunks = list(chunk_by_paragraph(input_text, paragraph_length, batch_paragraphs))
chunk_lengths = np.array(
[len(list(chunk_by_word(chunk["text"]))) for chunk in chunks]
)
larger_chunks = chunk_lengths[chunk_lengths > paragraph_length]
assert np.all(
chunk_lengths <= paragraph_length
), f"{paragraph_length = }: {larger_chunks} are too large"
@pytest.mark.parametrize(
"input_text,paragraph_length,batch_paragraphs",
list(product(list(INPUT_TEXTS.values()), paragraph_lengths, batch_paragraphs_vals)),
)
def test_chunk_by_paragraph_chunk_numbering(
input_text, paragraph_length, batch_paragraphs
):
chunks = chunk_by_paragraph(input_text, paragraph_length, batch_paragraphs)
chunk_indices = np.array([chunk["chunk_index"] for chunk in chunks])
assert np.all(
chunk_indices == np.arange(len(chunk_indices))
), f"{chunk_indices = } are not monotonically increasing"

View file

@ -8,12 +8,12 @@ GROUND_TRUTH = {
"cut_type": "paragraph_end",
},
{
"text": "This is a second paragraph. First two paragraphs are whole.",
"text": "\nThis is a second paragraph. First two paragraphs are whole.",
"word_count": 10,
"cut_type": "paragraph_end",
},
{
"text": "Third paragraph is a bit longer and is finished with a dot.",
"text": "\nThird paragraph is a bit longer and is finished with a dot.",
"word_count": 12,
"cut_type": "sentence_end",
},
@ -25,12 +25,12 @@ GROUND_TRUTH = {
"cut_type": "paragraph_end",
},
{
"text": "This is a second paragraph. First two paragraphs are whole.",
"text": "\nThis is a second paragraph. First two paragraphs are whole.",
"word_count": 10,
"cut_type": "paragraph_end",
},
{
"text": "Third paragraph is cut and is missing the dot at the end",
"text": "\nThird paragraph is cut and is missing the dot at the end",
"word_count": 12,
"cut_type": "sentence_cut",
},
@ -39,11 +39,11 @@ GROUND_TRUTH = {
INPUT_TEXT = {
"whole_text": """This is example text. It contains multiple sentences.
This is a second paragraph. First two paragraphs are whole.
Third paragraph is a bit longer and is finished with a dot.""",
This is a second paragraph. First two paragraphs are whole.
Third paragraph is a bit longer and is finished with a dot.""",
"cut_text": """This is example text. It contains multiple sentences.
This is a second paragraph. First two paragraphs are whole.
Third paragraph is cut and is missing the dot at the end""",
This is a second paragraph. First two paragraphs are whole.
Third paragraph is cut and is missing the dot at the end""",
}

View file

@ -0,0 +1,41 @@
from itertools import product
import numpy as np
import pytest
from cognee.tasks.chunks import chunk_by_sentence, chunk_by_word
from cognee.tests.unit.processing.chunks.test_input import INPUT_TEXTS
maximum_length_vals = [None, 8, 64]
@pytest.mark.parametrize(
"input_text,maximum_length",
list(product(list(INPUT_TEXTS.values()), maximum_length_vals)),
)
def test_chunk_by_sentence_isomorphism(input_text, maximum_length):
chunks = chunk_by_sentence(input_text, maximum_length)
reconstructed_text = "".join([chunk[1] for chunk in chunks])
assert (
reconstructed_text == input_text
), f"texts are not identical: {len(input_text) = }, {len(reconstructed_text) = }"
@pytest.mark.parametrize(
"input_text,maximum_length",
list(
product(
list(INPUT_TEXTS.values()),
[val for val in maximum_length_vals if val is not None],
)
),
)
def test_paragraph_chunk_length(input_text, maximum_length):
chunks = list(chunk_by_sentence(input_text, maximum_length))
chunk_lengths = np.array([len(list(chunk_by_word(chunk[1]))) for chunk in chunks])
larger_chunks = chunk_lengths[chunk_lengths > maximum_length]
assert np.all(
chunk_lengths <= maximum_length
), f"{maximum_length = }: {larger_chunks} are too large"

View file

@ -0,0 +1,40 @@
import numpy as np
import pytest
from cognee.tasks.chunks import chunk_by_word
from cognee.tests.unit.processing.chunks.test_input import INPUT_TEXTS
@pytest.mark.parametrize(
"input_text",
[
INPUT_TEXTS["english_text"],
INPUT_TEXTS["english_lists"],
INPUT_TEXTS["python_code"],
INPUT_TEXTS["chinese_text"],
],
)
def test_chunk_by_word_isomorphism(input_text):
chunks = chunk_by_word(input_text)
reconstructed_text = "".join([chunk[0] for chunk in chunks])
assert (
reconstructed_text == input_text
), f"texts are not identical: {len(input_text) = }, {len(reconstructed_text) = }"
@pytest.mark.parametrize(
"input_text",
[
INPUT_TEXTS["english_text"],
INPUT_TEXTS["english_lists"],
INPUT_TEXTS["python_code"],
INPUT_TEXTS["chinese_text"],
],
)
def test_chunk_by_word_splits(input_text):
chunks = np.array(list(chunk_by_word(input_text)))
space_test = np.array([" " not in chunk[0].strip() for chunk in chunks])
assert np.all(
space_test
), f"These chunks contain spaces within them: {chunks[space_test == False]}"

View file

@ -0,0 +1,284 @@
import pytest
INPUT_TEXTS = {
"empty": "",
"single_char": "x",
"whitespace": " \n\t \r\n ",
"unicode_special": "Hello 👋 مرحبا שָׁלוֹם",
"mixed_endings": "line1\r\nline2\nline3\r\nline4",
"many_newlines": "\n\n\n\ntext\n\n\n\n",
"html_mixed": "<p>Hello</p>\nPlain text\n<div>World</div>",
"urls_emails": "Visit https://example.com or email user@example.com",
"elipses": "Hello...How are you…",
"english_lists": """Let me think through the key attributes that would be important to test in a text chunking system.
Here are the essential attributes to test:
Chunking Boundaries Accuracy:
Proper sentence boundary detection
Handling of punctuation marks
Recognition of paragraph breaks
Treatment of special characters and whitespace
Proper handling of quotes and nested text structures
Language Support:
Handling of different languages and scripts
Support for multilingual documents
Proper Unicode handling
Treatment of language-specific punctuation
Special Cases Handling:
Lists and bullet points
Tables and structured content
Code blocks or technical content
Citations and references
Headers and footers
URLs and email addresses
Performance Metrics:
Processing speed for different text lengths
Memory usage with large documents
Scalability with increasing document size
Consistency across multiple runs
Document Format Support:
Plain text handling
HTML/XML content
PDF text extraction
Markdown formatting
Mixed format documents
Error Handling:
Malformed input text
Incomplete sentences
Truncated documents
Invalid characters
Missing punctuation
Configuration Flexibility:
Adjustable chunk sizes
Customizable boundary rules
Configurable overlap between chunks
Token vs. character-based chunking options
Preservation of Context:
Maintaining semantic coherence
Preserving contextual relationships
Handling cross-references
Maintaining document structure
Would you like me to elaborate on any of these attributes or discuss specific testing strategies for them?""",
"python_code": """from typing import (
Literal as L,
Any,
TypeAlias,
overload,
TypeVar,
Protocol,
type_check_only,
)
from numpy import generic
from numpy._typing import (
ArrayLike,
NDArray,
_ArrayLikeInt,
_ArrayLike,
)
__all__ = ["pad"]
_SCT = TypeVar("_SCT", bound=generic)
@type_check_only
class _ModeFunc(Protocol):
def __call__(
self,
vector: NDArray[Any],
iaxis_pad_width: tuple[int, int],
iaxis: int,
kwargs: dict[str, Any],
/,
) -> None: ...
_ModeKind: TypeAlias = L[
"constant",
"edge",
"linear_ramp",
"maximum",
"mean",
"median",
"minimum",
"reflect",
"symmetric",
"wrap",
"empty",
]
# TODO: In practice each keyword argument is exclusive to one or more
# specific modes. Consider adding more overloads to express this in the future.
# Expand `**kwargs` into explicit keyword-only arguments
@overload
def pad(
array: _ArrayLike[_SCT],
pad_width: _ArrayLikeInt,
mode: _ModeKind = ...,
*,
stat_length: None | _ArrayLikeInt = ...,
constant_values: ArrayLike = ...,
end_values: ArrayLike = ...,
reflect_type: L["odd", "even"] = ...,
) -> NDArray[_SCT]: ...
@overload
def pad(
array: ArrayLike,
pad_width: _ArrayLikeInt,
mode: _ModeKind = ...,
*,
stat_length: None | _ArrayLikeInt = ...,
constant_values: ArrayLike = ...,
end_values: ArrayLike = ...,
reflect_type: L["odd", "even"] = ...,
) -> NDArray[Any]: ...
@overload
def pad(
array: _ArrayLike[_SCT],
pad_width: _ArrayLikeInt,
mode: _ModeFunc,
**kwargs: Any,
) -> NDArray[_SCT]: ...
@overload
def pad(
array: ArrayLike,
pad_width: _ArrayLikeInt,
mode: _ModeFunc,
**kwargs: Any,
) -> NDArray[Any]: ...""",
"chinese_text": """在这个繁华的城市里,藏着一个古老的小巷,名叫杨柳巷。巷子两旁的青石板路已经被无数行人的脚步磨得发亮,斑驳的老墙上爬满了常青藤,给这个充满历史气息的小巷增添了一抹生机。每天清晨,巷子里都会飘出阵阵香气,那是张婆婆家的早点铺子散发出的包子和豆浆的味道。老店门前经常排着长队,有步履匆匆的上班族,也有悠闲散步的老人。巷子深处有一家传统的茶馆,古色古香的木桌椅上总是坐满了品茶聊天的街坊邻里。傍晚时分,夕阳的余晖洒在石板路上,为这个充满生活气息的小巷染上一层温暖的金色。街角的老榕树下,常常有卖唱的艺人在这里驻足,用沧桑的嗓音讲述着这座城市的故事。偶尔,还能看到三三两两的游客举着相机,试图捕捉这里独特的市井风情。这条看似普通的小巷,承载着太多市民的回忆和岁月的痕迹,它就像是这座城市的一个缩影,悄悄地诉说着曾经的故事。""",
"english_text": """O for that warning voice, which he who saw
Th' Apocalyps, heard cry in Heaven aloud,
Then when the Dragon, put to second rout,
Came furious down to be reveng'd on men,
Wo to the inhabitants on Earth! that now, [ 5 ]
While time was, our first-Parents had bin warnd
The coming of thir secret foe, and scap'd
Haply so scap'd his mortal snare; for now
Satan, now first inflam'd with rage, came down,
The Tempter ere th' Accuser of man-kind, [ 10 ]
To wreck on innocent frail man his loss
Of that first Battel, and his flight to Hell:
Yet not rejoycing in his speed, though bold,
Far off and fearless, nor with cause to boast,
Begins his dire attempt, which nigh the birth [ 15 ]
Now rowling, boiles in his tumultuous brest,
And like a devillish Engine back recoiles
Upon himself; horror and doubt distract
His troubl'd thoughts, and from the bottom stirr
The Hell within him, for within him Hell [ 20 ]
He brings, and round about him, nor from Hell
One step no more then from himself can fly
By change of place: Now conscience wakes despair
That slumberd, wakes the bitter memorie
Of what he was, what is, and what must be [ 25 ]
Worse; of worse deeds worse sufferings must ensue.
Sometimes towards Eden which now in his view
Lay pleasant, his grievd look he fixes sad,
Sometimes towards Heav'n and the full-blazing Sun,
Which now sat high in his Meridian Towre: [ 30 ]
Then much revolving, thus in sighs began.
O thou that with surpassing Glory crownd,
Look'st from thy sole Dominion like the God
Of this new World; at whose sight all the Starrs
Hide thir diminisht heads; to thee I call, [ 35 ]
But with no friendly voice, and add thy name
O Sun, to tell thee how I hate thy beams
That bring to my remembrance from what state
I fell, how glorious once above thy Spheare;
Till Pride and worse Ambition threw me down [ 40 ]
Warring in Heav'n against Heav'ns matchless King:
Ah wherefore! he deservd no such return
From me, whom he created what I was
In that bright eminence, and with his good
Upbraided none; nor was his service hard. [ 45 ]
What could be less then to afford him praise,
The easiest recompence, and pay him thanks,
How due! yet all his good prov'd ill in me,
And wrought but malice; lifted up so high
I sdeind subjection, and thought one step higher [ 50 ]
Would set me highest, and in a moment quit
The debt immense of endless gratitude,
So burthensome, still paying, still to ow;
Forgetful what from him I still receivd,
And understood not that a grateful mind [ 55 ]
By owing owes not, but still pays, at once
Indebted and dischargd; what burden then?
O had his powerful Destiny ordaind
Me some inferiour Angel, I had stood
Then happie; no unbounded hope had rais'd [ 60 ]
Ambition. Yet why not? som other Power
As great might have aspir'd, and me though mean
Drawn to his part; but other Powers as great
Fell not, but stand unshak'n, from within
Or from without, to all temptations arm'd. [ 65 ]
Hadst thou the same free Will and Power to stand?
Thou hadst: whom hast thou then or what to accuse,
But Heav'ns free Love dealt equally to all?
Be then his Love accurst, since love or hate,
To me alike, it deals eternal woe. [ 70 ]
Nay curs'd be thou; since against his thy will
Chose freely what it now so justly rues.
Me miserable! which way shall I flie
Infinite wrauth, and infinite despaire?
Which way I flie is Hell; my self am Hell; [ 75 ]
And in the lowest deep a lower deep
Still threatning to devour me opens wide,
To which the Hell I suffer seems a Heav'n.
O then at last relent: is there no place
Left for Repentance, none for Pardon left? [ 80 ]
None left but by submission; and that word
Disdain forbids me, and my dread of shame
Among the Spirits beneath, whom I seduc'd
With other promises and other vaunts
Then to submit, boasting I could subdue [ 85 ]
Th' Omnipotent. Ay me, they little know
How dearly I abide that boast so vaine,
Under what torments inwardly I groane:
While they adore me on the Throne of Hell,
With Diadem and Sceptre high advanc'd [ 90 ]
The lower still I fall, onely Supream
In miserie; such joy Ambition findes.
But say I could repent and could obtaine
By Act of Grace my former state; how soon
Would higth recall high thoughts, how soon unsay [ 95 ]
What feign'd submission swore: ease would recant
Vows made in pain, as violent and void.
For never can true reconcilement grow
Where wounds of deadly hate have peirc'd so deep:
Which would but lead me to a worse relapse [ 100 ]""",
}