"""Tests for chunking_by_token_size: recursive split mode, edge cases,
parameter combinations, chunk order indexing, and integration scenarios."""
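# Note: these tests assume each chunk returned by chunking_by_token_size is a
# dict carrying at least "content", "tokens", and "chunk_order_index" keys,
# which is what the assertions below rely on.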
import pytest

from lightrag.exceptions import ChunkTokenLimitExceededError
from lightrag.operate import chunking_by_token_size
from lightrag.utils import Tokenizer, TokenizerInterface


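# DummyTokenizer maps each character to one token (its code point), so token
# counts in these tests equal character counts.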
class DummyTokenizer(TokenizerInterface):
    def encode(self, content: str):
        return [ord(ch) for ch in content]

    def decode(self, tokens):
        return "".join(chr(token) for token in tokens)


def make_tokenizer() -> Tokenizer:
    return Tokenizer(model_name="dummy", tokenizer=DummyTokenizer())


# ============================================================================
# Tests for split_by_character_only=True (raises error on oversized chunks)
# ============================================================================


@pytest.mark.offline
def test_split_by_character_only_within_limit():
    """Test chunking when all chunks are within token limit."""
    tokenizer = make_tokenizer()

    chunks = chunking_by_token_size(
        tokenizer,
        "alpha\n\nbeta",
        split_by_character="\n\n",
        split_by_character_only=True,
        chunk_token_size=10,
    )

    assert [chunk["content"] for chunk in chunks] == ["alpha", "beta"]


@pytest.mark.offline
def test_split_by_character_only_exceeding_limit_raises():
    """Test that oversized chunks raise ChunkTokenLimitExceededError."""
    tokenizer = make_tokenizer()
    oversized = "a" * 12

    with pytest.raises(ChunkTokenLimitExceededError) as excinfo:
        chunking_by_token_size(
            tokenizer,
            oversized,
            split_by_character="\n\n",
            split_by_character_only=True,
            chunk_token_size=5,
        )

    err = excinfo.value
    assert err.chunk_tokens == len(oversized)
    assert err.chunk_token_limit == 5


@pytest.mark.offline
def test_chunk_error_includes_preview():
    """Test that error message includes chunk preview."""
    tokenizer = make_tokenizer()
    oversized = "x" * 100

    with pytest.raises(ChunkTokenLimitExceededError) as excinfo:
        chunking_by_token_size(
            tokenizer,
            oversized,
            split_by_character="\n\n",
            split_by_character_only=True,
            chunk_token_size=10,
        )

    err = excinfo.value
    # Preview should be the first 80 chars of a 100-char string
    assert err.chunk_preview == "x" * 80
    assert "Preview:" in str(err)


@pytest.mark.offline
def test_split_by_character_only_at_exact_limit():
    """Test chunking when chunk is exactly at token limit."""
    tokenizer = make_tokenizer()
    exact_size = "a" * 10

    chunks = chunking_by_token_size(
        tokenizer,
        exact_size,
        split_by_character="\n\n",
        split_by_character_only=True,
        chunk_token_size=10,
    )

    assert len(chunks) == 1
    assert chunks[0]["content"] == exact_size
    assert chunks[0]["tokens"] == 10


@pytest.mark.offline
def test_split_by_character_only_one_over_limit():
    """Test that chunk with one token over limit raises error."""
    tokenizer = make_tokenizer()
    one_over = "a" * 11

    with pytest.raises(ChunkTokenLimitExceededError) as excinfo:
        chunking_by_token_size(
            tokenizer,
            one_over,
            split_by_character="\n\n",
            split_by_character_only=True,
            chunk_token_size=10,
        )

    err = excinfo.value
    assert err.chunk_tokens == 11
    assert err.chunk_token_limit == 10


# ============================================================================
# Tests for split_by_character_only=False (recursive splitting)
# ============================================================================
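# In recursive mode an oversized piece is re-split purely by token count with
# step size chunk_token_size - chunk_overlap_token_size, which is what the
# expected chunk counts in the tests below assume.
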
@pytest.mark.offline
def test_split_recursive_oversized_chunk():
    """Test recursive splitting of oversized chunk with split_by_character_only=False."""
    tokenizer = make_tokenizer()
    # 30 chars - should split into chunks of size 10
    oversized = "a" * 30

    chunks = chunking_by_token_size(
        tokenizer,
        oversized,
        split_by_character="\n\n",
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=0,
    )

    # Should create 3 chunks of 10 tokens each
    assert len(chunks) == 3
    assert all(chunk["tokens"] == 10 for chunk in chunks)
    assert all(chunk["content"] == "a" * 10 for chunk in chunks)


@pytest.mark.offline
def test_split_with_chunk_overlap():
    """Test chunk splitting with overlap."""
    tokenizer = make_tokenizer()
    # 25 chars
    content = "a" * 25

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character="\n\n",
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=3,
    )

    # With overlap of 3, chunks start at: 0, 7, 14, 21
    # Chunk 1: [0:10] = 10 tokens
    # Chunk 2: [7:17] = 10 tokens
    # Chunk 3: [14:24] = 10 tokens
    # Chunk 4: [21:25] = 4 tokens
    assert len(chunks) == 4
    assert chunks[0]["tokens"] == 10
    assert chunks[1]["tokens"] == 10
    assert chunks[2]["tokens"] == 10
    assert chunks[3]["tokens"] == 4


@pytest.mark.offline
def test_split_multiple_chunks_with_mixed_sizes():
    """Test splitting text with multiple chunks of different sizes."""
    tokenizer = make_tokenizer()
    # Content: "small" (5 tokens), a 16-token middle section, "medium" (6 tokens)
    content = "small\n\n" + "a" * 16 + "\n\nmedium"

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character="\n\n",
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=2,
    )

    # First chunk "small" should be kept as is (5 tokens)
    # Second chunk (16 tokens) should be split into 2 chunks
    # Third chunk "medium" should be kept as is (6 tokens)
    assert len(chunks) == 4
    assert chunks[0]["content"] == "small"
    assert chunks[0]["tokens"] == 5


@pytest.mark.offline
def test_split_exact_boundary():
    """Test splitting at exact chunk boundaries."""
    tokenizer = make_tokenizer()
    # Exactly 20 chars, should split into 2 chunks of 10
    content = "a" * 20

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character="\n\n",
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=0,
    )

    assert len(chunks) == 2
    assert chunks[0]["tokens"] == 10
    assert chunks[1]["tokens"] == 10


@pytest.mark.offline
def test_split_very_large_text():
    """Test splitting very large text into multiple chunks."""
    tokenizer = make_tokenizer()
    # 100 chars should create 10 chunks with chunk_size=10, overlap=0
    content = "a" * 100

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character="\n\n",
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=0,
    )

    assert len(chunks) == 10
    assert all(chunk["tokens"] == 10 for chunk in chunks)


# ============================================================================
# Edge Cases
# ============================================================================


@pytest.mark.offline
def test_empty_content():
    """Test chunking with empty content."""
    tokenizer = make_tokenizer()

    chunks = chunking_by_token_size(
        tokenizer,
        "",
        split_by_character="\n\n",
        split_by_character_only=True,
        chunk_token_size=10,
    )

    assert len(chunks) == 1
    assert chunks[0]["content"] == ""
    assert chunks[0]["tokens"] == 0


@pytest.mark.offline
def test_single_character():
    """Test chunking with single character."""
    tokenizer = make_tokenizer()

    chunks = chunking_by_token_size(
        tokenizer,
        "a",
        split_by_character="\n\n",
        split_by_character_only=True,
        chunk_token_size=10,
    )

    assert len(chunks) == 1
    assert chunks[0]["content"] == "a"
    assert chunks[0]["tokens"] == 1


@pytest.mark.offline
def test_no_delimiter_in_content():
    """Test chunking when content has no delimiter."""
    tokenizer = make_tokenizer()
    content = "a" * 30

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character="\n\n",  # Delimiter not in content
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=0,
    )

    # Should still split based on token size
    assert len(chunks) == 3
    assert all(chunk["tokens"] == 10 for chunk in chunks)


@pytest.mark.offline
def test_no_split_character():
    """Test chunking without split_by_character (None)."""
    tokenizer = make_tokenizer()
    content = "a" * 30

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character=None,
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=0,
    )

    # Should split based purely on token size
    assert len(chunks) == 3
    assert all(chunk["tokens"] == 10 for chunk in chunks)


# ============================================================================
# Parameter Combinations
# ============================================================================


@pytest.mark.offline
def test_different_delimiter_newline():
    """Test with single newline delimiter."""
    tokenizer = make_tokenizer()
    content = "alpha\nbeta\ngamma"

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character="\n",
        split_by_character_only=True,
        chunk_token_size=10,
    )

    assert len(chunks) == 3
    assert [c["content"] for c in chunks] == ["alpha", "beta", "gamma"]


@pytest.mark.offline
def test_different_delimiter_comma():
    """Test with comma delimiter."""
    tokenizer = make_tokenizer()
    content = "one,two,three"

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character=",",
        split_by_character_only=True,
        chunk_token_size=10,
    )

    assert len(chunks) == 3
    assert [c["content"] for c in chunks] == ["one", "two", "three"]


@pytest.mark.offline
def test_zero_overlap():
    """Test with zero overlap (no overlap)."""
    tokenizer = make_tokenizer()
    content = "a" * 20

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character=None,
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=0,
    )

    # Should create exactly 2 chunks with no overlap
    assert len(chunks) == 2
    assert chunks[0]["tokens"] == 10
    assert chunks[1]["tokens"] == 10


@pytest.mark.offline
def test_large_overlap():
    """Test with overlap close to chunk size."""
    tokenizer = make_tokenizer()
    content = "a" * 30

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character=None,
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=9,
    )

    # With overlap=9, chunks start at: 0, 1, 2, 3...
    # Step size = chunk_size - overlap = 10 - 9 = 1
    # So we get: [0:10], [1:11], [2:12], ..., [29:30]
    # range(0, 30, 1) = 0 to 29, so 30 chunks total
    assert len(chunks) == 30


# ============================================================================
# Chunk Order Index Tests
# ============================================================================


@pytest.mark.offline
def test_chunk_order_index_simple():
    """Test that chunk_order_index is correctly assigned."""
    tokenizer = make_tokenizer()
    content = "a\n\nb\n\nc"

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character="\n\n",
        split_by_character_only=True,
        chunk_token_size=10,
    )

    assert len(chunks) == 3
    assert chunks[0]["chunk_order_index"] == 0
    assert chunks[1]["chunk_order_index"] == 1
    assert chunks[2]["chunk_order_index"] == 2


@pytest.mark.offline
def test_chunk_order_index_with_splitting():
    """Test chunk_order_index with recursive splitting."""
    tokenizer = make_tokenizer()
    content = "a" * 30

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character=None,
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=0,
    )

    assert len(chunks) == 3
    assert chunks[0]["chunk_order_index"] == 0
    assert chunks[1]["chunk_order_index"] == 1
    assert chunks[2]["chunk_order_index"] == 2


# ============================================================================
# Integration Tests
# ============================================================================


@pytest.mark.offline
def test_mixed_size_chunks_no_error():
    """Test that mixed size chunks work without error in recursive mode."""
    tokenizer = make_tokenizer()
    # Mix of small and large chunks
    content = "small\n\n" + "a" * 50 + "\n\nmedium"

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character="\n\n",
        split_by_character_only=False,
        chunk_token_size=10,
        chunk_overlap_token_size=2,
    )

    # Should handle all chunks without error
    assert len(chunks) > 0
    # Small chunk should remain intact
    assert chunks[0]["content"] == "small"
    # Large chunk should be split into multiple pieces
    assert any(chunk["content"] == "a" * 10 for chunk in chunks)
    # Some chunk should contain "medium"
    assert any("medium" in chunk["content"] for chunk in chunks)


@pytest.mark.offline
def test_whitespace_handling():
    """Test that whitespace is properly handled in chunk content."""
    tokenizer = make_tokenizer()
    content = " alpha \n\n beta "

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character="\n\n",
        split_by_character_only=True,
        chunk_token_size=20,
    )

    # Content should be stripped
    assert chunks[0]["content"] == "alpha"
    assert chunks[1]["content"] == "beta"


@pytest.mark.offline
def test_consecutive_delimiters():
    """Test handling of consecutive delimiters."""
    tokenizer = make_tokenizer()
    content = "alpha\n\n\n\nbeta"

    chunks = chunking_by_token_size(
        tokenizer,
        content,
        split_by_character="\n\n",
        split_by_character_only=True,
        chunk_token_size=20,
    )

    # Should split on the delimiter; consecutive delimiters may yield an empty
    # chunk in between, so only the named chunks are asserted
    assert len(chunks) >= 2
    assert "alpha" in [c["content"] for c in chunks]
    assert "beta" in [c["content"] for c in chunks]