1. Dynamic metadata retrieval, refactored function 2. Loading with marshmallow, which now allows dynamic fields 3. Added chunkers of several varieties 4. Fixed PDF loading so it is better standardized
import re

from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

from level_2.shared.chunk_strategy import ChunkStrategy
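
# ChunkStrategy is assumed here to be an Enum exposing the four members that
# chunk_data dispatches on; a minimal sketch of such an enum:
#
#     class ChunkStrategy(Enum):
#         VANILLA = "vanilla"
#         PARAGRAPH = "paragraph"
#         SENTENCE = "sentence"
#         EXACT = "exact"
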
def chunk_data(chunk_strategy=None, source_data=None, chunk_size=None, chunk_overlap=None):
    # Dispatch to the chunker matching the requested strategy
    if chunk_strategy == ChunkStrategy.VANILLA:
        chunked_data = vanilla_chunker(source_data, chunk_size, chunk_overlap)
    elif chunk_strategy == ChunkStrategy.PARAGRAPH:
        chunked_data = chunk_data_by_paragraph(source_data, chunk_size, chunk_overlap)
    elif chunk_strategy == ChunkStrategy.SENTENCE:
        chunked_data = chunk_by_sentence(source_data, chunk_size, chunk_overlap)
    elif chunk_strategy == ChunkStrategy.EXACT:
        chunked_data = chunk_data_exact(source_data, chunk_size, chunk_overlap)
    else:
        # An unknown or missing strategy falls back to the vanilla splitter
        chunked_data = vanilla_chunker(source_data, chunk_size, chunk_overlap)
    return chunked_data
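
# Example call (the sample text here is made up for illustration):
#
#     chunks = chunk_data(
#         chunk_strategy=ChunkStrategy.SENTENCE,
#         source_data="First sentence. Second one! And a third?",
#         chunk_size=100,
#         chunk_overlap=20,
#     )
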
def vanilla_chunker(source_data, chunk_size, chunk_overlap):
    # loader = PyPDFLoader(source_data)
    # pages = source_data.load_and_split()
    # Adapt this for different chunking strategies
    text_splitter = RecursiveCharacterTextSplitter(
        # Honor the caller's sizes; fall back to small demo values when unset
        chunk_size=chunk_size or 100,
        chunk_overlap=chunk_overlap or 20,
        length_function=len,
    )
    pages = text_splitter.create_documents([source_data])
    return pages
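
# Note: unlike the chunkers below, which return plain strings,
# create_documents returns langchain Document objects, so the text of each
# chunk lives on the .page_content attribute.
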
def chunk_data_exact(data_chunks, chunk_size, chunk_overlap):
    data = "".join(data_chunks)
    # Slide a fixed-size window over the text; the step is kept positive so
    # that range() never receives a zero or negative step
    step = max(1, chunk_size - chunk_overlap)
    chunks = []
    for i in range(0, len(data), step):
        chunks.append(data[i:i + chunk_size])
    return chunks
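
# Worked example: with chunk_size=5 and chunk_overlap=2 the window advances
# 3 characters at a time, so chunk_data_exact(["abcdefgh"], 5, 2) yields
# ["abcde", "defgh", "gh"].
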
def chunk_by_sentence(data_chunks, chunk_size, overlap):
    # Split on periods, question marks, exclamation marks, and ellipses
    data = "".join(data_chunks)

    # The regular expression matches the spaces that follow one of the
    # sentence-ending characters (. ! ? …)
    sentence_endings = r'(?<=[.!?…]) +'
    sentences = re.split(sentence_endings, data)

    sentence_chunks = []
    for sentence in sentences:
        # Sentences longer than chunk_size are broken up by the exact chunker
        if len(sentence) > chunk_size:
            chunks = chunk_data_exact([sentence], chunk_size, overlap)
            sentence_chunks.extend(chunks)
        else:
            sentence_chunks.append(sentence)
    return sentence_chunks
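
# For example, re.split(r'(?<=[.!?…]) +', "Hi. How are you? Great!") splits
# after each sentence-ending character, giving
# ["Hi.", "How are you?", "Great!"].
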
def chunk_data_by_paragraph(data_chunks, chunk_size, overlap, bound=0.75):
    data = "".join(data_chunks)
    total_length = len(data)
    chunks = []
    check_bound = int(bound * chunk_size)
    start_idx = 0

    while start_idx < total_length:
        # Set the end index to the minimum of start_idx + chunk_size or total_length
        end_idx = min(start_idx + chunk_size, total_length)

        # Find the next paragraph break ('\n\n') within the current chunk,
        # searching only past the bound so chunks do not get too small
        next_paragraph_index = data.find('\n\n', start_idx + check_bound, end_idx)

        # If a paragraph break was found within the current chunk
        if next_paragraph_index != -1:
            # Update end_idx to include the paragraph delimiter
            end_idx = next_paragraph_index + 2

        # Extend the slice by `overlap` characters so consecutive chunks share context
        chunks.append(data[start_idx:end_idx + overlap])

        # Update start_idx to be the current end_idx
        start_idx = end_idx

    return chunks
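

# A quick smoke test of the non-langchain strategies (a sketch; the sample
# text is made up, and exact chunk boundaries depend on the sizes chosen here):
if __name__ == "__main__":
    sample = "First sentence. Second sentence!\n\nA new paragraph follows here."
    for strategy in (ChunkStrategy.EXACT, ChunkStrategy.SENTENCE, ChunkStrategy.PARAGRAPH):
        print(strategy, chunk_data(strategy, sample, chunk_size=30, chunk_overlap=5))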