Support async chunking functions in LightRAG processing pipeline

- Add Awaitable and Union type imports
- Update chunking_func type annotation
- Handle coroutine results with await
- Add return type validation
- Update docstring for async support
yangdx 2025-11-13 12:37:15 +08:00
parent 5016025453
commit af5423919b

@@ -12,6 +12,7 @@ from functools import partial
 from typing import (
     Any,
     AsyncIterator,
+    Awaitable,
     Callable,
     Iterator,
     cast,
@@ -20,6 +21,7 @@ from typing import (
     Optional,
     List,
     Dict,
+    Union,
 )
 from lightrag.prompt import PROMPTS
 from lightrag.exceptions import PipelineCancelledException
@@ -244,11 +246,13 @@ class LightRAG:
             int,
             int,
         ],
-        List[Dict[str, Any]],
+        Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]],
     ] = field(default_factory=lambda: chunking_by_token_size)
     """
     Custom chunking function for splitting text into chunks before processing.

+    The function can be either synchronous or asynchronous.
+
     The function should take the following parameters:

     - `tokenizer`: A Tokenizer instance to use for tokenization.
@@ -258,7 +262,8 @@ class LightRAG:
     - `chunk_token_size`: The maximum number of tokens per chunk.
     - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.

-    The function should return a list of dictionaries, where each dictionary contains the following keys:
+    The function should return a list of dictionaries (or an awaitable that resolves to a list),
+    where each dictionary contains the following keys:

     - `tokens`: The number of tokens in the chunk.
     - `content`: The text content of the chunk.
@@ -1778,8 +1783,8 @@ class LightRAG:
                     )
                 content = content_data["content"]

-                # Generate chunks from document
-                chunks = self.chunking_func(
+                # Call chunking function, supporting both sync and async implementations
+                chunking_result = self.chunking_func(
                     self.tokenizer,
                     content,
                     split_by_character,
@@ -1787,9 +1792,19 @@ class LightRAG:
                     self.chunk_overlap_token_size,
                     self.chunk_token_size,
                 )
-                # 判断chunks是否是异步异步函数的返回
-                if asyncio.iscoroutine(chunks):
-                    chunks = await chunks
+
+                # If result is a coroutine, await to get actual result
+                if asyncio.iscoroutine(chunking_result):
+                    chunking_result = await chunking_result
+
+                # Validate return type
+                if not isinstance(chunking_result, (list, tuple)):
+                    raise TypeError(
+                        f"chunking_func must return a list or tuple of dicts, "
+                        f"got {type(chunking_result)}"
+                    )
+
+                # Build chunks dictionary
                 chunks: dict[str, Any] = {
                     compute_mdhash_id(dp["content"], prefix="chunk-"): {
                         **dp,
@@ -1797,7 +1812,7 @@ class LightRAG:
                         "file_path": file_path,  # Add file path to each chunk
                         "llm_cache_list": [],  # Initialize empty LLM cache list for each chunk
                     }
-                    for dp in chunks
+                    for dp in chunking_result
                 }

                 if not chunks:
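
For reference, the updated annotation means a coroutine function can now be passed as `chunking_func` directly. The sketch below is not part of this commit; the function name, the `split_by_character_only` parameter name (one call argument falls outside the hunks shown above), and the tokenizer's `encode`/`decode` methods are assumptions made for illustration. It only demonstrates a callable whose awaited result matches the `tokens`/`content` shape described in the docstring.

from typing import Any, Dict, List, Optional

# Hypothetical async chunking function: only the overall signature and the
# return shape (a list of dicts with "tokens" and "content" keys) are taken
# from the diff above; parameter names and tokenizer methods are assumptions.
async def async_chunking_by_token_size(
    tokenizer: Any,
    content: str,
    split_by_character: Optional[str],
    split_by_character_only: bool,   # assumed name; this argument is not visible in the hunks
    chunk_overlap_token_size: int,
    chunk_token_size: int,
) -> List[Dict[str, Any]]:
    tokens = tokenizer.encode(content)  # assumes a tiktoken-style encode/decode API
    step = max(1, chunk_token_size - chunk_overlap_token_size)
    chunks: List[Dict[str, Any]] = []
    for start in range(0, len(tokens), step):
        window = tokens[start : start + chunk_token_size]
        chunks.append(
            {
                "tokens": len(window),
                "content": tokenizer.decode(window),
            }
        )
    return chunks

Because `asyncio.iscoroutine(chunking_result)` is true for the object produced by calling such a function, the awaited value flows into the same chunks-dictionary construction as a plain synchronous return, and an existing synchronous `chunking_func` keeps working unchanged (e.g. `LightRAG(chunking_func=async_chunking_by_token_size)`).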