""" Chunking strategies for splitting text into smaller parts."""
|
|
from __future__ import annotations
|
|
from cognee.shared.data_models import ChunkStrategy
|
|
import re
|
|
|
|
from typing import Any, List, Optional
|
|
|
|
|
|
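# NOTE: the two splitter classes below call helpers such as `_merge_splits`,
# `_length_function`, `_chunk_size` and `create_documents` without defining them,
# which suggests they were adapted from LangChain's `TextSplitter` base class.
# The class below is a minimal, self-contained sketch of just the pieces this
# module uses, and the splitters are made to inherit from it so the module runs
# on its own. The default values, and returning plain strings from
# `create_documents` (LangChain returns Document objects), are assumptions
# rather than behaviour taken from the original code.
class TextSplitter:
    """Minimal base class providing chunk merging for the splitters below."""

    def __init__(
        self,
        chunk_size: int = 4000,
        chunk_overlap: int = 200,
        length_function=len,
        keep_separator: bool = False,
    ) -> None:
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._length_function = length_function
        self._keep_separator = keep_separator

    def split_text(self, text: str) -> List[str]:
        raise NotImplementedError

    def create_documents(self, texts: List[str]) -> List[str]:
        """Split every input text and return the flattened list of chunks."""
        documents: List[str] = []
        for text in texts:
            documents.extend(self.split_text(text))
        return documents

    def _merge_splits(self, splits: List[str], separator: str) -> List[str]:
        """Greedily join consecutive splits into chunks of roughly `_chunk_size`
        characters, carrying about `_chunk_overlap` characters into the next chunk."""
        separator_len = self._length_function(separator)
        docs: List[str] = []
        current: List[str] = []
        total = 0
        for split in splits:
            split_len = self._length_function(split)
            if current and total + separator_len + split_len > self._chunk_size:
                docs.append(separator.join(current))
                # Drop leading splits until the remainder both fits in the overlap
                # budget and leaves room for the incoming split.
                while current and (
                    total > self._chunk_overlap
                    or (total + separator_len + split_len > self._chunk_size and total > 0)
                ):
                    total -= self._length_function(current[0]) + (
                        separator_len if len(current) > 1 else 0
                    )
                    current.pop(0)
            current.append(split)
            total += split_len + (separator_len if len(current) > 1 else 0)
        if current:
            docs.append(separator.join(current))
        return docs

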
class CharacterTextSplitter(TextSplitter):
    """Splits text by looking at a single separator string."""

    def __init__(
        self, separator: str = "\n\n", is_separator_regex: bool = False, **kwargs: Any
    ) -> None:
        """Create a new TextSplitter."""
        super().__init__(**kwargs)
        self._separator = separator
        self._is_separator_regex = is_separator_regex

    def split_text(self, text: str) -> List[str]:
        """Split incoming text and return chunks."""
        # First we naively split the large input into a bunch of smaller ones.
        separator = (
            self._separator if self._is_separator_regex else re.escape(self._separator)
        )
        splits = _split_text_with_regex(text, separator, self._keep_separator)
        _separator = "" if self._keep_separator else self._separator
        return self._merge_splits(splits, _separator)


def _split_text_with_regex(
    text: str, separator: str, keep_separator: bool
) -> List[str]:
    # Now that we have the separator, split the text
    if separator:
        if keep_separator:
            # The parentheses in the pattern keep the delimiters in the result.
            _splits = re.split(f"({separator})", text)
            splits = [_splits[i] + _splits[i + 1] for i in range(1, len(_splits), 2)]
            if len(_splits) % 2 == 0:
                splits += _splits[-1:]
            splits = [_splits[0]] + splits
        else:
            splits = re.split(separator, text)
    else:
        splits = list(text)
    return [s for s in splits if s != ""]


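# Worked example (hypothetical input): _split_text_with_regex("a. b. c", r"\. ", keep_separator=True)
# first gets ['a', '. ', 'b', '. ', 'c'] from re.split with the capturing group, then
# re-pairs the pieces so each delimiter stays attached to the text that follows it,
# returning ['a', '. b', '. c']. With keep_separator=False it simply returns ['a', 'b', 'c'].

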
class RecursiveCharacterTextSplitter(TextSplitter):
    """Splits text by recursively looking at characters.

    Recursively tries to split by different characters to find one
    that works.
    """

    def __init__(
        self,
        separators: Optional[List[str]] = None,
        keep_separator: bool = True,
        is_separator_regex: bool = False,
        **kwargs: Any,
    ) -> None:
        """Create a new TextSplitter."""
        super().__init__(keep_separator=keep_separator, **kwargs)
        self._separators = separators or ["\n\n", "\n", " ", ""]
        self._is_separator_regex = is_separator_regex

    def _split_text(self, text: str, separators: List[str]) -> List[str]:
        """Split incoming text and return chunks."""
        final_chunks = []
        # Get appropriate separator to use
        separator = separators[-1]
        new_separators = []
        for i, _s in enumerate(separators):
            _separator = _s if self._is_separator_regex else re.escape(_s)
            if _s == "":
                separator = _s
                break
            if re.search(_separator, text):
                separator = _s
                new_separators = separators[i + 1:]
                break

        _separator = separator if self._is_separator_regex else re.escape(separator)
        splits = _split_text_with_regex(text, _separator, self._keep_separator)

        # Now go merging things, recursively splitting longer texts.
        _good_splits = []
        _separator = "" if self._keep_separator else separator
        for s in splits:
            if self._length_function(s) < self._chunk_size:
                _good_splits.append(s)
            else:
                if _good_splits:
                    merged_text = self._merge_splits(_good_splits, _separator)
                    final_chunks.extend(merged_text)
                    _good_splits = []
                if not new_separators:
                    final_chunks.append(s)
                else:
                    other_info = self._split_text(s, new_separators)
                    final_chunks.extend(other_info)
        if _good_splits:
            merged_text = self._merge_splits(_good_splits, _separator)
            final_chunks.extend(merged_text)
        return final_chunks

    def split_text(self, text: str) -> List[str]:
        return self._split_text(text, self._separators)


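# A small usage sketch (hypothetical text; exact chunk boundaries depend on the
# minimal _merge_splits implementation sketched in the TextSplitter base above):
#
#     splitter = RecursiveCharacterTextSplitter(chunk_size=40, chunk_overlap=0, length_function=len)
#     splitter.split_text("First paragraph.\n\nA second, somewhat longer paragraph of text.")
#
# The splitter tries "\n\n" first, then "\n", " " and finally "" until every piece
# fits under chunk_size, and _merge_splits greedily re-joins the small pieces.

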
def chunk_data(chunk_strategy=None, source_data=None, chunk_size=None, chunk_overlap=None):
    """
    Chunk data based on the specified strategy.

    Parameters:
    - chunk_strategy: The strategy to use for chunking.
    - source_data: The data to be chunked.
    - chunk_size: The size of each chunk.
    - chunk_overlap: The overlap between chunks.

    Returns:
    - The chunked data.
    """
    if chunk_strategy == ChunkStrategy.VANILLA:
        chunked_data = vanilla_chunker(source_data, chunk_size, chunk_overlap)
    elif chunk_strategy == ChunkStrategy.PARAGRAPH:
        chunked_data = chunk_data_by_paragraph(source_data, chunk_size, chunk_overlap)
    elif chunk_strategy == ChunkStrategy.SENTENCE:
        chunked_data = chunk_by_sentence(source_data, chunk_size, chunk_overlap)
    elif chunk_strategy == ChunkStrategy.EXACT:
        chunked_data = chunk_data_exact(source_data, chunk_size, chunk_overlap)
    elif chunk_strategy == ChunkStrategy.SUMMARY:
        chunked_data = summary_chunker(source_data, chunk_size, chunk_overlap)
    else:
        chunked_data = vanilla_chunker(source_data, chunk_size, chunk_overlap)

    return chunked_data


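# Usage sketch (hypothetical text): the caller picks a ChunkStrategy member and
# chunk_data dispatches to the matching helper below; unknown or missing
# strategies fall back to vanilla_chunker.
#
#     text = "First paragraph.\n\nSecond paragraph."
#     chunks = chunk_data(ChunkStrategy.PARAGRAPH, text, chunk_size=128, chunk_overlap=10)

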
def vanilla_chunker(source_data, chunk_size=100, chunk_overlap=20):
    # adapt this for different chunking strategies
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    pages = text_splitter.create_documents([source_data])
    return pages


def summary_chunker(source_data, chunk_size=400, chunk_overlap=20):
    """
    Chunk the given source data into smaller parts, returning the first five and last five chunks.

    Parameters:
    - source_data (str): The source data to be chunked.
    - chunk_size (int): The size of each chunk.
    - chunk_overlap (int): The overlap between consecutive chunks.

    Returns:
    - List: A list containing the first five and last five chunks of the chunked source data.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )

    try:
        pages = text_splitter.create_documents([source_data])
    except Exception:
        # source_data may be an object exposing its text through a .content attribute
        pages = text_splitter.create_documents(source_data.content)

    # Return the first 5 and last 5 chunks
    if len(pages) > 10:
        return pages[:5] + pages[-5:]
    else:
        return pages  # Return all chunks if there are 10 or fewer


def chunk_data_exact(data_chunks, chunk_size, chunk_overlap):
    """Fixed-size sliding-window chunking (assumes chunk_overlap < chunk_size)."""
    data = "".join(data_chunks)
    chunks = []
    # Step by chunk_size - chunk_overlap so consecutive chunks share chunk_overlap characters.
    for i in range(0, len(data), chunk_size - chunk_overlap):
        chunks.append(data[i:i + chunk_size])
    return chunks


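# Worked example (hypothetical input): chunk_size=5 and chunk_overlap=2 give a
# stride of 3, so chunk_data_exact(["abcdefghij"], 5, 2) starts chunks at
# offsets 0, 3, 6, 9 and returns ['abcde', 'defgh', 'ghij', 'j'].

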
def chunk_by_sentence(data_chunks, chunk_size, overlap):
    # Split by periods, question marks, exclamation marks, and ellipses
    data = "".join(data_chunks)

    # The regular expression finds runs of characters that end with one of the
    # following characters (. ! ? …) followed by whitespace.
    sentence_endings = r'(?<=[.!?…]) +'
    sentences = re.split(sentence_endings, data)

    sentence_chunks = []
    for sentence in sentences:
        if len(sentence) > chunk_size:
            # Sentences longer than chunk_size are re-chunked with the exact strategy.
            chunks = chunk_data_exact([sentence], chunk_size, overlap)
            sentence_chunks.extend(chunks)
        else:
            sentence_chunks.append(sentence)
    return sentence_chunks


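# Example (hypothetical input): chunk_by_sentence(["One. Two! Three?"], 20, 0)
# splits on the whitespace that follows ., !, ? or … and returns
# ['One.', 'Two!', 'Three?']; any sentence longer than chunk_size would be
# re-chunked with chunk_data_exact.

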
def chunk_data_by_paragraph(data_chunks, chunk_size, overlap, bound=0.75):
    data = "".join(data_chunks)
    total_length = len(data)
    chunks = []
    check_bound = int(bound * chunk_size)
    start_idx = 0

    while start_idx < total_length:
        # Set the end index to the minimum of start_idx + chunk_size or total_length
        end_idx = min(start_idx + chunk_size, total_length)

        # Find the next paragraph break ("\n\n") within the current chunk, but only
        # after bound * chunk_size characters so chunks do not become too short
        next_paragraph_index = data.find('\n\n', start_idx + check_bound, end_idx)

        # If a paragraph break is found within the current chunk
        if next_paragraph_index != -1:
            # Update end_idx to include the paragraph delimiter
            end_idx = next_paragraph_index + 2

        # Extend the chunk by `overlap` characters past the cut point
        chunks.append(data[start_idx:end_idx + overlap])

        # Update start_idx to be the current end_idx
        start_idx = end_idx

    return chunks


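# Worked example (hypothetical input): with chunk_size=100, overlap=0 and the default
# bound=0.75, chunk_data_by_paragraph(["A" * 80 + "\n\n" + "B" * 80], 100, 0) searches
# for "\n\n" between offsets 75 and 100, finds it at offset 80, and returns two chunks:
# the eighty A's plus the "\n\n" delimiter, followed by the eighty B's.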