From af5423919b1b95622a3b39850d36a353d62702a4 Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 13 Nov 2025 12:37:15 +0800 Subject: [PATCH] Support async chunking functions in LightRAG processing pipeline - Add Awaitable and Union type imports - Update chunking_func type annotation - Handle coroutine results with await - Add return type validation - Update docstring for async support --- lightrag/lightrag.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 2a6cefa6..96320afc 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -12,6 +12,7 @@ from functools import partial from typing import ( Any, AsyncIterator, + Awaitable, Callable, Iterator, cast, @@ -20,6 +21,7 @@ from typing import ( Optional, List, Dict, + Union, ) from lightrag.prompt import PROMPTS from lightrag.exceptions import PipelineCancelledException @@ -244,11 +246,13 @@ class LightRAG: int, int, ], - List[Dict[str, Any]], + Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]], ] = field(default_factory=lambda: chunking_by_token_size) """ Custom chunking function for splitting text into chunks before processing. + The function can be either synchronous or asynchronous. + The function should take the following parameters: - `tokenizer`: A Tokenizer instance to use for tokenization. @@ -258,7 +262,8 @@ class LightRAG: - `chunk_token_size`: The maximum number of tokens per chunk. - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks. - The function should return a list of dictionaries, where each dictionary contains the following keys: + The function should return a list of dictionaries (or an awaitable that resolves to a list), + where each dictionary contains the following keys: - `tokens`: The number of tokens in the chunk. - `content`: The text content of the chunk. @@ -1778,8 +1783,8 @@ class LightRAG: ) content = content_data["content"] - # Generate chunks from document - chunks = self.chunking_func( + # Call chunking function, supporting both sync and async implementations + chunking_result = self.chunking_func( self.tokenizer, content, split_by_character, @@ -1787,9 +1792,19 @@ class LightRAG: self.chunk_overlap_token_size, self.chunk_token_size, ) - # 判断chunks是否是异步异步函数的返回 - if asyncio.iscoroutine(chunks): - chunks = await chunks + + # If result is a coroutine, await to get actual result + if asyncio.iscoroutine(chunking_result): + chunking_result = await chunking_result + + # Validate return type + if not isinstance(chunking_result, (list, tuple)): + raise TypeError( + f"chunking_func must return a list or tuple of dicts, " + f"got {type(chunking_result)}" + ) + + # Build chunks dictionary chunks: dict[str, Any] = { compute_mdhash_id(dp["content"], prefix="chunk-"): { **dp, @@ -1797,7 +1812,7 @@ class LightRAG: "file_path": file_path, # Add file path to each chunk "llm_cache_list": [], # Initialize empty LLM cache list for each chunk } - for dp in chunks + for dp in chunking_result } if not chunks: